【消息中间件】Rabbitmq的基本要素、生产和消费、发布和订阅

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。


文章目录

  • 前言
  • 一、消息队列的基本要素
    • 1.队列:queue
    • 2.交换机:exchange
    • 3.事件:routing_key
    • 4.任务:task
  • 二、生产消费模式
    • 1.安装pika
    • 2.模拟生产者进程
    • 3.模拟消费者进程
    • 4.ACK消息确认机制
    • 5.类的写法
      • (1)新建MyRabbitMQ.py文件
      • (2)基础RabiitMQ
  • 三、发布订阅模式
  • 四、多消息队列
  • 五、异常处理
    • 1. 死信队列


前言

Rabbitmq消息队列,Windows安装RabbitMQ教程


一、消息队列的基本要素

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

消息队列是一种中间件 ,用于在不同的组件或系统之间传递消息(进程间通讯的一种)。 它提供了一种可靠的机制(AMQP)来存储和传递消息,并确保消息的顺序性和可靠性。消息队列需要存储消息。

1.队列:queue

用于接入消息队列的出入口

2.交换机:exchange

用于存储的一种通道

3.事件:routing_key

用于记录的一种标记

4.任务:task

这里的任务就是处理程序,还可能包含回调函数

注:基于我们使用不同的要素组合,分化出了基础的生产消费模式和发布订阅模式。其中只使用队列和任务的方式划为生产消费模式,4个同时使用的方式划为发布订阅模式。

二、生产消费模式

消息队列处理的是进程间通讯问题,生产者和消费者正是2个进程的程序,代表了不同的组件或系统。
我们使用python来实现相关功能,可以通过pika这个三方库来实现。

1.安装pika

pip install pika -i https://pypi.tuna.tsinghua.edu.cn/simple

2.模拟生产者进程

这里的生产者进程可能是一个后端程序、也可能是一个py文件、也可能知识一条触发命令。

# !/usr/bin/env python
import pika# ######################### 生产者 #########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
# 声明一个channel,类似数据库打开一个链接
channel = connection.channel()
# 创建一个队列,队列名称叫做hello
channel.queue_declare(queue='hello')
# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

3.模拟消费者进程

消费者

# !/usr/bin/env python
import pika# ########################## 消费者 ##########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()# channel.queue_declare(queue='hello')def callback(ch, method, properties, body):print(" [x] Received %r" % body)# 取一个就关掉的方法channel.stop_consuming()
# 去hello队列里拿数据,一但有数据,就执行callback
channel.basic_consume(callback, queue='hello', no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉
channel.start_consuming()

4.ACK消息确认机制

ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将次消息从队列中删除。

生产者

# !/usr/bin/env python
import pika# ######################### 生产者 #########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))
# 声明一个channel,类似数据库打开一个链接
channel = connection.channel()
# 创建一个队列,队列名称叫做hello
channel.queue_declare(queue='hello')
# 向hello这个队列里发送一个Hello World!   exchange:如果当做一个普通队列,就为空
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消费者

# !/usr/bin/env python
import pika# ########################## 消费者 ##########################
# 如果设置密码,那么就需要加以下一句话配置用户名与密码
credentials = pika.PlainCredentials("root", "123")
# host:ip地址 credentials:链接凭证
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', credentials=credentials))channel = connection.channel()# channel.queue_declare(queue='hello')def callback(ch, method, properties, body):print(" [x] Received %r" % body)# 取值做确认工作ch.basic_ack(delivery_tag=method.deliver_tag)# 去hello队列里拿数据,一但有数据,就执行callback,
# no_ack=Flask必须在取值时做确认工作,否则值不会被取出
channel.basic_consume(callback, queue='hello', no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')
# 开始去取得意思【表示一直去队列中取会夯住】注意可以去一个就关掉
channel.start_consuming()

5.类的写法

这个类使用 pika 库进行与 RabbitMQ 的通信。当你使用 send_message() 或 receive_message() 、consume_messages方法时,Channel 对象必须是打开的。如果没有连接或者通道没有打开,这些方法将引发 ValueError 异常。

(1)新建MyRabbitMQ.py文件

文件包含rabbitmq的类,类中包含连接到RabbitMQ,并在连接对象上创建一个管道,然后就可以使用send_message()receive_message()方法、consume_messages发送和接收消息,接收消息会调用回调方法。

下面是一个带有消费回调的完整 RabbitMQ 类

import pika
import timeclass RabbitMQ:def __init__(self, host, port, username, password):self.host = hostself.port = portself.username = usernameself.password = passwordself.connection = Noneself.channel = Nonedef connect(self, timeout=10):credentials = pika.PlainCredentials(self.username, self.password)parameters = pika.ConnectionParameters(host=self.host,port=self.port,credentials=credentials)start_time = time.time()while time.time() - start_time < timeout:try:self.connection = pika.BlockingConnection(parameters)self.channel = self.connection.channel()return Trueexcept pika.exceptions.AMQPConnectionError:time.sleep(1)return Falsedef send_message(self, exchange, routing_key, message):try:self.channel.basic_publish(exchange=exchange,routing_key=routing_key,body=message,properties=pika.BasicProperties(delivery_mode=2))except AttributeError:raise ValueError("Channel is not open. Call connect() before send_message().")def receive_message(self, queue, auto_ack=False):try:method_frame, properties, body = self.channel.basic_get(queue=queue, auto_ack=auto_ack)if method_frame:return body.decode('utf-8')else:return Noneexcept AttributeError:raise ValueError("Channel is not open. Call connect() before receive_message().")def consume_messages(self, queue, callback):try:self.channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)self.channel.start_consuming()except AttributeError:raise ValueError("Channel is not open. Call connect() before consume_messages().")def create_queue(self, name):try:self.channel.queue_declare(queue=name, durable=True)except AttributeError:raise ValueError("Channel is not open. Call connect() before create_queue().")def bind_queue(self, queue, exchange, routing_key):try:self.channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)except AttributeError:raise ValueError("Channel is not open. Call connect() before bind_queue().")def close(self):try:self.connection.close()except AttributeError:raise ValueError("Connection is not open. Call connect() before close().")

(2)基础RabiitMQ

基于队列_生产

创建RabiitMQ_生产.py文件,内容如下:

from MyRabbitMQ import RabbitMQif __name__ == '__main__':print('RabbitMQ生产')my_host = '127.0.0.1'my_username = 'guest'my_password = 'guest'my_queue = 'hello'my_exchange = 'BBB'my_routing_key = 'hello'rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)if rabbitmq.connect():rabbitmq.create_queue(my_queue)rabbitmq.send_message('', my_queue, message='开始了')else:print("Failed to connect to RabbitMQ.")

基于队列_消费

from MyRabbitMQ import RabbitMQif __name__ == '__main__':print('RabbitMQ消费')my_host = '127.0.0.1'my_username = 'guest'my_password = 'guest'my_queue = 'hello'my_exchange = 'BBB'my_routing_key = 'hello'def callback(channel, method, properties, body):print("Received message: %s" % body.decode('utf-8'))channel.basic_ack(delivery_tag=method.delivery_tag)rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)if rabbitmq.connect():rabbitmq.create_queue(my_queue)rabbitmq.consume_messages(my_queue, callback)else:print("Failed to connect to RabbitMQ.")

在此例中,当一个新的消息从名为 my_queue 的队列中接收时,回调函数 callback 将被调用并打印消息内容。

注意:如果你的回调函数需要执行较复杂的操作(例如长时间运行或使用多线程),则你应该确保它是线程安全的,并且在操作完成后调用 ch.basic_ack,这样 RabbitMQ 就知道消息已经被处理并可以将其从队列中删除。

三、发布订阅模式

发布订阅模式的消费者是queue队列,需要绑定exchange和routing_key,实际使用时可能存在一个队列绑定多个routing_key,或多个queue绑定一个routing_key,所以在我们的消费者处理中,需要判断routing_key事件做必要的区分。

基于exchangs交换机的生产者

from MyRabbitMQ import RabbitMQif __name__ == '__main__':print('RabbitMQ消费')my_host = '127.0.0.1'my_username = 'guest'my_password = 'guest'my_queue = 'hello'my_exchange = 'BBB'my_routing_key = 'hello'rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)if rabbitmq.connect():rabbitmq.send_message(my_exchange, my_routing_key, message='开始了')else:print("Failed to connect to RabbitMQ.")

基于exchangs交换机的消费者

from MyRabbitMQ import RabbitMQif __name__ == '__main__':print('RabbitMQ消费')my_host = '127.0.0.1'my_username = 'guest'my_password = 'guest'my_queue = 'hello'my_exchange = 'BBB'my_routing_key = 'hello'def callback(channel, method, properties, body):print("Received message: %s" % body.decode('utf-8'))channel.basic_ack(delivery_tag=method.delivery_tag)rabbitmq = RabbitMQ(my_host, 5672, my_username, my_password)if rabbitmq.connect():rabbitmq.create_queue(my_queue)# rabbitmq.send_message(my_exchange, my_routing_key, message='开始了')rabbitmq.bind_queue(my_queue, my_exchange, my_routing_key)rabbitmq.consume_messages(my_queue, callback)else:print("Failed to connect to RabbitMQ.")

在这里插入图片描述

四、多消息队列

import pika
import random
from retry import retry
def on_message(channel, method_frame, header_frame, body)print(method_frame.delivery_tag)print(body)print(header_frame)channel.basic_ack(delivery_tag=method_frame.delivery_tag)node1 = pika.URLParameters('amqp://node1')
node2 = pika.URLParameters('amqp://node2')
node3 = pika.URLParameters('amqp://node3')
all_endpoints = [node1, node2, node3]@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter(1, 3)
def consume():random.shuffle(all_endpoints)connection = pika.BlockingConnection(all_endpoints)channel = connection.channel()channel.basic_qos(prefetch_count=1)channel.queue_declare('recovery-example', durable=False, auto_delete=True)channel.basic_consume('recovery-example', on_message)try:channel.start_consuming()except KeyboardInterrupt:channel.stop_consuming()connection.close()except pika.excaptions.ConnectionClosedByBroker:continue
consume()

五、异常处理

from pika.exceptions import ChannelClosed
from pika.exceptions import ConnectionClosedtry:mq.start_consuming_message()except ConnectionClosed as e:mq.clear()mq.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)mq.start_consuming_message()except ChannelClosed as e:mq.clear()mq.reconnect(queue_oname, exchange, route_key, is_use_rabbitpy=1)mq.start_consuming_message()

1. 死信队列

死信队列就是备份队列,rabbitMQ有,kafka还没有

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/228217.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【HR培训】行为反馈复盘,走出舒适区--20231217

行为反馈复盘&#xff0c;走出舒适区–鱼缸会议 要点&#xff1a;在于建立平等、透明、敢说的反馈环境&#xff0c;不打断、不争论 鱼缸会议流程 导入——入缸——反馈——承诺——关闭 步骤1&#xff1a;导入 目的&#xff1a;平等、透明、敢说的反馈 人员&#xff1a;主…

maui中实现加载更多 RefreshView跟ListView(1)

效果如图&#xff1a; MainPage.xaml.cs: using System; using System.Collections.ObjectModel; using System.Threading.Tasks; using Microsoft.Maui.Controls; using Microsoft.Maui.Controls.Xaml; using System.ComponentModel; using System.Runtime.CompilerServices…

计算机网络基础——网线认识与制作,线缆类型、线序、端接标准及注意事项

一、引言 网线制作是网络基础知识中不可或缺的。网络传输过程中&#xff0c;网线的质量和制作方法都会直接影响传输的速度和稳定性。本文将详细介绍网线制作的基础知识、线缆类型、线序、端接标准及注意事项。希望通过本文&#xff0c;读者能够更好地了解和掌握网线制作的方法…

AMD 自适应和嵌入式产品技术日

概要 时间&#xff1a;2023年11月28日 地点&#xff1a;北京朝阳新云南皇冠假日酒店 主题内容&#xff1a;AMD自适应和嵌入式产品的更新&#xff0c;跨越 云、边、端的AI解决方案&#xff0c;赋能智能制造的机器视觉与机器人等热门话题。 注&#xff1a;本文重点关注FPGA&a…

ASP.NET MVC实战之权限拦截Authorize使用

1&#xff0c;具体的实现方法代码如下 public class CustomAuthorizeAttribute : FilterAttribute, IAuthorizationFilter{/// <summary>/// 如果需要验证权限的时候&#xff0c;就执行进来/// </summary>/// <param name"filterContext"></par…

Ubuntu系统入门指南:基础操作和使用

Ubuntu系统的基础操作和使用 一、引言二、安装Ubuntu系统三、Ubuntu系统的基础操作3.1、界面介绍3.2、应用程序的安装和卸载3.3、文件管理3.4、系统设置 四、Ubuntu系统的日常使用4.1、使用软件中心4.2、浏览器的使用和网络连接设置4.3、邮件客户端的配置和使用4.4、文件备份和…

HTML5+CSS3小实例:3D发光切换按钮效果

目录 一、运行效果 图片效果 二、项目概述 三、开发环境 四、实现步骤及代码 1.创建空文件夹 2.完成页面内容 3.完成css样式 五、项目总结 六、源码获取 一、运行效果 图片效果 二、项目概述 这个项目是一个演示3D发光切换按钮效果的网页。按钮由一个开关和一个指…

Linux之进程(四)(进程地址空间)

目录 一、程序地址空间 二、进程地址空间 1、概念 2、写时拷贝 3、为什么要有进程地址空间 四、总结 一、程序地址空间 我们先来看看下面这张图。这张图是我们在学习语言时就见到过的内存区域划分图。 下面我们在Linux下看一看内存区域是不是也是这么划分的。 可见在Li…

圣诞树绘制合集-python绘制

使用Python绘制迷人的圣诞树 引言 随着圣诞节的临近&#xff0c;我们都希望以各种方式庆祝这个欢乐的节日。作为一名编程爱好者&#xff0c;你有没有想过用Python来创造节日的气氛呢&#xff1f;在这篇文章中&#xff0c;我将向你展示如何用Python绘制几种不同风格的圣诞树&a…

索尼(ILCE-7M3)MP4文件只能播放前两分钟修复案例

索尼的ILCE-7M3是一款经典设备&#xff0c;其HEVC编码效果是比较不错的&#xff0c;因此受到很多专业人士的青睐。之前我们说过很多索尼摄像机断电生成RSV文件修复的案例&#xff0c;今天来讲一个特殊的&#xff0c;文件已经正常封装但仅能播放前两分钟多一点的画面。 故障文件…

详细教程 - 从零开发 鸿蒙harmonyOS应用 第四节 (鸿蒙Stage模型 登录页面 ArkTS版 推荐使用)

在鸿蒙OS中&#xff0c;Ability是应用程序提供的抽象功能&#xff0c;可以理解为一种功能。在应用程序中&#xff0c;一个页面即一种能力&#xff0c;如登录页面&#xff0c;即具有登录功能的能力。以下是对鸿蒙新建项目的登录代码功能的详细解读和工作流程的描述&#xff1a; …

C++入门篇

呀哈喽&#xff0c;我是结衣。 了解完C的发展历程&#xff0c;我们当然也要会用C啊。今天这篇博客就是来帮助我们来入门C的&#xff0c;当然要入门C当然也要先学会C语言啦。在我学习C的过程中我会一直把C博客更新下去的。 C关键字 我们都知道C语言是有32个关键字的&#xff0…

json JSON.parse()与JSON.stringify()

JSON.parse() 属于解析 JSON.parse()方法解析一个JSON字符串为ECMAScript值&#xff0c;返回解析后的值&#xff0c; JSON.parse({}); // -> {}JSON.parse([]); // -> []JSON.parse(1); // -> {}注意&#xff1a;JSON.parse()解析的JSON字符串不允许以逗…

Python-数据分析可视化实例图

Python-数据分析可视化实例图 一&#xff1a;3D纹理图 运行效果图&#xff1a; Python代码&#xff1a; import math from typing import Unionimport pyecharts.options as opts from pyecharts.charts import Surface3Ddef float_range(start: int, end: int, step: Union[…

分享66个Java源码总有一个是你想要的

分享66个Java源码总有一个是你想要的 学习知识费力气&#xff0c;收集整理更不易。 知识付费甚欢喜&#xff0c;为咱码农谋福利。 链接&#xff1a;https://pan.baidu.com/s/1hKlZJB3KrHcOuKWyV1xjKw?pwd6666 提取码&#xff1a;6666 项目名称 ava web个人网站项目 ea…

不是生活有意思,是你热爱生活它才有意思

明制汉服的设计 同样是一款很重工的外套 细节上也是做到了极致 顺毛毛呢面料 领口袖口拼接仿貂毛环保毛条 前胸欧根纱刺绣圆形布 袖子贴民族风珠片刺绣织带 门襟搭配金属子母扣&#xff0c;真盘扣设计 时尚经典&#xff0c;搭配马面裙孩子穿上 真的很有气质奢华富贵 …

程序人生15年人生感悟

计算机程序员并不是一件什么高大上的职业。而仅仅是一份普通的工作。就像医生能治病救人&#xff0c;我们能治蓝屏救程序&#xff0c;我们都在为这个世界默默的做出自己的贡献。刻意或无意宣扬某个职业高大上&#xff0c;其实质是对其它行业从业者的不公平。但是有些人却常常这…

Node.js安装教程

虽然网上Node.js的安装教程有很多&#xff0c;但是基本上都是千篇一律。虽然跟着网上内容安装&#xff0c;却总会遇到乱七八糟的问题。为此&#xff0c;我写下这篇文章&#xff0c;除了描述node的安装教程&#xff0c;还会解释这样安装的过程起到一个什么作用。 文章大致上分为…

【PHP入门】1.2-常量与变量

-常量与变量- PHP是一种动态网站开发的脚本语言&#xff0c;动态语言特点是交互性&#xff0c;会有数据的传递&#xff0c;而PHP作为“中间人”&#xff0c;需要进行数据的传递&#xff0c;传递的前提就是PHP能自己存储数据&#xff08;临时存储&#xff09; 1.2.1变量基本概…

微服务实战系列之ZooKeeper(下)

前言 通过前序两篇关于ZooKeeper的介绍和总结&#xff0c;我们可以大致理解了它是什么&#xff0c;它有哪些重要组成部分。 今天&#xff0c;博主特别介绍一下ZooKeeper的一个核心应用场景&#xff1a;分布式锁。 应用ZooKeeper Q&#xff1a;什么是分布式锁 首先了解一下&…