博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Spark】编程实战之模拟SparkRPC原理实现自定义RPC
阅读量:4877 次
发布时间:2019-06-11

本文共 6979 字,大约阅读时间需要 23 分钟。

1. 什么是RPC   

    RPC(Remote Procedure Call)远程过程调用。在Hadoop和Spark中都使用了PRC,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。简单来说,就是有A、B两台机器,A机器可以调用B机器上的程序。

2. Spark 的RPC

    Master和Worker的启动流程:

    (1) 启动Master,会启动一个定时器,定时检查超时的Worker,并移除超时Worker信息。

    (2) 启动Worker,向Master发送注册信息。

    (3) Master收到Worker发来的注册信息后,保存到内存中,并返回一个响应信息,这个信息就是自己的masterUrl。

    (4) Worker接收到Master发来的响应信息(masterUrl)之后,保存到内存中,并开启一个定时器,定时向Master发送心跳信息。

    (5) Master 不断的接收Worker发来的心跳信息,并将每个Worker的最后一次心跳时间为当前接收到心跳信息的时间。

    流程如下图。

3. 编程实战

3.1 项目代码(Scala语言)

    WorkInfo.scala

package com.nova.rpc/**  * @author Supernova  * @date 2018/06/15  */class WorkerInfo(val id: String, val host: String, val port: Int,val memory: Int, val cores: Int) {  // 记录最后一次心跳时间  var lastHeartbeatTime: Long = _}

    RemoteMsg.scala 

package com.nova.rpc/**  * @author Supernova  * @date 2018/06/15  */trait RemoteMsg extends Serializable{}// Master 向自己发送检查超时Worker的信息case object CheckTimeOutWorker// Worker向Master发送的注册信息case class RegisterWorker(id: String, host: String,port: Int, memory: Int, cores: Int) extends RemoteMsg// Master向Worker发送的响应信息case class RegisteredWorker(masterUrl: String) extends RemoteMsg// Worker向Master发送的心跳信息case class Heartbeat(workerId: String) extends RemoteMsg// Worker向自己发送的要执行发送心跳信息的消息case object SendHeartbeat

  

Master.scala    

package com.nova.rpcimport akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.{Config, ConfigFactory}import scala.collection.mutableimport scala.concurrent.duration._/**  * @author Supernova  * @date 2018/06/15  */class Master(val masterHost: String, val masterPort: Int) extends Actor{  // 用来存储Worker的注册信息: 
val idToWorker = new mutable.HashMap[String, WorkerInfo]() // 用来存储Worker的信息,必须使用可变的HashSet val workers = new mutable.HashSet[WorkerInfo]() // Worker的超时时间间隔 val checkInterval: Long = 15000 /** * 重写生命周期preStart方法 * 作用:当Master启动时,开启定时器,定时检查超时Worker */ override def preStart(): Unit = { // 启动定时器,定时检查超时的Worker import context.dispatcher context.system.scheduler.schedule(0 millis,checkInterval millis, self,CheckTimeOutWorker) } /** * 重写生命周期receive方法 * 作用: * 1.接收Worker发来的注册信息 * 2.不断接收Worker发来的心跳信息,并更新最后一次心跳时间 * 3.过滤出超时的Worker并移除 */ override def receive = { // 接收Worker给Master发送过来的注册信息 case RegisterWorker(id, host, port, memory, cores) => { //判断改Worker是否已经注册过,已注册的不执行任何操作,未注册的将进行注册 if (!idToWorker.contains(id)) { val workerInfo = new WorkerInfo(id, host, port, memory, cores) idToWorker += (id -> workerInfo) workers += workerInfo println("一个新的Worker注册成功") //向Worker发送响应信息,将masterUrl返回 sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" + s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}") } } //接收Worker发来的心跳信息 case Heartbeat(workerId) => { // 通过传输过来的workerId获取对应的WorkerInfo val workerInfo = idToWorker(workerId) // 获取当前时间 val currentTime = System.currentTimeMillis() // 更新最后一次心跳时间 workerInfo.lastHeartbeatTime = currentTime } //检查超时Worker并移除 case CheckTimeOutWorker => { val currentTime = System.currentTimeMillis() // 把超时的Worker过滤出来 val toRemove: mutable.HashSet[WorkerInfo] = workers.filter(w => currentTime - w.lastHeartbeatTime > checkInterval) // 将超时的Worker移除 toRemove.foreach(deadWorker => { idToWorker -= deadWorker.id workers -= deadWorker }) } println(s"当前Worker的数量: ${workers.size}") }}object Master{ val MASTER_SYSTEM = "MasterSystem" val MASTER_ACTOR = "Master" def main(args: Array[String]): Unit = { val host = args(0) // 通过main方法参数制定master主机名 val port = args(1).toInt //通过main方法参数指定Master的端口号 //akka配置信息 val configStr: String = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin // 配置创建Actor需要的配置信息 val config: Config = ConfigFactory.parseString(configStr) // 创建ActorSystem val actorSystem: ActorSystem = ActorSystem(MASTER_SYSTEM, config) // 用actorSystem实例创建Actor actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR) actorSystem.awaitTermination() }}

  

Worker.scala

package com.nova.rpcimport java.util.UUIDimport akka.actor.{Actor, ActorSelection, ActorSystem, Props}import com.typesafe.config.{Config, ConfigFactory}import scala.concurrent.duration._/**  * @author Supernova  * @date 2018/06/15  */class Worker(val host: String, val port: Int, val masterHost: String,val masterPort: Int, val memory: Int, val cores: Int) extends Actor{  // 生成一个Worker ID  val workerId: String = UUID.randomUUID().toString  // 用来存储MasterUrl  var masterUrl: String = _  // 心跳时间间隔  val heartbeat_interval: Long = 10000  // Master的Actor  var master: ActorSelection = _  /**    * 生命周期preStart方法    * 作用:当启动Worker时,向master发送注册信息    */  override def preStart(): Unit = {    // 获取Master的Actor    master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" +      s"@${masterHost}:${masterPort}/user/${Master.MASTER_ACTOR}")    master ! RegisterWorker(workerId, host, port, memory, cores)  }  /**    * 生命周期receive方法    * 作用:    * 定时向Master发送心跳信息    */  override def receive: Receive = {    // Worker接收到Master发送过来的注册成功的信息(masterUrl)    case RegisteredWorker(masterUrl) => {      this.masterUrl = masterUrl      // 启动一个定时器, 定时的给Master发送心跳      import context.dispatcher      context.system.scheduler.schedule(        0 millis, heartbeat_interval millis, self, SendHeartbeat)    }    case SendHeartbeat => {      // 向Master发送心跳信息      master ! Heartbeat(workerId)    }  }}object Worker{  val WORKER_SYSTEM = "WorkerSystem"  val WORKER_ACTOR = "Worker"  def main(args: Array[String]): Unit = {    /**      * 通过main方法参数指定相应的      * worker主机名、端口号,master主机名、端口号,使用的内存和核数      */    val host = args(0)    val port = args(1).toInt    val masterHost = args(2)    val masterPort = args(3).toInt    val memory = args(4).toInt    val cores = args(5).toInt    //akka配置信息    val configStr =      s"""         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"         |akka.remote.netty.tcp.hostname = "$host"         |akka.remote.netty.tcp.port = "$port"      """.stripMargin    // 配置创建Actor需要的配置信息    val config: Config = ConfigFactory.parseString(configStr)    // 创建ActorSystem    val actorSystem: ActorSystem = ActorSystem(WORKER_SYSTEM, config)    // 用actorSystem实例创建Actor    actorSystem.actorOf(Props(new Worker(      host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR)    actorSystem.awaitTermination()  }}

  

    3.2 测试运行

由于Master 和Worker的运行都是使用main方法参数传入相应的主机名端口等参数,所以在运行前要在IDEA中的Edit Configurations 窗口中传入相应的参数。在本次测试中,我指定的参数如图:

【Master端】

【Worker端】

【运行结果】

1. 先运行Master,可以看到一旦运行Master,就启动了定时器检查超时Worker,因为还没有Worker进行注册,所以结果一直为0

2. 启动Worker

3. 启动Worker后,再看Master的窗口可以发现Worker注册成功,并且数量为1

4. 关闭Worker,此时Worker已经宕掉了,可以发现Master窗口会收到一条警告信息,并且Master在定时检查超时Worker的时候移除了过期未收到心跳的Worker

 

转载于:https://www.cnblogs.com/snova/p/9195690.html

你可能感兴趣的文章
用信号量进程同步与互斥
查看>>
java容器简要概述
查看>>
Xdebug断点调试的工作原理详解
查看>>
CentOS7+Nginx设置Systemctl restart nginx.service服务
查看>>
web服务器,验证码,Xftp使用方法
查看>>
割点 - 模板
查看>>
使用maven 如何生成源代码的jar包
查看>>
Ubuntu 16.04.6 + Win10 双系统时间错误且不一致
查看>>
协同过滤代码---loadMovieLens.py文件
查看>>
条件分布
查看>>
Python之字符串的特性及常用方法
查看>>
第三次作业——结对编程
查看>>
ora-12899解决方法
查看>>
(8)关于flexbox的一些想法。
查看>>
一台机子同时启动两个相同版本的tomcat
查看>>
剑指offer——python【第29题】最小的K个数
查看>>
带你入门代理模式/SpringAop的运行机制
查看>>
IOC 的理解与解释
查看>>
参考的博客
查看>>
移动端适配方案
查看>>