Python通过amqp消息队列协议中的Qpid实现数据通信

简介:

    这两天看了消息队列通信,打算在配置平台上应用起来。以前用过zeromq但是这东西太快了,还有就是rabbitmq有点大,新浪的朋友推荐了qpid,简单轻便。自己总结了下文档,大家可以瞅瞅。


AMQP(消息队列协议Advanced Message Queuing Protocol)是一种消息协议 ,等同于JMS,但是JMS只是java平台的方案,AMQP是一个跨语言的协议。


AMQP 不分语言平台,主流的语言都支持,运维这边的perl,python,ruby更是支持,所以大家就放心用吧。


主流的消息队列通信类型:

点对点:A 发消息给 B。
广播:A 发给所有其他人的消息
组播:A 发给多个但不是所有其他人的消息。
Requester/response:类似访问网页的通信方式,客户端发请求并等待,服务端回复该请求
Pub-sub:类似杂志发行,出版杂志的人并不知道谁在看这本杂志,订阅的人并不关心谁在发表这本杂志。出版的人只管将信息发布出去,订阅的人也只在需要的时候收到该信息。
Store-and-forward:存储转发模型类似信件投递,写信的人将消息写给某人,但在将信件发出的时候,收信的人并不一定在家等待,也并不知道有消息给他。但这个消息不会丢失,会放在收信者的信箱中。这种模型允许信息的异步交换。
其他通信模型。。。


Publisher --->Exchange ---> MessageQueue --->Consumer


整个过程是异步的.Publisher,Consumer相互不知道对方的存在,Exchange负责交换/路由,依靠Routing Key,每个消息者有一个Routing Key,每个Binding将自已感兴趣的RoutingKey告诉Exchange,以便Exchange将相关的消息转发给相应的Queue !


几个概念

几个概念
Producer,Routing Key,Exchange,Binding,Queue,Consumer.
Producer: 消息的创建者,消息的发送者
Routing Key:唯一用来映射消息该进入哪个队列的标识
Exchange:负责消息的路由,交换
Binding:定义Queue和Exchange的映射关系
Queue:消息队列
Consumer:消息的使用者
Exchange类型
Fan-Out:类似于广播方式,不管RoutingKey
Direct:根据RoutingKey,进行关联投寄
Topic:类似于Direct,但是支持多个Key关联,以组的方式投寄。key以.来定义界限。类似于usea.news,usea.weather.这两个消息是一组的。


wKioL1MA096hj-AFAAENzUSW7c0736.jpg


QPID


Qpid 是 Apache 开发的一款面向对象的消息中间件,它是一个 AMQP 的实现,可以和其他符合 AMQP 协议的系统进行通信。Qpid 提供了 C++/Python/Java/C# 等主流编程语言的客户端库,安装使用非常方便。相对于其他的 AMQP 实现,Qpid 社区十分活跃,有望成为标准 AMQP 中间件产品。除了符合 AMQP 基本要求之外,Qpid 提供了很多额外的 HA 特性,非常适于集群环境下的消息通信!


基本功能外提供以下特性:


采用 Corosync(?)来保证集群环境下的Fault-tolerant(?) 特性

支持XML的Exchange,消息为XML时,彩用Xquery过滤

支持plugin

提供安全认证,可对producer/consumer提供身份认证

qpidd --port --no-data-dir --auth

port:端口

--no-data-dir:不指定数据目录

--auth:不启用安全身份认证



启动后自动创建一些Exchange,amp.topic,amp.direct,amp.fanout


tools:


Qpid-config:维护Queue,Exchange,内部配置

Qpid-route:配置broker Federation(联盟?集群?)

Qpid-tool:监控


咱们说完介绍了,这里就赶紧测试下。


服务器端的安装:


yum install qpid-cpp-server
yum install qpid-tools
/etc/init.d/qpidd start


发布端的测试代码

wKioL1MAxo6idPUMAAM2EoX3xTs484.jpg


一些测试代码转自: ibm 的开发社区 


#!/usr/bin/env python
#xiaorui.cc
#转自ibm开发社区
import optparse, time
from qpid.messaging import *
from qpid.util import URL
from qpid.log import enable, DEBUG, WARN
def nameval(st):idx = st.find("=")if idx >= 0:name = st[0:idx]value = st[idx+1:]else:name = stvalue = Nonereturn name, value
parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",description="Send messages to the supplied address.")
parser.add_option("-b", "--broker", default="localhost",help="connect to specified BROKER (default %default)")
parser.add_option("-r", "--reconnect", action="store_true",help="enable auto reconnect")
parser.add_option("-i", "--reconnect-interval", type="float", default=3,help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type="int",help="maximum number of reconnect attempts")
parser.add_option("-c", "--count", type="int", default=1,help="stop after count messages have been sent, zero disables (default %default)")
parser.add_option("-t", "--timeout", type="float", default=None,help="exit after the specified time")
parser.add_option("-I", "--id", help="use the supplied id instead of generating one")
parser.add_option("-S", "--subject", help="specify a subject")
parser.add_option("-R", "--reply-to", help="specify reply-to address")
parser.add_option("-P", "--property", dest="properties", action="append", default=[],metavar="NAME=VALUE", help="specify message property")
parser.add_option("-M", "--map", dest="entries", action="append", default=[],metavar="KEY=VALUE",help="specify map entry for message body")
parser.add_option("-v", dest="verbose", action="store_true",help="enable logging")
opts, args = parser.parse_args()
if opts.verbose:enable("qpid", DEBUG)
else:enable("qpid", WARN)
if opts.id is None:spout_id = str(uuid4())
else:spout_id = opts.id
if args:addr = args.pop(0)
else:parser.error("address is required")
content = None
if args:text = " ".join(args)
else:text = None
if opts.entries:content = {}if text:content["text"] = textfor e in opts.entries:name, val = nameval(e)content[name] = val
else:content = text
conn = Connection(opts.broker,reconnect=opts.reconnect,reconnect_interval=opts.reconnect_interval,reconnect_limit=opts.reconnect_limit)
try:conn.open()ssn = conn.session()snd = ssn.sender(addr)count = 0start = time.time()while (opts.count == 0 or count < opts.count) and \(opts.timeout is None or time.time() - start < opts.timeout):msg = Message(subject=opts.subject,reply_to=opts.reply_to,content=content)msg.properties["spout-id"] = "%s:%s" % (spout_id, count)for p in opts.properties:name, val = nameval(p)msg.properties[name] = valsnd.send(msg)count += 1print msg
except SendError, e:print e
except KeyboardInterrupt:pass
conn.close()


客户端的测试代码:

wKiom1MAxznzudhOAARm6bOwuRE241.jpg


#!/usr/bin/env python
#xiaorui.cc
##转自ibm开发社区
import optparse
from qpid.messaging import *
from qpid.util import URL
from qpid.log import enable, DEBUG, WARN
parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",description="Drain messages from the supplied address.")
parser.add_option("-b", "--broker", default="localhost",help="connect to specified BROKER (default %default)")
parser.add_option("-c", "--count", type="int",help="number of messages to drain")
parser.add_option("-f", "--forever", action="store_true",help="ignore timeout and wait forever")
parser.add_option("-r", "--reconnect", action="store_true",help="enable auto reconnect")
parser.add_option("-i", "--reconnect-interval", type="float", default=3,help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type="int",help="maximum number of reconnect attempts")
parser.add_option("-t", "--timeout", type="float", default=0,help="timeout in seconds to wait before exiting (default %default)")
parser.add_option("-p", "--print", dest="format", default="%(M)s",help="format string for printing messages (default %default)")
parser.add_option("-v", dest="verbose", action="store_true",help="enable logging")
opts, args = parser.parse_args()
if opts.verbose:enable("qpid", DEBUG)
else:enable("qpid", WARN)
if args:addr = args.pop(0)
else:parser.error("address is required")
if opts.forever:timeout = None
else:timeout = opts.timeout
class Formatter:def __init__(self, message):self.message = messageself.environ = {"M": self.message,"P": self.message.properties,"C": self.message.content}def __getitem__(self, st):return eval(st, self.environ)
conn = Connection(opts.broker,reconnect=opts.reconnect,reconnect_interval=opts.reconnect_interval,reconnect_limit=opts.reconnect_limit)
try:conn.open()ssn = conn.session()rcv = ssn.receiver(addr)count = 0while not opts.count or count < opts.count:try:msg = rcv.fetch(timeout=timeout)print opts.format % Formatter(msg)count += 1ssn.acknowledge()except Empty:break
except ReceiverError, e:print e
except KeyboardInterrupt:pass
conn.close()


Browse 模式的意思是,浏览的意思,一个特殊的需求,我访问了一次,别人也能访问。

Consume 模式的意思是,我浏览了一次后,删除这一条。别人就访问不到啦。

这个是浏览模式:

wKiom1MAx87zwLEFAAbCk_wE0PY041.jpg


sub-pub 通信的例子


Pub-sub 是另一种很有用的通信模型。恐怕它的名字就源于出版发行这种现实中的信息传递方式吧,publisher 就是出版商,subscriber 就是订阅者。


服务端
qpid-config add exchange topic news-service
./spout news-service/news xiaorui.cc
客户端:
./drain -t 120 news-service/#.news


PUB端,创建TOPIC点 !

wKiom1MAzGDg3nZbAAPRnFen8Y4988.jpg


SUB端,也就是接收端!

wKiom1MAzG2Tsf9YAAO76LbxAS8638.jpg


总结:

 qpid挺好用的,比rabbitmq要轻型,比zeromq保险点! 各方面的文档也都很健全,值得一用。    话说,这三个消息队列我也都用过,最一开始用的是redis的pubsub做日志收集和信息通知,后来在做集群相关的项目的时候,我自己搞了一套zeromq的分布式任务分发,和saltstack很像,当然了远没有万人用的salt强大。  rabbitmq的用法,更是看中他的安全和持久化,当然性能真的不咋地。

关于qpid的性能我没有亲自做大量的测试,但是听朋友说,加持久化可以到7k,不加持久化可以到1500左右。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/300229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于 Blazor 打造一款实时字幕

早先在录制视频的时候一直使用的是 obs-auto-subtitle 作为实时字幕展示功能。不过这个是以 OBS 插件的形式存在&#xff0c;不管是语言和功能上都有一定的限制。故而使用 Blazor server 实现一个。总体思路 实时字幕自然需要语音转文字的功能。考察了一些服务之后&#xff0c;…

一个数学系毕业的物理学家,是怎么拿到诺贝尔化学奖的?

全世界只有3.14 % 的人关注了青少年数学之旅2019年10月9日&#xff0c;这个“特别好”教授&#xff0c;1940年&#xff0c;“特别好”考上了耶鲁&#xff0c;1943年&#xff0c;“特别好”终于拿到数学学士学位。“特别好”特别沮丧&#xff0c;▲ “特别好”在牛津大学&#x…

杀鸡焉用牛刀!放下Windbg,让dotnet-stack来快速定位死锁原因

我们用来分析CPU过高、死锁问题的常见方案是使用Windbg分析dump文件。但是这种方式存在一些缺点&#xff0c;比如dump文件过大难以下载&#xff0c;windbg使用过于复杂难以掌握等。这里介绍一个小工具dotnet-stack&#xff0c;帮助我们检查托管代码调用堆栈&#xff0c;快速定位…

数学中那些非常奇葩的证明

全世界只有3.14 % 的人关注了青少年数学之旅一、费马大定理证明证&#xff1a;是无理数假设是有理数&#xff0c;p和q是互素正整数那么移项得又由费马大定理可知&#xff1a;与费马大定理(Fermats last therorem)矛盾, Q.E.D. &#xff08;也可易证2的n分之一次方且n属于大于2的…

使用 Minimal API 改造动态文件提供者

使用 Minimal API 改造动态文件提供者Intro之前介绍过一个基于动态文件提供者来实现静态网站的动态更新&#xff0c;可以参考 ASP.NET Core 实现一个简单的静态网站滚动更新&#xff0c;在 Minimal API 出现之后想改造成 Minimal API 的写法&#xff0c;但是由于之前版本的 Min…

[导入]体验Asp.Net Mvc Preview5(3)-探索ModelBinder的工作原理

摘要: 在前面的两篇文章中,我们研究了Asp.Net Mvc Preview5的ViewEingine的改进,从本篇开始,我们开始研究Preview5中的新特性:ModelBinder,首先我们来了解下什么是ModelBinder特性,这有什么用处,在以前的版本中,如果我们要在Action中获取数据,一般有三种方式,一是通过Action的参…

复盘:我的三个月远程办公实践,有自由,也有代价

这是头哥侃码的第244篇原创有人说&#xff0c;人生就是一个不断尝试的过程。我觉得&#xff0c;有时候这个词其实不准确&#xff0c;因为每个人的性格不同&#xff0c;成长经历及运势不同&#xff0c;所以对 “尝试” 俩字的理解也就不同。在我还是孩子的时候&#xff0c;几乎所…

Silverlight专题(10)- WatermarkedTextBox使用

问题&#xff1a; 之前的Silverlight版本都有一个WatermarkedTextBox控件 但是到了Silverlight 2 Beta2版本&#xff0c;由于和WPF兼容的考虑 WatermarkedTextBox被移除了 虽然之前我有看到消息说Silverlight 2正式Release的时候会给TextBox一个Watermark属性 但是Silverlight …

90后一代人还能通过攒钱改变现状吗?

全世界只有3.14 % 的人关注了青少年数学之旅每次打开公号&#xff0c;扑面而来一阵阵焦虑&#xff1a;95后毕业3个月就买房&#xff0c;你的同龄人正在抛弃你毕业3年&#xff0c;年薪超100万&#xff1a;赚钱&#xff0c;是一种修行一线城市财务自由门槛2.9亿&#xff0c;看看你…

从高德侯军到《李嘉诚:商者无域》

从高德侯军到《李嘉诚&#xff1a;商者无域》 【编者按】转载这篇文章是因为看到了业内著名企业高德董事长侯军跻身2008胡润排行榜&#xff0c;让人不禁联想起高德在业内一贯的潜行风格&#xff0c;而侯军先生也颇有点“忍者神龟”的隐喻&#xff0c;在业内企业家当中属闷声发大…

测试龙芯 LoongArch .NET之 使用 FastTunnel 做内网穿透远程计算机

龙芯3A5000 已经上市&#xff0c;从老伙计哪里搞来一台3A5000 机器&#xff0c;安装统信UOS。使用体验上看还可以&#xff0c;就是软件生态急需建设&#xff0c;软件生态的建设上自然有我dotnet 的一份力量。龙芯团队已经完成了LoongArch 的.NET Core 3.1版本的研发&#xff0c…

利用jquery给指定的table动态添加一行、删除一行

今天在项目中&#xff0c;刚好用到给指定的table添加一行、删除一行&#xff0c;就直接找google&#xff0c;搜出来的东西不尽如人意&#xff0c;不是功能不好就是千篇一律&#xff0c;简直浪费时间还不讨好&#xff0c;于是乎就自己动手封装个&#xff0c;现就把代码分享出来&…

求求你把输入法调小一点... | 今日最佳

全世界只有3.14 % 的人关注了青少年数学之旅

linux安装卷管理,Linux安装管理ISCSI卷(initiator端)

Internet SCSI(iSCSI)是一种网络协议&#xff0c;使用TCP/IP网络来传输SCSI协议。它是代替FC(Fibre Channel-based&#xff0c;光纤通道&#xff1f;) SAN的很好选Internet SCSI(iSCSI)是一种网络协议&#xff0c;使用TCP/IP网络来传输SCSI协议。它是代替FC(Fibre Channel-base…

Blazor 事件处理开发指南

翻译自 Waqas Anwar 2021年3月25日的文章 《A Developer’s Guide To Blazor Event Handling》 [1]如果您正在开发交互式 Web 应用程序&#xff0c;根据不同的应用程序事件和用户操作动态更新用户界面是十分常见的做法。这些操作会触发事件&#xff0c;而作为开发人员&#xff…

android 开源组件合集-UI篇(2013-11-07更新)

其实也算不上合集,只是将我经常用到的部分整理一下,如果您有好东西,也可以留言补充 1.actionbar http://actionbarsherlock.com/ https://github.com/JakeWharton/ActionBarSherlock (推荐) 2.下拉刷新pulltorefresh https://github.com/chrisbanes/Android-PullToRefresh 支持…

改变世界的5大常数,学过数学的人,这一辈子都不会忘记!

全世界只有3.14 % 的人关注了青少年数学之旅何谓数学&#xff1f;数学家Eduardo曾这样回答“数学是永恒&#xff0c;是真理&#xff0c;是一切的答案。”回首往昔数学始终伴随我们左右纵横交错的几何、繁琐复杂的运算难以求解的方程、无从下手的猜想......尽管在数学道路上有多…

创维linux进入工厂模式,创维电视怎么进入工厂模式?

满意答案zrwemshwt54推荐于 2019.11.03创维彩电进入与退出工厂模式方法的汇总一&#xff0e; D系列5D01机芯&#xff1a;进入&#xff1a;在遥控器屏显键的正下方&#xff0c;加装一个按键(SERVICE键)&#xff0c;按该键即可进入工厂模式。退出&#xff1a; 按遥控器上的TV/AV键…

收到在微软商店购买的商品

今天收到了在微软商店购买的商品&#xff0c;送货速度真快&#xff0c;20号下的订单&#xff0c;今天就拿到了&#xff0c;这么快就从美国通过UPS快递到国内&#xff0c;现在的物流越来越发达了。我购买的商品是&#xff1a;1、WM USB Powered Speakers(USB扬声器)2、LifeCam V…

[导入]纹理拼接后的Wrap寻址

拼接后的纹理:正常的草地(不进行WRAP寻址):文章来源:http://blog.csdn.net/xoyojank/archive/2008/11/03/3212425.aspx转载于:https://www.cnblogs.com/xoyojank/archive/2008/11/04/1343671.html