文章目录
- 前言
- 简单例程
- 运行小海龟仿真
- 启动节点
- 查看计算图
- 发布 Topic
- 调用 Serviece
- 用 Python 发布和接收 Topic
- 创建工作空间
- 创建功能包,编译
- 编写 Topic Publisher 节点
- 编写 Topic Subscriber 节点
- 运行节点
- 自定义消息类型
- 用 Python 注册和调用 Serviece
- 新建功能包
- 在 srv 文件夹下,创建 AddTwoNum.srv 文件
- 同上节,修改 package.xml 和 CMakeLists.txt 文件后编译
- 编写 Service 服务器代码
- 编写 Service 客户端代码
- 运行代码
前言
💫你好,我是辰chen,本文旨在准备考研复试或就业
💫本文内容是我为复试准备的第二个项目
💫欢迎大家的关注,我的博客主要关注于考研408以及AIoT的内容
🌟 预置知识:基本Python语法,基本linux命令行使用
以下的几个专栏是本人比较满意的专栏(大部分专栏仍在持续更新),欢迎大家的关注:
💥ACM-ICPC算法汇总【基础篇】
💥ACM-ICPC算法汇总【提高篇】
💥AIoT(人工智能+物联网)
💥考研
💥CSP认证考试历年题解
简单例程
运行小海龟仿真
打开虚拟机,进入Vscode,创建一个新窗口和新终端
启动节点
在终端依次运行如下指令
roscore
rosrun turtlesim turtlesim_node
rosrun turtlesim turtle_teleop_key
roscore
:启动 Master 节点
启动后把Master这个界面放到一边,再开一个终端,运行 rosrun turtlesim turtlesim_node
,结果如下,rosrun
后面跟了两个参数,其中 turtlesim
就是功能包的名字,turtlesim_node
就是可执行文件的名字,对应着 src 中的一个文件叫做 turtlesim_node
,用 rosrun
就把这个文件跑起来了,跑起来的结果就是出现了一个小海龟,接着再跑下一个节点
把终端拆分:
运行第二个节点:rosrun turtlesim turtle_teleop_key
,如此,两个节点就都启动好了,第一个节点是一个可视化的模拟器,第二个节点是可以用键盘去控制小海龟(用箭头键移动海龟,按q退出控制)
注意我们在控制小海龟进行移动的时候,最后一次点击需要停留在第二个节点所在的终端。
查看计算图
我们的两个节点是在两个终端运行的,但是却协同运行,我们可以查看一下计算图:
rosrun rqt_graph rqt_graph
继续拆分终端后运行 rosrun rqt_graph rqt_graph
结果如下:
如图,有两个椭圆,即有两个节点,有一个方框,即有一个 Topic,Topic 的名字叫做 /turtle1/cmd_vel,该 Topic 由节点 /teleop_turtle 发布,由 /turtlesim 订阅,每按下一次键盘都会有一个 Topic 发布,并被另一节点接收。
也就是说,海归模拟器只要收到了 /turtle1/cmd_vel 这个 Topic 之后,就会让海龟去做相应的移动,即只要有该 Topic 发出,海龟就可以移动,即不一定必须通过 /teleop_turtle 这个节点发出,下面试一下不通过 /teleop_turtle 节点直接发送该 Topic
发布 Topic
# -r 参数设置发送频率, 不带 -r 参数则只发送一次,我们让海龟动起来的话需要持续的发送,1为1Hz,即每秒发1次
rostopic pub -r 1 /turtle1/cmd_vel ...
按q退出,新建终端:
使用 -h
可以查看命令的使用方式:
写命令行多用 Tab
补全,可以查看目前有哪些 Topic:
可以看到目前不止一个 Topic,当然我们需要的 /turtle1/cmd_vel 也在其中,但是我们刚刚的图中却仅显示了一个 Topic,这是因为我们只选择了 active(活跃:有发布者有订阅者)的 Topic,选择 all 即可查看其余的 Topic:
可以看到,/turtle1/color_senor 和 /turtle1/pose 这两个 Topic 只有发布者,没有订阅者,剩余的 Topic 是用来做系统调试的,关掉 Debug:
接下来回到正题:用 Topic 直接控制小海龟:
再按下 Tab
会自动补全 Topic 的数据类型(知道了 Topic 的名字就知道了它的数据类型):
再按下 Tab
会自动补全要发送的内容(数据类型如何定义 ROS 也是知道的):
类型由两个字段组成:linear : 线性线速度,angular : 角速度(均是三维),现在让它往前走:前是x轴,让它转圈,转圈就是z轴:
可以看到小海龟不停的在转圈:
刷新计算图:
就会发现之前的节点 /teleop_turtle 没有了(之前按q退出了),多了一个命令行工作自动创建的一个节点 /rostopic_7836_1705655818540,但本质上,当前计算图和之前的计算图的拓扑结构没有什么区别。
Topic演示到此结束,按 Ctrl C
停止,接下来演示 Service
调用 Serviece
rosservice list
rosservice call /spawn ...
同样,我们可以用 -h 进行查看:
其中的 list 就是列举当前活跃的 Service,继续查看:
如下就是 turtle1 创建的 Service
/spawn 用来新建一个机器人,就是新建一个海龟的意思
再按下 Tab
,自动补全调用服务需要的参数:
可以看到有 x,y,z 这样的参数,x,y,z就是新的小海龟出生的位置坐标,theta就是朝向,可以修改部分值然后进行创建:
坐标的定义:x,y轴就是在最左下角的位置,这样,我们就通过调用 Service 创建了一个海龟,再来看一下计算图的变化:
可以看到就有了两个海龟。
关掉所有终端,Ctrl C
用 Python 发布和接收 Topic
上面我们讲述了用命令行区发送 Topic,当然我们也可以用代码去发送(以后大多也是用代码进行发送)
创建工作空间
新开一个终端,依次执行如下命令
# 在用户目录下创建工作空间
cd ~
mkdir -p learn_ws/src# 在工作空间的根目录编译
cd learn_ws/
catkin_make
-p
递归创建
找到我们创建好的文件夹:
创建功能包,编译
新开一个终端,依次执行如下命令
# 进入工作空间的src文件夹
cd ~/learn_ws/src# 创建功能包,指定名称、依赖
catkin_create_pkg learn_topic std_msgs rospy# 回到工作空间根目录编译,每加入一个功能包都要重新编译
cd ..
catkin_make# 编译后,setup文件会更新,需要重新source
source devel/setup.bash
执行完catkin_create_pkg learn_topic std_msgs rospy
可以看到左侧多了 learn_topic
以及其内的 src
以及相关文件
编写 Topic Publisher 节点
src下新建一个Python文件talker.py
#! /usr/bin/env python
import rospy
from std_msgs.msg import Stringdef talker():# 向 ROS Master 注册名为 talker 的节点rospy.init_node('talker')# 创建 Topic 发布者, Topic 名称为 test_msg, 数据类型为 String, 消息队列长度为10pub = rospy.Publisher('test_msg', String, queue_size=10)# 定义 2Hz 的 rate 变量, 即每次 sleep 时长为 0.5srate = rospy.Rate(2)# 在节点没有关闭时,执行循环while not rospy.is_shutdown():msg = f'Hello ROS! From talker, at {rospy.get_time()}'# 在终端打印日志rospy.loginfo(msg)# 发布 Topic pub.publish(msg)# 等待 0.5srate.sleep()rospy.loginfo('Talker exist.')if __name__ == '__main__':talker()
记得写完后保存代码
解释如下:
#! /usr/bin/env python
该代码不可缺少,为指定用Python区运行代码,否则ROS可能不知道
rospy.init_node('talker')
向 Master 注册一个节点,运行到这一行时,计算图里就会多一个叫做 talker 的节点
pub = rospy.Publisher('test_msg', String, queue_size=10)
每一个 Topic 都要用名字以及类型去定义,此外这里还指定了一个消息队列长度,这是因为 Topic 是异步通信,发送者是只管发,至于接收者是否及时接收到,接收的速度够不够对于发送者而言都是不知道的,这里 queue_size=10
就意味着在接收者有一个队列,长度为10,每次发的消息都会送入队列中,每次接收者取消息都会出队,如果设为 0
,即队列无限长,此时如果发布的速度比接收的速度快,就会导致队列越来越长,最后爆内存,如果设为 1
那么实时性就会比较高,即会取到最新的 Topic 信息
rate = rospy.Rate(2) # 作用就是为了后续的 sleep()
rate.sleep()
while not rospy.is_shutdown():
没有关闭就会进入循环,关闭条件如在终端中按下Ctrl C
msg = f'Hello ROS! From talker, at {rospy.get_time()}'
rospy.loginfo(msg)
pub.publish(msg)
rate.sleep()
获取一个时间戳并打印、发布,然后睡眠0.5s后继续发
编写 Topic Subscriber 节点
src下新建一个Python文件listener.py
#! /usr/bin/env python
import rospy
from std_msgs.msg import Stringdef get_test_msg(data):# data.data 中是 Topic 数据rospy.loginfo(f'Listener get msg: {data.data}')def listener():rospy.init_node('listener')# 创建 Topic 订阅者, 订阅名为 test_msg 的 Topic, 数据类型为 String, 收到 Topic 后的回调函数是 get_test_msgrospy.Subscriber('test_msg', String, get_test_msg)# 防止程序提前退出,等到节点关闭时再退出rospy.spin()if __name__ == '__main__':listener()
记得写完后保存代码
解释如下:
rospy.init_node('listener')
注册节点
rospy.Subscriber('test_msg', String, get_test_msg)
告诉 Master 想要订阅 test_msg 这个 Topic,类型是 String,回调函数为 get_test_msg
回调函数是一种在编程中常见的模式,特别是在处理异步事件或消息传递时。它基本上是一个被传递到另一个函数或方法中的函数,然后在适当的时刻被调用。回调函数的概念是核心的一部分,特别是在消息订阅和事件处理方面。
简单理解回调函数:设函数A是回调函数,还有另一个函数是函数B,所谓回调函数,就是因为A是函数B的一个参数,当有特定的事件发生时,函数B会执行函数A
在这里,get_test_msg 是一个回调函数。当 test_msg 话题上接收到一个新消息时,ROS 会自动调用 get_test_msg 函数,并将接收到的消息作为参数传递给这个函数。这样,你就可以在 get_test_msg 函数内定义当接收到消息时你希望执行的操作。
rospy.spin()
我们可以对这句代码 Ctrl 左键
查看其源码:
其实可以看到就是让它睡 0.5s,和我们 talker 中写的一样,其实目的就是为了防止程序退出,因为退出了的话,你创建的 Subscriber 也就被销毁了,这个进程就没了,不能再处理;所以如果本身有一个循环,如编写 Topic Publisher 中的那样,就不需要再 spin(),没有循环的话,如编写 Topic Subscriber 这样,就要调用一下 spin()
def get_test_msg(data):
这个函数的参数 data 就是我们拿到的 Topic
rospy.loginfo(f'Listener get msg: {data.data}')
打印拿到的数据
运行节点
新建一个终端
# 新建的 Python 文件,需要添加运行权限
chmod +x src/learn_topic/src/*# 运行
roscore
rosrun learn_topic talker.py
rosrun learn_topic listener.py# 工具
rosrun rqt_graph rqt_graph # 下面只演示了这个工具
rosrun rqt_topic rqt_topic
拆分终端,输入 source devel/setup.bash
(启动 Master,因为刚刚的 Master关了),继续按上述代码运行:
新建终端,输入 source devel/setup.bash
可以看到可以正常的发出 Topic 和接收 Topic
新建一个终端:
rqt_topic
可以用可视化的方式把现在有的 Topic 都显示出来,✔代表接收这个 Topic:
就会显示它的带宽,以及他的频率为 2Hz(就是我们当初设置的那样),展开后可以查看 Type 和 Value
自定义消息类型
上述中,我们用的是 String,只是发布了一个很简单的字符串,我们再很多条件下是需要自定义消息类型的,去应对更复杂的需求:
.msg
文件:描述 ROS 消息的文本文件,是一种编程语言无关的接口,用于为不同编程语言的消息生成源代码.srv
文件:与msg文件类似,但用来描述一个 Service,包括请求参数和返回值两部分- 可用的数据类型
• int8, int16, int32, int64(加uint*)
• float32, float64
• string
• time, duration
• 变长数组<type>[]
,定长数组<type>[N]
• 其他自定义类型,即支持类型嵌套
使用自定义消息类型的 Topic:
1.在上面创建的工作空间,新建功能包
cd src
catkin_create_pkg custom_msg std_msgs rospy
创建名为:custom_msg 的功能包,有两个依赖:std_msgs、rospy
2.在功能包的 msg 文件夹下,创建 robot_state.msg 文件
注:文件名必须叫这个名,在该文件下输入下述代码
string name
uint8 id
float64 speed
float64[2] position
记得写完后保存代码
这里就是编程无关的语言了,接下来,就是要让 ROS 去编译这个文件,然后自动生成,这些数据类型它在 Python 下面对应的这些代码和类型是什么,这些都是自动完成的
3.修改 package.xml
,添加依赖项,用于生成自定义类型的源代码(C++,Python等)
注意找对 package,每个功能包里都有一个 package
把如下代码添入 package 中:
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
记得写完后保存代码
4.修改 CMakeLists.txt
,在编译时自动生成源代码(不要直接复制,逐项修改)
注意找对 CmakeLists,每个功能包里都有一个 CmakeLists
# 在 find_package 中添加 message_generation, 修改后如下:
find_package(catkin REQUIRED COMPONENTSrospystd_msgsmessage_generation
)# 在 catkin_package 中添加 CATKIN_DEPENDS message_runtime,修改后如下:
catkin_package(
# INCLUDE_DIRS include
# LIBRARIES custom_msgCATKIN_DEPENDS rospy std_msgs message_runtime
# DEPENDS system_lib
)# 解除 add_message_files 的注释, 添加 .msg文件,修改后如下:
add_message_files(FILESrobot_state.msg
)# 解除 generate_messages 的注释, 修改后如下:
generate_messages(DEPENDENCIESstd_msgs
)
记得写完后保存代码
5.编译
cd .. # 就是来到 learn_ws 下
catkin_make
6.编写发布节点
此代码相当于是 Topic 中的 Talker
#! /usr/bin/env python
import rospy
from custom_msg.msg import robot_statedef pub_state():rospy.init_node('robot_state_publisher')pub = rospy.Publisher('robot_state', robot_state, queue_size=10)rate = rospy.Rate(1)pos = [0, 0]speed = 0.5while not rospy.is_shutdown():msg = robot_state()msg.id = 0msg.name = 'qrobo'msg.position = posmsg.speed = speedpub.publish(msg)pos[0] += 0.5rate.sleep()if __name__ == '__main__':pub_state()
记得写完后保存代码
注意这个代码运行的话不会有输出的:因为代码中没有让打印日志之类的信息,运行代码前要记得要给新建的 Python 文件添加运行权限。
7.(可选)将编译的源代码加入 VSCode 的 Python 依赖路径,这一步只是为了写代码时 VSCode 能进行补全、分析和跳转,不影响代码运行。此操作每个工作空间做一次就可以
在下图标准位置填入下述代码:
,"${workspaceFolder}/devel/lib/python3/dist-packages"
记得写完后保存代码
用 Python 注册和调用 Serviece
新建功能包
新开一个终端:
cd src
catkin_create_pkg learn_service std_msgs rospy
在 srv 文件夹下,创建 AddTwoNum.srv 文件
---
上面是请求参数,下面是返回参数
int64 A
int64 B
---
int64 Sum
记得写完后保存代码
同上节,修改 package.xml 和 CMakeLists.txt 文件后编译
注意找对 package,每个功能包里都有一个 package
把如下代码添入 package 中:
<build_depend>message_generation</build_depend>
<exec_depend>message_runtime</exec_depend>
注意找对 CmakeLists,每个功能包里都有一个 CmakeLists
# 在 find_package 中添加 message_generation, 修改后如下:
find_package(catkin REQUIRED COMPONENTSrospystd_msgsmessage_generation
)# 在 catkin_package 中添加 CATKIN_DEPENDS message_runtime,修改后如下:
catkin_package(
# INCLUDE_DIRS include
# LIBRARIES custom_msgCATKIN_DEPENDS rospy std_msgs message_runtime
# DEPENDS system_lib
)# 解除 add_service_files 的注释, 添加 .srv文件,修改后如下:
add_service_files(FILESAddTwoNum.srv
)# 解除 generate_messages 的注释, 修改后如下:
generate_messages(DEPENDENCIESstd_msgs
)
记得写完后保存代码
编写 Service 服务器代码
记得写完后保存代码
#! /usr/bin/env python
import rospy
from learn_service.srv import AddTwoNum, AddTwoNumRequest, AddTwoNumResponse# AddTwoNum 的处理函数,参数为 AddTwoNumRequest, 返回 AddTwoNumResponse
def handle_add_two_num(req: AddTwoNumRequest):sum = req.A + req.Brospy.loginfo(f'{req.A} + {req.B} = {sum}')return AddTwoNumResponse(sum)if __name__ == '__main__':rospy.init_node('add_two_num_server')# 创建 Service 服务器, 类型为 AddTwoNum, 处理函数为 handle_add_two_numrospy.Service('add_two_num', AddTwoNum, handle_add_two_num)rospy.loginfo('add_two_num server ready.')rospy.spin()
编写 Service 客户端代码
#! /usr/bin/env python
import rospy
from learn_service.srv import *def add_two_num_client(a, b):# 等待该 Service 在 Master 中注册rospy.loginfo('waitting for service: add_two_num ...')rospy.wait_for_service('add_two_num')# 捕获可能发生的错误try:# 请求服务service = rospy.ServiceProxy('add_two_num', AddTwoNum)res: AddTwoNumResponse = service(a, b)return res.Sumexcept rospy.ServiceException as e:rospy.logerr(f'Service call failed: {e}')if __name__ == '__main__':rospy.init_node('add_two_num_client')a = 2b = 3rospy.loginfo(f'request service: {a} + {b}')res = add_two_num_client(a, b)rospy.loginfo(f'get response: {res}')
记得写完后保存代码
部分代码解释:
rospy.wait_for_service('add_two_num')
欲调用 add_to_num
这个 Service,如果此刻没有的话,程序就会在这一步停住,等待其注册
service = rospy.ServiceProxy('add_two_num', AddTwoNum)
这行代码创建了一个服务代理,名为 service
。rospy.ServiceProxy
是创建服务代理的方法。这个代理允许你通过一个简单的本地函数调用来调用远程服务。
add_two_num
是你想要连接的服务的名称。
AddTwoNum
是服务的类型,它是一个自动生成的 Python
类,用于表示服务的请求和响应结构。
一旦这个代理被创建,你就可以像调用本地函数一样调用服务。
res: AddTwoNumResponse = service(a, b)
这行代码实际上是在调用服务。当你使用 service
代理并传入参数 a 和 b 时,客户端会向服务器发送一个 AddTwoNumRequest
消息,并等待服务器响应一个 AddTwoNumResponse
消息。
service(a, b)
调用服务并传入参数 a 和 b。这相当于发送一个包含这些参数的服务请求。
res: AddTwoNumResponse
这部分是类型注解,它指明变量 res
应该是一个 AddTwoNumResponse
类型的实例。这不是调用服务必需的,但它有助于代码的可读性和类型检查。
服务的调用是阻塞的,这意味着程序将在这一行暂停执行,直到服务响应被接收或发生错误。
运行代码
记得要给代码添加权限,以及编译代码:
catkin_make # 编译代码
chmod +x src/learn_service/src/* # 给代码加运行权限
roscore
source devel/setup.bash # 新建的终端要运行该命令
rosrun learn_service server.py
source devel/setup.bash # 新建的终端要运行该命令
rosrun learn_service client.py
上述所有内容出处如下,博主在此基础上仅为添加个人理解:
本项目为北大团队出品【项目三:深度学习&仿真机器人 - 丘丘老师】原创(部分代码为开源代码)。课程团队:B站ID【M学长的考研top帮】UID【3546580235848566】复试项目班QQ大群:885884619,负责人QQ:674799975