RabbitMQ python第三方库pika应用入门实践

1. RabbitMQ简介

RabbitMQ是一个可靠、高效的开源消息代理服务器,基于AMQP协议。它具备以下特点:

  • 可以支持多种消息协议,如AMQP、STOMP和MQTT等。
  • 提供了持久化、可靠性和灵活的路由等功能。
  • 支持消息的发布和订阅模式。
  • 具备高可用性和可扩展性。

RabbiMQ的核心概念包括生产者、消费者、队列、交换机和绑定。生产者将消息发送到交换机,交换机根据其类型和绑定规则将消息路由到队列,然后消费者从队列中获取消息进行处理。

RabbitMQ相关概念

  • Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker。
  • Virtual host:出于多租户和安全因素的设计,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念,当多个不同的用户使用同一个RabbitMQ Server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
  • Connection:publisher/consumer和broker之间的TCP连接。
  • Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销都将是巨大的,效率也是非常低的。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread会创建单独的Channel进行通信,AMQP的method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection,极大减少了操作系统建立TCP连接的开销。

相关术语

  • producer:生产者,向队列中发送消息的程序。(在图表中通常使用P表示)
  • queue:队列,用于存储消息,定义在RabbitMQ内部,queue本质上是一个消息缓存buffer,生产者可以往里发送消息,消费者也可以从里面获取消息。(在图表中通常使用Q表示)
  • consumer:消费者,等待并从消息队列中获取消息的程序。(在图表中通常使用C表示)
  • exchange:交换机,用于将producer发送来的消息发送到queue,事实上,producer是不能直接将message发送到queue,必须先发送到exchange,再由exchange发送到queue。

注:生产者和消费者可能在不同的程序或主机中,当然也有可能一个程序有可能既是生产者,也是消费者。

2. pika简介

在Python中,pika是一个用于处理RabbitMQ消息队列的第三方库,它允许开发者在Python应用程序中发送和接收消息,实现应用程序之间的异步通信。

主要功能

  • 连接管理:pika提供了与RabbitMQ服务器建立连接的功能。
  • 通道管理:通过连接,可以创建多个通道(channel),每个通道代表一个独立的通信流。
  • 消息发送与接收:开发者可以使用pika发送消息到指定的队列(queue),并从队列中接收消息。
  • 交换机与队列声明:支持声明交换机(exchange)、队列,以及它们之间的绑定(binding)关系。
  • 消息确认:支持消息的自动确认(auto-ack)或手动确认(manual ack),以确保消息的可靠传递。

使用流程

  • 创建连接:使用pika.BlockingConnection或pika.SelectConnection等类创建与RabbitMQ服务器的连接。
  • 创建通道:通过连接对象的channel()方法创建通道。
  • 声明交换机与队列:使用通道对象的exchange_declare()和queue_declare()方法声明交换机和队列。
  • 绑定交换机与队列:使用通道对象的queue_bind()方法将队列绑定到交换机。
  • 发送消息:使用通道对象的basic_publish()方法发送消息到指定的交换机和路由键(routing key)。
  • 接收消息:使用通道对象的basic_consume()方法开始消费队列中的消息,并通过回调函数处理接收到的消息。
  • 关闭连接:在不再需要时,使用连接对象的close()方法关闭连接。

安装pika

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pika

3. pika应用入门

3.1. 生产者

import pika# 1.连接rabbit
credentials = pika.PlainCredentials('rabbit', '*****')  # rabbit用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.17.61',port = 5671,virtual_host = '/typc-fpd-dev',credentials = credentials))
channel = connection.channel()# 2.创建持久化队列
# 注意:非持久化队列不能变持久化队列,反之也是这样的,所有创建队列中不能创建和非持久化队列重名的队列
channel.queue_declare(queue='hello_world', durable=True)# 3.向指定队列插入数据
poiid = 'xxxxxx'
channel.basic_publish(exchange='',  # 简单模式routing_key='hello_world',  # 指定队列body=poiid,  # 向队列中添加的数据properties=pika.BasicProperties(delivery_mode=2,  # make message persistent))
print(" [x] Sent 'Hello World!'") 

在这里插入图片描述
查看虚拟主机virtual-host: /typc-fpd-dev下队列hello_world。
在这里插入图片描述

3.2. 侦听消费者

import pika# 1.连接rabbit
credentials = pika.PlainCredentials('rabbit', '*****')  # rabbit用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.17.61',port = 5671,virtual_host = '/typc-fpd-dev',credentials = credentials))# 2.创建持久化队列
# 注意:非持久化队列不能变持久化队列,反之也是这样的,所有创建队列中不能创建和非持久化队列重名的队列
# 注意:这一步不是必须的,但是如果消费者先启动而不是生成者先启动时,这时队列中还没有hello_world队列,这时就会报错
channel.queue_declare(queue='hello_world', durable=True)# 3.确定回调函数
def callback(ch, method, properties, body):print(" [x] Received %r" % body)# 手动应答poiid = body.decode('utf-8')  # 将 bytes 转换为字符串 Core.setPIO(poiid)            # 输入数据Core.task_process()           # 处理数据ch.basic_ack(delivery_tag=method.delivery_tag)# 4.确定监听队列参数
channel.basic_consume(queue='hello_world',  # 指定队列auto_ack=False,  # 手动应答方式on_message_callback=callback)print(' [*] Waiting for messages. To exit press CTRL+C')# 5.正式监听
channel.start_consuming()

3.3. 主动处理消费消息

在Pika中,basic_get方法确实可以用于从队列中直接获取消息,但通常不推荐在生产环境中使用,因为它不是高效的消息处理方式。不过,如果你确实需要这种方法,以下是如何使用basic_get的示例:

# 1.连接rabbit
credentials = pika.PlainCredentials('rabbit', '*****')  # rabbit用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.17.61',port = 5671,virtual_host = '/typc-fpd-dev',credentials = credentials))
channel = connection.channel()time.sleep(1)# 2.创建持久化队列
# 注意:非持久化队列不能变持久化队列,反之也是这样的,所有创建队列中不能创建和非持久化队列重名的队列
# 注意:这一步不是必须的,但是如果消费者先启动而不是生成者先启动时,这时队列中还没有hello2队列,这时就会报错
channel.queue_declare(queue='hello_world', durable=True)
count = 5
for i in range(count):  print('取消息开始时间')  method_frame, header_frame, body = channel.basic_get(queue='hello_world', auto_ack=False)  if method_frame:  # 处理消息体 print('body:',body)  poiid = body.decode('utf-8')  # 将 bytes 转换为字符串 Core.setPIO(poiid)Core.task_process()time.sleep(2)        # 如果你设置了auto_ack=False,则需要手动确认消息  channel.basic_ack(delivery_tag=method_frame.delivery_tag)  else:  print("没有消息可以获取,", str(i))time.sleep(1)
print('取消息完成时间')
connection.close()#来关闭连接 

参考:

三只松鼠. python 操作RabbitMq详解. 博客园. 2019.03

卫玠_juncheng. Python三方库:Pika(RabbitMQ基础使用). CSDN博客. 2024.03

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/850024.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java开发-面试题-0004-HashMap 和 Hashtable的区别

Java开发-面试题-0004-HashMap 和 Hashtable的区别 更多内容欢迎关注我(持续更新中,欢迎Star✨) Github:CodeZeng1998/Java-Developer-Work-Note 技术公众号:CodeZeng1998(纯纯技术文) 生活…

Java | Leetcode Java题解之第137题只出现一次的数字II

题目: 题解: class Solution {public int singleNumber(int[] nums) {int a 0, b 0;for (int num : nums) {b ~a & (b ^ num);a ~b & (a ^ num);}return b;} }

javaweb学习(day14-ThreadLocal文件上传下载)

一、线程数据共享和安全 -ThreadLocal 1 什么是 ThreadLocal ThreadLocal 的作用,可以实现在同一个线程数据共享, 从而解决多线程数据安全问题. ThreadLocal 可以给当前线程关联一个数据(普通变量、对象、数组)set 方法 [源码!] ThreadLocal 可以像 Map 一样存取数…

Vue的基础知识:v-model的原理,由:value与@input合写。

原理:v-model本质上是一个语法糖,比如应用在输入框上,就是value属性和input事件的合写。(补充说明:语法糖就是语法的简写) 作用:提供数据的双向绑定 1.数据变,视图(也就…

[数据集][目标检测]叶子计数检测数据集VOC+YOLO格式240张1类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):240 标注数量(xml文件个数):240 标注数量(txt文件个数):240 标注类别…

2024年谷歌SEO如何快速出排名效果抢占首页制高点?

2024年,随着谷歌搜索引擎算法的不断更新,SEO策略也需要与时俱进才能快速出排名。本文将结合谷歌最新SEO趋势,平哥SEO分享一些实操性的快速排名技巧,帮助你在竞争激烈的搜索结果中脱颖而出。 额外话题:就是通过微信公众…

Java Web学习笔记25——Vue组件库Element

什么是Element? Element: 是饿了么团队研发的,一套为开发者、设计师和产品经理准备的基于Vue2.0的桌面端组件库。 组件:组成网页的部件,例如:超链接、按钮、图片、表格、表单、分页条等等。 官网:https:…

深拷贝、浅拷贝、引用拷贝

深拷贝和浅拷贝的区别 1. 引用拷贝2. 对象拷贝 1. 引用拷贝 两个对象指向同一个地址值。 创建一个指向对象的引用变量的拷贝Teacher teacher new Teacher("Taylor",26); Teacher otherteacher teacher; System.out.println(teacher); System.out.println(otherte…

前端多人项目开发中,如何保证CSS样式不冲突?

在前端项目开发中,例如突然来了一个大项目,很可能就需要多人一起开发,领导说了,要快,要快,要快,你们给我快。然后下面大伙就一拥而上,干着干着发现,一更新代码&#xff0…

【AI论文与新生技术】Follow-Your-Emoji:精细可控且富有表现力的自由式人像动画技术

我们提出了 Follow-Your-Emoji,这是一种基于扩散的肖像动画框架,它使用目标地标序列对参考肖像进行动画处理。肖像动画的主要挑战是保留参考肖像的身份并将目标表情转移到该肖像,同时保持时间一致性和保真度。为了应对这些挑战,Fo…

JFinal学习07 控制器——接收数据之getBean()和getModel()

JFinal学习07 控制器——接收数据之getBean()和getModel() 视频来源https://www.bilibili.com/video/BV1Bt411H7J9/?spm_id_from333.337.search-card.all.click 文章目录 JFinal学习07 控制器——接收数据之getBean()和getModel()一、接收数据的类型二、getBean()和getModel()…

GDPU JavaWeb Ajax请求

异步请求可以提升用户体验并优化页面性能。 ajax登录 实现ajax异步登录。 注意,ajax用到了jQuery库,先下载好相应的js库,然后复制导入到工程的web目录下,最好与你的前端页面同一层级。然后编写时路径一定要找准,“pag…

转让北京公司带旅行许可证流程和要求

旅行社是开展旅游服务业务的专项经济主体,旅行社开展相关业务必须持有旅行社业务经营许可证。该资质又分为国内旅行社经营许可证和出境旅行社经营许可证。主要区别在于能否开展出境旅游业务,下面老耿带大家了解,新成立国内旅行社要求及出境旅…

python-windows10普通笔记本跑bert mrpc数据样例0.1.001

python-windows10普通笔记本跑bert mrpc数据样例0.1.000 背景参考章节获取数据下载bert模型下载bert代码windows10的cpu执行结果注意事项TODOLIST背景 看了介绍说可以在gpu或者tpu上去微调,当前没环境,所以先在windows10上跑一跑,看是否能顺利进行,目标就是训练的过程中没…

js--hasOwnProperty()讲解与使用

@TOC 前言 hasOwnProperty(propertyName)方法 是用来检测属性是否为对象的自有属性 object.hasOwnProperty(propertyName) // true/false 讲解 hasOwnProperty() 方法是 Object 的原型方法(也称实例方法),它定义在 Object.prototype 对象之上,所有 Object 的实例对象都会继…

6.7 输入输出流

输入:将数据放到程序(内存)中 输出:将数据从程序(内存)放到设备中 C的输入输出分为3种形式: 从键盘屏幕中输入输出,称为标准IO 对于磁盘进行标准输入输出,称为文件IO…

第5章 if语句

第5章 if语句 5.1 示例5.2 条件测试5.2.1 检查是否相等5.2.2 检查是否相等时忽略大小写5.2.3 检查是否不相等5.2.4 数值比较5.2.5 检查多个条件5.2.6 检查特定值是否包含在列表中5.2.7 检查特定值是否不包含在列表中5.2.8 布尔表达式 5.3 if 语句5.3.1 简单的if 语句5.3.2 if-e…

硕思闪客精灵(shankejingling)软件最新版下载及详细安装教程

闪客精灵(Sothink SWF Decompiler)是一款先进的SWF反编译软件,它不但能捕捉、反编译、查看和提取Shock Wave Flash影片(.swf和.exe格式文件),而且可以将SWF格式文件转化为FLA格式文件。它能反编译Flash的所…

四天工作制,比你想象的更近了一点

原文:Andrew Keshner - 2024.05.30 软件公司、大型企业甚至警察部门都在试验这一看似遥不可及的概念。 教育软件公司 Kuali 的会议精简,除非绝对必要,员工尽量避免安排会议。即使有会议,也鼓励员工跳过与自己工作无关的部分。在…

从技术到产品:以客户为中心的产品研发之路

一、引言 在快速发展的商业环境中,产品作为连接企业与市场的桥梁,其重要性不言而喻。从摸着石头过河搞产品,到广泛传播NPDP(新产品开发流程)理念,产品研发的道路经历了从直觉驱动到系统思维的转变。本文将…