Preface
To land Flink SQL data into ClickHouse today, you can either modify the source code of the JDBC connector to add a ClickHouse dialect, or use the ClickHouse connector package provided by Alibaba. To better understand how Flink connectors work, this post implements a custom connector instead. The work breaks down into three steps:
- Implement the DynamicTableSinkFactory interface and register the factory class in the SPI configuration file (see the sketch after this list)
- Implement a ClickHouse-specific dialect, ClickHouseDialect
- Implement the dynamic table factory interfaces DynamicTableSourceFactory and DynamicTableSinkFactory
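The first step hinges on Java SPI discovery: FactoryUtil looks the factory up by its identifier through the ServiceLoader mechanism. A minimal sketch of the registration file, assuming the package layout used throughout this post: create src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory containing the fully qualified factory class name on a single line:

com.sht.connector.clickhouse.table.ClickHouseDynamicTableFactory

Without this file, the planner cannot resolve 'connector' = 'clickhouse' and the CREATE TABLE statement fails at runtime.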
package com.sht.connector.clickhouse.table;

import com.sht.connector.clickhouse.dialect.ClickHouseDialect;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;

import java.util.HashSet;
import java.util.Set;

public class ClickHouseDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "clickhouse";
    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

    public static final ConfigOption<String> FORMAT = ConfigOptions
            .key("format")
            .stringType()
            .noDefaultValue()
            .withDescription("the format.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(URL);
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(USERNAME);
        requiredOptions.add(PASSWORD);
        requiredOptions.add(FORMAT);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // use the provided helper utility (custom validation logic could be added here)
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();
        // validate all options
        helper.validate();
        JdbcOptions jdbcOptions = getJdbcOptions(config);
        // get the physical table schema (excluding computed columns)
        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        // table source
        return new ClickHouseDynamicTableSource(jdbcOptions, physicalSchema);
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // discover a suitable encoding format
        final EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
                SerializationFormatFactory.class,
                FactoryUtil.FORMAT);
        final ReadableConfig config = helper.getOptions();
        // validate all options
        helper.validate();
        // get the validated options
        JdbcOptions jdbcOptions = getJdbcOptions(config);
        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType dataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
        // table sink
        return new ClickHouseDynamicTableSink(jdbcOptions, encodingFormat, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final String url = readableConfig.get(URL);
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                .setDriverName(DRIVER_NAME)
                .setDBUrl(url)
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new ClickHouseDialect());
        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }
}
- Building the Source
package com.sht.connector.clickhouse.table;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.types.logical.RowType;

public class ClickHouseDynamicTableSource implements ScanTableSource {

    private final JdbcOptions options;
    private final TableSchema tableSchema;

    public ClickHouseDynamicTableSource(JdbcOptions options, TableSchema tableSchema) {
        this.options = options;
        this.tableSchema = tableSchema;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    @SuppressWarnings("unchecked")
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        final JdbcDialect dialect = options.getDialect();
        String query = dialect.getSelectFromStatement(
                options.getTableName(), tableSchema.getFieldNames(), new String[0]);
        final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
        final JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder()
                .setDrivername(options.getDriverName())
                .setDBUrl(options.getDbURL())
                .setUsername(options.getUsername().orElse(null))
                .setPassword(options.getPassword().orElse(null))
                .setQuery(query)
                .setRowConverter(dialect.getRowConverter(rowType))
                .setRowDataTypeInfo(runtimeProviderContext
                        .createTypeInformation(tableSchema.toRowDataType()));
        return InputFormatProvider.of(builder.build());
    }

    @Override
    public DynamicTableSource copy() {
        return new ClickHouseDynamicTableSource(options, tableSchema);
    }

    @Override
    public String asSummaryString() {
        return "ClickHouse Table Source";
    }
}
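For reference, getSelectFromStatement is a default method on JdbcDialect that joins the field names with quoteIdentifier. A quick sketch of the query it produces for the two-column schema used in the test at the end of this post (illustrative values only):

// With fields {"a", "b"} and table "dwd_user_login", the dialect yields roughly:
// SELECT `a`, `b` FROM `dwd_user_login`
String query = new ClickHouseDialect().getSelectFromStatement(
        "dwd_user_login", new String[]{"a", "b"}, new String[0]);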
- Building the Sink
package com.sht.connector.clickhouse.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public class ClickHouseDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
    private final DataType dataType;

    public ClickHouseDynamicTableSink(JdbcOptions jdbcOptions, EncodingFormat<SerializationSchema<RowData>> encodingFormat, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.encodingFormat = encodingFormat;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        SerializationSchema<RowData> serializationSchema = encodingFormat.createRuntimeEncoder(context, dataType);
        ClickHouseSinkFunction clickHouseSinkFunction = new ClickHouseSinkFunction(jdbcOptions, serializationSchema);
        return SinkFunctionProvider.of(clickHouseSinkFunction);
    }

    @Override
    public DynamicTableSink copy() {
        return new ClickHouseDynamicTableSink(jdbcOptions, encodingFormat, dataType);
    }

    @Override
    public String asSummaryString() {
        return "ClickHouse Table Sink";
    }
}
- Implementing the sink-side RichSinkFunction
package com.sht.connector.clickhouse.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.settings.ClickHouseProperties;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;

import java.io.ByteArrayInputStream;
import java.sql.SQLException;

public class ClickHouseSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;
    private static final String MAX_PARALLEL_REPLICAS_VALUE = "2";

    private final JdbcOptions jdbcOptions;
    private final SerializationSchema<RowData> serializationSchema;

    private ClickHouseConnection conn;

    public ClickHouseSinkFunction(JdbcOptions jdbcOptions, SerializationSchema<RowData> serializationSchema) {
        this.jdbcOptions = jdbcOptions;
        this.serializationSchema = serializationSchema;
    }

    @Override
    public void open(Configuration parameters) {
        ClickHouseProperties properties = new ClickHouseProperties();
        properties.setUser(jdbcOptions.getUsername().orElse(null));
        properties.setPassword(jdbcOptions.getPassword().orElse(null));
        try {
            if (conn == null) {
                BalancedClickhouseDataSource dataSource =
                        new BalancedClickhouseDataSource(jdbcOptions.getDbURL(), properties);
                conn = dataSource.getConnection();
            }
        } catch (SQLException e) {
            // fail fast: swallowing the exception here would leave conn null
            // and cause a NullPointerException later in invoke()
            throw new RuntimeException("Failed to open ClickHouse connection", e);
        }
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        byte[] serialized = serializationSchema.serialize(value);
        // try-with-resources closes each statement after the write instead of leaking one per record
        try (ClickHouseStatement stmt = conn.createStatement()) {
            stmt.write()
                    .table(jdbcOptions.getTableName())
                    .data(new ByteArrayInputStream(serialized), ClickHouseFormat.JSONEachRow)
                    .addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, MAX_PARALLEL_REPLICAS_VALUE)
                    .send();
        }
    }

    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
    }
}
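Each call to invoke() serializes one RowData with the json format discovered in the factory and streams it to ClickHouse as a single JSONEachRow line. For a row (a='u1', b='login') under the test schema below, the payload would look roughly like this (illustrative values):

{"a":"u1","b":"login"}

Note that this issues one write request per row; for production workloads, some form of batching in front of the write would be worth considering.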
- Defining the ClickHouse JDBC dialect
package com.sht.connector.clickhouse.dialect;

import com.sht.connector.clickhouse.converter.ClickHouseRowConverter;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

import java.util.Optional;

public class ClickHouseDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "ClickHouse";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new ClickHouseRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        // must return a valid SQL clause that can be appended to a query
        return "LIMIT " + limit;
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("ru.yandex.clickhouse.ClickHouseDriver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "`" + identifier + "`";
    }
}
- Defining the row data converter
package com.sht.connector.clickhouse.converter;

import org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
 * @author dpf
 */
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {

    private static final long serialVersionUID = 1L;

    public ClickHouseRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "ClickHouse";
    }
}
- Dependencies used
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>${clickhouse-jdbc.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.bin.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
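The versions are left as Maven properties above. As a reference point only (an assumption, not the exact versions the author used): the internal classes referenced here (JdbcOptions, JdbcRowDataInputFormat) match the Flink 1.11/1.12 line, and the driver API used in the sink function (BalancedClickhouseDataSource, ClickHouseStatement.write()) matches clickhouse-jdbc 0.2.x:

<properties>
    <!-- assumed versions; adjust to your environment -->
    <flink.version>1.12.0</flink.version>
    <scala.bin.version>2.11</scala.bin.version>
    <clickhouse-jdbc.version>0.2.6</clickhouse-jdbc.version>
</properties>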
- Test code
package com.sht.connector.clickhouse;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestClickHouseConnector {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        tableEnvironment.executeSql(
                "CREATE TABLE sourceTable (" +
                "  a varchar," +
                "  b varchar" +
                ") WITH (" +
                "  'connector.type' = 'kafka'," +
                "  'connector.version' = 'universal'," +
                "  'connector.topic' = 'flink_source'," +
                "  'connector.startup-mode' = 'earliest-offset'," +
                "  'connector.properties.zookeeper.connect' = '127.0.0.1:2181'," +
                "  'connector.properties.bootstrap.servers' = '127.0.0.1:9092'," +
                "  'format.type' = 'json'" +
                ")");

        tableEnvironment.executeSql(
                "CREATE TABLE sinkTable (" +
                "  a String," +
                "  b String" +
                ") WITH (" +
                "  'connector' = 'clickhouse'," +
                "  'url' = 'jdbc:clickhouse://127.0.0.1:8123/nicetuan_webapp'," +
                "  'table-name' = 'dwd_user_login'," +
                "  'username' = 'default'," +
                "  'password' = ''," +
                "  'format' = 'json'" +
                ")");

        tableEnvironment.executeSql(
                "insert into sinkTable " +
                "select * " +
                "from sourceTable");
    }
}
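For the INSERT to succeed, the target table must already exist in ClickHouse. A minimal sketch of a matching table follows; the MergeTree engine and ordering key are assumptions, and only the database, table, and column names come from the DDL above:

-- assumed engine and ORDER BY; adjust to your schema design
CREATE TABLE nicetuan_webapp.dwd_user_login
(
    a String,
    b String
)
ENGINE = MergeTree()
ORDER BY a;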
Conclusion
There are already quite a few libraries that support writing Flink data into ClickHouse. If the data format is fixed, rows can be written as CSV; if not, JSON can be used instead.