RTPS协议之Behavior Module

目录

  • 交互要求
    • 基本要求
    • RTPS Writer 行为
    • RTPS Reader行为
  • RTPS协议的实现
  • 与Reader匹配的Writer的行为
  • 涉及到的类型
  • RTPS Writer实现
    • RTPS Writer
    • RTPS StatelessWriter
    • RTPS ReaderLocator
    • RTPS StatefulWriter
    • RTPS ReaderProxy
    • RTPS ChangeForReader
  • RTPS StatelessWriter Behavior
    • Best-Effort StatelessWriter Behavior
      • Reliable StatelessWriter Behavior
  • RTPS StatefulWriter Behavior
    • Best-Effort StatefulWriter
    • Reliable StatefulWriter
  • Writer Liveliness Protocol
    • built-in Endpoints
      • Qos
    • Data Types
    • 使用BuiltinParticipantMessageWriter和BuiltinParticipantMessageReader实现Writer Liveliness Protocol
  • Optional Behavior
    • Large Data
      • 如何选择fragment的size
      • 如何发送片段
      • 如何重组分片
      • Reliable通信

主要描述rtps实体的动态行为,主要记录rtps writer和rtps reader之间的有效的序列消息交换,和这些消息交换的时间限制。RTPS Writer和RTPS Reader的时序交互如下:
RTPS Endpoints交互
说明:

  1. DDS DataWriter使用writer发送数据
  2. DataWriter调用RTPS Writer的new_change创建一个新的CacheChange,每个CacheChange使用SequenceNumber唯一标识;
  3. new_change返回;
  4. DDS的DataWriter操作使用add_change操作将CacheChange添加到RTPS的Writer的HistoryCache中;
  5. add_change操作返回
  6. writer操作返回,用户完成写数据
  7. RTPS writer使用Data Submessage发送CacheChange的内容给RTPS reader,并通过同时发送一个Heartbeat的Submessage的ack请求;
  8. RTPS reader接收到Data消息,通过add_change操作将CacheChange添加到RTPS reader的HistoryCache中;
  9. add_change返回,此时CacheChange对于DDS DataReader和DDS user是有效且可见的,这依赖于RTPS reader的reliabilityLevel属性;
    a. 对于RELIABLE 的DDS DataReader,只有当之前的RTPS Reader’s HistoryCache的changes可见时,当前的changes可见;
    b. 对于BEST_EFFORT DDS DataReader,只有没有后面的changes是可见的时候即可(i.e., if there are no changes in the RTPS Receiver’s HistoryCache with a higher sequence number)
  10. DDS user收到通知(通过listener或者WaitSet)并通过DataReader的take函数读取数据
  11. DDS 的DataReader通过HistoryCache的get_change函数读取change;
  12. get_change操作返回CacheChange给DataReader;
  13. take操作返回数据给DDS user;
  14. RTPS Reader发送AckNack消息表示CacheChange已存放到Reader的HistoryCache中,AckNack消息中包含RTPS reader的GUID,和change的SequenceNumber。这些操作和通知DDS user或take操作是独立的(即并行的);
  15. StatefulWriter记录RTPS Reader已经收到CacheChange并使用acked_changes_set操作将其加入到由ReaderProxy维护的acked_changes的set中。
  16. DDS user在DataReader上调用return_loan操作表明不再使用前面通过take操作获取到的数据;
  17. DataReader使用remove_change操作从HistoryCache中删除数据;
  18. remove_change操作返回;
  19. return_loan操作返回;
  20. DataWriter通过is_acked_by_all查看和该StatefulWriter匹配的哪些RTPS reader的CacheChanges接收到了;
  21. DataWriter通过remove_change从RTPS Writer的HistoryCache中删除指定seq_num的change;这一步操作也需要考虑到DDS Qos如DURABILITY
  22. remove_change返回

交互要求

基本要求

  • 所有的通信必须使用RTPS Messages
  • 必须实现RTPS Message Receiver

RTPS Writer 行为

  • writers发送数据不能out-of-order,即必须要按添加到HistoryCache中的数据发送
  • 如果reader要求,Writers必须包含 in-line Qos
  • Writers只有在reliable下必须周期性发送HEARTBEAT Messages。
    • Writer必须通过周期性的发送HEARTBEAT message通知每个匹配的reliable readers存在有效的data sample,这个HEARTBEAT message中包含有效sample的Sequence number;如果没有有效的samples,就没有HEARTBEAT Message需要发送;
    • 对于严格使用reliable通信的情况,Writer必须持续性给Readers发送HEARTBEAT Messages,直到Reader要么确认收到了所有有效的samples,要么消失
    • 其他情况下,HEARTBEAT消息的发送可以是特定实现的,也可以是有限的
  • 在reliable模式下(且只在该模式下),Writer必须对negative acknowledgment作出回应。当收到ACKNACK Message,表明Reader丢失了一些data sample,Writer必须要么发送丢失的data sample,发送一个GAP message当sample是无关的时候;要么当sample不再有效时发送一个HEARTBEAT message;Writer可以立即回应,或者选择未来的某个时间点发送;可以合并多个相关的response一起发送;
  • 属于一个Group的Writer会发送HEARTBEAT or GAP Submessages给和他匹配的Readers,即使是Reader接收到了Writer的所有的samples。This is necessary for the Subscriber to detect the
    group sequence numbers that are not available in that Writer. The exception to this rule is when the Writer has
    sent DATA or DATA_FRAG Submessages that contain the same information.

RTPS Reader行为

best-effort Reader只接收数据,不会发送任何数据;所以下面描述的要求仅限于reliable Reader。

  • Readers在收到一个没有设置最终flag的HEARTBEAT消息时必须作出响应。一旦收到一个未设置最终flag的HEARTBEAT消息时,Reader必须回应一个ACKNACK Message。这个ACKNACK Message消息可以用于回应收到了所有的data sample,或者一些data sample丢失了。响应可能会延迟,以避免消息风暴(在RTPS协议中,当一个节点收到一个HEARTBEAT消息时,它并不一定立即作出响应。相反,它可能会延迟一段时间后才做出响应,以防止在短时间内收到大量的HEARTBEAT消息,导致网络拥塞或过载。通过延迟响应,系统可以更有效地管理通信流量,确保通信的稳定性和可靠性)。
  • Readers在收到一个HEARTBEAT消息后发现data sample丢失了的时候必须响应一个ACKNACK Message。这一要求只有在Reader的缓存可以容纳这些丢失的data sample时才适用,并且与HEARTBEAT消息中的最终标志的设置无关。换句话说,在RTPS协议中,如果接收者的缓存能够处理缺失的数据样本,并且即使在HEARTBEAT消息中未设置最终标志,接收者仍然需要按照先前提到的规则做出响应。这强调了接收者在处理数据丢失时的一致性和可靠性。
    • 这个消息可以延迟,以避免消息风暴
    • 当一个liveness HEARTBEAT同时设置了liveness标志和final标志时,表明这是一个仅包含liveness信息的消息,此时不需要进行响应。换句话说,在RTPS协议中,如果一个HEARTBEAT消息被设置了liveness标志和flag标志,那么接收者不需要做出响应,因为这个消息只是用来表明发送者的存活状态,而不需要接收者采取任何措施。
  • 一旦一个Reader使用ACKNACK消息积极地确认接收了一个数据样本,那么它就不能在之后的时间点再次使用负面确认(NAK)消息来否定地确认相同的数据样本。
    • 当一个Writer从所有Readers那里都收到了积极的确认消息(positive acknowledgement),表示所有Readers都成功接收了相应的数据样本后,Writer可以收回与该数据样本相关的任何资源。这意味着Readers可以释放内存或其他资源,因为数据已经成功传输给了所有的Readers。
    • 如果一个Writer之前收到了积极的确认消息,表示某个数据样本已经成功接收,然后在之后又收到了一个负面的确认消息(negative acknowledgement),表示该样本未成功接收,但Readers仍能够处理这个请求,那么Readers应该重新发送这个数据样本。这样做是为了确保数据的完整性和可靠性,即使在之前已经收到积极确认的情况下,如果有必要,仍然应该重新发送未成功接收的数据样本。——?TODO 是否矛盾?
  • Readers只能发送ACKNACK message用于回应HEARTBEAT Message。在稳定状态下,ACKNACK消息只能作为对来自Writer的HEARTBEAT消息的响应而发送。当Reader首次发现写入者时,可以发送ACKNACK消息作为一种优化。Reader不需要对这些预先发送的ACKNACK消息做出响应。

RTPS协议的实现

  • Stateless实现:被优化用于可伸缩性。它在远端entities上几乎不保存任何状态,因此在大型系统中具有非常好的可伸缩性。这涉及到一种权衡,因为改进的可伸缩性和减少的内存使用可能需要额外的带宽使用。Stateless的实现非常适合通过组播进行best-effort通信。
  • Stateful实现:在远端entities上维护完整状态。这种方法最小化了带宽使用,但需要更多的内存,并可能导致可伸缩性降低。与Stateless实现相比,它可以保证严格可靠的通信,并能够在写入者端应用基于QoS或基于内容的过滤。
  • 实际的实现不需要完全遵守上面的规则,可以根据需要结合以上特点。Stateless Reference Implementation在远程实体上保持了最少的信息和状态。因此,它无法在写入端执行基于时间的过滤,因为这需要跟踪每个远程读取器及其属性。它也无法在读取器端丢弃无序样本,因为这需要跟踪从每个远程写入器接收到的最大序列号。一些实现可能模仿 Stateless Reference Implementation,但选择存储足够的额外状态以避免上述一些限制。所需的额外信息可以永久性地存储,这种情况下,实现接近于状态参考实现,或者可以慢慢老化并在需要时保留,以尽可能地模拟保持状态时的行为。

与Reader匹配的Writer的行为

与Reader匹配的RTPS Writer的行为,取决于RTPS Writer和RTPS Reader的reliabilityLevel属性(reliable和best-effort)的设置。
Writer和Reader允许的匹配:

  1. RTPS Writer的reliabilityLevel属性设置为RELIABLE
  2. RTPS Writer和RTPS Reader的reliabilityLevel属性都设置为BEST_EFFORT

这是因为DDS标准要求,BEST_EFFORT DDS DataWriter只能BEST_EFFORT DDS DataReader,RELIABLE DDS DataWriter可以匹配RELIABLE 和 BEST_EFFORT DDS DataReader
以上说明只限定于匹配,对于Stateful和Stateless来说,它们是可以通信的。

涉及到的类型

Attribute typePurpose
Duration_t时间
ChangeForReaderStatusKindChangeForReader.的状态,有以下值:UNSENT, UNACKNOWLEDGED, REQUESTED, ACKNOWLEDGED, UNDERWAY
ChangeFromWriterStatusKindChangeFromWriter.的状态值,有以下:LOST, MISSING, RECEIVED, UNKNOWN
InstanceHandle_t
ParticipantMessageData

RTPS Writer实现

RTPS Writer和RTPS Reader是对RTPS Endpoint的特化实现,StatelessWriter和StatefulWriter是RTPS Reader的特化,它们的区别在于处理匹配的Reader端点的信息时,它们的方法有所不同
TODO P70

RTPS Writer

RTPS Writer属性

attributetypemeaningrelation to DDS
pushModebool
heartbeatPeriodDuration_t
nackResponseDelayDuration_t
nackSuppression DurationDuration_t
lastChangeSequenceNumberSequenceNumber_t
writer_cacheHistoryCache
dataMaxSize Serialized
  1. 默认的时间相关的值
  2. new:new一个RTPS Writer
  3. new_change:new一个CacheChange并添加到RTPS Writer的HistoryCache,sequenceNumber相对上次自动+1;
++this.lastChangeSequenceNumber;
a_change := new CacheChange(kind, this.guid, this.lastChangeSequenceNumber,data, inlineQos, handle);
RETURN a_change;

RTPS StatelessWriter

StatelessWriter不需要知道匹配的readers的数量,也不需要维护匹配的Reader的任何状态;只维护RTPS ReaderLocator,用于向已经匹配的readers发送信息
StatelessWriter属性:

attributetypemeaningrelation to DDS
reader_locatorsReaderLocator[*]StatelessWriter维护一个locators列表,用于发送CacheChanges.N/A

StatelessWriter用于以下场景:

  • writer的HistoryCache交小
  • 通信方式是best-effort
  • writer通过multicast和很多readers通信

StatelessWriter的方法:

  1. new:创建一个StatelessWriter,并进行初始化:this.readerlocators := <empty>;
  2. reader_locator_add:将ReaderLocator对象添加到StatelessWriter::reader_locators中,伪代码:ADD a_locator TO {this.reader_locators};
  3. reader_locator_remove:将ReaderLocator对象从StatelessWriter::reader_locators中移除,伪代码:REMOVE a_locator FROM {this.reader_locators};
  4. unsent_changes_reset:This operation modifies the set of ‘unsent_changes’ for all the ReaderLocators in the StatelessWriter::reader_locators. The list of unsent changes is reset to match the complete list of changes available in the writer’s HistoryCache. This operation is useful when called periodically to cause the StatelessWriter to keep re-sending all available changes in its HistoryCache.伪代码:FOREACH readerLocator in {this.reader_locators} DO readerLocator.unsent_changes := {this.writer_cache.changes}

RTPS ReaderLocator

用于跟踪所有匹配的Readers的locators,用于跟踪和管理与特定Writer匹配的所有远程Reader的位置信息。这使得Writer能够有效地定位和发送数据到正确的Reader。具体来说,ReaderLocator的主要职责是存储和管理远程Reader的网络地址(通常是IP地址和端口号)
RTPS ReaderLocator属性:

attributetypemeaningrelation to DDS
requested_changesCacheChange[*]A list of changes in the writer’s HistoryCache that were requested by remote Readers at this ReaderLocator.
unsent_changesCacheChange[*]A list of changes in the writer’s HistoryCache that have not been sent yet to this ReaderLocator.
locatorLocator_tUnicast or multicast locator through which the readers represented by this ReaderLocator can be reached.
expectsInlineQosboolSpecifies whether the readers represented by this ReaderLocator expect inline QoS to be sent with every Data Message.

RTPS ReaderLocator operations:

  1. new
  2. next_requested_change
  3. next_unsent_change
  4. requested_changes
  5. requested_changes_set
  6. unsent_changes

RTPS StatefulWriter

RTPS StatefulWriter用于配置所有匹配的RTPS Reader,维护每个RTPS Reader的状态,这包括每个匹配的Reader的状态,包括序列号,确认的变化,缓冲的更改,过去的交互等等。这种状态使得StatefulWriter能够提供丰富的增强功能,如确保可靠的信息交付,处理不同的发送速率和网络故障,以及跟踪未确认的更改。
通过维护每个匹配的RTPS Reader端点的状态,RTPS StatefulWriter可以确定所有匹配的RTPS Reader端点是否已收到特定的CacheChange,并且可以通过避免向已接收到Writer的HistoryCache中所有changes的reader发送announcements,从而最优化地使用网络带宽。它维护的信息也简化了写入端的基于QoS的过滤。
RTPS StatefulWriter Attributes

attributetypemeaningrelation to DDS
matched_readersReaderProxy[*]StatefulWriter使用该字段跟踪与其匹配的所有RTPS readers。每个匹配的reader由ReaderProxy类的一个实例表示。

StatefulWriter Operations

  • new:new一个StatefulWriter,并将matched_readers初始化为空:this.matched_readers := <empty>;
  • is_acked_by_all:
  • matched_reader_add
  • matched_reader_remove
  • matched_reader_lookup

RTPS ReaderProxy

用于维护和StatefulWriter匹配的每一个Reader的信息;
RTPS ReaderProxy Attributes:

attributetypemeaningrelation to DDS
remoteReaderGuidGUID_t远端匹配的RTPS Reader的guid
remoteGroupEntityIdEntityId_tgroup中匹配的reader的EntityIdDataReader所属的Subscriber的EntityId
unicastLocatorListLocator_t[*]用于向匹配的RTPS reader发送消息的unicast locators (transport, address, port combinations),该列表可能是空的
multicastLocatorListLocator_t[*]用于向匹配的RTPS reader发送的multicast locators (transport, address, port combinations),可能为空
changes_for_readerCacheChange[*]和RTPS Reader相关的CacheChange的列表——什么作用?
expectsInlineQosboolSpecifies whether the remote matched RTPS Reader expects in-line QoS to be sent along with any data.
isActivebool远端Reader是否需要向writer回应

StatefulWriter将把Writer的HistoryCache中的CacheChange changes发送到由ReaderProxy表示的匹配的RTPS reader中。

ReaderProxy Operations

  • new:创建一个ReaderProxy对象
  • acked_changes_set
  • next_requested_change
  • next_unsent_change
  • requested_changes
  • requested_changes_set
  • unsent_changes
  • unacked_changes

RTPS ChangeForReader

RTPS的ChangeForReader是一个关联类,它保存了RTPS Writer的HistoryCache中的CacheChange信息,这些信息与ReaderProxy所代表的RTPS Reader有关。换句话说,这个类是用来记录和管理RTPS Writer中的缓存改变(CacheChange)对于特定的RTPS Reader(由ReaderProxy代表)的相关信息。

RTPS StatelessWriter Behavior

Best-Effort StatelessWriter Behavior

Best-Effort StatelessWriter和每个ReaderLocator的交互

Transitionstateeventnext state
T1initial给RTPS Best-Effort StatelessWriter配置RTPS ReaderLocatoridle
T2idleGuardCondition: RL::unsent_changes() != <empty>pushing
T3pushingGuardCondition: RL::unsent_changes() == <empty>idle
T4pushingGuardCondition: RL::can_send() == truepushing
T5any stateRTPS Writer is configured to no longer have the ReaderLocatorfinal
  1. T1:这一步是服务发现阶段完成,给RTPS Best-Effort StatelessWriter配置DataReader到RTPS ReaderLocator。
a_locator := new ReaderLocator( locator, expectsInlineQos ); 
the_rtps_writer.reader_locator_add( a_locator );
  1. 表示RTPS Writer的HistoryCache中有一些changes还没有被发送给RTPS ReaderLocator
  2. [RL::unsent_changes() == <empty>] 表明所有RTPS Writer的HistoryCache中的changes被发送到RTPS ReaderLocator中。但这并不意味着这些changes被接收到。
  3. [RL::can_send() == true]表示RTPS Writer有一些resources需要发送change到RTPS ReaderLocator 对象中,伪代码如下:
a_change := the_reader_locator.next_unsent_change();
IF a_change IN the_writer.writer_cache.changes {DATA = new DATA(a_change);IF (the_reader_locator.expectsInlineQos) {DATA.inlineQos := the_writer.related_dds_writer.qos;DATA.inlineQos += a_change.inlineQos;}DATA.readerId := ENTITYID_UNKNOWN;sendto the_reader_locator.locator, DATA;
}
ELSE {GAP = new GAP(a_change.sequenceNumber);GAP.readerId := ENTITYID_UNKNOWN;sendto the_reader_locator.locator, GAP;
}
  1. RTPS Writer不再发送消息给RTPS ReaderLocator。这一步也是由服务发现完成。
the_rtps_writer.reader_locator_remove(the_reader_locator);
delete the_reader_locator;

Reliable StatelessWriter Behavior

Reliable StatelessWriter with respect to each ReaderLocator

Transitionstateeventnext state
T1initialRTPS Writer is configured with a ReaderLocatorannouncing
T2announcingGuardCondition: RL::unsent_changes() != <empty>pushing
T3pushingGuardCondition: RL::unsent_changes() == <empty>announcing
T4pushingGuardCondition: RL::can_send() == truepushing
T5announcingafter(W::heartbeatPeriod)announcing
T6waitingACKNACK message is receivedwaiting
T7waitingGuardCondition: RL::requested_changes() != must_repair
T8must_repairACKNACK message is receivedmust_repair
T9must_repairafter(W::nackResponseDelay)repairing
T10repairingGuardCondition: RL::can_send() == truerepairing
T11repairingGuardCondition: RL::requested_changes() == waiting
T12any stateRTPS Writer is configured to no longer have the ReaderLocatorfinal
  1. 通过服务发现,配置Reliable StatelessWriter的ReaderLocator,将所有匹配的DataReader添加到ReaderLocator.
a_locator := new ReaderLocator( locator, expectsInlineQos ); 
the_rtps_writer.reader_locator_add( a_locator );
  1. [RL::unsent_changes() != ] 表明,RTPS Writer HistoryCache中还有changes没有发送给ReaderLocator;
  2. [RL::unsent_changes == ] 表明,所有RTPS Writer HistoryCache中的消息都已发送到ReaderLocator,但这并不保证这些changes已经被接收;
  3. [RL::can_send() == true]表明RTPS Writer有一些resources需要发送给ReaderLocator对象。执行流程如下:
a_change := the_reader_locator.next_unsent_change(); 
DATA = new DATA(a_change);
IF (the_reader_locator.expectsInlineQos) { DATA.inlineQos := the_writer.related_dds_writer.qos;
}
DATA.readerId := ENTITYID_UNKNOWN;
sendto the_reader_locator.locator, DATA;

After the transition the following post-conditions hold:

( a_change BELONGS-TO the_reader_locator.unsent_changes() ) == FALSE
  1. 这个状态转变由W::heartbeatPeriod.的timer触发,执行逻辑如下:
seq_num_min := the_rtps_writer.writer_cache.get_seq_num_min();
seq_num_max := the_rtps_writer.writer_cache.get_seq_num_max();
HEARTBEAT := new HEARTBEAT(the_rtps_writer.writerGuid, seq_num_min,
seq_num_max);
HEARTBEAT.FinalFlag := SET;
HEARTBEAT.readerId := ENTITYID_UNKNOWN;
sendto the_reader_locator, HEARTBEAT;
  1. 这个状态转换是由于RTPS StatelessWriter接收到一个由某个RTPS Reader发送的ACKNACK message而触发的,执行以下逻辑:
FOREACH reply_locator_t IN { Receiver.unicastReplyLocatorList,
Receiver.multicastReplyLocatorList }
reader_locator := the_rtps_writer.reader_locator_lookup(reply_locator_t); reader_locator.requested_changes_set(ACKNACK.readerSNState.set);

需要注意的是处理这类消息使用RTPS Receiver中的reply locators。这是StatelessWriter确定发送回复的位置的唯一信息源。协议的正常运行需要RTPS Reader在AckNack前插入一个InfoReply Submessage,以便正确设置这些字段。

  1. 这个状态转换由guard condition[RL::requested_changes() != <empty>]触发,表明RTPS ReaderLocator中的有一些RTPS Reader的request
  2. 这个状态转换是由于RTPS StatelessWriter收到由RTPS Reader发送的ACKNACK消息。
  3. 这个状态转换是由一个定时器触发的,表示自从进入must_repair状态以来,W::nackResponseDelay的持续时间已经过去。不执行任何逻辑操作。
  4. todo
  5. todo
  6. todo

RTPS StatefulWriter Behavior

Best-Effort StatefulWriter

Best-Effort StatefulWriter和与其匹配的Reader

Transitionstateeventnext state
T1initialRTPS Writer匹配到了RTPS Readeridle
T2idleGuardCondition: RP::unsent_changes() != <empty>pushing
T3pushingGuardCondition: RP::unsent_changes() == <empty>idle
T4pushingGuardCondition: RP::can_send() == truepushing
T5ready一个新的change被添加到RTPS Writer’s HistoryCache中ready
T6any stateRTPS Writer不再有匹配的RTPS Readerfinal
  1. 这个状态的转变是由RTPS Writer配置到了匹配的RTPS Reader。配置动作是由服务发现完成DataReader匹配到了DataWriter。服务发现提供了初始化一个ReaderProxy对象的参数;执行如下操作:
a_reader_proxy := new ReaderProxy( remoteReaderGuid,remoteGroupEntityId, expectsInlineQos, unicastLocatorList, multicastLocatorList);
the_rtps_writer.matched_reader_add(a_reader_proxy);
  1. 这个状态转变由[RP::unsent_changes() != <empty>]触发,表明RTPS HistoryCache中有一些changes没有发送给代表RTPS Reader的ReaderProxy
  2. 这个状态转变由[RP::unsent_changes() == <empty>]触发,表示RTPS Writer HistoryCache中所有changes都被发送给了代表RTPS Reader的ReaderProxy。但这并不意味着changes被接收到,
  3. 这个状态转变由[RP::can_send() == true]触发,表示RTPS Writer有一些resources需要发送给代表RTPS Reader的ReaderProxy。执行以下操作:
a_change := the_reader_proxy.next_unsent_change();
a_change.status := UNDERWAY;
if (a_change.is_relevant) { DATA = new DATA(a_change);IF (the_reader_proxy.expectsInlineQos) {DATA.inlineQos := the_rtps_writer.related_dds_writer.qos;DATA.inlineQos += a_change.inlineQos;}DATA.readerId := ENTITYID_UNKNOWN;send DATA;
} else {GAP = new GAP(a_change.sequenceNumber);GAP.readerId := ENTITYID_UNKNOWN;Send GAP;
}

上述逻辑并不意味着每个DATA Submessage以分离的形式发送,相反多个子消息可以组合成一个RTPS message。

  1. 这个转变是由于添加了一个新的CacheChange到HistoryCache中。这个change是否和代表RTPS Reader的ReaderProxy有关,是由DDS_FILTER决定。执行逻辑如下:
ADD a_change TO the_reader_proxy.changes_for_reader;
IF (DDS_FILTER(the_reader_proxy, change)) THEN change.is_relevant := FALSE; 
ELSE change.is_relevant := TRUE;
IF (the_rtps_writer.pushMode == true) THEN change.status := UNSENT;
ELSE change.status := UNACKNOWLEDGED;
  1. 这个状态转变是由于RTPS Writer不再和ReaderProxy代表的RTPS Reader匹配。执行如下操作:
the_rtps_writer.matched_reader_remove(the_reader_proxy);
delete the_reader_proxy;

Reliable StatefulWriter

Reliable StatefulWriter与其匹配的Reader

Transitionstateeventnext state
T1initialRTPS Writer匹配到了RTPS Readerannouncing
T2announcingGuardCondition: RP::unsent_changes() != <empty>pushing
T3pushingGuardCondition: RP::unsent_changes() == <empty>announcing
T4pushingGuardCondition: RP::can_send() == truepushing
T5announcingGuardCondition: RP::unacked_changes() == <empty>idle
T6idleGuardCondition: RP::unacked_changes() != <empty>announcing
T7announcingafter(W::heartbeatPeriod)announcing
T8waitingACKNACK消息被接收waiting
T9waitingGuardCondition: RP::requested_changes() != <empty>must_repair
T10must_repairACKNACK message被接收must_repair
T11must_repairafter(W::nackResponseDelay)repairing
T12repairingGuardCondition: RP::can_send() == truerepairing
T13repairingGuardCondition: RP::requested_changes() == <empty>waiting
T14ready一个change添加到RTPS Writer’s HistoryCacheready
T15ready一个change从RTPS Writer’s HistoryCache中移除ready
T16any stateRTPS Writer不再和RTPS Reader匹配final

Writer Liveliness Protocol

DDS标准要求存在一个活跃性机制,RTPS用来实现这个要求的就是Writer Liveliness Protocol。这个协议定义了两个participant之间,为了确认它们所包含的Writer的活跃性所需要的信息交换。在这里,Writer通常指的是在数据传输过程中,负责产生和发送数据的实体或组件。
Writer Liveliness Protocol使用预先定义的内置的Endpoints。使用内置端点意味着一旦一个参与者知道了另一个参与者的存在,它可以假定远程参与者提供的内置端点的存在,并与本地匹配的内置端点建立关联。——?用于内置端点之间通信的协议与用于应用定义端点的协议相同。

built-in Endpoints

Writer Liveliness Protocol要求的built-in Endpoints是BuiltinParticipantMessageWriter和BuiltinParticipantMessageReader.这些Endpoints被用于liveliness,但是在以后也可以被用于其他数据;
RTPS 协议为这些built-in Endpoints预留的EntityId_t:

  • ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
  • ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER

Qos

为了交互,BuiltinParticipantMessageWriter 和 BuiltinParticipantMessageReader使用如下Qos:

  • durability.kind = TRANSIENT_LOCAL_DURABILITY
  • history.kind = KEEP_LAST_HISTORY_QOS
  • history.depth = 1

BuiltinParticipantMessageWriter shall use reliability.kind = RELIABLE_RELIABILITY_QOS,BuiltinParticipantMessageReader既可以使用RELIABLE_RELIABILITY_QOS 也可以使用 BEST_EFFORT_RELIABILITY_QOS. 如果BuiltinParticipantMessageReader使用BEST_EFFORT_RELIABILITY_QOS,那么ParticipantProxy::builtinEndpointQos中就会设置BEST_EFFORT_PARTICIPANT_MESSAGE_DATA_READER flag.

If the ParticipantProxy::builtinEndpointQos is included in the SPDPdiscoveredParticipantData, then the BuiltinParticipantMessageWriter shall treat the BuiltinParticipantMessageReader as indicated by the flags. If the ParticipantProxy::builtinEndpointQos is not included then the BuiltinParticipantMessageWriter shall treat the BuiltinParticipantMessageReader as if it is configured with RELIABLE_RELIABILITY_QOS.

Data Types

同其他RTPS Endpoint一样,RTPS built-in Endpoints也有HistoryCache存储changes。
RTPS built-in Endpoint中DCPSParticipantMessage Topic使用的Data type:
Participant Message Data

使用BuiltinParticipantMessageWriter和BuiltinParticipantMessageReader实现Writer Liveliness Protocol

通过向BuiltinParticipantMessageWriter写入一个sample,就可以判断一个participant的一部分writer的liveliness。如果Participant包含一个或多个liveliness为AUTOMATIC_LIVELINESS_QOS的Writers,

Optional Behavior

Large Data

传输可能会限制最大数据包大小(例如,UDP的最大值为64K),从而限制最大的RTPS子消息大小。这主要影响数据子消息,因为它限制了serializedData的最大大小,或者使用的数据类型的最大序列化大小。为了解决这个限制,以下子消息实现了对大数据的分段:

  • DataFrag
  • HeartbeatFrag
  • NackFrag

下面列出了交互所需的相应行为

如何选择fragment的size

fragment size的大小由Writer以及下面条件:

  • 所有Writer有效的传输方式必须至少能容纳包含一个片段的DataFrag子消息,这意味着具有最小最大消息大小的传输方式决定了片段的大小。
  • 对于特定的编写者,片段大小必须是固定的,并且对于所有远程readers都是相同的。通过固定片段大小,片段编号所指的数据不依赖于特定的远程reader。这简化了来自reader的NackFrag的处理。
  • 片段大小必须 <= 65536字节

注意,片段大小由writer可用的所有传输方式确定,而不仅仅是到达所有当前已知reader所需的传输方式的子集(什么意思?)。这确保了新发现的readers,无论他们可以通过哪种传输方式到达,都可以在不改变片段大小的情况下进行适应,否则将违反上述要求。

如何发送片段

如果分段要求将Data Submessage分成若干DataFrag Submessages,需要满足以下要求:

  • DataFrag Submessages需要按顺序发送,序号是fragment numbers递增的,但注意这并不保证按顺序达到
  • Data只能在被要求的时候分段。如果多种传输方式对Writer有效,其中一些传输方式不要求分段,那么就应该在这些传输方式上发送Data Submessage。同样,对于可变大小的数据类型,如果对于特定的序列号不需要进行分段,那么就应该使用常规的Data Submessage(后一句啥意思)
  • 对于给定的序列号,如果使用了内联的QoS(服务质量)参数,那么必须将这些参数与第一个DataFrag子消息(包含碎片编号等于1的碎片)一起包含。对于该序列号的后续DataFrag子消息,也可以包含这些参数,但这并不是必须的。这段说明主要指出了在使用QoS参数时的处理方式和要求。

如何重组分片

DataFrag Submessages包含了所有需要重组data的信息,一旦所有片段都收到,就可以作为常规Data Submessage处理。实现上也必须要能够处理乱序到达的DataFrag submessages。

Reliable通信

在可靠地发送DataFrag Submessage的协议行为上,除了要满足发送常规Data Submessage的要求外,还需满足以下条件:

  • 心跳子消息的语义保持不变:心跳消息只能包括所有片段都可用的序列号。也就是说,心跳消息的发送应该仅限于那些所有数据片段都已经准备就绪,可以进行发送的序列号,不能包含那些尚未准备就绪的序列号。这就确保了数据传输的完整性和可靠性。
  • AckNack子消息的语义保持不变:只有在接收到特定序列号的所有片段时,AckNack消息才能对该序列号进行肯定确认。同样,只有在所有片段都丢失时,才能对序列号进行否定确认。这就意味着,AckNack消息只有在数据完全收到或完全丢失的情况下才能进行确认,确保了数据传输的准确性和可靠性。
  • 为了对某一给定序列号的片段子集进行否定确认,必须使用NackFrag Submessage。当数据被分片时,心跳可能会触发AckNack和NackFrag Submessage。

另外,

  • As mentioned above, a Heartbeat Submessage can only include a sequence number once all fragments for that sequence number are available. If a Writer wants to inform a Reader on the partial availability of fragments for a given sequence number, a HeartbeatFrag Submessage can be used instead. Fragment level reliability may be helpful for very large data and when using flow control.
  • NackFrag Submessage消息只能用来回应Heartbeat或HeartbeatFrag submessage.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/845861.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ARM-V9 RME(Realm Management Extension)系统架构之系统安全能力的信任根服务

安全之安全(security)博客目录导读 目录 一、信任根服务 1、非易失性存储 2、根看门狗 3、随机数生成器 4、加密服务 5、硬件强制安全性 本节定义了系统架构必须支持的一般安全属性和能力&#xff0c;以确保RME安全性。 本章扩展了可能属于系统认证配置文件的一部分的其…

30 分钟内掌握 Mainnet、Testnet 和 Devnet。Devnet是什么??

在区块链技术领域&#xff0c;Mainnet、Testnet 和 Devnet 等术语经常被使用&#xff0c;但也经常被误解。 这三种环境在区块链应用的开发和部署中起着至关重要的作用&#xff0c;但它们的区别和目的却常常被混淆。 让我们踏上探索之旅&#xff0c;揭开 Mainnet、Testnet 和 De…

Simulink中使用ROS1自定义消息

Simulink中使用ROS1自定义消息 简介前提条件操作流程问题一问题二问题三 吐槽 简介 最近在做的项目里需要使用Simulink与ROS联合仿真&#xff0c;这里就遇到了一个问题&#xff0c;Simulink无法直接使用ROS中的自定义消息&#xff0c;需要在MATLAB中生成一下&#xff0c;再引入…

GiantPandaCV | FasterTransformer Decoding 源码分析(六)-CrossAttention介绍

本文来源公众号“GiantPandaCV”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;FasterTransformer Decoding 源码分析(六)-CrossAttention介绍 GiantPandaCV | FasterTransformer Decoding 源码分析(一)-整体框架介绍-CSDN博客 …

MyBatis系统学习篇 - 分页插件

MyBatis是一个非常流行的Java持久层框架&#xff0c;它简化了数据库操作的代码。分页是数据库查询中常见的需求&#xff0c;MyBatis本身并不直接支持分页功能&#xff0c;但可以通过插件来实现&#xff0c;从而帮助我们在查询数据库的时候更加方便快捷 引入依赖 <dependen…

移动端路由切换解决方案 —— 虚拟任务栈让你的 H5 像APP一样丝滑

目录 01: 前言 02: 通用组件&#xff1a;trigger-menu 和 trigger-menu-item 构建方案分析 03: 通用组件&#xff1a;构建 trigger-menu 和 trigger-menu-item 04: 前台业务下 H5 的应用场景 05: 通用组件&#xff1a;transition-router-view 构建方案分析 与 虚拟任务栈…

Java实战:将学生列表写入文件

本实战项目旨在演示如何使用Java语言将学生信息列表写入到一个文本文件中&#xff0c;并进行单元测试以确保代码的正确性。 创建静态方法 定义一个名为writeStudentsToFile的静态方法&#xff0c;该方法接收两个参数&#xff1a;一个Student对象的列表和一个文件路径。使用File…

Python疑难杂症--考试复习

1.排序输出字典中数据 dic1 {Tom:21,Bob:18,Jack:23,Ana:20} dic2 {李雷:21,韩梅梅:18,小明:23,小红:20} nint(input()) if n>len(dic1):nlen(dic1) print(sorted(dic1.keys())[:n]) print(sorted(dic2.items(),keylambda item:item[1])[:n]) 2.罗马数字转换 def F(s):d{…

SQL—DQL(数据查询语言)之小结

一、引言 在前面我们已经学习完了所有的关于DQL&#xff08;数据查询语言&#xff09;的基础语法块部分&#xff0c;现在对DQL语句所涉及的语法&#xff0c;以及需要注意的事项做一个简单的总结。 二、DQL语句 1、基础查询 注意&#xff1a; 基础查询的语法是&#xff1a;SELE…

FineBi导出Excel后台版实现

就是不通过浏览器,在后台运行的导出 参考文档在:仪表板查看接口- FineBI帮助文档 FineBI帮助文档 我这里是将这个帮助文档中导出的excel文件写到服务器某个地方后,对excel进行其他操作后再下载。由于原有接口耦合了HttpServletRequest req, HttpServletResponse res对象,…

海外短剧APP/H5 系统开发搭建

目前已经有多个客户用我们搭建的海外短剧系统&#xff0c;在使用中已经取得了较高的收益。目前一个客户打算做日本区域的海外短剧项目&#xff0c;需求已经理清楚了&#xff0c;系统正在搭建中

[MYSQL] 部门工资最高的员工

表&#xff1a; Employee ----------------------- | 列名 | 类型 | ----------------------- | id | int | | name | varchar | | salary | int | | departmentId | int | ----------------------- 在 SQL 中&#xff0c;id…

Deconfounding Duration Bias in Watch-time Prediction for Video Recommendation

Abstract 观看时间预测仍然是通过视频推荐加强用户粘性的关键因素。然而&#xff0c;观看时间的预测不仅取决于用户与视频的匹配&#xff0c;而且经常被视频本身的持续时间所误导。为了提高观看时间&#xff0c;推荐总是偏向于长时间的视频。在这种不平衡的数据上训练的模型面…

[机器学习]GPT LoRA 大模型微调,生成猫耳娘

往期热门专栏回顾 专栏描述Java项目实战介绍Java组件安装、使用&#xff1b;手写框架等Aws服务器实战Aws Linux服务器上操作nginx、git、JDK、VueJava微服务实战Java 微服务实战&#xff0c;Spring Cloud Netflix套件、Spring Cloud Alibaba套件、Seata、gateway、shadingjdbc…

牛客网刷题 | BC104 翻转金字塔图案

目前主要分为三个专栏&#xff0c;后续还会添加&#xff1a; 专栏如下&#xff1a; C语言刷题解析 C语言系列文章 我的成长经历 感谢阅读&#xff01; 初来乍到&#xff0c;如有错误请指出&#xff0c;感谢&#xff01; 描述 KiKi学习了循环&am…

万字详解 MySQL MGR 高可用集群搭建

文章目录 1、MGR 前置介绍1.1、什么是 MGR1.2、MGR 优点1.3、MGR 缺点1.4、MGR 适用场景 2、MySQL MGR 搭建流程2.1、环境准备2.2、搭建流程2.2.1、配置系统环境2.2.2、安装 MySQL2.2.3、配置启动 MySQL2.2.4、修改密码、设置主从同步2.2.5、安装 MGR 插件 3、MySQL MGR 故障转…

智慧排水监测系统方案

智慧排水监测系统方案 智慧排水监测系统作为现代城市基础设施管理的重要组成部分&#xff0c;旨在通过先进的信息技术手段&#xff0c;实现对城市排水系统的全面、实时、高效的远程监控与管理。该系统整合了物联网技术、大数据分析、云计算平台与人工智能算法&#xff0c;不仅…

告别暗黄,唤醒肌肤

&#x1f3ad; 想象一下&#xff0c;你的皮肤是舞台上的主角&#xff0c;但最近它似乎有些“疲惫”和“黯淡”&#xff0c;仿佛失去了往日的星光✨。别急&#xff0c;今天&#xff0c;我要为你揭秘一个能让肌肤重新焕发光彩的“魔法”——胶原蛋白&#xff01;&#x1f3a9; &a…

docker查看容器目录挂载

查看命令 docker inspect --format{{ json .Mounts }} <container_id_or_name> | jq 示例 docker inspect --format{{ json .Mounts }} af656ae540af | jq输出

FreeRTOS笔记 - 二(正点原子)

一&#xff0c;任务创建和删除 具体的参数&#xff08;看视频&#xff09; 1&#xff0c;动态和静态创建的区别 动态: 任务的任务控制块以及任务的栈空间所需的内存&#xff0c;均由FreeRTOS从 FreeRTOS 管理的堆中分配。 静态: 任务的任务控制块以及任务的栈空间所需的内存&am…