python消息队列中间件_python-RabbtiMQ消息队列

1.RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

2.RabbitMQ能为你做些什么?

消息系统允许软件、应用相互连接和扩展.这些应用可以相互链接起来组成一个更大的应用,或者将用户设备和数据进行连接.消息系统通过将消息的发送和接收分离来实现应用程序的异步和解偶.

或许你正在考虑进行数据投递,非阻塞操作或推送通知。或许你想要实现发布/订阅,异步处理,或者工作队列。所有这些都可以通过消息系统实现。

RabbitMQ是一个消息代理- 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。

3.RabbitMQ 安装使用

4.Python应用RabbitMQ

python操作RabbitMQ的模块有三种:pika,Celery,Haigha。

本文使用的是pika。

"""RabbitMQ-生产者。"""

importpika"""声明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识"""channel.queue_declare(queue='hello')"""定义queue中的消息内容"""channel.basic_publish(exchange='',

routing_key='hello',

body='Hello World!')print("[x] Sent 'Hello World!'")

"""RabbitMQ-消费者。"""

importpika"""声明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识,与生产者队列中对应"""channel.queue_declare(queue='hello')defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)"""消费,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,则回调这个函数处理消息

queue='hello', #queue_declare(queue='hello') 对应

no_ack=True

)"""消费者会一直监听这queue,如果队列中没有消息,则会卡在这里,等待消息队列中生成消息。"""

print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

5.RabbitMQ消息持久化

importpika

queue_name= 'xiaoxi_'

"""声明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识

queue,durable 持久化"""channel.queue_declare(queue=queue_name)whileTrue:

input_value= input(">>:").strip()ifinput_value:"""定义queue中的消息内容"""

print('producer messages:{0}'.format(input_value))

channel.basic_publish(

exchange='',

routing_key=queue_name,

body=input_value,

properties=pika.BasicProperties( #消息持久化.....

delivery_mode=2,

)

)continue

producer.py

importpika,time

queue_name= 'xiaoxi_'

"""声明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识"""channel.queue_declare(queue=queue_name)defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)#time.sleep(5) # 模拟消费者丢失生产者发送的消息,生产者消息队列中的这一条消息则不会删除。

print('rev messages-->',body)"""手动向生产者确认收到消息"""

#ch.basic_ack(delivery_tag=method.delivery_tag)

"""消费,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,则回调这个函数处理消息

queue=queue_name,#no_ack=True #接收到消息,主动向生产者确认已经接收到消息。

)print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

consumer.py

6.RabbitMQ消息公平分发

importpika

queue_name= 'xiaoxi_1'

"""声明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识

queue,durable 持久化"""channel.queue_declare(queue=queue_name)whileTrue:

input_value= input(">>:").strip()ifinput_value:"""定义queue中的消息内容"""

print('producer messages:{0}'.format(input_value))

channel.basic_publish(

exchange='',

routing_key=queue_name,

body=input_value,

)continue

producer.py

importpika,time

queue_name= 'xiaoxi_1'

"""声明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""声明一个管道"""channel=connection.channel()"""定义一个queue,定义queue名称,标识

queue,durable 持久化"""channel.queue_declare(queue=queue_name)defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)"""模拟处理消息快慢速度"""time.sleep(1)

ch.basic_ack(delivery_tag=method.delivery_tag)"""根据消费者处理消息的快慢公平分发消息"""channel.basic_qos(prefetch_count=1)"""消费,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,则回调这个函数处理消息

queue=queue_name,#no_ack=True #接收到消息,主动向生产者确认已经接收到消息。

)print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

consumer.py

7.RabbitMQ-广播模式。

消息的发送模式类型1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。

表达式符号说明:#代表一个或多个字符,*代表任何字符

例:#.a会匹配a.a,aa.a,aaa.a等

*.a会匹配a.a,b.a,c.a等

注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

4.headers: 通过headers 来决定把消息发给哪些queue (少用)

7.1 topic 广播模式。

importpika"""声明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""声明一个管道"""channel=connection.channel()"""通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'topic_messages1'routing_key= 'my_topic'

"""定义exchage模式 direct广播模式"""channel.exchange_declare(exchange=exchange_name,exchange_type='topic')"""消息的发送模式类型

1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。

2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。

3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。

表达式符号说明:#代表一个或多个字符,*代表任何字符

例:#.a会匹配a.a,aa.a,aaa.a等

*.a会匹配a.a,b.a,c.a等

注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

4.headers: 通过headers 来决定把消息发给哪些queue (少用)"""

whileTrue:

input_value= input(">>:").strip()ifinput_value:"""定义queue中的消息内容"""

print('producer messages:{0}'.format(input_value))

channel.basic_publish(

exchange=exchange_name,

routing_key=routing_key,

body=input_value,

)continue

producer.py

importpika,time"""声明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""声明一个管道"""channel=connection.channel()"""通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'topic_messages1'routing_key= 'my_topic'channel.exchange_declare(exchange=exchange_name,exchange_type='topic')"""不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除"""res= channel.queue_declare(exclusive=True)

queue_name=res.method.queue

channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)print('direct_key:{0}'.format(routing_key))print('queue_name:{0}'.format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)

ch.basic_ack(delivery_tag=method.delivery_tag)"""消费,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,则回调这个函数处理消息

queue=queue_name,

)print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

consumer.py

7.2 direct 广播模式

importpika

connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)

channel=connection.channel()"""通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。"""exchange_name= 'direct_messages'routing_key= 'my_direct'

"""定义exchage模式 direct广播模式

消息的发送模式类型

1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。

2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。

3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。

表达式符号说明:#代表一个或多个字符,*代表任何字符

例:#.a会匹配a.a,aa.a,aaa.a等

*.a会匹配a.a,b.a,c.a等

注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

4.headers: 通过headers 来决定把消息发给哪些queue (少用)"""channel.exchange_declare(exchange=exchange_name,exchange_type='direct')

channel.basic_publish(

exchange=exchange_name,

routing_key=routing_key,

body='hello word!',

)#while True:#input_value = input(">>:").strip()#if input_value:#"""定义queue中的消息内容"""#print('producer messages:{0}'.format(input_value))#channel.basic_publish(#exchange=exchange_name,#routing_key=routing_key,#body=input_value,#)#continue

producer.py

importpika,time

connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)

channel=connection.channel()

exchange_name= 'direct_messages'routing_key= 'my_direct'channel.exchange_declare(exchange=exchange_name,exchange_type='direct')"""不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除"""res= channel.queue_declare(exclusive=True)

queue_name=res.method.queue

channel.queue_bind(exchange=exchange_name,queue=queue_name,routing_key=routing_key)print('direct_key:{0}'.format(routing_key))print('queue_name:{0}'.format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)

ch.basic_ack(delivery_tag=method.delivery_tag)"""消费,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,则回调这个函数处理消息

queue=queue_name,

)print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

consumer.py

7.3 fanout 广播模式

importpika"""声明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""声明一个管道"""channel=connection.channel()

exchange_name= 'messages'

"""定义exchage模式 fanout广播模式"""channel.exchange_declare(exchange=exchange_name,exchange_type='fanout')"""消息的发送模式类型

1.fanout: 所有bind到此exchange的queue都可以接收消息 即是广播模式,所有的consumer都能收到。

2.direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 ,指定唯一的。

3.topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。符合条件的。

表达式符号说明:#代表一个或多个字符,*代表任何字符

例:#.a会匹配a.a,aa.a,aaa.a等

*.a会匹配a.a,b.a,c.a等

注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

4.headers: 通过headers 来决定把消息发给哪些queue (少用)"""

whileTrue:

input_value= input(">>:").strip()ifinput_value:"""定义queue中的消息内容"""

print('producer messages:{0}'.format(input_value))

channel.basic_publish(

exchange=exchange_name,

routing_key='',

body=input_value,

)continue

producer.py

importpika,time"""声明socket"""connection=pika.BlockingConnection(

pika.ConnectionParameters('localhost')

)"""声明一个管道"""channel=connection.channel()"""

"""exchange_name= 'messages'channel.exchange_declare(exchange=exchange_name,exchange_type='fanout')"""不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除"""res= channel.queue_declare(exclusive=True)

queue_name=res.method.queue

channel.queue_bind(exchange=exchange_name,queue=queue_name)"""每一个消费者随机一个唯一的queue_name"""

print('queue_name:{0}',format(queue_name))defcallback(ch,method,properties,body):print('rev-->',ch,method,properties,body)print('rev messages-->',body)

ch.basic_ack(delivery_tag=method.delivery_tag)"""消费,接收消息..."""channel.basic_consume(

consumer_callback=callback, #如果收到消息,则回调这个函数处理消息

queue=queue_name,#no_ack=True #接收到消息,主动向生产者确认已经接收到消息。

)print('waiting for meassages, to exit press CTRL+C')

channel.start_consuming()

consumer.py

8 RabbitMQ 实现 RPC

"""RabbitMQ-生产者。

利用rabbitMQ 实现一个能收能发的RPC小程序。

重点需要注意的是:queue的绑定。接收的一端必选预先绑定queue生成队列,发送端才能根据queue发送。"""

importpika,uuid,timeclassrabbitmqClient(object):def __init__(self,rpc_queue):

self.rpc_queue=rpc_queue

self.app_id=str(uuid.uuid4())

self.connection= pika.BlockingConnection(pika.ConnectionParameters('localhost'))

self.channel=self.connection.channel()"""生成一个自动queue,传过去server,server再往这个自动queue回复数据"""autoqueue= self.channel.queue_declare(exclusive=True)

self.callback_queue=autoqueue.method.queue"""先定义一个接收回复的动作"""self.channel.basic_consume(self.on_response, no_ack=True,

queue=self.callback_queue)defon_response(self,ch,method,properties,body):if self.app_id ==properties.app_id:

self.response=bodydefsend(self,msg):

self.response=None

self.channel.basic_publish(

exchange='',

routing_key=self.rpc_queue,

properties=pika.BasicProperties(

reply_to=self.callback_queue,

app_id=self.app_id,

),

body=str(msg)

)#发送完消息,进入接收模式。

while self.response isNone:#print('callback_queue:{0} app_id:{1} wait...'.format(self.callback_queue,self.app_id))

self.connection.process_data_events()#time.sleep(0.5)

returnself.response

rpc_request_queue= 'rpc_request_queue'rb=rabbitmqClient(rpc_request_queue)whileTrue:

msg= input('input >> :').strip()ifmsg:print('rpc_queue:{0} app_id:{1}'.format(rb.rpc_queue,rb.app_id))print('send msg:{}'.format(msg))

reponses=rb.send(msg)print('reponses msg:{}'.format(reponses.decode('utf-8')))continue

client.py

"""RabbitMQ-消费者。"""

importpikaclassrabbitmqServer(object):def __init__(self,rpc_queue):

self.rpc_queue=rpc_queue

self.connection= pika.BlockingConnection(pika.ConnectionParameters('localhost'))

self.channel=self.connection.channel()

self.channel.queue_declare(queue=self.rpc_queue)defon_reponses(self,ch,method,properties,body):ifbody:#reponser ...

ch.basic_publish(exchange='',

routing_key=properties.reply_to,

properties=pika.BasicProperties(

reply_to=properties.reply_to,

app_id=properties.app_id,

),

body='reponses ok! msg is:{}'.format(body.decode('utf-8')))defstart_consuming(self):

self.channel.basic_consume(consumer_callback=self.on_reponses,queue=self.rpc_queue,no_ack=True)print('waiting for meassages, to exit press CTRL+C')

self.channel.start_consuming()

rpc_request_queue= 'rpc_request_queue'rd_server=rabbitmqServer(rpc_request_queue)

rd_server.start_consuming()

server.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/370137.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS position(定位)属性

关于CSS position,来自MDN的描述: CSS position属性用于指定一个元素在文档中的定位方式。top、right、bottom、left 属性则决定了该元素的最终位置。 然后来看看什么是文档流(normal flow),下面是 www.w3.org 的描述: Normal flo…

tomcat配置文件server.xml详解

版权声明:本文为博主原创文章,未经博主允许不得转载。 目录(?)[] 元素名 属性 解释 server port 指定一个端口,这个端口负责监听关闭tomcat 的请求 shutdown 指定向端口发送的命令字符串 service name 指定service 的名字 Con…

均值,方差,协方差,协方差矩阵,特征值,特征向量

大牛博客,收藏一下 http://blog.csdn.net/yangleo1987/article/details/52845912转载于:https://www.cnblogs.com/gaohai/p/8086626.html

Java ByteBuffer –速成课程

以我的经验,当开发人员第一次遇到java.nio.ByteBuffer时,会引起混乱和细微的错误,因为如何正确使用它尚不明显。 在我对API文档感到满意之前,需要反复阅读API文档和一些经验以实现一些微妙之处。 这篇文章是关于如何正确使用它们的…

c语言cth三角函数表示,三角函数与双曲函数基本公式对照表

圆函数(三角函数)1.基本性质:sin tan cos x x x ,cos cot sin xx x 1sec cos x x ,1csc sin x x tan cot 1x x sin csc 1x x sec cos 1x x 22sin cos 1x x 221tan sec x x ,221cot csc x x 2.奇偶性:sin()sin x x -- cos()cos x x - tan()tan x x --3.…

实现编辑功能有哪几个action_Web 应用的撤销重做实现

背景前不久,我参与开发了团队中的一个 web 应用,其中的一个页面操作如下图所示:GIF这个制作间页面有着类似 PPT 的交互:从左侧的工具栏中选择元素放入中间的画布、在画布中可以删除、操作(拖动、缩放、旋转等&#xff…

为什么我们要做三份 Webpack 配置文件

时至今日,Webpack 已经成为前端工程必备的基础工具之一,不仅被广泛用于前端工程发布前的打包,还在开发中担当本地前端资源服务器(assets server)、模块热更新(hot module replacement)、API Pro…

使用maven插件构建docker镜像

为什么要用插件 主要还是自动化的考虑,如果额外使用Dockerfile进行镜像生成,可能会需要自己手动指定jar/war位置,并且打包和生成镜像间不同步,带来很多琐碎的工作。 插件选择 使用比较多的是spotify的插件:https://github.com/spo…

windows下如何安装pip以及如何查看pip是否已经安装成功?

最近刚学习python,发现很多关于安装以及查看pip是否安装成的例子都比较老,不太适合于现在(python 3.6 )因此,下一个入门级别的教程。 0:首先如何安装python我就不做介绍了。 1:如果安装的是pyth…

检查用户显示器的分辨率

检查用户显示器的分辨率 转载于:https://www.cnblogs.com/Renyi-Fan/p/8088012.html

android 字体 dpi,详解Android开发中常用的 DPI / DP / SP

Android的碎片化已经被喷了好多年,随着国内手机厂商的崛起,碎片化也越来越严重,根据OpenSignal的最新调查,2014年市面上有18796种不同的Android设备,作为开发者,一个无法回避的难题就是需要适配各种各样奇奇…

android studio闪退代码不报错_代码不报错,不代表真的没错

今天是生信星球陪你的第695天大神一句话,菜鸟跑半年。我不是大神,但我可以缩短你走弯路的半年~就像歌儿唱的那样,如果你不知道该往哪儿走,就留在这学点生信好不好~这里有豆豆和花花的学习历程,从新手到进阶&#xff0c…

Centos7操作系统部署指南

一、硬件环境: Dell R620 二、软件环境: Centos6.4 X86_64 KVM Windows7vnc 三、安装说明 操作系统更新之迅速,让作为新手的系统运维人员有点措手不及,相对于老手就胸有成竹。怎么讲?由于老手对技术方向把握的非常好&…

Eclipse插件中的SLF4J登录

一直都在使用Maven和纯Java库进行开发,我从没想过在开发Eclipse插件时发出一些日志语句可能会成为问题。 但是,在Eclipse开发人员的想象中,一切似乎总是在Eclipse环境中,而Eclipse宇宙之外则什么都没有。 如果您使用Google搜索上…

CSS(四)

css元素溢出 当子元素的尺寸超过父元素的尺寸时,需要设置父元素显示溢出的子元素的方式,设置的方法是通过overflow属性来设置。 overflow的设置项: 1、visible 默认值。内容不会被修剪,会呈现在元素框之外。2、hidden 内容会被修…

mysql排名

转载自思心思危http://www.cnblogs.com/zengguowang/p/5541431.html 一、sql1{不管数据相同与否,排名依次排序(1,2,3,4,5,6,7.....)} SELECTobj.user_id,   obj.score,  rownum : rownum 1 AS rownum FROM(SELECT…

python中变量名后的逗号_深入浅析python变量加逗号,的含义

逗号,用于生成一个长度为1的元组>>> (1)1>>> (1,)(1,)>>> 1,(1,)因此需要将长度为1的元组中元素提取出来可以用,简化赋值操作>>> a(1,)>>> ba>>> b(1,)>>> b,a>>> b1最后print打印变量加,实现连续打印…

广告的显示和关闭

app或游戏的主页显示广告页面,实现方式: public class MainActivity extends Activity implements View.OnClickListener{private Button btnShowAd;private RelativeLayout layoutAd;Overrideprotected void onCreate(Bundle savedInstanceState) {supe…

android签到功能模块,基于android的课堂签到系统.doc

基于android的课堂签到系统本科毕业论文(设计)题 目 基于Android的课堂签到系统学生姓名 XXX指导教师 XX学 院 信息科学与工程学院专业班级 计算机科学与技术0908班完成时间 2013年5月 摘 要在大学课堂中,签到问题一直困扰着老师和同学们。传统课堂签到的手段大多是…

Java EE 7社区调查结果!

在JSR 342下可以继续进行Java EE 7的工作。一切进展顺利,Java EE 7现在处于“初稿审查”阶段。 在11月初, Oracle发布了一个有关即将推出的Java EE 7功能的小型社区调查 。 昨天结果公布了。 超过1,100名开发人员参加了调查,并且几乎对每个问…