RabbitMQ详解(三)

一、分发到多Consumer(fanout)
二、Routing路由(Direct)
三、主题路由(Topic)

一、分发到多Consumer(fanout)
将同一个Message deliver到多个Consumer中。这个模式也被称为"publish/subscribe"
创建一个日志系统,包含两部分:第一部分发出log(Producer),第二部分接收到并打印(Consumer)。两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。

1.发送消息流程:
    1.Producer发送的Message实际上是发到了Exchange中。
    2.Exchanges从Producer接收message投递到queue中
    3.Prducer发送的消息只是到达了Exchange中,Exchange具有不同的类型实现不同的分发方式

Exchnges的类型:direct、topic和fanout
fanout就是广播模式,会将所有的Message都放到它所知道的queue中
channel.exchange_declare(exchange='logs',  
    type='fanout')   //创建一个名字为logs,类型为fanout的Exchange:

1
2
3
4
5
6
7
8
9
10
11
[root@node112 ~]# rabbitmqctl list_exchanges //查看所有的Exchanges
Listing exchanges ...
logs  fanout
amq.direct    direct
amq.fanout    fanout
amq.headers    headers
amq.match    headers
amq.rabbitmq.log    topic
amq.rabbitmq.trace    topic
amq.topic    topic
...done.

注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。 

通过exchange,而不是routing_key来publish Message:
channel.basic_publish(exchange='logs',  
    routing_key='',  
    body=message)  

2.临时队列
截至现在,我们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。
但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
    1)每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。方法:
    result = channel.queue_declare() 
    通过result.method.queue 可以取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
    2)当Consumer关闭连接时,这个queue要被deleted。可以加个exclusive的参数。方法:
    result = channel.queue_declare(exclusive=True)   //每次获取的都是新的,单独使用的
    
3.Bindings绑定
    创建好fanout类型的Exchange和没有名字的queue后(实际上是RabbitMQ帮我们取的名字)Exchange通过bindings把它的Message发送到目标queue
    channel.queue_bind(exchange='logs',  
        queue=result.method.queue)      
    使用命令rabbitmqctl list_bindings 查看bindings
    
4.最终代码
拓扑图:
1.png

Producer,在这里就是产生log的program,基本上和前几个都差不多。最主要的区别就是publish通过了exchange而不是routing_key。
emit_log.py script:
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
    type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
    routing_key='',
    body=message)
print " [x] Sent %r" % (message,)
connection.close()

还有一点要注意的是我们声明了exchange。publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,log是被丢弃的。
Consumer:receive_logs.py:
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
    type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
    queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
    print " [x] %r" % (body,)
channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
channel.start_consuming()

试运行:
    Consumer1:$ python receive_logs.py > logs_from_rabbit.log  //追加到文件
    Consumer2:python receive_logs.py //输出到屏幕
    Producer:python emit_log.py
也可通过修改callback自己写文件
输出结果如图:
3.png

二、Routing路由(Direct)
对于上一个日志系统改进。能够使用不同的severity来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。
1.Bindings绑定
之前的绑定
channel.queue_bind(exchange=exchange_name,  
    queue=queue_name)  
绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。
实际上,绑定可以带routing_key 这个参数。其实这个参数的名称和basic_publish 的参数名是相同了。为了避免混淆,我们把它成为binding key。
    使用一个key来创建binding :
channel.queue_bind(exchange=exchange_name,  
    queue=queue_name,  
    routing_key='black') 
对于fanout的exchange来说,这个参数是被忽略的。

2.Direct Exchange
通过Bindings key完全匹配
图Direct路由模型
Direct.png

exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。
当P publish key是orange时,exchange会把它放到Q1。如果是black或者green那么就会到Q2。其余的Message都会被丢弃。

3.多重绑定(Multiple Bindings)
多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver到Q1和Q2。其余的Message都会被丢弃。
图muliti-bindings
multi.png

4.生产者和消费者
生产者:
===========================================================================

1
2
3
4
5
6
7
8
channel.exchange_declare(exchange='direct_logs',  
    type='direct')  
//创建一个direct的exchange。使用log的severity作为routing key,这样Consumer可以针对不同severity的log进行不同的处理。
publish:
channel.basic_publish(exchange='direct_logs',  
    routing_key=severity, 
    body=message)  
//涉及三种severity:'info''warning''error'.

消费者:
===========================================================================

1
2
3
4
5
6
7
result = channel.queue_declare(exclusive=True)  
queue_name = result.method.queue  
for severity in severities:  
    channel.queue_bind(exchange='direct_logs',  
        queue=queue_name,  
        routing_key=severity) 
//queue需要绑定severity

5.最终版本
图:direct_2
direct_2.png

emit_log_direct.py 
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
    type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
    routing_key=severity,
    body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

receive_logs_direct.py: 
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python  
import pika  
import sys  
connection = pika.BlockingConnection(pika.ConnectionParameters(  
    host='localhost'))  
channel = connection.channel()  
channel.exchange_declare(exchange='direct_logs',  
    type='direct')  
result = channel.queue_declare(exclusive=True)  
queue_name = result.method.queue  
severities = sys.argv[1:]  
if not severities:  
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \  
        (sys.argv[0],)  
    sys.exit(1)  
for severity in severities:      
    channel.queue_bind(exchange='direct_logs',  
        queue=queue_name,  
        routing_key=severity)  
print ' [*] Waiting for logs. To exit press CTRL+C'  
def callback(ch, method, properties, body):  
    print " [x] %r:%r" % (method.routing_key, body,)  
channel.basic_consume(callback,  
    queue=queue_name,  
    no_ack=True)  
channel.start_consuming()

===========================================================================
试运行:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log 
    //把warning和error的log记录到一个文件中
$ python receive_logs_direct.py info warning error  
    //打印所有log到屏幕    

三、主题路由(Topic)
1.Topic exchange
Message的routing_key使用限制,不能使任意的。格式是以点号“."分割的字符表。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。
    对于routing_key,有两个特殊字符(在正则表达式里叫元字符):
    * (星号) 代表任意 一个单词
    # (hash) 0个或者多个单词
示例:
Producer发送消息时需要设置routing_key,routing_key包含三个单词和两个点号。
    第一个key是描述了celerity(灵巧,敏捷),第二个是colour(色彩),第三个是species(物种):"<celerity>.<colour>.<species>"。
在这里我们创建了两个绑定: Q1 的binding key 是"*.orange.*"; Q2 是  "*.*.rabbit" 和 "lazy.#":
    Q1 感兴趣所有orange颜色的动物
    Q2 感兴趣所有的rabbits和所有的lazy的
比如routing_key是 "quick.orange.rabbit"将会发送到Q1和Q2中。消息"lazy.orange.elephant" 也会发送到Q1和Q2。但是"quick.orange.fox" 会发送到Q1;"lazy.brown.fox"会发送到Q2。"lazy.pink.rabbit" 也会发送到Q2,但是尽管两个routing_key都匹配,它也只是发送一次。"quick.brown.fox" 会被丢弃。
如果发送的单词不是3个呢? 答案要看情况,因为#是可以匹配0个或任意个单词。比如"orange" or "quick.orange.male.rabbit",它们会被丢弃。如果是lazy那么就会进入Q2。类似的还有 "lazy.orange.male.rabbit",尽管它包含四个单词。

Topic exchange和其他exchange
    由于有"*" (star) and "#" (hash), Topic exchange 非常强大并且可以转化为其他的exchange:
    如果binding_key 是 "#" - 它会接收所有的Message,不管routing_key是什么,就像是fanout exchange。
    如果 "*" (star) and "#" (hash) 没有被使用,那么topic exchange就变成了direct exchange。

2.代码实现
The code for emit_log_topic.py:
========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
    type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
    routing_key=routing_key,
    body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

========================================================================

The code for receive_logs_topic.py:     
========================================================================    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
    type='topic')
     
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
channel.start_consuming()

    
3.运行和结果
    python receive_logs_topic.py "#"  //接收所有的log
    python receive_logs_topic.py "kern.*"  //接收所有kern facility的log
    python receive_logs_topic.py "*.critical"  //仅仅接收critical的log: 
    python receive_logs_topic.py "kern.*" "*.critical"  //可以创建多个绑定: 
    python emit_log_topic.py "kern.critical" "A critical kernel error"  //Producer产生一个log:"kern.critical" type: 
    
参考:    
http://www.rabbitmq.com/tutorials/tutorial-three-python.html










本文转自MT_IT51CTO博客,原文链接:http://blog.51cto.com/hmtk520/2051247,如需转载请自行联系原作者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/286089.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

重磅 | Linux内核5.19初步支持LoongArch架构

经过龙芯中科与内核社区一年多的紧密合作&#xff0c;北京时间2022年6月4日清晨&#xff0c;Linux内核社区正式合并LoongArch架构支持代码。随着Linux-5.19的rc1版本的正式发布&#xff0c;LoongArch体系结构主体部分的源码已合并到内核主线之中&#xff0c;其余相关代码正在进…

【MATLAB统计分析与应用100例】案例011:matlab读取Excel数据,调用regress函数作一元线性回归分析

数据拟合效果预览: 文章目录 1. 读取数据,绘制散点图2. 计算相关系数3. 绘制回归直线4. 剔除异常数据,重新调用regress函数作一元线性回归1. 读取数据,绘制散点图 ClimateData = xlsread(examp08_01.xls); % 从Excel文件读取数据 x &

“*** IS NOT TRANSLATED IN …….. 解决办法

首先引起提示的原因是因为Lint 代码检查工具发现你的项目中&#xff08;或者引用的三方库&#xff09;有部分string.xml文件内容做了国际化操作&#xff0c;但却不完整&#xff0c;有些文本内容并没有相应的国际化翻译&#xff0c;在android开发中常见于项目引用的Libraries第三…

[转] ArcEngine 产生专题图

小生原文 ArcEngine 产生专题图 ArcEngine提供多个着色对象用于产生专题图&#xff0c;可以使用标准着色方案&#xff0c;也可以自定义着色方案&#xff0c;ArcEngine提供8中标准着色方案。 一、SimpleRenderer专题图 是使用单一符号进行着色分类&#xff0c;不涉及对要素的数据…

iVX无代码挑战五秒游戏制作

一、五秒挑战游戏简介及思考 制作iVX 低代码项目需要进入在线IDE&#xff1a;https://editor.ivx.cn/ 五秒挑战游戏指的是点击一个按钮开始计时&#xff0c;随后需要用户再次点击计时按钮&#xff0c;将会停止计时&#xff0c;当计时的时间等于五秒时将挑战成功&#xff0c;否…

MAUI 入门教程系列(4.通用主机)

前言对于ASP.NET Core 开发人员而言, 这并不陌生, 当ASP.NET Core应用程序启动时, 会创建默认的应用程序主机, 我们可以为应用程序配置所有的依赖关系、系统设置, 最终启动。如下所示:using IHost host Host.CreateDefaultBuilder(args).ConfigureServices((_, services) >…

【MATLAB统计分析与应用100例】案例012:matlab读取Excel数据,调用robustfit函数作稳健回归

稳健回归效果预览: 文章目录 1. 读取数据2. 调用robustfit函数作稳健回归3 .绘制残差和权重的散点图4. 绘制regress函数和robustfit函数对应的回归直线5. 拟合效果1. 读取数据 ClimateData = xlsread(examp08_01.xls); % 从Excel文件读取数据 x

后台页制作01《ivx低代码签到系统制作》

制作iVX 低代码项目需要进入在线IDE&#xff1a;https://editor.ivx.cn/ 一、签到系统思考 签到系统一般是指公布一个签到链接或者二维码&#xff0c;随后用户扫码后即可完成签到。 那如何制作呢&#xff1f;首先我们可以先不考虑签到页面的制作&#xff0c;既然签到暂时没有…

个人作业-Week2

第一部分 调研&#xff0c; 评测 运行平台 win 8 软件版本&#xff1a;微软必应词典桌面版 3.5.2 BUG标题&#xff1a;必应背单词无法发音 BUG详细描述&#xff1a;如图&#xff0c;左边为必应词典该单词的搜索&#xff0c;可以发音&#xff0c;而右边必应背单词中该单词的发音…

Blazor WebAssembly + Grpc Web=未来?

Blazor WebAssembly是什么首先来说说WebAssembly是什么&#xff0c;WebAssembly是一个可以使C#,Java,Golang等静态强类型编程语言&#xff0c;运行在浏览器中的标准&#xff0c;浏览器厂商基于此标准实现执行引擎。在实现了WebAssembly标准引擎之后&#xff0c;浏览器中可以执行…

Can't create directory 'E:\Repositories\***\db\transactions\138-41.txn':

遇到这种问题应该是迁移SVN库时丢失了文件夹引起的&#xff0c;直接在服务器上找到对应项目的db文件夹&#xff0c;手动创建“transactions”目录和“txn-protorevs”目录即可正常提交。

【MATLAB统计分析与应用100例】案例013:matlab读取Excel数据,调用nlinfit函数作一元非线性回归

1. 一元线性回归分析效果预览 2. matlab完整实现代码 %读取数据,绘制散点图** HeadData = xlsread(examp08_02.xls); %从Excel文

发布功能完成02《ivx低代码签到系统制作》

制作iVX 低代码项目需要进入在线IDE&#xff1a;https://editor.ivx.cn/ 一、发布功能思考、数据库、服务创建 上一节我们制作了后台的页面&#xff0c;那么此时我们应该需要制作发布功能了&#xff0c;那么在制作之前&#xff0c;我们应该思考一下如何制作这个签到内容发布的…

WPF 实现简易北京地铁效果图

本文经原作者授权以原创方式二次分享&#xff0c;欢迎转载、分享。原文作者&#xff1a;眾尋原文地址&#xff1a; https://www.cnblogs.com/ZXdeveloper/p/8600785.html前言这个是百度地图上北京地铁的地址&#xff0c;我们先看下百度上面的效果图&#xff1b;我要实现的内容比…

C#学习笔记-Windows窗体自定义初始位置

根据屏幕大小定义初始位置&#xff1a; &#xff08;这个不是难&#xff0c;但是最近常常忘记&#xff0c;记着方便查看。&#xff09; 1 //获取当前屏幕的长和宽2 int ScreenX Screen.PrimaryScreen.Bounds.Width;3 int ScreenY Screen…

【MATLAB统计分析与应用100例】案例014:matlab读取Excel数据,调用stepwise函数作交互式逐步回归分析

文章目录 1. 交互式逐步回归分析结果预览2. 完整matlab代码1. 交互式逐步回归分析结果预览 2. 完整matlab代码 % 从Excel文件examp08_03.xls中读取数值型数据 xydata = xlsread(examp08_03.xls); y = xydata<

MsSql 自定义分数段,按分数段统计考试人次

--分数段分布 DECLARE levels VARCHAR(100) 10,20,30,40,50,60,70,80,90,100; --自定义分数段 DECLARE paperId VARCHAR(100)0000000000001019--试卷编号 WITH tbTemp AS (--处理分数段SELECT L.levelFROM (SELECT [value] CONVERT(XML, <v> REPLACE(levels, ,, &l…

opencv---颜色空间转化并实现物体跟踪

一、图像处理的基本操作 因为这是第一篇写opencv的笔记&#xff0c;故先讲讲在python下写opencv的基本操作。总共总结了三点如下&#xff1a; 开头一定要加编码声明:-*- coding: utf-8 -*-python下记得引入opencv模块:import cv2要知道如何读取并展示图片,代码如下:# -*- codin…

签到功能完成03《ivx低代码签到系统制作》

制作iVX 低代码项目需要进入在线IDE&#xff1a;https://editor.ivx.cn/ 一、签到页制作 上一节完成了签到内容的发布&#xff0c;那我们接下来如何制作一个签到页面并且签到呢&#xff1f; 此时我们先创建一个页面叫做签到二维码页面&#xff1a; 随后直接预览这个页面&am…

System.CommandLine参数Argument

前两篇说到Command和Option&#xff0c;这篇说说Argument。Argument的用法和Option很像&#xff0c;只是他的构造和属性有些差别&#xff0c;再就是在运行输入时有区别&#xff0c;接下来看看实现。一个参数//创建根命令 var rootCommand new RootCommand("这是一个命令行…