当前位置: 移动技术网 > 科技>人工智能>云计算 > spark源代码学习之ContextCleaner清理器

spark源代码学习之ContextCleaner清理器

2017年12月27日  | 移动技术网科技  | 我要评论

spark源代码学习之ContextCleaner清理器,Spark运行的时候,会产生一堆临时文件,临时数据,比如持久化的RDD数据在磁盘上,没有持久化的在内存中,比如shuffle的临时数据等,如果每次运行完,或者没有运行完杀掉了,不清理,会产生大量的无用数据,最终造成大数据集群崩溃而死。

初始化

ContextCleaner的初始化是在SparkContext中初始化的,这个功能默认是必须开
启的。

  _cleaner =
      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
        Some(new ContextCleaner(this))
      } else {
        None
      }
    _cleaner.foreach(_.start())

初始化 的时候主要newle一个清理线程

// 清理线程===》很重要
  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

这个清理线程,主要清理了RDD,shuffle,Broadcast,累加器,检查点等数据

 /** Keep cleaning RDD, shuffle, and broadcast state.
    * 保持一个干净的RDD,shuffle和broadcast状态
    *
    * ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际还是那个只是
    * 调用keepCleanning方法。
    * */
  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
    // 默认一直为真true
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
        // Synchronize here to avoid being interrupted on stop()
        synchronized {
          reference.foreach { ref =>
            logDebug("Got cleaning task " + ref.task)
            referenceBuffer.remove(ref)
            // 清除Shuffle和Broadcast相关的数据会分别调用doCleanupShuffle和doCleanupBroadcast函数。根据需要清除数据的类型分别调用
            ref.task match {
              case CleanRDD(rddId) =>
                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
              case CleanShuffle(shuffleId) =>
                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
              case CleanBroadcast(broadcastId) =>
                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
              case CleanAccum(accId) =>
                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
              case CleanCheckpoint(rddId) =>
                doCleanCheckpoint(rddId)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }

RDD的清理

 /** Perform RDD cleanup.
    * 在ContextCleaner 中会调用RDD.unpersist()来清除已经持久化的RDD数据
    * */
  def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning RDD " + rddId)
      // 被SparkContext的unpersistRDD方法
      sc.unpersistRDD(rddId, blocking)
      listeners.asScala.foreach(_.rddCleaned(rddId))
      logInfo("Cleaned RDD " + rddId)
    } catch {
      case e: Exception => logError("Error cleaning RDD " + rddId, e)
    }
  }

shuffle的清理

/** Perform shuffle cleanup.
    *
    * 清理Shuffle
    * */
  def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning shuffle " + shuffleId)
      // 把mapOutputTrackerMaster跟踪的shuffle数据不注册(具体做了什么,还没处理)
      mapOutputTrackerMaster.unregisterShuffle(shuffleId)
      // 删除shuffle的块数据
      blockManagerMaster.removeShuffle(shuffleId, blocking)
      listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
      logInfo("Cleaned shuffle " + shuffleId)
    } catch {
      case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
    }
  }

广播的清理

/** Perform broadcast cleanup.
    * 清除广播
    * */
  def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = {
    try {
      logDebug(s"Cleaning broadcast $broadcastId")
      // 广播管理器 清除广播
      broadcastManager.unbroadcast(broadcastId, true, blocking)
      listeners.asScala.foreach(_.broadcastCleaned(broadcastId))
      logDebug(s"Cleaned broadcast $broadcastId")
    } catch {
      case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
    }
  }

累加器的清理

/** Perform accumulator cleanup.
    * 清除累加器
    * */
  def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
    try {
      logDebug("Cleaning accumulator " + accId)
      AccumulatorContext.remove(accId)
      listeners.asScala.foreach(_.accumCleaned(accId))
      logInfo("Cleaned accumulator " + accId)
    } catch {
      case e: Exception => logError("Error cleaning accumulator " + accId, e)
    }
  }

检查点的清理

/**
   * Clean up checkpoint files written to a reliable storage.
   * Locally checkpointed files are cleaned up separately through RDD cleanups.
    *
    * 清理记录到可靠存储的检查点文件。
    * 局部检查点文件通过RDD清理被单独清理。
   */
  def doCleanCheckpoint(rddId: Int): Unit = {
    try {
      logDebug("Cleaning rdd checkpoint data " + rddId)
      // 这里直接调用文件系统删除  是本地 就本地删除,是hdfs就hdfs删除
      ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
      listeners.asScala.foreach(_.checkpointCleaned(rddId))
      logInfo("Cleaned rdd checkpoint data " + rddId)
    }
    catch {
      case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
    }
  }
启动方法

在sparkContext中调用启动方法

    _cleaner.foreach(_.start())

这里是启动方法

/** Start the cleaner.
    * 开始清理
    * */
  def start(): Unit = {
    // 设置清理线程为守护进程
    cleaningThread.setDaemon(true)
    // 设置守护进程的名称
    cleaningThread.setName("Spark Context Cleaner")
    // 启动守护进程
    cleaningThread.start()

    // scheduleAtFixedRate 在给定的初始延迟之后,并随后在给定的时间内,创建并执行一个已启用的周期操作
    // periodicGCInterval=30分钟 也就是没=每过30分钟运行一次清理线程清理垃圾
    periodicGCService.scheduleAtFixedRate(new Runnable {
      // 执行系统的垃圾清理
      override def run(): Unit = System.gc()
    }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
  }

这里启动线程 // 启动守护进程 cleaningThread.start(),这里自我感觉一下,因为下面调用System.gc()是清理垃圾,所以这个cleaningThread线程应该是收集那些需要清理的数据,保存它的引用(引用就是一个地址,一个指针,指向要删除的数据),最后调用System.gc()方法,才真正清理。

结束

最后是关闭这个应用的时候,调用Stop()方法

/**
   * Stop the cleaning thread and wait until the thread has finished running its current task.
    * 停止清理线程并等待线程完成其当前任务。
   */
  def stop(): Unit = {
    stopped = true
    // Interrupt the cleaning thread, but wait until the current task has finished before
    // doing so. This guards against the race condition where a cleaning thread may
    // potentially clean similarly named variables created by a different SparkContext,
    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
    // 中断清理线程,但等待当前任务完成后再执行。
    // This guards against the race condition where a cleaning thread may
    // potentially clean similarly named variables created by a different SparkContext,
    // ,导致其他令人费解的块未发现异常(spark-6132)。
    synchronized {
      // 打断线程
      cleaningThread.interrupt()
    }
    // 设置0 等待这个线程死掉
    cleaningThread.join()
    // 关闭垃圾清理
    periodicGCService.shutdown()
  }

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网