FlinkSQL 语法扩展
参考flink-sql-parser模块了解下Flink如何扩展Calcite语法,创建空项目进行语法扩展。首先,拷贝codegen文件夹下的内容,调整生成解析器实现类包名称和类名称,删除imports中的内容及Flink引入的解析方法。然后,将pom中javcc编译、freemarker相关的插件全部拷贝过来。
DQL
show tables
扩展show tables语法,当匹配到show tables语句后最终生成SqlShowTables。因此,需要扩展SqlNode构建SqlShowTables,同时从parserImpls.ftl中定义解析规则,还要从Parser.tdd中增加TABLES 关键字不然定义的规则无法识别,同时statementParserMethods中引入解析规则方法。
扩展SqlNode
规则匹配成功返回一个SqlShowTables节点,作为解析树中的sqlnode。
package cn.todd.flink.sql.parser.dql;
// show tables 语句解析后生成的SqlNode
public class SqlShowTables extends SqlCall {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("SHOW TABLES", SqlKind.OTHER);
public SqlShowTables(SqlParserPos pos) {
super(pos);
}
@Override
public SqlOperator getOperator() {
return OPERATOR;
}
@Override
public List<SqlNode> getOperandList() {
return Collections.emptyList();
}
@Override
public void unparse(
SqlWriter writer,
int leftPrec,
int rightPrec) {
writer.keyword("SHOW TABLES");
}
}
定义规则
从parserImpls.ftl定义解析show tables的规则,包含了方法名及对应的规则。 如果javacc不想花太多时间系统学习的,可以根据生成的SqlParserImpl类生成的方法逆向学习。
SqlShowTables()理解成包裹解析规则的方法,输出的字符匹配 show tables关键字,如果匹配到则返回SqlShowTables(Sqlnode)。
/**
* Parse a "Show Tables" metadata query command.
* 匹配 SHOW TABLES 关键字,如果匹配到返回SqlShowTables sqlNode
*/
SqlShowTables SqlShowTables() :
{
}
{
<SHOW> <TABLES>
{
return new SqlShowTables(getPos());
}
}
应用规则
SHOW关键字已经从Parser.jj文件中定义,所以需要扩展TABLES关键字才能正确匹配SHOW TABLES。Parser.tdd中的扩展:
- 新增TABLES关键字。
- imports增加SqlShowTables类。因为sql语法解析全部由SqlParserImpl类完成,SqlShowTables()返回值为自己定义的SqlShowTables。
- statementParserMethods增加定义的规则方法。新规则给Parser使用。
keywords: ["TABLES"]
imports: [
"cn.todd.flink.sql.parser.dql.SqlShowTables"
]
statementParserMethods:[
"SqlShowTables()"
]
执行mvn -X clean compile
生成定义的解析类SqlParserImpl.java。重点看生成的SqlShowTables(),词法解析器成功消费两个连续的TOKEN SHOW``TABLES
则创建SqlShowTables
。
/**
* Parse a "Show Tables" metadata query command.
* 匹配 SHOW TABLES 关键字,如果匹配到返回SqlShowTables sqlNode
*/
final public SqlShowTables SqlShowTables() throws ParseException {
jj_consume_token(SHOW);
jj_consume_token(TABLES);
{if (true) return new SqlShowTables(getPos());}
throw new Error("Missing return statement in function");
}
测试解析
protected SqlNode parseStmtAndHandleEx(String sql) {
final SqlNode sqlNode;
try {
sqlNode = getSqlParser(sql).parseStmt();
} catch (SqlParseException e) {
throw new RuntimeException("Error while parsing SQL: " + sql, e);
}
return sqlNode;
}
private SqlParser getSqlParser(String sql) {
return SqlParser.create(sql, SqlParser.configBuilder()
//myself SqlParserImpl
.setParserFactory(SqlParserImpl.FACTORY)
.setQuoting(Quoting.BACK_TICK)
.setUnquotedCasing(Casing.UNCHANGED)
.setQuotedCasing(Casing.UNCHANGED)
.build());
}
public class dqlStatementTest extends BaseParser {
@Test
public void testShowTables() throws SqlParseException {
String sql = "show tables";
SqlNode sqlNode = parseStmtAndHandleEx(sql);
Assert.assertTrue(sqlNode instanceof SqlShowTables);
}
}
show current catalog(database)
解析show current catalog,show current database。
扩展SqlNode
同org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog,
同org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase
定义规则
( <TokenA> do.. | <TokenB> do.. ) javacc if语法格式。
SqlCall SqlShowCurrentCatalogOrDatabase() :
{
}
{
<SHOW> <CURRENT> ( <CATALOG>
{
return new SqlShowCurrentCatalog(getPos());
}
| <DATABASE>
{
return new SqlShowCurrentDatabase(getPos());
}
)
}
应用规则
Parser.tdd中的扩展:
- import: ["xxx.SqlShowCurrentCatalog" "xxx.SqlShowCurrentDatabase"]
- statementParserMethods:[ "SqlShowCurrentCatalogOrDatabase()" ]
执行mvn -X clean compile
生成定义的解析类SqlParserImpl.java,看 SqlShowCurrentCatalogOrDatabase()方法具体的解析流程。
/**
* Parse a "show current catalog or show current database" metadata query command.
*
*/
final public SqlCall SqlShowCurrentCatalogOrDatabase() throws ParseException {
// 连续消费 SHOW CURRENT
jj_consume_token(SHOW);
jj_consume_token(CURRENT);
// 下一个Token如果是CATALOG 生成SqlShowCurrentDatabase,否则生成SqlShowCurrentDatabase
switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
case CATALOG:
jj_consume_token(CATALOG);
{if (true) return new SqlShowCurrentCatalog(getPos());}
break;
case DATABASE:
jj_consume_token(DATABASE);
{if (true) return new SqlShowCurrentDatabase(getPos());}
break;
default:
jj_la1[22] = jj_gen;
jj_consume_token(-1);
throw new ParseException();
}
throw new Error("Missing return statement in function");
}
测试解析
@Test
public void testShowCurrentCatalogOrDatabase() throws SqlParseException {
String catalogSql = "show current catalog";
SqlNode catalogSqNode = parseStmtAndHandleEx(catalogSql);
Assert.assertTrue(catalogSqNode instanceof SqlShowCurrentCatalog);
String databaseSql = "show current database";
SqlNode databaseSqlNode = parseStmtAndHandleEx(databaseSql);
Assert.assertTrue(databaseSqlNode instanceof SqlShowCurrentDatabase);
}
describe catalog
扩展SqlNode
同org.apache.flink.sql.parser.dql.SqlDescribeCatalog
定义规则
SimpleIdentifier()规则为parser.jj已经定义,DESCRIBE CATALOG后边提取catalogName,并标识解析位置getPos();
/**
* Parse describe catalog xx. SimpleIdentifier() defined in parser.jj
*/
SqlDescribeCatalog SqlDescribeCatalog() :
{
SqlIdentifier catalogName;
SqlParserPos pos;
}
{
<DESCRIBE> <CATALOG> { pos = getPos();}
catalogName = SimpleIdentifier()
{
return new SqlDescribeCatalog(pos, catalogName);
}
}
应用规则
Parser.tdd中的扩展:xx 为包名
- import: [ "xxx.SqlDescribeCatalog"]
- statementParserMethods:[ "SqlDescribeCatalog()" ]
执行mvn -X clean compile
生成定义的解析类SqlParserImpl.java,查看 SqlDescribeCatalog()方法具体的解析流程。
/**
*
* Parse describe catalog xx. SimpleIdentifier() defined in parser.jj
*/
final public SqlDescribeCatalog SqlDescribeCatalog() throws ParseException {
SqlIdentifier catalogName;
SqlParserPos pos;
jj_consume_token(DESCRIBE);
jj_consume_token(CATALOG);
// 匹配 DESCRIBE CATALOG后,提取pos,解析出catalogName
pos = getPos();
catalogName = SimpleIdentifier();
{if (true) return new SqlDescribeCatalog(pos, catalogName);}
throw new Error("Missing return statement in function");
}
测试解析
@Test
public void testDescribeCatalog() throws SqlParseException {
String sql = "describe catalog testCatalog";
SqlNode sqlNode = parseStmtAndHandleEx(sql);
Assert.assertTrue(sqlNode instanceof SqlDescribeCatalog);
String catalogName = ((SqlDescribeCatalog) sqlNode).getCatalogName();
Assert.assertEquals(catalogName,"testCatalog");
}
DDL
period for system
首先实现最简单的DDL语句,只包含对表名、列名、类型、属性、维表标识字段的解析,有了最初的框架后扩展watermark字段、计算列、数据类型等语法规则。period for system
是Blink最初的维表标识字段,用来区分源表和结果表。语法结构:
CREATE TABLE user_info (
userId BIGINT,
userName VARCHAR(10),
userAge BIGIN
PERIOD FOR SYSTEM_TIME
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'tp01',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);
扩展SqlNode
- 参考org.apache.flink.sql.parser.ddl.SqlCreateTable实现,侧重于SQL解析部分。只保留最基础的属性信息。
- 未实现ExtendedSqlNode#validate,暂时不需要对sqlNode进行验证。Flink执行时会对ExtendedSqlNode进行验证,具体查看org.apache.flink.table.planner.calcite.FlinkPlannerImpl#validate。
- unparse方法输出SqlNode具体内容, 如果不重写的话DEBUG或者toString时看不到具体sql语句。
public class SqlCreateTable extends SqlCreate {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);
private final SqlIdentifier tableName;
private final SqlNodeList columnList;
private final SqlNodeList propertyList;
private final boolean sideFlag;
public SqlCreateTable(
SqlParserPos pos,
SqlIdentifier tableName,
SqlNodeList columnList,
SqlNodeList propertyList,
boolean sideFlag) {
super(OPERATOR, pos, false, false);
this.tableName = requireNonNull(tableName, "Table name is missing");
this.columnList = requireNonNull(columnList, "Column list should not be null");
this.propertyList = propertyList;
this.sideFlag = sideFlag;
}
@Override
public SqlOperator getOperator() {
return OPERATOR;
}
@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(tableName, columnList, propertyList);
}
...........
/**
* Table creation context.
*/
public static class TableCreationContext {
public List<SqlNode> columnList = new ArrayList<>();
public boolean sideFlag;
}
}
参考引入:org.apache.flink.sql.parser.ddl.SqlTableColumn代表解析后的字段,包含名称和类型。
参考引入:org.apache.flink.sql.parser.ddl.SqlTableOption 代表属性值
创建FlinkSqlDataTypeSpec代表数据类型,使用calcite原始的类型,不做任何数据类型的扩展。
public class FlinkSqlDataTypeSpec extends SqlDataTypeSpec {
public FlinkSqlDataTypeSpec(SqlIdentifier typeName, int precision, int scale,
String charSetName, TimeZone timeZone,
SqlParserPos pos) {
super(typeName, precision, scale, charSetName, timeZone, pos);
}
public FlinkSqlDataTypeSpec(SqlIdentifier collectionsTypeName, SqlIdentifier typeName, int precision, int scale,
String charSetName, SqlParserPos pos) {
super(collectionsTypeName, typeName, precision, scale, charSetName, pos);
}
public FlinkSqlDataTypeSpec(SqlIdentifier collectionsTypeName, SqlIdentifier typeName, int precision, int scale,
String charSetName, TimeZone timeZone, Boolean nullable,
SqlParserPos pos) {
super(collectionsTypeName, typeName, precision, scale, charSetName, timeZone, nullable, pos);
}
public FlinkSqlDataTypeSpec(SqlIdentifier collectionsTypeName, SqlIdentifier typeName,
SqlIdentifier baseTypeName, int precision, int scale, String charSetName,
TimeZone timeZone, Boolean nullable, SqlParserPos pos) {
super(collectionsTypeName, typeName, baseTypeName, precision, scale, charSetName, timeZone, nullable, pos);
}
}
定义规则
围绕create table ..with()结构进行匹配。TableCreationContext
对象封装相关属性,在SqlCreateTable
中定义。
// 解析create table语法方法,calcite规定参数必须为(Span s, boolean replace)
SqlCreate SqlCreateTable(Span s, boolean replace) :
{ // 定义相关属性,在解析时进行赋值
final SqlParserPos startPos = s.pos();
SqlIdentifier tableName;
SqlNodeList columnList = SqlNodeList.EMPTY;
SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlParserPos pos = startPos;
boolean sideFlag = false;
}
{
<TABLE>
tableName = CompoundIdentifier()
//<LPAREN> 左括号 <RPAREN>右括号
<LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
TableColumn(ctx)
(
// <COMMA> 表示逗号, 属性列可以有多个使用 ()*表示
<COMMA> TableColumn(ctx)
)*
{
pos = pos.plus(getPos());
// 根据解析出的columnList为SqlCreate绑定columnList信息
columnList = new SqlNodeList(ctx.columnList, pos);
sideFlag = ctx.sideFlag;
}
<RPAREN>
[
<WITH>
// 解析出表中的属性值
propertyList = TableProperties()
]
{
return new SqlCreateTable(startPos.plus(getPos()),
tableName,
columnList,
propertyList,
sideFlag);
}
}
/**
* 解析表字段列.
* LOOKAHEAD(3)规则查看https://blog.csdn.net/qingmengwuhen1/article/details/83313303
*
*/
void TableColumn(TableCreationContext context) :
{
}
{
// 向下扫描3个token值,进行正确匹配,用来解决选择点冲突
// 列可能是正常列或者维表标识,Flink还定义了watermark相关规则
(LOOKAHEAD(3)
TableColumn2(context.columnList)
|
context.sideFlag = SideFlag()
)
}
// 对普通列的解析,e.g: name varchar(10)
void TableColumn2(List<SqlNode> list) :
{
SqlIdentifier name;
SqlDataTypeSpec type;
}
{
// 复合类型解析xx.xxx.xxx格式
name = CompoundIdentifier()
type = FlinkDataType()
{
SqlTableColumn tableColumn = new SqlTableColumn(name, type, getPos());
list.add(tableColumn);
}
}
// 引入对类型规则的解析
SqlDataTypeSpec FlinkDataType() :
{
final SqlIdentifier typeName;
int scale = -1;
int precision = -1;
String charSetName = null;
boolean nullable = true;
final Span s;
}
{
typeName = FlinkTypeName() {
s = span();
}
[ // 类型的精确度,egg:(1024) ,(10,12)
<LPAREN>
precision = UnsignedIntLiteral()
[
<COMMA>
scale = UnsignedIntLiteral()
]
<RPAREN>
]
{
// 返回对数据类型节点
return new FlinkSqlDataTypeSpec(typeName,
precision,
scale,
charSetName,
null,
s.end(this));
}
}
// 对类型名称的解析,egg: bigint,varchar
SqlIdentifier FlinkTypeName() :
{
final SqlTypeName sqlTypeName;
final SqlIdentifier typeName;
final Span s = Span.of();
}
{
(
LOOKAHEAD(2)
// Types used for JDBC and ODBC scalar conversion function
sqlTypeName = SqlTypeName(s)
{
typeName = new SqlIdentifier(sqlTypeName.name(), s.end(this));
}
|
typeName = CompoundIdentifier() {
throw new ParseException("UDT in DDL is not supported yet.");
}
)
{
return typeName;
}
}
/**
* 维表标识
*/
boolean SideFlag() :
{
SqlParserPos pos;
SqlIdentifier columnName;
}
{
<PERIOD> { pos = getPos(); } <FOR> <SYSTEM_TIME> { return true; }
|
{ return false; }
}
/** Parse a table properties. */
SqlNodeList TableProperties():
{
SqlNode property;
final List<SqlNode> proList = new ArrayList<SqlNode>();
final Span span;
}
{
<LPAREN> { span = span(); }
[
// 对具体某一个KEY VALUE的解析
property = TableOption()
{
proList.add(property);
}
( // 属性可以有多个,使用逗号分隔
<COMMA> property = TableOption()
{
proList.add(property);
}
)*
]
<RPAREN>
{ return new SqlNodeList(proList, span.end(this)); }
}
SqlNode TableOption() :
{
SqlNode key;
SqlNode value;
SqlParserPos pos;
}
{
key = StringLiteral()
// TODO SqlParserPos位置有啥要求?
{ pos = getPos(); }
// <EQ> 等号
<EQ> value = StringLiteral()
{
return new SqlTableOption(key, value, getPos());
}
}
应用规则
parser.tdd
引入扩展的类以及规则:
- imports导入相关类:
imports: [
"org.apache.calcite.sql.SqlCreate"
"cn.todd.flink.sql.parser.ddl.SqlCreateTable"
"cn.todd.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
"cn.todd.flink.sql.parser.ddl.SqlTableColumn"
"cn.todd.flink.sql.parser.ddl.SqlTableOption"
"cn.todd.flink.sql.parser.FlinkSqlDataTypeSpec"
]
- createStatementParserMethods引入对create table解析的规则方法。
# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
createStatementParserMethods: [
"SqlCreateTable"
]
执行mvn -X clean compile
生成定义的解析类SqlParserImpl.java,查看方法具体执行逻辑。编译失败时,根据提示修改parserImpls.ftl中的内容,提示信息通常为parser.jj的具体行,可以直接打开generated-sources生成的parser.jj文件内容查看。
测试解析
public class ddlStatementTest extends BaseParser {
private final static Logger LOG = LoggerFactory.getLogger(ddlStatementTest.class);
@Test
public void testSimpleCreateTable() throws SqlParseException {
String ddlSql = "CREATE TABLE abc.user_info (\n"
+ " userId BIGINT,\n"
+ " a.b.userName VARCHAR(10),\n"
+ " userAge BIGINT,"
+ " PERIOD FOR SYSTEM_TIME"
+ ") WITH (\n"
+ " 'connector' = 'kafka',\n"
+ " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ " 'topic' = 'tp01',\n"
+ " 'format' = 'json',\n"
+ " 'scan.startup.mode' = 'latest-offset'\n"
+ ")";
SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
Assert.assertTrue(ddlSqlNode instanceof SqlCreateTable);
SqlCreateTable createTable = (SqlCreateTable) ddlSqlNode;
SqlNodeList columns = createTable.getColumnList();
System.out.println(String.format("tableName: %s",createTable.getTableName()));
for (SqlNode column : columns) {
SqlTableColumn tableColumn = (SqlTableColumn) column;
String columnName = tableColumn.getName().toString();
String typeName = tableColumn.getType().getTypeName().getSimple();
System.out.println(String.format("columnName: %s, typeName: %s", columnName, typeName));
}
SqlNodeList properties = createTable.getPropertyList();
for (SqlNode sqlNode: properties) {
SqlNode key = ((SqlTableOption) sqlNode).getKey();
SqlNode value = ((SqlTableOption) sqlNode).getValue();
System.out.println(String.format("properties: key:%s, value:%s ", key, value));
}
boolean sideFlag = createTable.isSideFlag();
System.out.println(String.format("sideFlag: %s", sideFlag));
}
}
Computed Column
扩展SqlNode
SqlTableColumn
增加表达式参数。
public class SqlTableColumn extends SqlCall {
private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("COLUMN_DECL", SqlKind.COLUMN_DECL);
private SqlIdentifier name;
private SqlDataTypeSpec type;
private SqlNode expr;
public SqlTableColumn(SqlIdentifier name,
SqlDataTypeSpec type,
SqlParserPos pos) {
super(pos);
this.name = requireNonNull(name, "Column name should not be null");
this.type = requireNonNull(type, "Column type should not be null");
}
public SqlTableColumn(SqlIdentifier name,
SqlNode expr,
SqlParserPos pos) {
super(pos);
this.name = requireNonNull(name, "Column name should not be null");
this.expr = requireNonNull(expr, "Column expression should not be null");
}
@Override
public SqlOperator getOperator() {
return OPERATOR;
}
@Override
public List<SqlNode> getOperandList() {
return isGenerated() ?
ImmutableNullableList.of(name, expr) :
ImmutableNullableList.of(name, type);
}
public boolean isGenerated() {
return expr != null;
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
this.name.unparse(writer, leftPrec, rightPrec);
if (isGenerated()) {
writer.keyword("AS");
this.expr.unparse(writer, leftPrec, rightPrec);
} else {
writer.print(" ");
this.type.unparse(writer, leftPrec, rightPrec);
}
}
public SqlNode getExpr() {
return expr;
}
public SqlIdentifier getName() {
return name;
}
public void setName(SqlIdentifier name) {
this.name = name;
}
public SqlDataTypeSpec getType() {
return type;
}
public void setType(SqlDataTypeSpec type) {
this.type = type;
}
}
定义规则
void TableColumn(TableCreationContext context) :
{
}
{
(LOOKAHEAD(3)
TableColumn2(context.columnList)
|
ComputedColumn(context) // 新增规则
|
context.sideFlag = SideFlag()
)
}
/**
* 支持计算列 egg: proctime as proctime()
*/
void ComputedColumn(TableCreationContext context) :
{
SqlIdentifier identifier;
SqlNode expr;
SqlParserPos pos;
}
{
identifier = SimpleIdentifier() {pos = getPos();}
<AS>
// 解析非查询的行表达式
expr = Expression(ExprContext.ACCEPT_NON_QUERY)
{
SqlTableColumn computedColumn = new SqlTableColumn(identifier, expr, getPos());
context.columnList.add(computedColumn);
}
}
应用规则
parser.tdd不需要做变更。
测试解析
@Test
public void testCreateTableContainsExpr() throws SqlParseException {
String ddlSql = "CREATE TABLE user_info (\n"
+ " userId BIGINT,\n"
+ " userName VARCHAR(10),\n"
+ " proctime AS xxxx() \n"
+ ") WITH (\n"
+ " 'connector' = 'kafka',\n"
+ " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ " 'topic' = 'tp01',\n"
+ " 'format' = 'json',\n"
+ " 'scan.startup.mode' = 'latest-offset'\n"
+ ")";
SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
Assert.assertTrue(ddlSqlNode instanceof SqlCreateTable);
SqlCreateTable createTable = (SqlCreateTable) ddlSqlNode;
SqlNodeList columns = createTable.getColumnList();
System.out.println(String.format("tableName: %s",createTable.getTableName()));
for (SqlNode column : columns) {
SqlTableColumn tableColumn = (SqlTableColumn) column;
if (!tableColumn.isGenerated()){
String columnName = tableColumn.getName().toString();
String typeName = tableColumn.getType().getTypeName().getSimple();
System.out.println(String.format("columnName: %s, typeName: %s", columnName, typeName));
} else {
String name = tableColumn.getName().toString();
SqlNode expr = tableColumn.getExpr();
System.out.println(String.format("name: %s, expr: %s", name, expr));
}
}
}
watermark for xx as ..
扩展SqlNode
新增SqlWatermark类,同org.apache.flink.sql.parser.ddl.SqlWatermark
SqlCreateTable新增SqlWatermark属性。
TableCreationContext新增SqlWatermark属性。
定义规则
/**
* 解析表字段列
*/
void TableColumn(TableCreationContext context) :
{
}
{
(LOOKAHEAD(3)
TableColumn2(context.columnList)
|
ComputedColumn(context)
|
Watermark(context) // 引入对Watermark字段的解析
|
context.sideFlag = SideFlag()
)
}
void Watermark(TableCreationContext context) :
{
SqlIdentifier eventTimeColumnName;
SqlParserPos pos;
SqlNode watermarkStrategy;
}
{
<WATERMARK> {pos = getPos();} <FOR>
eventTimeColumnName = CompoundIdentifier()
<AS>
// 非查询 表达式
watermarkStrategy = Expression(ExprContext.ACCEPT_NON_QUERY)
{
if (context.watermark != null) {
throw new RuntimeException("Multiple WATERMARK statements is not supported yet.");
} else {
context.watermark = new SqlWatermark(pos, eventTimeColumnName, watermarkStrategy);
}
}
}
应用规则
- import:"cn.todd.flink.sql.parser.ddl.SqlWatermark"
- keywords: "WATERMARK"
测试解析
@Test
public void testCreateTableWaterMark() throws SqlParseException {
String ddlSql = "CREATE TABLE user_info (\n"
+ " userId BIGINT,\n"
+ " userName VARCHAR(10),\n"
+ " ts timestamp(10),\n"
+ " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n"
+ ") WITH (\n"
+ " 'connector' = 'kafka',\n"
+ " 'properties.bootstrap.servers' = 'localhost:9092',\n"
+ " 'topic' = 'tp01',\n"
+ " 'format' = 'json',\n"
+ " 'scan.startup.mode' = 'latest-offset'\n"
+ ")";
SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
Assert.assertTrue(ddlSqlNode instanceof SqlCreateTable);
SqlCreateTable createTable = (SqlCreateTable) ddlSqlNode;
SqlNodeList columns = createTable.getColumnList();
System.out.println(String.format("tableName: %s",createTable.getTableName()));
for (SqlNode column : columns) {
SqlTableColumn tableColumn = (SqlTableColumn) column;
if (!tableColumn.isGenerated()){
String columnName = tableColumn.getName().toString();
String typeName = tableColumn.getType().getTypeName().getSimple();
System.out.println(String.format("columnName: %s, typeName: %s", columnName, typeName));
} else {
String name = tableColumn.getName().toString();
SqlNode expr = tableColumn.getExpr();
System.out.println(String.format("name: %s, expr: %s", name, expr));
}
}
if (createTable.getWatermark() !=null ) {
SqlWatermark watermark = createTable.getWatermark();
String eventTimeColumnName = watermark.getEventTimeColumnName().toString();
String watermarkStrategy = watermark.getWatermarkStrategy().toString();
System.out.println(String.format("eventTimeColumnName: %s, watermarkStrategy: %s", eventTimeColumnName, watermarkStrategy));
}
}
create view
扩展SqlNode
同:org.apache.flink.sql.parser.ddl.SqlCreateView
定义规则
/**
* Parses a "IF EXISTS" option, default is false.
*/
boolean IfExistsOpt() :
{
}
{
(
LOOKAHEAD(2)
<IF> <EXISTS> { return true; }
|
{ return false; }
)
}
/**
* Parses a "IF NOT EXISTS" option, default is false.
*/
boolean IfNotExistsOpt() :
{
}
{
(
LOOKAHEAD(3)
<IF> <NOT> <EXISTS> { return true; }
|
{ return false; }
)
}
/**
* parse : create view xxx as select ... or create view xxx(..)
*/
SqlCreate SqlCreateView(Span s, boolean replace) : {
SqlIdentifier viewName;
SqlNode query=null;
SqlNodeList fieldList = SqlNodeList.EMPTY;
boolean ifNotExists = false;
}
{
<VIEW>
viewName = CompoundIdentifier()
ifNotExists = IfNotExistsOpt()
[
// 括号中的简单标识符列表(a,b,c)。parser.jj定义
fieldList = ParenthesizedSimpleIdentifierList()
]
[
<AS>
//解析查询表达式
query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
]
{
return new SqlCreateView(s.pos(), viewName, fieldList, query, replace, ifNotExists);
}
}
应用规则
- import:""cn.todd.flink.sql.parser.ddl.SqlCreateView""
- createStatementParserMethods:"SqlCreateView"
测试解析
@Test
public void testCreateView() throws SqlParseException {
String ddlSql = "CREATE view user_info AS Select * from infos";
String ddlSql2 = "CREATE view user_info (userId,userName ) AS Select * from infos";
SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
System.out.println(ddlSqlNode);
}
@Test
public void testCreateViewAsSelect() throws SqlParseException {
String ddlSql = "create view user_info as select * from infos";
SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
Assert.assertTrue(ddlSqlNode instanceof SqlCreateView);
System.out.println(ddlSqlNode);
}
create function
扩展SqlNode
同:org.apache.flink.sql.parser.ddl.SqlCreateFunction
定义规则
/**
* parse : create function name as 'xxx.ccc.cc' LANGUAGE java
*/
SqlCreate SqlCreateFunction(Span s, boolean replace) :
{
SqlIdentifier functionIdentifier = null;
SqlCharStringLiteral functionClassName = null;
String functionLanguage = null;
boolean ifNotExists = false;
}
{
<FUNCTION>
ifNotExists = IfNotExistsOpt()
functionIdentifier = CompoundIdentifier()
<AS> <QUOTED_STRING>
{
// 获取字符串中的内容
String p = SqlParserUtil.parseString(token.image);
functionClassName = SqlLiteral.createCharString(p, getPos());
}
[<LANGUAGE>
(
<JAVA> { functionLanguage = "JAVA"; }
|
<SCALA> { functionLanguage = "SCALA"; }
|
<SQL> { functionLanguage = "SQL"; }
|
<PYTHON> { functionLanguage = "PYTHON"; }
)
]
{
return new SqlCreateFunction(s.pos(), functionIdentifier, functionClassName, functionLanguage, ifNotExists);
}
}
应用规则
- import:"cn.todd.flink.sql.parser.ddl.SqlCreateFunction"
- createStatementParserMethods:"SqlCreateFunction"
- keywords: "PYTHON","SCALA"
测试解析
public class CreateFunctionTest extends BaseParser {
@Test
public void testCreateFunction() throws SqlParseException {
String ddlSql = "create function str2Timestamp as 'aaa.b.c'";
SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
Assert.assertTrue(ddlSqlNode instanceof SqlCreateFunction);
SqlCreateFunction createFunction = (SqlCreateFunction) ddlSqlNode;
String functionClassName = createFunction.getFunctionClassName().toString();
String functionIdentifier = createFunction.getFunctionIdentifier()[0];
System.out.println(String.format("functionIdentifier:%s , functionClassName: %s,",functionIdentifier, functionClassName));
}
@Test
public void testCreateFunctionWithLanguage() throws SqlParseException {
String ddlSql = "create function str2Timestamp as 'aaa.b.c' language java";
SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
Assert.assertTrue(ddlSqlNode instanceof SqlCreateFunction);
SqlCreateFunction createFunction = (SqlCreateFunction) ddlSqlNode;
String functionClassName = createFunction.getFunctionClassName().toString();
String functionIdentifier = createFunction.getFunctionIdentifier()[0];
String functionLanguage = createFunction.getFunctionLanguage();
System.out.println(String.format("functionIdentifier:%s , functionClassName: %s,functionLanguage: %s",
functionIdentifier, functionClassName,functionLanguage));
}
}
DML
insert into..select
扩展SqlNode
FlinkSQL为支持HIVE语法引入了partition相关属性,对SqlInsert做了部分扩展。创建RichSqlInsert直接继承SqlInsert。
定义规则
完全使用calcite定义的规则,不做任何扩展。
/**
* Parses an INSERT statement. Copy from parser.jj
*/
SqlNode RichSqlInsert() :
{
final List<SqlLiteral> keywords = new ArrayList<SqlLiteral>();
final SqlNodeList keywordList;
SqlNode table;
SqlNodeList extendList = null;
SqlNode source;
SqlNodeList columnList = null;
final Span s;
}
{
(
<INSERT>
|
<UPSERT> { keywords.add(SqlInsertKeyword.UPSERT.symbol(getPos())); }
)
{ s = span(); }
SqlInsertKeywords(keywords) {
keywordList = new SqlNodeList(keywords, s.addAll(keywords).pos());
}
<INTO> table = CompoundIdentifier()
[
LOOKAHEAD(5)
[ <EXTEND> ]
extendList = ExtendList() {
table = extend(table, extendList);
}
]
[
LOOKAHEAD(2)
{ final Pair<SqlNodeList, SqlNodeList> p; }
p = ParenthesizedCompoundIdentifierList() {
if (p.right.size() > 0) {
table = extend(table, p.right);
}
if (p.left.size() > 0) {
columnList = p.left;
}
}
]
source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) {
return new RichSqlInsert(s.end(source), keywordList, table, source, columnList);
}
}
应用规则
- statementParserMethods: [ "RichSqlInsert()" ]
- import:["cn.todd.flink.sql.parser.dml.RichSqlInsert"]
测试解析
public class InsertStatementTest extends BaseParser {
@Test
public void testSimpleInsertStat() throws SqlParseException {
String insertSql = "insert into sinkTable select name ,age from sourceTable";
SqlNode insertSqlNode = parseStmtAndHandleEx(insertSql);
Assert.assertTrue(insertSqlNode instanceof RichSqlInsert);
SqlNode targetTable = ((SqlInsert) insertSqlNode).getTargetTable();
SqlNode source = ((SqlInsert) insertSqlNode).getSource();
System.out.println(String.format("targetTable: %s, \nsource: %s", targetTable, source));
}
}
总结
- 扩展语法规则:Parser.jj包含了对标准SQL的解析,如果要扩展则在原始规则上做修改,从Parser.jj搜索要使用的方法。例如:Flink在Parser.jj#SqlInsert语法上扩展出RichSqlInsert规则,支持对Partion的解析。
- 了解规则含义:引入的规则不清楚含义时,可以通过阅读生成的parserImpl类,找到对应的方法。
- 规则调试:从
parserImpls.ftl
引入的规则在编译时可能会出错,需要打开生成的parserImpl类找到对应的行数进行排查。