当前位置: 移动技术网 > IT编程>数据库>MSSQL > 荐 深入理解Spark远程通信组件RPC及消息处理机制

荐 深入理解Spark远程通信组件RPC及消息处理机制

2020年07月17日  | 移动技术网IT编程  | 我要评论

1.简介

在Spark中,不同组件像driver,executor,worker,master(stanalone模式)之间的通信是基于RPC来实现的。Spark 1.6之前,Spark的RPC是基于Akka来实现的。Akka是一个基于scala语言的异步的消息框架。Spark1.6后,spark借鉴Akka的设计自己实现了一个基于Netty的rpc框架。本文主要对spark1.6之后基于netty新开发的rpc框架做一个较为深入的分析。

2.整体架构

在这里插入图片描述

3.核心组件

3.1 RpcEnv

在介绍RpcEnv之前,我们先介绍SparkEnv。SparkEnv是Spark的执行环境对象,其中包括与众多Executor执行相关的对象。Spark 对任务的计算都依托于 Executor 的能力,所有的 Executor 都有自己的 Spark 的执行环境 SparkEnv。有了 SparkEnv,就可以将数据存储在存储体系中;就能利用计算引擎对计算任务进行处理,就可以在节点间进行通信等。

RpcEnv为RpcEndpoint提供处理消息的环境。RpcEnv负责RpcEndpoint整个生命周期的管理,包括:注册endpoint,endpoint之间消息的路由,以及停止endpoint。
在这里插入图片描述

private[spark] abstract class RpcEnv(conf: SparkConf) {
	/**
     * RPC远程终端查找的默认超时时间,默认为120s
     */
   private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)

   /**
     * 返回已经注册的[[RpcEndpoint]]的RpcEndpointRef。
     * 该方法只用于[[RpcEndpoint.self]]方法实现中。
     * 如果终端相关的[[RpcEndpointRef]]不存在,则返回null。
     */
   private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef

   /**
     * 如果是服务器模式,则返回当前服务器监听的地址;否则为空
     */
   def address: RpcAddress

   /**
     * 使用一个name来注册一个[[RpcEndpoint]],并且返回它的[[RpcEndpointRef]]对象。
     * [[RpcEnv]]并不保证线程安全性。
     */
   def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

   /**
     * 通过一个URI来异步检索[[RpcEndpointRef]]对象
     */
   def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]

   /**
     * 通过一个URI来同步检索[[RpcEndpointRef]]对象
     */
   def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
       defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
   }

   /**
     * 根据`address` 和 `endpointName`对 [[RpcEndpointRef]]进行同步检索。
     */
   def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
       setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString) // URI:
   }

   /**
     * 停止指定的[[RpcEndpoint]]对象。
     */
   def stop(endpoint: RpcEndpointRef): Unit

   /**
     * 异步关闭当前的[[RpcEnv]]。
     * 如果需要确保成功地退出[[RpcEnv]],在执行[[shutdown()]]之后需要调用[[awaitTermination()]]。
     */
   def shutdown(): Unit

   /**
     * 等待直到[[RpcEnv]]退出。
     * TODO do we need a timeout parameter?
     */
   def awaitTermination(): Unit

   /**
     * 如果没有[[RpcEnv]]对象,那么[[RpcEndpointRef]]将不能被反序列化。
     * 因此,如果任何反序列化的对象中包含了[[RpcEndpointRef]],那么这些反序列化的代码都应该在该方法中执行。
     */
   def deserialize[T](deserializationAction: () => T): T

   /**
     * 用于返回文件服务器的实例。
     * 如果RpcEnv不是以服务器模式运行,那么该项可能为null。
     *
     */
   def fileServer: RpcEnvFileServer

   /**
     * 打开一个通道从给定的URI下载文件。
     * 如果由RpcEnvFileServer返回的URI使用"spark"模式,那么该方法将会被工具类调用来进行文件检索。
     *
     * @param uri URI with location of the file.
     */
   def openChannel(uri: String): ReadableByteChannel

}

3.2 RpcEndpoint

RpcEndPoint 代表具体的通信节点,例如Master、Worker、CoarseGrainedSchedulerBackend中的DriverEndpoint、CoarseGrainedExecutorBackend等,都实现了该接口,在具体的函数中定义了消息传递来时的处理逻辑,整个生命周期是constructor -> onStart -> receive* -> onStop,即调用构造函数,然后向RpcEnv注册,内部调用onStart,之后如果收到消息,RpcEnv会调用receive*方法,结束时调用onStop方法,并给出了通信过程中RpcEndpoint所具有的基于事件驱动的行为(连接、断开、网络异常),实际上对于Spark框架来说主要是接收消息并处理

private[spark] trait RpcEndpoint {
  // 当前RpcEndpoint注册的RpcEnv
  val rpcEnv: RpcEnv
  /**
   * 当前[[RpcEndpoint]]的代理,当`onStart`方法被调用时`self`生效,当`onStop`被调用时,`self`变
   * 成null。
   * 注意:在`onStart`方法被调用之前,[[RpcEndpoint]]对象还未进行注册,所以就没有有效的			 *    * [[RpcEndpointRef]]。
   */
  final def self: RpcEndpointRef
  /**
   * 接收由RpcEndpointRef.send方法发送的消息,
   * 该类消息不需要进行响应消息(Reply),而只是在RpcEndpoint端进行处理。
   * 如果接收到一个不匹配的消息,将会抛出SparkException异常,并发送给`onError`。
   */
  def receive: PartialFunction[Any, Unit]
  /**
   * 处理来自`RpcEndpointRef.ask`的消息,RpcEndpoint端处理完消息后,需要给调用RpcEndpointRef.ask    * 的通信端返回响应消息。
   */
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]

  //一系列的回调函数
  def onError(cause: Throwable): Unit
  def onConnected(remoteAddress: RpcAddress): Unit
  def onDisconnected(remoteAddress: RpcAddress): Unit
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit
  def onStart(): Unit
  def onStop(): Unit

  // 停止RpcEndpoint
  final def stop(): Unit 
}

3.3 RpcEndpointRef

RpcEndpointRef是一个对RpcEndpoint的远程引用对象,内部记录了RpcEndpoint的位置信息,通过它可以向远程的RpcEndpoint端发送消息以进行通信。

private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {
  // 最大重连次数(3),重新尝试的等待事件(3s),默认的超时事件(120s)
  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

  // 对应RpcEndpoint的地址(host:port),名称
  def address: RpcAddress
  def name: String

  // 发送一条单向的异步消息,并且发送消息后不等待响应,亦即Send-and-forget。
  def send(message: Any): Unit
  /**
   * 发送消息给相关的[[RpcEndpoint.receiveAndReply]],并且返回一个 Future,能够在timeout时间内接    * 收回复。
   * 该方法只会发送一次消息,失败后不重试。
   * 而ask方法发送消息后需要等待通信对端给予响应,通过Future来异步获取响应结果。
   */
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
      
  // 发送消息到相应的`RpcEndpoint.receiveAndReply`,阻塞等待回复的结果
  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
  ....
}

4.收件箱Inbox

从上面的体系结构图可知,InBox作用于服务器端。它与RpcEndpoint是一对一的关系,每一个命名唯一的RpcEndpoint对应一个线程安全的InBox。所有发送给一个RpcEndpoint的消息,都由对应的InBox进行存储。InBox提供一个process方法实现,该方法会在一个dispatcher-event-loop线程池中被调用,将InBox中的消息提供给关联的RpcEndpoint进行消费。

需要注意的是,如果通信端端点的实现是继承自ThreadSafeRpcEndpoint,则表明该Endpoint不允许并发处理消息。如果继承自RpcEndpoint,那么就可以并发的调用该服务。在具体的process方法中,如果enableConcurrent为false,即只允许单线程处理。那么执行process方法时,如果numActiveThreads大于0,说明已经至少有一个线程正在处理,则立即返回,取消本次处理操作。

4.1 InboxMessage

InboxMessage是一个特质,是Inbox中所处理的基本消息单位,OneWayMessage,RpcMessage,OnStart等消息都继承了InboxMessage

4.2 messages

存储InboxMessage的链表

protected val messages = new java.util.LinkedList[InboxMessage]()

4.3 消息处理

def process(dispatcher: Dispatcher): Unit = {
    var message: InboxMessage = null
    inbox.synchronized {
      //enableConcurrent默认为false
      //当前线程被占用时,即返回
      if (!enableConcurrent && numActiveThreads != 0) {
        return
      }
      
      //只有当Inbox中有新消息(InboxMessage)时,才会启用当前线程
      message = messages.poll()
      if (message != null) {
        numActiveThreads += 1
      } else {
        return
      }
    }
    while (true) {
      ...
      //消息处理
      }

      inbox.synchronized {
        //enableConcurrent会被设置为false在onStop()被调用后
        if (!enableConcurrent && numActiveThreads != 1) {
          // If we are not the only one worker, exit
          numActiveThreads -= 1
          return
        }
        message = messages.poll()
        if (message == null) {
          numActiveThreads -= 1
          return
        }
      }
    }
  }

4.4消息发送

def post(message: InboxMessage): Unit = inbox.synchronized {
    if (stopped) {
      // We already put "OnStop" into "messages", so we should drop further messages
      onDrop(message)
    } else {
      messages.add(message)
      false
    }
  }

5.消息转发路由Dispatcher

消息调度器,有效提高NettyRpcEnv对消息的处理并最大提升并行处理,主要负责将RPC消息发送到要对该消息处理的RpcEndpoint.

5.1 EndpointData

EndpointData是一个简单的JavaBean类,RPC端点数据,把RpcEndpoint,RpcEndpointRef,Inbox封装在一起。

private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

5.2 receivers

端点数据的阻塞队列(LinkedBlockingQueue),只有当Inbox中有新消息(InboxMessage)时,才会放入到此队列中。

private val receivers = new LinkedBlockingQueue[EndpointData]

5.3 threadpool

用于对消息进行调度的线程池,默认线程池大小为2,线程为MessageLoop

/** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            //从receivers队列中拿取data数据,只有当Inbox中有新消息时,data才会被放入到receivers中,由于receivers是一个阻塞队列,所以没有消息时,线程会被阻塞
            val data = receivers.take()
            //毒丸,空数据
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

5.5. 发送消息

/**
   * Posts a message to a specific endpoint.
   *
   * @param endpointName name of the endpoint.
   * @param message the message to post
   * @param callbackIfStopped callback function if the endpoint is stopped.
   */
private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        data.inbox.post(message)
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

5.5 Dispatcher内存模型

在这里插入图片描述

6.发件箱OutBox

从上面的体系结构图可知,OnBox作用于客户端。类似于收件箱InBox,它与TransportClient是一对一的关系,而一个TransportClient对应着一个远程的RpcEndPoint。

6.1 OutBoxMessage

具体的消息发送逻辑则交由OutboxMessage的实现来完成,OutboxMessage有两个子类,OneWayOutboxMessageRpcOutboxMessage,分别对应调用RpcEndpointreceivereceiveAndReply方法。

对于OneWayOutboxMessage,由于不需要返回值,则简单地通过调用传输层client.send方法将消息发出。

private[netty] case class OneWayOutboxMessage(content: ByteBuffer) extends OutboxMessage
  with Logging {

  override def sendWith(client: TransportClient): Unit = {
    client.send(content)
  }

  override def onFailure(e: Throwable): Unit = {
    e match {
      case e1: RpcEnvStoppedException => logWarning(e1.getMessage)
      case e1: Throwable => logWarning(s"Failed to send one-way RPC.", e1)
    }
  }

}

对于RpcOutboxMessage,由于需要服务器的响应结果,因此需要实现传输层提供的RpcResponseCallback接口,并提供onFailure和onSuccess的方法实现。在实际的发送消息时会使用client.sendRpc方法,将消息内容和RpcResponseCallback对象传递给传输层,该方法会立即返回一个requestId。

而传输层底层会有独立的线程负责将消息序列化并且发送出去,每个Message都会返回一个UUID,由底层来维护一个发送出去消息与其Callback的HashMap。

  1. 如果请求超时,会通过requestId在传输层中移除该RPC请求,从而达到取消消息发送的效果;
  2. 如果请求的消息成功返回,则会使用RpcResponseCallback对象根据返回的状态回调对应的onFailure和onSuccess的方法,进而回调Spark core中的业务逻辑,执行Promise/Future的done方法,上层退出阻塞。
private[netty] case class RpcOutboxMessage(
    content: ByteBuffer,
    _onFailure: (Throwable) => Unit,
    _onSuccess: (TransportClient, ByteBuffer) => Unit)
  extends OutboxMessage with RpcResponseCallback with Logging {

  private var client: TransportClient = _
  private var requestId: Long = _

  override def sendWith(client: TransportClient): Unit = {
    this.client = client
    this.requestId = client.sendRpc(content, this)
  }

  def onTimeout(): Unit = {
    if (client != null) {
      client.removeRpcRequest(requestId)
    } else {
      logError("Ask timeout before connecting successfully")
    }
  }

  override def onFailure(e: Throwable): Unit = {
    _onFailure(e)
  }

  override def onSuccess(response: ByteBuffer): Unit = {
    _onSuccess(client, response)
  }

}

6.2 发送消息

/**
 * 用于发送消息。
 * - 如果目前没有可用的连接,则将消息缓存并建立一个连接。
 * - 如果[[Outbox]]已经停止,那么sender将会抛出一个[[SparkException]]
 */
  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) {
        true
      } else {
        //将RpcEnv中所有转发给某个RpcEndPoint的消息都先放到一个messages链表中
        messages.add(message)
        false
      }
    }
    if (dropped) {
      message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
    } else {
      drainOutbox()
    }
  }

6.3 清空发件箱

drainOutbox主要用于清空发件箱中的消息,消息会通过传输层TransportClient发送给远端服务器。该方法会在开始处进行一系列的检查,需要保证传输层的连接已经建立,如果没有建立,则向nettyEnv.clientConnectionExecutor提交建立连接的任务后并返回,待连接任务完成后会再次调用drainOutox方法。另外,drainOutox会保证线程安全性,通过布尔值draining可以保证同一时刻只会有一个线程能够进行消息的处理和发送。

6.4 关闭发件箱

网络连接错误和RpcEnv的停止运行都会触发OutBox的关闭和资源的清理,OutBox关闭的处理逻辑如下:

  1. 如果connectFuture不为空,说明这会正在执行连接任务,那么调用connectFuture.cancel(true)方法,将任务取消。

  2. 调用closeClient方法,关闭客户端,这里仅仅将client引用置为null,但并不是真正的关闭,因为需要重用连接。

  3. 调用nettyEnv.removeOutbox(remoteAddress)方法,从nettyEnv中移除OutBox,因此将来的消息将会使用一个新的或原有的client连接并创建一个新的OutBox。

  4. 执行所有还未处理的消息的onFailure方法,并告知失败的原因。

会再次调用drainOutox方法。另外,drainOutox会保证线程安全性,通过布尔值draining可以保证同一时刻只会有一个线程能够进行消息的处理和发送。

6.4 关闭发件箱

网络连接错误和RpcEnv的停止运行都会触发OutBox的关闭和资源的清理,OutBox关闭的处理逻辑如下:

  1. 如果connectFuture不为空,说明这会正在执行连接任务,那么调用connectFuture.cancel(true)方法,将任务取消。

  2. 调用closeClient方法,关闭客户端,这里仅仅将client引用置为null,但并不是真正的关闭,因为需要重用连接。

  3. 调用nettyEnv.removeOutbox(remoteAddress)方法,从nettyEnv中移除OutBox,因此将来的消息将会使用一个新的或原有的client连接并创建一个新的OutBox。

  4. 执行所有还未处理的消息的onFailure方法,并告知失败的原因。

本文地址:https://blog.csdn.net/wangsl754/article/details/104978021

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网