前面两节中介绍了 spark rpc 的基本知识,以及深入剖析了 spark rpc 中一些源码的实现流程。
具体可以看这里:
这一节我们来看看一个 spark rpc 中的运用实例 -- spark 的心跳机制。这次主要还是从代码的角度来看。
我们首先要知道 spark 的心跳有什么用。心跳是分布式技术的基础,我们知道在 spark 中,是有一个 master 和众多的 worker,那么 master 怎么知道每个 worker 的情况呢,这就需要借助心跳机制了。心跳除了传输信息,另一个主要的作用就是 worker 告诉 master 它还活着,当心跳停止时,方便 master 进行一些容错操作,比如数据转移备份等等。
我们同样分成两部分来分析 spark 的心跳机制,分为服务端(spark context)和客户端(executor)。
我们可以发现,sparkcontext 中有关于心跳的类以及 rpcendpoint 注册代码。
class sparkcontext(config: sparkconf) extends logging { ...... private var _heartbeatreceiver: rpcendpointref = _ ...... //向 rpcenv 注册 endpoint。 _heartbeatreceiver = env.rpcenv.setupendpoint(heartbeatreceiver.endpoint_name, new heartbeatreceiver(this)) ...... val (sched, ts) = sparkcontext.createtaskscheduler(this, master, deploymode) _schedulerbackend = sched _taskscheduler = ts _dagscheduler = new dagscheduler(this) _heartbeatreceiver.ask[boolean](taskschedulerisset) ...... }
这里 rpcenv 已经在上下文中创建好,通过 setupendpoint 向 rpcenv 注册一个心跳的 endpoint。还记得上一节中 helloworldserver 的例子吗,在 setupendpoint 方法中,会去调用 dispatcher 创建这个 endpoint(这里就是heartbeatreceiver) 对应的 inbox 和 endpointref ,然后在 inbox 监听是否有新消息,有新消息则处理它。注册完会返回一个 endpointref (注意这里有 refer,即是客户端,用来发送消息的)。
所以这一句
_heartbeatreceiver = env.rpcenv.setupendpoint(heartbeatreceiver.endpoint_name, new heartbeatreceiver(this))
就已经完成了心跳服务端监听的功能。
那么这条代码的作用呢?
_heartbeatreceiver.ask[boolean](taskschedulerisset)
这里我们要看上面那句 val (sched, ts) = sparkcontext.createtaskscheduler(this, master, deploymode) ,它会根据 master url 创建 schedulerbackend 和 taskscheduler。这两个类都是和资源调度有关的,所以需要借助心跳机制来传送消息。其中 taskscheduler 负责任务调度资源分配,schedulerbackend 负责与 master、worker 通信收集 worker 上分配给该应用使用的资源情况。
这里主要是告诉 heartbeatreceiver(心跳) 的监听端 ,告诉它 taskscheduler 这个东西已经设置好啦。heartbeatreceiver 就会回应你说好的,我知道的,并持有这个 taskscheduler。
到这里服务端 heartbeatreceiver 就差不多完了,我们可以发现,heartbeatreceiver 除了向 rpcenv 注册并监听消息之外,还会去持有一些资源调度相关的类 ,比如 taskschedulerisset 。
发送心跳发送在 worker ,每个 worker 都会有一个 executor ,所以我们可以发现在 executor 中发送心跳的代码。
private[spark] class executor( executorid: string, executorhostname: string, env: sparkenv, userclasspath: seq[url] = nil, islocal: boolean = false) extends logging { ...... // must be initialized before running startdriverheartbeat() //创建心跳的 endpointref private val heartbeatreceiverref = rpcutils.makedriverref(heartbeatreceiver.endpoint_name, conf, env.rpcenv) ...... startdriverheartbeater() ...... /** * schedules a task to report heartbeat and partial metrics for active tasks to driver. * 用一个 task 来报告活跃任务的信息以及发送心跳。 */ private def startdriverheartbeater(): unit = { val intervalms = conf.gettimeasms("spark.executor.heartbeatinterval", "10s") // wait a random interval so the heartbeats don't end up in sync val initialdelay = intervalms + (math.random * intervalms).asinstanceof[int] val heartbeattask = new runnable() { override def run(): unit = utils.loguncaughtexceptions(reportheartbeat()) } //heartbeater是一个单线程线程池,scheduleatfixedrate 是定时执行任务用的,和 schedule 类似,只是一些策略不同。 heartbeater.scheduleatfixedrate(heartbeattask, initialdelay, intervalms, timeunit.milliseconds) } ...... }
可以看到,在 executor 中会创建心跳的 endpointref ,变量名为 heartbeatreceiverref 。
然后我们主要看 startdriverheartbeater() 这个方法,它是关键。
我们可以看到最后部分代码
val heartbeattask = new runnable() { override def run(): unit = utils.loguncaughtexceptions(reportheartbeat()) } heartbeater.scheduleatfixedrate(heartbeattask, initialdelay, intervalms, timeunit.milliseconds)
heartbeattask 是一个 runaable,即一个线程任务。scheduleatfixedrate 则是 java concurrent 包中用来执行定时任务的一个类,这里的意思是每隔 10s 跑一次 heartbeattask 中的线程任务,超时时间 30s 。
为什么到这里还是没看到 heartbeatreceiverref 呢,说好的发送心跳呢?别急,其实在 heartbeattask 线程任务中又调用了另一个方法,我们到里面去一探究竟。
private[spark] class executor( executorid: string, executorhostname: string, env: sparkenv, userclasspath: seq[url] = nil, islocal: boolean = false) extends logging { ...... private def reportheartbeat(): unit = { // list of (task id, accumupdates) to send back to the driver val accumupdates = new arraybuffer[(long, seq[accumulatorv2[_, _]])]() val curgctime = computetotalgctime() for (taskrunner <- runningtasks.values().asscala) { if (taskrunner.task != null) { taskrunner.task.metrics.mergeshufflereadmetrics() taskrunner.task.metrics.setjvmgctime(curgctime - taskrunner.startgctime) accumupdates += ((taskrunner.taskid, taskrunner.task.metrics.accumulators())) } } val message = heartbeat(executorid, accumupdates.toarray, env.blockmanager.blockmanagerid) try { //终于看到 heartbeatreceiverref 的身影了 val response = heartbeatreceiverref.askwithretry[heartbeatresponse]( message, rpctimeout(conf, "spark.executor.heartbeatinterval", "10s")) if (response.reregisterblockmanager) { loginfo("told to re-register on heartbeat") env.blockmanager.reregister() } heartbeatfailures = 0 } catch { case nonfatal(e) => logwarning("issue communicating with driver in heartbeater", e) heartbeatfailures += 1 if (heartbeatfailures >= heartbeat_max_failures) { logerror(s"exit as unable to send heartbeats to driver " + s"more than $heartbeat_max_failures times") system.exit(executorexitcode.heartbeat_failure) } } } ...... }
可以看到,这里 heartbeatreceiverref 和我们上一节的例子, helloworldclient 类似,核心也是调用了 askwithretry() 方法,这个方法是通过同步的方式发送 rpc 消息。而这个方法里其他代码其实就是获取 task 的信息啊,或者是一些容错处理。核心就是调用 askwithretry() 方法来发送消息。
看到这你就明白了吧。executor 初始化便会用一个定时任务不断发送心跳,同时当有 task 的时候,会获取 task 的信息一并发送。这就是心跳的大概内容了。
ok,spark rpc 三部曲完毕。如果你能看到这里那不容易呀,给自己点个赞吧!!
推荐阅读 :
从分治算法到 mapreduce
大数据存储的进化史 --从 raid 到 hadoop hdfs
如对本文有疑问, 点击进行留言回复!!
before社区电量是什么意思 Before社区电量获得方法
RecycleView入门详解(教你全面掌握RecycleView用法)
动态权限请求框架RxPermissions(几行代码搞定权限)
URL路径@PathVariable出现点号“.“时值遭截断问题
网友评论