Python如何操作RabbitMQ实现fanout发布订阅模式?有录播直播私教课视频教程

fanout发布订阅模式

基本用法

生产者

import json
import rabbitmq# 建立连接
credentials = rabbitmq.PlainCredentials('zhangdapeng','zhangdapeng520',
)  # mq用户名和密码
connection_target = rabbitmq.ConnectionParameters(host='127.0.0.1',port=5672,virtual_host='/',credentials=credentials,
)
connection = rabbitmq.BlockingConnection(connection_target)# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"# 创建管道
channel = connection.channel()# 声明一个交换机
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.fanout)# 向队列中写入数据
user = {"id": 1, "name": "张三", "age": 23}
message = json.dumps(user, ensure_ascii=True)
channel.basic_publish(exchange=exchange_name,routing_key=queue_name,  # 队列名body=message.encode('utf8'),properties=rabbitmq.BasicProperties(delivery_mode=2),  # 声明消息在队列中持久化
)
print(message)# 关闭连接
connection.close()

消费者

import rabbitmq
import json# 创建连接
credentials = rabbitmq.PlainCredentials('zhangdapeng','zhangdapeng520',
)
target = rabbitmq.ConnectionParameters(host='127.0.0.1',port=5672,virtual_host='/',credentials=credentials,
)
connection = rabbitmq.BlockingConnection(target)# 创建管道
channel = connection.channel()# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"# 绑定交换机
channel.exchange_declare(exchange=exchange_name,exchange_type=rabbitmq.ExchangeType.fanout,
)# 绑定队列
result = channel.queue_declare(queue=queue_name,exclusive=True,
)
channel.queue_bind(exchange=exchange_name,queue=queue_name,
)def callback(ch, method, properties, body):"""每次接收到消息的消费回调方法"""ch.basic_ack(delivery_tag=method.delivery_tag)data = body.decode("utf8")print(json.loads(data))# 开始消费
channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=False,
)
try:channel.start_consuming()
finally:connection.close()

简化代码

生产者

import rabbitmq# 建立连接
connection = rabbitmq.get_connection()# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"# 创建管道
channel = connection.channel()# 声明一个交换机
channel.exchange_declare(exchange=exchange_name, exchange_type=rabbitmq.ExchangeType.fanout)# 向队列中写入数据
user = {"id": 1, "name": "张三", "age": 23}
rabbitmq.send_json(channel, user, exchange_name, queue_name)# 关闭连接
connection.close()

消费者

import rabbitmq
import json# 创建连接
connection = rabbitmq.get_connection()# 创建管道
channel = connection.channel()# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"# 绑定交换机
channel.exchange_declare(exchange=exchange_name,exchange_type=rabbitmq.ExchangeType.fanout,
)# 绑定队列
result = channel.queue_declare(queue=queue_name,exclusive=True,
)
channel.queue_bind(exchange=exchange_name,queue=queue_name,
)def callback(ch, method, properties, body):"""每次接收到消息的消费回调方法"""print(rabbitmq.receive_json(ch, method, body))# 开始消费
rabbitmq.consume(connection, queue_name, callback)

进一步简化代码

生产者

import rabbitmq# 建立连接
connection = rabbitmq.get_connection()# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"# 创建管道
channel = rabbitmq.get_fanout_channel(connection, exchange_name)# 向队列中写入数据
user = {"id": 1, "name": "张三", "age": 23}
rabbitmq.send_json(channel, user, exchange_name, queue_name)# 关闭连接
connection.close()

消费者

import rabbitmq# 创建连接
connection = rabbitmq.get_connection()# 队列信息
exchange_name = "user_manager_fanout"
queue_name = "user_manager_fanout"# 创建管道
channel = rabbitmq.get_fanout_channel(connection, exchange_name, queue_name)def callback(ch, method, properties, body):"""每次接收到消息的消费回调方法"""print(rabbitmq.receive_json(ch, method, body))# 开始消费
rabbitmq.consume(connection, queue_name, callback)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/630337.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HCIP BGP(一)

任务: 1.R1上有两个环回,分别为192.168.1.0/24&192.168.2.0/24,只允许学到汇总&1.0 2.R7上有两个环回172.16.1.0/24&172.16.2.0/24,要求全部宣告,但是只有2.0可以通过 3.全网可达 拓扑图如下&#xff…

C语言经典算法之顺序查找算法

目录 前言 A.建议 B.简介 一 代码实现 二 算法时空复杂度 A.时间复杂度: B.空间复杂度: 三 优点和缺点 A.优点: B.缺点: 四 现实中的应用 前言 A.建议 1.学习算法最重要的是理解算法的每一步,而不是记住算…

常见的反爬虫风控 | 验证码风控

一.前言 在当今信息技术迅速发展的背景下,网站和在线服务面临着日益增长的自动化访问威胁,这些大多来自于各类爬虫程序。这种大量的自动化访问不仅对网站的正常运行构成压力,还可能导致敏感数据的泄露,甚至被用于不正当竞争和恶意…

多表关联查询

基本信息: 1.创建student和score表 CREATE TABLE student ( id INT(10) NOT NULL UNIQUE PRIMARY KEY , name VARCHAR(20) NOT NULL , sex VARCHAR(4) , birth YEAR, department VARCHAR(20) , address VARCHAR(50) ); 创建score表。SQL代码如下&#…

PHP反序列化漏洞-魔术方法绕过

一、__wakeup()魔法函数绕过: 在PHP中,__wakeup()是一个魔术方法,用于在反序列化对象时自动调用。当反序列化字符串中的对象属性个数大于实际属性个数时,可以利用这个漏洞进行绕过。 触发条件: PHP版本为5.6.25或早期版本,或者PHP7版本小于7.0.10。反序列化字符串中的对…

C++大学教程(第九版)5.15修改GradeBook

目录 题目 代码 运行命令(在控制台输入) 运行截图 题目 (修改GradeBook)修改图5.9~图5.11所示的 GradeBook 程序,使它计算一组成绩的平均成绩。 成绩A为4分,成绩B为3分,依次类推。 A:4 B:3…

transbigdata 笔记: 轨迹密集化/稀疏化 轨迹平滑

1 密集化 transbigdata.traj_densify(data, col[Vehicleid, Time, Lng, Lat], timegap15) 轨迹致密化,保证至多每隔timegap秒都有一个轨迹点 这边插补使用的是pandas的interpolate,method设置的是index 1.1 举例 transbigdata 笔记: 官方…

开发实践7_文件上传

以下学习 朔宁夫 开发课(Python)。 文件上传&表单类 一 Django文件上传 表单上传。 前端:表单设置 enctype "multipart/form-data" 后端:获取上传文件对象 upload_dile request.FILES.get("文件域名…

《数据结构、算法与应用C++语言描述》-红黑树的C++实现-百万级数据量测试通过

红黑树 完整可编译运行代码见仓库:GitHub - Jasmine-up/Data-Structures-Algorithms-and-Applications/_35Red black tree。 如有问题请在评论区指出。另外,Github仓库会根据我的学习情况持续更新,欢迎大家点star,谢谢。 基本概…

【LeetCode】151. 反转字符串中的单词(中等)——代码随想录算法训练营Day08

题目链接:151. 反转字符串中的单词 题目描述 给你一个字符串 s ,请你反转字符串中 单词 的顺序。 单词 是由非空格字符组成的字符串。s 中使用至少一个空格将字符串中的 单词 分隔开。 返回 单词 顺序颠倒且 单词 之间用单个空格连接的结果字符串。 …

【卡码网】55. 右旋转字符串——代码随想录算法训练营Day08

题目链接:55. 右旋转字符串 题目描述 字符串的右旋转操作是把字符串尾部的若干个字符转移到字符串的前面。给定一个字符串 s 和一个正整数 k,请编写一个函数,将字符串中的后面 k 个字符移到字符串的前面,实现字符串的右旋转操作…

YOLOv8改进 | 主干篇 | 低照度增强网络PE-YOLO改进主干(改进暗光条件下的物体检测模型)

一、本文介绍 本文给大家带来的改进机制是低照度图像增强网络PE-YOLO中的PENet,PENet通过拉普拉斯金字塔将图像分解成多个分辨率的组件,增强图像细节和低频信息。它包括一个细节处理模块(DPM),用于通过上下文分支和边缘分支增强图像细节,以及一个低频增强滤波器(LEF),…

力扣909. 蛇梯棋

广度优先搜索 动态规划 思路: 定义 pair {id, step} 为到达格子编号 id,使用的步数 step,记作 step[id];记录下所摇骰子 1 - 6 到达的格子编号 next,step[next] step[id] 1: 走了 1 步,所能…

指纹浏览器为什么要搭配代理IP?如何选择?

跨境电商无论是店群模式还是社媒矩阵运营,都必须涉及管理多个社媒/电商帐户的动作,但这很容易引发网站怀疑并最终被批量封号。使用指纹浏览器浏览器的主要目的是通过创建新的浏览器指纹来隐藏用户的真实浏览器指纹。 但浏览器指纹并不是网站关注的唯一…

ELK之Filebeat安装配置及日志抓取

一、Filebeat是什么 轻量型日志采集器 无论您是从安全设备、云、容器、主机还是 OT 进行数据收集,Filebeat 都将为您提供一种轻量型方法,用于转发和汇总日志与文件,让简单的事情不再繁杂。 Filebeat 随附可观测性和安全数据源模块,这些模块简化了常见格式的日志的收集、解…

边缘计算发展的瓶颈

上次我们讨论了边缘计算,并用无人零售和边缘计算的例子,说明了边缘计算的便利性。 但是边缘计算,也有很多限制。 硬件资源限制 边缘设备通常具有有限的计算、存储和网络资源。这种资源限制使得在处理大量数据或执行复杂任务时,…

Qt/QML编程之路:Grid、GridLayout、GridView、Repeater(33)

GRID网格用处非常大,不仅在excel中,在GUI中,也是非常重要的一种控件。 Grid 网格是一种以网格形式定位其子项的类型。网格创建一个足够大的单元格网格,以容纳其所有子项,并将这些项从左到右、从上到下放置在单元格中。每个项目都位于其单元格的左上角,位置为(0,0)。…

Nsis常量的使用

常量定义 !define 常量名 常量值 使用 ${常量名} 作用 常量可与字符串拼接使用 实例 !define EXENAME "Test" OutFile "${EXENAME}.exe" #常量与字符串拼接使用 Section SectionEnd

What is `DOM注入攻击` does?

DOM(Document Object Model)注入攻击,也称为DOM型跨站脚本(DOM-based XSS, 或简称DXSS),是一种特殊的跨站脚本攻击方式。不同于传统的存储型XSS或反射型XSS,DOM注入攻击的关键在于客户端JavaScr…

电脑DIY-显卡

显卡 显卡代号前缀英伟达(NVIDIA)RTX系列实时光线追踪的基本原理DLSS技术的基本原理 显卡代号前缀 前缀品牌首次发布时间定位适用人群主流产品GTNVIDIA2006年较低性能办公用户、轻度游戏GT 1030GTXNVIDIA2008年高性能游戏玩家、图形设计师GTX 1080 TiRT…