7.5.1 需求及设计
又到了小鱼老师带着做最佳实践项目了。需求:做一个在各个房间不断巡逻并记录图像的机器人。
到达目标点后首先通过语音播放到达目标点信息,
再通过摄像头拍摄一张图片保存到本地。
7.5.2 编写巡检控制节点
在chapt7_ws/src下新建功能包,添加rclpy和nav2_simple_commander依赖。
目录src/autopartol_robot/autopartol_robot/新建文件patrol_node.py
from geometry_msgs.msg import PoseStamped,Pose
from nav2_simple_commander.robot_navigator import BasicNavigator,TaskResult
import rclpy
from rclpy.node import Node
import rclpy.time
from tf2_ros import TransformListener, Buffer #坐标监听器
from tf_transformations import quaternion_from_euler,euler_from_quaternion# 欧拉角转换class PatrolNode(BasicNavigator):def __init__(self,node_name='patrol_node'):super().__init__(node_name)self.buffer_ = Buffer()self.listner_ = TransformListener(self.buffer_,self)#声明相关参数self.declare_parameter('initial_point',[0.0, 0.0, 0.0])self.declare_parameter('target_points',[0.0, 0.0, 0.0, 1.0, 1.0, 1.57])self.initial_point_ = self.get_parameter('initial_point').valueself.initial_points_= self.get_parameter('target_points').valuedef get_pose_by_xyyaw(self, x, y, yaw):# 通过x,y,yaw return PoseStamped对象 pose = PoseStamped()pose.header.frame_id = 'map'pose.pose.position.x = xpose.pose.position.y = yquat = quaternion_from_euler(0,0,yaw)pose.pose.orientation.x =quat[0]pose.pose.orientation.y =quat[1]pose.pose.orientation.z =quat[2]pose.pose.orientation.w =quat[3]return posedef init_robot_pose(self):#初始化机器人位姿self.initial_point_ = self.get_parameter('initial_point').valueinit_pose = self.get_pose_by_xyyaw(self.initial_point_[0],self.initial_point_[1],self.initial_point_[2]) self.setInitialPose(init_pose)self.waitUntilNav2Active()#等待导航可用def get_target_points(self):#通过参数值获取目标点集合points = []self.target_points_ = self.get_parameter('target_points').valuefor index in range(int(len(self.target_points_)/3)):x = self.target_points_[index*3]y = self.target_points_[index*3+1]yaw = self.target_points_[index*3+2]points.append([x,y,yaw])self.get_logger().info(f'获取到目标点:{index}->{x},{y},{yaw}')return points def nav_to_pose(self,target_pose):#导航到指定位姿self.waitUntilNav2Active()self.goToPose(target_pose)while not self.isTaskComplete():feedback = self.getFeedback()self.get_logger().info(f'剩余距离:{feedback.distance_remaining }')reslut = self.getResult()self.get_logger().info(f'导航结果:{reslut}') def get_cuurent_pose(self):#通过TF获取当前位姿while rclpy.ok():try:result = self.buffer_.lookup_transform('map','base_footprint',rclpy.time.Time(seconds=0),rclpy.time.Duration(seconds=1))transform = result.transformself.get_logger().info(f'平移:{transform.translation}')# self.get_logger().info(f'旋转:{transform.rotation}')# ratation_euler = euler_from_quaternion([# transform.rotation.x,# transform.rotation.y,# transform.rotation.z,# transform.rotation.w]# )# self.get_logger().info(f'平移rpy:{ratation_euler}')return transformexcept Exception as e:self.get_logger().warn(f'获取坐标转换异常{str(e)}')def main():rclpy.init()patrol = PatrolNode() #节点patrol.initial_pose()while rclpy.ok():points = patrol.get_target_points()for point in points:x,y,yaw = point[0],point[1],point[2]target_pose = patrol.get_pose_by_xyyaw(x,y,yaw)patrol.nav_to_pose(target_pose)rclpy.shutdown()
基本上吧之前的单接口调用综合起来。
为方便参数配置,在src/autopartol_robot/新建目录config,新增配置文件
patrol_config.yaml
patrol_node:ros__parameters:initial_point: [0.0, 0.0, 0.0]target_points: [0.0,0.0,0.0,1.0,2.0,3.14,-4.5,1.5,1.57,-8.0,-5.0,1.57,1.0,-5.0,3.14,]
注意目标点的选取是机器人可达的位置,不能选到障碍物。
启动gazebo模拟器,启动nav2.
再启动patrol_node
ros2 run autopartol_robot patrol_node --ros-args --params-file /home/bohu/chapt7/chapt7_ws/install/autopartol_robot/share/autopartol_robot/config/patrol_config.yaml
会发现等一会开始初始化地图后,开始沿着设定目标点运动。效果如下
也有异常情况,机器人靠墙太近,gazebo看出距离墙还有段距离,但是在rviz里面局部代价地图有一点变形,导致机器以为有障碍物卡住的现象。
7.5.3 添加语音播报功能
在chatpt7_ws/src下新建功能包autopatrol_interfaces,功能包下新建目录srv,srv新新建接口文件
speachText.srv
string text # 合成文字
---
bool result # 合成结果
在CMakeLists.txt注册,并在package.xml添加标签
<member_of_group>rosidl_interface_packages</member_of_group>
重新构建
src/autopartol_robot/autopartol_robot/目录下新建文件speaker.py
import rclpy
from rclpy.node import Node
from autopatrol_interfaces.srv import SpeachText
import espeakngclass Speaker(Node):def __init__(self):super().__init__('speaker')self.speech_service_ = self.create_service(SpeachText,'speech_text',self.speech_text_callback)self.speaker_ = espeakng.Speaker()self.speaker_.voice ='zh'def speech_text_callback(self,request,response):self.get_logger().info(f'正在朗读 {request.text}')self.speaker_.say(request.text)self.speaker_.wait()response.result = Truereturn responsedef main():rclpy.init()node = Speaker()rclpy.spin(node)rclpy.shutdown()
修改patrol_node.py,增加语音合成调用
from geometry_msgs.msg import PoseStamped,Pose
from nav2_simple_commander.robot_navigator import BasicNavigator,TaskResult
import rclpy
from rclpy.node import Node
import rclpy.time
from tf2_ros import TransformListener, Buffer #坐标监听器
from tf_transformations import quaternion_from_euler,euler_from_quaternion# 欧拉角转换
from autopatrol_interfaces.srv import SpeachTextclass PatrolNode(BasicNavigator):def __init__(self,node_name='patrol_node'):super().__init__(node_name)self.buffer_ = Buffer()self.listner_ = TransformListener(self.buffer_,self)#声明相关参数self.declare_parameter('initial_point',[0.0, 0.0, 0.0])self.declare_parameter('target_points',[0.0, 0.0, 0.0, 1.0, 1.0, 1.57])self.initial_point_ = self.get_parameter('initial_point').valueself.initial_points_= self.get_parameter('target_points').valueself.speech_client_ = self.create_client(SpeachText,'speech_text')def get_pose_by_xyyaw(self, x, y, yaw):# 通过x,y,yaw return PoseStamped对象 pose = PoseStamped()pose.header.frame_id = 'map'pose.pose.position.x = xpose.pose.position.y = yquat = quaternion_from_euler(0,0,yaw)pose.pose.orientation.x =quat[0]pose.pose.orientation.y =quat[1]pose.pose.orientation.z =quat[2]pose.pose.orientation.w =quat[3]return posedef init_robot_pose(self):#初始化机器人位姿self.initial_point_ = self.get_parameter('initial_point').valueinit_pose = self.get_pose_by_xyyaw(self.initial_point_[0],self.initial_point_[1],self.initial_point_[2]) self.setInitialPose(init_pose)self.waitUntilNav2Active()#等待导航可用def get_target_points(self):#通过参数值获取目标点集合points = []self.target_points_ = self.get_parameter('target_points').valuefor index in range(int(len(self.target_points_)/3)):x = self.target_points_[index*3]y = self.target_points_[index*3+1]yaw = self.target_points_[index*3+2]points.append([x,y,yaw])self.get_logger().info(f'获取到目标点:{index}->{x},{y},{yaw}')return points def nav_to_pose(self,target_pose):#导航到指定位姿self.waitUntilNav2Active()self.goToPose(target_pose)while not self.isTaskComplete():feedback = self.getFeedback()self.get_logger().info(f'剩余距离:{feedback.distance_remaining }')reslut = self.getResult()self.get_logger().info(f'导航结果:{reslut}') def get_cuurent_pose(self):#通过TF获取当前位姿while rclpy.ok():try:result = self.buffer_.lookup_transform('map','base_footprint',rclpy.time.Time(seconds=0),rclpy.time.Duration(seconds=1))transform = result.transformself.get_logger().info(f'平移:{transform.translation}')# self.get_logger().info(f'旋转:{transform.rotation}')# ratation_euler = euler_from_quaternion([# transform.rotation.x,# transform.rotation.y,# transform.rotation.z,# transform.rotation.w]# )# self.get_logger().info(f'平移rpy:{ratation_euler}')return transformexcept Exception as e:self.get_logger().warn(f'获取坐标转换异常{str(e)}')def speech_text(self,text):#调用服务合成语音while not self.speech_client_.wait_for_service(timeout_sec=1):self.get_logger().info("wait for speech service")request = SpeachText.Request()request.text = textfuture = self.speech_client_.call_async(request)rclpy.spin_until_future_complete(self,future)if future.result() is not None:result = future.result().resultif result:self.get_logger().info(f'语音合成成功:{text}')else:self.get_logger().warn(f'语音合成失败:{text}')else:self.get_logger().warn('语音合成服务请求失败')def main():rclpy.init()patrol = PatrolNode() #节点patrol.speech_text('正在准备初始化位置')patrol.init_robot_pose()patrol.speech_text('初始化位置完成')while rclpy.ok():points = patrol.get_target_points()for point in points:x,y,yaw = point[0],point[1],point[2]target_pose = patrol.get_pose_by_xyyaw(x,y,yaw)patrol.speech_text(text=f'准备前往目标点{x},{y}')patrol.nav_to_pose(target_pose)rclpy.shutdown()
效果跟上面类似,日志输出多了语音的
[speaker-2] [INFO] [1737017187.818829250] [speaker]: 正在朗读 准备前往目标点1.0,2.0
[patrol_node-1] [INFO] [1737017191.194245817] [patrol_node]: 语音合成成功:准备前往目标点1.0,2.0
[patrol_node-1] [INFO] [1737017195.311579068] [patrol_node]: Nav2 is ready for use!
[patrol_node-1] [INFO] [1737017195.314096555] [patrol_node]: Navigating to goal: 1.0 2.0...
[patrol_node-1] [INFO] [1737017195.509438991] [patrol_node]: 剩余距离:0.2445448786020279
[patrol_node-1] [INFO] [1737017195.612344544] [patrol_node]: 剩余距离:2.230710744857788
[patrol_node-1] [INFO] [1737017195.716231929] [patrol_node]: 剩余距离:2.230710744857788
[patrol_node-1] [INFO] [1737017195.819218225] [patrol_node]: 剩余距离:2.230710744857788
[patrol_node-1] [INFO] [1737017195.923079415] [patrol_node]: 剩余距离:2.230710744857788
7.5.4订阅图像并记录
from geometry_msgs.msg import PoseStamped,Pose
from nav2_simple_commander.robot_navigator import BasicNavigator,TaskResult
import rclpy
from rclpy.node import Node
import rclpy.time
from tf2_ros import TransformListener, Buffer #坐标监听器
from tf_transformations import quaternion_from_euler,euler_from_quaternion# 欧拉角转换
from autopatrol_interfaces.srv import SpeachText
from sensor_msgs.msg import Image #消息接口
from cv_bridge import CvBridge #图像转换
import cv2 #保存图像class PatrolNode(BasicNavigator):def __init__(self,node_name='patrol_node'):super().__init__(node_name)self.buffer_ = Buffer()self.listner_ = TransformListener(self.buffer_,self)#声明相关参数self.declare_parameter('initial_point',[0.0, 0.0, 0.0])self.declare_parameter('target_points',[0.0, 0.0, 0.0, 1.0, 1.0, 1.57])self.initial_point_ = self.get_parameter('initial_point').valueself.initial_points_= self.get_parameter('target_points').valueself.speech_client_ = self.create_client(SpeachText,'speech_text')# 订阅与保存图像相关定义self.declare_parameter('image_save_path', '')self.image_save_path = self.get_parameter('image_save_path').valueself.bridge = CvBridge()self.latest_img_ = Noneself.image_sub_ = self.create_subscription(Image,'/camera_sensor/image_raw',self.img_callback,1)def img_callback(self,msg):self.latest_img_ = msgdef record_img(self):if self.latest_img_ is not Node:pose = self.get_cuurent_pose()cv_image = self.bridge.imgmsg_to_cv2(self.latest_img_)cv2.imwrite(f'{self.image_save_path}img_{pose.translation.x:3.2f}_{pose.translation.y:3.2f}.png',cv_image) def get_pose_by_xyyaw(self, x, y, yaw):# 通过x,y,yaw return PoseStamped对象 pose = PoseStamped()pose.header.frame_id = 'map'pose.pose.position.x = xpose.pose.position.y = yquat = quaternion_from_euler(0,0,yaw)pose.pose.orientation.x =quat[0]pose.pose.orientation.y =quat[1]pose.pose.orientation.z =quat[2]pose.pose.orientation.w =quat[3]return posedef init_robot_pose(self):#初始化机器人位姿self.initial_point_ = self.get_parameter('initial_point').valueinit_pose = self.get_pose_by_xyyaw(self.initial_point_[0],self.initial_point_[1],self.initial_point_[2]) self.setInitialPose(init_pose)self.waitUntilNav2Active()#等待导航可用def get_target_points(self):#通过参数值获取目标点集合points = []self.target_points_ = self.get_parameter('target_points').valuefor index in range(int(len(self.target_points_)/3)):x = self.target_points_[index*3]y = self.target_points_[index*3+1]yaw = self.target_points_[index*3+2]points.append([x,y,yaw])self.get_logger().info(f'获取到目标点:{index}->{x},{y},{yaw}')return points def nav_to_pose(self,target_pose):#导航到指定位姿self.waitUntilNav2Active()self.goToPose(target_pose)while not self.isTaskComplete():feedback = self.getFeedback()self.get_logger().info(f'剩余距离:{feedback.distance_remaining }')reslut = self.getResult()self.get_logger().info(f'导航结果:{reslut}') def get_cuurent_pose(self):#通过TF获取当前位姿while rclpy.ok():try:result = self.buffer_.lookup_transform('map','base_footprint',rclpy.time.Time(seconds=0),rclpy.time.Duration(seconds=1))transform = result.transformself.get_logger().info(f'平移:{transform.translation}')# self.get_logger().info(f'旋转:{transform.rotation}')# ratation_euler = euler_from_quaternion([# transform.rotation.x,# transform.rotation.y,# transform.rotation.z,# transform.rotation.w]# )# self.get_logger().info(f'平移rpy:{ratation_euler}')return transformexcept Exception as e:self.get_logger().warn(f'获取坐标转换异常{str(e)}')def speech_text(self,text):#调用服务合成语音while not self.speech_client_.wait_for_service(timeout_sec=1):self.get_logger().info("wait for speech service")request = SpeachText.Request()request.text = textfuture = self.speech_client_.call_async(request)rclpy.spin_until_future_complete(self,future)if future.result() is not None:result = future.result().resultif result:self.get_logger().info(f'语音合成成功:{text}')else:self.get_logger().warn(f'语音合成失败:{text}')else:self.get_logger().warn('语音合成服务请求失败')def main():rclpy.init()patrol = PatrolNode() #节点patrol.speech_text('正在准备初始化位置')patrol.init_robot_pose()patrol.speech_text('初始化位置完成')while rclpy.ok():points = patrol.get_target_points()for point in points:x,y,yaw = point[0],point[1],point[2]target_pose = patrol.get_pose_by_xyyaw(x,y,yaw)patrol.speech_text(text=f'准备前往目标点{x},{y}')patrol.nav_to_pose(target_pose)patrol.speech_text(text=f'已到达目标点{x},{y},准备记录图像')patrol.record_img()patrol.speech_text(text=f'图像记录完成')rclpy.shutdown()
重新构建后运行,拍照效果如下