python rabbitmq实现简单/持久/广播/组播/topic/rpc消息异步发送可配置Django

windows首先安装rabbitmq 点击参考安装

1、环境介绍

Python 3.10.16
其他通过pip安装的版本(Django、pika、celery这几个必须要有最好版本一致)
amqp              5.3.1
asgiref           3.8.1
async-timeout     5.0.1
billiard          4.2.1
celery            5.4.0
click             8.1.7
click-didyoumean  0.3.1
click-plugins     1.1.1
click-repl        0.3.0
colorama          0.4.6
Django            4.2
dnspython         2.7.0
eventlet          0.38.2
greenlet          3.1.1
kombu             5.4.2
pika              1.3.2
pip               24.2
prompt_toolkit    3.0.48
python-dateutil   2.9.0.post0
redis             5.2.1
setuptools        75.1.0
six               1.17.0
sqlparse          0.5.3
typing_extensions 4.12.2
tzdata            2024.2
vine              5.1.0
wcwidth           0.2.13
wheel             0.44.0

2、创建Django 项目

django-admin startproject django_rabbitmq

3、在setting最下边写上

# settings.py    guest:guest 表示的是你安装好的rabbitmq的登录账号和密码
BROKER_URL = 'amqp://guest:guest@localhost:15672/'
CELERY_RESULT_BACKEND = 'rpc://'

4.1 简单模式

4.1.1 在和setting同级的目录下创建一个叫consumer.py的消费者文件,其内容如下:

import pikadef callback(ch, method, properties, body):print(f"[x] Received {body.decode()}")def start_consuming():# 创建与RabbitMQ的连接connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个队列channel.queue_declare(queue='hello')# 指定回调函数channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)print('[*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()if __name__ == "__main__":start_consuming()

4.1.2 在和setting同级的目录下创建一个叫producer.py的生产者文件,其内容如下:

import pikadef publish_message():# message = request.GET.get('msg')# 创建与RabbitMQ的连接connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个队列channel.queue_declare(queue='hello')# 发布消息message = "Hello World!"channel.basic_publish(exchange='', routing_key='hello', body=message)print(f"[x] Sent '{message}'")# 关闭连接connection.close()if __name__ == "__main__":publish_message()

4.1.3 先运行消费者代码(consumer.py)再运行生产者代码(producer.py)

先:python consumer.py
再: python producer.py

4.1.4 运行结果如下:

在这里插入图片描述
在这里插入图片描述

4.2 消息持久化模式

4.2.1 在和setting同级的目录下创建一个叫recv_msg_safe.py的消费者文件,其内容如下:

import time
import pikadef callback(ch, method, properties, body):print(" [x] Received %r" % body)time.sleep(20)print(" [x] Done")# 下边这个就是标记消费完成了,下次在启动接受消息就不用从头开始了,即# 手动确认消息消费完成 和auto_ack=False 搭配使用ch.basic_ack(delivery_tag=method.delivery_tag)  # method.delivery_tag就是一个标识符,方便找对人def start_consuming():# 创建与RabbitMQ的连接connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个队列channel.queue_declare(queue='hello2', durable=True)  # 若声明过,则换一个名字# 指定回调函数channel.basic_consume(queue='hello2',on_message_callback=callback,# auto_ack=True  # 为true则不能持久话消息,即消费者关闭后下次收不到之前未收取的消息auto_ack=False  # 为False则下次依然从头开始收取消息,直到callback函数调用完成)print('[*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()if __name__ == "__main__":start_consuming()

4.2.2 在和setting同级的目录下创建一个叫send_msg_safe.py的生产者文件,其内容如下:

import pikadef publish_message():# 创建与RabbitMQ的连接connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()# 声明一个队列  durable=True队列持久化channel.queue_declare(queue='hello2', durable=True)channel.basic_publish(exchange='',routing_key='hello2',body='Hello World!',# 消息持久话用,主要用作宕机的时候,估计是写入本地硬盘了properties=pika.BasicProperties(delivery_mode=2,  # make message persistent))# 关闭连接connection.close()if __name__ == "__main__":publish_message()

4.2.3 先运行消费者代码(recv_msg_safe.py)再运行生产者代码(send_msg_safe.py) 执行结果如下:

在这里插入图片描述

4.3 广播模式

4.3.1 在和setting同级的目录下创建一个叫fanout_receive.py的消费者文件,其内容如下:

# 广播模式
import pika# credentials = pika.PlainCredentials('guest', 'guest')
# connection = pika.BlockingConnection(pika.ConnectionParameters(
#     host='localhost', credentials=credentials))
# 在setting中如果不配置BROKER_URL和CELERY_RESULT_BACKEND的情况下请使用上边的代码
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')  # 指定发送类型
# 必须能过queue来收消息
result = channel.queue_declare("", exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)  # 随机生成的Q,绑定到exchange上面。
print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True)
channel.start_consuming()

4.3.2 在和setting同级的目录下创建一个叫fanout_send.py的生产者文件,其内容如下:

# 通过广播发消息
import pika
import sys# credentials = pika.PlainCredentials('guest', 'guest')
# connection = pika.BlockingConnection(pika.ConnectionParameters(
#     host='localhost', credentials=credentials))
# 在setting中如果不配置BROKER_URL和CELERY_RESULT_BACKEND的情况下请使用上边的代码
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout') #发送消息类型为fanout,就是给所有人发消息# 如果等于空,就输出hello world!
message = ' '.join(sys.argv[1:]) or "info: Hello World!"channel.basic_publish(exchange='logs',routing_key='',  # routing_key 转发到那个队列,因为是广播所以不用写了body=message)print(" [x] Sent %r" % message)
connection.close()

4.3.3 先运行消费者代码(fanout_receive.py)再运行生产者代码(fanout_send.py) 执行结果如下:

在这里插入图片描述

4.4 组播模式

4.4.1 在和setting同级的目录下创建一个叫direct_recv.py的消费者文件,其内容如下:

import pika
import syscredentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queueseverities = sys.argv[1:]   # 接收那些消息(指info,还是空),没写就报错
if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])  # 定义了三种接收消息方式info,warning,errorsys.exit(1)for severity in severities:  # [error  info  warning],循环severitieschannel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)  # 循环绑定关键字
print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(on_message_callback=callback, queue=queue_name,)
channel.start_consuming()

4.4.2 在和setting同级的目录下创建一个叫direct_send.py的生产者文件,其内容如下:

# 组播
import pika
import syscredentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange='direct_logs',exchange_type='direct') #指定类型severity = sys.argv[1] if len(sys.argv) > 1 else 'info'  #严重程序,级别;判定条件到底是info,还是空,后面接消息message = ' '.join(sys.argv[2:]) or 'Hello World!'  #消息channel.basic_publish(exchange='direct_logs',routing_key=severity, #绑定的是:error  指定关键字(哪些队列绑定了,这个级别,那些队列就可以收到这个消息)body=message)print(" [x] Sent %r:%r" % (severity, message))
connection.close()

4.4.3 先运行消费者代码(direct_recv.py)再运行生产者代码(direct_send.py) 执行结果如下:

在这里插入图片描述

4.5 更细致的topic模式

4.5.1 在和setting同级的目录下创建一个叫topic_recv.py的消费者文件,其内容如下:

import pika
import syscredentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queuebinding_keys = sys.argv[1:]
if not binding_keys:print("sys.argv[0]", sys.argv[0])sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(on_message_callback=callback,queue=queue_name)channel.start_consuming()

4.5.2 在和setting同级的目录下创建一个叫topic_send.py的生产者文件,其内容如下:

import pika
import syscredentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic') #指定类型routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'message = ' '.join(sys.argv[2:]) or 'Hello World!'  #消息channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

4.5.3 先运行消费者代码(topic_recv.py)再运行生产者代码(topic_send.py) 执行结果如下:

在这里插入图片描述

4.6 Remote procedure call (RPC) 双向模式

4.6.1 在和setting同级的目录下创建一个叫rpc_client.py的消费者文件,其内容如下:

import pika
import uuid
import time# 斐波那契数列 前两个数相加依次排列
class FibonacciRpcClient(object):def __init__(self):# 赋值变量,一个循环值self.response = None# 链接远程# self.connection = pika.BlockingConnection(pika.ConnectionParameters(#         host='localhost'))credentials = pika.PlainCredentials('guest', 'guest')self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))self.channel = self.connection.channel()# 生成随机queueresult = self.channel.queue_declare("", exclusive=True)# 随机取queue名字,发给消费端self.callback_queue = result.method.queue# self.on_response 回调函数:只要收到消息就调用这个函数。# 声明收到消息后就 收queue=self.callback_queue内的消息  准备接受命令结果self.channel.basic_consume(queue=self.callback_queue,auto_ack=True, on_message_callback=self.on_response)# 收到消息就调用# ch 管道内存对象地址# method 消息发给哪个queue# body数据对象def on_response(self, ch, method, props, body):# 判断本机生成的ID 与 生产端发过来的ID是否相等if self.corr_id == props.correlation_id:# 将body值 赋值给self.responseself.response = bodydef call(self, n):# 随机一次唯一的字符串self.corr_id = str(uuid.uuid4())# routing_key='rpc_queue' 发一个消息到rpc_queue内self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(# 执行命令之后结果返回给self.callaback_queue这个队列中reply_to=self.callback_queue,# 生成UUID 发送给消费端correlation_id=self.corr_id,),# 发的消息,必须传入字符串,不能传数字body=str(n))# 没有数据就循环收while self.response is None:# 非阻塞版的start_consuming()# 没有消息不阻塞  检查队列里有没有新消息,但不会阻塞self.connection.process_data_events()print("no msg...")time.sleep(0.5)return int(self.response)# 实例化
fibonacci_rpc = FibonacciRpcClient()response = fibonacci_rpc.call(5)
print(" [.] Got %r" % response)

4.6.2 在和setting同级的目录下创建一个叫rpc_server.py的生产者文件,其内容如下:

#_*_coding:utf-8_*_
import pika
import time
# 链接socket
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))channel = connection.channel()# 生成rpc queue  在这里声明的所以先启动这个
channel.queue_declare(queue='rpc_queue')# 斐波那契数列
def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n-1) + fib(n-2)# 收到消息就调用
# ch 管道内存对象地址
# method 消息发给哪个queue
# props 返回给消费的返回参数
# body数据对象
def on_request(ch, method, props, body):n = int(body)print(" [.] fib(%s)" % n)# 调用斐波那契函数 传入结果response = fib(n)ch.basic_publish(exchange='',# 生产端随机生成的queuerouting_key=props.reply_to,# 获取UUID唯一 字符串数值properties=pika.BasicProperties(correlation_id=props.correlation_id),# 消息返回给生产端body=str(response))# 确保任务完成# ch.basic_ack(delivery_tag = method.delivery_tag)# 每次只处理一个任务
# channel.basic_qos(prefetch_count=1)
# rpc_queue收到消息:调用on_request回调函数
# queue='rpc_queue'从rpc内收
channel.basic_consume(queue="rpc_queue",auto_ack=True,on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()

4.6.3 先运行消费者代码(rpc_server.py)再运行生产者代码(rpc_client.py) 执行结果如下:

在这里插入图片描述

参考实现1

参考实现2

参考实现3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/64487.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

XML Schema 复合类型 - 混合内容

XML Schema 复合类型 - 混合内容 XML Schema 是一种用于定义 XML 文档结构和内容的语言。在 XML Schema 中,复合类型是一种包含其他元素和/或属性的复杂类型。混合内容(Mixed Content)是复合类型的一种特殊形式,它允许元素包含其…

nano编辑器的使用

nano 是一个非常简单易用的命令行文本编辑器,它常用于在 Linux 或类 Unix 系统中快速编辑文件,特别适用于需要修改配置文件或快速编辑文本的场景。以下是一些常见的 nano 使用技巧和基本操作。 1. 打开文件 要使用 nano 编辑文件,打开终端并…

Numpy基本介绍

目录 1、Numpy的优势 1.1、ndarray介绍 1.2、ndarray与Python原生list运算效率对比 1.3、ndarray的优势 1.3.1、内存块风格 1.3.2、ndarray支持并行化运算(向量化运算) 1.3.3、效率远高于纯Python代码 2、N维数组-ndarray 2.1、ndarray的属性 2.2、ndarray的形状 2…

XXE练习

pikachu-XXE靶场 1.POC:攻击测试 <?xml version"1.0"?> <!DOCTYPE foo [ <!ENTITY xxe "a">]> <foo>&xxe;</foo> 2.EXP:查看文件 <?xml version"1.0"?> <!DOCTYPE foo [ <!ENTITY xxe SY…

正则表达式在线校验(RegExp) - 加菲工具

正则表达式在线校验 - 加菲工具 打开网站 加菲工具 选择“正则表达式在线校验” 或者直接打开https://www.orcc.online/tools/regexp 输入待校验的源文本与正则表达式&#xff0c;点击“校验”按钮 需要注意检验后的内容可能存在多空格&#xff0c;可以拉下去看看~

java后端环境配置

因为现在升学了&#xff0c;以前本来想毕业干java的&#xff0c;很多java的环境配置早就忘掉了&#xff08;比如mysql maven jdk idea&#xff09;&#xff0c;想写个博客记录下来&#xff0c;以后方便自己快速搭建环境 JAVA后端开发配置 环境配置jdkideamavenMySQLnavicate17…

51c嵌入式~合集3

我自己的原文哦~ https://blog.51cto.com/whaosoft/12847563 一、UCIe 2.0 日前&#xff0c;通用芯粒互连&#xff08;UCIe&#xff09;产业联盟最新公布了 UCIe 2.0 规范&#xff0c;支持可管理性标准化系统架构&#xff0c;并全面解决了系统级封装&#xff08;SiP&#x…

解决电脑网速慢问题:硬件检查与软件设置指南

电脑网速慢是许多用户在使用过程中常见的问题&#xff0c;它不仅会降低工作效率&#xff0c;还可能影响娱乐体验。导致电脑网速慢的原因多种多样&#xff0c;包括硬件问题、软件设置和网络环境等。本文将从不同角度分析这些原因&#xff0c;并提供提高电脑网速的方法。 一、检查…

6、AI测试辅助-测试报告编写(生成Bug分析柱状图)

AI测试辅助-测试报告编写&#xff08;生成Bug分析柱状图&#xff09; 一、测试报告1. 创建测试报告2. 报告补充优化2.1 Bug图表分析 3. 风险评估 总结 一、测试报告 测试报告内容应该包含&#xff1a; 1、测试结论 2、测试执行情况 3、测试bug结果分析 4、风险评估 5、改进措施…

【C++ 】for 循环系统深入解析与实现法比较

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;for 循环的基本语法格式语法格式&#xff1a;格式一&#xff1a;单行语句的 for 循环格式二&#xff1a;多行语句的 for 循环循环流程图实例代码 for 循环中变量初始化的作…

Protobuf: 初识

protobuf是什么 简单来讲&#xff0c;ProtoBuf&#xff08;全称为ProtocolBuffer&#xff09;是让结构数据序列化的⽅法&#xff0c;其具有以下特点&#xff1a; • 语⾔⽆关、平台⽆关&#xff1a;即ProtoBuf⽀持Java、C、Python等多种语⾔&#xff0c;⽀持多个平台。 • ⾼效…

CSS 语法

CSS 语法 CSS(层叠样式表)是一种用于描述HTML或XML文档样式的样式表语言。它允许您将样式信息与文档内容分离,从而更有效地控制网页的布局和外观。本文将详细介绍CSS的基本语法和结构,帮助您更好地理解和应用CSS。 CSS的基本结构 CSS由一系列的规则组成,每个规则包含一…

Vue3安装配置、开发环境搭建(组件安装卸载)(图文详细)

Vue3安装配置、开发环境搭建(组件安装卸载)&#xff08;图文详细&#xff09; 本文目录&#xff1a; 一、vue的主要安装使用方式 二、node.js安装和配置 1、支持运行 Node.js的平台 2、Node.js 版本开发发布时间表&#xff08;日期可能会有变化&#xff09; 3、下载安装n…

Oracle 适配 OpenGauss 数据库差异语法汇总

背景 国产化进程中&#xff0c;需要将某项目的数据库从 Oracle 转为 OpenGauss &#xff0c;项目初期也是规划了适配不同数据库的&#xff0c;MyBatis 配置加载路径设计的是根据数据库类型加载指定文件夹的 xml 文件。 后面由于固定了数据库类型为 Oracle 后&#xff0c;只写…

Vue进阶之状态管理,解锁项目开发超能力

一、概念 状态管理是指对应用程序中状态的管理。在软件领域&#xff0c;状态是指在某个特定时刻&#xff0c;应用程序的数据和行为表现。 以一个简单的购物网站为例&#xff0c;购物车中的商品列表、用户的登录状态等都是状态。状态管理主要涉及这些状态如何被存储、更新和在…

操作系统(16)I/O软件

前言 操作系统I/O软件是负责管理和控制计算机系统与外围设备&#xff08;例如键盘、鼠标、打印机、存储设备等&#xff09;之间交互的软件。 一、I/O软件的定义与功能 定义&#xff1a;I/O软件&#xff0c;也称为输入/输出软件&#xff0c;是计算机系统中用于管理和控制设备与主…

WPF+MVVM案例实战与特效(四十二)- 打造炫酷彩虹字体控件,让你的应用闪耀起来

文章目录 1、引言2、案例实现1、依赖属性2、代码解释3、转换器实现3、控件使用4、运行效果4、总结1、引言 在WPF 应用程序中,视觉效果往往是吸引用户注意力的关键。一个小小的字体控件,如果能够以彩虹般的色彩展示文本,不仅能让界面更加生动,还能为用户提供独特的交互体验…

游戏AI实现-寻路算法(Dijkstra)

戴克斯特拉算法&#xff08;英语&#xff1a;Dijkstras algorithm&#xff09;&#xff0c;又称迪杰斯特拉算法、Dijkstra算法&#xff0c;是由荷兰计算机科学家艾兹赫尔戴克斯特拉在1956年发现的算法。 算法过程&#xff1a; 1.首先设置开始节点的成本值为0&#xff0c;并将…

CTFshow-文件上传(Web151-170)

CTFshow-文件上传(Web151-170) 参考了CTF show 文件上传篇&#xff08;web151-170&#xff0c;看这一篇就够啦&#xff09;-CSDN博客 Web151 要求png&#xff0c;然后上传带有一句话木马的a.png&#xff0c;burp抓包后改后缀为a.php&#xff0c;然后蚁剑连接&#xff0c;找fl…

Unity超优质动态天气插件(含一年四季各种天气变化,可用于单机局域网VR)

效果展示&#xff1a;https://www.bilibili.com/video/BV1CkkcYHENf/?spm_id_from333.1387.homepage.video_card.click 在你的项目中设置enviro真的很容易&#xff01;导入包裹并按照以下步骤操作开始的步骤&#xff01; 1. 拖拽“EnviroSky”预制件&#xff08;“environme…