本文主要介绍ROS的Topics概念,如何创建Publisher和Subscriber,通过Topic在ROS程序间通信;介绍ROS的Services概念,如何创建Client和Server并建立通信。
更多内容,访问专栏目录获取实时更新。
ROS Topics
Topics可以被视为一种具名的总线,用于节点间交换数据,通过Topics可以发布和订阅消息,实现单向的流式通信。需要注意的重点包括:
- 单向流式通信(发布/订阅模式)
- 匿名通信
- 每个Topic都有特定的消息格式
- 可以在ROS节点中通过Python, C++等多种语言实现
- 一个节点可以拥有多个Topic
Publisher in Python
在ROS2基础编程一文中,我们已经创建了工作空间和工作包,在此基础上,来通过Python实现一个Topic的发布者。
在my_py_pkg工作包下创建一个新的文件robot_news_station.py
robot_news_station.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String # do not forget to add package dependencyclass RobotNewsPublisherNode(Node):def __init__(self):super().__init__("robot_news_publisher")self.robot_name = "HiRobot"self.publisher = self.create_publisher(String, "robot_news", 10)self.timer = self.create_timer(1, self.publish_news)self.get_logger().info("Robot News Publisher has been started")def publish_news(self):msg = String()msg.data = "Hi, this is " + str(self.robot_name) + " from the robot news publisher"self.publisher.publish(msg)def main(args=None):rclpy.init(args=args)node = RobotNewsPublisherNode()rclpy.spin(node)rclpy.shutdown()if __name__ == "__main__":main()
还需要添加依赖包引用,修改package.xml文件:
然后装载我们的新节点,修改setup.py:
之后执行下面的指令就可以启动我们的publisher了:
colcon build --packages-select my_py_pkg --symlink-install
source ~/.bashrc
ros2 run my_py_pkg robot_news_station
ros2 topic echo /robot_news
Subscriber in Python
创建订阅者的方式与创建发布者类似,这里我们添加了一个名为robot_news_subscriber.py的文件:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.msg import String # do not forget to add package dependencyclass RobotNewsSubscriberNode(Node):def __init__(self):super().__init__("robot_news_subscriber")self.subscriber = self.create_subscription(String, "robot_news", self.subscribe_news_callback, 10)self.get_logger().info("Robot News Subscriber has been started")def subscribe_news_callback(self, msg):self.get_logger().info(msg.data)def main(args=None):rclpy.init(args=args)node = RobotNewsSubscriberNode()rclpy.spin(node)rclpy.shutdown()if __name__ == "__main__":main()
不要忘记修改setup.py和colcon build
编译。之后同时运行publisher和subscriber就可以看到右侧命令行里的subscriber每1s收到一条来自publisher的信息,并打印:
使用命令行工具调试Topics
ros2 topic list
ros2 topic info <topic_name> # 查看所有激活的topic
ros2 topic echo <topic_name> # 创建一个订阅者监听发布的消息
ros2 interface show <msg_type> # 显示话题消息的类型
ros2 topic hz <topic_name> # 获取发布频率
ros2 topic bw <topic_name> # 获取消息的长度,byte width
ros2 topic pub <topic_name> <msg_type> <msg_data> # 发布一个topic
ros2 topic pub -r 10 /robot_news example_interfaces/msg/String "{data: 'hello from robot'}"
ros2 node list
ros2 info <node_name>
ros2 run <pkg_name> <node_name> --ros-args -r __node:=<new_node_name>
ros2 run <pkg_name> <node_name> --ros-args -r __node:=<new_node_name> -r <topic_name>:=<new_topic_name>
Ros Services
前文提到Topics实现的是单向的传输,通过发布/订阅模式建立连接,但用在一些需要请求/回复的分布式系统中就不太合适了。
Services可以帮助我们实现请求/回复的通信模式,一条消息用于请求,一条用于回复,ROS节点以字符串名称提供服务,客户端通过发送请求消息并等待回复来调用服务,从而建立持久性的连接。
Service Server in Python
ros2 interface show example_interfaces/srv
执行上面的指令,你能看到example_interface包里提供了哪些服务,这里我们使用AddTwoInts来演示如何创建一个service server
add_two_ints_server.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoIntsclass AddTwoIntsServerNode(Node):def __init__(self):super().__init__("add_two_ints_server")self.server = self.create_service(AddTwoInts, "add_two_ints", self.callback_add_two_ints)self.get_logger().info("Add Two Ints Server has started")def callback_add_two_ints(self, request, response):response.sum = request.a + request.bself.get_logger().info(str(request.a) + " + " + str(request.b) + " = " + str(response.sum))return responsedef main(args=None):rclpy.init(args=args)node = AddTwoIntsServerNode()rclpy.spin(node)rclpy.shutdown()if __name__ == "__main__":main()
编译并运行,通过ros2 service list
就能在服务列表里看到我们启动的服务了。
ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 3, b: 4}"
执行上面的指令就可以在命令行里调用我们创建的service。
Service Client in Python
add_two_ints_client.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_interfaces.srv import AddTwoIntsdef main(args=None):rclpy.init(args=args)node = Node("add_two_ints_no_oop")client = node.create_client(AddTwoInts, "add_two_ints")while not client.wait_for_service(1):node.get_logger().warn("Waiting for server Add Two Ints...")request = AddTwoInts.Request()request.a = 3request.b = 8future = client.call_async(request)rclpy.spin_until_future_complete(node, future)try:response = future.result()node.get_logger().info(str(request.a) + " + " + str(request.b) + " = " + str(response.sum))except Exception as e:node.get_logger().error("Service call failed %r" % (e,))rclpy.shutdown()if __name__ == "__main__":main()
使用命令行工具调试Services
ros2 service list
ros2 service type <service_name>
ros2 interface show <service type>
ros2 service call <service_name> <service_type> <value>
ros2 run <pkg_name> <node_name> --ros-args -r <service_name>:=<new_service_name>
ROS Interface
ROS应用之间有三种通信接口:messages, services和actions,ROS2使用IDL(interface definition language)来描述这些接口,使得在不同应用,不同编程语言间进行交互更加简单。例如前文提到的Topic和Service:
Topic:
- Name:话题名
- 消息定义:Msg,如example_interfaces/msg/Int64
Service:
- Name: 服务名
- 服务定义:Srv,如example_interfaces/srv/AddTwoInts (包含request和response)
创建自定义的Msg
创建一个新的工作包my_robot_interfaces,移除目录下的include和src文件夹,并新建msg文件夹:
在msg文件夹下新建msg定义文件: HardwareStatus.msg
int64 temperature
string debug_message
更新package.xml,添加:
<build_depend>rosidl_default_generators</build_depend><exec_depend>rosidl_default_runtime</exec_depend><member_of_group>rosidl_interface_packages</member_of_group>
更新CMakeLists.txt,添加:
find_package(rosidl_default_generators REQUIRED)rosidl_generate_interfaces(${PROJECT_NAME}"msg/HardwareStatus.msg"
)ament_export_dependencies(rosidl_default_runtime)
ament_package()
然后编译工作包,就可以在其他工程中使用该Msg定义了。
使用自定义的Msg
在my_py_pkg工作包下创建一个新的publisher:hw_status_publisher.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from my_robot_interfaces.msg import HardwareStatusclass HardwareStatusPublisherNode(Node):def __init__(self):super().__init__("hardware_status_publisher")self.hw_status_publisher = self.create_publisher(HardwareStatus, "hardware_status", 10)self.timer = self.create_timer(1, self.publish_hw_status)self.get_logger().info("Hardware Status Publisher has been started")def publish_hw_status(self):msg = HardwareStatus()msg.temperature = 45msg.debug_message = "No error"self.hw_status_publisher.publish(msg)def main(args=None):rclpy.init(args=args)node = HardwareStatusPublisherNode()rclpy.spin(node)rclpy.shutdown()if __name__ == "__main__":main()
不要忘记在package.xml中添加对my_robot_interface的依赖,并把新的节点加载到setup.py并编译。运行haredware_status_publisher并监听,可以看到如下的效果:
创建自定义的Srv
在my_robot_interfaces工作包目录下创建srv文件夹,并新建文件ComputerRectangleArea.srv
float64 length
float64 width
---
float64 area
更新CMakeLists.txt
成功编译了工作包后我们就能够获得自定义的Srv了
如有错误,欢迎留言或来信指正:hbin6358@163.com