The Discovery of Spark 系列目錄:
- Saprk3.x 发现之旅 | Spark 基礎 & 重要的概念(base and important conception)
- Saprk3.x 发现之旅 | Spark3.x 新特性 AQE 的理解和介紹
- Saprk3.x 发现之旅 | Kyuubi1.7 概述和部署核心參數調優
- Saprk3.x 发现之旅 | Spark RPC 框架的發展歷史和 RPC 核心類圖關係
- Saprk3.x 发现之旅 | Spark 2.4 到 3.4 版本更新說明
開始 spark3.2.x 版本的源碼分析,2023.4.27 開始!
1. Spark RPC 通信的作用#
Spark 作為分佈式的計算引擎,涉及非常多的地方需要進行網絡通信,比如 spark 各個組件的消息通信、jar 包上傳、shuffle 過程中節點間傳輸、Block 數據的廣播等。
將所有的這些通信抽象出來,就和人體的框架類似,需要有管道結構進行各個器官(組件)的互通有無,將所有器官(組件)連接起來。在 spark 中,driver、executor、worker、master 等通信也是類似,通過 RPC(Remote Procedure Call) 框架實現。
spark 中通信舉些例子:
- driver 和 master 通信,driver 向 master 發送 RegisterApplication 消息
- master 和 worker 通信,worker 向 master 上報 worker 上運行 Executor 信息
- executor 和 driver 通信,executor 運行在 worker 上,spark 的 tasks 被分發到運行在各個 executor 中,executor 需要通過向 driver 發送任務運行結果。
- worker 和 worker 的通信,task 運行期間需要從其他地方拿數據
在 spark 中,RPC 通信主要有兩個作用:一是狀態信息同步,比如 task 等變化信息;二是傳輸數據內容,比如 shuffle 過程中數據傳輸或者 board 過程中數據傳輸
2. Spark RPC 框架幾個版本的迭代#
在 spark1.6 版本之前,spark rpc 框架是基於 Akka 來實現。spark1.6 版本之後借鑒了 Akka 架構設計實現了基於 Netty 的 RPC 框架。原因詳見以下 Jira。
From 具體 jira SPARK-5293: Enable Spark user applications to use different versions of Akka:
很多 Spark 用戶應用程序正在使用(或想要使用)Akka。Akka 整體上可以貢獻巨大的架構簡單性和統一性。然而,由於 Spark 依賴於 Akka,使用者無法依賴不同版本,我們在過去收到了許多請求,要求幫助解決這個具體問題。例如,Spark Streaming 可能被用作 Akka 消息的接收者 - 但我們對 Akka 的依賴要求上游 Akka 演員也必須使用相同版本的 Akka。
由於我們對 Akka 的使用有限(主要用於 RPC 和單線程事件循環),我們可以用替代的 RPC 實現和 Spark 中的通用事件循環來替代它。
核心意思是在 spark2.0 版本中移除 Akka 依賴,可以讓用戶使用任何版本的 Akka 來編程(Akka 是一款非常優秀的開源分佈式系統,在一些 Java Application 或者 Java Web 可以利用 Akka 的豐富特性實現分佈式一致性、最 一致性以及分佈式事務等分佈式環境面對的問題)。
現在 saprk3.2 的版本依然是基於 netty 的 rpc 框架(源碼位於:spark-core 下的 rpc 目錄),其中通過NettyStreamManager
來進行文件、jar 上傳等管理,基於 netty 的封裝實現節點間的 Shuffle 過程和 Block 數據的複製與備份。
3. Spark 的 RPC 框架組成#
spark3.2 基於 netty ,借鑒 Akka 框架設計和實現 RPC 框架。核心組件包括以下幾個
org/apache/spark/rpc/RpcTimeout.scala
org/apache/spark/rpc/RpcEnvStoppedException.scala
org/apache/spark/rpc/RpcEnv.scala
org/apache/spark/rpc/RpcEndpointRef.scala
org/apache/spark/rpc/RpcEndpointNotFoundException.scala
org/apache/spark/rpc/RpcEndpointAddress.scala
org/apache/spark/rpc/RpcEndpoint.scala
org/apache/spark/rpc/RpcCallContext.scala
org/apache/spark/rpc/RpcAddress.scala
org/apache/spark/rpc/netty/RpcEndpointVerifier.scala
org/apache/spark/rpc/netty/Outbox.scala
org/apache/spark/rpc/netty/NettyStreamManager.scala
org/apache/spark/rpc/netty/NettyRpcEnv.scala
org/apache/spark/rpc/netty/NettyRpcCallContext.scala
org/apache/spark/rpc/netty/MessageLoop.scala
org/apache/spark/rpc/netty/Inbox.scala
org/apache/spark/rpc/netty/Dispatcher.scala
其中列舉其中比較重要的類和特徵
0. private[spark] trait RpcEnvFactory
1. private[spark] abstract class RpcEnv(conf: SparkConf)
2. private[spark] case class RpcAddress(host: String, port: Int)
3. private[spark] abstract class RpcEndpointRef(conf: SparkConf)
4. private[netty] class NettyRpcEndpointRef(@transient private val conf: SparkConf,
private val endpointAddress: RpcEndpointAddress,@transient @volatile private var nettyEnv: NettyRpcEnv)
extends RpcEndpointRef(conf)
5. private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
6. private[netty] class NettyRpcEnv(val conf: SparkConf,javaSerializerInstance: JavaSerializerInstance,
host: String,securityManager: SecurityManager,numUsableCores: Int)
核心的大致邏輯如下圖
3.1 RpcEndpoint 和 RpcCallContext#
RpcEndpoint 是一個響應請求的服務。在 spark 中可以表示為一個個需要通信的組件,如 master、worker、driver 等,根據接收到的消息進行處理,一個 RpcEndpoint 的生命周期是:
構造 -> 初始化啟動 -> 接收處理 -> 停止(constructor -> onStart -> receive -> onStop)
這個 RpcEndpoint 確保onStart
, receive
and onStop
按照隊列順序執行。
RpcEndpoint 中有 def receive: PartialFunction[Any, Unit]
和def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
兩個關鍵的接收和回應消息的方法。
前者處理來自 RpcEndpointRef.send
or RpcCallContext.reply
的消息,後者處理來自RpcEndpointRef.ask
的消息並且進行回應。
RpcCallContext 實現 RpcEndpoint 的信息回調。
3.2 RpcEndpointRef#
RpcEndpointRef,是遠程 RpcEndpoint 的引用,持有遠程 RpcEndpoint 的地址名稱等,提供 send 方法和 ask 方法用於發送請求。
當我們需要向一個具體的 RpcEndpoint 發送 消息時,一般我們需要獲取到該 RpcEndpoint 的引用,然後通過該引用發送消息。
3.3 RpcEnv 和 NettyRpcEnv#
RpcEnv,服務端和客戶端使用這個來通信,為 RpcEndpoint 提供處理消息的環境。
RpcEnv 負責 RpcEndpoint 整個生命周期的管理,包括:註冊 endpoint、endpoint 之間消息的路由、以及停止 endpoint。
對於 server 端來說,RpcEnv 是 RpcEndpoint 的運行環境
,負責 RpcEndPoint 的生命周期管理, 解析 Tcp 層的數據包以及反序列化數據封裝成 RpcMessage,然後根據路由傳送到對應的 Endpoint
對於 client 端來說,可以通過 RpcEnv 獲取 RpcEndpoint 的引用
,也就是 RpcEndpointRef,然後通過 RpcEndpointRef 與對應的 Endpoint 通信
NettyRpcEnv 是繼承 RpcEnv 的一個 netty 的實現。
3.4 Dispatcher 、Inbox 和 Outbox#
關聯的技術點:
NettyRpcEnv 中包含 Dispatcher,主要針對服務端,幫助路由到指定的 RpcEndPoint,並調用起業務邏輯;包含 NettyStreamManager, 負責文件、jar 上傳等管理。
// code in NettyRpcEnv
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
private val streamManager = new NettyStreamManager(this)
RPC 端點 ,Spark 針對每個節點(Client/Master/Worker)都可以稱之為一個 Rpc 端點,他們都實現 RpcEndpoint 接口,內部根據不同端點的需求,設計不同的消息和不同的業務處理,如果需要發送 (詢問)則調用 Dispatcher
關鍵點:
Dispatcher: 消息分發器,路由消息到不同的 RpcEndopint。針對 RPC 端點需要發送消息或者從遠程 RPC 接收到的消息,分發至對應的指令 收件箱 / 發件箱。如果指令接收方是自己存入收件箱,如果指令接收方為非自身端點,則放入發件箱。
Inbox: 指令消息收件箱,一個本地端點對應一個收件箱。
OutBox: 指令消息發件箱,一個遠程端點對應一個發件箱,當消息放入 Outbox 後,緊接著將消息通過 TransportClient 發送出去。消息放入發件箱以及發送過程是在同一個線程中進行,這樣做的主要原因是遠 稜消息分為 RpcOutboxMessage, OneWayOutboxMessage 兩種消息,而針對需要應答的消息直接發送且需要得到結果進行處理
TransportClient: ,在 spark 底層網絡包network-common
中org.apache.spark.network.client
,Netty 通信客戶端,被TransportClientFactory
創建,根據 OutBox 消息的 receiver 信息,請求對應遠程 TransportServer。 典型的工作流例子:
For example, a typical workflow might be:
client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
...
client.sendRPC(new CloseStream(100))
TransportServer: Netty 通信服務端,一個 RPC 端點一個 TransportServer, 接受遠程消息後調用 Dispatcher 分發消息至對應收發件箱
3.5 RpcAddress#
遠程的 RpcEndpointRef 的地址:Host + Port。
4. Spark3.x RPC 框架涉及到的核心類圖關係#
關鍵點
- 核心的 RpcEnv 是一個 trait ,它主要提供了停止,註冊,獲取 endpoint 等方法的定義,而 NettyRpcEnv 提供了該接口類的一個具體實現。
- 通過工廠 RpcEnvFactory 來產生一個 RpcEnv,而 NettyRpcEnvFactory 用來生成 NettyRpcEnv 對象
- 當我們調用 RpcEnv 中的 setupEndpoint 來註冊一個 endpoint 到 rpcEnv 的時候,在 NettyRpcEnv 內部,會將該 endpoint 的名稱與其本身映射關係,rpcEndpoint 與 rpcEndpointRef 之間映射關係保存在 dispatcher 對應的成員變量中
- Master、Worker、BlockManager、HeartBeat 都是繼承 trait RpcEndpoint 而來
- transportContext 作為 NettyRpcEnv 關鍵成員,承擔 netty 底層交互信息的角色