chromium通信系统-ipcz系统(七)-ipcz系统代码实现-跨Node通信-NonBroker和NonBroker通信

在chromium通信系统-ipcz系统(六)-ipcz系统代码实现-跨Node通信-基础通信 一文中我们分析了broker 和 nonbroker 通信的过程。本文我们来分析NonBroker 和NonBroker的通信过程,同样以单元测试为例子分析。

mojo/core/invitation_unittest.cc

 951 DEFINE_TEST_CLIENT(NonBrokerToNonBrokerHost) {952   MojoHandle invitation = AcceptInvitation(MOJO_ACCEPT_INVITATION_FLAG_NONE);953   MojoHandle test = ExtractPipeFromInvitation(invitation);954 955   MojoHandle pipe_for_client;956   EXPECT_EQ("aaa", ReadMessageWithHandles(test, &pipe_for_client, 1));957 958   MojoHandle client;959   base::Process client_process =960       LaunchChildTestClient("NonBrokerToNonBrokerClient", &client, 1,961                             MOJO_SEND_INVITATION_FLAG_SHARE_BROKER);962 963   // Forward the pipe from the test to the client, then wait. We're done964   // whenever the client acks. The success of the test is determined by965   // the outcome of interactions between the test and the client process.966   WriteMessageWithHandles(client, "ddd", &pipe_for_client, 1);967 968   // Wait for a signal from the test to let us know we can terminate.969   EXPECT_EQ("bye", ReadMessage(test));970   WriteMessage(client, "bye");971   WaitForProcessToTerminate(client_process);972 973   MojoClose(client);974   MojoClose(test);975 }

在TEST_F(MAYBE_InvitationTest, NonBrokerToNonBroker) 这个单元测试工A和B进程是Broker和NonBroker 通信,B进程启动后又调用LaunchChildTestClient 启动C进程(代码960行), 我们进入590行来分析B和C如何通信。LaunchChildTestClient我们已经分析过了,不过在broker 启动非broker进程中第三个参数是MOJO_SEND_INVITATION_FLAG_NONE,而这里是MOJO_SEND_INVITATION_FLAG_SHARE_BROKER,通过值的名称我们可以看出来,这里是要建立共享broker。LaunchChildTestClient前边的调用流程和Broker 启动NonBroker没有区别,我们重点分析对MOJO_SEND_INVITATION_FLAG_SHARE_BROKER的特殊处理, 直接看建立Connector的过程。

third_party/ipcz/src/ipcz/node_connector.cc

528 // static
529 IpczResult NodeConnector::ConnectNode(
530     Ref<Node> node,
531     Ref<DriverTransport> transport,
532     IpczConnectNodeFlags flags,
533     const std::vector<Ref<Portal>>& initial_portals,
534     ConnectCallback callback) {
535   const bool from_broker = node->type() == Node::Type::kBroker;
536   const bool to_broker = (flags & IPCZ_CONNECT_NODE_TO_BROKER) != 0;
537   const bool inherit_broker = (flags & IPCZ_CONNECT_NODE_INHERIT_BROKER) != 0;
538   const bool share_broker = (flags & IPCZ_CONNECT_NODE_SHARE_BROKER) != 0;
539   Ref<NodeLink> broker_link = node->GetBrokerLink();......
546 
547   auto [connector, result] = CreateConnector(
548       std::move(node), std::move(transport), flags, initial_portals,
549       std::move(broker_link), std::move(callback));
550   if (result != IPCZ_RESULT_OK) {
551     return result;
552   }
553 
554   if (!share_broker && !connector->ActivateTransport()) {
555     // Note that when referring another node to our own broker, we don't
556     // activate the transport, since the transport will be passed to the broker.
557     // See NodeConnectorForReferrer.
558     return IPCZ_RESULT_UNKNOWN;
559   }
560 
561   if (!connector->Connect()) {
562     return IPCZ_RESULT_UNKNOWN;
563   }
564 
565   return IPCZ_RESULT_OK;
566 }

先分析B进程
函数参数transport是指B和C的传输点。flags 为IPCZ_CONNECT_NODE_SHARE_BROKER,所以538行share_broker为真。547行创建Connector。 与Broker 和NonBroker通信不同,这里554行并不会激活传输点。 561行直接调用Connector的Connect方法。我们先来看CreateConnector方法创建哪一类Connector。

471 std::pair<Ref<NodeConnector>, IpczResult> CreateConnector(
472     Ref<Node> node,
473     Ref<DriverTransport> transport,
474     IpczConnectNodeFlags flags,
475     const std::vector<Ref<Portal>>& initial_portals,
476     Ref<NodeLink> broker_link,
477     NodeConnector::ConnectCallback callback) {
478   const bool from_broker = node->type() == Node::Type::kBroker;
479   const bool to_broker = (flags & IPCZ_CONNECT_NODE_TO_BROKER) != 0;
480   const bool share_broker = (flags & IPCZ_CONNECT_NODE_SHARE_BROKER) != 0;
481   const bool inherit_broker = (flags & IPCZ_CONNECT_NODE_INHERIT_BROKER) != 0;......
508 
509   if (share_broker) {
510     return {MakeRefCounted<NodeConnectorForReferrer>(
511                 std::move(node), std::move(transport), flags, initial_portals,
512                 std::move(broker_link), std::move(callback)),
513             IPCZ_RESULT_OK};
514   }
515 ......
522 
523   return {nullptr, IPCZ_RESULT_INVALID_ARGUMENT};
524 }

函数510行创建了NodeConnectorForReferrer。

  NodeConnectorForReferrer(Ref<Node> node,Ref<DriverTransport> transport,IpczConnectNodeFlags flags,std::vector<Ref<Portal>> waiting_portals,Ref<NodeLink> broker_link,ConnectCallback callback): NodeConnector(std::move(node),/*transport=*/nullptr,flags,std::move(waiting_portals),std::move(callback)),transport_for_broker_(std::move(transport)),broker_link_(std::move(broker_link)) {}

NodeConnectorForReferrer的成员变量transport_for_broker_代表B和C的传输点。 broker_link_则表示B->A(Broker) 的链接。

166   // NodeConnector:
167   bool Connect() override {
168     ABSL_ASSERT(node_->type() == Node::Type::kNormal);
169     if (!broker_link_) {
170       // If there's no broker link yet, wait for one.
171       node_->WaitForBrokerLinkAsync(
172           [connector = WrapRefCounted(this)](Ref<NodeLink> broker_link) {
173             connector->broker_link_ = std::move(broker_link);
174             connector->Connect();
175           });
176       return true;
177     }
178 
179     broker_link_->ReferNonBroker(
180         std::move(transport_for_broker_), checked_cast<uint32_t>(num_portals()),
181         [connector = WrapRefCounted(this), broker = broker_link_](
182             Ref<NodeLink> link_to_referred_node,
183             uint32_t remote_num_initial_portals) {
184           if (link_to_referred_node) {
185             connector->AcceptConnection(
186                 {.link = link_to_referred_node, .broker = broker},
187                 remote_num_initial_portals);
188           } else {
189             connector->RejectConnection();
190           }
191         });
192     return true;
193   }

NodeConnectorForReferrer->Connect()函数:
169到177行如果A和B还没完全建立链接,则调用node_->WaitForBrokerLinkAsync异步等待链接建立后再执行NodeConnectorForReferrer->Connect()函数。
179-191行broker_link_->ReferNonBroker() 函数建立链接。181-191行为一个回调函数,我们后面分析。

 223 void NodeLink::ReferNonBroker(Ref<DriverTransport> transport,224                               uint32_t num_initial_portals,225                               ReferralCallback callback) {226   ABSL_ASSERT(node_->type() == Node::Type::kNormal &&227               remote_node_type_ == Node::Type::kBroker);228 229   uint64_t referral_id;230   {231     absl::MutexLock lock(&mutex_);232     for (;;) {233       referral_id = next_referral_id_++;234       auto [it, inserted] =235           pending_referrals_.try_emplace(referral_id, std::move(callback));236       if (inserted) {237         break;238       }239     }240   }241 242   msg::ReferNonBroker refer;243   refer.params().referral_id = referral_id;244   refer.params().num_initial_portals = num_initial_portals;245   refer.params().transport =246       refer.AppendDriverObject(transport->TakeDriverObject());247   Transmit(refer);248 }

注意这个NodeLink是B->A的NodeLink,A是Broker。
229 -240行生成一个referral_id,用于维护请求和回调函数的关系。
242-247行创建ReferNonBroker消息体,并发起传输。ReferNonBroker有三个参数referral_id用于找到对应回调。num_initial_portals表示B和C要建立的RouterLink。 transport为B->C的传输端点。
我们来看A如何处理该消息

 366 bool NodeLink::OnReferNonBroker(msg::ReferNonBroker& refer) {367   if (remote_node_type_ != Node::Type::kNormal ||368       node()->type() != Node::Type::kBroker) {369     return false;370   }371 372   DriverObject transport = refer.TakeDriverObject(refer.params().transport);373   if (!transport.is_valid()) {374     return false;375   }376 377   DriverMemoryWithMapping link_memory =378       NodeLinkMemory::AllocateMemory(node()->driver());379   DriverMemoryWithMapping client_link_memory =380       NodeLinkMemory::AllocateMemory(node()->driver());......390 391   return NodeConnector::HandleNonBrokerReferral(392       node(), refer.params().referral_id, refer.params().num_initial_portals,393       WrapRefCounted(this),394       MakeRefCounted<DriverTransport>(std::move(transport)),395       std::move(link_memory), std::move(client_link_memory));396 }

函数很简单,377-380行创建了两个内存映射对象,实际上指向同一块内存。
391行调用 NodeConnector::HandleNonBrokerReferral() 函数进一步处理。注意这里的transport是通过B->C的文件描述符反序列化出来的(文件描述符通过socket传递),所以这里A->C也具备了信道。

bool NodeConnector::HandleNonBrokerReferral(Ref<Node> node,uint64_t referral_id,uint32_t num_initial_portals,Ref<NodeLink> referrer,Ref<DriverTransport> transport_to_referred_node,DriverMemoryWithMapping link_memory,DriverMemoryWithMapping client_link_memory) {ABSL_ASSERT(node->type() == Node::Type::kBroker);auto connector = MakeRefCounted<NodeConnectorForBrokerReferral>(std::move(node), referral_id, num_initial_portals, std::move(referrer),std::move(transport_to_referred_node), std::move(link_memory),std::move(client_link_memory));// The connector effectively owns itself and lives only until its transport is// disconnected or it receives a greeting from the referred node.return connector->ActivateTransport();
}

HandleNonBrokerReferral函数先创建了NodeConnectorForBrokerReferral对象,这是一个Connector对象,然后调用connector->ActivateTransport()激活传输点。先来看NodeConnectorForBrokerReferral对象创建

  NodeConnectorForBrokerReferral(Ref<Node> node,uint64_t referral_id,uint32_t num_initial_portals,Ref<NodeLink> referrer,Ref<DriverTransport> transport,DriverMemoryWithMapping link_memory,DriverMemoryWithMapping client_link_memory): NodeConnector(std::move(node),std::move(transport),IPCZ_NO_FLAGS,/*waiting_portals=*/{},/*callback=*/nullptr),referral_id_(referral_id),num_initial_portals_(num_initial_portals),referrer_(std::move(referrer)),link_memory_(std::move(link_memory)),client_link_memory_(std::move(client_link_memory)) {ABSL_HARDENING_ASSERT(link_memory_.mapping.is_valid());ABSL_HARDENING_ASSERT(client_link_memory_.mapping.is_valid());}

NodeConnectorForBrokerReferral的成员变量如下:

  • referral_id_ 对端请求的id
  • num_initial_portals_ B和C要初始建立的RouterLink数量
  • referrer_: A->B的链接
  • transport:A->C的传输点
  • link_memory_ : Broker创建的共享内存对象
  • client_link_memory_: Broker创建的共享内存对象
    到目前位置建立的信道:
    A<->B, A<->C, B<->C。

传输点激活的代码我们在chromium通信系统-ipcz系统(五)-ipcz系统代码实现-信道和共享内存一文已经分析过了。

我们接下来看C进程接收邀请。

DEFINE_TEST_CLIENT(NonBrokerToNonBrokerClient) {MojoHandle invitation =AcceptInvitation(MOJO_ACCEPT_INVITATION_FLAG_INHERIT_BROKER);......
}

这里面标志为MOJO_ACCEPT_INVITATION_FLAG_INHERIT_BROKER, AcceptInvitation在chromium通信系统-ipcz系统(五)-ipcz系统代码实现-信道和共享内存一文已经分析过了。我们主要关注对MOJO_ACCEPT_INVITATION_FLAG_INHERIT_BROKER标志的处理。同样直接看Connect的过程

528 // static
529 IpczResult NodeConnector::ConnectNode(
530     Ref<Node> node,
531     Ref<DriverTransport> transport,
532     IpczConnectNodeFlags flags,
533     const std::vector<Ref<Portal>>& initial_portals,
534     ConnectCallback callback) {
535   const bool from_broker = node->type() == Node::Type::kBroker;
536   const bool to_broker = (flags & IPCZ_CONNECT_NODE_TO_BROKER) != 0;
537   const bool inherit_broker = (flags & IPCZ_CONNECT_NODE_INHERIT_BROKER) != 0;
538   const bool share_broker = (flags & IPCZ_CONNECT_NODE_SHARE_BROKER) != 0;
539   Ref<NodeLink> broker_link = node->GetBrokerLink();
540   if (share_broker && (from_broker || to_broker || inherit_broker)) {
541     return IPCZ_RESULT_INVALID_ARGUMENT;
542   }
543   if ((to_broker || from_broker) && (inherit_broker || share_broker)) {
544     return IPCZ_RESULT_INVALID_ARGUMENT;
545   }
546 
547   auto [connector, result] = CreateConnector(
548       std::move(node), std::move(transport), flags, initial_portals,
549       std::move(broker_link), std::move(callback));
550   if (result != IPCZ_RESULT_OK) {
551     return result;
552   }
553 
554   if (!share_broker && !connector->ActivateTransport()) {
555     // Note that when referring another node to our own broker, we don't
556     // activate the transport, since the transport will be passed to the broker.
557     // See NodeConnectorForReferrer.
558     return IPCZ_RESULT_UNKNOWN;
559   }
560 
561   if (!connector->Connect()) {
562     return IPCZ_RESULT_UNKNOWN;
563   }
564 
565   return IPCZ_RESULT_OK;
566 }

这里inherit_broker表示继承broker,也代表该节点非中心链接。
函数先创建Connector,然后激活传输点,最后请求建立链接,这个函数我们分析好几遍了,直接先Connector的创建。

471 std::pair<Ref<NodeConnector>, IpczResult> CreateConnector(
472     Ref<Node> node,
473     Ref<DriverTransport> transport,
474     IpczConnectNodeFlags flags,
475     const std::vector<Ref<Portal>>& initial_portals,
476     Ref<NodeLink> broker_link,
477     NodeConnector::ConnectCallback callback) {
478   const bool from_broker = node->type() == Node::Type::kBroker;
479   const bool to_broker = (flags & IPCZ_CONNECT_NODE_TO_BROKER) != 0;
480   const bool share_broker = (flags & IPCZ_CONNECT_NODE_SHARE_BROKER) != 0;
481   const bool inherit_broker = (flags & IPCZ_CONNECT_NODE_INHERIT_BROKER) != 0;......
516   if (inherit_broker) {
517     return {MakeRefCounted<NodeConnectorForReferredNonBroker>(
518                 std::move(node), std::move(transport), flags, initial_portals,
519                 std::move(callback)),
520             IPCZ_RESULT_OK};
521   }
522 
523   return {nullptr, IPCZ_RESULT_INVALID_ARGUMENT};
524 }
525 
526 }  // namespace
527 

这里创建的Connector为NodeConnectorForReferredNonBroker。我们看下它是如何创建的

203   NodeConnectorForReferredNonBroker(Ref<Node> node,
204                                     Ref<DriverTransport> transport,
205                                     IpczConnectNodeFlags flags,
206                                     std::vector<Ref<Portal>> waiting_portals,
207                                     ConnectCallback callback)
208       : NodeConnector(std::move(node),
209                       std::move(transport),
210                       flags,
211                       std::move(waiting_portals),
212                       std::move(callback)) {}

比较简单, 传输点的激活我们在chromium通信系统-ipcz系统(五)-ipcz系统代码实现-信道和共享内存一文已经分析过了。接下来看一下请求链接的过程

216   // NodeConnector:
217   bool Connect() override {
218     ABSL_ASSERT(node_->type() == Node::Type::kNormal);
219     msg::ConnectToReferredBroker connect;
220     connect.params().protocol_version = msg::kProtocolVersion;
221     connect.params().num_initial_portals =
222         checked_cast<uint32_t>(num_portals());
223     return IPCZ_RESULT_OK == transport_->Transmit(connect);
224   }

创建了一个ConnectToReferredBroker对象,这个对象非常简单。只有要初始化的端口数一个参数。 由于此时B进程并没有激活传输点,消息会被A进程读取。我们来看A进程如何处理。

回到A进程, NodeConnectorForBrokerReferral会收到对应消息。

314   // NodeMessageListener overrides:
315   bool OnConnectToReferredBroker(
316       msg::ConnectToReferredBroker& connect_to_broker) override {
317     DVLOG(4) << "Accepting ConnectToReferredBroker on broker "
318              << broker_name_.ToString() << " from new referred node "
319              << referred_node_name_.ToString();
320 
321     // Ensure this NodeConnector stays alive until this method returns.
322     // Otherwise the last reference may be dropped when the new NodeLink below
323     // takes over listening on `transport_`.
324     Ref<NodeConnector> self(this);
325 
326     // First, accept the new non-broker client on this broker node. There are
327     // no initial portals on this link, as this link was not established
328     // directly by the application. Note that this takes over listsening on
329     // `transport_`
330     const uint32_t protocol_version = std::min(
331         connect_to_broker.params().protocol_version, msg::kProtocolVersion);// 建立A->C的NodeLink
332     Ref<NodeLink> link_to_referree = NodeLink::CreateActive(
333         node_, LinkSide::kA, broker_name_, referred_node_name_,
334         Node::Type::kNormal, protocol_version, transport_,
335         NodeLinkMemory::Create(node_, std::move(link_memory_.mapping)));
336     AcceptConnection({.link = link_to_referree}, /*num_remote_portals=*/0);
337 
338     // Now we can create a new link to introduce both clients -- the referrer
339     // and the referree -- to each other.// 创建一对传输点
340     auto [referrer, referree] = DriverTransport::CreatePair(
341         node_->driver(), referrer_->transport().get(), transport_.get());
342 
343     // Give the referred node a reply with sufficient details for it to
344     // establish links to both this broker and the referrer simultaneously.
345     //
346     // SUBTLE: It's important that this message is sent before the message to
347     // the referrer below. Otherwise the referrer might begin relaying messages
348     // through the broker to the referree before this handshake is sent to the
349     // referree, which would be bad.
350     msg::ConnectToReferredNonBroker connect;
351     connect.params().name = referred_node_name_;
352     connect.params().broker_name = broker_name_;
353     connect.params().referrer_name = referrer_->remote_node_name();
354     connect.params().broker_protocol_version = protocol_version;
355     connect.params().referrer_protocol_version =
356         referrer_->remote_protocol_version();
357     connect.params().num_initial_portals = num_initial_portals_;
358     connect.params().broker_link_buffer =
359         connect.AppendDriverObject(link_memory_.memory.TakeDriverObject());
360     connect.params().referrer_link_transport =
361         connect.AppendDriverObject(referree->TakeDriverObject());
362     connect.params().referrer_link_buffer = connect.AppendDriverObject(
363         client_link_memory_.memory.Clone().TakeDriverObject());// 向C发送ConnectToReferredNonBroker 消息
364     link_to_referree->Transmit(connect);
365 
366     // Finally, give the referrer a repy which includes details of its new link
367     // to the referred node.
368     msg::NonBrokerReferralAccepted accepted;
369     accepted.params().referral_id = referral_id_;
370     accepted.params().protocol_version =
371         connect_to_broker.params().protocol_version;
372     accepted.params().num_initial_portals =
373         connect_to_broker.params().num_initial_portals;
374     accepted.params().name = referred_node_name_;
375     accepted.params().transport =
376         accepted.AppendDriverObject(referrer->TakeDriverObject());
377     accepted.params().buffer = accepted.AppendDriverObject(
378         client_link_memory_.memory.TakeDriverObject());// 向B发送NonBrokerReferralAccepted 消息
379     referrer_->Transmit(accepted);
380     return true;
381   }

函数332-336 建立A->C的NodeLink。340-341行创建一对传输点(这对传输点可以互相通信)
350行-364行向C发送一个ConnectToReferredNonBroker消息
368-379行向B发送一个NonBrokerReferralAccepted 消息。我们先来看一下ConnectToReferredNonBroker包含的参数。

ConnectToReferredNonBroker:

  • name: 为C分配的NodeName
  • broker_name_: A进程为broker进程,它对应的名字
  • referrer_name: B进程Node的名字
  • broker_protocol_version: broker协议版本
  • referrer_protocol_version:B的协议版本
  • num_initial_portals 初始化端口数量
  • broker_link_buffer 和broker的共享内存
  • referrer_link_transport:创建的一对传输点的一端
  • referrer_link_buffer 和B的共享内存

再来看一下NonBrokerReferralAccepted的数据过程

  • referral_id:B的请求id
  • protocol_version 协议版本
  • num_initial_portals 初始化端口数
  • referred_node_name_ C NodeName
  • transport:创建的一对传输点的另一端
  • buffer:共享内存

知道的消息体的结构,我们大概可以知道A进程帮助B、C进程创建了一个通信管道,这就是broker的作用,我们来具体看B、C进程如何处理消息。 先看C进程
third_party/ipcz/src/ipcz/node_connector.cc
NodeConnectorForReferredNonBroker

227   bool OnConnectToReferredNonBroker(
228       msg::ConnectToReferredNonBroker& connect) override {......
234     DriverMemoryMapping broker_mapping =
235         DriverMemory(
236             connect.TakeDriverObject(connect.params().broker_link_buffer))
237             .Map();
238     DriverMemoryMapping referrer_mapping =
239         DriverMemory(
240             connect.TakeDriverObject(connect.params().referrer_link_buffer))
241             .Map();
242     Ref<DriverTransport> referrer_transport = MakeRefCounted<DriverTransport>(
243         connect.TakeDriverObject(connect.params().referrer_link_transport));
244     if (!broker_mapping.is_valid() || !referrer_mapping.is_valid() ||
245         !referrer_transport->driver_object().is_valid()) {
246       return false;
247     }
248 
249     // Ensure this NodeConnector stays alive until this method returns.
250     // Otherwise the last reference may be dropped when the new NodeLink takes
251     // over listening on `transport_`.
252     Ref<NodeConnector> self(this);
253     const uint32_t broker_protocol_version = std::min(
254         connect.params().broker_protocol_version, msg::kProtocolVersion);
255     auto broker_link = NodeLink::CreateActive(
256         node_, LinkSide::kB, connect.params().name,
257         connect.params().broker_name, Node::Type::kBroker,
258         broker_protocol_version, transport_,
259         NodeLinkMemory::Create(node_, std::move(broker_mapping)));
260     if ((flags_ & IPCZ_CONNECT_NODE_TO_ALLOCATION_DELEGATE) != 0) {
261       node_->SetAllocationDelegate(broker_link);
262     }
263     node_->AddConnection(connect.params().broker_name,
264                          {
265                              .link = broker_link,
266                              .broker = broker_link,
267                          });
268 
269     const uint32_t referrer_protocol_version = std::min(
270         connect.params().referrer_protocol_version, msg::kProtocolVersion);
271     auto referrer_link = NodeLink::CreateInactive(
272         node_, LinkSide::kB, connect.params().name,
273         connect.params().referrer_name, Node::Type::kNormal,
274         referrer_protocol_version, std::move(referrer_transport),
275         NodeLinkMemory::Create(node_, std::move(referrer_mapping)));
276 
277     AcceptConnection({.link = referrer_link, .broker = broker_link},
278                      connect.params().num_initial_portals);
279     referrer_link->Activate();
280     return true;
281   }

252-267行创建了和broker(A进程)的NodeLink。
269-278行 创建了C和B的NodeLink,并初始化端口,然后激活链接。

我们再开看B进程如何处理NonBrokerReferralAccepted消息
third_party/ipcz/src/ipcz/node_link.cc

398 bool NodeLink::OnNonBrokerReferralAccepted(399     msg::NonBrokerReferralAccepted& accepted) {400   if (remote_node_type_ != Node::Type::kBroker) {401     return false;402   }403 404   ReferralCallback callback;405   {406     absl::MutexLock lock(&mutex_);407     auto it = pending_referrals_.find(accepted.params().referral_id);408     if (it == pending_referrals_.end()) {409       return false;410     }411     callback = std::move(it->second);412     pending_referrals_.erase(it);413   }414 415   const uint32_t protocol_version =416       std::min(msg::kProtocolVersion, accepted.params().protocol_version);417   auto transport = MakeRefCounted<DriverTransport>(418       accepted.TakeDriverObject(accepted.params().transport));419   DriverMemoryMapping mapping =420       DriverMemory(accepted.TakeDriverObject(accepted.params().buffer)).Map();421   if (!transport->driver_object().is_valid() || !mapping.is_valid()) {422     // Not quite a validation failure if the broker simply failed to allocate423     // resources for this link. Treat it like a connection failure.424     callback(/*link=*/nullptr, /*num_initial_portals=*/0);425     return true;426   }427 428   Ref<NodeLink> link_to_referree = NodeLink::CreateInactive(429       node_, LinkSide::kA, local_node_name_, accepted.params().name,430       Node::Type::kNormal, protocol_version, std::move(transport),431       NodeLinkMemory::Create(node_, std::move(mapping)));432   callback(link_to_referree, accepted.params().num_initial_portals);433   link_to_referree->Activate();434   return true;435 }

创建了B->C的NodeLink,并激活。这个过程中还会回调callback。回顾一下回调函数

166   // NodeConnector:
167   bool Connect() override {
168     ABSL_ASSERT(node_->type() == Node::Type::kNormal);
169     if (!broker_link_) {
170       // If there's no broker link yet, wait for one.
171       node_->WaitForBrokerLinkAsync(
172           [connector = WrapRefCounted(this)](Ref<NodeLink> broker_link) {
173             connector->broker_link_ = std::move(broker_link);
174             connector->Connect();
175           });
176       return true;
177     }
178 
179     broker_link_->ReferNonBroker(
180         std::move(transport_for_broker_), checked_cast<uint32_t>(num_portals()),
181         [connector = WrapRefCounted(this), broker = broker_link_](
182             Ref<NodeLink> link_to_referred_node,
183             uint32_t remote_num_initial_portals) {
184           if (link_to_referred_node) {
185             connector->AcceptConnection(
186                 {.link = link_to_referred_node, .broker = broker},
187                 remote_num_initial_portals);
188           } else {
189             connector->RejectConnection();
190           }
191         });
192     return true;
193   }

181-191行,调用AcceptConnection 接受链接。

到这里我们大概理解了NonBroker和NonBroker通信的流程,是借助broker实现的,broker帮忙建立Transport,并且分配共享内存。

到这里NonBroker 和NonBroker通信我们就分析完了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/589655.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

面试算法:快速排序

题目 快速排序是一种非常高效的算法&#xff0c;从其名字可以看出这种排序算法最大的特点就是快。当表现良好时&#xff0c;快速排序的速度比其他主要对手&#xff08;如归并排序&#xff09;快2&#xff5e;3倍。 分析 快速排序的基本思想是分治法&#xff0c;排序过程如下…

Python期末复习资料

一、基础知识点 1. Python基础语法 变量与数据类型 定义变量&#xff0c;理解变量的命名规则基本数据类型&#xff1a;整数、浮点数、字符串列表、元组、字典、集合等复合数据类型 1. 1 变量 1.1.1 变量的定义 在Python中&#xff0c;变量是用来存储数据值的标识符。你可…

.Net8 新特性之键控服务-依赖注入一对多模式

在.NET8 中引入了KeyedService支持&#xff0c;这使得可以支持一对多的依赖注入了。在官方&#xff0c;这个功能叫键化DI服务。 什么是键化DI服务&#xff1f;官方解释是这样的&#xff0c;键化依赖项注入&#xff08;DI&#xff09;服务提供了一种适用键来注册和检索DI服务的…

【随口一说】最近的CSDN

这段时间随便发的一篇博文很快就有“点赞”、“收藏”、“关注”的信息&#xff0c; 而且简单看了一眼用户&#xff0c;很多都是空的或者一堆转载&#xff0c; 机器人也太明显了点&#xff0c;很让人不舒服&#xff0c; 不花点心思设计文章评优推送算法反倒用机器人刷热门&…

问题描述:智能驾驶中的FSD是什么意思。

问题描述&#xff1a;智能驾驶中的FSD是什么意思。 问题描述&#xff1a; FSD 在智能驾驶领域通常指的是 "Full Self-Driving"&#xff0c;即全自动驾驶。这表示汽车具备了足够的智能和技术&#xff0c;能够在不需要人类干预的情况下完成所有驾驶任务。全自动驾驶系…

硬件知识之ESD保护器件

ESD保护器件&#xff0c;全称是Electrostatic Discharge Protection Device&#xff0c;是一种专门用与防止电子设备遭受外部静电放电而损坏的元器件。静电放电会在接口或器件表面积累成千上万伏特的能量&#xff0c;这些能量可能会引起设备故障或者持久性损伤&#xff0c;甚至…

185.【2023年华为OD机试真题(C卷)】报文重排序(顺序索引实现JavaPythonC++JS)

请到本专栏顶置查阅最新的华为OD机试宝典 点击跳转到本专栏-算法之翼:华为OD机试 🚀你的旅程将在这里启航!本专栏所有题目均包含优质解题思路,高质量解题代码,详细代码讲解,助你深入学习,深度掌握! 文章目录 【2023年华为OD机试真题(C卷)】报文重排序(顺序索引…

电机(一):直流有刷电机和舵机

声明&#xff1a;以下图片来自于正点原子&#xff0c;仅做学习笔记使用 电机专题&#xff1a; 直流电机&#xff1a;直流有刷BDC&#xff08;内含电刷&#xff09;&#xff0c;直流无刷BLDC&#xff08;大疆的M3508和M2006&#xff09;,无刷电机有以下三种形式&#xff1a;&a…

C语言之分支与循环【附6个练习】

文章目录 前言一、什么是语句&#xff1f;1.1 表达式语句1.2 函数调用语句1.3 控制语句1.4 复合语句1.5 空语句 二、分支语句&#xff08;选择结构&#xff09;2.1 if语句2.1.1 悬空else2.1.2 练习&#xff08;1. 判断一个数是否为奇数 2. 输出1-100之间的奇数&#xff09; 2.2…

deepfacelive实时换脸教程(2024最新版)

deepfacelive其实操作用法很简单&#xff0c;难的是模型的制作。本帖主要讲deepfacelive&#xff08;下文简称dflive&#xff09;软件本身的操作&#xff0c;以及模型怎么从dfl转格式过来&#xff0c;至于模型如何训练才能效果好&#xff0c;请移步教程区&#xff0c;看deepfac…

51单片机中TCON, IE, PCON等寄存器的剖析

在单片机中&#xff0c;如何快速通过名字记忆IQ寄存器中每一个控制位的作用呢&#xff1f; IE&#xff08;interrupt enable&#xff09;寄存器中&#xff0c;都是中断的使能位置。 其中的EA&#xff08;enable all&#xff09;是总使能位&#xff0c;ES(enable serial)是串口…

构建安全的SSH服务体系

某公司的电子商务站点由专门的网站管理员进行配置和维护&#xff0c;并需要随时从Internet进行远程管理&#xff0c;考虑到易用性和灵活性&#xff0c;在Web服务器上启用OpenSSH服务&#xff0c;同时基于安全性考虑&#xff0c;需要对 SSH登录进行严格的控制&#xff0c;如图10…

WorkQueue模型

WorkQueues&#xff0c;也被称为任务队列模型。当消息处理比较耗时的时候&#xff0c;可能生产消息的速度会远远大于消息的消费速度。长此以往&#xff0c;消息就会堆积越来越多&#xff0c;无法及时的处理。此时就可以使用work模型&#xff1a;让多个消费者绑定到一个队列&…

问题描述:与编码器对应的解码器是什么,说明一下解码器的名字由来,结构,原理,特点,用处。

问题描述&#xff1a;与编码器对应的解码器是什么&#xff0c;说明一下解码器的名字由来&#xff0c;结构&#xff0c;原理&#xff0c;特点&#xff0c;用处。 问题解答&#xff1a; 定义&#xff1a;解码器是一种电子设备或程序&#xff0c;用于将经过编码的数据转换回原始…

概率论基础

1.概率论 1.1 随机事件与概率 1.1.1 基本概念 ​ 样本点(sample point)&#xff1a; 称为试验 S S S的可能结果为样本点&#xff0c;用 ω \omega ω表示。 ​ 样本空间(sample space)&#xff1a;称试验 S S S的样本点构成的集合为样本空间&#xff0c;用 Ω \Omega Ω表示…

R语言【CoordinateCleaner】——cc_gbif(): 根据通过 method 参数定义的方法,删除或标记地理空间中异常值的记录。

cc_gbif()是R语言包coordinatecleaner中的一个函数&#xff0c;用于清理GBIF&#xff08;全球生物多样性信息设施&#xff09;数据集的地理坐标。该函数可以识别潜在的坐标错误&#xff0c;并对其进行修正或删除。 以下是cc_gbifl()函数的一般用法和主要参数&#xff1a; cc_…

把form表单数据转为json,并传给父页面

阻止form表单提交&#xff0c;表单数据转为json字符串&#xff0c;并传给父页面 // 获取表单元素var form document.getElementById(myForm);// 监听表单提交事件form.addEventListener(submit, function(event) {// 在这里处理表单提交的逻辑var rental_id $("#c-id&q…

gem5学习(8):创建一个简单的缓存对象--Creating a simple cache object

目录 一、SimpleCache SimObject 二、Implementing the SimpleCache 1、getSlavePort() 2、handleRequest() 3、AccessEvent() 4、accessTiming() &#xff08;1&#xff09;缓存命中&#xff1a;sendResponse() &#xff08;2&#xff09;缓存未命中&#xff1a; 三、…

实现3x3卷积的手写FIFO

例子来自米联科例程&#xff0c; 因为不同平台之间调IP会变麻烦&#xff0c;重新阅读手册太花时间了&#xff08;虽然我觉得fifo这种常用IP尽量掌握为好&#xff09;&#xff0c;使用手写的FIFO可以节约开发的流程。 通过这个例子也可以优化自己所使用的手写FIFO。 // by C…

matlab概率论例子

高斯概率模型&#xff1a; [f,xi] ksdensity(x): returns a probability density estimate, f, for the sample in the vector x. The estimate is based on a normal kernel function, and is evaluated at 100 equally spaced points, xi, that cover the range of the da…