给 Spark2.2 加上自定义 Hint 优化

由于我们底层数据存的类型是String,所以在做比较或排序时,数据计算错误。比如 "9" 比 "10" 大。为了只对比较的字段 cast 成 Double,老大突发奇想:在 Hint 传入需要转换字段名,然后 Analyzer 做计算的判断,将指定数据 cast 成 Double。

在Spark2.2中,增加了对Hint的解析,支持用户broadcast hint。本文的目的是在 Analyzer 里增加一个 function,支持将指定的 UnresolvedAttribute 加上 Cast。

首先,Analyzer.scala 中和 Hint 相关的代码

lazy val batches: Seq[Batch] = Seq(
  Batch("Hints", fixedPoint,
    new ResolveHints.ResolveBroadcastHints(conf),
    ResolveHints.RemoveAllHints),
    ……
  )

这里有一个 Object 叫 ResolveHints,和 Hint 处理相关的都在这里面。ResolveHints 里有一个 class 叫ResolveBroadcastHints,用来处理和 Broadcast 相关的 Hint;有一个 object 叫 RemoveAllHints 只有一个功能:将 UnresolvedHint 节点从解析树上删掉

case h: UnresolvedHint => h.child

在 ResolveBroadcastHints 里,是解析一棵 LogicalPlan 树,这里 transformUp 接受的是偏函数对象的参数(什么叫偏函数?请自行百度),自顶向下递归判断当前节点是否为 UnresolvedHint,再在applyBroadcastHint 中递归标记可以广播的信息。

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
  case h: UnresolvedHint if BROADCAST_HINT_NAMES.contains(h.name.toUpperCase(Locale.ROOT)) =>
    if (h.parameters.isEmpty) {
      // If there is no table alias specified, turn the entire subtree into a BroadcastHint.
      ResolvedHint(h.child, HintInfo(isBroadcastable = Option(true)))
    } else {
      // Otherwise, find within the subtree query plans that should be broadcasted.
      applyBroadcastHint(h.child, h.parameters.map {
        case tableName: String => tableName
        case tableId: UnresolvedAttribute => tableId.name
        case unsupported => throw new AnalysisException("Broadcast hint parameter should be " +
          s"an identifier or string but was $unsupported (${unsupported.getClass}")
      }.toSet)
    }
}

在 UnresolvedHint 中,有一个 name 属性,标记这个函数名;有一个 parameters 属性,标记这个函数接受的参数。

OK,以上关于已有代码的一些理解。也许理解上仍有偏差,同时也不够接地气,接下去接地气一些。既然是要修改 Spark 源码,测试当然少不了。关于 Spark 的单元测试,功能还是相当丰富的。本文由于是个人学习的一些记录,所以我会记一些单元测试中遇到的坑,以及一些技巧,也许对于读者无用,请读者跳过。

关于单元测试,首先当然是记录老大牛逼闪闪的blog

我们应该先关注一下当前 Parser 结束之后的 plan,在 catalyst 里,有一个 parsePlan 方法,可以实现这个目的。(当然,这是在一些 parser 的测试里看到的)

explain extended select /*+ CAST_AS_DOUBLE(`key`) */ if(`key` > `value`, `key`, `value`) from src order by `key`

得到这样的结果

== Parsed Logical Plan ==
'Sort ['key ASC NULLS FIRST], true
+- 'UnresolvedHint CAST_AS_DOUBLE, ['key]
   +- 'Project [unresolvedalias('if(('key > 'value), 'key, 'value), None)]
      +- 'UnresolvedRelation `src`

可以发现 UnresolvedHint 确实是像 Spark 源码注释里说的那样

A broadcast hint plan node will be inserted on top of any relation (that is not aliased differently), subquery, or common table expression that match the specified name.

但是我希望它是 Sort 的父节点,这样就可以和 applyBroadcastHint 方法走类似的逻辑;很遗憾,只能重新遍历一次树了。解决方案是,记录下所有需要做 Cast 操作的 UnresolvedAttribute,然后在 RemoveAllHints 的时候做一次节点替换。

这边再记录踩的一个坑吧。在做节点替换的时候,我一开始是这样写的:

case a => a transformExpressions {
  case sortOrder: SortOrder => sortOrder transformUp {
    case child: UnresolvedAttribute => Cast(child, DoubleType)
  }
}

这里面会出现一个死循环,或者说无限递归调用,因为原来的 UnresolvedAttribute 是 Cast 的子节点,然后遍历子节点再次遍历到 UnresolvedAttribute 时,又会再加一层 Cast,无限循环到堆栈溢出。

最后是这样写的:

case a => a transformExpressions {
  case SortOrder(child, direction, nullOrdering, sameOrderExpressions)
    if child.isInstanceOf[UnresolvedAttribute] &&
      CAST_FIELD_ID.contains(child.asInstanceOf[UnresolvedAttribute].name) =>
    SortOrder(Cast(child, DoubleType), direction, nullOrdering, sameOrderExpressions)

效果如下:

== Analyzed Logical Plan ==
(IF((CAST(key AS DOUBLE) > CAST(value AS DOUBLE)), key, value)): string
Project [(IF((CAST(key AS DOUBLE) > CAST(value AS DOUBLE)), key, value))#234]
+- Sort [cast(key#226 as double) ASC NULLS FIRST], true
   +- ResolvedHint none
      +- Project [if ((cast(key#226 as double) > cast(value#227 as double))) key#226 else value#227 AS (IF((CAST(key AS DOUBLE) > CAST(value AS DOUBLE)), key, value))#234, key#226]
         +- SubqueryAlias src
            +- Project [_1#223 AS key#226, _2#224 AS value#227]
               +- LocalRelation [_1#223, _2#224]

对于希望用SQL去做单元测试,在 catalyst 里,创建一张表会比较麻烦(可以执行SQL,并且使用断点调试达到目的,但是最终是会报错,终究还是很不爽的),建议在 sql core 里写单元测试。

路漫漫其修远兮,吾将上下而求索。加油加油!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,265评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,078评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,852评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,408评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,445评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,772评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,921评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,688评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,130评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,467评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,617评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,276评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,882评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,740评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,967评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,315评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,486评论 2 348

推荐阅读更多精彩内容