RabbitMQ/pika模块

简介

MessageQueue用于解决跨进程、跨线程、跨应用、跨网络的通信问题。

RabbitMQ使用erlang开发,在windows上使用时要先安装erlang。

官方的示例比较容易理解,可以点这里去看看。

结构

生产者 ---> exchange ---> queue ---> 消费者。

生产者负责提供消息,exchange负责分发消息到指定queue,queue存储消息(默认临时,可设置持久化),消费者接收处理消息。

基本模型

流程:

  1. 建立到rabbitmq的连接
  2. 建立通道
  3. 声明使用的队列(生产者和消费者都要声明,因为不能确定两者谁先运行)
  4. 生产/消费
  5. 持续监听/关闭连接

python中使用pika模块来处理与rabbitmq的连接。

# 生产者
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
r = channel.queue_declare(queue='name', exclusive=False, durable=False) # exclusive设置True是随机生成一个queue名字并返回,durable设置True是队列持久化
queue_name = r.method.queuechannel.basic_publish(exchange = '', # 使用默认分发器routing_key = queue_name,properties = pika.BasicProperties(delivery_mode = 2 # 这个参数用于设置消息持久化),body = [data]
)connection.close()# 消费者
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()
r = channel.queue_declare(queue='name', exclusive=False, durable=False)
queue_name = r.method.queuedef callback(channel, method, properties, body):pass# channel.basic_ack(delivery_tag = method.delivery_tag) 在回调函数最后调用手工应答,表示消息处理完毕,queue可以删除消息了channel.basic_consume(callback, # 这是个回调函数,接收生产者发来的bodyqueue = queue_name,no_ack = True # 设置True表示消息一经获取就被从queue中删除,如果这时消费者崩溃,则这条消息将永久丢失,所以去掉这个属性,在回调函数中手工应答比较安全
)channel.basic_qos(prefetch_count = [num]) # 设置消费者的消费能力,数字越大,则说明该消费者能力越强,往往与设备性能成正比channel.start_consuming() # 阻塞模式获取消息
# connection.process_data_events() 非阻塞模式获取消息

发布订阅模型

类似收音机广播,订阅者只要打开收音机就能收听信息,但接收不到它打开之前的消息。

包括发布订阅模型以及接下来的一些模型,都是通过exchange和routing_key这两个属性来控制的。直接用官网的源码来做注释。

流程:

  1. 发布者设置发布频道
  2. 收听者配置频道信息
  3. 收听者通过随机queue绑定频道接收消息
# 发布者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 创建一个命名exchange,并设置其type为fanout,表示广播
channel.exchange_declare(exchange='logs',exchange_type='fanout')# 从命令行接收输入
message = ' '.join(sys.argv[1:]) or "info: Hello World!"# 由于是广播模式,任意消费者只要设置同样的exchange,就能以任意queue来接收消息,所以这里routing_key置空
channel.basic_publish(exchange='logs',routing_key='',body=message)
print(" [x] Sent %r" % message)
connection.close()# 收听者
#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 这里使用同样的exchange配置,就像调节收音机频道
channel.exchange_declare(exchange='logs',exchange_type='fanout')# 在基础模型中提到过,设置exclusive=True表示生成随机的queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue# 生成了queue,还要将它与exchange进行绑定,这样消息才能通过exchange进入queue
channel.queue_bind(exchange='logs',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

路由/级别模型

将消息发送到指定的路由处,类似于logging模块的分级日志消息。

主要利用channel.queue_bind(routing_key=[route])这个方法,来为queue增加路由。

流程:

  1. 生产者向指定路由发送消息
  2. 消费者绑定路由
  3. 根据路由接收到不同的消息
# 生产者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()# 同样使用命名exchange,主要是type为direct
channel.exchange_declare(exchange='direct_logs',exchange_type='direct')# 将命令行输入的路由作为接收消息的queue的属性,只有匹配的才能接收到消息
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()# 消费者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs',exchange_type='direct')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue# 指定该消费者接收的消息路由
severities = sys.argv[1:]
if not severities:sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])sys.exit(1)# 对该消费者的queue绑定路由
for severity in severities:channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

细目模型/更细致的划分

这个模型比前几种更强大,但是原理与路由模型是相同的。

如果routing_key='#',它就相当于发布订阅模式,向所有queue发送消息,如果routing_key值中不包含*,#,则相当于路由模型。

该模型主要是实现了模糊匹配。

# 生产者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic')routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',routing_key=routing_key,body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()# 消费者
#!/usr/bin/env python
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()channel.exchange_declare(exchange='topic_logs',exchange_type='topic')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queuebinding_keys = sys.argv[1:]
if not binding_keys:sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])sys.exit(1)for binding_key in binding_keys:channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()

RPC模型

前面的几种模型都只能是一端发消息,另一端接收,RPC模型实现的就是单端收发功能。

主要是通过两个队列实现,一个发,一个收。

流程:

  1. 客户端发送消息到约定队列,并且附带返回队列的名称和验证id
  2. 服务器接到消息,将处理过的消息发送给指定队列并附带验证id
  3. 客户端接到消息先验证id,通过则处理消息
# 服务器
#!/usr/bin/env python
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):if n == 0:return 0elif n == 1:return 1else:return fib(n-1) + fib(n-2)def on_request(ch, method, props, body):n = int(body)print(" [.] fib(%s)" % n)response = fib(n)ch.basic_publish(exchange='',routing_key=props.reply_to,properties=pika.BasicProperties(correlation_id = \props.correlation_id),body=str(response))ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests")
channel.start_consuming()# 客户端
#!/usr/bin/env python
import pika
import uuidclass FibonacciRpcClient(object):def __init__(self):self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))self.channel = self.connection.channel()result = self.channel.queue_declare(exclusive=True)self.callback_queue = result.method.queueself.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)def on_response(self, ch, method, props, body):if self.corr_id == props.correlation_id:self.response = bodydef call(self, n):self.response = Noneself.corr_id = str(uuid.uuid4())self.channel.basic_publish(exchange='',routing_key='rpc_queue',properties=pika.BasicProperties(reply_to = self.callback_queue,correlation_id = self.corr_id,),body=str(n))while self.response is None:self.connection.process_data_events()return int(self.response)fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

转载于:https://www.cnblogs.com/ikct2017/p/9434468.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/427236.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 对象流_java 对象流的简单使用

对象的输入输出流的作用: 用于写入对象 的信息和读取对象的信息。 使得对象持久化。ObjectInputStream : 对象输入流ObjectOutPutStream :对象输出流简单的实例1 importjava.io.File;2 importjava.io.FileInputStream;3 importjava.io.FileOutputStre…

centos搭建ftp服务器

1安装vsftpd 2备份配置文件 3修改配置文件 vi /etc/vsftpd/vsftpd.conf anonymous_enableNO #允许匿名用户访问为了安全选择关闭 local_enableYES # 允许本地用户登录 write_enableYES # 是否允许写入 local_umask022 # 本地用户上传文件的umask dirmessage_enableYES #为YES…

ihtml2document能不能根据id获取dom_一段监视 DOM 的神奇代码

作者:Eddie Aich翻译:疯狂的技术宅原文:https://dev.to/eddieaich/spy-on-the-dom-3d47未经允许严禁转载通过使用此模块,只需将鼠标悬停在浏览器中,即可快速查看DOM元素的属性。基本上它是一个即时检查器。将鼠标悬停在…

let 和const

let 命令 es6新增了let命令,用于声明变量,与var用法类似,但是使用let声明变量只在它所在的块内有效,而var则是定义的全局变量 {let a10;var b1; } a //a is not defined,外部的a不能访问到上面块中定义的a变量 b //1let不存在…

centos7搭建apache服务器(亲测可用)

1安装apache yum install httpd httpd-devel -y 2开启服务器 systemctl start httpd.service 3开机自启 systemctl enable httpd.service 4关闭防火墙 5端口访问 6修改vi /etc/httpd/conf/httpd.conf,替换 7查看selinux 也可以不修改,放入/var/www/h…

java ssl 双向认证_Java实现 SSL双向认证

我们常见的SSL验证较多的只是验证我们的服务器是否是真实正确的,当然如果你访问的URL压根就错了,那谁也没有办法。这个就是所谓的SSL单向认证。但是实际中,我们有可能还会验证客户端是否符合要求,也就是给我们每个用户颁发一个证书…

python基础公式_一、Python基础(数据类型、基本函数、基本运算)

​1.变量作用:为了简便,运算时方便修改运算中的值,代指一些复杂过长的数据;what:用变量代指一些内容;how:全部由字母、数字和下划线组成,数字不能开头,不能和Python关键词…

Python爬去知乎上问题下所有图片

from zhihu_oauth import ZhihuClient from zhihu_oauth.exception import NeedCaptchaExceptionclient ZhihuClient()try:client.login(email_or_phone, password)print(u"登陆成功!") except NeedCaptchaException:# 保存验证码并提示输入,重新登录wit…

xshell连接突然报Connection closed by foreign host.

1问题描述报错 Connection closed by foreign host. Disconnected from remote host(yaoGS) at 155513. 2登入虚拟机 在linux系统操作中,经常需要连接其他的主机,连接其他主机的服务是openssh-server,它的功能是让远程主机可以通过网络访问…

java 爬虫_探索Java 多线程爬虫及分布式爬虫架构

在我们调试爬虫程序的时候,单线程爬虫没什么问题,但是当我们在线上环境使用单线程爬虫程序去采集网页时,单线程就暴露出了两个致命的问题:采集效率特别慢,单线程之间都是串行的,下一个执行动作需要等上一个…

數據庫ORACLE轉MYSQL存儲過程遇到的坑~(總結)

ORACLE數據庫轉MySQL數據庫遇到的坑 總結 最近在做Oracle轉mysql的工程,遇到的坑是真的多,尤其是存儲過程,以前都沒接觸過類似的知識,最近也差不多轉完了就總結一下。希望能幫到一些人(包括以後的自己)~ 1&…

java jdbc开启事务_spring jdbc 事务配置

配置WEB.XMLxmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_0.xsd"version"3.0">org.springframework.web.context.ContextLoa…

python 文件指针在文件末尾_python文件操作及seek偏移详解

一、python文件操作中的编码本次测试是基于python 2.7.12 OS:Ubuntu16.04 pycharm环境,以及win7下2.7.12;首先说下汉字在文件中占用的字节数,这个先看以下实验(win7)下 因为linux下不支持gbk,本文不讲utf-8 ,gbk编码具体知识,有…

docker小实战和应用

1运行一个docker 一开始docker进不去,需要去https://hub.docker.com注册一个 2docker info查看信息 3docker run ubuntu echo hello world 查看第一个命令输出 4docker images 查看本地的镜像 5查看开启的容器和没有开启的容器 Docker ps -a 6 docker pull ngi…

java 窗口 单例_java单例模式实现面板切换

本文实例为大家分享了java单例模式实现面板切换的具体代码,供大家参考,具体内容如下1、首先介绍一下什么是单例模式:java单例模式是一种常见的设计模式,那么我们先看看懒汉模式:public class Singleton_ {//设为私有方…

java垃圾回收机制_干货:Java 垃圾回收机制

什么是自动垃圾回收?自动垃圾回收是一种在堆内存中找出哪些对象在被使用,还有哪些对象没被使用,并且将后者删掉的机制。所谓使用中的对象(已引用对象),指的是程序中有指针指向的对象;而未使用中的对象(未引用对象)&…

java项目定时任务_java项目定时任务实现

首先配置spring-context.xml文件在xmlns 下加如下代码xmlns:task"http://www.springframework.org/schema/task"在xsi:schemaLocation里添加如下代码http://www.springframework.org/schema/taskhttp://www.springframework.org/schema/task/spring-task-3.1.xsd还有…

enter power save mode解决

这个问题是什么产生的呢?这是我刚来公司的第三天,公司停电,等重新来电的时候有三台电脑都出现了这个问题。连接显示屏没有反应 遇到这种问题,首先这是主机没有正常启动引起的 1第一步:先插拔下电源,重新启动…

python多线程编程_Python 多线程编程

Thread类classThread:def __init__(self,groupNone,targetNone,nameNone,args(),kwargsNone,*,daemonNone)group:None,为日后扩展 ThreadGroup 类实现而保留。target&…

linux修改网卡名(亲测有效)

1查看网卡ip addr 2cd /etc/sysconfig/network-scripts Ls查看 3mv ifcfg-eno16777736 ifcfg-eth0重命名,然后编辑 最后一行加入IPADDR192.168.30.136 NETMASK255.255.255.0 HWADDR00:0C:29:aa?2f BOOTPROTO改成static 4 vi /etc/default/grub 5 grub2-mkconfig…