RabbitMQ详解(三)

一、分发到多Consumer(fanout)
二、Routing路由(Direct)
三、主题路由(Topic)

一、分发到多Consumer(fanout)
将同一个Message deliver到多个Consumer中。这个模式也被称为"publish/subscribe"
创建一个日志系统,包含两部分:第一部分发出log(Producer),第二部分接收到并打印(Consumer)。两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。

1.发送消息流程:
    1.Producer发送的Message实际上是发到了Exchange中。
    2.Exchanges从Producer接收message投递到queue中
    3.Prducer发送的消息只是到达了Exchange中,Exchange具有不同的类型实现不同的分发方式

Exchnges的类型:direct、topic和fanout
fanout就是广播模式,会将所有的Message都放到它所知道的queue中
channel.exchange_declare(exchange='logs',  
    type='fanout')   //创建一个名字为logs,类型为fanout的Exchange:

1
2
3
4
5
6
7
8
9
10
11
[root@node112 ~]# rabbitmqctl list_exchanges //查看所有的Exchanges
Listing exchanges ...
logs  fanout
amq.direct    direct
amq.fanout    fanout
amq.headers    headers
amq.match    headers
amq.rabbitmq.log    topic
amq.rabbitmq.trace    topic
amq.topic    topic
...done.

注意:amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默认创建的。 

通过exchange,而不是routing_key来publish Message:
channel.basic_publish(exchange='logs',  
    routing_key='',  
    body=message)  

2.临时队列
截至现在,我们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。
但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
    1)每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。方法:
    result = channel.queue_declare() 
    通过result.method.queue 可以取得queue的名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
    2)当Consumer关闭连接时,这个queue要被deleted。可以加个exclusive的参数。方法:
    result = channel.queue_declare(exclusive=True)   //每次获取的都是新的,单独使用的
    
3.Bindings绑定
    创建好fanout类型的Exchange和没有名字的queue后(实际上是RabbitMQ帮我们取的名字)Exchange通过bindings把它的Message发送到目标queue
    channel.queue_bind(exchange='logs',  
        queue=result.method.queue)      
    使用命令rabbitmqctl list_bindings 查看bindings
    
4.最终代码
拓扑图:
1.png

Producer,在这里就是产生log的program,基本上和前几个都差不多。最主要的区别就是publish通过了exchange而不是routing_key。
emit_log.py script:
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
    type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
    routing_key='',
    body=message)
print " [x] Sent %r" % (message,)
connection.close()

还有一点要注意的是我们声明了exchange。publish到一个不存在的exchange是被禁止的。如果没有queue bindings exchange的话,log是被丢弃的。
Consumer:receive_logs.py:
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
    type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
    queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
    print " [x] %r" % (body,)
channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
channel.start_consuming()

试运行:
    Consumer1:$ python receive_logs.py > logs_from_rabbit.log  //追加到文件
    Consumer2:python receive_logs.py //输出到屏幕
    Producer:python emit_log.py
也可通过修改callback自己写文件
输出结果如图:
3.png

二、Routing路由(Direct)
对于上一个日志系统改进。能够使用不同的severity来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。
1.Bindings绑定
之前的绑定
channel.queue_bind(exchange=exchange_name,  
    queue=queue_name)  
绑定其实就是关联了exchange和queue。或者这么说:queue对exchagne的内容感兴趣,exchange要把它的Message deliver到queue中。
实际上,绑定可以带routing_key 这个参数。其实这个参数的名称和basic_publish 的参数名是相同了。为了避免混淆,我们把它成为binding key。
    使用一个key来创建binding :
channel.queue_bind(exchange=exchange_name,  
    queue=queue_name,  
    routing_key='black') 
对于fanout的exchange来说,这个参数是被忽略的。

2.Direct Exchange
通过Bindings key完全匹配
图Direct路由模型
Direct.png

exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。
当P publish key是orange时,exchange会把它放到Q1。如果是black或者green那么就会到Q2。其余的Message都会被丢弃。

3.多重绑定(Multiple Bindings)
多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver到Q1和Q2。其余的Message都会被丢弃。
图muliti-bindings
multi.png

4.生产者和消费者
生产者:
===========================================================================

1
2
3
4
5
6
7
8
channel.exchange_declare(exchange='direct_logs',  
    type='direct')  
//创建一个direct的exchange。使用log的severity作为routing key,这样Consumer可以针对不同severity的log进行不同的处理。
publish:
channel.basic_publish(exchange='direct_logs',  
    routing_key=severity, 
    body=message)  
//涉及三种severity:'info''warning''error'.

消费者:
===========================================================================

1
2
3
4
5
6
7
result = channel.queue_declare(exclusive=True)  
queue_name = result.method.queue  
for severity in severities:  
    channel.queue_bind(exchange='direct_logs',  
        queue=queue_name,  
        routing_key=severity) 
//queue需要绑定severity

5.最终版本
图:direct_2
direct_2.png

emit_log_direct.py 
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
    type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
    routing_key=severity,
    body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

receive_logs_direct.py: 
===========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python  
import pika  
import sys  
connection = pika.BlockingConnection(pika.ConnectionParameters(  
    host='localhost'))  
channel = connection.channel()  
channel.exchange_declare(exchange='direct_logs',  
    type='direct')  
result = channel.queue_declare(exclusive=True)  
queue_name = result.method.queue  
severities = sys.argv[1:]  
if not severities:  
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \  
        (sys.argv[0],)  
    sys.exit(1)  
for severity in severities:      
    channel.queue_bind(exchange='direct_logs',  
        queue=queue_name,  
        routing_key=severity)  
print ' [*] Waiting for logs. To exit press CTRL+C'  
def callback(ch, method, properties, body):  
    print " [x] %r:%r" % (method.routing_key, body,)  
channel.basic_consume(callback,  
    queue=queue_name,  
    no_ack=True)  
channel.start_consuming()

===========================================================================
试运行:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log 
    //把warning和error的log记录到一个文件中
$ python receive_logs_direct.py info warning error  
    //打印所有log到屏幕    

三、主题路由(Topic)
1.Topic exchange
Message的routing_key使用限制,不能使任意的。格式是以点号“."分割的字符表。
比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。
    对于routing_key,有两个特殊字符(在正则表达式里叫元字符):
    * (星号) 代表任意 一个单词
    # (hash) 0个或者多个单词
示例:
Producer发送消息时需要设置routing_key,routing_key包含三个单词和两个点号。
    第一个key是描述了celerity(灵巧,敏捷),第二个是colour(色彩),第三个是species(物种):"<celerity>.<colour>.<species>"。
在这里我们创建了两个绑定: Q1 的binding key 是"*.orange.*"; Q2 是  "*.*.rabbit" 和 "lazy.#":
    Q1 感兴趣所有orange颜色的动物
    Q2 感兴趣所有的rabbits和所有的lazy的
比如routing_key是 "quick.orange.rabbit"将会发送到Q1和Q2中。消息"lazy.orange.elephant" 也会发送到Q1和Q2。但是"quick.orange.fox" 会发送到Q1;"lazy.brown.fox"会发送到Q2。"lazy.pink.rabbit" 也会发送到Q2,但是尽管两个routing_key都匹配,它也只是发送一次。"quick.brown.fox" 会被丢弃。
如果发送的单词不是3个呢? 答案要看情况,因为#是可以匹配0个或任意个单词。比如"orange" or "quick.orange.male.rabbit",它们会被丢弃。如果是lazy那么就会进入Q2。类似的还有 "lazy.orange.male.rabbit",尽管它包含四个单词。

Topic exchange和其他exchange
    由于有"*" (star) and "#" (hash), Topic exchange 非常强大并且可以转化为其他的exchange:
    如果binding_key 是 "#" - 它会接收所有的Message,不管routing_key是什么,就像是fanout exchange。
    如果 "*" (star) and "#" (hash) 没有被使用,那么topic exchange就变成了direct exchange。

2.代码实现
The code for emit_log_topic.py:
========================================================================

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
    type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
    routing_key=routing_key,
    body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

========================================================================

The code for receive_logs_topic.py:     
========================================================================    

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
    type='topic')
     
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)
channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
channel.start_consuming()

    
3.运行和结果
    python receive_logs_topic.py "#"  //接收所有的log
    python receive_logs_topic.py "kern.*"  //接收所有kern facility的log
    python receive_logs_topic.py "*.critical"  //仅仅接收critical的log: 
    python receive_logs_topic.py "kern.*" "*.critical"  //可以创建多个绑定: 
    python emit_log_topic.py "kern.critical" "A critical kernel error"  //Producer产生一个log:"kern.critical" type: 
    
参考:    
http://www.rabbitmq.com/tutorials/tutorial-three-python.html










本文转自MT_IT51CTO博客,原文链接:http://blog.51cto.com/hmtk520/2051247,如需转载请自行联系原作者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/286089.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

重磅 | Linux内核5.19初步支持LoongArch架构

经过龙芯中科与内核社区一年多的紧密合作&#xff0c;北京时间2022年6月4日清晨&#xff0c;Linux内核社区正式合并LoongArch架构支持代码。随着Linux-5.19的rc1版本的正式发布&#xff0c;LoongArch体系结构主体部分的源码已合并到内核主线之中&#xff0c;其余相关代码正在进…

C语言试题五十之请编写一个函数void function(char *ss),其功能时:将字符串ss中所有下标为奇数位置上的字母转换为大写(若位置上不是字母,则不转换)。

📃个人主页:个人主页 🔥系列专栏:C语言试题200例目录 💬推荐一款刷算法、笔试、面经、拿大公司offer神器 👉 点击跳转进入网站 ✅作者简介:大家好,我是码莎拉蒂,CSDN博客专家(全站排名Top 50),阿里云博客专家、51CTO博客专家、华为云享专家 1、题目 请编写一个…

【MATLAB统计分析与应用100例】案例011:matlab读取Excel数据,调用regress函数作一元线性回归分析

数据拟合效果预览: 文章目录 1. 读取数据,绘制散点图2. 计算相关系数3. 绘制回归直线4. 剔除异常数据,重新调用regress函数作一元线性回归1. 读取数据,绘制散点图 ClimateData = xlsread(examp08_01.xls); % 从Excel文件读取数据 x &

“*** IS NOT TRANSLATED IN …….. 解决办法

首先引起提示的原因是因为Lint 代码检查工具发现你的项目中&#xff08;或者引用的三方库&#xff09;有部分string.xml文件内容做了国际化操作&#xff0c;但却不完整&#xff0c;有些文本内容并没有相应的国际化翻译&#xff0c;在android开发中常见于项目引用的Libraries第三…

[转] ArcEngine 产生专题图

小生原文 ArcEngine 产生专题图 ArcEngine提供多个着色对象用于产生专题图&#xff0c;可以使用标准着色方案&#xff0c;也可以自定义着色方案&#xff0c;ArcEngine提供8中标准着色方案。 一、SimpleRenderer专题图 是使用单一符号进行着色分类&#xff0c;不涉及对要素的数据…

iVX无代码挑战五秒游戏制作

一、五秒挑战游戏简介及思考 制作iVX 低代码项目需要进入在线IDE&#xff1a;https://editor.ivx.cn/ 五秒挑战游戏指的是点击一个按钮开始计时&#xff0c;随后需要用户再次点击计时按钮&#xff0c;将会停止计时&#xff0c;当计时的时间等于五秒时将挑战成功&#xff0c;否…

C语言试题五十一之已知学生的记录是由学号和学习成绩构成,n名学生的数据已存入s结构体数组中。请编写函数fun,该函数的功能是:找出成绩最高的学生记录,通过形参返回主函数(规定只有一个最高分)。

📃个人主页:个人主页 🔥系列专栏:C语言试题200例目录 💬推荐一款刷算法、笔试、面经、拿大公司offer神器 👉 点击跳转进入网站 ✅作者简介:大家好,我是码莎拉蒂,CSDN博客专家(全站排名Top 50),阿里云博客专家、51CTO博客专家、华为云享专家 1、题目 请编写一个…

CSS 巧用 :before和:after

前几天的晚上较全面的去看了下css的一些文档和资料&#xff0c;大部分的样式运用都没什么大问题了&#xff0c;只是有些许较陌生&#xff0c;但是也知道他们的存在和实现的是什么样式。今天主要想在这篇学习笔记中写的也不多&#xff0c;主要是针对:before和:after写一些内容&a…

MAUI 入门教程系列(4.通用主机)

前言对于ASP.NET Core 开发人员而言, 这并不陌生, 当ASP.NET Core应用程序启动时, 会创建默认的应用程序主机, 我们可以为应用程序配置所有的依赖关系、系统设置, 最终启动。如下所示:using IHost host Host.CreateDefaultBuilder(args).ConfigureServices((_, services) >…

【MATLAB统计分析与应用100例】案例012:matlab读取Excel数据,调用robustfit函数作稳健回归

稳健回归效果预览: 文章目录 1. 读取数据2. 调用robustfit函数作稳健回归3 .绘制残差和权重的散点图4. 绘制regress函数和robustfit函数对应的回归直线5. 拟合效果1. 读取数据 ClimateData = xlsread(examp08_01.xls); % 从Excel文件读取数据 x

Android单击、长按获取当前触点坐标下(TextView)文字字符

package com.*.*.*.utils;import android.graphics.Rect; import android.text.Layout; import android.widget.TextView;public class TextViewUtils {/**获取TextView某一个字符的坐标位置return 返回的是相对坐标parms tvparms index 字符索引*/public static Rect getTextV…

后台页制作01《ivx低代码签到系统制作》

制作iVX 低代码项目需要进入在线IDE&#xff1a;https://editor.ivx.cn/ 一、签到系统思考 签到系统一般是指公布一个签到链接或者二维码&#xff0c;随后用户扫码后即可完成签到。 那如何制作呢&#xff1f;首先我们可以先不考虑签到页面的制作&#xff0c;既然签到暂时没有…

个人作业-Week2

第一部分 调研&#xff0c; 评测 运行平台 win 8 软件版本&#xff1a;微软必应词典桌面版 3.5.2 BUG标题&#xff1a;必应背单词无法发音 BUG详细描述&#xff1a;如图&#xff0c;左边为必应词典该单词的搜索&#xff0c;可以发音&#xff0c;而右边必应背单词中该单词的发音…

Blazor WebAssembly + Grpc Web=未来?

Blazor WebAssembly是什么首先来说说WebAssembly是什么&#xff0c;WebAssembly是一个可以使C#,Java,Golang等静态强类型编程语言&#xff0c;运行在浏览器中的标准&#xff0c;浏览器厂商基于此标准实现执行引擎。在实现了WebAssembly标准引擎之后&#xff0c;浏览器中可以执行…

C语言试题五十二之学生的记录由学号和成绩组称个,n名大学生得数据已在主函数中放入结构体数组a中,请编写函数fun,它的功能时:按分数的高低排列学生的记录,高分在前。

📃个人主页:个人主页 🔥系列专栏:C语言试题200例目录 💬推荐一款刷算法、笔试、面经、拿大公司offer神器 👉 点击跳转进入网站 ✅作者简介:大家好,我是码莎拉蒂,CSDN博客专家(全站排名Top 50),阿里云博客专家、51CTO博客专家、华为云享专家 1、题目 请编写一个…

Xtrabackup备份MySQL

一、安装Xtrabackup 1234# wget --no-check-certificate http://www.percona.com/downloads/percona-release/redhat/0.1-4/percona-release-0.1-4.noarch.rpm# rpm -ivh percona-release-0.1-4.noarch.rpm# yum list | grep percona# yum -y install percona-xtrabackup-24二、…

Can't create directory 'E:\Repositories\***\db\transactions\138-41.txn':

遇到这种问题应该是迁移SVN库时丢失了文件夹引起的&#xff0c;直接在服务器上找到对应项目的db文件夹&#xff0c;手动创建“transactions”目录和“txn-protorevs”目录即可正常提交。

[它山之石] 一件事情,假设你不能说清楚,十有八九你就做不好

记得有一次开会。我的头儿说了标题所写的这句话,自己深以为然。 有过较多解决这个问题的经历的人可能会有这种感觉&#xff0c;非常多时候&#xff0c;面对一个问题。我们即使没有全然将之想清 楚。也可以基于已有的经验给出一个可以work的解决方式&#xff0c;当然这样的情况下…

【MATLAB统计分析与应用100例】案例013:matlab读取Excel数据,调用nlinfit函数作一元非线性回归

1. 一元线性回归分析效果预览 2. matlab完整实现代码 %读取数据,绘制散点图** HeadData = xlsread(examp08_02.xls); %从Excel文

C语言试题五十三之将所有大于1小于整数m的非素数存入xx所指的数组中,非素数的个数通过k传回。

📃个人主页:个人主页 🔥系列专栏:C语言试题200例目录 💬推荐一款刷算法、笔试、面经、拿大公司offer神器 👉 点击跳转进入网站 ✅作者简介:大家好,我是码莎拉蒂,CSDN博客专家(全站排名Top 50),阿里云博客专家、51CTO博客专家、华为云享专家 1、题目 请编写一个…