人脸识别与检测(保姆级教程–附带源码)
项目背景
因项目需要招聘了一些日结工人,因此需要对工地现场的工人进行考勤管理,但工地只有海康摄像头没有专业考勤设备,因此需要基于视频流开发人脸识别与检测功能;因之前没有做个做一块所以再技术选型上面要具有,易用性、跨平台、不用训练,等等。因此我采用了face_recognition;
技术介绍
face_recognition
是一个流行的Python库,专门用于执行面部识别任务。它是基于dlib
的深度学习模型构建的,特别是基于深度卷积神经网络的模型。
一、人脸录入
要进行人脸识别与检测,首先就是要将图像中人像的面部编码提取出来,并保存到数据库中。
import os
import face_recognition
import mysql.connector
from mysql.connector import Error
import cv2
import numpy as np
import logginglogging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')model_path = "../ckpt/face/res10_300x300_ssd_iter_140000_fp16.caffemodel"
det = "../ckpt/face\deploy.prototxt"# 加载预训练的人脸检测模型
face_detector = cv2.dnn.readNetFromCaffe(det, model_path)# 数据库配置
db_config = {'user': 'root','password': 'cqccc132645!!','host': '10.10.12.72','database': 'ai_platform','buffered': True
}def connect_to_database():try:connection = mysql.connector.connect(**db_config)if connection.is_connected():print("成功连接到MySQL数据库")return connectionexcept Error as e:print(f"连接数据库时出错: {e}")return Nonedef check_table_structure(connection):try:cursor = connection.cursor()check_table_query = """SELECT COUNT(*) FROM information_schema.columns WHERE table_schema = 'ai_platform' AND table_name = 'faces' AND column_name IN ('id', 'name', 'embedding', 'image')"""cursor.execute(check_table_query)count = cursor.fetchone()[0]if count == 4:print("faces表结构正确")else:print("faces表结构不正确,请检查数据库")return Falsereturn Trueexcept Error as e:print(f"检查表结构时出错: {e}")return Falsedef insert_face(connection, name, face_embedding, face_image):try:cursor = connection.cursor()insert_query = "INSERT INTO faces (name, embedding, image) VALUES (%s, %s, %s)"cursor.execute(insert_query, (name, face_embedding.tobytes(), face_image))connection.commit()print(f"成功插入 {name} 的人脸数据")except Error as e:print(f"插入人脸数据时出错: {e}")def crop_face(image, face_location):top, right, bottom, left = face_locationreturn image[top:bottom, left:right]def load_known_faces(connection):known_face_encodings = []known_face_names = []try:cursor = connection.cursor()query = "SELECT name, embedding FROM faces"cursor.execute(query)for (name, face_encoding) in cursor:known_face_encodings.append(np.frombuffer(face_encoding, dtype=np.float64))known_face_names.append(name)logging.info(f"从数据库加载了 {len(known_face_names)} 个人脸")except Error as e:logging.error(f"加载人脸数据时出错: {e}")finally:cursor.close()return known_face_encodings, known_face_namesdef register_faces(image_folder):connection = connect_to_database()if connection is None:returnif not check_table_structure(connection):connection.close()returnknown_face_encodings, known_face_names = load_known_faces(connection)for filename in os.listdir(image_folder):if filename.endswith((".jpg", ".jpeg", ".png")):image_path = os.path.join(image_folder, filename)name = os.path.splitext(filename)[0] # 使用文件名作为人名# 使用 face_recognition 库读取图像,它可以处理中文路径image = face_recognition.load_image_file(image_path)rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)face_locations = face_recognition.face_locations(rgb_image)face_encodings = face_recognition.face_encodings(rgb_image, face_locations)if face_encodings:face_encoding = face_encodings[0] # 假设每张图片只有一张脸face_location = face_locations[0]# 裁剪人脸区域face_image = crop_face(image, face_location)# 将图像编码为JPEG格式_, buffer = cv2.imencode('.jpg', face_image)face_image_bytes = buffer.tobytes()face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)best_match_index = np.argmin(face_distances)if face_distances[best_match_index] < 0.5:name2 = known_face_names[best_match_index]print(f"{name} 与 {known_face_names[best_match_index]} 的距离为 {face_distances[best_match_index]}")print(f"{name} 已存在于数据库中,{name2}")else:insert_face(connection, name, face_encoding, face_image_bytes)else:print(f"在 {filename} 中没有检测到人脸")connection.close()if __name__ == "__main__":image_folder = "../face/train/ces/" # 替换为实际的图片文件夹路径register_faces(image_folder)
二、人脸检测
除了前面讲到的使用了face_recognition,我还使用到了res10_300x300_ssd_iter_140000_fp16.caffemodel
深度学习模型文件,用于面部检测,它是一个使用ResNet作为基础网络,输入尺寸为300x300像素,经过140,000次迭代训练,并以半精度浮点数格式存储的SSD对象检测模型,deploy.prototxt配合使用。
整个流程就是,先加载模型文件,再数据库中取出之前录入的人脸编码,随后是从视频流中取出视频帧(每隔10帧进行一次处理)进行检测与数据库中人类编码进行对比,如果是数据库中的人脸就将名字打印出来并进行标识,反之着直接跳过,命名为Unknown。其次为了加速检测效率,因此我采用了多线程的每一个视频流就添加一个线程。
import cv2
import face_recognition
import mysql.connector
import numpy as np
from mysql.connector import Error
from threading import Thread
import time
import logging
from PIL import Image,ImageDraw,ImageFont# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# 数据库配置
db_config = {'user': 'root','password': 'cqccc132645!!','host': '10.10.12.72','database': 'ai_platform','buffered': True
}# model_path = "D:\work\project\py\face_detection\ckpt\face\res10_300x300_ssd_iter_140000_fp16.caffemodel"
model_path = "../ckpt/face/res10_300x300_ssd_iter_140000_fp16.caffemodel"
det = "../ckpt/face\deploy.prototxt"# 加载预训练的人脸检测模型
face_detector = cv2.dnn.readNetFromCaffe(det, model_path)def connect_to_database():try:connection = mysql.connector.connect(**db_config)if connection.is_connected():logging.info("成功连接到MySQL数据库")return connectionexcept Error as e:logging.error(f"连接数据库时出错: {e}")return Nonedef load_known_faces(connection):known_face_encodings = []known_face_names = []try:cursor = connection.cursor()query = "SELECT name, embedding FROM faces"cursor.execute(query)for (name, face_encoding) in cursor:known_face_encodings.append(np.frombuffer(face_encoding, dtype=np.float64))known_face_names.append(name)logging.info(f"从数据库加载了 {len(known_face_names)} 个人脸")except Error as e:logging.error(f"加载人脸数据时出错: {e}")finally:cursor.close()return known_face_encodings, known_face_namesdef detect_faces(frame, face_detector):start_time = time.time()height, width = frame.shape[:2]blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))face_detector.setInput(blob)detections = face_detector.forward()face_locations = []for i in range(detections.shape[2]):confidence = detections[0, 0, i, 2]if confidence > 0.8:box = detections[0, 0, i, 3:7] * np.array([width, height, width, height])(startX, startY, endX, endY) = box.astype("int")face_locations.append((startY, endX, endY, startX))end_time = time.time()logging.debug(f"Face detection time: {end_time - start_time:.4f} seconds")return face_locationsdef process_frames(frames_buffer, known_face_encodings, known_face_names, camera_id, face_detector):processed_frames = []faces_detected = Falsefor frame in frames_buffer:face_locations = detect_faces(frame, face_detector)if face_locations:face_encodings = face_recognition.face_encodings(frame, face_locations)for face_encoding in face_encodings:face_distances = face_recognition.face_distance(known_face_encodings, face_encoding)best_match_index = np.argmin(face_distances)if face_distances[best_match_index] < 0.6:name = known_face_names[best_match_index]else:name = "Unknown"logging.info(f"摄像头 {camera_id} 检测到 {name}")faces_detected = Truefor (top, right, bottom, left) in face_locations:cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 255), 2)cv2.rectangle(frame, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)# 显示中文# frame = cv2_img_add_text(frame, name, left + 6, bottom - 30, text_color=(255, 255, 255),# text_size=20)# font = cv2.FONT_HERSHEY_DUPLEX# cv2.putText(frame, name, (left + 6, bottom - 6), font, 0.5, (255, 255, 255), 1)processed_frames.append(frame)return processed_frames, faces_detecteddef video_stream_thread(stream_url, known_face_encodings, known_face_names):camera_id = stream_url.split('@')[1].split('/')[0]# camera_id = stream_url.split('_')[1] # 提取IP地址作为摄像头IDcap = cv2.VideoCapture(stream_url)if not cap.isOpened():logging.error(f"无法打开视频流: {camera_id}")returnframe_count = 0start_time = time.time()faces_detected = Falseframes_buffer = []while True:ret, frame = cap.read()if not ret:logging.warning(f"无法从摄像头 {camera_id} 获取帧,尝试重新连接...")cap.release()time.sleep(5)cap = cv2.VideoCapture(stream_url)if not cap.isOpened():logging.error(f"无法重新连接到摄像头: {camera_id}")breakcontinueframe_count += 1frames_buffer.append(frame)if len(frames_buffer) == 10: # 每10帧处理一次# processed_frames, faces_detected = process_frames(frames_buffer, known_face_encodings, known_face_names, camera_id)processed_frames, faces_detected = process_frames(frames_buffer, known_face_encodings, known_face_names,camera_id, face_detector)frames_buffer = []# # 显示最后一帧(如果需要的话)# cv2.imshow(f'Stream: {camera_id}', processed_frames[-1])## if cv2.waitKey(1) & 0xFF == ord('q'):# break# 每100帧计算和显示FPSif frame_count % 100 == 0:end_time = time.time()fps = frame_count / (end_time - start_time)if faces_detected:logging.info(f"摄像头 {camera_id}, FPS: {fps:.2f}, 检测到人脸")else:logging.info(f"摄像头 {camera_id}, FPS: {fps:.2f}, 未检测到人脸")cap.release()cv2.destroyAllWindows()def cv2AddChineseText(img, text, position, textColor, textSize):if (isinstance(img, np.ndarray)): # 判断是否OpenCV图片类型img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))draw = ImageDraw.Draw(img)fontStyle = ImageFont.truetype("C:/WINDOWS/FONTS/ARLRDBD.TTF", textSize, encoding="utf-8")# 绘制文本draw.text(position, text, textColor, font=fontStyle)# 转换回OpenCV格式return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)def cv2_img_add_text(img, text, left, top, text_color=(255, 255, 255), text_size=20):if isinstance(img, np.ndarray): # 判断是否OpenCV图片类型img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))draw = ImageDraw.Draw(img)font_style = ImageFont.truetype(r"C:\ProgramData\kingsoft\office6\muifont\GBK\FZXBSK.TTF", text_size, encoding="utf-8") # 请替换为你的中文字体路径draw.text((left, top), text, text_color, font=font_style)return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)def main():# 检查CUDA是否可用,如果可用则使用GPUif cv2.cuda.getCudaEnabledDeviceCount() > 0:face_detector.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)face_detector.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)logging.info("使用CUDA进行GPU加速")print("使用CUDA进行GPU加速")else:logging.warning("CUDA不可用,使用CPU进行处理")connection = connect_to_database()if connection is None:returnknown_face_encodings, known_face_names = load_known_faces(connection)connection.close()video_streams = ["rtsp://xxxxxxxxx/cam/realmonitor?channel=1&subtype=1",# 添加更多视频流URL]threads = []for stream_url in video_streams:thread = Thread(target=video_stream_thread, args=(stream_url, known_face_encodings, known_face_names))thread.start()threads.append(thread)for thread in threads:thread.join()if __name__ == "__main__":main()
三、加速
我发现在python中opencv-python并不支持GPU加速仅支持CPU,如果需要GPU加速需求自己下载opencv源码用cmake和VS2019进行编译,可以参考这篇文章https://blog.csdn.net/yangyu0515/article/details/133794355
至于有没有成功,复制这个代码就知道了,如果cv2.cuda.getCudaEnabledDeviceCount()大于0就是成功了
import cv2# 检查CUDA是否可用,如果可用则使用GPUif cv2.cuda.getCudaEnabledDeviceCount() > 0:print("使用CUDA进行GPU加速")else:logging.warning("CUDA不可用,使用CPU进行处理")
opencv源码 百度网盘地址:
链接:https://pan.baidu.com/s/1JzgFHmtnV6pScZLInoa6oA?pwd=u46d
提取码:u46d
若有需求的可以联系我
QQ: 2375255615