-
在workspace/modules下新建包
buildtool create --template component modules/test_one
-
编译包
buildtool build -p modules/test_two/
-
增加自己的proto消息
在刚才自动生成的proto文件里面添加自己定义的消息,记得重新编译.
syntax = "proto2";package apollo;// message type of channel, just a placeholder for demo,
// you should use `--channel_message_type` option to specify the real message type
message TestTwoMsg {}message TestTwoConfig {optional string name = 1;
};
// new added
message TestTwoMymessage{optional string twoInfo = 1;
}
也可以新建proto文件, 修改BUILD文件;记得重新编译.
proto文件内容如下
syntax = "proto2";
package apollo;message TestTwoLwlgzy{optional string twoInfo = 1;optional string twoFault = 2;
}
修改对应BUILF的文件
load("//tools:apollo_package.bzl", "apollo_package")
load("//tools/proto:proto.bzl", "proto_library")
load("//tools:cpplint.bzl", "cpplint")package(default_visibility = ["//visibility:public"])proto_library(name = "test_two_proto",srcs = ["test_two.proto"],
)proto_library(name = "test_two_lwlgzy_proto",srcs = ["test_two_lwlgzy.proto"],
)
apollo_package()cpplint()
- 生成的pb2文件路径在
/opt/apollo/neo/python/modules/test_two/proto
a). 新建test.py文件加入如下内容from modules.test_two.proto.test_two_pb2 import TestTwoMymessage
b). 给运行权限sudo chmod 755 *.py
运行没有报错即为正常.
本机没有编译or缺少需要引用的proto文件时候, 也可以直接将其他机器生成的pb2.py文件拷贝到类似
/opt/apollo/neo/python/xxx/xxx/proto
下import才不会报错.
- python 文件中import 路径为
from 包名路径.proto.xx_pb2 import xxx
. 例如:from modules.test_two.proto.test_two_lwlgzy_pb2 import TestTwoLwlgzy
modules/test_two/
|-- BUILD
|-- conf
| |-- test_two.conf
| `-- test_two.pb.txt
|-- cyberfile.xml
|-- dag
| `-- test_two.dag
|-- launch
| `-- test_two.launch
|-- proto
| |-- BUILD
| |-- test_two.proto
| `-- test_two_lwlgzy.proto
|-- test.py
|-- test_two_component.cc
`-- test_two_component.h4 directories, 12 files
- 播包
cyber_recorder play -f modules/chassis.00000.20210622135417
订阅底盘话题处理故障实例
import time
import asyncio
from datetime import datetime
from cyber.python.cyber_py3 import cyber
from modules.canbus_vehicle.yt.proto.yt_pb2 import Yt
from od_health_test.proto.od_health_test_pb2 import OdErrorInfo, OdErrorMsgsimport logging
from modules.tools.common.logger import Logger
import osAPOLLO_ROOT = "/apollo"class xxHealthMonitor:def __init__(self) -> None:self.health_node_ = cyber.Node("xx_health_monitor_node")Logger.config(log_file = os.path.join(APOLLO_ROOT, 'data/log/xx_health.log'),use_stdout=True,log_level=logging.DEBUG)self.logger = Logger.get_logger("OdHealthMonitor")self.logger.info("od_health_monitor_node is started. ")self.health_node_.create_reader("/apollo/canbus/chassis_detail",Yt,self._chassis_cb_)self.health_writer = self.health_node_.create_writer("/od/health", OdErrorMsgs)self.has_data = Falseself.bat_total_voltage = 0self.bat_total_current = 0self.od_ErrMessages = OdErrorMsgs()e_info = OdErrorInfo()e_info.err_level = ''e_info.err_message = ''self.od_ErrMessages.error_msgs.append(e_info)self.last_data_time = None# /apollo/canbus/chassis_detail 100hzdef _chassis_cb_(self,msg):self.last_data_time = time.time()#...解析数据,添加逻辑判断def _padding_fault_msgs(self, level, msg):err_info = OdErrorInfo()err_info.err_level = levelerr_info.err_message = msgself.od_ErrMessages.error_msgs.append(err_info)def _error_checking(self):passdef run(self):while not cyber.is_shutdown():try:current_time = time.time()if self.last_data_time is None or (current_time - self.last_data_time)>1:self.od_ErrMessages = OdErrorMsgs()e_info = OdErrorInfo()e_info.err_level = '1'e_info.err_message = 'No data received from chassis_detail'self.od_ErrMessages.error_msgs.append(e_info)self.health_writer.write(self.od_ErrMessages)else:self.od_ErrMessages = OdErrorMsgs()self._error_checking()self.health_writer.write(self.od_ErrMessages) # print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))self.dump2log()except Exception as e:print("Error:", e)time.sleep(1) # 1hzself.has_data = Falsedef dump2log(self):dumpMsg = ""for oo in self.od_ErrMessages.error_msgs:dumpMsg += f"[{oo.err_level}:{oo.err_message}],"# print(writemsg.strip(','))self.logger.info(dumpMsg.strip(','))if __name__=="__main__":cyber.init()print("health monitor is starting!")health_monitor = OdHealthMonitor()health_monitor.run()print("health monitor is closing!")cyber.shutdown()