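HDFS RPC Communication Framework Explained

This article walks through the HDFS RPC communication framework, with source-code analysis of the listener, reader, handler, and responder implementation classes. The Hadoop version used is 3.1.1.

Before We Begin

RPC necessarily relies on socket communication, and here it uses Java NIO. Readers should ideally have some familiarity with NIO; this article does not cover that background in any depth.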

https://blog.csdn.net/yhl_jxy/article/details/79332092

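Also, most of the code in this article has been tidied up by the author, so it differs slightly from the actual Server source.

RPC Framework Architecture Diagram

[Figure: RPC framework architecture diagram (1871_2.jpeg)]

As the architecture diagram shows, the data processing for one socket connection is split across several modules, each handling one specific concern. On the one hand this keeps calls concurrent; on the other it keeps the code extensible.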
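To make the division of labour concrete, here is a minimal sketch of the stages as plain Java threads and queues. It is a toy model rather than Hadoop code: the class RpcPipelineSketch, the Call type, and the "echo:" response are all invented for illustration, and a single shared response queue stands in for the per-connection response queues described later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of the reader -> call queue -> handler -> response queue stages. */
final class RpcPipelineSketch {
    /** Hypothetical stand-in for an RPC call: a request and, later, a response. */
    static final class Call {
        final String request;
        volatile String response;
        Call(String request) { this.request = request; }
    }

    static List<String> runPipeline(List<String> requests, int handlerCount) {
        BlockingQueue<Call> callQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Call> responseQueue = new LinkedBlockingQueue<>();
        List<Thread> handlers = new ArrayList<>();

        // Handler stage: several threads drain the shared call queue concurrently.
        for (int i = 0; i < handlerCount; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        Call call = callQueue.take();          // blocks until a call arrives
                        call.response = "echo:" + call.request; // the "business logic"
                        responseQueue.put(call);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();        // used here as the shutdown signal
                }
            }, "handler-" + i);
            t.setDaemon(true);
            t.start();
            handlers.add(t);
        }

        List<String> responses = new ArrayList<>();
        try {
            // Reader stage: enqueue every call without waiting for earlier ones to finish.
            for (String r : requests) {
                callQueue.put(new Call(r));
            }
            // Responder stage: collect one response per request, in completion order.
            for (int i = 0; i < requests.size(); i++) {
                responses.add(responseQueue.take().response);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            for (Thread t : handlers) {
                t.interrupt();
            }
        }
        return responses;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(java.util.Arrays.asList("a", "b", "c"), 2));
    }
}
```

Because the reader never waits for a handler, responses can come back in a different order from the requests, which is why each stage only communicates through queues.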

Listener

The listener is the listening thread. So what exactly does it listen for? Socket connections, which the code calls connections.

Listener.run and doAccept

public void run() {
  LOG.info(Thread.currentThread().getName() + ": starting");
  Server.connectionManager.startIdleScan();
  while (Server.running) {
    SelectionKey key = null;
    try {
      getSelector().select();
      Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            if (key.isAcceptable())
              doAccept(key);
          }
        } catch (IOException e) {
        }
        key = null;
      }
    } catch (OutOfMemoryError e) {
      // we can run out of memory if we have too many threads
      // log the event and sleep for a minute and give
      // some thread(s) a chance to finish
      LOG.warn("Out of Memory in server select", e);
      closeCurrentConnection(key, e);
      Server.connectionManager.closeIdle(true);
      try { Thread.sleep(60000); } catch (Exception ie) {}
    } catch (Exception e) {
      closeCurrentConnection(key, e);
    }
  }
  LOG.info("Stopping " + Thread.currentThread().getName());

  synchronized (this) {
    try {
      acceptChannel.close();
      selector.close();
    } catch (IOException e) { }

    selector= null;
    acceptChannel= null;

    // close all connections
    Server.connectionManager.stopIdleScan();
    Server.connectionManager.closeAll();
  }
}

void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    channel.socket().setKeepAlive(true);

    Reader reader = getReader();
    Connection c = Server.connectionManager.register(channel);
    // If the connectionManager can't take it, close the connection.
    if (c == null) {
      if (channel.isOpen()) {
        IOUtils.cleanup(null, channel);
      }
      Server.connectionManager.droppedConnections.getAndIncrement();
      continue;
    }
    key.attach(c);  // so closeCurrentConnection can get the object
    reader.addConnection(c);
  }
}

In short: accept a channel, turn it into a connection, and hand it to a reader.
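The listing above calls getReader() without showing it. The sketch below assumes readers are chosen round-robin and that each reader keeps its own queue of pending connections (the pendingConnections queue appears in the Reader code later). The class names and the use of plain strings as connections are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Sketch of a listener spreading accepted connections over a fixed set of readers. */
final class ReaderAssignmentSketch {
    /** Hypothetical reader: only the pending-connection queue is modeled. */
    static final class Reader {
        final BlockingQueue<String> pendingConnections = new LinkedBlockingQueue<>();

        void addConnection(String conn) {
            pendingConnections.add(conn);
            // A real reader would presumably also wake up its selector here,
            // so that a blocked select() notices the new connection.
        }
    }

    private final Reader[] readers;
    private int currentReader = 0;

    ReaderAssignmentSketch(int readerCount) {
        readers = new Reader[readerCount];
        for (int i = 0; i < readerCount; i++) {
            readers[i] = new Reader();
        }
    }

    /** Assumed round-robin choice; only the single listener thread calls this. */
    Reader getReader() {
        currentReader = (currentReader + 1) % readers.length;
        return readers[currentReader];
    }

    /** What doAccept boils down to once the channel has become a connection. */
    void accept(String conn) {
        getReader().addConnection(conn);
    }

    Reader reader(int index) {
        return readers[index];
    }
}
```

Since only the listener thread advances currentReader, the counter needs no synchronization in this model; the hand-off to the reader thread goes through the thread-safe queue.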

Reader

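The Reader plays a pivotal role in the RPC framework. Everything that happens before processOneRpc in the companion article "HDFSRPC协议详解" (HDFS RPC Protocol Explained) is done by the reader. In summary:

  1. Checking the initial 7 bytes of an RPC connection.
  2. SASL handshake and authentication.
  3. Reading the IpcConnectionContext.
  4. Preparing for processOneRpc, including parsing the RequestHeaderProto.

One more point to note: the reader completes all of this work itself in one go, rather than in several separate rounds. As soon as a read has produced a call, the reader moves on to reading the next call. Calls are therefore inherently concurrent, and they are processed by the handlers.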
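As an illustration of the first step in the list above, the following sketch validates a 7-byte connection preamble. The layout it assumes (4 magic bytes "hrpc", then one byte each for version, service class, and auth protocol, with version 9) is my understanding of the Hadoop RPC wire format and is not shown in this article; the class and field names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch: check the first 7 bytes a client sends on a new RPC connection. */
final class ConnectionHeaderSketch {
    // Assumption: the preamble starts with the ASCII magic "hrpc".
    static final byte[] MAGIC = "hrpc".getBytes(StandardCharsets.US_ASCII);
    // Assumption: RPC version expected by a Hadoop 3.x server.
    static final int EXPECTED_VERSION = 9;

    /** Parsed preamble fields (names are illustrative). */
    static final class Header {
        final int version;
        final int serviceClass;
        final int authProtocol;

        Header(int version, int serviceClass, int authProtocol) {
            this.version = version;
            this.serviceClass = serviceClass;
            this.authProtocol = authProtocol;
        }
    }

    static Header parse(ByteBuffer buf) {
        if (buf.remaining() < 7) {
            throw new IllegalArgumentException("need 7 bytes, got " + buf.remaining());
        }
        for (byte expected : MAGIC) {
            if (buf.get() != expected) {
                throw new IllegalArgumentException("bad magic, not an RPC connection");
            }
        }
        int version = buf.get() & 0xFF;      // unsigned byte
        if (version != EXPECTED_VERSION) {
            throw new IllegalArgumentException("unsupported RPC version " + version);
        }
        int serviceClass = buf.get() & 0xFF; // unsigned byte
        int authProtocol = buf.get();        // kept signed; the value selects plain or SASL auth
        return new Header(version, serviceClass, authProtocol);
    }
}
```

Rejecting a bad preamble this early lets the server close connections from non-RPC clients before any SASL or protobuf parsing is attempted.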

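The reader source is actually simple: in essence it calls connection.readAndProcess() in a loop. This article does not go into readAndProcess; see "HDFS RPC Protocol Explained" if you are interested.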

@Override
public void run() {
  LOG.info("Starting " + Thread.currentThread().getName());
  try {
    doRunLoop();
  } finally {
    try {
      readSelector.close();
    } catch (IOException ioe) {
      LOG.error("Error closing read selector in " + Thread.currentThread().getName(), ioe);
    }
  }
}

private synchronized void doRunLoop() {
  while (Server.running) {
    SelectionKey key = null;
    try {
      // consume as many connections as currently queued to avoid
      // unbridled acceptance of connections that starves the select
      int size = pendingConnections.size();
      for (int i=size; i>0; i--) {
        Connection conn = pendingConnections.take();
        conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
      }
      readSelector.select();

      Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isReadable()) {
            doRead(key);
          }
        } catch (CancelledKeyException cke) {
          // something else closed the connection, ex. responder or
          // the listener doing an idle scan.  ignore it and let them
          // clean up.
          LOG.info(Thread.currentThread().getName() + ": connection aborted from " + key.attachment());
        }
        key = null;
      }
    } catch (InterruptedException e) {
      if (Server.running) {                      // unexpected -- log it
        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
      }
    } catch (IOException ex) {
      LOG.error("Error in Reader", ex);
    } catch (Throwable re) {
      LOG.error("Bug in read selector!", re);
      //ExitUtil.terminate(1, "Bug in read selector!");
    }
  }
}

//from Listener doRead
void doRead(SelectionKey key) throws InterruptedException {
  int count;
  Connection c = (Connection)key.attachment();
  if (c == null) {
    return;
  }
  c.setLastContact(Time.now());

  try {
    count = c.readAndProcess();
  } catch (InterruptedException ieo) {
    LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
    throw ieo;
  } catch (Exception e) {
    // Any exceptions that reach here are fatal unexpected internal errors
    // that could not be sent to the client.
    LOG.info(Thread.currentThread().getName() + ": readAndProcess from client " + c + " threw exception [" + e + "]", e);
    count = -1; //so that the (count < 0) block is executed
  }
  // setupResponse will signal the connection should be closed when a
  // fatal response is sent.
  if (count < 0 || c.shouldClose()) {
    Server.closeConnection(c);
    c = null;
  } else {
    c.setLastContact(Time.now());
  }
}

CallQueue

The callQueue holds the queue of calls. Because the call queue in HDFS is fairly complex, it will be covered in a separate article later.

Handler

The handler thread is also fairly simple: it essentially just executes call.run().

@Override
public void run() {
  LOG.debug(Thread.currentThread().getName() + ": starting");
  while (Server.running) {
    try {
      final Call call = Server.callQueue.take(); // pop the queue; maybe blocked here
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": " + call);
      }
      CurCall.set(call);
      /*TODO
      UserGroupInformation remoteUser = call.getRemoteUser();
      if (remoteUser != null) {
        remoteUser.doAs(call);
      } else {
        call.run();
      }*/
      call.run();
    } catch (InterruptedException e) {
      if (Server.running) {                          // unexpected -- log it
        LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
      }
    } catch (Exception e) {
      LOG.info(Thread.currentThread().getName() + " caught an exception", e);
    } finally {
      CurCall.set(null);
    }
  }
  LOG.debug(Thread.currentThread().getName() + ": exiting");
}

The main difficulty is how call.run() is executed. To understand call.run, you first need to understand protocols.

Protocols

Every server has its own protocols, which are first grouped by rpcKind.

enum RpcKindProto {
  RPC_BUILTIN          = 0;  // Used for built in calls by tests
  RPC_WRITABLE         = 1;  // Use WritableRpcEngine
  RPC_PROTOCOL_BUFFER  = 2;  // Use ProtobufRpcEngine
}

In 3.x the rpcKind in use is always RPC_PROTOCOL_BUFFER, so that is the example used here.

The protocols for RPC_PROTOCOL_BUFFER are stored in a HashMap.

Map<ProtoNameVer, ProtoClassProtoImpl> protocolImplMapArray = new HashMap<ProtoNameVer, ProtoClassProtoImpl>(10);

The key is a ProtoNameVer; note how hashCode is implemented.

static class ProtoNameVer {
  final String protocol;
  final long   version;

  ProtoNameVer(String protocol, long ver) {
    this.protocol = protocol;
    this.version = ver;
  }

  @Override
  public boolean equals(Object o) {
    if (o == null)
      return false;
    if (this == o)
      return true;
    if (! (o instanceof ProtoNameVer))
      return false;
    ProtoNameVer pv = (ProtoNameVer) o;
    return ((pv.protocol.equals(this.protocol)) && (pv.version == this.version));
  }

  @Override
  public int hashCode() {
    return protocol.hashCode() * 37 + (int) version;
  }
}

So every protocol must have a protocol name and a version, which are supplied by the ProtocolInfo annotation.
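The reason equals and hashCode matter is that the server builds a fresh key object for every incoming call, so two distinct key instances with the same name and version must land on the same map entry. The sketch below demonstrates this with a stripped-down registry; ProtocolRegistrySketch, its Key class, and the protocol name org.example.DemoProtocol are all made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: looking up a protocol implementation by (name, version). */
final class ProtocolRegistrySketch {
    /** Value-style key, mirroring the idea of ProtoNameVer. */
    static final class Key {
        final String protocol;
        final long version;

        Key(String protocol, long version) {
            this.protocol = protocol;
            this.version = version;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Key)) return false; // also covers null
            Key other = (Key) o;
            return protocol.equals(other.protocol) && version == other.version;
        }

        @Override
        public int hashCode() {
            // Same formula as the article's ProtoNameVer: both fields take part.
            return protocol.hashCode() * 37 + (int) version;
        }
    }

    private final Map<Key, Object> impls = new HashMap<>();

    void register(String protocol, long version, Object impl) {
        impls.put(new Key(protocol, version), impl);
    }

    /** Returns the registered implementation, or null if name or version differs. */
    Object lookup(String protocol, long version) {
        return impls.get(new Key(protocol, version)); // a new Key instance per lookup
    }

    public static void main(String[] args) {
        ProtocolRegistrySketch registry = new ProtocolRegistrySketch();
        registry.register("org.example.DemoProtocol", 1, "demo-impl");
        System.out.println(registry.lookup("org.example.DemoProtocol", 1));
        System.out.println(registry.lookup("org.example.DemoProtocol", 2));
    }
}
```

Without the overridden equals and hashCode, the lookup would compare object identity and never find the entry registered at startup.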

@Retention(RetentionPolicy.RUNTIME)
public @interface ProtocolInfo {
  String protocolName();  // the name of the protocol (i.e. rpc service)
  long protocolVersion() default -1; // default means not defined use old way
}

A protocol interface looks like this.

@ProtocolInfo(protocolName = HdfsConstants.CLIENT_NAMENODE_PROTOCOL_NAME, protocolVersion = 1)
/**
 * Protocol that a clients use to communicate with the NameNode.
 *
 * Note: This extends the protocolbuffer service based interface to
 * add annotations required for security.
 */
public interface ClientNamenodeProtocolPB extends ClientNamenodeProtocol.BlockingInterface {
}
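Because the annotation is retained at runtime, the server can derive the map key from nothing more than the interface class. A minimal sketch of that lookup follows; it uses a locally defined DemoProtocolInfo annotation instead of Hadoop's ProtocolInfo so that it runs standalone, and the fallback to the class name for unannotated interfaces is an assumption about the "old way" mentioned in the annotation's comment.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Sketch: reading protocol name and version from a runtime annotation. */
final class ProtocolInfoSketch {
    /** Local stand-in for the ProtocolInfo annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoProtocolInfo {
        String protocolName();
        long protocolVersion() default -1;
    }

    /** Hypothetical annotated protocol interface. */
    @DemoProtocolInfo(protocolName = "org.example.DemoProtocol", protocolVersion = 1)
    interface DemoProtocolPB { }

    /** Hypothetical interface without the annotation. */
    interface Unannotated { }

    static String protocolName(Class<?> protocol) {
        DemoProtocolInfo info = protocol.getAnnotation(DemoProtocolInfo.class);
        // Assumed fallback: use the class name when no annotation is present.
        return info != null ? info.protocolName() : protocol.getName();
    }

    static long protocolVersion(Class<?> protocol) {
        DemoProtocolInfo info = protocol.getAnnotation(DemoProtocolInfo.class);
        return info != null ? info.protocolVersion() : -1L;
    }

    public static void main(String[] args) {
        System.out.println(protocolName(DemoProtocolPB.class)
                + " v" + protocolVersion(DemoProtocolPB.class));
    }
}
```

Note that RetentionPolicy.RUNTIME is essential: with the default CLASS retention, getAnnotation would return null and the name and version could not be recovered.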

So where do the methods invoked by reflection come from? ClientNamenodeProtocol.BlockingInterface is actually generated by the protobuf compiler; see the service definition at the end of ClientNamenodeProtocol.proto.

service ClientNamenodeProtocol {
  rpc getBlockLocations(GetBlockLocationsRequestProto)
      returns(GetBlockLocationsResponseProto);
  rpc getServerDefaults(GetServerDefaultsRequestProto)
      returns(GetServerDefaultsResponseProto);
  rpc create(CreateRequestProto)returns(CreateResponseProto);
  rpc append(AppendRequestProto) returns(AppendResponseProto);
  rpc setReplication(SetReplicationRequestProto)
      returns(SetReplicationResponseProto);
  rpc setStoragePolicy(SetStoragePolicyRequestProto)
  ...
}

Compiling it produces ClientNamenodeProtocol.BlockingInterface, which contains the list of methods.

Our own implementation class only needs to implement ClientNamenodeProtocolPB, for example ClientNamenodeProtocolServerSideTranslatorPB.

//add protocols
ClientNamenodeProtocolServerSideTranslatorPB cnn = new ClientNamenodeProtocolServerSideTranslatorPB();
BlockingService cnnService = ClientNamenodeProtocol.newReflectiveBlockingService(cnn);
Server.addProtocol(ClientNamenodeProtocolPB.class, cnnService);    

Finally, call.run uses the RequestHeaderProto to find the corresponding implementation class.

message RequestHeaderProto {
  /** Name of the RPC method */
  required string methodName = 1;

  /**
   * RPCs for a particular interface (ie protocol) are done using a
   * IPC connection that is setup using rpcProxy.
   * The rpcProxy's has a declared protocol name that is
   * sent form client to server at connection time.
   *
   * Each Rpc call also sends a protocol name
   * (called declaringClassprotocolName). This name is usually the same
   * as the connection protocol name except in some cases.
   * For example metaProtocols such ProtocolInfoProto which get metainfo
   * about the protocol reuse the connection but need to indicate that
   * the actual protocol is different (i.e. the protocol is
   * ProtocolInfoProto) since they reuse the connection; in this case
   * the declaringClassProtocolName field is set to the ProtocolInfoProto
   */
  required string declaringClassProtocolName = 2;

  /** protocol version of class declaring the called method */
  required uint64 clientProtocolVersion = 3;
}

It then invokes the method on the implementation class via reflection.

Writable call(String protocol, Writable writableRequest, long receiveTime) throws Exception {
  RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
  RequestHeaderProto rpcRequest = request.getRequestHeader();
  String methodName = rpcRequest.getMethodName();

  /**
   * RPCs for a particular interface (ie protocol) are done using a
   * IPC connection that is setup using rpcProxy.
   * The rpcProxy's has a declared protocol name that is
   * sent form client to server at connection time.
   *
   * Each Rpc call also sends a protocol name
   * (called declaringClassprotocolName). This name is usually the same
   * as the connection protocol name except in some cases.
   * For example metaProtocols such ProtocolInfoProto which get info
   * about the protocol reuse the connection but need to indicate that
   * the actual protocol is different (i.e. the protocol is
   * ProtocolInfoProto) since they reuse the connection; in this case
   * the declaringClassProtocolName field is set to the ProtocolInfoProto.
   */
  String declaringClassProtoName = rpcRequest.getDeclaringClassProtocolName();
  long clientVersion = rpcRequest.getClientProtocolVersion();
  //LOG.info("Call: connectionProtocolName=" + connectionProtocolName + ", method=" + methodName + ", declaringClass=" + declaringClassProtoName);
  ProtoClassProtoImpl protocolImpl = getProtocolImpl(declaringClassProtoName, clientVersion);
  BlockingService service = (BlockingService) protocolImpl.protocolImpl;
  MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
  if (methodDescriptor == null) {
    String msg = "Unknown method " + methodName + " called on " + protocol + " protocol.";
    LOG.warn(msg);
    throw new RpcNoSuchMethodException(msg);
  }
  Message prototype = service.getRequestPrototype(methodDescriptor);
  Message param = request.getValue(prototype);

  Message result = null;
  long startTime = Time.now();
  int qTime = (int) (startTime - receiveTime);
  Exception exception = null;
  boolean isDeferred = false;
  try {
    //server.rpcDetailedMetrics.init(protocolImpl.protocolClass);
    result = service.callBlockingMethod(methodDescriptor, null, param);
    // Check if this needs to be a deferred response,
    // by checking the ThreadLocal callback being set
  } catch (ServiceException e) {
    exception = (Exception) e.getCause();
    throw (Exception) e.getCause();
  } catch (Exception e) {
    exception = e;
    throw e;
  } finally {
    int processingTime = (int) (Time.now() - startTime);
    //if (LOG.isDebugEnabled()) {
    String msg = "Served: " + methodName + (isDeferred ? ", deferred" : "") +
        ", queueTime= " + qTime + " procesingTime= " + processingTime;
    if (exception != null) {
      msg += " exception= " + exception.getClass().getSimpleName();
    }
    //LOG.debug(msg);
    LOG.info(msg);
    //LOG.info("params:" + param.toString());
    //LOG.info("result:" + result.toString());
    //}
    String detailedMetricsName = (exception == null) ? methodName : exception.getClass().getSimpleName();
    //server.updateMetrics(detailedMetricsName, qTime, processingTime, isDeferred);
  }
  return RpcWritable.wrap(result);
}
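The core of the method above is "find a method by its name, then invoke it on the registered implementation". The sketch below reproduces that shape with plain java.lang.reflect instead of protobuf's MethodDescriptor and BlockingService, which is a deliberate substitution so the example needs no external library. EchoService and its methods are hypothetical.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/** Sketch: dispatch a call to an implementation object by method name. */
final class MethodDispatchSketch {
    /** Hypothetical service implementation registered on the server. */
    static final class EchoService {
        public String echo(String msg) {
            return "echo:" + msg;
        }

        public String reverse(String msg) {
            return new StringBuilder(msg).reverse().toString();
        }
    }

    static Object dispatch(Object impl, String methodName, String param) {
        // Stand-in for findMethodByName(methodName) on the service descriptor.
        Method target = null;
        for (Method m : impl.getClass().getMethods()) {
            if (m.getName().equals(methodName)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0] == String.class) {
                target = m;
                break;
            }
        }
        if (target == null) {
            throw new IllegalArgumentException("Unknown method " + methodName);
        }
        try {
            // Stand-in for service.callBlockingMethod(methodDescriptor, null, param).
            return target.invoke(impl, param);
        } catch (InvocationTargetException e) {
            // Surface the real failure, much as call() rethrows the ServiceException cause.
            throw new IllegalStateException(e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new EchoService(), "echo", "hi"));
    }
}
```

In the real server the method name comes from RequestHeaderProto.methodName, so an unknown name is a client error and is reported back rather than crashing the handler.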

When this completes, any returned Message is stored in rpccall.rpcResponse, and the call is then placed on the ResponseQueue.

ResponseQueue

It lives in the connection and mainly holds the rpccalls that have finished processing.

Responder

The Responder thread is mainly responsible for sending call results back.

private boolean processResponse(LinkedList<RpcCall> responseQueue, boolean inHandler) throws IOException {
  boolean error = true;
  boolean done = false;       // there is more data for this channel.
  int numElements = 0;
  RpcCall call = null;
  try {
    synchronized (responseQueue) {
      //
      // If there are no items for this channel, then we are done
      //
      numElements = responseQueue.size();
      if (numElements == 0) {
        error = false;
        return true;              // no more data for this channel.
      }
      //
      // Extract the first call
      //
      call = responseQueue.removeFirst();
      SocketChannel channel = call.connection.channel;
      if (LOG.isDebugEnabled()) {
        LOG.debug(Thread.currentThread().getName() + ": responding to " + call);
      }
      //
      // Send as much data as we can in the non-blocking fashion
      //
      int numBytes = call.connection.channelWrite(channel, call.rpcResponse);
      if (numBytes < 0) {
        return true;
      }
      if (!call.rpcResponse.hasRemaining()) {
        //Clear out the response buffer so it can be collected
        call.rpcResponse = null;
        call.connection.decRpcCount();
        if (numElements == 1) {    // last call fully processes.
          done = true;             // no more data for this channel.
        } else {
          done = false;            // more calls pending to be sent.
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call
              + " Wrote " + numBytes + " bytes.");
        }
      } else {
        //
        // If we were unable to write the entire response out, then
        // insert in Selector queue.
        //
        call.connection.responseQueue.addFirst(call);

        if (inHandler) {
          // set the serve time when the response has to be sent later
          call.timestamp = Time.now();

          incPending();
          try {
            // Wakeup the thread blocked on select, only then can the call
            // to channel.register() complete.
            writeSelector.wakeup();
            channel.register(writeSelector, SelectionKey.OP_WRITE, call);
          } catch (ClosedChannelException e) {
            //Its ok. channel might be closed else where.
            done = true;
          } finally {
            decPending();
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(Thread.currentThread().getName() + ": responding to " + call
              + " Wrote partial " + numBytes + " bytes.");
        }
      }
      error = false;              // everything went off well
    }
  } finally {
    if (error && call != null) {
      LOG.warn(Thread.currentThread().getName()+", call " + call + ": output error");
      done = true;               // error. no more data for this channel.
      Server.closeConnection(call.connection);
    }
  }
  return done;
}
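The key idea in processResponse is the handling of partial non-blocking writes: if the channel cannot take the whole response, the call goes back to the front of the queue so that ordering is preserved. The following sketch isolates just that logic. ThrottledChannel is a made-up test double that accepts only a few bytes per write, and selector registration, locking, and error handling are omitted.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Deque;

/** Sketch: send queued responses over a channel that may accept only part of a buffer. */
final class PartialWriteSketch {
    /** Hypothetical channel that takes at most `limit` bytes per write, like a full socket buffer. */
    static final class ThrottledChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int limit;

        ThrottledChannel(int limit) {
            this.limit = limit;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(limit, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() { }
    }

    /**
     * Tries to send the head of the queue once.
     * Returns true when nothing is left to send for this channel ("done").
     */
    static boolean processResponse(Deque<ByteBuffer> responseQueue, WritableByteChannel channel) {
        ByteBuffer response = responseQueue.pollFirst();
        if (response == null) {
            return true;                       // nothing queued for this channel
        }
        try {
            channel.write(response);           // may write only part of the buffer
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (response.hasRemaining()) {
            // Partial write: put it back at the FRONT so responses stay in order.
            // The real Responder would also register the channel for OP_WRITE here.
            responseQueue.addFirst(response);
            return false;
        }
        return responseQueue.isEmpty();        // done only if this was the last response
    }
}
```

Re-queueing at the front rather than the back is what keeps a half-sent response from being interleaved with the bytes of the next one on the same connection.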
