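Spark RPC Communication, Part 2 (First Draft)
The Transport Layer of Spark RPC
The transport layer is implemented mainly on top of the Netty framework.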
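TransportContext holds the context needed to create a TransportServer and a TransportClientFactory, and to set up Netty Channel pipelines with a TransportChannelHandler. TransportClient offers two communication protocols: control-plane RPCs and data-plane "chunk fetching". RPC handling happens outside the scope of TransportContext, in a user-provided RpcHandler. That handler is responsible for setting up streams, which can then be transferred through the data plane in chunks using zero-copy IO. TransportServer and TransportClientFactory each create one TransportChannelHandler per channel. Because every TransportChannelHandler contains a TransportClient, a server process can send messages back to the client over the existing channel.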
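The Transport Context: TransportContext
Core members and methods of TransportContext
- TransportConf conf: the transport configuration.
- RpcHandler rpcHandler: handles received RPC messages.
- EventLoopGroup chunkFetchWorkers: a separate thread pool for handling ChunkFetchRequest messages. It makes it possible to cap the number of TransportServer worker threads that are blocked while writing ChunkFetchRequest responses back to the client through the underlying channel.
- createClientFactory(): initializes a TransportClientFactory that runs the given TransportClientBootstraps before returning a new client. Bootstraps run synchronously and must succeed for the client to be created.
- createServer(): creates a TransportServer instance.
- initializePipeline(): initializes the TransportClient, TransportRequestHandler, and TransportResponseHandler, uses them to construct a TransportChannelHandler, and configures the pipeline through the Netty API.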
TransportContext.createClientFactory creates a TransportClientFactory instance. The list of client bootstraps (TransportClientBootstrap) is passed in when the factory is constructed. Internally, TransportClientFactory maintains a connection pool for each socket address. TransportContext.createServer creates a TransportServer instance.
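To make the "one handler per channel, each handler carries a client" idea concrete, here is a toy model. It is not Spark's code: every class with the `Toy` prefix is a hypothetical stand-in invented for illustration, and the "channel" is just a list that records what was written to it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Netty channel: it only records what was written.
class ToyChannel {
    final List<String> written = new ArrayList<>();
    void write(String msg) { written.add(msg); }
}

// Hypothetical stand-in for TransportClient: sends messages over one channel.
class ToyClient {
    private final ToyChannel channel;
    ToyClient(ToyChannel channel) { this.channel = channel; }
    void send(String msg) { channel.write(msg); }
}

// Hypothetical stand-in for TransportChannelHandler: one per channel,
// and it always carries a client bound to that same channel.
class ToyChannelHandler {
    private final ToyClient client;
    ToyChannelHandler(ToyClient client) { this.client = client; }
    ToyClient getClient() { return client; }
}

// Hypothetical stand-in for TransportContext.initializePipeline().
class ToyContext {
    ToyChannelHandler initializePipeline(ToyChannel channel) {
        return new ToyChannelHandler(new ToyClient(channel));
    }
}

class ToyContextDemo {
    public static void main(String[] args) {
        ToyContext context = new ToyContext();
        ToyChannel accepted = new ToyChannel();   // a channel the server has accepted
        ToyChannelHandler handler = context.initializePipeline(accepted);
        // The server side reuses the existing channel to push a message back.
        handler.getClient().send("hello from server");
        System.out.println(accepted.written);
    }
}
```

Because the client is created together with the handler, whichever side owns the channel can reach a client for it without opening a second connection.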
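Core Class: TransportClientFactory
TransportClientFactory is the factory that creates TransportClients through its createClient method. It maintains a connection pool to other hosts and returns the same TransportClient for the same remote host. It also shares a single worker thread pool among all TransportClients. TransportClients are reused whenever possible. Before the creation of a new TransportClient completes, all of the given TransportClientBootstraps are run.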
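The reuse behavior described above can be sketched with a small, self-contained model. This is a simplification under stated assumptions, not the real factory: `PooledClient`, `SketchClientFactory`, and the `created` counter are hypothetical names, a "connection" is just an object with an `active` flag, and the pool is keyed by a plain `host:port` string.

```java
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a TransportClient; `active` mimics isActive().
class PooledClient {
    final String address;
    volatile boolean active = true;
    PooledClient(String address) { this.address = address; }
}

class SketchClientFactory {
    // One fixed-size slot array per remote address, plus one lock per slot.
    private static final class Pool {
        final PooledClient[] clients;
        final Object[] locks;
        Pool(int size) {
            clients = new PooledClient[size];
            locks = new Object[size];
            for (int i = 0; i < size; i++) {
                locks[i] = new Object();
            }
        }
    }

    private final ConcurrentHashMap<String, Pool> connectionPool = new ConcurrentHashMap<>();
    private final int connectionsPerPeer;
    private final Random rand = new Random();
    final AtomicInteger created = new AtomicInteger();   // counts simulated connects

    SketchClientFactory(int connectionsPerPeer) {
        this.connectionsPerPeer = connectionsPerPeer;
    }

    PooledClient createClient(String host, int port) {
        String key = host + ":" + port;
        Pool pool = connectionPool.computeIfAbsent(key, k -> new Pool(connectionsPerPeer));
        int index = rand.nextInt(connectionsPerPeer);

        // Fast path: reuse a live cached client without taking the slot lock.
        PooledClient cached = pool.clients[index];
        if (cached != null && cached.active) {
            return cached;
        }

        // Slow path: only one thread per slot may create the replacement,
        // so the slot is re-checked after the lock is acquired.
        synchronized (pool.locks[index]) {
            cached = pool.clients[index];
            if (cached != null && cached.active) {
                return cached;
            }
            created.incrementAndGet();
            pool.clients[index] = new PooledClient(key);
            return pool.clients[index];
        }
    }
}

class SketchClientFactoryDemo {
    public static void main(String[] args) {
        SketchClientFactory factory = new SketchClientFactory(1);
        PooledClient first = factory.createClient("worker-1", 7077);
        PooledClient second = factory.createClient("worker-1", 7077);
        System.out.println(first == second);   // true: the single slot is reused
        first.active = false;                  // simulate a dropped connection
        System.out.println(factory.createClient("worker-1", 7077) == first);   // false
    }
}
```

Locking per slot rather than per pool means a slow connect to one slot does not block callers that picked a different slot for the same peer.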
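Core members and methods of TransportClientFactory
- Static inner class ClientPool: a simple data structure that tracks the pool of client connections between two peer nodes so that they can be reused. The array is not thread-safe by itself, so each client slot has its own lock object.

    private static class ClientPool {
      TransportClient[] clients;
      Object[] locks;

      ClientPool(int size) {
        clients = new TransportClient[size];
        locks = new Object[size];
        for (int i = 0; i < size; i++) {
          locks[i] = new Object();
        }
      }
    }

- TransportContext context: the TransportContext instance.
- TransportConf conf: the connection configuration.
- List<TransportClientBootstrap> clientBootstraps: the client bootstraps, which perform initialization work when a client establishes a connection.
- ConcurrentHashMap<SocketAddress, ClientPool> connectionPool: maps each remote address to its client pool.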
- createClient(String remoteHost, int remotePort):
  - Look up the remote address in connectionPool. If no ClientPool exists for that address yet, create one and put it into the map.
  - Check whether the channel has timed out and whether the cached client is still alive. If the client is no longer active, a new one has to be created. The actual creation happens in createClient(InetSocketAddress address).

    public TransportClient createClient(String remoteHost, int remotePort)
        throws IOException, InterruptedException {
      // Use an unresolved address here to avoid a DNS lookup every time a client is created.
      final InetSocketAddress unresolvedAddress =
        InetSocketAddress.createUnresolved(remoteHost, remotePort);

      // Create the ClientPool if it does not exist yet.
      ClientPool clientPool = connectionPool.get(unresolvedAddress);
      if (clientPool == null) {
        connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
        clientPool = connectionPool.get(unresolvedAddress);
      }

      int clientIndex = rand.nextInt(numConnectionsPerPeer);
      TransportClient cachedClient = clientPool.clients[clientIndex];

      if (cachedClient != null && cachedClient.isActive()) {
        // Update the handler's last-use time so that the channel does not time out.
        TransportChannelHandler handler = cachedClient.getChannel().pipeline()
          .get(TransportChannelHandler.class);
        synchronized (handler) {
          handler.getResponseHandler().updateTimeOfLastRequest();
        }

        // Then check that the client is still alive, in case it timed out before the update ran.
        if (cachedClient.isActive()) {
          logger.trace("Returning cached connection to {}: {}",
            cachedClient.getSocketAddress(), cachedClient);
          return cachedClient;
        }
      }

      // If we reach here, there is no open existing connection, so try to create a new one.
      final long preResolveHost = System.nanoTime();
      final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
      final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
      if (hostResolveTimeMs > 2000) {
        logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
      } else {
        logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
      }

      // Several threads may race to create a new connection here.
      // The per-slot lock keeps only one of them active.
      synchronized (clientPool.locks[clientIndex]) {
        cachedClient = clientPool.clients[clientIndex];

        if (cachedClient != null) {
          if (cachedClient.isActive()) {
            logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
            return cachedClient;
          } else {
            logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
          }
        }
        clientPool.clients[clientIndex] = createClient(resolvedAddress);
        return clientPool.clients[clientIndex];
      }
    }

- createClient(InetSocketAddress address):
  - Configure the connection through Netty's root Bootstrap.
  - Initialize the bootstrap's pipeline in a callback, and record references to the client and the channel.
  - Iterate over the client bootstraps in clientBootstraps and run each one's initialization.

    private TransportClient createClient(InetSocketAddress address)
        throws IOException, InterruptedException {
      logger.debug("Creating new connection to {}", address);

      // Netty's root bootstrap for creating the connection.
      Bootstrap bootstrap = new Bootstrap();
      bootstrap.group(workerGroup)
        .channel(socketChannelClass)
        // Disable Nagle's algorithm, since we do not want packets to wait.
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
        .option(ChannelOption.ALLOCATOR, pooledAllocator);

      if (conf.receiveBuf() > 0) {
        bootstrap.option(ChannelOption.SO_RCVBUF, conf.receiveBuf());
      }

      if (conf.sendBuf() > 0) {
        bootstrap.option(ChannelOption.SO_SNDBUF, conf.sendBuf());
      }

      final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
      final AtomicReference<Channel> channelRef = new AtomicReference<>();

      // Initialize the bootstrap's pipeline through a callback.
      bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
          TransportChannelHandler clientHandler = context.initializePipeline(ch);
          clientRef.set(clientHandler.getClient());
          channelRef.set(ch);
        }
      });

      // Connect to the remote server.
      long preConnect = System.nanoTime();
      ChannelFuture cf = bootstrap.connect(address);
      if (!cf.await(conf.connectionTimeoutMs())) {
        throw new IOException(
          String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
      } else if (cf.cause() != null) {
        throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
      }

      TransportClient client = clientRef.get();
      Channel channel = channelRef.get();
      assert client != null : "Channel future completed successfully with null client";

      // Run any client bootstraps synchronously before marking the client as successful.
      long preBootstrap = System.nanoTime();
      logger.debug("Connection to {} successful, running bootstraps...", address);
      try {
        // Iterate over clientBootstraps and run each one's initialization.
        for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
          clientBootstrap.doBootstrap(client, channel);
        }
      } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala
        long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000;
        logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e);
        client.close();
        throw Throwables.propagate(e);
      }
      long postBootstrap = System.nanoTime();

      logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
        address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);

      return client;
    }
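The bootstrap step at the end of createClient(InetSocketAddress) follows a simple rule: run every bootstrap in order on the calling thread, and if any of them fails, close the client and propagate the error. A minimal sketch of that rule, with hypothetical names (`SketchBootstrap`, `SketchConnection`, `BootstrapRunner`) and no networking at all, might look like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for TransportClientBootstrap.
interface SketchBootstrap {
    void doBootstrap(SketchConnection connection) throws Exception;
}

// Hypothetical stand-in for a freshly connected client.
class SketchConnection {
    final List<String> log = new ArrayList<>();
    boolean closed = false;
    void close() { closed = true; }
}

class BootstrapRunner {
    // Runs every bootstrap in order on the calling thread. If any of them
    // throws, the connection is closed and the failure is rethrown, so a
    // half-initialized connection is never handed to the caller.
    static SketchConnection connect(List<SketchBootstrap> bootstraps) {
        SketchConnection connection = new SketchConnection();
        try {
            for (SketchBootstrap bootstrap : bootstraps) {
                bootstrap.doBootstrap(connection);
            }
        } catch (Exception e) {
            connection.close();
            throw new RuntimeException("bootstrap failed", e);
        }
        return connection;
    }
}

class BootstrapRunnerDemo {
    public static void main(String[] args) {
        List<SketchBootstrap> bootstraps = Arrays.<SketchBootstrap>asList(
            c -> c.log.add("auth"),
            c -> c.log.add("encryption"));
        System.out.println(BootstrapRunner.connect(bootstraps).log);
    }
}
```

Catching `Exception` rather than only `RuntimeException` mirrors the comment in the listing: a bootstrap may throw a checked exception that its signature does not advertise.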
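TransportClient
TransportClient sends RPC requests to the server and fetches the chunks of a stream from it. It is designed for efficient transfer of large amounts of data, split into chunks ranging from a few hundred KB to a few MB.
Typical flow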
// Open the remote file
client.sendRPC(new OpenFile("/foo")) --> returns StreamId = 100
// Fetch chunks of the remote file
client.fetchChunk(streamId = 100, chunkIndex = 0, callback)
client.fetchChunk(streamId = 100, chunkIndex = 1, callback)
// Close the remote file
client.sendRPC(new CloseStream(100))
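TransportClient is a client for fetching consecutive chunks of a pre-negotiated stream. It deals with fetching chunks from the stream (the data plane), but the actual setup of the stream is done outside the scope of the transport layer. The convenience method "sendRPC" is provided for control-plane communication between client and server to perform this setup. Use TransportClientFactory to construct a TransportClient instance. A single TransportClient may be used for multiple streams, but any given stream must be restricted to a single client to avoid out-of-order responses. Note: this class is used to make requests to the server, while TransportResponseHandler is responsible for handling the responses from the server. Concurrency: the class is thread-safe and can be called from multiple threads.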
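The split between control plane and data plane in the typical flow can be tried out with a toy, in-memory model. Nothing here is Spark's API: `ToyStreamServer` and `ChunkCallback` are hypothetical, chunks are plain strings, and the "RPCs" are ordinary method calls that open and close a stream.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical callback, loosely modeled on a chunk-received callback.
interface ChunkCallback {
    void onSuccess(int chunkIndex, String chunk);
    void onFailure(int chunkIndex, Throwable cause);
}

// Hypothetical in-memory "server": control-plane calls open and close
// streams, the data-plane call looks up one chunk by index.
class ToyStreamServer {
    private final Map<Long, List<String>> streams = new HashMap<>();
    private long nextStreamId = 100;

    long openStream(List<String> chunks) {          // control plane
        long id = nextStreamId++;
        streams.put(id, chunks);
        return id;
    }

    void closeStream(long streamId) {               // control plane
        streams.remove(streamId);
    }

    void fetchChunk(long streamId, int chunkIndex, ChunkCallback callback) {   // data plane
        List<String> chunks = streams.get(streamId);
        if (chunks == null || chunkIndex < 0 || chunkIndex >= chunks.size()) {
            callback.onFailure(chunkIndex,
                new IllegalArgumentException("no chunk " + chunkIndex + " in stream " + streamId));
        } else {
            callback.onSuccess(chunkIndex, chunks.get(chunkIndex));
        }
    }
}

class ToyStreamDemo {
    public static void main(String[] args) {
        ToyStreamServer server = new ToyStreamServer();
        ChunkCallback print = new ChunkCallback() {
            @Override public void onSuccess(int i, String chunk) {
                System.out.println("chunk " + i + " = " + chunk);
            }
            @Override public void onFailure(int i, Throwable cause) {
                System.out.println("chunk " + i + " failed: " + cause.getMessage());
            }
        };
        long streamId = server.openStream(Arrays.asList("first", "second"));  // like sendRPC(OpenFile)
        server.fetchChunk(streamId, 0, print);
        server.fetchChunk(streamId, 1, print);
        server.closeStream(streamId);                                          // like sendRPC(CloseStream)
    }
}
```

The data-plane call never creates a stream on its own. It only works on a stream ID that an earlier control-plane call has returned, which is the "pre-negotiated" part of the description.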
TransportServer
The server side of the efficient, low-level streaming service.
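Message Handling
The MessageHandler class handles request or response messages coming from Netty. A MessageHandler instance is associated with exactly one Netty channel (although there may be multiple clients on the same channel). It defines the following abstract methods:
- abstract void handle(T message): handles a single received message.
- abstract void channelActive(): called when the channel this handler is on becomes active.
- abstract void exceptionCaught(Throwable cause): called when an exception is caught on the channel.
- abstract void channelInactive(): called when the channel this handler is on becomes inactive.
MessageHandler has two subclasses. TransportRequestHandler is used on the server side to handle requests from the client, and TransportResponseHandler is used on the client side to handle responses from the server.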
The handle(RequestMessage request) method of TransportRequestHandler
public void handle(RequestMessage request) {
  if (request instanceof RpcRequest) {
    // Handle an RPC request; relies on RpcHandler.receive().
    processRpcRequest((RpcRequest) request);
  } else if (request instanceof OneWayMessage) {
    // Handle an RPC request that needs no reply; relies on RpcHandler.receive().
    processOneWayMessage((OneWayMessage) request);
  } else if (request instanceof StreamRequest) {
    // Handle a stream request; relies on StreamManager.openStream() to obtain
    // the stream data, wrapped as a ManagedBuffer.
    processStreamRequest((StreamRequest) request);
  } else {
    // Unknown request types raise an exception.
    throw new IllegalArgumentException("Unknown request type: " + request);
  }
}
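The dispatch above is a plain type switch: each request type goes to its own branch, one-way messages produce no reply, and anything unrecognized is rejected. A self-contained sketch of the same shape follows. The message classes (`RpcCall`, `OneWay`, `StreamOpen`) and the string-based replies are hypothetical simplifications, not the real wire types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical message types; only the roles echo the originals.
interface Request {}

class RpcCall implements Request {
    final String body;
    RpcCall(String body) { this.body = body; }
}

class OneWay implements Request {
    final String body;
    OneWay(String body) { this.body = body; }
}

class StreamOpen implements Request {
    final String streamId;
    StreamOpen(String streamId) { this.streamId = streamId; }
}

class SketchRequestHandler {
    final List<String> replies = new ArrayList<>();    // what would be written back to the channel
    final List<String> received = new ArrayList<>();   // what the user-level handler saw

    void handle(Request request) {
        if (request instanceof RpcCall) {
            // An RPC is passed to the user-level handler and always answered.
            RpcCall rpc = (RpcCall) request;
            received.add(rpc.body);
            replies.add("RpcResponse(echo:" + rpc.body + ")");
        } else if (request instanceof OneWay) {
            // A one-way message is passed to the user-level handler, with no reply.
            received.add(((OneWay) request).body);
        } else if (request instanceof StreamOpen) {
            // A stream request is answered from the stream side, not the RPC side.
            replies.add("StreamResponse(" + ((StreamOpen) request).streamId + ")");
        } else {
            throw new IllegalArgumentException("Unknown request type: " + request);
        }
    }
}

class SketchRequestHandlerDemo {
    public static void main(String[] args) {
        SketchRequestHandler handler = new SketchRequestHandler();
        handler.handle(new RpcCall("ping"));
        handler.handle(new OneWay("heartbeat"));
        handler.handle(new StreamOpen("jar-1"));
        System.out.println(handler.replies);
    }
}
```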
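The handle(ResponseMessage message) method of TransportResponseHandler
When the client sends a message, it calls the TransportResponseHandler method that matches the message type to register a callback. The callback and the request information are stored in the corresponding cache.
When TransportResponseHandler later receives the response from the server, its main working method handle() looks up the callback in the cache that matches the response type and invokes it.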
@Override
public void handle(ResponseMessage message) throws Exception {
  if (message instanceof ChunkFetchSuccess) {
    ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
    ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
    if (listener == null) {
      logger.warn("Ignoring response for block {} from {} since it is not outstanding",
        resp.streamChunkId, getRemoteAddress(channel));
      resp.body().release();
    } else {
      outstandingFetches.remove(resp.streamChunkId);
      listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
      resp.body().release();
    }
  } else if (message instanceof ChunkFetchFailure) {
    ChunkFetchFailure resp = (ChunkFetchFailure) message;
    ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
    if (listener == null) {
      logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
        resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
    } else {
      outstandingFetches.remove(resp.streamChunkId);
      listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
        "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
    }
  } else if (message instanceof RpcResponse) {
    RpcResponse resp = (RpcResponse) message;
    RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
    if (listener == null) {
      logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
        resp.requestId, getRemoteAddress(channel), resp.body().size());
    } else {
      outstandingRpcs.remove(resp.requestId);
      try {
        listener.onSuccess(resp.body().nioByteBuffer());
      } finally {
        resp.body().release();
      }
    }
  } else if (message instanceof RpcFailure) {
    RpcFailure resp = (RpcFailure) message;
    RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
    if (listener == null) {
      logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
        resp.requestId, getRemoteAddress(channel), resp.errorString);
    } else {
      outstandingRpcs.remove(resp.requestId);
      listener.onFailure(new RuntimeException(resp.errorString));
    }
  } else if (message instanceof StreamResponse) {
    StreamResponse resp = (StreamResponse) message;
    Pair<String, StreamCallback> entry = streamCallbacks.poll();
    if (entry != null) {
      StreamCallback callback = entry.getValue();
      if (resp.byteCount > 0) {
        StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
          callback);
        try {
          TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
            channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
          frameDecoder.setInterceptor(interceptor);
          streamActive = true;
        } catch (Exception e) {
          logger.error("Error installing stream handler.", e);
          deactivateStream();
        }
      } else {
        try {
          callback.onComplete(resp.streamId);
        } catch (Exception e) {
          logger.warn("Error in stream handler onComplete().", e);
        }
      }
    } else {
      logger.error("Could not find callback for StreamResponse.");
    }
  } else if (message instanceof StreamFailure) {
    StreamFailure resp = (StreamFailure) message;
    Pair<String, StreamCallback> entry = streamCallbacks.poll();
    if (entry != null) {
      StreamCallback callback = entry.getValue();
      try {
        callback.onFailure(resp.streamId, new RuntimeException(resp.error));
      } catch (IOException ioe) {
        logger.warn("Error in stream failure handler.", ioe);
      }
    } else {
      logger.warn("Stream failure with unknown callback: {}", resp.error);
    }
  } else {
    throw new IllegalStateException("Unknown response type: " + message.type());
  }
}
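The register-then-look-up pattern behind this method can be reduced to a small map of outstanding callbacks. The sketch below covers only the RPC case and uses hypothetical names (`ReplyCallback`, `SketchResponseHandler`, `handleSuccess`, `handleFailure`); bodies are plain strings instead of buffers, so there is nothing to release.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical callback, loosely modeled on an RPC response callback.
interface ReplyCallback {
    void onSuccess(String body);
    void onFailure(Throwable cause);
}

class SketchResponseHandler {
    private final Map<Long, ReplyCallback> outstandingRpcs = new ConcurrentHashMap<>();

    // Called on the sending path, before the request is written to the channel.
    void addRpcRequest(long requestId, ReplyCallback callback) {
        outstandingRpcs.put(requestId, callback);
    }

    // Called when a success response arrives.
    // Returns false when no request with that ID is outstanding.
    boolean handleSuccess(long requestId, String body) {
        ReplyCallback callback = outstandingRpcs.remove(requestId);
        if (callback == null) {
            return false;   // stale or duplicate response: ignore it
        }
        callback.onSuccess(body);
        return true;
    }

    // Called when a failure response arrives.
    boolean handleFailure(long requestId, String error) {
        ReplyCallback callback = outstandingRpcs.remove(requestId);
        if (callback == null) {
            return false;
        }
        callback.onFailure(new RuntimeException(error));
        return true;
    }

    int numOutstandingRequests() {
        return outstandingRpcs.size();
    }
}

class SketchResponseHandlerDemo {
    public static void main(String[] args) {
        SketchResponseHandler handler = new SketchResponseHandler();
        handler.addRpcRequest(1L, new ReplyCallback() {
            @Override public void onSuccess(String body) { System.out.println("reply: " + body); }
            @Override public void onFailure(Throwable cause) { System.out.println("error: " + cause.getMessage()); }
        });
        handler.handleSuccess(1L, "pong");   // invokes the callback
        handler.handleSuccess(1L, "pong");   // ignored: no longer outstanding
    }
}
```

One deliberate difference from the listing: the sketch calls `remove` directly instead of `get` followed by `remove`, so each callback can be claimed at most once even if two responses with the same ID arrive concurrently.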
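Message Categories
Every message that MessageHandler processes extends or implements the Message interface.
According to the class diagram, the main categories are:
- AbstractMessage: an abstract class for messages whose body is kept in a separate buffer. Most other message classes extend it.
- RequestMessage: the interface for messages sent from the client to the server.
  - ChunkFetchRequest: a request to fetch a single chunk of a stream. It corresponds to a single response message (success or failure).
  - RpcRequest: a generic RPC handled by org.apache.spark.network.server.RpcHandler on the remote server. It corresponds to a single response message (success or failure).
  - OneWayMessage: handled by org.apache.spark.network.server.RpcHandler on the remote server. No reply is sent to the client.
  - StreamRequest: a request to stream data from the remote end. The stream ID is an arbitrary string that the two endpoints must negotiate before the data can be streamed.
- ResponseMessage: the interface for messages sent from the server to the client.
  - AbstractResponseMessage: the abstract base class for response messages.
  - ChunkFetchSuccess: returned when a ChunkFetchRequest is handled successfully.
  - RpcResponse: returned when an RpcRequest is handled successfully.
  - StreamResponse: returned when a StreamRequest is handled successfully.
  - ChunkFetchFailure: returned when handling a ChunkFetchRequest fails.
  - RpcFailure: returned when handling an RpcRequest fails.
  - StreamFailure: returned when handling a StreamRequest fails.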
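The pairing between requests and their success and failure responses can be written down as a small lookup. The enum below is a hypothetical summary of the list above, not the message type enum used on the wire. Its constant names and the `successReply` and `failureReply` helpers are invented for illustration.

```java
// Hypothetical summary of the message categories described in the text.
enum SketchMessageKind {
    CHUNK_FETCH_REQUEST, RPC_REQUEST, ONE_WAY_MESSAGE, STREAM_REQUEST,
    CHUNK_FETCH_SUCCESS, RPC_RESPONSE, STREAM_RESPONSE,
    CHUNK_FETCH_FAILURE, RPC_FAILURE, STREAM_FAILURE;

    // True for messages that travel from the client to the server.
    boolean isRequest() {
        return this == CHUNK_FETCH_REQUEST || this == RPC_REQUEST
            || this == ONE_WAY_MESSAGE || this == STREAM_REQUEST;
    }

    // The response sent when the request succeeds, or null if no reply is expected.
    SketchMessageKind successReply() {
        switch (this) {
            case CHUNK_FETCH_REQUEST: return CHUNK_FETCH_SUCCESS;
            case RPC_REQUEST: return RPC_RESPONSE;
            case STREAM_REQUEST: return STREAM_RESPONSE;
            default: return null;
        }
    }

    // The response sent when the request fails, or null if no reply is expected.
    SketchMessageKind failureReply() {
        switch (this) {
            case CHUNK_FETCH_REQUEST: return CHUNK_FETCH_FAILURE;
            case RPC_REQUEST: return RPC_FAILURE;
            case STREAM_REQUEST: return STREAM_FAILURE;
            default: return null;
        }
    }
}

class SketchMessageKindDemo {
    public static void main(String[] args) {
        for (SketchMessageKind kind : SketchMessageKind.values()) {
            if (kind.isRequest()) {
                System.out.println(kind + " -> " + kind.successReply() + " / " + kind.failureReply());
            }
        }
    }
}
```

OneWayMessage is the only request kind with no reply in either column, which matches its description as a message that needs no answer.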