厦门鱼皮花生,鲸鱼向渔民求助,鱼台天气
业务需求import akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory} import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.collection.mutable class Master extends Actor{ // 启动定时任务,检测超时的worker,把超时的worker删除掉 本身就是发送给自己的 override def preStart(): Unit = { import scala.concurrent.duration._ import context.dispatcher // 检测的时间 15s > 心跳的时间(10s) context.system.scheduler.schedule(0 seconds,15 seconds,self,CheckWorkerStatus) } //HashMap存放worker的注册信息 val workersMap = new mutable.HashMap[String,WorkerInfor]() override def receive: Receive = { case RegisterToMaster(workerHost,memory,cpu) => { println(workerHost,memory,cpu) val infor: WorkerInfor = new WorkerInfor(workerHost,memory,cpu) workersMap(workerHost) = infor println(s"There is recieve ${workerHost} register massage: Worker-num=${workersMap.size}") //返回注册成功的信息 sender() ! RegisterSuccess } //worker注册成功以后开始定时发送心跳信息,master要实时更新新的worker信息 case HeartBeat(workerHost) => { //判断master是否有worker的信息 if(workersMap.contains(workerHost)){ //将worker信息替换成最新的 val workerInfo: WorkerInfor = workersMap(workerHost) workerInfo.LastHeartBeatTime = System.currentTimeMillis() } } //检测master中的worker状态,将超时没有更新信息的worker剔除 case CheckWorkerStatus => { //超时规则2次未注册则剔除 当前时间 - worker最后一次注册的时间 > 2次未注册的时间 val deadWorker = workersMap.filter({ case (workerHost,info) => { System.currentTimeMillis() - info.LastHeartBeatTime > 20*1000 } }) //剔除已经死亡的worker /*deadWorker.foreach({ case (workerHost,_) => { workersMap -= (workerHost) } })*/ workersMap --= deadWorker.map(_._1) println(s"完成检测,最新的worker的数量为——${workersMap.size}") } } } object Master { val MASTER_ACS_NAME = "Master_acs_name" val MASTER_AC_NAME = "Master_ac_name" def main(args: Array[String]): Unit = { if(args.size!=2){ println("cn.edu360.scala_maven.MasterAndWorker.Master ") sys.exit(1)//输入参数错误时,结束程序 } val Array(masterHost,masterPort) = args val src = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = ${masterHost} |akka.remote.netty.tcp.port = ${masterPort} """.stripMargin //获取以个CongigFactory工厂 val conf: Config = ConfigFactory.parseString(src) //创建一个ActorSystem val asc: ActorSystem = ActorSystem.create(MASTER_ACS_NAME,conf) val masteracname: ActorRef = asc.actorOf(Props(new Master()),MASTER_AC_NAME) } }
## Worker ##
import java.util.UUID import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props} import com.typesafe.config.{Config, ConfigFactory} class Worker(val masterHost:String,val masterPost:Int,var memory:Int,var cpu:Int) extends Actor{ var selection: ActorSelection = null val workerHost:String = UUID.randomUUID().toString override def preStart(): Unit = { println("run preStart") //连接master val path = s"akka.tcp://${Master.MASTER_ACS_NAME}@${masterHost}:${masterPost}/user/${Master.MASTER_AC_NAME}" selection = context.actorSelection(path) //向Master发送注册信息 selection ! RegisterToMaster(workerHost,memory,cpu) } override def receive: Receive = { case RegisterSuccess => { println("Register success! begin to start schedule!") import scala.concurrent.duration._ import context.dispatcher //启动定时任务 /** * initialDelay: FiniteDuration, 启动延时 interval: FiniteDuration, 间隔时间 receiver: ActorRef, 信息接收方 message: Any 信息内容 */ //ActorRef类型不匹配 //context.system.scheduler.schedule(0 seconds,10 seconds,selection,HeartBeat(workerHost)) context.system.scheduler.schedule(0 seconds,10 seconds,self,SendHeartBeat) } case SendHeartBeat => { selection ! HeartBeat(workerHost) println("worker实时向master发送心跳信息!") } } } object Worker { val WORKER_ACS_NAME = "Worker_acs_name" val WORKER_AC_NAME = "Worker_ac_name" def main(args: Array[String]): Unit = { if(args.size!=6){ println("cn.edu360.scala_maven.MasterAndWorker.Master masterHost,masterPost,workerHost,workerPost,memory,cpu") sys.exit(1)//输入参数错误时,结束程序 } val Array(masterHost,masterPost,workerHost,workerPost,memory,cpu) = args val src = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = ${workerHost} |akka.remote.netty.tcp.port = ${workerPost} """.stripMargin //获取以个CongigFactory工厂 val conf: Config = ConfigFactory.parseString(src) //创建一个ActorSystem val acs: ActorSystem = ActorSystem.create(WORKER_ACS_NAME,conf) val workeracname: ActorRef = acs.actorOf(Props(new Worker(masterHost,masterPost.toInt,memory.toInt,cpu.toInt)),WORKER_AC_NAME) } }
## 封装类 ##
class MassagePassing { } //worker向master发送注册信息 case class RegisterToMaster(workerHost:String,memory:Int,cpu:Int) //向worker发送注册成功的信息 case object RegisterSuccess //向master发送心跳信息证明该worker正常工作 case class HeartBeat(workerHost:String) //向master发送心跳信息证明该worker正常工作 case object SendHeartBeat //检测worker注册信息 case object CheckWorkerStatus
case class WorkerInfor(val workerHost:String,var memory:Int,var cpu:Int) { var LastHeartBeatTime:Long = _ }windows运行结果
如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复
网友评论