Hadoop源码分析7: IPC流程(1) 主要类

1.服务器端主要类

public abstractclass Server

{

  public static final ByteBuffer HEADER =ByteBuffer.wrap("hrpc".getBytes());

  public static final byte CURRENT_VERSION =4;

  private static finalThreadLocalServer SERVER= new ThreadLocalServer();  

  private static finalThreadLocal<CallCurCall = newThreadLocal<Call>();

  private String bindAddress;   

  private int port;           

  private int handlerCount;     

  private int readThreads;     

  private Class<? extendsWritable> paramClass;    

  private int maxIdleTime;   

  private int thresholdIdleConnections;     

  private Configuration conf;

  private int maxQueueSize;

  private final int maxRespSize;

  private int socketSendBufferSize;

  volatile private boolean running =true;

  privateBlockingQueue<CallcallQueue;

  privateList<ConnectionconnectionList = Collections.synchronizedList(newLinkedList<Connection>());

  private Listener listener= null;

  private Responder responder= null;

  private int numConnections = 0;

  private Handler[]handlers = null;

 

  //内部类Server.Call,包装请求参数

  private static class Call {

     private int id;                          // theclient's call id    

     private Writable param;                   // the parameter passed   

     private Connection connection;   

     private ByteBuffer response;     

  }   

 

//内部类Server.Listener ,线程  

  private class Listener extendsThread {

     privateServerSocketChannel acceptChannel =null; //the accept channel

     privateSelector selector = null; //theselector that we use for the server

     private Reader[]readers = null;

     private int currentReader =0;

     private InetSocketAddressaddress; //the address we bind at

     private Random rand = newRandom();

     private longlastCleanupRunTime = 0; 

     private ExecutorServicereadPool;    

 

  //内部类Server.Listener.Reader 线程    

    privateclass Reader implementsRunnable {

       privatevolatile boolean adding = false;

       privateSelector readSelector =null;

   }  

 

   //内部类Server.Responder  , 线程  

    privateclass Responder extendsThread {

      privateSelector writeSelector;

      private intpending;     

   } 

 

//内部类Server.Connection,而Client.Connection是线程 

    publicclass Connection {

      privateboolean rpcHeaderRead = false; // if initial rpc header isread

      privateboolean headerRead = false;  

      privateSocketChannel channel;

      privateByteBuffer data;

      privateByteBuffer dataLengthBuffer;

      privateLinkedList<CallresponseQueue;

      privatevolatile int rpcCount = 0; // number of outstanding rpcs

      privatelong lastContact;

      private intdataLength;

      privateSocket socket;

      privateString hostAddress;

      private intremotePort;

      privateInetAddress addr;

     ConnectionHeader header = new ConnectionHeader();

     Class<?> protocol;

      privateAuthMethod authMethod; 

   }  

 

  //内部类Server.Handler,线程 

  private class Handler extendsThread {

 }

 

}

 

2.客户端主要类

public classClient {

  privateHashtable<ConnectionId, Connectionconnections = new Hashtable<ConnectionId,Connection>(); 

   privateClass? extendsWritable valueClass;  

   private intcounter;                        // counter for call ids

   privateAtomicBoolean running = new AtomicBoolean(true); // if clientruns

   finalprivate Configuration conf;

   privateSocketFactory socketFactory;          // how tocreate sockets

 

   private intrefCount = 1;


  //内部类Client.Call

   private class Call {

     int id;                                 // callid

     Writable param;                          //parameter

     Writable value;                          // value,null if error

     IOException error;                       // exception, null ifvalue

     boolean done; 

  }

   //内部类Client.Connection ,线程 ,而Server.Connection不是线程 

   private class Connection extendsThread {

        privateInetSocketAddress server;           // server ip:port

        privateString serverPrincipal;  // server's krb5principal name

        privateConnectionHeader header;            // connection header

        privatefinal ConnectionId remoteId;             // connection id

        privateAuthMethod authMethod; // authentication method

        private Socket socket = null;              // connected socket

        privateDataInputStream in;

        privateDataOutputStream out;

        private intrpcTimeout;

        private intmaxIdleTime;  

        private intmaxRetries; //the max. no. of retries for socket connections

        privateboolean tcpNoDelay; // if T then disable Nagle's Algorithm

        private intpingInterval; / 

        privateHashtable<Integer, Callcalls= new Hashtable<Integer, Call>();

        privateAtomicLong lastActivity = new AtomicLong(); 

        privateAtomicBoolean shouldCloseConnection = new AtomicBoolean();  

        private IOException closeException; // closereason  


       //内部类Client.Connection.PingInputStream

        private class PingInputStream extendsFilterInputStream {

      }


   }

   //内部类Client.ParallelCall 

   privateclass ParallelCall extends Call {

      privateParallelResults results;

      private intindex;

   }   

   //内部类Client.ParallelResults 

    private static classParallelResults {

       privateWritable[] values;

       privateint size;

       privateint count;

     //

    }

   

  //内部类Client.ConnectionId 

   static class ConnectionId {

      InetSocketAddress address;

      UserGroupInformationticket;

      Class<?>protocol;

      privatestatic final int PRIME = 16777619;

      privateint rpcTimeout;

      privateString serverPrincipal;

      privateint maxIdleTime;  

      privateint maxRetries; //the max. no. of retries for socketconnections

      privateboolean tcpNoDelay; // if T then disable Nagle's Algorithm

      privateint pingInterval; // how often sends ping to the server inmsecs

    }

    

}

3.RPC主要类

public class RPC {

  private static ClientCache CLIENTS=newClientCache();


  //内部类RPC.ClientCache 

  static privateclass ClientCache {

     private MapSocketFactory, Client clients=  new HashMapSocketFactory, Client();

  }


 //内部类RPC.Invocation ,只是一个包装请求参数的普通类,不执行动态代理方法

   private static class Invocationimplements Writable, Configurable {

      privateString methodName;

      privateClass[] parameterClasses;

      privateObject[] parameters;

 

      privateConfiguration conf;

  }


  //内部类RPC.Invoker ,执行动态代理方法

   private static class Invoker implementsInvocationHandler {

       privateClient.ConnectionId remoteId;

       privateClient client;

       privateboolean isClosed = false;

   }

    //内部类RPC.VersionMismatch 

    public static classVersionMismatch extends IOException {

       privateString interfaceName;

       privatelong clientVersion;

       privatelong serverVersion;

   }  


   //内部类RPC.Server ,添加了两个成员  instance,verbose

  public static class Server extendsorg.apache.hadoop.ipc.Server {

        private Object instance;

        private boolean verbose;

   }



}

4.其他类

 

//IPC所有类都要实现的接口

publicinterface VersionedProtocol {

  publiclong getProtocolVersion(String protocol,  longclientVersion) throws IOException;

}

 

 

//连接头信息,包括protocol,userGroupInformation ,  authMethod三个成员变量

class ConnectionHeader implementsWritable  { 

   private String protocol;

   private UserGroupInformation ugi =null;

   private AuthMethod authMethod; ......

 

//访问状况,包括SUCCESS、ERROR、FATAL

enum Status{

  SUCCESS(0),

  ERROR(1),

  FATAL(-1);.......  

}

 

//包装IO异常

publicclass RemoteException extendsIOException {

}

 

 

转载于:https://www.cnblogs.com/leeeee/p/7276533.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/375842.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

html5 服务器手机编程,html5实现服务器发送事件

页面DocumentStatus:Server Datajs代码创建一个新的 EventSource 对象,然后规定发送更新的页面的 URL(本例中是 "demo_sse.php")每接收到一次更新,就会发生 onmessage 事件当 onmessage 事件发生时,把已接收的数据推入 id 为 "…

【动态规划】【多重背包】[HDU 1291]悼念512汶川大地震遇难同胞――珍惜现在,感恩生活...

这道题目是一个多重背包的题目,多重背包实际上就是把整个物品的件数拆分成a0∗20a1∗21a2∗22...an∗2n且a0或1这样每一次最优解实际上就是在之前的基础上进行的最优解的累加,但是发现如果物品数量不是恰好是某几个数之和,那么就会出现有几个…

输出字符串的比特串

48是0的ASCII码&#xff0c;49是1的ASCII码&#xff0c;char型占一个字节&#xff0c;四个比特。 #include<iostream> #include<string.h> using namespace std; string Str2Bin (char* str){int change,k0,mask8;char bit;char stack[100]{0};for (short i 0; i…

法律专业计算机基础试卷答案,大学计算机基础试题及答案

以下是小编整理的关于大学计算机基础试题及答案&#xff0c;希望对你有帮助。一、单选题1、完整的计算机系统由(C)组成。A、运算器、控制器、存储器、输入设备和输出设备B、主机和外部设备C、硬件系统和软件系统D、主机箱、显示器、键盘、鼠标、打印机2、以下软件中&#xff0c…

凭据不工作

最悲催的事情是什么&#xff1f;那就是你可以远程别人的电脑&#xff0c;但是别人不能远程自己的电脑&#xff01; 背景&#xff1a; 换了个win8.1的系统&#xff0c;刚开始可以远程上,过了几天,电脑突然不能远程了,让我很是郁闷呀. 于是在网上查了好多资料,看看他到底是什么原…

【最后的冲刺】android中excel表的导入和数据处理

【最后的冲刺】android中excel表的导入和数据处理 ——学校课程的查询和修改 1.编写 The Class类把课程表courses.db当做一个实体类&#xff0c;hashcode和equals这两个类是为了判断输入的查询内容和Excel表中的内容是否一致。 并在java里面区别两个对象是否一致 1 public clas…

详解C++函数模板

函数模板属于类属&#xff0c;能够处理不同的数据类型&#xff0c;当编译器遇到函数调用是&#xff0c;将根据实际参数的类型产生特定的代码&#xff0c;函数模板的定义形式是&#xff1a; template <类型参数表> 返回值类型 函数名&#xff08;形式参数表&#xff09;{…

计算机专业女兵,陈豪2010《点解阿Sir》剧照

0陈豪2010《点解阿Sir》剧照2012-07-21 08:24{"info": {"setname": "陈豪2010《点解阿Sir》剧照","imgsum_bk": 20,"imgsum": 20,"lmodify": "2012-07-21 08:24:00","prevue": " "…

ASP.NET MVC学习之Ajax(完结)

一.前言 通过上面的一番学习&#xff0c;大家一定收获不少。但是总归会有一个结束的时候&#xff0c;但是这个结束也意味着新的开始。 如果你是从事ASP.NET开发&#xff0c;并且也使用了第三方控件&#xff0c;那么一定会觉得ASP.NET开发ajax十分的简单&#xff0c;而ASP.NET M…

认知计算机语言学,什么是认知语言学

文献综述&#xff1a;“语文素养”内涵研究综述“语文素养”内涵研究综述摘要&#xff1a;“语文素养”是新一轮语文课程改革所提出的一个重要概念&#xff0c;其作为语文课程改革的目标与核心理念&#xff0c;挤兑了“语文能力”的核心地位。目前&#xff0c;人们对“语文素养…

data URI scheme及其应用

data URI scheme通俗来讲就是图片直接塞到HTML而不是由HTTP。这样从表面上看会降低一次HTTP的请求&#xff0c;实现了对于网页的优化&#xff08;只是看了其它一些文章data URI由于将图片採用了base 64的编码方式进行表达&#xff0c;所以还是须要进行HTTP去下载内容&#xff0…

Linux 禁用触摸板

1&#xff0c;首先需要查看触摸板&#xff1a; 命令&#xff1a;xinput list 结果&#xff1a; ⎡ Virtual core pointer         id2 [master pointer (3)]    ⎜ ↳ Virtual core XTEST pointer      id4 […

大学新生学计算机推荐电脑,大学新生用什么电脑好呢?

科技的发展日新月异&#xff0c;数码的yi巴为你资讯。今天是7月的开头&#xff0c;我们正式迈入了2019下半年。7月开头也正是许多大多数高考生快忙完志愿填报&#xff0c;开始考虑大学该选择什么电脑的时候。今天yi巴就来跟大家聊聊该大学新生该怎么选择电脑&#xff0c;并给予…

NewCode----句子反转

题目描述 给定一个句子&#xff08;只包含字母和空格&#xff09;&#xff0c; 将句子中的单词位置反转&#xff0c;单词用空格分割, 单词之间只有一个空格&#xff0c;前后没有空格。 比如&#xff1a; &#xff08;1&#xff09; “hello xiao mi”-> “mi xiao hello” …

mac boot2docker certs not valid with 1.7

摘自&#xff1a;https://github.com/boot2docker/boot2docker/issues/824 An error occurred trying to connect: Get https://192.168.59.103:2376/v1.19/containers/json: x509: certificate is valid for 127.0.0.1, 10.0.2.15, not 192.168.59.103 I come with the same p…

对象之间的交互

之前写过一篇随笔《剪刀剪纸》是给一些新同事讲面向对象时用的&#xff0c;当时就感觉有些不顺畅&#xff0c;不过用来给新同事入门足够了就没多想&#xff0c;最近看书时偶尔走神把这件事想起来了&#xff0c;顺便群里讨论时谈到聚合之间的方法调用&#xff0c;于是决定写一篇…

NewCode----数串

题目描述&#xff1a; 设有n个正整数&#xff0c;将他们连接成一排&#xff0c;组成一个最大的多位整数。 如:n3时&#xff0c;3个整数13,312,343,连成的最大整数为34331213。 如:n4时,4个整数7,13,4,246连接成的最大整数为7424613。 输入描述: 有多组测试样例&#xff0c…

计算机跨专业专插本学音乐,欢迎投稿丨专插本可以跨专业考,只要肯坚持!

点击上方△蓝字可关注我们昵称E师姐性别女插本复习资料教材、小红书、CB398、启航等插本关注的公众号、网站等介绍微信公众号居多&#xff1a;专插本资料库、专插本直通车、广东省专插本、插本最前线等等……专科学校和专业广州城市职业学院 会计插本学校和专业广东财经大学华商…

Android,监控ContentProvider的数据改变

有时候应用中需要监听ContentProvider的改变并提供响应&#xff0c;这时候就要利用ContentObserver类了 不管是ContentProvider中实现的,insert,delete,update方法中的任何一个&#xff0c;程序都会调用getContext().getContentResolver().notifyChange(uri,null); 这行代码可用…

[leetcode]Sort List

题目要求&#xff1a;Sort a linked list in O(n log n) time using constant space complexity. 数据结构定义&#xff1a; 1 /** 2 * Definition for singly-linked list. 3 * struct ListNode { 4 * int val; 5 * ListNode *next; 6 * ListNode(int x) : v…