目前业务上有同步数据到sqlServer的需求,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,
科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波
1、参考MySQLDialect及flinkStreamSql 实现SqlServerDialect
主要实现getUpsertStatement的方法,本来以为能直接copy一波flinkStreamSql 的实现,结果发现
报错 SQL statement must not contain ? character.
查看源码发现,flink 在构建 mysql 的 Statement 时,是先在需要替换的字段名前拼接冒号,生成 :fieldName 形式的命名占位符,然后在 org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl 类的 parseNamedStatement 方法中把它替换成 ? 号。既然如此,就针对 buildDualQueryStatement 进行修改
完整的SqlServerDialect文件
package org.apache.flink.connector.jdbc.dialect;
/**
* SqlServerDialect
*
* @author zhanjian@pcuuu.com
* @date 2021/4/20 10:13
*/
public class SqlServerDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;

// MAX/MIN fractional-seconds precision of SQL Server's DATETIME2 type, see:
// https://learn.microsoft.com/sql/t-sql/data-types/datetime2-transact-sql
// (the previous 1..6 bounds were copied from the MySQL dialect; SQL Server
// supports DATETIME2(0) through DATETIME2(7))
private static final int MAX_TIMESTAMP_PRECISION = 7;
private static final int MIN_TIMESTAMP_PRECISION = 0;

// MAX/MIN precision of SQL Server's DECIMAL type, see:
// https://learn.microsoft.com/sql/t-sql/data-types/decimal-and-numeric-transact-sql
// (the previous max of 65 was MySQL's limit; SQL Server caps precision at 38)
private static final int MAX_DECIMAL_PRECISION = 38;
private static final int MIN_DECIMAL_PRECISION = 1;
/** Returns the display name of this dialect ("SqlServer"). */
@Override
public String dialectName() {
return "SqlServer";
}
/**
 * Checks whether this dialect is responsible for the given JDBC URL.
 *
 * @param url the JDBC connection URL from the table DDL
 * @return true if the URL targets SQL Server via the jTDS driver
 */
@Override
public boolean canHandle(String url) {
    final String jtdsPrefix = "jdbc:jtds:";
    return url.startsWith(jtdsPrefix);
}
/** Creates the converter that maps JDBC result rows of the given type to Flink rows. */
@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new SqlServerConverter(rowType);
}
/**
 * Returns the default JDBC driver class for this dialect.
 *
 * <p>The jTDS driver is used, matching the {@code jdbc:jtds:} URL prefix
 * accepted by {@link #canHandle(String)}.
 */
@Override
public Optional<String> defaultDriverName() {
    final String jtdsDriver = "net.sourceforge.jtds.jdbc.Driver";
    return Optional.of(jtdsDriver);
}
/**
 * Returns the identifier unquoted.
 *
 * <p>NOTE(review): identifiers are deliberately NOT wrapped in [brackets] here.
 * The upsert statement embeds column names as ":name" placeholders (see
 * buildDualQueryStatement), and FieldNamedPreparedStatementImpl's
 * parseNamedStatement presumably only recognizes plain word characters after
 * ':' — confirm before adding bracket quoting for reserved words.
 */
@Override
public String quoteIdentifier(String identifier) {
return identifier;
}
/**
 * Builds a SQL Server MERGE statement that inserts the row when the unique
 * key is absent and updates it otherwise (upsert).
 *
 * <p>Column placeholders are emitted as ":name" tokens; Flink later rewrites
 * them to '?' parameters.
 *
 * @param tableName destination table
 * @param fieldNames all columns to write
 * @param uniqueKeyFields columns forming the unique key
 * @return the MERGE statement
 */
@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    String insertColumns =
            Arrays.stream(fieldNames)
                    .map(this::quoteIdentifier)
                    .collect(Collectors.joining(","));
    String insertValues =
            Arrays.stream(fieldNames)
                    .map(col -> "T2." + quoteIdentifier(col))
                    .collect(Collectors.joining(","));
    StringBuilder merge = new StringBuilder();
    merge.append("MERGE INTO ")
            .append(tableName)
            .append(" T1 USING (")
            .append(buildDualQueryStatement(fieldNames))
            .append(") T2 ON (")
            .append(buildConnectionConditions(uniqueKeyFields))
            .append(") ");
    // buildUpdateConnection never returns null (it joins a stream), so a
    // plain isEmpty() check is equivalent to StringUtils.isNotEmpty here.
    String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, true);
    if (!updateSql.isEmpty()) {
        merge.append(" WHEN MATCHED THEN UPDATE SET ").append(updateSql);
    }
    merge.append(" WHEN NOT MATCHED THEN INSERT (")
            .append(insertColumns)
            .append(") VALUES (")
            .append(insertValues)
            .append(")")
            .append(";");
    return Optional.of(merge.toString());
}
/**
 * Builds the assignment list of the MATCHED branch.
 *
 * <p>With {@code allReplace=true} each assignment is {@code T1.A =T2.A};
 * with {@code allReplace=false} it is {@code T1.A =ISNULL(T2.A,T1.A)}, i.e.
 * the existing value is kept when the incoming one is null. Unique-key
 * columns are excluded from the update.
 *
 * @param fieldNames all columns to write
 * @param uniqueKeyFields columns forming the unique key (skipped)
 * @param allReplace whether to always overwrite with the incoming value
 * @return comma-joined assignment expressions
 */
private String buildUpdateConnection(
String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
    List<String> keys = Arrays.asList(uniqueKeyFields);
    StringBuilder assignments = new StringBuilder();
    for (String col : fieldNames) {
        if (keys.contains(col)) {
            continue; // never rewrite the key columns themselves
        }
        String target = quoteIdentifier("T1") + "." + quoteIdentifier(col);
        String source = quoteIdentifier("T2") + "." + quoteIdentifier(col);
        String assignment =
                allReplace
                        ? target + " =" + source
                        : target + " =ISNULL(" + source + "," + target + ")";
        if (assignments.length() > 0) {
            assignments.append(",");
        }
        assignments.append(assignment);
    }
    return assignments.toString();
}
/**
 * Builds the MERGE ON clause that matches source and target rows on the
 * unique key, e.g. {@code T1.A=T2.A AND T1.B=T2.B}.
 *
 * <p>Fix: the conditions are combined with {@code AND}. The previous
 * comma-join produced invalid SQL whenever the unique key had more than
 * one column.
 *
 * @param uniqueKeyFields columns forming the unique key
 * @return the join condition for the ON clause
 */
private String buildConnectionConditions(String[] uniqueKeyFields) {
    return Arrays.stream(uniqueKeyFields)
            .map(col -> "T1." + quoteIdentifier(col) + "=T2." + quoteIdentifier(col))
            .collect(Collectors.joining(" AND "));
}
/**
 * Builds the USING source query, e.g. {@code SELECT :A A, :B B}.
 *
 * <p>Columns are emitted as ":name" placeholders so that Flink's
 * FieldNamedPreparedStatementImpl can later rewrite them to '?' parameters
 * (a literal '?' in the statement template is rejected).
 *
 * @param column destination columns
 * @return the SELECT statement used as the MERGE source
 */
public String buildDualQueryStatement(String[] column) {
    String projection =
            Arrays.stream(column)
                    .map(col -> ":" + quoteIdentifier(col) + " " + quoteIdentifier(col))
                    .collect(Collectors.joining(", "));
    return "SELECT " + projection;
}
/** Maximum DECIMAL precision reported to Flink's type validation. */
@Override
public int maxDecimalPrecision() {
return MAX_DECIMAL_PRECISION;
}
/** Minimum DECIMAL precision reported to Flink's type validation. */
@Override
public int minDecimalPrecision() {
return MIN_DECIMAL_PRECISION;
}
/** Maximum TIMESTAMP (fractional seconds) precision reported to Flink's type validation. */
@Override
public int maxTimestampPrecision() {
return MAX_TIMESTAMP_PRECISION;
}
/** Minimum TIMESTAMP (fractional seconds) precision reported to Flink's type validation. */
@Override
public int minTimestampPrecision() {
return MIN_TIMESTAMP_PRECISION;
}
/**
 * Logical types this dialect rejects at validation time.
 *
 * <p>NOTE(review): this list was carried over unchanged from the MySQL
 * dialect — verify it against the types the jTDS driver / SQL Server
 * actually support before relying on it.
 */
@Override
public List<LogicalTypeRoot> unsupportedTypes() {
// TODO: We can't convert BINARY data type to
// PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
// LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
LogicalTypeRoot.INTERVAL_YEAR_MONTH,
LogicalTypeRoot.INTERVAL_DAY_TIME,
LogicalTypeRoot.ARRAY,
LogicalTypeRoot.MULTISET,
LogicalTypeRoot.MAP,
LogicalTypeRoot.ROW,
LogicalTypeRoot.DISTINCT_TYPE,
LogicalTypeRoot.STRUCTURED_TYPE,
LogicalTypeRoot.NULL,
LogicalTypeRoot.RAW,
LogicalTypeRoot.SYMBOL,
LogicalTypeRoot.UNRESOLVED);
}
2、另外参考 MySQLRowConverter 实现一个 SqlServerConverter
最后替换原有的flink-jar包后,就可以用类似flink定义mysql的ddl进行定义表了
注意url写法为:jdbc:jtds:sqlserver://xxx:1433;databaseName=master;
[flinkStreamSQL链接] https://github.com/DTStack/flinkStreamSQL/blob/1.11_release/sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverDialect.java