功能
一、Gmapping建图
二、自主导航 起始点 、终点
三、人脸识别
四、语音控制
完成任务: 机器人先建图 建完图后给出目标点,机器人就可以完成调用自主导航走到目标点,期间会调用激光雷达扫描局部环境来进行自主避障,到达终点后进行语音播报和人脸识别
主要功能文件
按照工作目录来讲
一、Gmapping
就是开启运动服务器 然后通过语音控制或者键盘控制让机器人跑一遍地图,在跑的时候机器人会调用激光雷达进行环境扫描 ,绘制地图
二、自主导航
给定机器人初始路径点,结束路径点并存入文件,有起始位置,有终点位置,机器人就能使用move_base
动作服务器将机器人导航到每个路径点,运行时出现障碍物 激光雷达进行环境扫描 绘制出局部地图 进行自主避障
auto_slam.py
#!/usr/bin/env python
import rospyimport actionlib
import roslaunch
from actionlib_msgs.msg import *
from move_base_msgs.msg import MoveBaseAction, MoveBaseGoal
from nav_msgs.msg import Path
from std_msgs.msg import String
from geometry_msgs.msg import PoseWithCovarianceStamped
from tf_conversions import transformations
from xml.dom.minidom import parse
from math import pi
import tf
###定义对象
class navigation_demo:def __init__(self):self.set_pose_pub = rospy.Publisher('/initialpose', PoseWithCovarianceStamped, queue_size=5)self.move_base = actionlib.SimpleActionClient("move_base", MoveBaseAction)### 运动信息的节点(动作服务器)self.move_base.wait_for_server(rospy.Duration(60))###用于等待与动作服务器的连接建立。self.tf_listener = tf.TransformListener()### 监听阵列self.get_point = rospy.Publisher('get_pos', String, queue_size=5)self.plist = []self.success_count = 0def set_plist(self,plist):self.plist = plist## 初始化机器人姿态 def set_pose(self, p):if self.move_base is None:return Falsex, y, th = ppose = PoseWithCovarianceStamped()pose.header.stamp = rospy.Time.now()pose.header.frame_id = 'map'pose.pose.pose.position.x = xpose.pose.pose.position.y = yq = transformations.quaternion_from_euler(0.0, 0.0, th/180.0*pi)pose.pose.pose.orientation.x = q[0]pose.pose.pose.orientation.y = q[1]pose.pose.pose.orientation.z = q[2]pose.pose.pose.orientation.w = q[3]self.set_pose_pub.publish(pose)return True# 当导航行为完成时的回调函数def _done_cb(self, status, result):rospy.loginfo("navigation done! status:%d result:%s"%(status, result))# 当导航行为激活时的回调函数def _active_cb(self):rospy.loginfo("[Navi] navigation has be actived")# 导航过程中的反馈回调函数def _feedback_cb(self, feedback):rospy.loginfo("[Navi] navigation feedback\r\n%s"%feedback)def goto(self, p):goal = MoveBaseGoal()### 定义MoveBaseGoal对象 进行goal.target_pose.header.frame_id = 'map' ###建立坐标’### 设置goal的移动目标地点goal.target_pose.header.stamp = rospy.Time.now()goal.target_pose.pose.position.x = p[0]goal.target_pose.pose.position.y = p[1]goal.target_pose.pose.position.z = p[2]#q = transformations.quaternion_from_euler(0.0, 0.0, p[2]/180.0*pi)###欧拉数转化为四元数,三维空间的旋转方向goal.target_pose.pose.orientation.x = p[3]goal.target_pose.pose.orientation.y = p[4]goal.target_pose.pose.orientation.z = p[5]goal.target_pose.pose.orientation.w = p[6]### 发送导航目标,并指定回调函数self.move_base.send_goal(goal, self._done_cb, self._active_cb, self._feedback_cb)# 等待导航结果,超时时间为60秒result = self.move_base.wait_for_result(rospy.Duration(60))### 是否到达这个导航点print(result)state = self.move_base.get_state()if state == GoalStatus.SUCCEEDED:self.success_count += 1### 到达的导航点是否为最终目标点if len(self.plist) == self.success_count:rospy.loginfo("arrived goal point")self.get_point.publish("1")self.isSendVoice = Falsereturn Truedef cancel(self):self.move_base.cancel_all_goals()return True###定义回调函数
def callback(msg):###调用回调函数 向订阅话题发消息 就会调用回调函数doc = parse("/home/bcsh/waypoints.xml")### parse对象处理xml文档 Domroot_element = doc.documentElement###文档根结点points = root_element.getElementsByTagName("Waypoint")### 每个航点包含七个plist = []rospy.loginfo("set pose...")navi = navigation_demo() ##创建一个navigation_demo对象 for p in points:point = [0] * 7point[0] = float(p.getElementsByTagName("Pos_x")[0].childNodes[0].data)point[1] = float(p.getElementsByTagName("Pos_y")[0].childNodes[0].data)point[2] = float(p.getElementsByTagName("Pos_z")[0].childNodes[0].data)###三维空间旋转方向的四元数point[3] = float(p.getElementsByTagName("Ori_x")[0].childNodes[0].data)point[4] = float(p.getElementsByTagName("Ori_y")[0].childNodes[0].data)point[5] = float(p.getElementsByTagName("Ori_z")[0].childNodes[0].data)point[6] = float(p.getElementsByTagName("Ori_w")[0].childNodes[0].data)plist.append(point)print(plist)rospy.loginfo("goto goal...")navi.set_plist(plist)for waypoint in plist:#print(waypoint)navi.goto(waypoint)if __name__ == "__main__":rospy.init_node('auto_slam_node',anonymous=True)#### 初始化ROS节点,命名'auto_slam_node'rospy.Subscriber("auto_slam", String,callback)###订阅 "auto_slam" 话题并设置回调函数处理消息rospy.spin()r = rospy.Rate(0.2)# 创建一个rate对象以控制循环速率r.sleep()
首先第一步完成建图
关于waypoints.xml
创建完图之后,用Rviz 插件 waterplus_map_tools 通过输入指令进行航点标注,
三、 人脸识别
Take_photo.py
照片存放位置
Face_Rec.py
1.Take_photo.py
拍照 存储 调用人脸识别
TakePhoto类继承了之前 ROS 与 Opencv 接口类,在这个类里面我们重写了 process_imag 函数,使得该函数可以完成人脸识别功能。核心函数为 detectMultiScale 函数,这个函数实现了将视频中的人脸提取出来,反馈值为 faces,faces 是由多个数组组成,每个数组代表人脸在当前图像中的位置(x,y,w,h)分别代表人脸框的左上角点的坐标,人脸框的宽度和长度。
#!/usr/bin/env pythonimport rospy
import cv2
from ros_opencv import ROS2OPENCV
import sys, select, os# 定义一个类 TakePhoto,继承 ROS2OPENCV 类
class TakePhoto(ROS2OPENCV):def __init__(self, node_name): # 调用 ROS2OPENCV 类的构造函数super(TakePhoto, self).__init__(node_name)self.detect_box = None ##用于存储检测到的人脸的框的坐标信息。。self.result = None ###存储处理后的图像,其中人脸被矩形框标记self.count = 0 ##用于计数保存的人脸图像数量,初始化为 0,每次按下 'p' 键保存一张图像时递增。self.person_name = rospy.get_param('~person_name', 'name_one')self.face_cascade = cv2.CascadeClassifier('/home/bcsh/robot_ws/src/match_mini/scripts/cascades/haarcascade_frontalface_default.xml')###Haar 级联分类器 存放一组描述人脸特征的模型,用来识别人脸self.dirname = "/home/bcsh/robot_ws/src/match_mini/scripts/p1/" + self.person_name + "/"self.X = Noneself.Y = None# 定义图像处理函数def process_image(self, frame):# print("sss")src = frame.copy()##复制输入的图像帧,以便在不修改原始数据的情况下进行处理。gray = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY)##将复制的图像帧转换为灰度图像,因为 Haar 级联分类器通常在灰度图像上执行人脸检测。faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)###使用预训练的 Haar 级联分类器检测灰度图像中的人脸。detectMultiScale 返回一个包含检测到的人脸位置坐标的列表。result = src.copy() ###以便在上面绘制矩形框。self.result 保存了处理后的图像。self.result = result#### 遍历检测到的人脸,并在图像上画矩形框### 遍历检测到的人脸坐标,将每个人脸用蓝色矩形框标记在图像上。同时,如果按下 'p' 键并且 self.count 小于 20,将当前人脸图像保存到指定的目录,并递增 self.count。for (x, y, w, h) in faces:### 给人脸用矩阵框住 左上角,长度,宽度,颜色等参数 result = cv2.rectangle(result, (x, y), (x+w, y+h), (255, 0, 0), 2)f = cv2.resize(gray[y:y+h, x:x+w], (200, 200))##对存储图片尺寸进行处理if self.count<20:# 如果按下 'p' 键,保存人脸图像if key == 'p' :cv2.imwrite(self.dirname + '%s.pgm' % str(self.count), f)self.count += 1return resultif __name__ == '__main__':try:# 初始化节点并运行node_name = "take_photo_rec"TakePhoto(node_name)rospy.spin()except KeyboardInterrupt:print "Shutting down face detector node."
cv2.destroyAllWindows()
Face_Rec.py
#!/usr/bin/env python
# encoding: utf-8import sys,os,cv2
import numpy as npimport rospyfrom geometry_msgs.msg import Twist
from std_msgs.msg import Stringpub = rospy.Publisher('cmd_vel', Twist, queue_size = 1)speed = 0.3
turn = 1.0face_path = "/home/bcsh/robot_ws/src/match_mini/scripts/data"
face_name = ""def read_images(path, sz=None):c = 0X, y = [], []names = []for dirname, dirnames, filenames in os.walk(path):for subdirname in dirnames:subject_path = os.path.join(dirname, subdirname)for filename in os.listdir(subject_path):try:if (filename == ".directory"):continuefilepath = os.path.join(subject_path, filename)im = cv2.imread(os.path.join(subject_path, filename), cv2.IMREAD_GRAYSCALE)if (im is None):print("image" + filepath + "is None")if (sz is not None):im = cv2.resize(im, sz)X.append(np.asarray(im, dtype=np.uint8))y.append(c)except:print("unexpected error")raisec = c + 1names.append(subdirname)###函数返回一个包含主题名称(names)、图像数据(X)和相应标签(y)的列表。return [names, X, y]def face_rec():[names,X, y] = read_images(face_path)y = np.asarray(y, dtype=np.int32)#model = cv2.face_EigenFaceRecognizer.create()### 创建训练模型model = cv2.face.LBPHFaceRecognizer_create()model.train(np.asarray(X), np.asarray(y))face_cascade = cv2.CascadeClassifier('/home/bcsh/robot_ws/src/match_mini/scripts/cascades/haarcascade_frontalface_default.xml')cap = cv2.VideoCapture(0)###调用cv的图象识别### 大筐筐 视图cv2.namedWindow("face_detector",0) ##框框名字cv2.resizeWindow("face_detector",480,320)## 框框大小while True:ret, frame = cap.read()## frame 传过来的一帧图片### 对图片进行处理x, y = frame.shape[0:2]small_frame = cv2.resize(frame, (int(y / 2), int(x / 2)))result = small_frame.copy()gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)faces = face_cascade.detectMultiScale(gray, 1.3, 5)##人脸在当前图像中的位置(x,y,w,h)for (x, y, w, h) in faces:### 小框框result = cv2.rectangle(result, (x, y), (x + w, y + h), (255, 0, 0), 2)roi = gray[y:y + h,x:x + w]try:roi = cv2.resize(roi, (200, 200), interpolation=cv2.INTER_LINEAR)### 模型预测 对新图像 p_label,p_confidence进行预测[p_label, p_confidence] = model.predict(roi)cv2.putText(result, names[p_label], (x, y - 20), cv2.FONT_HERSHEY_SIMPLEX, 1, 255, 2)print("p_confidence = " + str( ) +" name=" + names[p_label])if p_confidence<60 and names[p_label] == face_name:### 机器人停止位置有一段距离 所以因为距离误差,就得实际改变 置信度 因为要一直往一个方向走 所以p_confidence必须小于 所以只能实际情况确定来小于 机器人能识别置信度的最大值offset_x = ((x+w) / 2 - 240)target_area = w * h###摄像头看见人脸的目标区域linear_vel = 0angular_vel = 0print(target_area)## 到一定距离才能识别if target_area<100:linear_vel = 0.0elif target_area >110:linear_vel = 0.3else:linear_vel = 0.0if offset_x > 0:angular_vel = 0.1if offset_x < 0:angxular_vel = -0.1update_cmd(linear_vel,angular_vel)except:continue#update_cmd(linear_vel,angular_vel)cv2.imshow("face_detector", result)if cv2.waitKey(30) & 0xFF == ord('q'):breakcap.release()cv2.destroyAllWindows()def update_cmd(linear_speed, angular_speed):twist = Twist()twist.linear.x = 1*linear_speed; twist.linear.y = 1*linear_speed; twist.linear.z = 1*linear_speed;twist.angular.x = 0; twist.angular.y = 0; twist.angular.z = 1*angular_speedpub.publish(twist) def callback(msg):global face_pathglobal face_nameif msg.data == "liwei":face_name = "liwei"if msg.data == "yaom":face_name = "yaom"face_rec()if __name__ == "__main__":rospy.init_node('face_detector')rospy.Subscriber("auto_face", String, callback)###订阅消息 定义回调函数rospy.spin()
四、语音控制
voicecontroller.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import sys
reload(sys)
sys.setdefaultencoding('utf8')
import os
import rospyfrom respeaker_interface import x
from respeaker_audio import RespeakerAudio
from std_msgs.msg import Stringclass VoiceController(object):def __init__(self, node_name):self.node_name = node_namerospy.init_node(node_name)rospy.on_shutdown(self.shutdown)self.respeaker_interface = RespeakerInterface()self.respeaker_audio = RespeakerAudio()self.ask_pub = rospy.Publisher('cmd_msg', String, queue_size=5)def shutdown(self):self.respeaker_interface.close()self.respeaker_audio.stop()
def callback(msg):os.system("mpg123 /home/bcsh/robot_ws/src/match_mini/voice/zhuabu.mp3")if __name__ == '__main__':voice_controller = VoiceController("voice_controller")auto_slam = rospy.Publisher('auto_slam', String, queue_size=10) # 定义了话题对象auto_slam 发布话题的时候会调用话题的回调函数auto_face = rospy.Publisher('auto_face', String, queue_size=10) # 定义了话题对象auto_face 发布话题的时候会调用话题的回调函数rospy.Subscriber("get_pos", String,callback, queue_size=10)rate = rospy.Rate(100)isPub = Falsewhile not rospy.is_shutdown():text = voice_controller.respeaker_audio.record()### 记录音频输入流if text.find("开始") >= 0 and isPub is not True:auto_slam.publish("start")isPub = Trueif text.find("右") >= 0:print("send liwei to auto_face")auto_face.publish("liwei")elif text.find("偷") >= 0:print("send yaom to auto_face")auto_face.publish("yaom")direction = voice_controller.respeaker_interface.directionprint(text)print(direction)rate.sleep()
用到的类
RespeakerInterface
类用于与 Respeaker 设备进行通信
respeaker_audio.py
#!/usr/bin/env pythonimport pyaudio
from baidu_speech_api import BaiduVoiceApi
import json
import sys
import os
from aip.speech import AipSpeech
from contextlib import contextmanager# 重新设置默认字符编码为 utf-8
reload(sys)
sys.setdefaultencoding("utf-8")# 定义音频采样参数
CHUNK = 1024
RECORD_SECONDS = 5# 百度语音识别 API 的应用参数
APP_ID = '41721436'
API_KEY = 'QG7UA5m5YZC0PLTw3qWzh2Xd'
SECRET_KEY = 'Y9Q22OM13s2oXLzMUzETiQk96SX7Geq3'@contextmanager
def ignore_stderr(enable=True):"""用于忽略标准错误流的上下文管理器。"""if enable:devnull = Nonetry:devnull = os.open(os.devnull, os.O_WRONLY)stderr = os.dup(2)sys.stderr.flush()os.dup2(devnull, 2)try:yieldfinally:os.dup2(stderr, 2)os.close(stderr)finally:if devnull is not None:os.close(devnull)else:yieldclass RespeakerAudio(object):def __init__(self, channel=0, suppress_error=True):"""初始化 RespeakerAudio 类。"""# 忽略标准错误流以避免输出 PyAudio 警告信息with ignore_stderr(enable=suppress_error):self.pyaudio = pyaudio.PyAudio()# 初始化音频参数和设备信息self.channels = Noneself.channel = channelself.device_index = Noneself.rate = 16000self.bitwidth = 2self.bitdepth = 16# 查找 Respeaker 设备count = self.pyaudio.get_device_count()for i in range(count):info = self.pyaudio.get_device_info_by_index(i)name = info["name"].encode("utf-8")chan = info["maxInputChannels"]# 如果设备名中包含 "respeaker",则认为是 Respeaker 设备if name.lower().find("respeaker") >= 0:self.channels = chanself.device_index = ibreak # 如果没有找到 Respeaker 设备,则使用默认输入设备if self.device_index is None:info = self.pyaudio.get_default_input_device_info()self.channels = info["maxInputChannels"]self.device_index = info["index"]# 确保选择的通道在有效范围内self.channel = min(self.channels - 1, max(0, self.channel))# 打开音频输入流self.stream = self.pyaudio.open(rate=self.rate,format=self.pyaudio.get_format_from_width(self.bitwidth),channels=1,input=True,input_device_index=self.device_index,)# 初始化百度语音 APIself.aipSpeech = AipSpeech(APP_ID, API_KEY, SECRET_KEY)self.baidu = BaiduVoiceApi(appkey=API_KEY, secretkey=SECRET_KEY)def stop(self):"""停止音频输入流。"""# 停止音频输入流self.stream.stop_stream()self.stream.close()self.stream = None# 终止 PyAudioself.pyaudio.terminate()def generator_list(self, lst):"""生成列表的生成器。"""for l in lst:yield ldef record(self):"""录制音频并发送到百度语音识别 API 进行识别。"""# 启动音频输入流self.stream.start_stream()print("* recording")frames = [] # 用于存储音频帧# 录制指定的音频for i in range(0, int(self.rate / CHUNK * RECORD_SECONDS)):data = self.stream.read(CHUNK)frames.append(data)print("done recording")# 停止音频输入流self.stream.stop_stream()print("start to send to Baidu")# 将录制的音频发送到百度语音识别 API 进行识别text = self.baidu.server_api(self.generator_list(frames))# 解析识别结果并返回if text:try:text = json.loads(text)#### for t in text['result']:print(t)return str(t)except KeyError:return "get nothing"else:print("get nothing")return "get nothing"if __name__ == '__main__':# 创建 RespeakerAudio 实例snowman_audio = RespeakerAudio()# 持续录制并输出识别结果while True:text = snowman_audio.record()