The Discovery of Spark 系列目录:
- Saprk3.x Journey of Discovery | Spark 基础 & 重要的概念(base and important conception)
- Saprk3.x Journey of Discovery | Spark3.x 新特性 AQE 的理解和介绍
- Saprk3.x Journey of Discovery | Kyuubi1.7 Overview 和部署核心参数调优
- Saprk3.x Journey of Discovery | Spark RPC 框架的发展历史和 RPC 核心类图关系
- Saprk3.x Journey of Discovery | Spark 2.4 to 3.4 releases notes on spark core and SQL
spark3.2.x バージョンのソースコード分析を開始します。2023.4.27 から!
1. Spark RPC 通信の役割#
Spark は分散計算エンジンとして、ネットワーク通信が必要な多くの場所があります。例えば、spark の各コンポーネント間のメッセージ通信、jar パッケージのアップロード、シャッフルプロセス中のノード間のデータ転送、ブロックデータのブロードキャストなどです。
これらすべての通信を抽象化すると、人体のフレームワークに似ており、各器官(コンポーネント)の相互接続のためにパイプライン構造が必要です。Spark では、ドライバー、エグゼキューター、ワーカー、マスターなどの通信も同様で、RPC(Remote Procedure Call)フレームワークを通じて実現されています。
spark における通信の例:
- ドライバーとマスターの通信、ドライバーがマスターに RegisterApplication メッセージを送信
- マスターとワーカーの通信、ワーカーがマスターに実行中のエグゼキューター情報を報告
- エグゼキューターとドライバーの通信、エグゼキューターはワーカー上で実行され、spark のタスクは各エグゼキューターに分配され、エグゼキューターはドライバーにタスクの実行結果を送信する必要があります。
- ワーカー間の通信、タスク実行中に他の場所からデータを取得する必要があります。
spark における RPC 通信の主な役割は二つあります:一つは状態情報の同期、例えばタスクなどの変化情報;二つはデータ内容の転送、例えばシャッフルプロセス中のデータ転送やボードプロセス中のデータ転送
2. Spark RPC フレームワークのいくつかのバージョンの進化#
spark1.6 バージョン以前、spark rpc フレームワークは Akka に基づいて実装されていました。spark1.6 バージョン以降は、Akka のアーキテクチャ設計を参考にして、Netty に基づく RPC フレームワークを実装しました。理由は以下の Jira を参照してください。
具体的な jira から SPARK-5293: Enable Spark user applications to use different versions of Akka:
多くの Spark ユーザーアプリケーションが Akka を使用している(または使用したい)です。Akka は全体として大きなアーキテクチャの単純さと一貫性を提供できます。しかし、Spark が Akka に依存しているため、ユーザーが異なるバージョンに依存することは不可能であり、この特定の問題についての支援を求める多くのリクエストを受け取っています。例えば、Spark Streaming は Akka メッセージの受信者として使用される可能性がありますが、Akka に対する依存関係により、上流の Akka アクターも同じバージョンの Akka を使用する必要があります。
Akka の使用は限られているため(主に RPC と単一スレッドのイベントループ)、代替の RPC 実装と Spark における共通のイベントループに置き換えることができます。
核心的な意味は、spark2.0 バージョンで Akka 依存を削除することで、ユーザーが任意のバージョンの Akka を使用してプログラミングできるようにすることです(Akka は非常に優れたオープンソースの分散システムであり、いくつかの Java アプリケーションや Java Web で Akka の豊富な機能を利用して分散整合性、最も一貫性、分散トランザクションなどの分散環境における問題を解決できます)。
現在の saprk3.2 バージョンも引き続き netty に基づく rpc フレームワークです(ソースコードは spark-core の rpc ディレクトリにあります)。その中でNettyStreamManager
を使用してファイル、jar のアップロードなどを管理し、netty のラッピングを基にノード間のシャッフルプロセスとブロックデータの複製とバックアップを実現しています。
3. Spark の RPC フレームワークの構成#
spark3.2 は netty に基づき、Akka フレームワークの設計を参考にして RPC フレームワークを実装しています。核心コンポーネントは以下の通りです。
org/apache/spark/rpc/RpcTimeout.scala
org/apache/spark/rpc/RpcEnvStoppedException.scala
org/apache/spark/rpc/RpcEnv.scala
org/apache/spark/rpc/RpcEndpointRef.scala
org/apache/spark/rpc/RpcEndpointNotFoundException.scala
org/apache/spark/rpc/RpcEndpointAddress.scala
org/apache/spark/rpc/RpcEndpoint.scala
org/apache/spark/rpc/RpcCallContext.scala
org/apache/spark/rpc/RpcAddress.scala
org/apache/spark/rpc/netty/RpcEndpointVerifier.scala
org/apache/spark/rpc/netty/Outbox.scala
org/apache/spark/rpc/netty/NettyStreamManager.scala
org/apache/spark/rpc/netty/NettyRpcEnv.scala
org/apache/spark/rpc/netty/NettyRpcCallContext.scala
org/apache/spark/rpc/netty/MessageLoop.scala
org/apache/spark/rpc/netty/Inbox.scala
org/apache/spark/rpc/netty/Dispatcher.scala
その中で比較的重要なクラスと特徴を挙げます。
0. private[spark] trait RpcEnvFactory
1. private[spark] abstract class RpcEnv(conf: SparkConf)
2. private[spark] case class RpcAddress(host: String, port: Int)
3. private[spark] abstract class RpcEndpointRef(conf: SparkConf)
4. private[netty] class NettyRpcEndpointRef(@transient private val conf: SparkConf,
private val endpointAddress: RpcEndpointAddress,@transient @volatile private var nettyEnv: NettyRpcEnv)
extends RpcEndpointRef(conf)
5. private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
6. private[netty] class NettyRpcEnv(val conf: SparkConf,javaSerializerInstance: JavaSerializerInstance,
host: String,securityManager: SecurityManager,numUsableCores: Int)
核心の大まかなロジックは以下の図の通りです。
3.1 RpcEndpoint と RpcCallContext#
RpcEndpoint はリクエストに応じるサービスです。Spark では、マスター、ワーカー、ドライバーなどの通信が必要なコンポーネントとして表現され、受信したメッセージに基づいて処理を行います。RpcEndpoint のライフサイクルは次のようになります:
構造体 -> 初期化開始 -> 受信処理 -> 停止(constructor -> onStart -> receive -> onStop)
この RpcEndpoint はonStart
, receive
および onStop
がキュー順に実行されることを保証します。
RpcEndpoint には def receive: PartialFunction[Any, Unit]
と def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
の二つの重要な受信および応答メッセージのメソッドがあります。
前者は RpcEndpointRef.send
または RpcCallContext.reply
からのメッセージを処理し、後者は RpcEndpointRef.ask
からのメッセージを処理して応答します。
RpcCallContext は RpcEndpoint の情報コールバックを実装します。
3.2 RpcEndpointRef#
RpcEndpointRef は、リモート RpcEndpoint の参照であり、リモート RpcEndpoint のアドレス名などを保持し、リクエストを送信するための send メソッドと ask メソッドを提供します。
具体的な RpcEndpoint にメッセージを送信する必要がある場合、通常はその RpcEndpoint の参照を取得し、その参照を通じてメッセージを送信します。
3.3 RpcEnv と NettyRpcEnv#
RpcEnv は、サーバーとクライアントが通信するために使用され、RpcEndpoint にメッセージを処理する環境を提供します。
RpcEnv は RpcEndpoint のライフサイクル全体の管理を担当し、エンドポイントの登録、エンドポイント間のメッセージのルーティング、およびエンドポイントの停止を行います。
サーバー側にとって、RpcEnv は RpcEndpoint の実行環境
であり、RpcEndPoint のライフサイクル管理、Tcp 層のデータパケットの解析、およびデータを RpcMessage にデシリアライズしてから、ルーティングに基づいて対応するエンドポイントに送信します。
クライアント側にとっては、RpcEnv を通じて RpcEndpoint の参照を取得
し、それを使用して対応するエンドポイントと通信します。
NettyRpcEnv は RpcEnv を継承した Netty の実装です。
3.4 Dispatcher 、Inbox と Outbox#
関連する技術点:
NettyRpcEnv には Dispatcher が含まれており、主にサーバー側に対して、指定された RpcEndPoint へのルーティングを支援し、ビジネスロジックを呼び出します。また、NettyStreamManager が含まれており、ファイル、jar のアップロードなどを管理します。
// code in NettyRpcEnv
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
private val streamManager = new NettyStreamManager(this)
RPC エンドポイント は、Spark において各ノード(クライアント / マスター / ワーカー)を指し、すべて RpcEndpoint インターフェースを実装しています。内部では、異なるエンドポイントのニーズに応じて異なるメッセージと異なるビジネス処理を設計し、送信(問い合わせ)する必要がある場合は Dispatcher を呼び出します。
重要なポイント:
Dispatcher: メッセージディスパッチャーであり、メッセージを異なる RpcEndopint にルーティングします。RPC エンドポイントに対してメッセージを送信する必要がある場合や、リモート RPC から受信したメッセージを、対応する指令の受信箱 / 発信箱に配信します。指令の受信者が自身であれば受信箱に保存し、他のエンドポイントであれば発信箱に保存します。
Inbox: 指令メッセージの受信箱であり、ローカルエンドポイントに対応する受信箱です。
OutBox: 指令メッセージの発信箱であり、リモートエンドポイントに対応する発信箱です。メッセージが Outbox に保存されると、すぐに TransportClient を通じてメッセージが送信されます。メッセージを発信箱に保存し、送信するプロセスは同じスレッド内で行われます。これは、リモートメッセージが RpcOutboxMessage と OneWayOutboxMessage の二種類に分かれ、応答が必要なメッセージは直接送信され、結果を得て処理する必要があるためです。
TransportClient: Spark の底層ネットワークパッケージ network-common
にある org.apache.spark.network.client
で、Netty 通信クライアントであり、TransportClientFactory
によって作成され、OutBox メッセージの受信者情報に基づいて、対応するリモート TransportServer にリクエストを送信します。典型的なワークフローの例:
例えば、典型的なワークフローは次のようになります:
client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
...
client.sendRPC(new CloseStream(100))
TransportServer: Netty 通信サーバーであり、各 RPC エンドポイントに対して一つの TransportServer があり、リモートメッセージを受信した後、Dispatcher を呼び出してメッセージを対応する受信箱 / 発信箱に配信します。
3.5 RpcAddress#
リモートの RpcEndpointRef のアドレス:ホスト + ポート。
4. Spark3.x RPC フレームワークに関する核心クラス図関係#
重要なポイント
- 核心の RpcEnv は trait であり、停止、登録、エンドポイントの取得などのメソッド定義を提供し、NettyRpcEnv はこのインターフェースクラスの具体的な実装を提供します。
- 工場 RpcEnvFactory を通じて RpcEnv を生成し、NettyRpcEnvFactory は NettyRpcEnv オブジェクトを生成します。
- RpcEnv の setupEndpoint を呼び出してエンドポイントを rpcEnv に登録する際、NettyRpcEnv 内部ではそのエンドポイントの名前とその本体のマッピング関係が行われ、rpcEndpoint と rpcEndpointRef の間のマッピング関係は dispatcher の対応するメンバー変数に保存されます。
- マスター、ワーカー、ブロックマネージャ、ハートビートはすべて trait RpcEndpoint から継承されています。
- transportContext は NettyRpcEnv の重要なメンバーであり、netty の底層インタラクション情報の役割を担っています。