Spark源码分析九-AsyncEventQueue

链接:https://blog.csdn.net/nazeniwaresakini/java/article/details/104220341

前言

1)异步事件队列AsyncEventQueue

  • eventQueue、eventCount属性
  • droppedEventsCounter、lastReportTimestamp、logDroppedEvent属性
  • started、stopped属性
  • dispatchThread属性
  • dispatch()方法
  • post()方法
  1. 异步事件总线LiveListenerBus
  • queues属性
  • queuedEvents属性
  • addToQueue()方法
  • post()、postToQueues()方法

SparkListenerBus默认提供的事件投递方法是同步调用的。如果注册的监听器和产生的事件非常多,同步调用必然会造成事件的积压以及处理的延时。因此,在SparkListenerBus的实现类AsyncEventQueue中,提供了异步事件队列机制,它也是SparkContext中的事件总线LiveListenerBus的基础

异步事件队列AsyncEventQueue

AsyncEventQueue类声明及其属性

private class AsyncEventQueue(
    val name: String,
    conf: SparkConf,
    metrics: LiveListenerBusMetrics,
    bus: LiveListenerBus)
  extends SparkListenerBus
  with Logging {
  import AsyncEventQueue._
 
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
    conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
 
  private val eventCount = new AtomicLong()
 
  private val droppedEventsCounter = new AtomicLong(0L)
 
  @volatile private var lastReportTimestamp = 0L
 
  private val logDroppedEvent = new AtomicBoolean(false)
 
  private var sc: SparkContext = null
 
  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
 
  private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
  private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")
 
  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      dispatch()
    }
  }
 
  // ...
}

该类的构造参数有四个,分别是队列名、Spark配置项、LiveListenerBus的监控度量,以及LiveListenerBus本身。下面来看一下它的主要属性。

eventQueue、eventCount属性

eventQueue是一个存储SparkListenerEvent事件的阻塞队列LinkedBlockingQueue。它的大小是通过配置参数spark.scheduler.listenerbus.eventqueue.capacity来设置的,默认值10000。如果不设置阻塞队列的大小,那么默认值会是Integer.MAX_VALUE,有OOM的风险。

eventCount则是当前待处理事件的计数。因为事件从队列中弹出不代表已经处理完成,所以不能直接用队列的实际大小来表示。它是AtomicLong类型的,以保证修改的原子性。

droppedEventsCounter、lastReportTimestamp、logDroppedEvent属性

droppedEventsCounter是被丢弃事件的计数。当阻塞队列已满后,新产生的事件无法入队,就会被丢弃。日志中定期输出该计数器的值,用lastReportTimestamp记录下每次输出的时间戳,并且输出后都会将计数器重新置为0。

logDroppedEvent用于指示是否发生过了事件丢弃的情况。它与droppedEventsCounter一样也都是原子类型的。

started、stopped属性

这两个属性分别用来标记队列的启动与停止状态。

dispatchThread属性
dispatchThread是将队列中的事件分发到各监听器的守护线程,实际上调用了dispatch()方法。而Utils.tryOrStopSparkContext()方法的作用在于执行代码块时如果抛出异常,就另外起一个线程关闭SparkContext。

下面就来看看dispatch()方法的源码。

private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
    var next: SparkListenerEvent = eventQueue.take()
    while (next != POISON_PILL) {
      val ctx = processingTime.time()
      try {
        super.postToAll(next)
      } finally {
        ctx.stop()
      }
      eventCount.decrementAndGet()
      next = eventQueue.take()
    }
    eventCount.decrementAndGet()
  }

可见,该方法循环地从事件队列中取出事件,并调用父类ListenerBus特征的postToAll()方法(文章#5已经讲过)将其投递给所有已注册的监听器,并减少计数器的值。“毒药丸”POISON_PILL是伴生对象中定义的一个特殊的空事件,在队列停止(即调用stop()方法)时会被放入,dispatcherThread取得它之后就会“中毒”退出循环。

有了处理事件的方法,还得有将事件放入队列的方法才完整。下面是入队的方法post()。

 def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }
 
    eventCount.incrementAndGet()
    if (eventQueue.offer(event)) {
      return
    }
 
    eventCount.decrementAndGet()
    droppedEvents.inc()
    droppedEventsCounter.incrementAndGet()
    if (logDroppedEvent.compareAndSet(false, true)) {
      logError(s"Dropping event from queue $name. " +
        "This likely means one of the listeners is too slow and cannot keep up with " +
        "the rate at which tasks are being started by the scheduler.")
    }
    logTrace(s"Dropping event $event")
 
    val droppedCount = droppedEventsCounter.get
    if (droppedCount > 0) {
      if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
        if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
          val prevLastReportTimestamp = lastReportTimestamp
          lastReportTimestamp = System.currentTimeMillis()
          val previous = new java.util.Date(prevLastReportTimestamp)
          logWarning(s"Dropped $droppedCount events from $name since $previous.")
        }
      }
    }
  }

该方法首先检查队列是否已经停止。如果是运行状态,就试图将事件event入队。若offer()方法返回false,表示队列已满,将丢弃事件的计数器自增,并标记有事件被丢弃。最后,若当前的时间戳与上一次输出droppedEventsCounter值的间隔大于1分钟,就在日志里输出它的值。

理解了AsyncEventQueue的细节之后,我们就可以进一步来看LiveListenerBus的实现了。

异步事件总线LiveListenerBus

AsyncEventQueue已经继承了SparkListenerBus特征,LiveListenerBus内部用到了AsyncEventQueue作为核心。来看它的声明以及属性的定义。

private[spark] class LiveListenerBus(conf: SparkConf) {
  import LiveListenerBus._
 
  private var sparkContext: SparkContext = _
 
  private[spark] val metrics = new LiveListenerBusMetrics(conf)
 
  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
 
  private val droppedEventsCounter = new AtomicLong(0L)
 
  @volatile private var lastReportTimestamp = 0L
 
  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
 
  @volatile private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()
 
  // ...
}

这里的属性与AsyncEventQueue大同小异,多出来的主要是queues与queuedEvents两个。

queues属性
queues维护一个AsyncEventQueue的列表,也就是说LiveListenerBus中会有多个事件队列。它采用CopyOnWriteArrayList来保证线程安全性。

queuedEvents属性
queuedEvents维护一个SparkListenerEvent的列表,它的用途是在LiveListenerBus启动成功之前,缓存可能已经收到的事件。在启动之后,这些缓存的事件会首先投递出去。

LiveListenerBus作为一个事件总线,也必须提供监听器注册、事件投递等功能,这些都是在AsyncEventQueue基础之上实现的,下面来看一看。

addToQueue()方法

 private[spark] def addToQueue(
      listener: SparkListenerInterface,
      queue: String): Unit = synchronized {
    if (stopped.get()) {
      throw new IllegalStateException("LiveListenerBus is stopped.")
    }
 
    queues.asScala.find(_.name == queue) match {
      case Some(queue) =>
        queue.addListener(listener)
 
      case None =>
        val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
        newQueue.addListener(listener)
        if (started.get()) {
          newQueue.start(sparkContext)
        }
        queues.add(newQueue)
    }
  }

该方法将监听器listener注册到名为queue的队列中。它会在queues列表中寻找符合条件的队列,如果该队列已经存在,就调用父类ListenerBus的addListener()方法直接注册监听器。反之,就先创建一个AsyncEventQueue,注册监听器到新的队列中。

LiveListenerBus还提供了另外4种直接注册监听器的方法,分别对应内置的4个队列,其名称在伴生对象中有定义。

def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, SHARED_QUEUE)
  }
 
  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE)
  }
 
  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, APP_STATUS_QUEUE)
  }
 
  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    addToQueue(listener, EVENT_LOG_QUEUE)
  }

post()、postToQueues()方法

 def post(event: SparkListenerEvent): Unit = {
    if (stopped.get()) {
      return
    }
    metrics.numEventsPosted.inc()
 
    if (queuedEvents == null) {
      postToQueues(event)
      return
    }
 
    synchronized {
      if (!started.get()) {
        queuedEvents += event
        return
      }
    }
 
    postToQueues(event)
  }
 
  private def postToQueues(event: SparkListenerEvent): Unit = {
    val it = queues.iterator()
    while (it.hasNext()) {
      it.next().post(event)
    }
  }

post()方法会检查queuedEvents中有无缓存的事件,以及事件总线是否还没有启动。投递时会调用postToQueues()方法,将事件发送给所有队列,由AsyncEventQueue来完成投递到监听器的工作。

总结

本文研究了与SparkContext相关的异步事件处理机制的实现,即AsyncEventQueue与LiveListenerBus。它们之间的关系可以用下面的简图来表示。


image.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352