spark在生产中是否要禁止掉BHJ(BroadcastHashJoin)

背景

本文基于spark 3.2
driver内存 2G

问题描述

在基于复杂的sql运行中,或者说是存在多个join操作的sql中,如果说driver内存不是很大的情况下,我们经常会遇到如下报错:

Caused by: org.apache.spark.SparkException: Could not execute broadcast in 800 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
    at org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec$$anon$1.run(QueryStageExec.scala:217)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

其实从字面上就可以理解:broadcast的数据超时了,这种经常是由于广播的数据量太大引起的,spark中默认的广播大小不是只有10M才会进行广播么?
spark.sql.autoBroadcastJoinThreshold默认为10M
为什么还会存在广播的数据量很大呢?

问题分析

直接说重点:
在spark中 SMJ转BHJ 在两个阶段会发生:

  1. 正常的物理计划的生成阶段,也就是SparkPlanner中的JoinSelection规则中
  2. AQE阶段,也就是AdaptiveSparkPlanExec中的reOptimize方法
    其实这两个阶段调用的方法都是一样的都是调用了sparkPlanner的JoinSelection规则

我们说说第一阶段,也就是正常的物理计划的生成阶段,即JoinSelection规则

这里的重要的方法是canBroadcastBySize:

def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
  }

具体的逻辑阶段的统计信息,可以参考spark logicalPlan Statistics (逻辑计划阶段的统计信息),这里如果我们是基于文件读取的话(大部分就是基于文件的读取),如果说我们的的sql是

##其中tableA有很多字段,我们只选取a,b两个字段 
select a,b from tableA

这里的逻辑阶段的数据统计就是一个大概的计算:

private def visitUnaryNode(p: UnaryNode): Statistics = {
    // There should be some overhead in Row object, the size should not be zero when there is
    // no columns, this help to prevent divide-by-zero error.
    val childRowSize = EstimationUtils.getSizePerRow(p.child.output)
    val outputRowSize = EstimationUtils.getSizePerRow(p.output)
    // Assume there will be the same number of rows as child has.
    var sizeInBytes = (p.child.stats.sizeInBytes * outputRowSize) / childRowSize
    if (sizeInBytes == 0) {
      // sizeInBytes can't be zero, or sizeInBytes of BinaryNode will also be zero
      // (product of children).
      sizeInBytes = 1
    }

    // Don't propagate rowCount and attributeStats, since they are not estimated here.
    Statistics(sizeInBytes = sizeInBytes)
}
...
def getSizePerRow(
      attributes: Seq[Attribute],
      attrStats: AttributeMap[ColumnStat] = AttributeMap(Nil)): BigInt = {
    // We assign a generic overhead for a Row object, the actual overhead is different for different
    // Row format.
    8 + attributes.map { attr =>
      if (attrStats.get(attr).map(_.avgLen.isDefined).getOrElse(false)) {
        attr.dataType match {
          case StringType =>
            // UTF8String: base + offset + numBytes
            attrStats(attr).avgLen.get + 8 + 4
          case _ =>
            attrStats(attr).avgLen.get
        }
      } else {
        attr.dataType.defaultSize
      }
    }.sum
  }

我们看到其实就是对于从一个表中读取一个字段的大小是基于改字段类型所占avgLen的大小除以该表中所有字段总的类型的avgLen的一个比值
如: select a from tableA, 假如tableA是50M 有a,b,c,d,e三个字段,其中a是int类型,且avgLen是10,b,c,d,e是string类型,且avgLen是20。
则该sql算出来的size就是 10/(10+4(10+8+4))50M=5.1M 这样这个sql所对应的临时表就能进行广播。
但是正如spark里说的:

Statistics collected for a column.
1. The JVM data type stored in min/max is the internal data type for the corresponding Catalyst data type. For example, the internal type of DateType is Int, and that the internal type of TimestampType is Long. 2. There is no guarantee that the statistics collected are accurate. Approximation algorithms (sketches) might have been used, and the data collected can also be stale.
Params:
distinctCount – number of distinct values
min – minimum value
max – maximum value
nullCount – number of nulls
avgLen – average length of the values. For fixed-length types, this should be a constant.
maxLen – maximum length of the values. For fixed-length types, this should be a constant.
histogram – histogram of the values
version – version of statistics saved to or retrieved from the catalog

这些统计信息有可能是不准确的,所以在计算的时候,有可能broadcast的数据就相对比较大,如果存在这种join有很多的情况下,就会导致driver端很卡,甚至OOM

我们再说说第二阶段,也就是AQE阶段

其实这个阶段核心还是getFinalPhysicalPlan方法中的createQueryStages方法和reOptimize方法,
在createQueryStages方法中如果方法已经是BroadcastExchangeExec的话,就会直接包装成ShuffleQueryStageExec,
如果是ShuffleExchangeExec的话,在下一个阶段会经过reOptimize方法根据运行时的统计信息大小,来进行是否可以进行SMJ到BHJ的转换
这里在的阈值判断是通过spark.sql.adaptive.autoBroadcastJoinThreshold来判断的,默认也是10M,

所以在spark UI上有时候能看到broadcast 的datasize有50M甚至100多M,而明明broadcast的阈值是10M,却变成了BroadCastHashJoin。

结论

所以在大数据量,以及在复杂的sql情况下,禁止broadcasthashjoin是明确的选择,毕竟稳是一切运行的条件,但是也是可以根据单个任务个别开启。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354

推荐阅读更多精彩内容