How Flink SQL Connects to External Systems
A recent project required building a set of visualization pages on top of Flink SQL, analogous to what we already had for Spark SQL. Blog posts covering this part of Flink are scarce, and the examples on the official site are incomplete. The conclusion most people offer is that Flink Table/SQL is still not stable and that it is better to wait until Alibaba's Blink is merged into Flink. In this post I want to give some reference demos based on the latest release, 1.7.2.
Connectors currently supported by the official documentation: per the 1.7 docs, the built-in connectors are Filesystem, Apache Kafka, and Elasticsearch, and the supported formats are CSV (built-in), JSON, and Apache Avro.
**Note: as of 1.7.2 the CSV format only supports batch processing, not streaming. Using it in a streaming environment fails with the error below.**

Exception in thread "main" org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.DeserializationSchemaFactory' in the classpath.
Reason: No context matches.
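For reference, here is a minimal sketch of the batch case the note refers to, using the FileSystem connector with the 1.7-style Csv descriptor. The file path, field names, and delimiter are made up for illustration; this is not taken from the original post.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class CsvBatchConnectorSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        tEnv.connect(new FileSystem().path("D:\\Aupload\\flink\\source.csv")) // hypothetical input file
                .withFormat(new Csv()
                        .field("id", Types.LONG)        // CSV columns, in file order
                        .field("product", Types.STRING)
                        .field("amount", Types.INT)
                        .fieldDelimiter(","))
                .withSchema(new Schema()
                        .field("id", Types.LONG)
                        .field("product", Types.STRING)
                        .field("amount", Types.INT))
                .registerTableSource("csvSource");

        Table result = tEnv.sqlQuery("select * from csvSource");
        // print() on the DataSet triggers execution of the batch job
        tEnv.toDataSet(result, Row.class).print();
    }
}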
Programming pattern
tableEnvironment
    .connect(...)        // takes an implementation of ConnectorDescriptor, e.g. Kafka, FileSystem
    .withFormat(...)     // takes an implementation of FormatDescriptor, e.g. Json, Avro, Csv
    .withSchema(...)     // takes a new Schema() describing the fields and types of the table registered in Flink
    .inAppendMode()
    /* three update modes are supported:
       inAppendMode  (only insert changes of the dynamic table are consumed)
       inRetractMode (also handles update and delete changes, but performance suffers)
       inUpsertMode  (operates on single rows via a key, so it performs better than inRetractMode)
    */
    .registerTableSource("MyTable")  // name under which the table is registered
Example 1: Kafka connector, JSON -> CSV

package mqz.connector;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

/**
 * @author maqingze
 * @version v1.0
 * @date 2019/3/7 11:24
 */
public class KafkaConnectorFormatJSON2CSV {

    private final static String SOURCE_TOPIC = "source";
    private final static String SINK_TOPIC = "sink";
    private final static String ZOOKEEPER_CONNECT = "hadoop003:2181,hadoop004:2181";
    private final static String GROUP_ID = "group1";
    private final static String METADATA_BROKER_LIST = "hadoop003:9092,hadoop004:9092";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        .topic(SOURCE_TOPIC)
                        .startFromEarliest()
                        .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                        .property("bootstrap.servers", METADATA_BROKER_LIST)
        )
                .withFormat(
                        new Json()
                                .schema(
                                        org.apache.flink.table.api.Types.ROW(
                                                new String[]{"id", "product", "amount"},
                                                new TypeInformation[]{
                                                        org.apache.flink.table.api.Types.LONG(),
                                                        org.apache.flink.table.api.Types.STRING(),
                                                        org.apache.flink.table.api.Types.INT()
                                                }))
                                .failOnMissingField(true) // optional: whether to fail if a field is missing; false by default
                )
                .withSchema(
                        new Schema()
                                .field("id", Types.LONG)
                                .field("product", Types.STRING)
                                .field("amount", Types.INT)
                )
                .inAppendMode()
                .registerTableSource("sourceTable");

        Table result = tEnv.sqlQuery("select * from sourceTable");

        DataStream<Row> rowDataStream = tEnv.toAppendStream(result, Row.class);
        rowDataStream.print();

        CsvTableSink sink = new CsvTableSink(
                "D:\\Aupload\\flink\\sink.csv",   // output path
                "|",                              // field delimiter
                1,                                // number of files to write
                FileSystem.WriteMode.OVERWRITE);  // overwrite existing files; NO_OVERWRITE is also available

        tEnv.registerTableSink(
                "csvOutputTable",
                new String[]{"f0", "f1", "f2"},
                new TypeInformation[]{Types.LONG, Types.STRING, Types.INT},
                sink);

        result.insertInto("csvOutputTable");

        env.execute("test kafka connector demo");
    }
}
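To smoke-test the job, something like the following helper (not part of the original post; it assumes the kafka-clients dependency is on the classpath and uses made-up record values) can push one JSON record into the source topic. With the pipeline above, a record such as {"id":1,"product":"rubber","amount":3} should end up in sink.csv as the line 1|rubber|3.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SourceTopicFeeder {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop003:9092,hadoop004:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // one record matching the JSON schema expected by the source table
            producer.send(new ProducerRecord<>("source", "{\"id\":1,\"product\":\"rubber\",\"amount\":3}"));
        }
    }
}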
Example 2: Kafka connector, JSON -> JSON

package mqz.connector;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

/**
 * @author maqingze
 * @version v1.0
 * @date 2019/3/7 11:24
 */
public class KafkaConnectorFormatJSON2JSON {

    private final static String SOURCE_TOPIC = "source";
    private final static String SINK_TOPIC = "sink";
    private final static String ZOOKEEPER_CONNECT = "hadoop003:2181,hadoop004:2181";
    private final static String GROUP_ID = "group1";
    private final static String METADATA_BROKER_LIST = "hadoop003:9092,hadoop004:9092";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // source table
        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        .topic(SOURCE_TOPIC)
                        .startFromEarliest()
                        .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                        .property("bootstrap.servers", METADATA_BROKER_LIST)
        )
                .withFormat(
                        new Json()
                                .schema(
                                        org.apache.flink.table.api.Types.ROW(
                                                new String[]{"id", "product", "amount"},
                                                new TypeInformation[]{
                                                        org.apache.flink.table.api.Types.LONG(),
                                                        org.apache.flink.table.api.Types.STRING(),
                                                        org.apache.flink.table.api.Types.INT()
                                                }))
                                .failOnMissingField(true) // optional: whether to fail if a field is missing; false by default
                )
                .withSchema(
                        new Schema()
                                .field("id", Types.LONG)
                                .field("product", Types.STRING)
                                .field("amount", Types.INT)
                )
                .inAppendMode()
                .registerTableSource("sourceTable");

        // sink table
        tEnv.connect(
                new Kafka()
                        .version("0.10")          // required: valid connector versions are
                                                  // "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic(SINK_TOPIC)        // required: topic the table is written to
                        // optional: connector specific properties
                        .property("zookeeper.connect", ZOOKEEPER_CONNECT)
                        .property("bootstrap.servers", METADATA_BROKER_LIST)
                        .property("group.id", GROUP_ID)
                        // optional: startup mode for Kafka offsets
                        .startFromEarliest()
                        .sinkPartitionerFixed()   // each Flink partition ends up in at most one Kafka partition (default)
        ).withFormat(
                new Json()
                        .schema(
                                org.apache.flink.table.api.Types.ROW(
                                        new String[]{"yid", "yproduct", "yamount"},
                                        new TypeInformation[]{
                                                org.apache.flink.table.api.Types.LONG(),
                                                org.apache.flink.table.api.Types.STRING(),
                                                org.apache.flink.table.api.Types.INT()
                                        }))
                        .failOnMissingField(true) // optional: whether to fail if a field is missing; false by default
        )
                .withSchema(
                        new Schema()
                                .field("id", Types.LONG)
                                .field("product", Types.STRING)
                                .field("amount", Types.INT)
                )
                .inAppendMode()
                .registerTableSink("sinkTable");

        tEnv.sqlUpdate("insert into sinkTable(id,product,amount) select id,product,33 from sourceTable");

        env.execute("test kafka connector demo");
    }
}
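One way to check the result is to read the sink topic back with a plain Kafka consumer. The sketch below is not from the original post: the group id, the poll timeout, and the expectation that amount is 33 (from the insert statement above) are assumptions for illustration.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SinkTopicChecker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop003:9092,hadoop004:9092");
        props.put("group.id", "sink-check");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("sink"));
            // poll(long) is the API of the 0.10-era client used in this demo
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord<String, String> record : records) {
                // expect JSON rows such as {"id":1,"product":"rubber","amount":33}
                System.out.println(record.value());
            }
        }
    }
}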