1.创建机器人节点接口
cd chapt3_ws/
ros2 pkg create example_interfaces_rclpy --build-type ament_python --dependencies rclpy example_ros2_interfaces --destination-directory src --node-name example_interfaces_robot_02 --maintainer-name "Joe Chen" --maintainer-email "1027038527@qq.com"
touch src/example_interfaces_rclpy/example_interfaces_rclpy/example_interfaces_control_02.py
2.控制器节点代码
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_ros2_interfaces.msg import RobotStatus
from example_ros2_interfaces.srv import MoveRobot
class ExampleInterfacesControl02(Node):
def __init__(self,name):
super().__init__(name)
self.get_logger().info("节点已启动:%s!" % name)
self.client_ = self.create_client(MoveRobot,"move_robot")
self.robot_status_subscribe_ = self.create_subscription(RobotStatus,"robot_status",self.robot_status_callback,10)
def robot_status_callback(self,msg):
self.get_logger().info(f"收到状态数据位置:{msg.pose} 状态:{msg.status}")
def move_result_callback_(self, result_future):
response = result_future.result()
self.get_logger().info(f"收到返回结果:{response.pose}")
def move_robot(self, distance):
while rclpy.ok() and self.client_.wait_for_service(1)==False:
self.get_logger().info(f"等待服务端上线....")
request = MoveRobot.Request()
request.distance = distance
self.get_logger().info(f"请求服务让机器人移动{distance}")
self.client_.call_async(request).add_done_callback(self.move_result_callback_)
def main(args=None):
rclpy.init(args=args) # 初始化rclpy
node = ExampleInterfacesControl02("example_interfaces_control_02") # 新建一个节点
node.move_robot(5.0) #移动5米
rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
rclpy.shutdown() # 关闭rclpy
3.编写机器人节点
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from example_ros2_interfaces.msg import RobotStatus
import math
from time import sleep
from example_ros2_interfaces.srv import MoveRobot
class Robot():
def __init__(self) -> None:
self.current_pose_ = 0.0
self.target_pose_ = 0.0
self.status_ = RobotStatus.STATUS_STOP
def get_status(self):
return self.status_
def get_current_pose(self):
return self.current_pose_
def move_distance(self,distance):
self.status_ = RobotStatus.STATUS_MOVEING # 更新状态为移动、
self.target_pose_ += distance # 更新目标位置
while math.fabs(self.target_pose_ - self.current_pose_) > 0.01:
step = distance / math.fabs(distance) * math.fabs(self.target_pose_ - self.current_pose_) * 0.1 # 计算一步移动距离
self.current_pose_ += step # 移动一步
print(f"移动了:{step}当前位置:{self.current_pose_}")
sleep(0.5) #休息0.5s
self.status_ = RobotStatus.STATUS_STOP # 更新状态为停止
return self.current_pose_
class ExampleInterfacesRobot02(Node):
def __init__(self,name):
super().__init__(name)
self.get_logger().info("节点已启动:%s!" % name)
self.robot = Robot()
self.move_robot_server_ = self.create_service(MoveRobot,"move_robot", self.handle_move_robot)
self.robot_status_publisher_ = self.create_publisher(RobotStatus,"robot_status", 10)
self.publisher_timer_ = self.create_timer(0.5, self.publisher_timer_callback)
def publisher_timer_callback(self):
"""
定时器回调发布数据函数
"""
msg = RobotStatus() #构造消息
msg.status = self.robot.get_status()
msg.pose = self.robot.get_current_pose()
self.robot_status_publisher_.publish(msg) # 发布消息
self.get_logger().info(f'发布了当前的状态:{msg.status} 位置:{msg.pose}')
def handle_move_robot(self,request, response):
self.robot.move_distance(request.distance)
response.pose = self.robot.get_current_pose()
return response
def main(args=None):
rclpy.init(args=args) # 初始化rclpy
node = ExampleInterfacesRobot02("example_interfaces_robot_02") # 新建一个节点
rclpy.spin(node) # 保持节点运行,检测是否收到退出指令(Ctrl+C)
rclpy.shutdown() # 关闭rclpy
4.链接、编译、运行节点
setup.py
from setuptools import find_packages, setup
package_name = 'example_interfaces_rclpy'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Joe Chen',
maintainer_email='1027038527@qq.com',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'example_interfaces_control_02 = example_interfaces_rclpy.example_interfaces_control_02:main',
'example_interfaces_robot_02 = example_interfaces_rclpy.example_interfaces_robot_02:main'
],
},
)
source install/setup.bash
ros2 run example_interfaces_rclpy example_interfaces_robot_02
source install/setup.bash
ros2 run example_interfaces_rclpy example_interfaces_control_02