RabbitMQ队列

RabbitMQ是什么?

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ的安装

首先说明,RabbitMQ在win上安装是一件颇为麻烦的事情。试了很长时间都没有成功,后来就转战linux了。在linux的安装中也可能会出现一点问题,下面会贴出一个网址有安装中出现问题的解决办法。

linux上都是直接install rabbitmq-server

当然可能会在安装中和后来的使用上出现这样或者是那样的问题,解决办法参见这篇博客http://www.cnblogs.com/kaituorensheng/p/4985767.html

RabbitMQ的语法以及实例

1.基本实例

基于Queue实现生产者消费者模型

 1 import Queue
 2 import threading
 3 
 4 
 5 message = Queue.Queue(10)
 6 
 7 
 8 def producer(i):
 9     while True:
10         message.put(i)
11 
12 
13 def consumer(i):
14     while True:
15         msg = message.get()
16 
17 
18 for i in range(12):
19     t = threading.Thread(target=producer, args=(i,))
20     t.start()
21 
22 for i in range(10):
23     t = threading.Thread(target=consumer, args=(i,))
24     t.start()
View Code

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()#开通一个管道#声明queue
channel.queue_declare(queue='hello')channel.basic_publish(exchange='',routing_key='hello',#queue名字body='Hello World!')#消息内容
print(" [x] Sent 'Hello World!'")
connection.close()
import pika
#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')def callback(ch, method, properties, body):print(" [x] Received %r" % body)channel.basic_consume(#消费消息callback,#如果收到消息就调用callback函数处理消息queue='hello',no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

2.消息发布轮询

  • 上面的只是一个生产者、一个消费者,能不能一个生产者多个消费者呢? 
    可以上面的例子,多启动几个消费者consumer,看一下消息的接收情况。 
    采用轮询机制;把消息依次分发

 

  • 假如消费者处理消息需要15秒,如果当机了,那这个消息处理明显还没处理完,怎么处理? 
    (可以模拟消费端断了,分别注释和不注释 no_ack=True 看一下) 
    你没给我回复确认,就代表消息没处理完。

 

  • 上面的效果消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢? 
    因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。

 

  • 上面的模式只是依次分发,实际情况是机器配置不一样。怎么设置类似权重的操作?
    RabbitMQ怎么办呢,RabbitMQ做了简单的处理就能实现公平的分发。 
    就是RabbitMQ给消费者发消息的时候检测下消费者里的消息数量,如果超过指定值(比如1条),就不给你发了。 
    只需要在消费者端,channel.basic_consume前加上就可以了。
channel.basic_qos(prefetch_count=1)  # 类似权重,按能力分发,如果有一个消息,就不在给你发

3. acknowledgment 消息持久化

 no-ack = False

如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.queue_declare(queue='hello')def callback(ch, method, properties, body):print(" [x] Received %r" % body)import timetime.sleep(10)print ('ok')ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(callback,queue='hello',no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
生产者

durable  

如果队列里还有消息,RabbitMQ 服务端宕机了呢?消息还在不在? 
把RabbitMQ服务重启,看一下消息在不在。 
上面的情况下,宕机了,消息就久了,下面看看如何把消息持久化。 
每次声明队列的时候,都加上durable,注意每个队列都得写,客户端、服务端声明的时候都得写。

# 在管道里声明queue
channel.queue_declare(queue='hello2', durable=True)

durable的作用只是把队列持久化。离消息持久话还差一步: 
发送端发送消息时,加上properties

properties=pika.BasicProperties(delivery_mode=2,  # 消息持久化)
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()# make message persistent
channel.queue_declare(queue='hello', durable=True)def callback(ch, method, properties, body):print(" [x] Received %r" % body)import timetime.sleep(10)print 'ok'ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(callback,queue='hello',no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()消费者
生产者
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()# make message persistent
channel.queue_declare(queue='hello', durable=True)def callback(ch, method, properties, body):print(" [x] Received %r" % body)import timetime.sleep(10)print( 'ok')ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(callback,queue='hello',no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消费者

4.消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者2去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

 1 #Auther: Xiaoliuer Li
 2 
 3 import pika
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 channel = connection.channel()
 7 
 8 # make message persistent
 9 channel.queue_declare(queue='hello')
10 
11 
12 def callback(ch, method, properties, body):
13     print(" [x] Received %r" % body)
14     import time
15     time.sleep(10)
16     print ('ok')
17     ch.basic_ack(delivery_tag = method.delivery_tag)
18 
19 channel.basic_qos(prefetch_count=1)
20 
21 channel.basic_consume(callback,
22                       queue='hello',
23                       no_ack=False)
24 
25 print(' [*] Waiting for messages. To exit press CTRL+C')
26 channel.start_consuming()
消费者

5.发布订阅(广播模式)

前面的效果都是一对一发,如果做一个广播效果可不可以,这时候就要用到exchange了 
exchange必须精确的知道收到的消息要发给谁。exchange的类型决定了怎么处理, 
类型有以下几种:

  • fanout: 所有绑定到此exchange的queue都可以接收消息
  • direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
  • topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

fanout 纯广播、all

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout')message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',routing_key='',body=message)
print(" [x] Sent %r" % message)
connection.close()
发布者
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='logs',type='fanout')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queuechannel.queue_bind(exchange='logs',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()
订阅者

注意:广播,是实时的,收不到就没了,消息不会存下来,类似收音机。

direct 有选择的接收消息

 

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs',type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
发送者
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs',type='direct')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 获取运行脚本所有的参数
severities = sys.argv[1:]
if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])sys.exit(1)
# 循环列表去绑定
for severity in severities:channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()
接收者

运行接收端,指定接收级别的参数,例:

python direct_sonsumer.py info warning 
python direct_sonsumer.py warning error

topic 更细致的过滤

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词
发送者路由值              队列中
old.boy.python          old.*  -- 不匹配
old.boy.python          old.#  -- 匹配
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
生产者
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
消费者

注意:

sudo rabbitmqctl add_user alex 123
# 设置用户为administrator角色
sudo rabbitmqctl set_user_tags alex administrator
# 设置权限
sudo rabbitmqctl set_permissions -p "/" alex '.''.''.'# 然后重启rabbiMQ服务
sudo /etc/init.d/rabbitmq-server restart# 然后可以使用刚才的用户远程连接rabbitmq server了。------------------------------
credentials = pika.PlainCredentials("alex","123")connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.14.47',credentials=credentials))
View Code

6.RabbitMQ RPC 实现(Remote procedure call)

不知道你有没有发现,上面的流都是单向的,如果远程的机器执行完返回结果,就实现不了了。 
如果返回,这种模式叫什么呢,RPC(远程过程调用),snmp就是典型的RPC 
RabbitMQ能不能返回呢,怎么返回呢?既是发送端又是接收端。 
但是接收端返回消息怎么返回?可以发送到发过来的queue里么?不可以。 
返回时,再建立一个queue,把结果发送新的queue里 
为了服务端返回的queue不写死,在客户端给服务端发指令的的时候,同时带一条消息说,你结果返回给哪个queue

import pika
import uuid
import timeclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))self.channel = self.connection.channel()result = self.channel.queue_declare(exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(self.on_response,  # 只要一收到消息就调用on_responseno_ack=True,queue=self.callback_queue)  # 收这个queue的消息def on_response(self, ch, method, props, body):  # 必须四个参数# 如果收到的ID和本机生成的相同,则返回的结果就是我想要的指令返回的结果if self.corr_id == props.correlation_id:self.response = bodydef call(self, n):self.response = None  # 初始self.response为Noneself.corr_id = str(uuid.uuid4())  # 随机唯一字符串
        self.channel.basic_publish(exchange='',routing_key='rpc_queue',  # 发消息到rpc_queueproperties=pika.BasicProperties(  # 消息持久化reply_to = self.callback_queue,  # 让服务端命令结果返回到callback_queuecorrelation_id = self.corr_id,  # 把随机uuid同时发给服务器
                ),body=str(n))while self.response is None:  # 当没有数据,就一直循环# 启动后,on_response函数接到消息,self.response 值就不为空了self.connection.process_data_events()  # 非阻塞版的start_consuming()# print("no msg……")# time.sleep(0.5)# 收到消息就调用on_responsereturn int(self.response)if __name__ == '__main__':fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(7)")response = fibonacci_rpc.call(7)print(" [.] Got %r" % response)
RPC client
import pika
import timedef fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n-1) + fib(n-2)def on_request(ch, method, props, body):n = int(body)print(" [.] fib(%s)" % n)response = fib(n)ch.basic_publish(exchange='',  # 把执行结果发回给客户端routing_key=props.reply_to,  # 客户端要求返回想用的queue# 返回客户端发过来的correction_id 为了让客户端验证消息一致性properties=pika.BasicProperties(correlation_id = props.correlation_id),body=str(response))ch.basic_ack(delivery_tag = method.delivery_tag)  # 任务完成,告诉客户端if __name__ == '__main__':connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')  # 声明一个rpc_queue ,
channel.basic_qos(prefetch_count=1)# 在rpc_queue里收消息,收到消息就调用on_requestchannel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests")channel.start_consuming()
RPC server

 

转载于:https://www.cnblogs.com/lixiaoliuer/p/6846063.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/283616.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《ASP.NET Core 6框架揭秘实例》演示[14]:日志的进阶用法

为了对各种日志框架进行整合,微软创建了一个用来提供统一的日志编程模式的日志框架。《ASP.NET Core 6框架揭秘》实例演示[13]:日志的基本编程模式》以实例演示的方式介绍了日志的基本编程模式,现在我们来补充几种“进阶”用法。[本文节选《A…

什么是云原生,云原生技术为什么这么火?

文章目录 一、开篇浅谈二、云计算是什么三、云原生是什么四、云计算的四个层次 4.1 IaaS(基础架构即服务)4.2 PaaS(平台即服务)4.3 SaaS(软件即服务)4.4 DaaS(数据即服务)五、云原生…

PerfView专题 (第五篇):如何寻找 C# 托管内存泄漏

一:背景 前几篇我们聊的都是 非托管内存泄漏,这一篇我们再看下如何用 PerfView 来排查 托管内存泄漏 ,其实 托管内存泄漏 比较好排查,尤其是用 WinDbg,毕竟C#是带有丰富的元数据,不像C下去就是二进制。二&a…

DevOps及DevOps常用的工具介绍

目录 1. 什么是 DevOps2. DevOps 概念的起源 2.1. 单体架构 瀑布模式2.2. 分布式架构 敏捷开发模式 2.2.1. 多人协同开发问题2.2.2. 多机器问题2.2.3. 开发和运维角色的天生对立问题2.3. 微服务架构 DevOps3. DevOps 到底是什么4. DevOps 常用的工具 4.1. Jenkins4.2. Kuber…

2018年SIAF 广州国际工业自动化技术及装备展览会下周隆重开幕

同期研讨活动聚焦行业未来趋势,探索技术发展及实际应用层面。 华南最重要的工业自动化行业盛会之一,SIAF广州国际工业自动化技术及装备展览会,将于2018年3月4至6日在广州中国进出口商品交易会展馆隆重开幕。为期三天的展会将再度与广州国际模…

相约现在,遇见未来

# 遇见未来这个世界很小,我们就这样遇见。这个世界很大,分开就很难再见。大家好,我是 chait,很高兴我们在这里《遇见》。今天是我申请公众号通过后的第一天,也是在该平台发表的第一篇文章,唠嗑点啥呢&#…

有关并行的两个重要定律

本文摘自 葛一鸣 老师的《实战java高并发程序设计》一书。因为觉得写得好就摘下来了 将串行程序改造成并发程序,一般来说可以提高程序的整体性能,但是究竟能提升多少,甚至说究竟是否真的可以提高,还是一个需要研究的问题。目前&am…

IT圈中的Bug的类型与历史

美国计算机科学家、图灵奖获得者詹姆斯尼古拉格雷(Jim Gray),在他的著名的论文“Why do computers stop and what can be done about it?”中首次提出了程序bug的类型,比如玻尔bug(Bohrbug)、 海森堡bug(Heisenbugs)等用著名科学家名称命名的bug。后来又…

Windows Nano Server安装配置详解03:远程管理Nano Server

远程管理Nano Server主要是通过使用远程powershell的方式。首先,我们把Nano Server的登录凭据保存到$cred变量之中,如图。其次,把远程Nano Server服务器添加到远程管理机本地的trustedHosts中,否则会报下面的错误,如图…

你和阿里资深架构师之间,差的不仅仅是年龄(进阶必看)

导读:阅读本文需要有足够的时间,笔者会由浅到深带你一步一步了解一个资深架构师所要掌握的各类知识点,你也可以按照文章中所列的知识体系对比自身,对自己进行查漏补缺,觉得本文对你有帮助的话,可以点赞关注…

[luoguP2601] [ZJOI2009]对称的正方形(二维Hash + 二分 || Manacher)

传送门 很蒙蔽,不知道怎么搞。 网上看题解有说可以哈希二分搞,也有的人说用Manacher搞,Manacher是什么鬼?以后再学。 对于这个题,可以从矩阵4个角hash一遍,然后枚举矩阵中的点,再二分半径。 但是…

Semaphore详解

Semaphore基本使用场景 Semaphore的基本使用场景是限制一定数量的线程能够去执行. 举个简单的例子: 一个单向隧道能同时容纳10个小汽车或5个卡车通过(1个卡车等效与2个小汽车), 而隧道入口记录着当前已经在隧道内的汽车等效比重. 比如1个小汽车和1个卡车, 则隧道入口显示3. 若…

PerfView专题 (第六篇):如何洞察 C# 中 GC 的变化

一:背景 在洞察 GC 方面,我觉得市面上没有任何一款工具可以和 PerfView 相提并论,这也是为什么我会在 WinDbg 之外还要学习这么一款工具的原因,这篇我们先简单聊聊 PerfView 到底能洞察 GC 什么东西?二:洞察…

Linux_日志管理介绍(一)

一、介绍1、CentOS 6.x中日志服务已经由rsyslogd取代了原先的syslogd服务,但是rsyslogd是和syslogd服务相兼容的2、除了系统默认的日志之外,采用RPM方式安装的系统服务也会默认把日志记录在/var/log/目录中(源码包安装的服务日志是在源码包指…

如何将exe文件添加到开机启动

1、先创建exe文件的快捷方式 2、打开windows的startup启动目录(针对win10以上) windows有两个以上startup目录,一个是针对所有用户有效的,另外是每个用户下边有一个: 针对当前用户 : C:\Users\{当前用户}\A…

.NET MAUI 跨平台应用程序 (Windows App 和 Android )示例

也就前周,.Net MAUI正式版出来了 ,一个支持跨平台的UI框架,Linux支持情况官网也没说,按理来说应该也是支持的,刚好,我最近也在研究GUI的基本原理,微软出品还是值得深入研究一下的,就先来个样例&…

OpenStack 计算节点删除

前提 计算节点中一个僵尸计算节点存在,而里面的CPU数目在总物理CPU中,导致认为当前能创建实例。而实际没有这么多资源。其中node-11为僵尸节点。 原因 删除计算节点不能直接格式化该服务器,否则在控制节点的数据库上会存在该计算节点的数据。…

PHP 7.2 新功能介绍

PHP 7.2 已經在 2017 年 11 月 30 日 正式發布 。這次發布包含新特性、功能,及優化,以讓我們寫出更好的代碼。在這篇文章裡,我將會介紹一些 PHP 7.2 最有趣的語言特性。 你可以在 Requests For Comments 頁面查看完整的更動清單。 核心改进 参…

如何打造单文件 Blazor Server 应用

前言上次&#xff0c;我们介绍了《如何打造单文件前后端集成 ASP.NET Core 应用》。但是&#xff0c;网友说&#xff0c;对于 Blazor Server 项目此方法无效。于是&#xff0c;我们测试了一下&#xff1a;BlazorApp1.csproj<Project Sdk"Microsoft.NET.Sdk.Web"&g…

Android线程池详解

引入线程池的好处 1&#xff09;提升性能。创建和消耗对象费时费CPU资源 2&#xff09;防止内存过度消耗。控制活动线程的数量&#xff0c;防止并发线程过多。 我们来看一下线程池的简单的构造 [html] view plaincopy print?public ThreadPoolExecutor(int corePoolSize, …