RabbitMQ + Spring Boot + Python的使用过程

需求:后端执行Pytorch框架下的模型,对输入图像的评估,得到一个分数。

首先,实现Java和Python的交互,参考以下资料:

spring boot 项目实现调用python工程的方法_springboot中可以用python吗-CSDN博客

有五种方法:

  1. 执行 Python 脚本:以终端cmd的方式运行.py文件。
  2. Jython:一个 Python 的 Java 实现。(只支持 Python 2.x)
  3. 使用 Web 服务:将 Python 脚本或应用封装为一个 Web 服务,然后通过 HTTP 请求进行交互。
  4. 使用消息队列:实现 Java 和 Python 之间进行异步通信。优点:支持高并发,解耦合。
  5. 使用 gRPC 或 Thrift :使用 gRPC 或 Apache Thrift 进行跨语言的 RPC(远程过程调用)。

尝试一:使用 Runtime 执行 Python 脚本

注:ProcessBuilder 不支持第三方库的 Python 脚本运行。

以下为简单实现的例子,没有考虑执行失败情况。

 public double getModel(String imageDir) throws Exception {Process process = Runtime.getRuntime().exec("绝对地址\\envs\\VEnet\\python.exe 绝对地址\\model.py 绝对地址\\example.jpg");BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream(), "GBK"));String line = null;Double score = 0.0; // 记录得分while ((line = in.readLine()) != null) {System.out.println(line);score = Double.parseDouble(line);}in.close();int re = process.waitFor(); // re表示Python执行的结果return score;

其中,re 用来让主线程等待子线程进行完毕。最终 re 为0或者1,表示子线程是否执行成功。我使用的是conda的虚拟环境,所以用了虚拟环境的python.exe所在的绝对路径。

此外,还可以执行 conda activate 虚拟环境名 && python ...\Model.py ...\example.jpg

此外,应该也可以把Python默认的系统路径,从base环境改成虚拟环境,从而直接执行 python ...\Model.py ...\example.jpg。(未尝试)

然而,这种方法不具备高并发的性能,每次请求都需要配置Python环境(Pytorch)、下载模型,耗费了很多没必要的资源。且耦合度高。所以考虑另外高并发的方法。

尝试二:使用消息队列

1.选择RabbitMQ

主要有四种实现方式,RabbitMQ、Apache Kafka和ActiveMQ,资料是:

一文讲清RabbitMQ、Apache Kafka、ActiveMQ_activemq kafaka-CSDN博客

消息中间件(MQ)对比:RabbitMQ、Kafka、ActiveMQ 和 RocketMQ_mq对比-CSDN博客

  1. 信息传递模式
    1. RabbitMQ和ActiveMQ使用传统消息模型,非常适合需要严格排序和可靠交付消息的应用程序。
    2. Kafka使用发布/订阅消息模型,更适合流数据场景,需要实时处理数据。
  2. 性能
    1. RabbitMQ被设计为可靠的消息系统,这意味着它优先考虑消息传递而不是性能。RabbitMQ可以处理中等消息速率,适用于需要严格排序和可靠传递消息的应用程序。
    2. Kafka被设计为高性能系统,可以处理大量数据并具有低延迟。Kafka通过使用分布式架构和优化顺序I/O来实现这种性能。
    3. ActiveMQ也被设计为高性能系统,可以处理高消息速率。ActiveMQ通过使用异步架构和优化消息批处理来实现这种性能。

开始以为是流数据场景,尝试了Kafka。后来意识到,还需要把模型的输出值返回回来,所以应该是可靠通信。然后说RabbitMQ简单易用,适合初学者,所以果断采用RabbitMQ。

2. 学习

6种消息模型

  1. 基本消息模型:
    1. 1个生产者,1个消费者。
    2. 有消息确认机制(ACK) 
  2. work消息模型:
    1. 和简单队列模式基本一样,不过有一点不同,该模式有多个消费者在监听队列。
    2. 以轮询的方式将消息发给多个消费者确保一条消息只会被一个消费者消费。
    3. 任务分发默认使用的是公平队列调度的原则。
    4. 不需要设置队列和交换机的绑定,因为这个模式会将队列绑定到默认的交换机 。
  3. Publish/subscribe(发布订阅模式):交换器类型是 Fanout
    1. 和上面2种模式默认提供交换机不同的是,该模式需要显示声明交换机
    2. 生产者:声明Exchange,不再声明Queue。 发送消息到Exchange,不再发送到Queue。即,生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)
    3. 交换机:将消息转发给与自己绑定的所有队列,实现一个消息被多个消费者消费。
    4. 消费者监听指定的队列获得消息。每个队列可以有多个消费者监听,同样也是以轮询的机制发给消费者。所以,多个消费端监听同一个队列不会重复消费消息。
    5. 注:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
    6. 这个模式需要设置队列和交换机的绑定。
  4. Routing 路由模型:交换机类型是 Direct
    1. 交换机:接收生产者的消息,然后把消息递交给与路由键(routing key)完全匹配的队列
    2. 消费者所在队列:可以指定多个路由键(routing key)
    3. 生产者:发送消息时需要声明路由键(routing key)
  5. Topics 通配符模式:交换机类型是 Topics
    1. 每个消费者监听自己的队列,并且设置带统配符的routingkey,
    2. 生产者将消息发给broker
    3. 交换机根据routingkey来转发消息到指定的队列。
    4. Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
  6. RPC 模型:
    1. 基本概念:Callback queue 回调队列、Correlation id 关联标识

4种交换机

  1. Direct Exchange:定向(路由)。交换机通过路由键(routing key)把消息路由到指定的队列。相当于路由键是队列的名字。发送消息时,需要设置路由键。
  2. Topic Exchange:交换机通过通配符匹配路由键和绑定键的关系。把消息交给符合路由模式(routing pattern) 的队列
  3. Fanout Exchange:交换机把消息广播给路由机绑定的所有队列
  4. Headers Exchange:交换机通过消息的headers属性的键值对(key/value)来确定消息的路由。每个队列有一组键值对。当发送消息时,需要在headers属性中设置一组键值对。如果消息的headers中包含了指定的键值对,则该消息将被路由到该队列中。
    1. x-match= all:当消息的所有键值对与绑定的键值对匹配时,才会将消息路由到绑定的队列。这相当于“与”逻辑。如果绑定中没有任何键值对,则所有消息都会被路由到与该绑定相关联的队列。
    2. x-match= any:当消息中的至少一个键值对与绑定的键值对匹配时,就会将消息路由到绑定的队列。这相当于“或”逻辑。如果绑定中没有任何键值对,则没有消息会被路由到与该绑定相关联的队列。

rabbitmq RPC 交换机 rabbitmq几种交换机_mob64ca14122c74的技术博客_51CTO博客 详细解释了交换机。

消息队列可以解决什么问题呢?

  • 业务解耦:A系统需要耦合B、C、D系统,在消息队列之前可以通过共享数据、接口调用等方式来实现业务,现在可以通过消息中间件进行解耦。

  • 削峰填谷:在互联网经常会出现流量突然飙升的情况,以前很多时候就是通过性能优化、加服务器等方式,可以通过消息中间件缓存相关任务,然后按计划的进行处理。

  • 异步:可以通过消息推送及短信发送进行说明,业务平台并不关注具体消息的发送细则,完全可以通过消息队列的方式,直接下发任务,由任务消费者进行处理。

以上都来这两篇文章: 

RabbitMQ介绍 + python操作 - dongye95 - 博客园 (cnblogs.com) 写的很好,面试前看这个。

Python角度介绍RabbitMQ。

还介绍了高级特性:过期时间、消息确认、持久化、死信队列、延迟队列。

rabbitmq RPC 交换机 rabbitmq几种交换机_mob64ca14122c74的技术博客_51CTO博客 

Spring Boot角度介绍RabbitMQ。

还介绍了消息持久化、延迟发送(TTL机制和rabbitmq插件两种方式)、可靠性发送与接收。

RabbitTemplate在Spring中的所有方法:

Rabbittemplate所有方法.简介.-CSDN博客

RabbitMQ在Python中的常见错误:

RabbitMq使用中常见错误小结_pika.exceptions.probableauthenticationerror: conne-CSDN博客

pika库的错误我基本都遇到了,可以加深对代码的理解。

3. 准备工作

3.1 安装和配置RabbitMQ

RabbitMQ安装教程(非常详细)从零基础入门到精通,看完这一篇就够了_rabbitmq安装详细教程-CSDN博客

注:15672端口是图形化界面的,而RabbitMQ服务仍然是在默认端口5672上。如果想用新增用户登入图形化界面,需要给新增的用户添加管理员权限。

此外,这篇文章提到:web管理界面把消息内容序列化了(因为它默认使用的还是jdk的序列化的默认序列化器),所以他介绍了如何把web管理界面的默认序列化器更改为json类型的序列化器。这样,我们在web管理界面看消息会更直观。

3.2 Spring Boot端的准备

在Spring Boot引入maven相关依赖:

SpringBoot学习之路---使用RabbitTemplate操作RabbitMq_rabbittemplate用法-CSDN博客

然后,SpringBoot会自动帮我们注入RabbitTemplate。

在yml文件配置 RabbitMQ 的信息:

rabbitmq:username: xxxpassword: xxxaddresses: 127.0.0.1:5672

3.3 Python端的准备

在Python端直接 conda install pika

3.4 模式的思考

尝试简单模式:Spring发送消息。Python接收消息,处理消息,再发送消息。二者都有监听器。这种做法只适用于单线程。在多线程中,如果只使用一个队列,那么不能保证多线程的数据的准确传输;如果每个线程都创建一个队列,那么会造成资源浪费。

后来查阅资料,发现RPC模式非常合适。

2.RPC模式

Spring端

参考了:

RabbitMQ学习整理————基于RabbitMQ实现RPC_spring mvc rabbit mq 如何事项rpc-CSDN博客

他采用了rabbitTemplate.sendAndReceive方法,该方法有三个参数:第一个是交换机(exchange)的名字,第二个是路由键(我感觉就是队列的意思)的名字,第三个则为消息的内容。(注:RabbitMQ中所有的消息都要先通过交换机,空字符串表示使用默认的交换机)

以下我的Spring Boot端的代码:

@Service
public class GetModelRabbitMQService {@Autowiredprivate AmqpTemplate rabbitTemplate;private static final String s = "image2Model";public String sendMessage(String imageDir) {// 设置correlationIdString corrId = UUID.randomUUID().toString();MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();Message message = new Message(imageDir.getBytes(StandardCharsets.UTF_8), messageProperties);System.out.println("Spring要发出咯~~"+message);Message response = rabbitTemplate.sendAndReceive("", s, message);if(response == null){System.out.println("没有收到哟~~~");return null;}else{String res = new String(response.getBody(), StandardCharsets.UTF_8);System.out.println("Spring收到咯~~"+res);return res;}}
}

其中,客户端在等待回调队列里的数据时,如果有消息出现,它会检查 correlation_id 属性。如果此属性的值与请求匹配,就返回给应用。所以,能从回调队列中得到数据,就说明id一致。

Python端

参考了:

python对RabbitMQ的简单使用_python rabbitmq-CSDN博客

他实现了简单模式、发布订阅模式和RPC模式,我参考了RPC模式,以下是我的代码

if __name__ == '__main__':# 连接到RabbitMQ服务器user_info = pika.PlainCredentials('xxx', 'xxx')  # 用户名和密码connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1', 5672, '/', user_info))channel = connection.channel()# 声明持久化队列queue_name = 'image2Model'channel.queue_declare(queue=queue_name ,durable=True) # durable=True,声明队列是持久化。# 清空队列,简单模式的持久化需要,RPC模式不需要# channel.queue_purge(queue=queue_name)def on_request(ch, method, props, body):body = body.decode()print('body decode后:', body)response = str(Train(body))ch.basic_publish(exchange='',routing_key=props.reply_to,  # props.reply_to 把消息发送到用来返回消息的queueproperties=pika.BasicProperties(correlation_id=props.correlation_id),body=response,)ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)  # 一次处理一个队列# auto_ack指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息channel.basic_consume(queue=queue_name, on_message_callback=on_request)print(' 开始监听. To exit press CTRL+C')channel.start_consuming()

交换机默认是持久的,队列中的消息属性默认是持久的,即 properties=pika.BasicProperties(delivery_mode=2),其中,1表示非持久。所以,我采用了都设置为持久的方法,即在声明queue的时候,设置queue为持久的,不然无法运行。以下讲到了具体的持久化操作(需要重启服务):RabbitMQ基础学习_rabbitmq channel.basic_qos-CSDN博客

如果auto_ack设置为True,需要手动给消息发送方回复确认。这样,如果程序没有成功运行,可以返回一个错误信息给到客户端。默认是False,即自动回复,然后从内存或者硬盘中删除。 在简单模式中,如果设置了持久化和手动回复,而没有手动回复(只进行了重新发送其他数据,没有再进行手动回复)。每次Python服务启动,都会把之前队列的消息再重新读取和处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/861.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

吃鸡游戏msvcp140.dll丢失的解决方法

msvcp140.dll 是一个与 Microsoft Visual C Redistributable 相关的动态链接库(DLL)文件,是 Windows 操作系统中众多应用程序正常运行所必需的关键组件之一。以下是对 msvcp140.dll 文件的总体介绍和msvcp140.dll丢失的多个解决方案分享。 *…

C++高级特性:nullptr和NULL的区别(十)

1、NULL和0 在C语言标准定义中,NULL的定义为(void *)0 这样的代码意味着可以进行强制类型转换一个void *类型的指针到任意类型的指针 #define NULL (void*)0 char *p NULL;C11之后NULL被定义为0,此时可以认为NULL不完全是一个空指针。 #ifndef __cplusp…

ROS仿真小车(一)—— urdf模型+rviz可视化

文章目录 前言一、创建功能包二、urdf文件三、launch文件四、图形化显示五、RVIZ可视化总结参考文献 前言 ROS学习过程记录,从零开始仿真一辆小车,之后会实现运动控制、雷达、相机等 部分代码已上传至本人的GitHub,如果需要请自行下载&…

算法题解记录18+++搜索二维矩阵Ⅱ

本题可以说是运用二分查找的典例,即使是面对矩阵,只要是它保持“排序好”这样的结构,就一定能采用二分查找法。【你知道的,对于排序好的数组,二分查找几乎是最优秀的算法】 当然,答案提供的是“Z字形查找法…

实在Agent:超自动化智能体的革命(附导引指南)

在自动化技术的浪潮中,实在智能推出了实在Agent(智能体),一款基于大语言模型和屏幕语义理解技术的超自动化智能体。它通过自然对话交互,将复杂工作简化为一句话指令,自动规划并执行工作任务,极大…

C++奇迹之旅:深入理解赋值运算符重载

文章目录 📝赋值运算符重载🌠 运算符重载🌉特性 🌠 赋值运算符重载🌠传值返回:🌠传引用赋值:🌉两种返回选择🌉赋值运算符只能重载成类的成员函数不能重载成全…

【御控物联】物联网数据传输数据格式

随着物联网技术的快速发展,越来越多的系统和设备被接入到了物联网管理中,数据交换与共享成为了一个重要的问题。不同的设备和系统之间存在着各种各样的通信协议和数据格式,这就给数据获取、交换和共享带来了很大的不便利。 MQTT 是机器对机器…

unity学习(86)——细节优化

东西已经做出来了,现在需要的是优化,说得简单,做起来难。 1.122包的优化,避免重复创建! 2.为何会出现一边动,一边不动的情况。重复登录后依旧是unity可以看到移动,但是exe那边看不到移动&#…

关于图像YUV格式分类和排布方式的全学习

【学习笔记】关于图像YUV格式分类和排布方式的全学习_yuv图像-CSDN博客 下图是将多个yuv420p图像(A和B),拼接成一个画面的思路 A大小:416*64 B大小:416*208 将A和B合并到一个416*416的尺寸上,代码如下 //整合char * ptd;ptd (char * ) malloc (416*41…

C#通用类库封装实战

数据库查询 特性方式获取数据库列的别名 数据库更新 使用简单工厂配置的方式

套接字基础

套接字基础 套接字一、socket二、setsockopt三、bind四、listen五、select六、poll七、epoll一、水平模式(Level-Triggered,LT)二、边沿模式(Edge-Triggered,ET) 套接字 最近学习网络编程的时候&#xff0c…

甲辰年半日闲有得

甲辰年半日闲有得 出入一世红尘客,得失九台江湖人。 纠结苦海他日心,​何须挂碍当下身。 ​曾是春风得意时,重游故地情念真。 ​忘我才知小境遇,利他方明大原本。

矽塔SA8321 单通道 2.7-12.0V 持续电流 3.0A H 桥驱动芯片

描述 SA8321是为消费类产品,玩具和其他低压或者电池供电的运动控制类应用提供了一个集成的电机驱动器解决方案。此器件能够驱动一个直流无刷电机,由一个内部电荷泵生成所需的栅极驱动电压电路和4个功率 NMOS组成H桥驱动,集成了电机正转/反…

2024上海国际半导体制造设备材料与核心部件展览会

2024上海国际半导体制造设备材料与核心部件展览会 2024 Shanghai International Semiconductor Manufacturing Equipment Materials and Core Components Exhibition 时间:2024年11月18日-20日 地点:上海新国际博览中心 详询主办方陆先生 I38&#…

2024蓝桥杯嵌入式模板代码详解

文章目录 一、STM32CubeMx配置二、LED模板代码三、LCD模板代码 一、STM32CubeMx配置 打开STM32CubeMx,选择【File】->【New Project】,进入芯片选择界面,搜索到蓝桥杯官方的芯片型号,并点击收藏,下次直接点击收藏就…

【LeetCode: 39. 组合总和 + 递归】

🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,…

比特币减半倒计时:NFT 生态将受到怎样的影响?

BTC 减半倒计时仅剩不到 1 天,预计在 4 月 20 日迎来减半。当前区块奖励为 6.25 BTC,减半后区块奖励为 3.125 BTC,剩余区块为 253。比特币减半无疑是比特币发展史上最重要的事件之一,每当这一事件临近,整个加密社区都充…

每日三个JAVA经典面试题(四十二)

1.Java中的线程池如何帮助优化性能? Java中的线程池是一种重要的并发编程工具,它可以帮助优化性能的方式有以下几点: 资源管理:线程池可以管理并重用线程,而不是为每个任务都创建一个新的线程。这减少了线程创建和销毁…

Linux下SPI设备驱动实验:测试读取ICM20608设备中数据是否正常

一. 简介 前面文章实现了 SPI设备的读写功能,也对ICM20608设备中(即SPI设备)寄存器里的数据进行了读取。文章如下: Linux下SPI设备驱动实验:读取ICM20608设备的数据-CSDN博客 本文对驱动功能进行测试,即…

大数据平台搭建2024(二)

二:Hive安装 只在node01上操作 1 安装MySQL 8.0 最小化安装需要安装这个 yum install -y wget1-1 下载MySQL的yum源 wget http://dev.mysql.com/get/mysql80-community-release-el7-7.noarch.rpm检查是否安装成功 rpm -qpl mysql80-community-release-el7-7.n…