一、分发到多Consumer(fanout)
二、Routing路由(Direct)
三、主题路由(Topic)
一、分发到多Consumer(fanout)
将同一个Message deliver到多个Consumer中。这个模式也被称为"publish/subscribe"
创建一个日志系统,包含两部分:第一部分发出log(Producer),第二部分接收到并打印(Consumer)。两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。
1.发送消息流程:
1.Producer发送的Message实际上是发到了Exchange中。
2.Exchanges从Producer接收message投递到queue中
3.Prducer发送的消息只是到达了Exchange中,Exchange具有不同的类型实现不同的分发方式
Exchnges的类型:direct、topic和fanout
fanout就是广播模式,会将所有的Message都放到它所知道的queue中
channel.exchange_declare(exchange='logs',
type='fanout') //创建一个名字为logs,类型为fanout的Exchange:
1 2 3 4 5 6 7 8 9 10 11 | [root@node 112 ~]# rabbitmqctl list_exchanges //查看所有的Exchanges Listing exchanges ... logs fanout amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic ...done. |
注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。
通过exchange,而不是routing_key来publish Message:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
2.临时队列
截至现在,我们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。
但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
1)每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。方法:
result = channel.queue_declare()
通过result.method.queue 可以取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
2)当Consumer关闭连接时,这个queue要被deleted。可以加个exclusive的参数。方法:
result = channel.queue_declare(exclusive=True) //每次获取的都是新的,单独使用的
3.Bindings绑定
创建好fanout类型的Exchange和没有名字的queue后(实际上是RabbitMQ帮我们取的名字)Exchange通过bindings把它的Message发送到目标queue
channel.queue_bind(exchange='logs',
queue=result.method.queue)
使用命令rabbitmqctl list_bindings 查看bindings
4.最终代码
拓扑图:
Producer,在这里就是产生log的program,基本上和前几个都差不多。最主要的区别就是publish通过了exchange而不是routing_key。
emit_log.py script:
===========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host= 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange= 'logs' , type= 'fanout' ) message = ' ' .join(sys.argv[ 1: ]) or "info: Hello World!" channel.basic_publish(exchange= 'logs' , routing_key= '' , body=message) print " [x] Sent %r" % (message,) connection.close() |
还有一点要注意的是我们声明了exchange。publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,log是被丢弃的。
Consumer:receive_logs.py:
===========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | #!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host= 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange= 'logs' , type= 'fanout' ) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange= 'logs' , queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() |
试运行:
Consumer1:$ python receive_logs.py > logs_from_rabbit.log //追加到文件
Consumer2:python receive_logs.py //输出到屏幕
Producer:python emit_log.py
也可通过修改callback自己写文件
输出结果如图:
二、Routing路由(Direct)
对于上一个日志系统改进。能够使用不同的severity来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。
1.Bindings绑定
之前的绑定
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。
实际上,绑定可以带routing_key 这个参数。其实这个参数的名称和basic_publish 的参数名是相同了。为了避免混淆,我们把它成为binding key。
使用一个key来创建binding :
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
对于fanout的exchange来说,这个参数是被忽略的。
2.Direct Exchange
通过Bindings key完全匹配
图Direct路由模型
exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。
当P publish key是orange时,exchange会把它放到Q1。如果是black或者green那么就会到Q2。其余的Message都会被丢弃。
3.多重绑定(Multiple Bindings)
多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver到Q1和Q2。其余的Message都会被丢弃。
图muliti-bindings
4.生产者和消费者
生产者:
===========================================================================
1 2 3 4 5 6 7 8 | channel.exchange_declare(exchange= 'direct_logs' , type= 'direct' ) //创建一个direct的exchange。使用log的severity作为routing key,这样Consumer可以针对不同severity的log进行不同的处理。 publish: channel.basic_publish(exchange= 'direct_logs' , routing_key=severity, body=message) //涉及三种severity: 'info' , 'warning' , 'error' . |
消费者:
===========================================================================
1 2 3 4 5 6 7 | result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange= 'direct_logs' , queue=queue_name, routing_key=severity) //queue需要绑定severity |
5.最终版本
图:direct_2
emit_log_direct.py
===========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host= 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange= 'direct_logs' , type= 'direct' ) severity = sys.argv[ 1 ] if len(sys.argv) > 1 else 'info' message = ' ' .join(sys.argv[ 2: ]) or 'Hello World!' channel.basic_publish(exchange= 'direct_logs' , routing_key=severity, body=message) print " [x] Sent %r:%r" % (severity, message) connection.close() |
receive_logs_direct.py:
===========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host= 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange= 'direct_logs' , type= 'direct' ) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[ 1: ] if not severities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \ (sys.argv[ 0 ],) sys.exit( 1 ) for severity in severities: channel.queue_bind(exchange= 'direct_logs' , queue=queue_name, routing_key=severity) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() |
===========================================================================
试运行:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
//把warning和error的log记录到一个文件中
$ python receive_logs_direct.py info warning error
//打印所有log到屏幕
三、主题路由(Topic)
1.Topic exchange
Message的routing_key使用限制,不能使任意的。格式是以点号“."分割的字符表。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。
对于routing_key,有两个特殊字符(在正则表达式里叫元字符):
* (星号) 代表任意 一个单词
# (hash) 0个或者多个单词
示例:
Producer发送消息时需要设置routing_key,routing_key包含三个单词和两个点号。
第一个key是描述了celerity(灵巧,敏捷),第二个是colour(色彩),第三个是species(物种):"<celerity>.<colour>.<species>"。
在这里我们创建了两个绑定: Q1 的binding key 是"*.orange.*"; Q2 是 "*.*.rabbit" 和 "lazy.#":
Q1 感兴趣所有orange颜色的动物
Q2 感兴趣所有的rabbits和所有的lazy的
比如routing_key是 "quick.orange.rabbit"将会发送到Q1和Q2中。消息"lazy.orange.elephant" 也会发送到Q1和Q2。但是"quick.orange.fox" 会发送到Q1;"lazy.brown.fox"会发送到Q2。"lazy.pink.rabbit" 也会发送到Q2,但是尽管两个routing_key都匹配,它也只是发送一次。"quick.brown.fox" 会被丢弃。
如果发送的单词不是3个呢? 答案要看情况,因为#是可以匹配0个或任意个单词。比如"orange" or "quick.orange.male.rabbit",它们会被丢弃。如果是lazy那么就会进入Q2。类似的还有 "lazy.orange.male.rabbit",尽管它包含四个单词。
Topic exchange和其他exchange
由于有"*" (star) and "#" (hash), Topic exchange 非常强大并且可以转化为其他的exchange:
如果binding_key 是 "#" - 它会接收所有的Message,不管routing_key是什么,就像是fanout exchange。
如果 "*" (star) and "#" (hash) 没有被使用,那么topic exchange就变成了direct exchange。
2.代码实现
The code for emit_log_topic.py:
========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host= 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange= 'topic_logs' , type= 'topic' ) routing_key = sys.argv[ 1 ] if len(sys.argv) > 1 else 'anonymous.info' message = ' ' .join(sys.argv[ 2: ]) or 'Hello World!' channel.basic_publish(exchange= 'topic_logs' , routing_key=routing_key, body=message) print " [x] Sent %r:%r" % (routing_key, message) connection.close() |
========================================================================
The code for receive_logs_topic.py:
========================================================================
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | #!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host= 'localhost' )) channel = connection.channel() channel.exchange_declare(exchange= 'topic_logs' , type= 'topic' ) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[ 1: ] if not binding_keys: print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[ 0 ],) sys.exit( 1 ) for binding_key in binding_keys: channel.queue_bind(exchange= 'topic_logs' , queue=queue_name, routing_key=binding_key) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming() |
3.运行和结果
python receive_logs_topic.py "#" //接收所有的log
python receive_logs_topic.py "kern.*" //接收所有kern facility的log
python receive_logs_topic.py "*.critical" //仅仅接收critical的log:
python receive_logs_topic.py "kern.*" "*.critical" //可以创建多个绑定:
python emit_log_topic.py "kern.critical" "A critical kernel error" //Producer产生一个log:"kern.critical" type:
参考:
http://www.rabbitmq.com/tutorials/tutorial-three-python.html
本文转自MT_IT51CTO博客,原文链接:http://blog.51cto.com/hmtk520/2051247,如需转载请自行联系原作者