1.流表
// Source stream: each text line arriving on the local socket is parsed as an Integer.
// Start `nc -lk 9999` in a terminal first so the socket exists.
DataStream<Integer> dataStream = streamExecutionEnvironment
        .socketTextStream("localhost", 9999)
        .map(Integer::parseInt);
2.mysql 维表
-- MySQL dimension table used as the lookup side of the stream join.
-- Column order (id, user_name, age) must stay in sync with the Flink
-- RowTypeInfo declared in the Java code: INT, STRING, INT.
CREATE TABLE `source5` (
    `id` int(11) NOT NULL,
    `user_name` varchar(19) DEFAULT NULL,
    `age` int(11) DEFAULT NULL,
    PRIMARY KEY (`id`)
);
// Row layout of the MySQL result, in SELECT column order:
// id (INT), user_name (STRING), age (INT).
TypeInformation<?>[] fieldTypes = new TypeInformation<?>[]{
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

// JDBC source for the dimension table. Columns are listed explicitly
// (rather than SELECT *) so the result order is guaranteed to match
// rowTypeInfo even if the table later gains columns.
// NOTE(review): credentials are hard-coded for this demo; move them to
// configuration/environment before any real deployment.
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("com.mysql.jdbc.Driver")
        .setDBUrl("jdbc:mysql://localhost/test1")
        .setUsername("root")
        .setPassword("123456")
        .setQuery("select id, user_name, age from source5")
        .setRowTypeInfo(rowTypeInfo)
        .finish();

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment stableEnv = new StreamTableEnvironment(streamExecutionEnvironment, TableConfig.DEFAULT());

// Bounded DataStream holding a one-shot snapshot of the dimension table.
DataStreamSource<Row> dataSource1 = streamExecutionEnvironment.createInput(jdbcInputFormat);
3.join
stableEnv.registerDataStream("mysql", dataSource1, "id1,name,age");
stableEnv.registerDataStream("stream1", dataStream, "id2");
// stableEnv.sqlQuery("update mysql set user_name='b where id1 =410000");
Table mysql = stableEnv.sqlQuery("select * from mysql");
Table stream1 = stableEnv.scan("stream1");
Table joined = stream1.join(mysql, "id1=id2");
DataStream<Row> res = stableEnv.toAppendStream(joined, Row.class);
res.addSink(new SinkFunction<Row>() {
@Override
public void invoke(Row value) throws Exception {
logger.info(value.toString());
}
});
streamExecutionEnvironment.execute("Window WordCount");
4.在终端执行 nc -lk 9999,即可输入流数据,实现流数据与mysql数据的join
5.依赖
<!-- Shared build properties: every Flink artifact below should reference
     flink.version so the whole stack upgrades together. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.7.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- 'provided' scope: the Flink cluster runtime supplies these classes. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- Table API / SQL support. Version unified to ${flink.version}
     (was hard-coded 1.7.0, which would drift on a flink.version bump). -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- JDBC driver matching the com.mysql.jdbc.Driver class used by the code;
     5.1.x is the right series for that (pre-cj) driver class name. -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.46</version>
</dependency>
<!-- Logging framework, to produce console output when running in the IDE.
     These dependencies are excluded from the application JAR by default. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>runtime</scope>
</dependency>