RabbitMQ的简单使用 —— Python篇

(一)RabbitMQ的简介
RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件。消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取 完成通信。而作为中间件的 RabbitMq 无疑是目前最流行的消息队列之一。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

RabbitMQ总体架构

PS:生产者和消费者可能在不同的程序或主机中,当然也有可能一个程序有可能既是生产者,也是消费者。 

RabbitMq 应用场景广泛:
1.系统的高可用:日常生活当中各种商城秒杀,高流量,高并发的场景。当服务器接收到如此大量请求处理业务时,有宕机的风险。某些业务可能极其复杂,但这部分不是高时效性,不需要立即反馈给用户,我们可以将这部分处理请求抛给队列,让程序后置去处理,减轻服务器在高并发场景下的压力。

2.分布式系统,集成系统,子系统之间的对接,以及架构设计中常常需要考虑消息队列的应用。(二)RabbitMQ的安装

apt-get update
apt-get install erlang
apt-get install rabbitmq-server#启动rabbitmq: service rabbitmq-server start
#停止rabbitmq: service rabbitmq-server stop
#重启rabbitmq: service rabbitmq-server restart#启动rabbitmq插件:rabbitmq-plugins enable rabbitmq_management

启用rabbitmq_management插件后就可以登录后台管理页面了,浏览器输入ip:15672

自带的密码和用户名都是guest,但是只能本机登录

所以下面我们添加新用户,和自定义权限

#添加新用户
rabbitmqctl add_user 用户名 密码#给指定用户添加管理员权限
rabbitmqctl set_user_tags 用户名 administrator给用户添加权限
rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"

在web页面输入用户名,和密码

(三)python操作RabbitMQ
python中使用pika操作RabbitMQ

pip install pika
#皮卡皮卡,哈哈

(四)RabbitMQ简单模式

上代码

# coding=utf-8
### 生产者import pika
import timeuser_info = pika.PlainCredentials('root', 'root')#用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))#连接服务器上的RabbitMQ服务# 创建一个channel
channel = connection.channel()# 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
channel.queue_declare(queue='hello')for i in range(0, 100):channel.basic_publish(exchange='',#当前是一个简单模式,所以这里设置为空字符串就可以了routing_key='hello',# 指定消息要发送到哪个queuebody='{}'.format(i)# 指定要发送的消息)time.sleep(1)# 关闭连接
# connection.close()

PS:RabbitMQ中所有的消息都要先通过交换机,空字符串表示使用默认的交换机
 

# coding=utf-8
### 消费者import pikauser_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()# 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,生产者和消费者都做这一步的好处是
# 这样生产者和消费者就没有必要的先后启动顺序了
channel.queue_declare(queue='hello')# 回调函数
def callback(ch, method, properties, body):print('消费者收到:{}'.format(body))# channel: 包含channel的一切属性和方法
# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通过 properties 传入的参数
# body: basic_publish发送的消息channel.basic_consume(queue='hello',  # 接收指定queue的消息auto_ack=True,  # 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息on_message_callback=callback  # 设置收到消息的回调函数)print('Waiting for messages. To exit press CTRL+C')# 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
channel.start_consuming()

对于上面的这种模式,有一下两个不好的地方: 

一个是在我们的消费者还没开始消费完队列里的消息,如果这时rabbitmq服务挂了,那么消息队列里的消息将会全部丢失,解决方法是在声明队列时,声明队列为可持久化存储队列,并且在生产者将消息插入到消息队列时,设置消息持久化存储,具体如下 

# coding=utf-8
### 生产者
import pika
import timeuser_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))# 创建一个channel
channel = connection.channel()# 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,官方推荐,每次使用时都可以加上这句
channel.queue_declare(queue='durable_queue',durable=True)
#PS:这里不同种队列不允许名字相同for i in range(0, 100):channel.basic_publish(exchange='',routing_key='durable_queue',body='{}'.format(i),properties=pika.BasicProperties(delivery_mode=2))# 关闭连接
# connection.close()

消费者与上面的消费者没有什么不同,具体的就是消费声明的队列,也要是可持久化的队列,还有就是,即使在生产者插入消息时,设置当前消息持久化存储(properties=pika.BasicProperties(delivery_mode=2)),并不能百分百保证消息真的被持久化,因为RabbitMQ挂掉的时候它可能还保存在缓存中,没来得及同步到磁盘中

在生产者插入消息后,立刻停止rabbitmq,并重新启动,其实我们在web管理页面也可看到未被消费的信息,当然在启动消费者后也成功接收到了消息


上面说的第二点不好就是,如果在消费者获取到队列里的消息后,在回调函数的处理过程中,消费者突然出错或程序崩溃等异常,那么就会造成这条消息并未被实际正常的处理掉。为了解决这个问题,我们只需在消费者basic_consume(auto_ack=False),并在回调函数中设置手动应答即可ch.basic_ack(delivery_tag=method.delivery_tag),具体如下

# coding=utf-8
### 消费者import pika
import timeuser_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()# 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,生产者和消费者都做这一步的好处是
# 这样生产者和消费者就没有必要的先后启动顺序了
channel.queue_declare(queue='queue')# 回调函数
def callback(ch, method, properties, body):time.sleep(5)ch.basic_ack(delivery_tag=method.delivery_tag)print('消费者收到:{}'.format(body.decode('utf-8')))# channel: 包含channel的一切属性和方法
# method: 包含 consumer_tag, delivery_tag, exchange, redelivered, routing_key
# properties: basic_publish 通过 properties 传入的参数
# body: basic_publish发送的消息channel.basic_consume(queue='queue',  # 接收指定queue的消息auto_ack=False,  # 指定为False,表示取消自动应答,交由回调函数手动应答on_message_callback=callback  # 设置收到消息的回调函数)# 应答的本质是告诉消息队列可以将这条消息销毁了print('Waiting for messages. To exit press CTRL+C')# 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
channel.start_consuming()

这里只需要配置消费者,生产者并不要修改

还有就是在上的使用方式在,都是一个生产者和一个消费者,还有一种情况就是,一个生产者和多个消费者,即多个消费者同时监听一个消息队列,这时候队列里的消息就是轮询分发(即如果消息队列里有100条信息,如果有2个消费者,那么每个就会收到50条信息),但是在某些情况下,不同的消费者处理任务的能力是不同的,这时还按照轮询的方式分发消息并不是很合理,那么只需要再配合手动应答的方式,设置消费者接收的消息没有处理完,队列就不要给我放送新的消息即可,具体配置方式如下:

# coding=utf-8
### 消费者import pika
import timeuser_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()# 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作,生产者和消费者都做这一步的好处是
# 这样生产者和消费者就没有必要的先后启动顺序了
channel.queue_declare(queue='queue')# 回调函数
def callback(ch, method, properties, body):time.sleep(0)#通过设置休眠时间来模拟不同消费者的处理时间ch.basic_ack(delivery_tag=method.delivery_tag)print('消费者收到:{}'.format(body.decode('utf-8')))# prefetch_count表示接收的消息数量,当我接收的消息没有处理完(用basic_ack标记消息已处理完毕)之前不会再接收新的消息了
channel.basic_qos(prefetch_count=1)  # 还有就是这个设置必须在basic_consume之上,否则不生效channel.basic_consume(queue='queue',  # 接收指定queue的消息auto_ack=False,  # 指定为False,表示取消自动应答,交由回调函数手动应答on_message_callback=callback  # 设置收到消息的回调函数)# 应答的本质是告诉消息队列可以将这条消息销毁了print('Waiting for messages. To exit press CTRL+C')# 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
channel.start_consuming()

PS:这种情况必须关闭自动应答ack,改成手动应答。使用basicQos(perfetch=1)限制每次只发送不超过1条消息到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个

(五)RabbitMQ发布订阅模式
发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中

这个模式中会引入交换机的概念,其实在RabbitMQ中,所有的生产者都不会直接把消息发送到队列中,甚至生产者都不知道消息在发出后有没有发送到queue中,事实上,生产者只能将消息发送给交换机,由交换机来决定发送到哪个队列中。 

交换机的一端用来从生产者中接收消息,另一端用来发送消息到队列,交换机的类型规定了怎么处理接收到的消息,发布订阅模式使用到的交换机类型为 fanout ,这种交换机类型非常简单,就是将接收到的消息广播给已知的(即绑定到此交换机的)所有消费者。

当然,如果不想使用特定的交换机,可以使用 exchange=‘’ 表示使用默认的交换机,默认的交换机会将消息发送到 routing_key 指定的queue,可以参考简单模式。

上代码:

#生产者
import pikauser_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()# 创建一个指定名称的交换机,并指定类型为fanout,用于将接收到的消息广播到所有queue中
channel.exchange_declare(exchange='交换机', exchange_type='fanout')# 将消息发送给指定的交换机,在fanout类型中,routing_key=''表示不用发送到指定queue中,
# 而是将发送到绑定到此交换机的所有queue
channel.basic_publish(exchange='交换机', routing_key='', body='这是一条测试消息')
#消费者
import pikauser_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))
channel = connection.channel()channel.exchange_declare(exchange='交换机', exchange_type='fanout')# 使用RabbitMQ给自己生成一个专有的queue
result = channel.queue_declare(queue='333')
# result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 这里如果设置exclusive=True参数,那么该队列就是一个只有队列,在消费者结束后,该专有队列也会自动清除,如果queue=''没有设置名字的话,那么就会自动生成一个
# 不会重复的队列名# 将queue绑定到指定交换机
channel.queue_bind(exchange='交换机', queue=queue_name)print(' [*] Waiting for  message.')def callback(ch, method, properties, body):print("消费者收到:{}".format(body.decode('utf-8')))channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()

该模式与简单模式的还有一个区别就是,这里的消息队列都是由消费者声明的,所以如果是生产者先启动,并将消息发给交换机的画,这里的消息就会丢失,所以我们也可以在消费者端声明队列并绑定交换机(不能是专有队列),所以仔细想想,其实这所谓的发布订阅模式并没有说什么了不起,它不过是让交换机同时推送多条消息给绑定的队列,我们当然也可以在简单模式的基础上多进行几次basic_publish发送消息到指定的队列。当然我们这样做的话,可能就没办法做到由交换机的同时发送了,效率可能也没有一次basic_publish的高

(六)RabbitMQ RPC模式

下面实现由rpc远程调用加减运算 

客户端

import pika
import uuid
import jsonclass RPC(object):def __init__(self):self.call_id = Noneself.response = Noneuser_info = pika.PlainCredentials('root', 'root')self.connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))self.channel = self.connection.channel()# 创建一个此客户端专用的queue,用于接收服务端发过来的消息result = self.channel.queue_declare(queue='', exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(queue=self.callback_queue,on_message_callback=self.on_response,auto_ack=True)def on_response(self, ch, method, props, body):# 判断接收到的response是否属于对应requestif self.call_id == props.correlation_id:self.response = json.loads(body.decode('utf-8')).get('result')def call(self, func, param):self.response = Noneself.call_id = str(uuid.uuid4())  # 为该消息指定uuid,类似于请求idself.channel.queue_declare(queue='rpc_queue')self.channel.basic_publish(exchange='',routing_key='rpc_queue',  # 将消息发送到该queueproperties=pika.BasicProperties(reply_to=self.callback_queue,  # 从该queue中取消息correlation_id=self.call_id,  # 为此次消息指定uuid),body=json.dumps({'func': func,'param': {'a': param[0], 'b': param[1]}}))self.connection.process_data_events(time_limit=3)# 与start_consuming()相似,可以设置超时参数return self.responserpc = RPC()print("发送消息到消费者,等待返回结果")response = rpc.call(func='del', param=(1, 2))print("收到来自消费者返回的结果:{}".format(response))


服务端

import pika
import jsonuser_info = pika.PlainCredentials('root', 'root')
connection = pika.BlockingConnection(pika.ConnectionParameters('ip', 5672, '/', user_info))channel = connection.channel()# 指定接收消息的queue
channel.queue_declare(queue='rpc_queue')def add_number(a, b):return a + bdef del_num(a, b):return a - bexecute_map = {'add': add_number,'del': del_num
}def on_request(ch, method, props, body):body = json.loads(body.decode('utf-8'))func = body.get('func')param = body.get('param')result = execute_map.get(func)(param.get('a'), param.get('b'))print('进行{}运算,并将结果返回个消费者'.format(func))ch.basic_publish(exchange='',  # 使用默认交换机routing_key=props.reply_to,  # response发送到该queueproperties=pika.BasicProperties(correlation_id=props.correlation_id),  # 使用correlation_id让此response与请求消息对应起来body=json.dumps({'result': result}))ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)
# 从rpc_queue中取消息,然后使用on_request进行处理
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)print(" [x] Awaiting RPC requests")
channel.start_consuming()

 本次分享到此结束,感谢大家的阅读!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/30769.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaWeb项目配置教程

将你的项目(只有代码的文件,不是整个文件)拖入idea 找到数据库配置代码(一般在Util包里面,或者是properties配置文件)并将密码修改为你的数据库密码。 点击Edit Configurations 点击Configure&#xff0…

SOLIDWORKS安装运行环境建议 慧德敏学

SOLIDWORKS是一款要求很高的软件。无可否认。您的电脑功能越强大,运行得越好,但是我们也要考虑购买成本,因此,选择正确的配置很重要。在选择用于SOLIDWORKS的电脑配置时,需要综合考虑多个方面以确保软件能够流畅、以更…

Hi3861 OpenHarmony嵌入式应用入门--PWM 三色灯

这篇文章是讲解的pwm控制三色灯的部分,这部分也是后续全彩智能灯的基础。 硬件原理如下 IO管脚定义在hi-12f_v1.1.2-规格书-20211202.pdf文档中 GPIO API API名称 说明 unsigned int IoTGpioInit(unsigned int id); GPIO模块初始化 hi_u32 hi_io_set_func(hi_i…

无引擎游戏开发(2):最简游戏框架 | EasyX制作井字棋小游戏I

一、EasyX中的坐标系 不同于数理中的坐标系,EasyX中的y轴是竖直向下的 二、渲染缓冲区 之前的程序添加了这三个函数改善了绘图时闪烁的情况: 小球在"画布“上移动的过程就是我们在调用绘图函数,这个”画布“就是渲染缓冲区,先绘制的内…

【配置】Notion自动化备份到github方案

步骤 打开notion网页,获取到需要的值 token_v2 找到请求getSpaces的 Cookie 值 token_v2 space_id 找到请求getSpaces的响应结果space,如下图: file_token 找个页面点击导出,之后拿到这个配置项 注意:配置项会过期&#xff0c…

基于一种改进熵方法的旋转机械故障诊断模型(MATLAB)

熵的概念起源于热力学,1884年,玻尔兹曼定义熵,用以描述分子热运动的无序性和混乱度。1948年,Shannon在其发表的《AMathematicalTheoryofCommunication》中提出香农熵,首次将“熵”引入信息度量范畴,为信息论…

RK3568技术笔记十三 Ubuntu的编译

Ubuntu文件系统编译 在编译前需要按照前面的方法初始化编译环境,否则会导致编译失败(若配置过则无需重复配置)。 按下述方法编译的Ubuntu系统,用户名是:dianyu 密码:1 编译Ubuntu,执…

机械臂 CoppeliaSim Simulink联合仿真

实现机械臂在CoppeliaSim(以前称为V-REP)和Simulink上的联合仿真涉及多个步骤,包括环境设置、模型导入、通信配置、控制算法设计和测试调试。 前期准备 安装软件配置工作环境创建和配置CoppeliaSim场景 导入机械臂模型配置机械臂参数在Simuli…

webp图片格式怎么转换成jpg?值得收藏的四种转换方法介绍!

webp图片格式怎么转换成jpg?在当今数字图像处理领域,新兴的WebP格式以其独特的特性和兼容性,迅速成为备受追捧的图像格式之一,这一格式以其高效的压缩能力和优秀的网络传输速度著称,为网络图像的传输和存储提供了全新的…

一种稀疏贝叶斯学习的旋转机械故障诊断方法(MATLAB)

轴承的故障诊断技术是通过检测轴承故障特征信息来判断轴承的具体故障为位置或损伤程度。在轴承发生损坏时,故障特征信息会随着工作时间的增长变得明显。轴承的损坏过程可以分为四个阶段。第一个阶段为损伤初始阶段,轴承故障特征信号一般无法测量。第二个…

基于SSM+Jsp的书店仓库管理系统

摘要:仓库作为储存货物的核心功能之一,在整个仓储中具有非常重要的作用,是社会物质生产的必要条件。良好的仓库布局环境能够对货物进入下一个环节前的质量起保证作用,能够为货物进入市场作好准备,在设计中我们根据书店…

【人工智能】音乐大模型的深入探讨——当机器有了创意,是机遇还是灾难?

👀国内外音乐大模型基本情况👀 ♥概述♥ ✈✈✈如FreeCompose、一术科技等,这些企业专注于开发人工智能驱动的语音、音效和音乐生成工具,致力于利用核心技术驱动文化产业升级。虽然具体公司未明确提及,但可以预见的是…

产业园区空间优化设计的创新实践者

树莓集团在产业园区运营中的空间优化设计方面,通过全面规划与科学布局、绿色智能与可持续发展、个性化定制与灵活多变、创新实践与数字化升级等措施,为企业提供了高品质、高效率的空间环境和服务支持。 一、全面规划与科学布局 明确产业定位&#xff1a…

Nuxt3 实战 (十一):添加路由 Transition 过渡效果和 Loading 动画

页面过渡效果 Nuxt3 利用 Vue 的 组件 在页面和布局之间应用过渡效果。 nuxt.config.ts 文件配置: export default defineNuxtConfig({app: {pageTransition: { name: page, mode: out-in }}, })在页面之间添加过渡效果,在 app.vue 文件中添加以下 CS…

USAD: 多元时间序列的无监督异常检测

USAD: 多元时间序列的无监督异常检测 原创 小王搬运工 时序课堂 2024-06-20 10:43 四川 论文地址:https://dl.acm.org/doi/abs/10.1145/3394486.3403392 论文源码:https://github.com/manigalati/usad 期刊:KDD 20: Proceedings of the 26…

嵌入式开发二十:定时器之基本定时器

定时器是微控制器中的关键外设,用于精确控制时间和事件。通过配置时钟源、预分频器、计数周期和比较值,可以实现各种时间控制任务,如定时中断、PWM生成和时间测量。理解定时器的工作原理和配置方法是嵌入式系统开发中的基本技能。 STM32F407 …

人工智能的头号威胁:投毒攻击

随着掌管数字生活入口的万亿美元俱乐部企业——苹果公司跳入人工智能(AI)赛道,AI技术民主化的大幕正式拉开,同时也将AI安全问题推向舆论的风口浪尖。 根据瑞银本周一的智能手机调查报告,在中国以外的智能手机用户中&am…

安装MySQL5.7版本步骤遇到问题

方法一:下载zip版本(我用的这个) 参考视频(已收藏):windows安装MySQL5.7_哔哩哔哩_bilibili 下载zip压缩包的MySQL的网址:上面这个视频中有哦。 my.ini文件内容如下: [client] p…

有什么值得推荐的文件外发权限管理软件?

你的身边有这种事情发生吗? 一些工作人员在传达文件、部署工作时,为贪图方便,直接将涉密文件、涉密工作通过微信群传达部署,造成了大量泄密案件发生。 有些责任人员为了工作方便,对涉密文件进行拍照,通过…

jar包运行脚本

start: # 启动项目 #!/bin/bash nohup java -jar audit-2.1.0.jar > app.log 2>&1 & quit: # 关闭程序 #!/bin/bash PID$(pgrep -f audit-2.1.0.jar) # 根据应用程序名称查找进程ID kill -9 $PID # 结束进程使用 sh命令运行