Hadoop RPC框架

原文:http://blog.csdn.net/thomas0yang/article/details/41211259

----------------------------------------------------------------------------------------------

1、RPC框架概述
1.1 RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
1.2 RPC通常采用客户端服务器模型,其框架主要有以下几部分
  • 通信模块:实现请求应该协议。主要分为同步方式和异步方式。
  • stub程序:客户端和服务器均包含stub程序,可以看做代理程序。使得远程函数表现的跟本地调用一样,对用户程序完全透明。
  • 调度程序:接受来自通信模块的请求消息,根据标识选择stub程序处理。并发量大一般采用线程池处理。
  • 客户程序/服务过程:请求发出者和请求的处理者。
1.3 RPC流程图


2、Hadoop RPC基本框架
2.1Hadoop RPC的使用方法见代码
服务
public interface MyBiz extends VersionedProtocol {
    long PROTOCOL_VERSION = 12321443L;
    String hello(String name);
}
public class MyBizImpl implements MyBiz {
    @Override
    public long getProtocolVersion(String arg0, long arg1) throws IOException {
        return PROTOCOL_VERSION;
    }

    @Override
    public String hello(String name) {
        System. out.println( "invoked");
        return "hello " + name;
    }
}

服务器
public class MyServer {
    public static final String SERVER_ADDRESS = "localhost";
    public static final int SERVER_PORT = 12345;

    public static void main(String[] args) throws IOException {
        Server server = RPC. getServer(new MyBizImpl(), SERVER_ADDRESS, SERVER_PORT , new Configuration());
        server.start();
    }
}

客户端
public class MyClient {
    public static void main(String[] args) throws IOException {
        MyBiz proxy = (MyBiz) RPC. getProxy(MyBiz.class, MyBiz.PROTOCOL_VERSION,
                new InetSocketAddress(MyServer. SERVER_ADDRESS,MyServer.SERVER_PORT),
                new Configuration());
        String result = proxy.hello( "5");
        System. out.println(result);
        RPC.stopProxy(proxy);
    }
}

2.2 org.apache.hadoop.ipc.RPC类解析
RPC类主要包含三部分:
  • ClientCache(成员变量):根据用户提供的SocketFactory来缓存Client对象,以便重用Client对象。
  • Server(内部类):继承Server抽象类,利用反射实现了call方法,即客户端请求的方法和对应参数完成方法调用。
  • Invocation(内部类):将要调用的方法名和参数打包成可序列化的对象,方便客户端和服务器之间传递。

2.3 客户端和服务器端的关系
  • Client-NameNode之间,其中NameNode是服务器
  • Client-DataNode之间,其中DataNode是服务器
  • DataNode-NameNode之间,其中NameNode是服务器
  • DataNode-DateNode之间,其中某一个DateNode是服务器,另一个是客户端
2.4 org.apache.hadoop.ipc.Client类解析
2.4.1 Client类中主要包含:
  • Call(内部类):封装了一个RPC请求,包含5个成员变量,唯一表示id、函数调用信息param、函数返回值value、函数异常信息error、函数完成标识done。Hadoop rpc server采用异步方式处理客户端请求,使得远程过程调用的发生顺序和返回顺序无直接关系,而客户端正是通过id识别不同的函数调用。当客户端向服务器发送请求,只需填充id和param两个变量,其余3个变量由服务器端根据函数执行情况填充。
  • Connection(内部类,一个线程):是client和server之间的一个通信连接,封装了连接先关的基本信息和操作。基本信息包括:通信连接唯一标识remoteId(ConnectionId)、与Server端通信的scoket、网络输入输出流in/out、保存RPC请求的哈希表calls(Hashtable<Integer, Call>)。操作包括:addCall将一个Call对象添加到哈希表中;sendParam想服务器端发送RPC请求;receiveResponse从服务器端接收已经处理完成的RPC请求;run调用receiveResponse方法,等待返回结果。
  • ConnectionId(内部类):连接的标记(包括server地址,协议,其他一些连接的配置项信息)
  • ParallelCall(内部类):实现并行调用的请求
  • ParallelResults(内部类):并行调用的执行结果
2.4.2 Client类中主要对外通过两个接口,分别用于单个远程调用和批量远程调用。
public Writable call(Writable param, ConnectionId remoteId)  throws InterruptedException, IOException
public Writable call(Writable param, InetSocketAddress addr,  Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, Configuration conf)  throws InterruptedException, IOException

2.4.3 调用流程分析,当调用call函数执行某个远程方法时,有以下几个步骤:
1)创建一个Connection对象,并将远程方法调用信息封装成Call对象,放到Connection对象中的哈希表中;
2)调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
3)Server端处理完RPC请求后,将结果通过网络返回给Client端,Client端通过receiveRpcResponse()函数获取结果;
4)Client检查结果处理状态(成功还是失败),并将对应Call对象从哈希表中删除。

2.4.4 一个Client包含多个连接,private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();

2.5 org.apache.hadoop.ipc.Server类解析

2.5.1 背景
Hadoop采用了Master/Slave结构,其中Master是整个系统的单点,如NameNode或JobTracker,这是制约系统性能和可扩展性的最关键因素之一;而Master通过ipc.Server接收并处理所有Slave发送的请求,这就要求ipc.Server 将高并发和可扩展性作为设计目标。为此,ipc.Server采用了很多提高并发处理能力的技术,主要包括线程池、事件驱动和Reactor设计模式等,这些技术均采用了JDK自带的库实现,这里重点分析它是如何利用Reactor设计模式提高整体性能的。

2.5.2 reactor设计模式
Reactor是并发编程中的一种基于事件驱动的设计模式,它具有以下两个特点:通过派发/分离I/O操作事件提高系统的并发性能;提供了粗粒度的并发控制,使用单线程实现,避免了复杂的同步处理。典型的Reactor实现原理如图所示。

典型的Reactor模式中主要包括以下几个角色。
  • Reactor:I/O事件的派发者。
  • Acceptor:接受来自Client的连接,建立与Client对应的Handler,并向Reactor注册此Handler。
  • Handler:与一个Client通信的实体,并按一定的过程实现业务的处理。Handler内部往往会有更进一步的层次划分,用来抽象诸如read、decode、compute、encode和send等过程。在Reactor模式中,业务逻辑被分散的I/O事件所打破,所以Handler需要有适当的机制在所需的信息还不全(读到一半)的时候保存上下文,并在下一次I/O事件到来的时候(另一半可读)能继续上次中断的处理。
  • Reader/Sender:为了加速处理速度,Reactor模式往往构建一个存放数据处理线程的线程池,这样数据读出后,立即扔到线程池中等待后续处理即可。为此,Reactor模式一般分离Handler中的读和写两个过程,分别注册成单独的读事件和写事件,并由对应的Reader和Sender线程处理。
2.5.3 java nio代码实例
package com.sohu.tv.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* NIO服务端
* @author 小路
*/
public class NIOServer {
    //通道管理器
    private Selector selector;

    /**
     * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
     * @param port  绑定的端口号
     * @throws IOException
     */
    public void initServer(int port) throws IOException {
        // 获得一个ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 设置通道为非阻塞
        serverChannel.configureBlocking(false);
        // 将该通道对应的ServerSocket绑定到port端口
        serverChannel.socket().bind(new InetSocketAddress(port));
        // 获得一个通道管理器
        this.selector = Selector.open();
        //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
        //当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
     * @throws IOException
     */
    @SuppressWarnings("unchecked")
    public void listen() throws IOException {
        System.out.println("服务端启动成功!");
        // 轮询访问selector
        while (true) {
            //当注册的事件到达时,方法返回;否则,该方法会一直阻塞
            selector.select();
            // 获得selector中选中的项的迭代器,选中的项为注册的事件
            Iterator ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 删除已选的key,以防重复处理
                ite.remove();
                // 客户端请求连接事件
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key
                            .channel();
                    // 获得和客户端连接的通道
                    SocketChannel channel = server.accept();
                    // 设置成非阻塞
                    channel.configureBlocking(false);

                    //在这里可以给客户端发送信息哦
                    channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes()));
                    //在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
                    channel.register(this.selector, SelectionKey.OP_READ);

                    // 获得了可读的事件
                } else if (key.isReadable()) {
                    read(key);
                }

            }

        }
    }
    /**
     * 处理读取客户端发来的信息 的事件
     * @param key
     * @throws IOException
     */
    public void read(SelectionKey key) throws IOException{
        // 服务器可读取消息:得到事件发生的Socket通道
        SocketChannel channel = (SocketChannel) key.channel();
        // 创建读取的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(10);
        channel.read(buffer);
        byte[] data = buffer.array();
        String msg = new String(data).trim();
        System.out.println("服务端收到信息:"+msg);
        ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
        channel.write(outBuffer);// 将消息回送给客户端
    }

    /**
     * 启动服务端测试
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(8000);
        server.listen();
    }
}



package com.sohu.tv.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
* NIO客户端
* @author 小路
*/
public class NIOClient {
    //通道管理器
    private Selector selector;

    /**
     * 获得一个Socket通道,并对该通道做一些初始化的工作
     * @param ip 连接的服务器的ip
     * @param port  连接的服务器的端口号
     * @throws IOException
     */
    public void initClient(String ip,int port) throws IOException {
        // 获得一个Socket通道
        SocketChannel channel = SocketChannel.open();
        // 设置通道为非阻塞
        channel.configureBlocking(false);
        // 获得一个通道管理器
        this.selector = Selector.open();

        // 客户端连接服务器,其实方法执行并没有实现连接,需要在listen()方法中调
        //用channel.finishConnect();才能完成连接
        channel.connect(new InetSocketAddress(ip,port));
        //将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_CONNECT事件。
        channel.register(selector, SelectionKey.OP_CONNECT);
    }

    /**
     * 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
     * @throws IOException
     */
    @SuppressWarnings("unchecked")
    public void listen() throws IOException {
        // 轮询访问selector
        while (true) {
            selector.select();
            // 获得selector中选中的项的迭代器
            Iterator ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 删除已选的key,以防重复处理
                ite.remove();
                // 连接事件发生
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key
                            .channel();
                    // 如果正在连接,则完成连接
                    if(channel.isConnectionPending()){
                        channel.finishConnect();

                    }
                    // 设置成非阻塞
                    channel.configureBlocking(false);

                    //在这里可以给服务端发送信息哦
                    channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes()));
                    //在和服务端连接成功之后,为了可以接收到服务端的信息,需要给通道设置读的权限。
                    channel.register(this.selector, SelectionKey.OP_READ);

                    // 获得了可读的事件
                } else if (key.isReadable()) {
                    read(key);
                }
            }
        }
    }
    /**
     * 处理读取服务端发来的信息 的事件
     * @param key
     * @throws IOException
     */
    public void read(SelectionKey key) throws IOException{
        //和服务端的read方法一样
    }


    /**
     * 启动客户端测试
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        NIOClient client = new NIOClient();
        client.initClient("localhost",8000);
        client.listen();
    }

}


2.5.4 server处理流程
ipc.Server的主要功能是接收来自客户端的RPC请求,经过调用相应的函数获取结果后,返回给对应的客户端。为此,ipc.Server被划分成3个阶段:接收请求、处理请求和返回结果。
(1)接收请求
     该阶段主要任务是接收来自各个客户端的RPC请求,并将它们封装成固定的格式(Call类)放到一个共享队列(callQueue)中,以便进行后续处理。该阶段内部又分为建立连接和接收请求两个子阶段,分别由Listener和Reader两种线程完成。
     整个Server只有一个Listener线程,统一负责监听来自客户端的连接请求,一旦有新的请求到达,它会采用轮询的方式从线程池中选择一个Reader线程进行处理,而Reader线程可同时存在多个,它们分别负责接收一部分客户端连接的RPC请求,至于每个Reader线程负责哪些客户端连接,完全由Listener决定,当前Listener只是采用了简单的轮询分配机制。
     Listener和Reader线程内部各自包含一个Selector对象,分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。对于Listener线程,主循环的实现体是监听是否有新的连接请求到达,并采用轮询策略选择一个Reader线程处理新连接;对于Reader线程,主循环的实现体是监听(它负责的那部分)客户端连接中是否有新的RPC请求到达,并将新的RPC请求封装成Call对象,放到共享队列callQueue中。

(2)处理请求
     该阶段主要任务是从共享队列callQueue中获取Call对象,执行对应的函数调用,并将结果返回给客户端,这全部由Handler线程完成。
     Server端可同时存在多个Handler线程,它们并行从共享队列中读取Call对象,经执行对应的函数调用后,将尝试着直接将结果返回给对应的客户端。但考虑到某些函数调用返回结果很大或者网络速度过慢,可能难以将结果一次性发送到客户端,此时Handler将尝试着将后续发送任务交给Responder线程。

(3)返回结果
     前面提到,每个Handler线程执行完函数调用后,会尝试着将执行结果返回给客户端,但对于特殊情况,比如函数调用返回结果过大或者网络异常情况(网速过慢),会将发送任务交给Responder线程。
     Server端仅存在一个Responder线程,它的内部包含一个Selector对象,用于监听SelectionKey.OP_WRITE事件。当Handler没能将结果一次性发送到客户端时,会向该Selector对象注册SelectionKey.OP_WRITE事件,进而由Responder线程采用异步方式继续发送未发送完成的结果。







本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/539337.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaSE基础知识学习-----泛型

泛型 Java泛型是jdk1.5的一个新特性&#xff0c;jdk的性特性还包括&#xff1a;泛型&#xff0c;枚举&#xff0c;装箱和拆箱&#xff0c;可变参数等。这里先主要学习泛型。这些特性&#xff0c;现在都在广泛的使用。因为现在使用IDE编写代码&#xff0c;都是标准的代码提示&am…

centos7 校正linux系统时间_Linux系统:Centos7下搭建ClickHouse列式存储数据库

一、ClickHouse简介1、基础简介Yandex开源的数据分析的数据库&#xff0c;名字叫做ClickHouse&#xff0c;适合流式或批次入库的时序数据。ClickHouse不应该被用作通用数据库&#xff0c;而是作为超高性能的海量数据快速查询的分布式实时处理平台&#xff0c;在数据汇总查询方面…

html调用js页面显示不出来了,JS代码文件调用显示乱码,直接写在html页面的里可以调用,但是单独放在js文件里不能调用...

最近遇到了一个很奇怪的问题&#xff0c;就是在HTML网页代码里直接写JS代码可以正常运行的代码&#xff0c;使用JS文件调用就不行。var cities [ {"name" : "北京"}, {"name" : "上海"}, {"name" : "广州"} ];$(…

水系图一般在哪里找得到_城市供水系统防护体系的探索与思考

城市是一个国家或地区的政治、经济和文化中心&#xff0c; 在战争中常常被选为重点打击目标。1988年&#xff0c;时任美国空军司令部副参谋长助理的约翰A. 沃登上校提出“五环”目标打击理论&#xff0c;将 对敌打击目标分为五个层&#xff0c;其中就将基础设施列为第三层打击目…

Hadoop webHDFS设置和使用说明

原文&#xff1a;http://blog.csdn.net/iloveyin/article/details/28264027 ---------------------------------------------------------------------------------------- 1.配置 namenode的hdfs-site.xml是必须将dfs.webhdfs.enabled属性设置为true&#xff0c;否则就不能使…

CES 2017前瞻之AI:无人机依旧小巧,机器人主打家庭服务

再过2天&#xff0c;CES 2017就要开始了&#xff0c;根据这些已知晓的部分展商&#xff0c;我们也许能够看到未来的一些趋势。 还有2天&#xff0c;备受瞩目的CES 2017&#xff08;2017年国际消费类电子产品展览会&#xff09;就要拉开帷幕了。 每一年&#xff0c;CES上都会出…

ionic html5 上传图片,ionic4+angular7+cordova上传图片功能的实例代码

前言ionic是一个垮平台开发框架&#xff0c;可通过web技术开发出多平台的应用。但只建议开发简单应用。复杂的应用需要用到许多cordova插件&#xff0c;而cordova插件的更新或者移动平台的更新很可能导致插件的不可用&#xff0c;维护升级成本较高。安装插件安装插件Image Pick…

HDFS体系结构

Namenode 是整个文件系统的管理节点。它维护着整个文件系统的文件目录树&#xff0c;文件/目录的元信息metadate和每个文件对应的数据块列表。 功能&#xff1a;接收用户的操作请求。 metadate信息包括&#xff1a; 1、文件的owership和permission。 2、文件包含哪些block块…

为什么要将html页面和样式表分离,0031 如何使用css文件对网页内容和样式进行分离...

原标题&#xff1a;0031 如何使用css文件对网页内容和样式进行分离上节课&#xff0c;学习了针对文字可以设置很多种样式。这节课&#xff0c;学习如何将内容和样式进行分离。上节课的课后练习1.将斜体字体效果去除2.将工作经历和工作经验(部分)这2行文字也做成简介这行文字的效…

redis 关系数据库怎么转换 和_redis数据库设计(转)

阅读目录redis是什么redis就是一个存储key-value键值对的仓库&#xff0c;如何使用redis在于如何理解你需要设计的系统的E-R的模型&#xff0c;然后合理的规划redis的数据库结构场景我举一个简单的消息系统的例子&#xff0c;业务需求&#xff1a;服务器端发送消息给用户E-R模型…

Hadoop Archives

介绍 时间&#xff1a; Hadoop Archives (HAR files)是在0.18.0版本中引入的。 作用&#xff1a; 将hdfs里的小文件打包成一个文件&#xff0c;相当于windows的zip&#xff0c;rar。Linux的 tar等压缩文件。把多个文件打包一个文件。 意义&#xff1a; 它的出现就是为了缓…

js 判断日期时间差

2019独角兽企业重金招聘Python工程师标准>>> alert(GetDateDiff("2018-02-27 19:20:22","2018-02-27 09:20:22","hour"));function GetDateDiff(startTime, endTime, diffType) {//将xxxx-xx-xx的时间格式&#xff0c;转换为 xxxx/xx…

python 图形_Python图形数据

CSGraph代表 压缩稀疏图 &#xff0c;它着重于基于稀疏矩阵表示的快速图算法。 图表表示 首先&#xff0c;让我们了解一个稀疏图是什么以及它在图表示中的作用。 什么是稀疏图&#xff1f; 图形只是节点的集合&#xff0c;它们之间有链接。图表几乎可以代表任何事物 - 社交网络…

本地运行hadoop-Failed to locate the winutils binary in the hadoop binary path

转自&#xff1a;http://www.cnblogs.com/zq-inlook/p/4386216.html 之前在mac上调试hadoop程序&#xff08;mac之前配置过hadoop环境&#xff09;一直都是正常的。因为工作需要&#xff0c;需要在windows上先调试该程序&#xff0c;然后再转到linux下。程序运行的过程中&#…

dubbo 支持服务降级吗_dubbo面试题!会这些,说明你真正看懂了dubbo源码

整理了一些dubbo可能会被面试的面试题&#xff0c;感觉非常不错。如果你基本能回答说明你看懂了dubbo源码&#xff0c;对dubbo了解的足够全面。你可以尝试看能不能回答下。我们一起看下有哪些问题吧&#xff1f;dubbo中"读接口"和"写接口"有什么区别?谈谈…

不满足于汽车制造,丰田展示仿钢铁侠机器支撑腿架

而汽车制造商开发机器人也不是丰田一家的专利&#xff0c;此前现代也推出过类似的支撑机器人腿架 大多数人对于丰田的印象都停留在汽车制造上&#xff0c;不过他们却不仅仅满足于汽车事业的发展&#xff0c;最近&#xff0c;丰田正在研发一款机器人支撑腿架&#xff0c;来帮助…

js html异步加载的属性,异步加载JS的五种方式

方案一&#xff1a;点评&#xff1a;HTML5中新增的属性&#xff0c;Chrome、FF、IE9&IE9均支持(IE6~8不支持)。此外&#xff0c;这种方法不能保证脚本按顺序执行。方案二&#xff1a;点评&#xff1a;兼容所有浏览器。此外&#xff0c;这种方法可以确保所有设置defer属性的…

python中各操作符的优先级_Python3练习题系列(06)——各种符号总结

Python3中的各种符号总结 1关键字 import keyword print(keyword.kwlist, end\t) [False, None, True, and, as, assert, break, class, continue, def, del, elif, else, except, finally, for, from, global, if, import, in, is, lambda, nonlocal, not, or, pass, raise, r…

hdfs java读写hdfs demo

windows环境配置&#xff1a; 1.下载winutils的windows版本 GitHub上&#xff0c;有人提供了winutils的windows的版本&#xff0c; 项目地址是&#xff1a;https://github.com/srccodes/hadoop-common-2.2.0-bin,直接下载此项目的zip包&#xff0c;下载后是文件名是hadoop-comm…

cesium 经纬度绘制点_NCL绘制2016年1号台风(Nepartak)

begin ncol 6 ;台风参数 nrow 31 ;时次总数 nbin 6 ;已知该该气旋共经历了6个等级的演变 ;读入台风资料 data asciiread("NEPARTAK.txt",(/nrow,ncol/),"integer") ;/31,6/ 31行6列&#xff0c;integer整数类型 ;;数据读取函数总结&…