如何使用RabbitMQ和Python实现广播消息

使用 RabbitMQ 和 Python 实现广播消息的过程涉及设置一个消息队列和多个消费者,以便接收相同的消息。RabbitMQ 的 “fanout” 交换机允许你将消息广播到所有绑定的队列。以下是如何实现这一过程的详细步骤。

在这里插入图片描述

1、问题背景

在将系统从Morbid迁移到RabbitMQ时,发现RabbitMQ无法提供Morbid默认提供的广播行为。在广播模式下,当一个消息被添加到队列时,所有的消费者都会收到它。然而,在RabbitMQ中,消息会以轮询的方式分发给各个监听器。

代码例子如下:

# 消费者
import stompclass MyListener(object):def on_error(self, headers, message):print 'recieved an error %s' % messagedef on_message(self, headers, message):print 'recieved a message %s' % messageconn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}conn.subscribe(destination='/topic/demoqueue', ack='auto')while True:pass
conn.disconnect()# 发送者
import stompclass MyListener(object):def on_error(self, headers, message):print 'recieved an error %s' % messagedef on_message(self, headers, message):print 'recieved a message %s' % messageconn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}conn.subscribe(destination='/topic/demotopic', ack='auto')while True:pass
conn.disconnect()

通过上述代码,将会出现问题,导致无法实现广播消息。

2、解决方案

  1. 使用交换机和队列来实现广播消息。具体方法如下:

(1)使用amqplib库来创建交换机和队列。在发送消息时,将消息发送到交换机,而不是队列。在接收消息时,将队列绑定到交换机,这样就可以收到交换机上所有消息。代码修改如下:

# 发送者
import stompclass MyListener(object):def on_error(self, headers, message):print 'recieved an error %s' % messagedef on_message(self, headers, message):print 'recieved a message %s' % messageconn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")
headers = {}# 发送消息到交换机
exchange = 'my_exchange'
conn.send(str(i), exchange=exchange, destination='')# 接收者
import stomp
import sys
from amqplib import client_0_8 as amqp
#read in the exchange name so I can set up multiple recievers for different exchanges to tset
exchange = sys.argv[1]
conn = amqp.Connection(host="localhost:5672", userid="username", password="password",virtual_host="/", insist=False)chan = conn.channel()chan.access_request('/', active=True, write=True, read=True)#declare my exchange
chan.exchange_declare(exchange, 'topic')
#not passing a queue name means I get a new unique one back
qname,_,_ = chan.queue_declare()
#bind the queue to the exchange
chan.queue_bind(qname, exchange=exchange)class MyListener(object):def on_error(self, headers, message):print 'recieved an error %s' % messagedef on_message(self, headers, message):print 'recieved a message %s' % messageconn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'browser', 'browser')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="username", password="password")
headers = {}#subscribe to the queue
conn.subscribe(destination=qname, ack='auto')while True:pass
conn.disconnect()

(2)使用StompJS 库来实现广播消息。具体方法如下:

// 消费者
var stompClient = Stomp.client('ws://localhost:61613');stompClient.connect({}, function(frame) {stompClient.subscribe('/topic/demoqueue', function(message) {console.log('Received message: ' + message.body);});
});// 发送者
var stompClient = Stomp.client('ws://localhost:61613');stompClient.connect({}, function(frame) {stompClient.send('/topic/demoqueue', {}, 'Hello, world!');
});

通过以上步骤,你可以实现 RabbitMQ 的消息广播功能。多个消费者可以同时接收来自同一个生产者的消息,这是构建分布式系统时非常常见的场景。如果需要更复杂的消息处理,可以在此基础上进行扩展。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/59232.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C#-类:成员变量

声明在类语句块中,描述对象的特征,可为任意变量类型 可包含:枚举、结构体、类、其他 1. 类成员的详细定义 特征->成员变量:包括类的数据:变量、常量、事件的成员行为->成员方法:普通方法、属性、构…

PAT甲级-1133 Splitting A Linked List

题目 题目大意 给定一个链表的首节点地址和节点个数&#xff0c;以及一个数k。要求重新排列该链表&#xff0c;使其按<0 &#xff0c;> 0 && < k&#xff0c;>k 的顺序排序。但是不改变原有顺序&#xff0c;比如-4 -> -6 -> -2&#xff0c;不需要再…

【spark的集群模式搭建】spark集群之Yarn集群模式搭建(清晰明了的搭建流程)

文章目录 1、使用Anaconda部署Python2、上传、解压、重命名3、创建软连接&#xff08;如果在Standalone模式中创建有就删除&#xff09;4、配置spark环境变量5、修改spark-env.sh配置文件6、修改spark-defaults.conf 配置文件7、修改log4j.properties配置文件8、上传spark jar包…

Android IPC机制(三)进程间通信方式

在Android中有以下几种进程间通信方式: 目录 1.Bundle 2.文件共享 3.Messenger 4.ContentProvider 5.AIDL 1.Bundle Bundle是Android中用于存储一组键值对的类&#xff0c;它实现了Parcelable接口。这使得Bundle能够在不同的进程之间传递数据。当我们通过Intent启动其他应…

ubuntu系统安装升级jdk到17

百度安全验证 https://blog.csdn.net/qq_44866828/article/details/130557027 然后修改一下配置路径 试下java --version命令

GEE数据集:全球天然林和人工林数据集提供了一张高分辨率(30 米)地图,用于区分截至 2021 年全球的天然林和人工林

目录 简介 全球天然林和人工林 数据生成和分类 代码 引用 License 网址推荐 知识星球 机器学习 GEE数据集&#xff1a;全球天然林和人工林数据集提供了一张高分辨率&#xff08;30 米&#xff09;地图&#xff0c;用于区分截至 2021 年全球的天然林和人工林 简介 全球…

20241031 Apache2修改日志里面的时间格式

问题背景,Apache2里面日志输出,关于时间这一块,看着难受,所以有了如下需求,修改日志里面的时间格式 案例日志 127.0.0.1 - - [31/Oct/2024:19:20:34 0800] "GET /index.php/vod/search/actor/XimenadelSolar.html HTTP/1.1" 200 4233 "-" "Mozilla/5…

基于SpringBoot司机信用评价的货运管理系统【附源码】

基于SpringBoot司机信用评价的货运管理系统 效果如下&#xff1a; 系统主页面 系统注册页面 司机注册页面 管理员主页面 订单评价页面 货物信息页面 个人信息页面 研究背景 随着我国物流行业的迅猛发展&#xff0c;货运管理系统的效率与安全性日益受到重视。在货运过程中&am…

11.4OpenCV_图像预处理习题02

1.身份证号码识别&#xff08;结果&#xff1a;身份证号识别结果为&#xff1a;911124198108030024&#xff09; import cv2 import numpy as np import paddlehub as hubdef get_text():img cv2.imread("images1/images/shenfen03.jpg")# 灰度化gray_img cv2.cvt…

推荐:自然语言处理方向的一些创新点

以下是自然语言处理研究方向的一些创新点&#xff1a; 一、预训练模型的改进与优化 模型架构创新 融合多模态信息&#xff1a; 传统的自然语言处理模型主要处理文本信息。创新点在于将图像、音频等多模态信息融合到预训练模型中。例如&#xff0c;对于描述一幅画的文本&#x…

<项目代码>YOLOv8 煤矸石识别<目标检测>

YOLOv8是一种单阶段&#xff08;one-stage&#xff09;检测算法&#xff0c;它将目标检测问题转化为一个回归问题&#xff0c;能够在一次前向传播过程中同时完成目标的分类和定位任务。相较于两阶段检测算法&#xff08;如Faster R-CNN&#xff09;&#xff0c;YOLOv8具有更高的…

netty之实现一个redis的客户端

写在前面 本文看下如何使用redis来实现一个类似于redis官方提供的redis-cli.exe的客户端工具。 1&#xff1a;用到的模块 主要需要用到netty针对redis的编解码模块&#xff0c;可以解析redis的协议&#xff0c;从而可以实现和redis交互的功能。 2&#xff1a;正文 首先来…

防重方案-订单防重方案笔记

订单防重设计 订单重复提交概念解决方案前端防重机制后端防重机制利用Token机制基于数据库的唯一索引 Token机制方案介绍 其他 订单重复提交概念 重复提交指&#xff0c;连点按钮进行重复提交操作&#xff0c;不包括刷新后的重新下单&#xff0c;重新下单已非同一订单的概念。…

Vision - 开源视觉分割算法框架 Grounded SAM2 配置与推理 教程 (1)

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/143388189 免责声明&#xff1a;本文来源于个人知识与公开资料&#xff0c;仅用于学术交流&#xff0c;欢迎讨论&#xff0c;不支持转载。 Ground…

【C++刷题】力扣-#697-数组的度

题目描述 给定一个非空且只包含非负数的整数数组 nums&#xff0c;数组的 度 的定义是指数组里任一元素出现频数的最大值。 你的任务是在 nums 中找到与 nums 拥有相同大小的度的最短连续子数组&#xff0c;返回其长度。 示例 示例 1 输入&#xff1a;nums [1,2,2,3,1] 输出…

LocalDate 类常用方法详解(日期时间类)

LocalDate 类常用方法详解 LocalDate 是 Java 8 引入的日期时间API中的一个类&#xff0c;用于表示不含时间和时区的日期&#xff08;年、月、日&#xff09;。以下是一些常用的 LocalDate 方法&#xff1a; 创建 LocalDate 实例 now()&#xff1a;获取当前日期 LocalDate t…

一些常用的react hooks以及各自的作用

一些常用的react hooks以及各自的作用 一、React Hooks是什么二、一些常用的Hooks以及各自的作用1、useState2、useEffect3、useContext4、useMemo5、useCallback6、useReducer7、useRef 一、React Hooks是什么 Hook 是 React 16.8 的新增特性。它可以让你在不编写 class 的情…

不用买PSP,画质甚至更好,这款免费神器让你玩遍经典游戏

作为掌机游戏爱好者的福音&#xff0c;PPSSPP模拟器为玩家带来了前所未有的PSP游戏体验&#xff0c;彻底改变了掌机游戏的体验方式。这款精湛的软件不仅完美复刻了PSP主机的游戏体验&#xff0c;更通过先进的模拟技术&#xff0c;将经典游戏提升到了全新的高度。对于那些珍藏PS…

lua学习笔记---面向对象

在 Lua 中&#xff0c;封装主要通过元表&#xff08;metatable&#xff09;来实现。元表可以定义 __index、__newindex、__call 等元方法来控制对表的访问和赋值行为。 __index 元方法&#xff1a;当尝试访问一个不存在的键时&#xff0c;Lua 会查找元表的 __index 字段。如果 …

第15课 算法(下)

掌握冒泡排序、选择排序、插入排序、顺序查找、对分查找的的基本原理&#xff0c;并能使用这些算法编写简单的Python程序。 一、冒泡排序 1、冒泡排序的概念 冒泡排序是最简单的排序算法&#xff0c;是在一列数据中把较大&#xff08;或较小&#xff09;的数据逐次向右推移的…