本文是ROS2基础知识的综合小应用,练习如何创建工作包,创建Node,定义Topic和Service,以及通过LaunchFile启动多个节点。基础知识可以参考:ROS2基础编程,ROS2 Topics和Services,ROS2 LaunchFile和Parameter
更多内容,访问专栏目录获取实时更新。
创建工作包
ros2 pkg create turtlesim_greedy_turtle --build-type ament_python
创建节点
touch turtle_controller.py
touch turtle_spawner.py
chmod +x turtle_controller.py
chmod +x turtle_spawner.py
添加依赖
<depend>rclpy</depend>
<depend>turtlesim</depend>
<depend>geometry_msgs</depend>
<depend>my_robot_interfaces</depend>
创建Msg
Turtle.msg
string name
float64 x
float64 y
float64 theta
TurtleArray.msg
Turtle[] turtles
创建Srv
CatchTurtle.srv
string name
---
bool success
添加节点控制Turtle
turtle_controller.py
#!/usr/bin/env python3
import rclpy
import math
from functools import partial
from rclpy.node import Node
from turtlesim.msg import Pose
from geometry_msgs.msg import Twist
from my_robot_interfaces.msg import Turtle
from my_robot_interfaces.msg import TurtleArray
from my_robot_interfaces.srv import CatchTurtleclass TurtleControllerNode(Node):def __init__(self):super().__init__("turtle_controller")self.pose = Noneself.turtle_to_catch = Noneself.pose_subscriber = self.create_subscription(Pose, "turtle1/pose", self.callback_turtle_pose, 10)self.cmd_vel_publisher = self.create_publisher(Twist, "turtle1/cmd_vel", 10)self.alive_turtles_subscriber = self.create_subscription(TurtleArray, "alive_turtles", self.callback_alive_turtles, 10)self.control_loop_timer = self.create_timer(0.01, self.control_loop)def callback_turtle_pose(self, msg):self.pose = msgdef control_loop(self):if self.pose == None or self.turtle_to_catch == None:return# calcu the distance between current position and target positiondist_x = self.turtle_to_catch.x - self.pose.xdist_y = self.turtle_to_catch.y - self.pose.ydistance = math.sqrt(dist_x * dist_x + dist_y * dist_y)# construct cmd_vel msg to control the turtlemsg = Twist()if distance <= 0.5: # target reachedmsg.linear.x = 0.0 # stop the turtlemsg.angular.z = 0.0self.call_catch_turtle_server(self.turtle_to_catch.name)self.turtle_to_catch = Noneelse: # move to the target positionmsg.linear.x = 2 * distance # optimizegoal_theta = math.atan2(dist_y, dist_x) # orientationdiff = goal_theta - self.pose.thetaif diff > math.pi:diff -= 2 * math.pielif diff < -math.pi:diff += 2 * math.pimsg.angular.z = 6 * diff # optimizeself.cmd_vel_publisher.publish(msg)def callback_alive_turtles(self, msg):if len(msg.turtles) > 0:self.turtle_to_catch = msg.turtles[0]def call_catch_turtle_server(self, turtle_name):client = self.create_client(CatchTurtle, "catch_turtle")while not client.wait_for_service(1.0):self.get_logger().warn("Waiting for server...")request = CatchTurtle.Request()request.name = turtle_namefuture = client.call_async(request)future.add_done_callback(partial(self.callback_call_catch_turtle, turtle_name=turtle_name))def callback_call_catch_turtle(self, future, turtle_name):try:response = future.result()if not response.success:self.get_logger().error("Turtle " + str(turtle_name) + " could not be caught")except Exception as e:self.get_logger().error("Service call failed %r" & (e,))def main(args=None):rclpy.init(args=args)node = TurtleControllerNode()rclpy.spin(node)rclpy.shutdown()if __name__ == "__main__":main()
添加节点产生或移除Turtle
#!/usr/bin/env python3
import rclpy
import random
import math
from functools import partial
from rclpy.node import Node
from turtlesim.srv import Spawn
from turtlesim.srv import Kill
from my_robot_interfaces.msg import Turtle
from my_robot_interfaces.msg import TurtleArray
from my_robot_interfaces.srv import CatchTurtleclass TurtleSpawnerNode(Node):def __init__(self):super().__init__("turtle_spawner")self.declare_parameter("spawn_frequency", 1.0)self.declare_parameter("turtle_name_prefix", "turtle")self.spawn_frequency = self.get_parameter("spawn_frequency").valueself.turtle_name_prefix = self.get_parameter("turtle_name_prefix").valueself.turtle_counter = 0self.alive_turtles = []self.spawn_turtle_timer = self.create_timer(1.0 / self.spawn_frequency, self.spawn_new_turtle)self.alive_turtles_publisher = self.create_publisher(TurtleArray, "alive_turtles", 10)self.catch_turtle_service = self.create_service(CatchTurtle, "catch_turtle", self.callback_catch_turtle)def spawn_new_turtle(self):self.turtle_counter += 1name = self.turtle_name_prefix + "_" + str(self.turtle_counter)x = random.uniform(0.0, 10.0)y = random.uniform(0.0, 10.0)theta = random.uniform(0.0, 2 * math.pi)self.call_spawn_server(name, x, y, theta)def call_spawn_server(self, turtle_name, x, y, theta):client = self.create_client(Spawn, "spawn")while not client.wait_for_service(1.0):self.get_logger().warn("Waiting for server...")request = Spawn.Request()request.name = turtle_namerequest.x = xrequest.y = yrequest.theta = thetafuture = client.call_async(request)future.add_done_callback(partial(self.callback_call_spawn, turtle_name=turtle_name, x=x, y=y, theta=theta))def callback_call_spawn(self, future, turtle_name, x, y, theta):try:response = future.result()if response.name != "":self.get_logger().info("Turtle " + response.name + " is now alive")new_turtle = Turtle()new_turtle.name = turtle_namenew_turtle.x = xnew_turtle.y = ynew_turtle.theta = thetaself.alive_turtles.append(new_turtle)self.publish_alive_turtles()except Exception as e:self.get_logger().error("Service call failed %r" % (e,))def publish_alive_turtles(self):msg = TurtleArray()msg.turtles = self.alive_turtlesself.alive_turtles_publisher.publish(msg)def callback_catch_turtle(self, request, response):self.call_kill_server(request.name)response.success = Truereturn responsedef call_kill_server(self, turtle_name):client = self.create_client(Kill, "kill")while not client.wait_for_service(1.0):self.get_logger().warn("Waiting for server...")request = Kill.Request()request.name = turtle_namefuture = client.call_async(request)future.add_done_callback(partial(self.callback_call_kill, turtle_name=turtle_name))def callback_call_kill(self, future, turtle_name):try:future.result()for (i, turtle) in enumerate(self.alive_turtles):if turtle.name == turtle_name:del self.alive_turtles[i]self.publish_alive_turtles()breakexcept Exception as e:self.get_logger().error("Service call failed %r" & (e,))def main(args=None):rclpy.init(args=args)node = TurtleSpawnerNode()rclpy.spin(node)rclpy.shutdown()if __name__ == "__main__":main()
创建LaunchFile
不要忘记更新my_robot_bringup工作包的package.xml添加依赖
通过LaunchFile启动
ros2 launch my_robot_bringup turtlesim_greedy_turtle.launch.py
最终的效果:
如有错误,欢迎留言或来信指正:hbin6358@163.com