文章目录
- apollo 安装
- apollo的基本架构
- 组件机制
- component
- 编译与加载
- 节点通讯
- 数据的传输
- 消息读写的实现
- 消息的写端
- 消息读端
- 常用术语
- Component
- Channel
- Task
- Node
- Reader/Writer
- Service/Client
- Parameter
- 服务发现
- CRoutine
- Scheduler
- Message
- Dag文件
- Launch文件
- Record文件
- Mainboard
- Monitor工具使用
原文
apollo 安装
sudo apt-get update
sudo apt-add-repository multiverse
sudo apt-get update
# sudo apt-get install nividia-driver-455 # 看版本
启用cuda
distribution=$(. /etc/os-release;echo $ID$VERSION_ID)
curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | sudo apt-key add -
curl -s -L https://nvidia.github.io/nvidia-docker/$distribution/nvidia-docker.list | sudo tee /etc/apt/sources.list.d/nvidia-docker.list
sudo apt-get -y update
sudo apt-get install -y nvidia-docker2
clone源代码
# 使用 SSH 的方式
git clone git@github.com:ApolloAuto/apollo.git
启动容器
./docker/scripts/dev_start.sh
进入容器
./docker/scripts/dev_into.sh
构建
./apollo.sh build
启动
./scripts/bootstrap.sh start
http://localhost:8888 可以显示DreamView界面
回放demo
在docs/02_Quick Start/demo_guide
python3 record_helper.py demo_3.5.record
cyber_recorder play -f demo_3.5.record -l
cyber_recorder在apollo/中
apollo的基本架构
apollo开发文档
常用数据见本篇末尾
组件机制
Cyber RT 组件机制
Cyber RT采用了基于Component模块和有向无环图(DAG)的动态加载配置的工程框架。即将相关算法模块通过Component创建,并通过DAG拓扑定义对各Component依赖关系进行动态加载和配置,从而实现对算法进行统一调度,对资源进行统一分配。
component
Component构成去中心化的网络,系统由多个component组成,构成一张计算图,如
component分为两类:
apollo::cyber::Component最多可以支持4路消息融合。多个channel读取数据时,第一个channel为主channel。当主 Channel 有消息到达,Cyber RT会调用 Component 的 apollo::cyber::Component::Proc 进行一次数据处理。
也就是说,每次有主消息到达,其就会执行一次Proc。
apollo::cyber::TimerComponent不会被消息出发,而是由系统定时调用。
创建一个Component的方式如下:
//1、包含头文件
#include "cyber/component/component.h"
#include "test/proto/examples.pb.h"// 2、定义一个类,继承以Driver和Chatter为参数的Component模版类
class ComponentSample : public Component<Driver, Chatter> {
public://3、重写Init() 函数和Proc() 函数bool Init() override;bool Proc(const std::shared_ptr<Driver>& msg0,const std::shared_ptr<Chatter>& msg1) override;
};
//4、 在cyber中注册ComopnentSample
CYBER_REGISTER_COMPONENT(ComponentSample)
也就是说我们需要继承一个模版类,并在cyber RT中注册这个类。
编译与加载
在Cyber RT中,Component会变编译为独立的.so文件。Cyber RT可以使用cyber_launch工具启动componenet对应的launch文件(可以是二进制文件或dag)或使用mainboard启动component对应的dag文件。
![[Apollo部署与简易架构梳理/Pasted image 20240723163142.png]]
节点通讯
参考资料
Publish-Subscribe模式
Reader和Writer
![[apollo/Apollo部署与简易架构梳理/Pasted image 20240715111104.png]]
RTPS(Real-Time Publish-Subscribe) 是一种用于实时分布式系统的通信协议,主要用于实现设备间的实时数据交换。
Fast RTPS是DDS标准的一个非常流行的开源实现。
DDS(Data Distribution Service)标准提供了一个平台无关的数据模型,主要用于实时分布式系统。不同的实现可以相互通信。
- 数据中心化:DDS采用数据为中心的架构,数据被看作是核心,而不是进程之间的直接通信。
- 发布/订阅模型:DDS使用发布/订阅(Pub/Sub)模式,允许数据生产者(发布者)和数据消费者(订阅者)之间进行解耦,从而提高系统的灵活性和可扩展性。
节点间通过channel进行读写形成数据通路,这些数据通路形成一个动态的数据流图。
cyber/service_discovery/TopologyManager对网络拓扑进行监控管理。只需要一个管理因此单例即可
- NodeManager
用于管理网络拓扑中的节点。
- ChannelManager
用于管理channel,即网络拓扑中的边。
- ServiceManager
用于管理Service
和Client
。
TopologyManager::TopologyManager(): init_(false),node_manager_(nullptr),channel_manager_(nullptr),service_manager_(nullptr),participant_(nullptr),participant_listener_(nullptr) {Init();}
TopologyManager::CreateParticipant()函数可以创建网络拓扑的参与者——transport::Participant对象,其包含host name与process id的名称。
ParticipantListener用于监听网络的变化。网络拓扑发生变化时,Fast RTPS传上来ParticipantDiscoveryInfo,在TopologyManager::Convert()函数中对该信息转换成Cyber RT中的数据结构ChangeMsg。然后调用回调函数TopologyManager::OnParticipantChange(),它会调用其它几个子管理器的OnTopoModuleLeave()函数。然后子管理器中便可以将相应维护的信息进行更新(如NodeManager中将相应的节点删除)。
初始化的时候会创建实例
数据的传输
- INTRA:如果是同进程的,因为在同一地址空间,直接传指针就完了。
- SHM(Shared memory):如果是同一机器上,但跨进程的,为了高效可以使用共享内存。
- RTPS:如果是跨设备的,那就老老实实通过网络传吧。
(目前我们的车机应该不太设计跨设备咯?)
![[apollo/Apollo部署与简易架构梳理/Pasted image 20240715120551.png]]
每个Writer有Transmitter,每个Reader有Receiver。它们是负责消息发送与收取的类。Transmitter与Receiver的基类为Endpoint,代表一个通信的端点,它主要的信息是身份标识与属性。其类型为RoleAttributes(定义在role_attributes.proto)的成员attr_包含了host name,process id和一个根据uuid产生的hash值作为id。通过它们就可以判断节点之间的相对位置关系了。
Reader和Writer会调用Transport的方法CreateTransmitter()和CreateReceiver()用于创建发送端的transmitter和接收端的receiver。创建时有四种模式可选,分别是INTRA,SHM和RTPS,和HYBRID。最后一种是前三种的混合模式,也是默认的模式。
消息读写的实现
消息的写端
在Component组件中通过CreaterWriter()函数创建Writer对象,然后向指定的channel发送消息。
如
writer_ = node_->CreateWriter<Image>(camera_config_->channel_name());// 创建writer对象// 在创建该对象时,会在Init()初始化中通过CreateTransmitter()创建Transmitter对象,并调用JointTheTopology()将其加入到ChannelManager维护的拓扑信息中。
//
//此外,还会对该channel的读者相应的调用Enable函数和Disable函数。
//
...
auto pb_image = std::make_shared<Image>();
pb_image->mutable_header()->set_frame_id(camera_config_->frame_id());
pb_image->set_width(raw_image_->width);
pb_image->set_height(raw_image_->height);
pb_image->mutable_data()->reserve(raw_image_->image_size);
// 以上完成发送数据的准备
// 这些消息如pb_image是定义好的protobuf类消息。
...
writer_->Write(pb_image);
// 其会调用Transmitter::Transmit()将message发出
消息读端
对于一个component而言,其可以从多个channel收取消息。
上层模块要取用这些消息,主要两种方式:一种是通过Component
的Proc()
接口,它被调用时参数就是最新的消息。另一种是通过Reader
的Observe()
函数直接拿
例:
template <typename M0, typename M1> // 声明一个模板类Component,其中M0和M1是数据类型参数
bool Component<M0, M1, NullType, NullType>::Initialize( // 初始化函数的定义...ReaderConfig reader_cfg; // 创建一个ReaderConfig对象用于配置读者reader_cfg.channel_name = config.readers(1).channel(); // 设置通道名称为配置中的第二个读者通道reader_cfg.qos_profile.CopyFrom(config.readers(1).qos_profile()); // 从配置中复制第二个读者的QoS配置reader_cfg.pending_queue_size = config.readers(1).pending_queue_size(); // 设置待处理队列大小为配置中的第二个读者大小auto reader1 = node_->template CreateReader<M1>(reader_cfg); // 使用配置创建一个类型为M1的读者,并命名为reader1reader_cfg.channel_name = config.readers(0).channel(); // 更新通道名称为配置中的第一个读者通道reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile()); // 从配置中复制第一个读者的QoS配置reader_cfg.pending_queue_size = config.readers(0).pending_queue_size(); // 设置待处理队列大小为配置中的第一个读者大小auto reader0 = node_->template CreateReader<M0>(reader_cfg); // 使用配置创建一个类型为M0的读者,并命名为reader0...readers_.push_back(std::move(reader0)); // 将reader0移动到readers_向量中readers_.push_back(std::move(reader1)); // 将reader1移动到readers_向量中... std::vector<data::VisitorConfig> config_list; // 创建一个data::VisitorConfig类型的向量用于存储访问者配置for (auto& reader : readers_) { // 遍历所有读者config_list.emplace_back(reader->ChannelId(), reader->PendingQueueSize()); // 将每个读者的配置添加到配置列表中} auto dv = std::make_shared<data::DataVisitor<M0, M1>>(config_list); // 使用配置列表创建一个DataVisitor智能指针,并特化为M0和M1类型croutine::RoutineFactory factory = // 创建一个RoutineFactory对象croutine::CreateRoutineFactory<M0, M1>(func, dv); // 使用工厂函数创建RoutineFactory,并传入func函数和数据访问者return sched->CreateTask(factory, node_->Name()); // 创建一个任务并返回创建结果,任务使用RoutineFactory和节点名称
}
这个案例中有两个channel并分别创建了reader对象。
每个channel都有对应的协程进行数据处理,还有一个协程实现消息的组合,所以有n个channel的component至少会有n+1个协程
常用术语
Component
在自动驾驶系统中,模块(如感知、定位、控制系统…)在 Cyber RT 下以 Component 的形式存在。不同 Component 之间通过 Channel 进行通信。Component 概念不仅解耦了模块,还为将模块拆分为多个子模块提供了灵活性。
Channel
Channel 用于管理 Cyber RT 中的数据通信。用户可以发布/订阅同一个 Channel,实现 p2p 通信。
Task
Task 是 Cyber RT 中异步计算任务的抽象描述。
Node
Node 是 Cyber RT 的基本组成部分;每个模块都包含一个 Node 并通过 Node 进行通信。通过在节点中定义 Reader/Writer 或 Service/Client,模块可以具有不同类型的通信形式。
Reader/Writer
Reader/Writer 通常在 Node 内创建,作为 Cyber RT 中的主要消息传输接口。
Service/Client
除了 Reader/Writer 之外,Cyber RT 还提供了用于模块通信的 Service/Client 模式。它支持节点之间的双向通信。当对服务发出请求时,客户端节点将收到响应。
Parameter
参数服务在 Cyber RT 中提供了全局参数访问接口。它是基于 Service/Client 模式构建的。
服务发现
作为一个去中心化的框架,Cyber RT 没有用于服务注册的主/中心节点。所有节点都被平等对待,可以通过“服务发现”找到其他服务节点。使用UDP
用来服务发现。
CRoutine
参考协程(Coroutine)的概念,Cyber RT 实现了 Coroutine 来优化线程使用和系统资源分配。
Scheduler
为了更好地支持自动驾驶场景,Cyber RT 提供了多种资源调度算法供开发者选择。
Message
Message 是 Cyber RT 中用于模块之间数据传输的数据单元。
Dag文件
Dag 文件是模块拓扑关系的配置文件。您可以在 dag 文件中定义使用的 Component 和上游/下游通道。
Launch文件
Launch 文件提供了一种启动模块的简单方法。通过在launch文件中定义一个或多个 dag 文件,可以同时启动多个模块。
Record文件
Record 文件用于记录从 Cyber RT 中的 Channel 发送/接收的消息。回放 Record 文件可以帮助重现Cyber RT之前操作的行为。
Mainboard
Cyber RT 的主入口,可以通过mainboard -d xxx.dag
来启动一个模块进程。
Monitor工具使用
ESC | q key ---- 退出
Backspace ---- 后退
h | H ---- 显示帮助页
PageDown | Ctrl+d ---- 上一页
PageUp | Ctrl+u ---- 下一页
Up, down or w, s keys ---- 上下移动当前的高亮行
Right arrow or d key ---- 进入高亮行, 显示高亮行数据的详细信息
Left arrow or a key ---- 从当前界面返回上一层界面
Enter key ---- 与d键相同
f | F ---- 显示数据帧频率
t | T ---- 显示channel消息类型
Space ---- 关闭|开启 channel (仅在channel有数据到达时有效; channel关闭后会变成黄色)
i | I ---- 显示channel的Reader和Writer信息
b | B ---- 显示channel消息内容
n | N ---- 显示消息中RepeatedField的下一条数据
m | M ---- 显示消息中RepeatedField的上一条数据