1.简介和架构
Apache Calcite 是一个动态的数据管理框架, 可以实现 SQL 的解析、验证、优化和执行。Calcite 是模块化和插件式的, 解析、验证、优化和执行的步骤都对应着一个相对独立的模块。用户可以选择使用其中的一个或多个模块,也可以对任意模型进行定制化扩展。
Calcite 的架构如下图所示,
① JDBC:构建了一个独立的 Avatica 框架,可以通过标准的 JDBC 接口访问 Calcite 获取数据。
② SQL Parser 和 SQL Validator:可以进行 SQL 的解析和验证,,并将原始的 SQL 字符串解析并转化为内部的 SqlNode
树来表示。
③ Query Optimizer:进行查询优化,,基于在关系代数在 Calcite 内部有一种关系代数表示方法,将关系代数表示为 RelNode
树。RelNode
树不只是由 SqlNode
树转化而来,也可以通过Calcite 提供的 Expressions Builder 接口构建。
说明:Calcite 包含许多组成典型数据库管理系统的部件,但是省略了一些关键的组成部分,例如数据的存储、处理数据的算法和存储元数据的存储库等。因为对不同的数据类型有不同的存储和计算引擎,是不可能将它们统一到一个框架的,所以 Calcite 是一个统一的 SQL 接口实现数据访问框架。
2.SQL 处理流程
如下图所示,Calcite 的处理流程实际上就是 SQL 的解析、校验、优化和执行。
① Parser:解析 SQL,将输入的 SQL 字符串转化为抽象语法树(AST),即 SqlNode
树表示
② Validator:根据元数据信息对 SqlNode
树进行验证, 其输出仍是 SqlNode
树
③ Converter:将 SqlNode
树转化为关系代数,其中 RelNode
树表示关系代数
④ Optimizer:对输入的关系代数进行优化,并输出优化后的 RelNode
树
⑤ Execute:根据优化后的 RelNode
生成执行计划
3.案例分析
users 表的内容:
id:string,name:string,age:int
1,Jack,28
2,John,21
3,Tom,32
4,Peter,24
orders 表内容:
id:string,user_id:string,goods:string,price:double
001,1,Cola,3.5
002,1,Hat,38.9
003,2,Shoes,199.9
004,3,Book,39.9
005,4,Phone,2499.9
查询的 SQL 语句:
SELECT u.id, name, age, sum(price)
FROM users AS u join orders AS o ON u.id = o.user_id
WHERE age >= 20 AND age <= 30
GROUP BY u.id, name, age
ORDER BY u.id
3.1 SQL 解析
通过词法分析和语法分析将 SQL 字符串转化为 AST。在Calcite中, 借助 JavaCC 实现了 SQL 的解析, 并转化为 SqlNode
表示。
SqlNode
是 AST 的抽象基类,不同类型的节点有对应的实现类。下面的 SQL 语句会生成 SqlSelect
和 SqlOrderBy
两个主要的节点。
String sql = "SELECT u.id, name, age, sum(price) " +
"FROM users AS u join orders AS o ON u.id = o.user_id " +
"WHERE age >= 20 AND age <= 30 " +
"GROUP BY u.id, name, age " +
"ORDER BY u.id";
// 创建SqlParser, 用于解析SQL字符串
SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT);
// 解析SQL字符串, 生成SqlNode树
SqlNode rootSqlNode = parser.parseStmt();
上述代码中的 rootSqlNode
是 AST 的根节点。如下图所示,可以看到 rootSqlNode
是SqlOrderBy
类型,其中 query
字段是一个 SqlSelect
类型,即代表原始的 SQL 语句去掉ORDER BY 部分。
3.2 SQL 校验
SQL 校验阶段一方面会借助元数据信息执行上述验证,另一方面会对 SqlNode
树进行一些改写,以转化为统一的格式。
// 创建Schema, 一个Schema中包含多个表
SimpleTable userTable = SimpleTable.newBuilder("users")
.addField("id", SqlTypeName.VARCHAR)
.addField("name", SqlTypeName.VARCHAR)
.addField("age", SqlTypeName.INTEGER)
.withFilePath("/path/to/user.csv")
.withRowCount(10)
.build();
SimpleTable orderTable = SimpleTable.newBuilder("orders")
.addField("id", SqlTypeName.VARCHAR)
.addField("user_id", SqlTypeName.VARCHAR)
.addField("goods", SqlTypeName.VARCHAR)
.addField("price", SqlTypeName.DECIMAL)
.withFilePath("/path/to/order.csv")
.withRowCount(10)
.build();
SimpleSchema schema = SimpleSchema.newBuilder("s")
.addTable(userTable)
.addTable(orderTable)
.build();
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
rootSchema.add(schema.getSchemaName(), schema);
RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
// 创建CatalogReader, 用于指示如何读取Schema信息
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
rootSchema,
Collections.singletonList(schema.getSchemaName()),
typeFactory,
config);
// 创建SqlValidator, 用于执行SQL验证
SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
.withLenientOperatorLookup(config.lenientOperatorLookup())
.withSqlConformance(config.conformance())
.withDefaultNullCollation(config.defaultNullCollation())
.withIdentifierExpansion(true);
SqlValidator validator = SqlValidatorUtil.newValidator(
SqlStdOperatorTable.instance(), catalogReader, typeFactory, validatorConfig);
// 执行SQL验证
SqlNode validateSqlNode = validator.validate(node);
如下图可知,SQL 验证后的输出结果仍是 SqlNode 树。不过其内部结构发生了改变,一个明显的变化是验证后的 SqlOrderBy
节点被改写为了 SqlSelect
节点,并在其 orderBy
变量中记录了排序字段。
如果把表名或者字段写错,validator.validate(node)
运行时在就会报错。如果把验证前后的SqlNode
完全打印出来,可以发现在校验时会为每个字段加上表名限定。
-- 验证前的SqlNode树打印结果
SELECT `u`.`id`, `name`, `age`, SUM(`price`)
FROM `users` AS `u`
INNER JOIN `orders` AS `o` ON `u`.`id` = `o`.`user_id`
WHERE `age` >= 20 AND `age` <= 30
GROUP BY `u`.`id`, `name`, `age`
ORDER BY `u`.`id`
-- 验证后的SqlNode树打印结果
SELECT `u`.`id`, `u`.`name`, `u`.`age`, SUM(`o`.`price`)
FROM `s`.`users` AS `u`
INNER JOIN `s`.`orders` AS `o` ON `u`.`id` = `o`.`user_id`
WHERE `u`.`age` >= 20 AND `u`.`age` <= 30
GROUP BY `u`.`id`, `u`.`name`, `u`.`age`
ORDER BY `u`.`id`
3.3 转换为关系代数 RelNode
关系代数是 SQL 的理论基础,可以阅读 Introduction of Relational Algebra in DBMS简单了解,其中“数据库系统概念“中对关系代数有更深入的介绍。
在 Calcite 中, 关系代数由 RelNode
表示。如下代码所示,将校验后的 SqlNode
树转化为RelNode
树。
// 创建VolcanoPlanner, VolcanoPlanner在后面的优化中还需要用到
VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
// 创建SqlToRelConverter
RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
SqlToRelConverter.Config converterConfig = SqlToRelConverter.config()
.withTrimUnusedFields(true)
.withExpand(false);
SqlToRelConverter converter = new SqlToRelConverter(
null,
validator,
catalogReader,
cluster,
StandardConvertletTable.INSTANCE,
converterConfig);
// 将SqlNode树转化为RelNode树
RelNode relNode = converter.convertQuery(validateSqlNode, false, true);
RelNode
树实质上是一个逻辑执行计划,上述 SQL 对应的逻辑执行计划如下,其中每一行都表示一个节点,即 RelNode
的实现类。
LogicalSort(sort0=[$0], dir0=[ASC])
LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)])
LogicalProject(id=[$0], name=[$1], age=[$2], price=[$6])
LogicalFilter(condition=[AND(>=($2, 20), <=($2, 30))])
LogicalJoin(condition=[=($0, $4)], joinType=[inner])
LogicalTableScan(table=[[s, users]])
LogicalTableScan(table=[[s, orders]])
3.4 查询优化
查询优化是 Calcite 的核心模块,主要有三部分组成:
① Planner rules:优化规则,例如内置优化规则有谓词下推、投影下推等。用户也可定义自己的优化规则。
② Metadata providers:元数据,主要用于基于成本的优化(Cost-based Optimize 即 CBO),包括表的行数、表的大小、给定列的值是否唯一等信息。
③ Planner engines:优化器实现,HepPlanner
用于实现基于规则的优化(Rule-based Optimize 即 RBO),VolcanoPlanner
用于实现基于成本的优化。
// 优化规则
RuleSet rules = RuleSets.ofList(
CoreRules.FILTER_TO_CALC,
CoreRules.PROJECT_TO_CALC,
CoreRules.FILTER_CALC_MERGE,
CoreRules.PROJECT_CALC_MERGE,
CoreRules.FILTER_INTO_JOIN, // 过滤谓词下推到Join之前
EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
EnumerableRules.ENUMERABLE_PROJECT_TO_CALC_RULE,
EnumerableRules.ENUMERABLE_FILTER_TO_CALC_RULE,
EnumerableRules.ENUMERABLE_JOIN_RULE,
EnumerableRules.ENUMERABLE_SORT_RULE,
EnumerableRules.ENUMERABLE_CALC_RULE,
EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
Program program = Programs.of(RuleSets.ofList(rules));
RelNode optimizerRelTree = program.run(
planner,
relNode,
relNode.getTraitSet().plus(EnumerableConvention.INSTANCE),
Collections.emptyList(),
Collections.emptyList());
经过优化后的输出如下,可知所有的节点都变成了 Enumerable
开头的物理节点,其基类是EnumerableRel
EnumerableSort(sort0=[$0], dir0=[ASC])
EnumerableAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)])
EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}], price=[$t6])
EnumerableHashJoin(condition=[=($0, $4)], joinType=[inner])
EnumerableCalc(expr#0..2=[{inputs}], expr#3=[Sarg[[20..30]]], expr#4=[SEARCH($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
EnumerableTableScan(table=[[s, users]])
EnumerableTableScan(table=[[s, orders]])
优化前后的计划:users 表的过滤位置发生了变动,从先 Join
后过滤,变成了先过滤后 Join
,如下图所示。
3.5 执行计划
将物理计划转化为执行计划通常需要自定义代码。Calcite 提供了一种执行计划生成方法,如下所示,可以生成执行计划并读取CSV文件中的数据。
EnumerableRel enumerable = (EnumerableRel) optimizerRelTree;
Map<String, Object> internalParameters = new LinkedHashMap<>();
EnumerableRel.Prefer prefer = EnumerableRel.Prefer.ARRAY;
Bindable bindable = EnumerableInterpretable.toBindable(internalParameters,
null, enumerable, prefer);
Enumerable bind = bindable.bind(new SimpleDataContext(rootSchema.plus()));
Enumerator enumerator = bind.enumerator();
while (enumerator.moveNext()) {
Object current = enumerator.current();
Object[] values = (Object[]) current;
StringBuilder sb = new StringBuilder();
for (Object v : values) {
sb.append(v).append(",");
}
sb.setLength(sb.length() - 1);
System.out.println(sb);
}
执行结果:
1,Jack,28,42.40
2,John,21,199.90
4,Peter,24,2499.90
参考:
[1] SQL over anything with an Optiq Adapter
[2] Apache Calcite 处理流程详解(一)
[3] 编译原理实践 - JavaCC 解析表达式并生成抽象语法树.