OSError [WinError 1455]页面文件太小,无法完成操作”解决方案“_深度学习_yangshejun-GitCode 开源社区 (csdn.net)
python对RabbitMQ的简单使用_python rabbitmq-CSDN博客
【Windows安装RabbitMQ详细教程】_rabbitmq windows-CSDN博客
Windows 10安装Minio 文件服务器-CSDN博客
发送到minio服务器
import collections
import os
import time
import cv2
import numpy as np
from ultralytics import YOLO
from threading import Thread, Lock
import minio
minio_client = minio.Minio('10.160.86.76:9000',access_key='minioadmin',secret_key='minioadmin',secure=False)
shared_model_1 = YOLO("yolov8n.pt")
model_results = {shared_model_1: []}
lock = Lock()
os.makedirs('stream1/images', exist_ok=True)
os.makedirs('stream1/videos', exist_ok=True)
frame_rate = 30
record_length = 10 # seconds
target_class_value1 = 0 # e.g. class 1
results_list1 = collections.deque(maxlen=frame_rate * record_length)
def predict(model, image_path):results = model.predict(image_path, show=True, stream=True)last_save_time1 = time.time()with lock:model_results[model] = resultsfor result in model_results[model]:print('model_results[model]', model_results[model])if model == shared_model_1:boxes = result.boxesresults_list1.append(result)output_dir = 'stream1'if boxes.cls.numel() != 0:cls_tensor = boxes.clscls_list = cls_tensor.cpu().numpy().tolist()else:cls_list = [99999]if target_class_value1 in cls_list and (time.time() - last_save_time1) > record_length:timestamp = int(time.time())img_filepath = f"{output_dir}/images/{timestamp}.jpg"cv2.imwrite(img_filepath, result.plot(boxes=True))with open(img_filepath, 'rb') as file_data:file_stat = os.stat(img_filepath)minio_client.put_object('iot-db-img', f'{timestamp}.jpg', file_data,file_stat.st_size)last_save_time1 = time.time()Thread(target=predict, args=(shared_model_1, "0")).start() # " "填写地址
import boto3
import pika
import time# 初始化Minio客户端
s3 = boto3.client('s3',endpoint_url='http://10.160.86.76:9000', # 这需要替换成你的Minio服务器地址aws_access_key_id='minioadmin', # 这需要替换成你的access keyaws_secret_access_key='minioadmin') # 这需要替换成你的secret key# 初始化RabbitMQ连接和通道
connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')) # 这需要替换成你的RabbitMQ服务器信息
channel = connection.channel()# 声明队列
channel.queue_declare(queue='image_urls')# 获取bucket中的初始对象列表
response = s3.list_objects(Bucket='iot-db-img') # 你需要把bucket's name替换为你自己的
old_keys = {content['Key'] for content in response.get('Contents', [])}while True:# 循环获取新的对象列表response = s3.list_objects(Bucket='iot-db-img')new_keys = {content['Key'] for content in response.get('Contents', [])}# 找出新添加的图片added_keys = new_keys - old_keysfor key in added_keys:image_url = f"http://10.160.86.76:9000/iot-db-img/{key}" # 这需要替换成你的Minio服务器和bucket信息# 发送新图片的URL到RabbitMQchannel.basic_publish(exchange='',routing_key='image_urls',body=image_url)# 更新旧的图片列表old_keys = new_keys# 每次循环后休息一段时间以免对服务器造成过大压力time.sleep(1)# 关闭RabbitMQ连接
connection.close()
接收端测试
import os
import pika
import requests# establishes a connection with RabbitMQ
connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5672/%2F'))
channel = connection.channel()# declare queue
channel.queue_declare(queue='image_urls')def callback(ch, method, properties, body):image_url = body.decode()image_name = os.path.basename(image_url)response = requests.get(image_url)# Check if the response is okif response.status_code == 200:# save the image to the localwith open(os.path.join(r'E:\ultralytics-main\rabbit\rabbit', image_name), 'wb') as f:f.write(response.content)print(f"Image saved: {image_name}")else:print(f"Unable to download image, response code: {response.status_code}, url: {image_url}")ch.basic_ack(delivery_tag=method.delivery_tag)# start the message consumer
channel.basic_consume(queue='image_urls', on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()