概览
ParallelChannel (有时被称为“pchan”)同时访问其包含的sub channel,并合并它们的结果。用户可通过CallMapper修改请求,通过ResponseMerger合并结果。ParallelChannel看起来就像是一个Channel:
-
支持同步和异步访问。
-
发起异步操作后可以立刻删除。
-
可以取消。
-
支持超时。
任何brpc::ChannelBase的子类都可以加入ParallelChannel,包括ParallelChannel和其他组合Channel。用户可以设置ParallelChannelOptions.fail_limit来控制访问的最大失败次数,当失败的访问达到这个数目时,RPC会立刻结束而不等待超时。
一个sub channel可多次加入同一个ParallelChannel。当你需要对同一个服务发起多次异步访问并等待它们完成的话,这很有用。
ParallelChannel的内部结构大致如下:
应用场景及详细应用剖析
实际开发中,我们常对下游服务有多次访问的需求,使用该接口能让串行的多次访问并行化,从而降低服务的整体平响。
官方的案例中默认不去对CallMapper做调整,实际开发中通常需要重写自己的CallMapper。
class TestCallMapper : public baidu::rpc::CallMapper {
public:virtual baidu::rpc::SubCall Map(int channel_index/*starting from 0*/,const google::protobuf::MethodDescriptor* method,const google::protobuf::Message* request,google::protobuf::Message* response);
};class TestResponseMerger : public baidu::rpc::ResponseMerger {
public:virtual baidu::rpc::ResponseMerger::Result Merge(google::protobuf::Message* response,const google::protobuf::Message* sub_response);
};
下面我们来看下pchan的整体使用流程
与常规channel一样我们通常需要在client中初始化pchan
baidu::rpc::ParallelChannelOptions pchan_optons;
pchan_optons.timeout_ms = _options.timeout_ms;
pchan_optons.fail_limit = all_requests.requests_size();baidu::rpc::ParallelChannel pchan;
pchan.Init(&pchan_optons);
同时需要绑定我们自己定义的CallMapper和Merger
TestCallMapper* test_call_mapper = new BusBasicEtaCallMapper();
TestResponseMerger* test_response_merger = new BusBasicEtaResponseMerger();
call_mapper内含引用计数,一个call_mapper可与多个sub channel关联。所以我们只用定义一个就可以,根据请求的数量添加对应数量的sub_channel,注意,这里如果是对同一个下游多次请求,就只用同一个channel即可,如果我们想做的是同时请求多个下游,可以初始化不同的sub_channel,然后添加进来,只不过就会导致我们的协议(request、response)要求能复用,一般对多个下游我们还是推荐使用bthread或者pthread来使用不同的channel,以便使用不同的协议。当然我们也可以初始化多个pchan来同时请求多个下游。
for () {for (int i = 0; i < all_requests.requests_size(); ++i) {pchan.AddChannel(&_channel,baidu::rpc::DOESNT_OWN_CHANNEL,eta_call_mapper,eta_response_merger);}
}
AddChannel的源码如下
int ParallelChannel::AddChannel(ChannelBase* sub_channel,ChannelOwnership ownership,CallMapper* call_mapper,ResponseMerger