18 Spark Streaming程序的优雅停止

  1. Spark Streaming程序的停止可以是强制停止、异常停止或其他方式停止。
    首先我们看StreamingContext的stop()方法
def stop(
      stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
     ): Unit = synchronized {
    stop(stopSparkContext, false)
}

这里定义了两个参数,stopSparkContext可以通过配置文件定义,接着看接收两个参数的stop方法,代码如下

/**
   * Stop the execution of the streams, with option of ensuring all received data
   * has been processed.
   *
   * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
   *                         will be stopped regardless of whether this StreamingContext has been
   *                         started.
   * @param stopGracefully if true, stops gracefully by waiting for the processing of all
   *                       received data to be completed
   */
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
    var shutdownHookRefToRemove: AnyRef = null
    if (AsynchronousListenerBus.withinListenerThread.value) {
      throw new SparkException("Cannot stop StreamingContext within listener thread of" +
        " AsynchronousListenerBus")
    }
    synchronized {
      try {
        state match {
          case INITIALIZED =>
            logWarning("StreamingContext has not been started yet")
          case STOPPED =>
            logWarning("StreamingContext has already been stopped")
          case ACTIVE =>
            scheduler.stop(stopGracefully)
            // Removing the streamingSource to de-register the metrics on stop()
            env.metricsSystem.removeSource(streamingSource)
            uiTab.foreach(_.detach())
            StreamingContext.setActiveContext(null)
            waiter.notifyStop()
            if (shutdownHookRef != null) {
              shutdownHookRefToRemove = shutdownHookRef
              shutdownHookRef = null
            }
            logInfo("StreamingContext stopped successfully")
        }
      } finally {
        // The state should always be Stopped after calling `stop()`, even if we haven't started yet
        state = STOPPED
      }
    }
    if (shutdownHookRefToRemove != null) {
      ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
    }
    // Even if we have already stopped, we still need to attempt to stop the SparkContext because
    // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
    if (stopSparkContext) sc.stop()
}

注释中说明要停止程序时,正确的方式是需要所有接收的数据被处理完成后再停止,那么就需要我们传入的stopGracefully参数为true,然后停止时会等待所有任务执行完成

  1. Spark Streaming提供了一个优雅停止的方法,在StreamingContext里面有一个stopOnShutdown()方法,代码如下
private def stopOnShutdown(): Unit = {
    val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
    logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
    // Do not stop SparkContext, let its own shutdown hook stop it
    stop(stopSparkContext = false, stopGracefully = stopGracefully)
}

stopOnShutdown()方法是什么意思呢,在我们的程序退出时,不管是正常退出或异常退出,stopOnShutdown()方法都会被回调,然后调用stop方法。stopGracefully 可以通过配置项spark.streaming.stopGracefullyOnShutdown配置,生产环境需要配置为true.

  1. stopOnShutdown()方法是怎样被调用的呢?在StreamingContext的start方法中有一行代码
shutdownHookRef = ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)

添加stopOnShutdown函数到ShutdownHookManager中,addShutdownHook代码如下

def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
    shutdownHooks.add(priority, hook)
}

看SparkShutdownHookManager 里都有什么,看代码注释了解SparkShutdownHookManager的功能,不一一介绍

private [util] class SparkShutdownHookManager {

  // 优先级队列,优先级越大,越优先执行
  private val hooks = new PriorityQueue[SparkShutdownHook]()
  @volatile private var shuttingDown = false

  /**
   * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
   * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
   * the best.
   */
  // 这里实例化一个线程,添加到jvm的关闭钩子中,等到jvm退出时才会被调用
  def install(): Unit = {
    val hookTask = new Runnable() {
      override def run(): Unit = runAll()
    }    Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
      case Success(shmClass) =>
        val fsPriority = classOf[FileSystem]
          .getField("SHUTDOWN_HOOK_PRIORITY")
          .get(null) // static field, the value is not used
          .asInstanceOf[Int]
        val shm = shmClass.getMethod("get").invoke(null)
        shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
          .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
      case Failure(_) =>
        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
    }
  }
  // jvm退出时钩子回调此函数
  def runAll(): Unit = {
    shuttingDown = true
    var nextHook: SparkShutdownHook = null
    //循环从优先级队列取数据执行,优先级越大,越优先执行
    while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
      Try(Utils.logUncaughtExceptions(nextHook.run()))
    }
  }
  def add(priority: Int, hook: () => Unit): AnyRef = {
    hooks.synchronized {
      if (shuttingDown) {
        throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
      }
      val hookRef = new SparkShutdownHook(priority, hook)
      hooks.add(hookRef)
      hookRef
    }
  }
  def remove(ref: AnyRef): Boolean = {
    hooks.synchronized { hooks.remove(ref) }
  }
}
  1. 看到这里就明白了,把stopOnShutdown()函数放入SparkShutdownHookManager 中的优化级队列hooks中,默认优先级为51,jvm退出时启动一个线程,调用runAll()方法,然后从hooks队列中一个一个取数据(函数),然后执行,就调用了stopOnShutdown()函数,接着调用stop()函数,我们的应用程序就可以优雅的执行停止工作了。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 198,030评论 5 464
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,198评论 2 375
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 144,995评论 0 327
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,973评论 1 268
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,869评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,766评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,967评论 3 388
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,599评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,886评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,901评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,728评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,504评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,967评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,128评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,445评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,018评论 2 343
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,224评论 2 339

推荐阅读更多精彩内容