当前位置: 移动技术网 > 科技>人工智能>云计算 > sparkRPC通信框架

sparkRPC通信框架

2018年01月22日  | 移动技术网科技  | 我要评论

厦门鱼皮花生,鲸鱼向渔民求助,鱼台天气

业务需求
1,master worker 都要启动
2,worker在启动之后,需要向master发送注册请求 附带信息 workerId, cores 内存 可以使用 case calss 封装数据
3,master接收到worker的请求信息之后,保存worker的注册信息,向worker发送响应信息(注册成功)
4,worker收到注册成功的信息之后,要定时发送心跳(报活) 定时任务 case class workerId
5,master收到worker发送的心跳信息之后,就要更新worker的心跳时间
6,master启动之后,定时检测worker的状态 (如果检测出来worker挂掉了,那 删除该worker的注册信息) 程序实现
## Master ##
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.collection.mutable


class Master extends Actor{

// 启动定时任务,检测超时的worker,把超时的worker删除掉   本身就是发送给自己的
  override def preStart(): Unit = {

    import scala.concurrent.duration._
    import context.dispatcher
    // 检测的时间 15s >  心跳的时间(10s)
    context.system.scheduler.schedule(0 seconds,15 seconds,self,CheckWorkerStatus)
  }
  //HashMap存放worker的注册信息
  val workersMap = new mutable.HashMap[String,WorkerInfor]()
  override def receive: Receive = {

    case RegisterToMaster(workerHost,memory,cpu) => {
      println(workerHost,memory,cpu)

      val infor: WorkerInfor = new WorkerInfor(workerHost,memory,cpu)
      workersMap(workerHost) = infor
      println(s"There is recieve ${workerHost} register massage: Worker-num=${workersMap.size}")

      //返回注册成功的信息
      sender() ! RegisterSuccess
    }
      //worker注册成功以后开始定时发送心跳信息,master要实时更新新的worker信息
    case HeartBeat(workerHost) => {
      //判断master是否有worker的信息
      if(workersMap.contains(workerHost)){
        //将worker信息替换成最新的
        val workerInfo: WorkerInfor = workersMap(workerHost)

        workerInfo.LastHeartBeatTime = System.currentTimeMillis()
      }
    }
      //检测master中的worker状态,将超时没有更新信息的worker剔除
    case CheckWorkerStatus => {
      //超时规则2次未注册则剔除 当前时间 - worker最后一次注册的时间 > 2次未注册的时间
      val deadWorker = workersMap.filter({
        case (workerHost,info) => {
          System.currentTimeMillis() - info.LastHeartBeatTime > 20*1000
        }
      })
      //剔除已经死亡的worker
      /*deadWorker.foreach({
        case (workerHost,_) => {
          workersMap -= (workerHost)
        }
      })*/
      workersMap --= deadWorker.map(_._1)
      println(s"完成检测,最新的worker的数量为——${workersMap.size}")
    }
  }
}

object Master {
  val MASTER_ACS_NAME = "Master_acs_name"
  val MASTER_AC_NAME = "Master_ac_name"

  def main(args: Array[String]): Unit = {
    if(args.size!=2){
      println("cn.edu360.scala_maven.MasterAndWorker.Master   ")
      sys.exit(1)//输入参数错误时,结束程序
    }
    val Array(masterHost,masterPort) = args
    val src =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = ${masterHost}
         |akka.remote.netty.tcp.port = ${masterPort}
      """.stripMargin
    //获取以个CongigFactory工厂
    val conf: Config = ConfigFactory.parseString(src)
    //创建一个ActorSystem
    val asc: ActorSystem = ActorSystem.create(MASTER_ACS_NAME,conf)

    val masteracname: ActorRef = asc.actorOf(Props(new Master()),MASTER_AC_NAME)
  }
}

## Worker ##

import java.util.UUID

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.{Config, ConfigFactory}

class Worker(val masterHost:String,val masterPost:Int,var memory:Int,var cpu:Int) extends Actor{
  var selection: ActorSelection = null
  val workerHost:String = UUID.randomUUID().toString
  override def preStart(): Unit = {
    println("run preStart")
    //连接master
    val path = s"akka.tcp://${Master.MASTER_ACS_NAME}@${masterHost}:${masterPost}/user/${Master.MASTER_AC_NAME}"
    selection = context.actorSelection(path)
    //向Master发送注册信息
    selection ! RegisterToMaster(workerHost,memory,cpu)
  }
  override def receive: Receive = {
    case RegisterSuccess => {
      println("Register success! begin to start schedule!")

      import scala.concurrent.duration._
      import context.dispatcher

      //启动定时任务
      /**
        * initialDelay: FiniteDuration, 启动延时
          interval:     FiniteDuration, 间隔时间
          receiver:     ActorRef, 信息接收方
          message:      Any       信息内容
        */
      //ActorRef类型不匹配
      //context.system.scheduler.schedule(0 seconds,10 seconds,selection,HeartBeat(workerHost))
      context.system.scheduler.schedule(0 seconds,10 seconds,self,SendHeartBeat)
    }
    case SendHeartBeat => {
      selection ! HeartBeat(workerHost)
      println("worker实时向master发送心跳信息!")
    }
  }
}

object Worker {
  val WORKER_ACS_NAME = "Worker_acs_name"
  val WORKER_AC_NAME = "Worker_ac_name"
  def main(args: Array[String]): Unit = {
    if(args.size!=6){
      println("cn.edu360.scala_maven.MasterAndWorker.Master  masterHost,masterPost,workerHost,workerPost,memory,cpu")
      sys.exit(1)//输入参数错误时,结束程序
    }
    val Array(masterHost,masterPost,workerHost,workerPost,memory,cpu) = args
    val src =
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = ${workerHost}
        |akka.remote.netty.tcp.port = ${workerPost}
      """.stripMargin
    //获取以个CongigFactory工厂
    val conf: Config = ConfigFactory.parseString(src)
    //创建一个ActorSystem
    val acs: ActorSystem = ActorSystem.create(WORKER_ACS_NAME,conf)

    val workeracname: ActorRef = acs.actorOf(Props(new Worker(masterHost,masterPost.toInt,memory.toInt,cpu.toInt)),WORKER_AC_NAME)
  }
}

## 封装类 ##

class MassagePassing {

}
//worker向master发送注册信息
case class RegisterToMaster(workerHost:String,memory:Int,cpu:Int)

//向worker发送注册成功的信息
case object RegisterSuccess

//向master发送心跳信息证明该worker正常工作
case class HeartBeat(workerHost:String)

//向master发送心跳信息证明该worker正常工作
case object SendHeartBeat

//检测worker注册信息
case object CheckWorkerStatus
case class WorkerInfor(val workerHost:String,var memory:Int,var cpu:Int) {
  var LastHeartBeatTime:Long = _
}
windows运行结果
这里写图片描述
这里写图片描述 集群运行
这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网