The Discovery of Spark Series Directory:
- Spark3.x Journey of Discovery | Spark Basics & Important Concepts
- Spark3.x Journey of Discovery | Understanding and Introduction to New Features AQE in Spark3.x
- Spark3.x Journey of Discovery | Kyuubi1.7 Overview and Deployment Core Parameter Tuning
- Spark3.x Journey of Discovery | The Development History of Spark RPC Framework and Core Class Diagram Relationships
- Spark3.x Journey of Discovery | Release Notes on Spark Core and SQL from Spark 2.4 to 3.4
The source code analysis of Spark 3.2.x starts here, beginning on 2023.4.27!
## 1. The Role of Spark RPC Communication
As a distributed computing engine, Spark involves many areas that require network communication, such as message communication between various Spark components, jar package uploads, data transfer between nodes during shuffle, and broadcasting Block data.
Viewed abstractly, these communication paths resemble the framework of the human body: a pipeline structure is needed to interconnect the organs (components) and tie them all together. Communication among the driver, executors, workers, master, and other Spark components works the same way, and it is implemented through the RPC (Remote Procedure Call) framework.
Examples of communication in Spark:
- Communication between driver and master, where the driver sends a RegisterApplication message to the master.
- Communication between master and worker, where the worker reports the Executor information running on the worker to the master.
- Communication between executor and driver: executors run on workers, Spark distributes tasks across the executors, and each executor sends its task execution results back to the driver.
- Communication between worker and worker, where tasks need to fetch data from other nodes during execution.
In Spark, RPC communication mainly serves two purposes: first, to synchronize status information, such as changes in task state; second, to transfer data content, such as data transfer during shuffle or broadcast.
## 2. Iterations of Spark RPC Framework Versions
Before Spark 1.6, the Spark RPC framework was implemented on top of Akka. Starting with Spark 1.6, Spark borrowed Akka's architectural design and implemented its own RPC framework on top of Netty. The reasons are detailed in the following Jira.
From the specific Jira SPARK-5293: Enable Spark user applications to use different versions of Akka:
A lot of Spark user applications are using (or want to use) Akka. Akka as a whole can contribute great architectural simplicity and uniformity. However, because Spark depends on Akka, it is not possible for users to rely on different versions, and we have received many requests in the past asking for help about this specific issue. For example, Spark Streaming might be used as the receiver of Akka messages - but our dependency on Akka requires the upstream Akka actors to also use the identical version of Akka.
Since our usage of Akka is limited (mainly for RPC and single-threaded event loop), we can replace it with alternative RPC implementations and a common event loop in Spark.
The core idea is to remove Spark's Akka dependency in Spark 2.0 so that users can program against any version of Akka. (Akka is an excellent open-source toolkit for distributed systems; its rich features can help solve problems such as distributed consistency, eventual consistency, and distributed transactions in Java applications or Java web environments.)
Currently, Spark 3.2 is still based on the Netty RPC framework (the source code is located in the rpc directory under spark-core), where **NettyStreamManager is used for managing file and jar uploads, and the Netty encapsulation implements the shuffle process and Block data replication and backup between nodes**.
## 3. Components of Spark's RPC Framework
Spark 3.2 is based on Netty, borrowing from the Akka framework design to implement the RPC framework. The core components include the following:
org/apache/spark/rpc/RpcTimeout.scala
org/apache/spark/rpc/RpcEnvStoppedException.scala
org/apache/spark/rpc/RpcEnv.scala
org/apache/spark/rpc/RpcEndpointRef.scala
org/apache/spark/rpc/RpcEndpointNotFoundException.scala
org/apache/spark/rpc/RpcEndpointAddress.scala
org/apache/spark/rpc/RpcEndpoint.scala
org/apache/spark/rpc/RpcCallContext.scala
org/apache/spark/rpc/RpcAddress.scala
org/apache/spark/rpc/netty/RpcEndpointVerifier.scala
org/apache/spark/rpc/netty/Outbox.scala
org/apache/spark/rpc/netty/NettyStreamManager.scala
org/apache/spark/rpc/netty/NettyRpcEnv.scala
org/apache/spark/rpc/netty/NettyRpcCallContext.scala
org/apache/spark/rpc/netty/MessageLoop.scala
org/apache/spark/rpc/netty/Inbox.scala
org/apache/spark/rpc/netty/Dispatcher.scala
Among them, some important classes and features are listed:
- `private[spark] trait RpcEnvFactory`
- `private[spark] abstract class RpcEnv(conf: SparkConf)`
- `private[spark] case class RpcAddress(host: String, port: Int)`
- `private[spark] abstract class RpcEndpointRef(conf: SparkConf)`
- `private[netty] class NettyRpcEndpointRef(@transient private val conf: SparkConf, private val endpointAddress: RpcEndpointAddress, @transient @volatile private var nettyEnv: NettyRpcEnv) extends RpcEndpointRef(conf)`
- `private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)`
- `private[netty] class NettyRpcEnv(val conf: SparkConf, javaSerializerInstance: JavaSerializerInstance, host: String, securityManager: SecurityManager, numUsableCores: Int)`
The core logic is roughly as shown in the figure below:

<center> figure 1: Spark RPC</center>
### 3.1 RpcEndpoint and RpcCallContext
RpcEndpoint is a service that responds to requests. In Spark, it can represent any component that needs to communicate, such as the master, a worker, or the driver, and it acts on the messages it receives. The lifecycle of an RpcEndpoint is:
**Construct -> Initialize Start -> Receive Process -> Stop (constructor -> onStart -> receive -> onStop)**
RpcEndpoint guarantees that `onStart`, `receive`, and `onStop` are invoked in sequence.
In RpcEndpoint, there are two key methods for receiving and responding to messages: `def receive: PartialFunction[Any, Unit]` and `def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]`.
The former handles messages sent via `RpcEndpointRef.send` or `RpcCallContext.reply`, while the latter handles messages sent via `RpcEndpointRef.ask` and replies to them.
RpcCallContext is the callback through which an RpcEndpoint sends a reply or a failure back to the sender.
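
To make the lifecycle and the two receive methods more concrete, here is a minimal sketch. Spark's own classes are `private[spark]`, so this uses hypothetical stand-ins (`Endpoint`, `CallContext`, `ToyMaster`, `Heartbeat`, `IsAlive`) that only mirror the method shapes quoted above; it is an illustration under those assumptions, not Spark's implementation.

```scala
import scala.collection.mutable

object EndpointSketch {
  // Hypothetical stand-in for RpcCallContext: the callback used to answer a caller.
  trait CallContext {
    def reply(response: Any): Unit
    def sendFailure(e: Throwable): Unit
  }

  // Hypothetical stand-in for RpcEndpoint, mirroring the method shapes quoted above.
  trait Endpoint {
    def onStart(): Unit = ()
    def receive: PartialFunction[Any, Unit] = PartialFunction.empty
    def receiveAndReply(context: CallContext): PartialFunction[Any, Unit] = PartialFunction.empty
    def onStop(): Unit = ()
  }

  // Illustrative messages, not Spark's real message classes.
  final case class Heartbeat(workerId: String)
  final case class IsAlive(workerId: String)

  class ToyMaster extends Endpoint {
    val log = mutable.ArrayBuffer.empty[String]
    private val alive = mutable.Set.empty[String]

    override def onStart(): Unit = { log += "onStart"; () }

    // One-way messages: update state, nothing is sent back.
    override def receive: PartialFunction[Any, Unit] = {
      case Heartbeat(id) =>
        alive += id
        log += s"heartbeat:$id"
        ()
    }

    // Request/response messages: answer through the call context.
    override def receiveAndReply(context: CallContext): PartialFunction[Any, Unit] = {
      case IsAlive(id) => context.reply(alive.contains(id))
    }

    override def onStop(): Unit = { log += "onStop"; () }
  }

  // Records the reply so the caller side can inspect it.
  class RecordingContext extends CallContext {
    var result: Option[Any] = None
    var failure: Option[Throwable] = None
    override def reply(response: Any): Unit = { result = Some(response) }
    override def sendFailure(e: Throwable): Unit = { failure = Some(e) }
  }

  // Drives the lifecycle in order: onStart -> receive / receiveAndReply -> onStop.
  def runLifecycle(): (List[String], Option[Any]) = {
    val master = new ToyMaster
    val ctx = new RecordingContext
    master.onStart()
    master.receive(Heartbeat("w1"))
    master.receiveAndReply(ctx)(IsAlive("w1"))
    master.onStop()
    (master.log.toList, ctx.result)
  }
}
```

In this sketch the caller drives the lifecycle by hand; in Spark the RpcEnv does that on the endpoint's behalf.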
### 3.2 RpcEndpointRef
RpcEndpointRef is a reference to a remote RpcEndpoint. It holds the address, name, and other information of the remote RpcEndpoint, and provides the send and ask methods for sending requests.

When we need to send a message to a specific RpcEndpoint, we generally need to obtain a reference to that RpcEndpoint and then send the message through that reference.
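
The difference between send and ask can be sketched roughly as follows. `ToyEndpoint` and `ToyEndpointRef` are hypothetical names invented for this illustration, and the "remote" endpoint lives in the same process to keep the example self-contained; the point is only that send returns nothing while ask hands back a `Future` that completes when the endpoint replies.

```scala
import scala.concurrent.{Future, Promise}

object EndpointRefSketch {
  // Hypothetical in-process endpoint with a one-way handler and a request/response handler.
  class ToyEndpoint {
    @volatile var lastOneWay: Option[Any] = None

    def receive(msg: Any): Unit = { lastOneWay = Some(msg) }

    def receiveAndReply(msg: Any, reply: Any => Unit): Unit = msg match {
      case s: String => reply(s.toUpperCase)
      case other     => reply(other)
    }
  }

  // Hypothetical stand-in for RpcEndpointRef: knows the endpoint's name and how to reach it.
  class ToyEndpointRef(val name: String, target: ToyEndpoint) {
    // Fire-and-forget: no result comes back to the caller.
    def send(msg: Any): Unit = target.receive(msg)

    // Request/response: the Future completes when the endpoint replies.
    def ask(msg: Any): Future[Any] = {
      val promise = Promise[Any]()
      target.receiveAndReply(msg, r => { promise.success(r); () })
      promise.future
    }
  }
}
```

Because the toy endpoint replies synchronously, the returned `Future` is already completed when `ask` returns; over a real network the caller would wait on it or attach a callback.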
### 3.3 RpcEnv and NettyRpcEnv
RpcEnv is used by the server and client to communicate, providing an environment for RpcEndpoint to process messages.
RpcEnv manages the entire lifecycle of RpcEndpoint, including: registering endpoints, routing messages between endpoints, and stopping endpoints.
For the server side, **RpcEnv is the runtime environment for RpcEndpoint**: it manages the lifecycle of each RpcEndpoint, parses TCP-layer data packets, and deserializes the data into an RpcMessage, which is then routed to the corresponding endpoint.
For the client side, **RpcEnv can obtain a reference to the RpcEndpoint**, namely the RpcEndpointRef, and then communicate with the corresponding endpoint through it.
NettyRpcEnv is the Netty-based implementation that extends RpcEnv.
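
The three responsibilities listed above (registering endpoints, routing messages, stopping endpoints) can be sketched with a tiny in-memory environment. Everything here (`ToyEnv`, `ToyRef`, `ToyEndpoint`, and the method names) is hypothetical and deliberately ignores networking and serialization; it is a sketch of the idea, not of NettyRpcEnv.

```scala
import scala.collection.concurrent.TrieMap

object EnvSketch {
  // Hypothetical endpoint: just something that can receive a message.
  trait ToyEndpoint { def receive(msg: Any): Unit }

  // Hypothetical reference: holds only the name and goes through the env to deliver.
  final class ToyRef(val name: String, env: ToyEnv) {
    // Returns true if the env could route the message to a live endpoint.
    def send(msg: Any): Boolean = env.deliver(name, msg)
  }

  class ToyEnv {
    private val endpoints = TrieMap.empty[String, ToyEndpoint]

    // Server side: register an endpoint under a unique name and hand back a reference.
    def setupEndpoint(name: String, endpoint: ToyEndpoint): ToyRef = {
      if (endpoints.putIfAbsent(name, endpoint).isDefined) {
        throw new IllegalArgumentException(s"There is already an endpoint called $name")
      }
      new ToyRef(name, this)
    }

    // Client side: look up a reference by name.
    def endpointRef(name: String): Option[ToyRef] =
      if (endpoints.contains(name)) Some(new ToyRef(name, this)) else None

    // Stop an endpoint: later messages to it can no longer be routed.
    def stop(name: String): Unit = { endpoints.remove(name); () }

    // Route a message to the endpoint registered under the given name.
    def deliver(name: String, msg: Any): Boolean =
      endpoints.get(name) match {
        case Some(endpoint) =>
          endpoint.receive(msg)
          true
        case None => false
      }
  }
}
```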
### 3.4 Dispatcher, Inbox, and Outbox
Related technical points:
**NettyRpcEnv** contains a Dispatcher, used mainly on the server side to route messages to the specified RpcEndpoint and invoke its business logic. It also contains a NettyStreamManager, which is responsible for managing file and jar uploads.
```scala
// code in NettyRpcEnv
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
private val streamManager = new NettyStreamManager(this)
```
RPC Endpoint: Spark treats each node (Client/Master/Worker) as an RPC endpoint. Each one implements the RpcEndpoint interface and defines its own messages and business logic according to its needs. When an endpoint needs to send a message (send or ask), the message goes through the Dispatcher.
Key points:
Dispatcher: the message dispatcher, which routes messages to the corresponding RpcEndpoint. Messages addressed to a local endpoint are stored in that endpoint's Inbox; messages addressed to an endpoint on another node are placed in the Outbox for that remote address (in the source, NettyRpcEnv makes this local-versus-remote decision in its send and ask paths and hands local messages to the Dispatcher).
Inbox: the inbox for instruction messages, with one inbox per local endpoint.
Outbox: the outbox for instruction messages, with one outbox per remote endpoint. When a message is placed in the Outbox, it is immediately sent out via TransportClient. Placing a message in the Outbox and sending it happen in the same thread, mainly because remote messages are divided into RpcOutboxMessage and OneWayOutboxMessage, and a message that requires a response is sent directly so that its result can be processed.
TransportClient: the Netty communication client, located in the package org.apache.spark.network.client of Spark's underlying network module network-common. It is created by TransportClientFactory and connects to the corresponding remote TransportServer based on the receiver information of the Outbox message. For example, a typical workflow might be:
client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
...
client.sendRPC(new CloseStream(100))
TransportServer: the Netty communication server. Each RPC endpoint (node) has one TransportServer, which accepts remote messages and calls the Dispatcher to deliver them to the corresponding inbox.
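
The inbox/outbox routing rule described in the key points above can be sketched as follows. `ToyRouter`, `Address`, and `Message` are hypothetical names, and the sketch folds the routing decision into a single class and uses plain queues instead of network clients; it only illustrates "local receiver goes to an inbox, remote receiver goes to an outbox".

```scala
import scala.collection.mutable

object RoutingSketch {
  final case class Address(host: String, port: Int)
  final case class Message(receiver: Address, endpointName: String, body: Any)

  class ToyRouter(localAddress: Address) {
    // One inbox per local endpoint, keyed by endpoint name.
    val inboxes = mutable.Map.empty[String, mutable.Queue[Any]]
    // One outbox per remote address.
    val outboxes = mutable.Map.empty[Address, mutable.Queue[Message]]

    def registerEndpoint(name: String): Unit = {
      inboxes.getOrElseUpdate(name, mutable.Queue.empty[Any])
      ()
    }

    // Returns where the message ended up: "inbox", "outbox", or "dropped".
    def post(msg: Message): String =
      if (msg.receiver == localAddress) {
        inboxes.get(msg.endpointName) match {
          case Some(inbox) =>
            inbox += msg.body
            "inbox"
          case None => "dropped" // no such local endpoint is registered
        }
      } else {
        val outbox = outboxes.getOrElseUpdate(msg.receiver, mutable.Queue.empty[Message])
        outbox += msg
        "outbox"
      }
  }
}
```

In this toy version a queued outbox message simply stays in the queue; in Spark, as described above, the Outbox hands it to a TransportClient for delivery to the remote TransportServer.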
### 3.5 RpcAddress
The address of the remote RpcEndpointRef: Host + Port.
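
As a small illustration of "Host + Port", the sketch below models an address as a case class and round-trips it through a URL string. `ToyAddress`, `toUrl`, and `parse` are hypothetical names for this example, and the `spark://host:port` form is used here as an assumption about how such an address is commonly written.

```scala
import scala.util.Try

object AddressSketch {
  // Hypothetical stand-in for RpcAddress: nothing more than host + port.
  final case class ToyAddress(host: String, port: Int) {
    def hostPort: String = s"$host:$port"
    def toUrl: String = s"spark://$hostPort"
  }

  // Parses "spark://host:port"; returns None for anything else.
  def parse(url: String): Option[ToyAddress] =
    Try(new java.net.URI(url)).toOption
      .filter(uri => uri.getScheme == "spark" && uri.getHost != null && uri.getPort >= 0)
      .map(uri => ToyAddress(uri.getHost, uri.getPort))
}
```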
## 4. Core Class Diagram Relationships Involved in Spark3.x RPC Framework
Key points:
- The core RpcEnv is an abstract class that mainly defines the methods for stopping, registering, and obtaining endpoints, while NettyRpcEnv provides the concrete implementation.
- A RpcEnv is produced through the factory RpcEnvFactory, while NettyRpcEnvFactory is used to generate NettyRpcEnv objects.
- When we call setupEndpoint on RpcEnv to register an endpoint, NettyRpcEnv internally maps the endpoint's name to the endpoint itself, and the mapping between each rpcEndpoint and its rpcEndpointRef is stored in the corresponding member variable of the dispatcher.
- Master, Worker, BlockManager, HeartBeat, and similar components all extend the trait RpcEndpoint, either directly or through a sub-trait such as ThreadSafeRpcEndpoint.
- transportContext, as a key member of NettyRpcEnv, plays the role of handling low-level interaction information in Netty.
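
The relationships in the key points above (a factory that produces the env, an abstract env with a Netty-flavored implementation, and a dispatcher that holds the name and reference mappings) can be sketched as below. All names are hypothetical stand-ins chosen for this illustration, and the code is a rough model of the class diagram rather than Spark's actual classes.

```scala
import scala.collection.concurrent.TrieMap

object ClassDiagramSketch {
  trait Endpoint                                   // stands in for RpcEndpoint
  final case class EndpointRef(name: String)       // stands in for RpcEndpointRef
  final case class EnvConfig(host: String, port: Int)

  // Stands in for Dispatcher: owns both mappings described above.
  class ToyDispatcher {
    private val endpoints = TrieMap.empty[String, Endpoint]
    private val endpointRefs = TrieMap.empty[Endpoint, EndpointRef]

    def register(name: String, endpoint: Endpoint): EndpointRef = {
      require(endpoints.putIfAbsent(name, endpoint).isEmpty, s"endpoint $name already exists")
      val ref = EndpointRef(name)
      endpointRefs.put(endpoint, ref)
      ref
    }

    def refOf(endpoint: Endpoint): Option[EndpointRef] = endpointRefs.get(endpoint)
  }

  // Stands in for the abstract RpcEnv.
  abstract class Env {
    def setupEndpoint(name: String, endpoint: Endpoint): EndpointRef
    def endpointRef(endpoint: Endpoint): Option[EndpointRef]
  }

  // Stands in for NettyRpcEnv: delegates registration to its dispatcher.
  class ToyNettyEnv(val config: EnvConfig) extends Env {
    private val dispatcher = new ToyDispatcher
    override def setupEndpoint(name: String, endpoint: Endpoint): EndpointRef =
      dispatcher.register(name, endpoint)
    override def endpointRef(endpoint: Endpoint): Option[EndpointRef] =
      dispatcher.refOf(endpoint)
  }

  // Stands in for RpcEnvFactory and NettyRpcEnvFactory.
  trait EnvFactory { def create(config: EnvConfig): Env }
  object ToyNettyEnvFactory extends EnvFactory {
    override def create(config: EnvConfig): Env = new ToyNettyEnv(config)
  }
}
```

Callers in this sketch depend only on `Env` and `EnvFactory`, which is the point of the factory in the diagram: the Netty-specific class stays behind the abstract type.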