以ClientProtocol接口中的rename RPC调用进行一次实例分析。
rename方法在ClientProtocol接口中定义,它的两个参数是String类型的,不能直接通过网络传输。
我们看谁实现了ClientProtocol
接口并重写rename方法。
看到是ClientNamenodeProtocolTranslatorPB
这个类。看下这个类如何实现的rename方法,代码如下:
@Override
public boolean rename(String src, String dst) throws IOException {
//把String类型的参数进行包装,变成可通过网络传输的序列化类型
RenameRequestProto req = RenameRequestProto.newBuilder()
.setSrc(src)
.setDst(dst).build();
try {
//通过调用rpcProxy的rename方法实现rpc调用,接下来我们就看rpcProxy这个对象
return rpcProxy.rename(null, req).getResult();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
rpcProxy
是ClientNamenodeProtocolTranslatorPB
这个类的成员变量,类型是ClientNamenodeProtocolPB
。
那我们就继续点进去rpcProxy.rename方法。发现点不进去了,因为rpcProxy的类型是ClientNamenodeProtocolPB
。这个类继承了protoBuf生成的类,我们还没有编译所以点不进去。
但是可以看看相关的.proto文件。找到ClientNamenodeProtocol.proto文件,搜索rename。能够看到protobuf定义的参数和返回值。参数就是把原本的两个字符串类型包装成新的RenameRequestProto类型。
似乎我们是没法往下看了。感觉也没什么收获?那你感觉错了。我们还可以看看ClientNamenodeProtocolTranslatorPB的构造函数,看看是怎么初始化的rpcProxy对象。
继续追,看看谁调用了这个构造函数。
这三处地方调用了。我们进入第一个方法:
我们继续看这个proxy是怎么生成的。
这是一个链式调用。先调用了RPC.getProtocolProxy,然后调用了getProxy()。
调用getProtocolProxy得到一个ProtocolProxy< ClientNamenodeProtocolPB>对象,这个对象包含了用于和服务端通信的代理对象和服务端支持的方法集合。在这个对象的基础上调用getProxy方法得到那个代理对象。
RPC.getProtocolProxy()的代码如下:
可以看到首先调用了getProtocolEngine获得通信所使用的RpcEngine,然后又调用了getProxy获得一个客户端-side 的代理对象。
RpcEngine.getProxy()的实现如下(我们进入的是ProtobufRpcEngine的实现):
进到这里一切豁然开朗,可以看到这里使用了JDK的动态代理。我们关注这句代码:
return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
new ProtocolProxy()的第二个参数代表这个对象里保存的代理对象。
(T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker)
生成代理对象使用JDK中Proxy类的静态方法newProxyInstance。第三个参数是InvocationHandler的实现,真正的方法调用就在这个InvocationHandler里的invoke里调用。我们看到传入的invoke是new的ProtobufRpcEngine的内部类Invoker。这个Invoker最终会在invoke方法中去调用Client.call方法向RPC Server发送RPC请求,这个过程对客户端是完全透明的。如下图所示, 已用红框标出。
至此我们是明白了客户端是如何把RPC请求发送到服务器了。总结一下(采用正序的方式)
因为我们调用方法一般是使用DFSClient这个类的对象去调用,所以在DFSClient里搜rename方法。可以看到这个方法内部调用namenode.rename()方法。那么我们就需要知道那么namenode是怎么构造出来的。
namenode的声明如下:
final ClientProtocol namenode;
它是一个ClientProtocol类型的变量,因此可以调用所有客户端对NN进行操作的方法。接下来找构造函数,看看如何给namenode成员变量赋值的。
DFSClient在自己的构造函数里会调用 NameNodeProxiesClient.createProxyWithClientProtocol()
来获得一个支持ClientProtocol协议的代理对象。最后把NameNode的代理赋值给namenode成员。这样我们就可以通过namenode对象来调用各种方法操作了。
创建这个代理的方法是如下代码:
红色箭头所指分别代表创建non-HA的和HA的代理。如果配置了dfs.client.failover.proxy.provider.$nameservice属性,就认为是开启HA的集群。因为现在集群几乎都是HA的,所以我们看下面这个箭头的创建HA代理对象的方法createHAProxy
。记住传入的最后两个参数ClientProtocol.class和failoverProxyProvider。后面我们还会用到这两个参数,到时候可以再回来看。
createHAProxy
方法代码如下:
public static ProxyAndInfo createHAProxy(
Configuration conf, URI nameNodeUri, Class xface,
AbstractNNFailoverProxyProvider failoverProxyProvider) {
Preconditions.checkNotNull(failoverProxyProvider);// HA case
DfsClientConf config = new DfsClientConf(conf);//我们主要看这个,因为你看方法的返回值是Proxy和Info,我们只需要关注Proxy就好了。
T proxy = (T) RetryProxy.create(xface, failoverProxyProvider,
RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, config.getMaxFailoverAttempts(),
config.getMaxRetryAttempts(), config.getFailoverSleepBaseMillis(),
config.getFailoverSleepMaxMillis()));
Text dtService;if (failoverProxyProvider.useLogicalURI()) {
dtService = HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
HdfsConstants.HDFS_URI_SCHEME);
} else {
dtService = SecurityUtil.buildTokenService(
DFSUtilClient.getNNAddress(nameNodeUri));
}return new ProxyAndInfo<>(proxy, dtService,
DFSUtilClient.getNNAddressCheckLogical(conf, nameNodeUri));
}
因为一般我们都会有retry机制,所以这里使用了RetryProxy去create代理。跟到RetryProxy.create中。注意create的前两个参数就是刚刚让大家记住的两个。分别是:ClientProtocol.class和failoverProxyProvider。
进到RetryProxy.create源码:
此时这个泛型T,其实就是ClientProtocol。这个方法里使用JDK的动态代理生成代理对象,这块我们应该很熟悉。newProxyInstance的三个参数,我们看第三个参数,因为在代理对象调用方法最终都会由这个RetryInvocationHandler的invoke方法接管。所以我们去看看RetryInvocationHandler的实现。点进去。
可以看到,RetryInvocationHandler的构造方法中还是有我们之前让大家记住的参数failoverProxyProvider。
因为JDK的动态代理最终方法会走到invoke。所以我们直接去看RetryInvocationHandler的invoke方法。
这里主要是通过循环去实现retry。循环中关键方法是invokeOnce()。点进去。我们关注正常情况下的处理,先不关注异常情况。正常情况下就是再调用invoke()
再点进去:
继续点进invokeMethod()方法中:
再点进invokeMethod中,终于到头了。不能再继续往下点invoke了,那就是JDK的方法了。框起来的代码表示在代理对象上调用此Method底层代表的方法(本例是rename),并把args作为参数传给它。
那这个代理对象是谁呢,proxyDescriptor.getProxy()返回的是什么呢?这时候想想刚才让大家记住的那个参数。
所以proxyDescriptor里面的proxy就是proxyProvider中保存的代理对象信息。而创建这个proxyProvider的时机正是在DFSClient调用的createProxyWithClientProtocol
方法里。而且,从下面的的代码可以看到,if (failoverProxyProvider == null)
还会再这样我们就可以唱:“又回到最初的起点,呆呆的站在代码前”。
点进去看看怎么创建的failoverProxyProvider。经过一系列重载,最终走到下图这个地方。可以看到是通过反射机制,去new的FailoverProxyProvider对象。newInstance的第三个参数xface就是我们的代理类的类型,所以最终这个failoverProxyprovider里面会有我们需要的代理对象。到此分析结束。
这里面还有一个细节。就是如何得到的FailoverProxyProvider的Class对象。通过跟踪getFailoverProxyProviderClass
方法。
上图红框中圈出的configKey = “dfs.client.failover.proxy.provider”+host。这个host是在core-site.xml中fs.defaultFS这个属性里配置的,把hdfs的schema去掉就是host。这个配置正好对应了官网hdfs-site.xml中的:
那这个host就是nameservice ID。为什么这么说呢,因为官网配置解释就这么说的:
那我们看看一般这个类是什么,可以看到,一般是ConfiguredFailoverProxyProvider这个类。我们通过这个配置项拿到了类的全路径名。
最后通过Class.forName就加载了这个类。
大功告成。
创建完namenode这个代理对象之后,就可以通过它调用各种方法了。比如我们之前举例的rename。我们点进去,进入了ClientProtocol接口。那最终肯定调用的是它的实现类。这就回到了我们文章开头。
至此Hadoop RPC在客户端一侧的调用流程已经全部分析完毕。