class WorkerInfo(val id : String, val workerHost : String, val memory : String, val cores : String) { var lastHeartbeat : Long = System.currentTimeMillis() override def toString = s"WorkerInfo($id, $workerHost, $memory, $cores)" }
case class RegisterWorker(val id : String, val workerHost : String, val memory : String, val cores : String) case class HeartBeat(val workid : String) case class CheckOfTimeOutWorker() case class RegisteredWorker(val workerHost : String) case class SendHeartBeat()
import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.collection.mutable import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global class Master extends Actor{ //保存 WorkerID 和 Work 信息的 map val idToWorker = new mutable.HashMap[String, WorkerInfo] //保存所有 Worker 信息的 Set val workers = new mutable.HashSet[WorkerInfo] //Worker 超时时间 val WORKER_TIMEOUT = 10 * 1000 //构造方法执行完执行一次 override def preStart(): Unit = { //启动定时器,定时执行 context.system.scheduler.schedule(5 millis, WORKER_TIMEOUT millis, self, CheckOfTimeOutWorker) } //该方法会被反复执行,用于接收消息,通过 case class 模式匹配接收消息 override def receive: Receive = { //Worker 向 Master 发送的注册消息 case RegisterWorker(id, workerHost, memory, cores) => { if(!idToWorker.contains(id)) { val worker = new WorkerInfo(id, workerHost, memory, cores) workers.add(worker) idToWorker(id) = worker println("new register worker: "+worker) sender ! RegisteredWorker(worker.id) } } //Worker 向 Master 发送的心跳消息 case HeartBeat(workerId) => { val workerInfo = idToWorker(workerId) println("get heartbeat message from: "+workerInfo) workerInfo.lastHeartbeat = System.currentTimeMillis() } //Master 自己向自己发送的定期检查超时 Worker 的消息 case CheckOfTimeOutWorker => { val currentTime = System.currentTimeMillis() val toRemove = workers.filter(w => currentTime - w.lastHeartbeat > WORKER_TIMEOUT).toArray for(worker <- toRemove){ workers -= worker idToWorker.remove(worker.id) } println("worker size: " + workers.size) } } } object Master { //程序执行入口 def main(args: Array[String]) { val host = "localhost" val port = 8888 //创建 ActorSystem 的必要参数 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem 是单例的,用来创建 Actor val actorSystem = ActorSystem.create("MasterActorSystem", config) //启动 Actor,Master 会被实例化,生命周期方法会被调用 actorSystem.actorOf(Props[Master], "Master") } }
import java.util.UUID import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global class Worker extends Actor{ //Worker 端持有 Master 端的引用(代理对象) var master: ActorSelection = null //生成一个 UUID,作为 Worker 的标识 val id = UUID.randomUUID().toString //构造方法执行完执行一次 override def preStart(): Unit = { //Worker 向 MasterActorSystem 发送建立连接请求 master = context.system.actorSelection("akka.tcp://MasterActorSystem@localhost:8888/use r/Master") //Worker 向 Master 发送注册消息 master ! RegisterWorker(id, "localhost", "10240", "8") } //该方法会被反复执行,用于接收消息,通过 case class 模式匹配接收消息 override def receive: Receive = { //Master 向 Worker 的反馈信息 case RegisteredWorker(masterUrl) => { //启动定时任务,向 Master 发送心跳 context.system.scheduler.schedule(0 millis, 5000 millis, self, SendHeartBeat) } case SendHeartBeat => { println("worker send heartbeat") master ! HeartBeat(id) } } } object Worker { def main(args: Array[String]) { val clientPort = 8889 //创建 WorkerActorSystem 的必要参数 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.port = $clientPort """.stripMargin val config = ConfigFactory.parseString(configStr) val actorSystem = ActorSystem("WorkerActorSystem", config) //启动 Actor,Master 会被实例化,生命周期方法会被调用 actorSystem.actorOf(Props[Worker], "Worker") } }
原文:https://www.cnblogs.com/LXL616/p/11136054.html