由于我们底层数据存的类型是String,所以在做比较或排序时,数据计算错误。比如 "9" 比 "10" 大。为了只对比较的字段 cast 成 Double,老大突发奇想:在 Hint 传入需要转换字段名,然后 Analyzer 做计算的判断,将指定数据 cast 成 Double。
在Spark2.2中,增加了对Hint的解析,支持用户broadcast hint。本文的目的是在 Analyzer 里增加一个 function,支持将指定的 UnresolvedAttribute 加上 Cast。
首先,Analyzer.scala 中和 Hint 相关的代码
lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new ResolveHints.ResolveBroadcastHints(conf),
ResolveHints.RemoveAllHints),
……
)
这里有一个 Object 叫 ResolveHints,和 Hint 处理相关的都在这里面。ResolveHints 里有一个 class 叫ResolveBroadcastHints,用来处理和 Broadcast 相关的 Hint;有一个 object 叫 RemoveAllHints 只有一个功能:将 UnresolvedHint 节点从解析树上删掉
case h: UnresolvedHint => h.child
在 ResolveBroadcastHints 里,是解析一棵 LogicalPlan 树,这里 transformUp 接受的是偏函数对象的参数(什么叫偏函数?请自行百度),自顶向下递归判断当前节点是否为 UnresolvedHint,再在applyBroadcastHint 中递归标记可以广播的信息。
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
if (h.parameters.isEmpty) {
// If there is no table alias specified, turn the entire subtree into a BroadcastHint.
ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true)))
} else {
// Otherwise, find within the subtree query plans that should be broadcasted.
applyBroadcastHint(h.child, h.parameters.map {
case tableName: String => tableName
case tableId: UnresolvedAttribute => tableId.name
case unsupported => throw new AnalysisException("Broadcast hint parameter should be " +
s"an identifier or string but was $unsupported (${unsupported.getClass}")
}.toSet)
}
}
在 UnresolvedHint 中,有一个 name 属性,标记这个函数名;有一个 parameters 属性,标记这个函数接受的参数。
OK,以上关于已有代码的一些理解。也许理解上仍有偏差,同时也不够接地气,接下去接地气一些。既然是要修改 Spark 源码,测试当然少不了。关于 Spark 的单元测试,功能还是相当丰富的。本文由于是个人学习的一些记录,所以我会记一些单元测试中遇到的坑,以及一些技巧,也许对于读者无用,请读者跳过。
关于单元测试,首先当然是记录老大牛逼闪闪的blog。
我们应该先关注一下当前 Parser 结束之后的 plan,在 catalyst 里,有一个 parsePlan 方法,可以实现这个目的。(当然,这是在一些 parser 的测试里看到的)
explain extended select /*+ CAST_AS_DOUBLE(`key`) */ if(`key` > `value`, `key`, `value`) from src order by `key`
得到这样的结果
== Parsed Logical Plan ==
'Sort ['key ASC NULLS FIRST], true
+- 'UnresolvedHint CAST_AS_DOUBLE, ['key]
+- 'Project [unresolvedalias('if(('key > 'value), 'key, 'value), None)]
+- 'UnresolvedRelation `src`
可以发现 UnresolvedHint 确实是像 Spark 源码注释里说的那样
A broadcast hint plan node will be inserted on top of any relation (that is not aliased differently), subquery, or common table expression that match the specified name.
但是我希望它是 Sort 的父节点,这样就可以和 applyBroadcastHint 方法走类似的逻辑;很遗憾,只能重新遍历一次树了。解决方案是,记录下所有需要做 Cast 操作的 UnresolvedAttribute,然后在 RemoveAllHints 的时候做一次节点替换。
这边再记录踩的一个坑吧。在做节点替换的时候,我一开始是这样写的:
case a => a transformExpressions {
case sortOrder: SortOrder => sortOrder transformUp {
case child: UnresolvedAttribute => Cast(child, DoubleType)
}
}
这里面会出现一个死循环,或者说无限递归调用,因为原来的 UnresolvedAttribute 是 Cast 的子节点,然后遍历子节点再次遍历到 UnresolvedAttribute 时,又会再加一层 Cast,无限循环到堆栈溢出。
最后是这样写的:
case a => a transformExpressions {
case SortOrder(child, direction, nullOrdering, sameOrderExpressions)
if child.isInstanceOf[UnresolvedAttribute] &&
CAST_FIELD_ID.contains(child.asInstanceOf[UnresolvedAttribute].name) =>
SortOrder(Cast(child, DoubleType), direction, nullOrdering, sameOrderExpressions)
效果如下:
== Analyzed Logical Plan ==
(IF((CAST(key AS DOUBLE) > CAST(value AS DOUBLE)), key, value)): string
Project [(IF((CAST(key AS DOUBLE) > CAST(value AS DOUBLE)), key, value))#234]
+- Sort [cast(key#226 as double) ASC NULLS FIRST], true
+- ResolvedHint none
+- Project [if ((cast(key#226 as double) > cast(value#227 as double))) key#226 else value#227 AS (IF((CAST(key AS DOUBLE) > CAST(value AS DOUBLE)), key, value))#234, key#226]
+- SubqueryAlias src
+- Project [_1#223 AS key#226, _2#224 AS value#227]
+- LocalRelation [_1#223, _2#224]
对于希望用SQL去做单元测试,在 catalyst 里,创建一张表会比较麻烦(可以执行SQL,并且使用断点调试达到目的,但是最终是会报错,终究还是很不爽的),建议在 sql core 里写单元测试。
路漫漫其修远兮,吾将上下而求索。加油加油!