Python通过amqp消息队列协议中的Qpid实现数据通信

简介:

    这两天看了消息队列通信,打算在配置平台上应用起来。以前用过zeromq但是这东西太快了,还有就是rabbitmq有点大,新浪的朋友推荐了qpid,简单轻便。自己总结了下文档,大家可以瞅瞅。


AMQP(消息队列协议Advanced Message Queuing Protocol)是一种消息协议 ,等同于JMS,但是JMS只是java平台的方案,AMQP是一个跨语言的协议。


AMQP 不分语言平台,主流的语言都支持,运维这边的perl,python,ruby更是支持,所以大家就放心用吧。


主流的消息队列通信类型:

点对点:A 发消息给 B。
广播:A 发给所有其他人的消息
组播:A 发给多个但不是所有其他人的消息。
Requester/response:类似访问网页的通信方式,客户端发请求并等待,服务端回复该请求
Pub-sub:类似杂志发行,出版杂志的人并不知道谁在看这本杂志,订阅的人并不关心谁在发表这本杂志。出版的人只管将信息发布出去,订阅的人也只在需要的时候收到该信息。
Store-and-forward:存储转发模型类似信件投递,写信的人将消息写给某人,但在将信件发出的时候,收信的人并不一定在家等待,也并不知道有消息给他。但这个消息不会丢失,会放在收信者的信箱中。这种模型允许信息的异步交换。
其他通信模型。。。


Publisher --->Exchange ---> MessageQueue --->Consumer


整个过程是异步的.Publisher,Consumer相互不知道对方的存在,Exchange负责交换/路由,依靠Routing Key,每个消息者有一个Routing Key,每个Binding将自已感兴趣的RoutingKey告诉Exchange,以便Exchange将相关的消息转发给相应的Queue !


几个概念

几个概念
Producer,Routing Key,Exchange,Binding,Queue,Consumer.
Producer: 消息的创建者,消息的发送者
Routing Key:唯一用来映射消息该进入哪个队列的标识
Exchange:负责消息的路由,交换
Binding:定义Queue和Exchange的映射关系
Queue:消息队列
Consumer:消息的使用者
Exchange类型
Fan-Out:类似于广播方式,不管RoutingKey
Direct:根据RoutingKey,进行关联投寄
Topic:类似于Direct,但是支持多个Key关联,以组的方式投寄。key以.来定义界限。类似于usea.news,usea.weather.这两个消息是一组的。


wKioL1MA096hj-AFAAENzUSW7c0736.jpg


QPID


Qpid 是 Apache 开发的一款面向对象的消息中间件,它是一个 AMQP 的实现,可以和其他符合 AMQP 协议的系统进行通信。Qpid 提供了 C++/Python/Java/C# 等主流编程语言的客户端库,安装使用非常方便。相对于其他的 AMQP 实现,Qpid 社区十分活跃,有望成为标准 AMQP 中间件产品。除了符合 AMQP 基本要求之外,Qpid 提供了很多额外的 HA 特性,非常适于集群环境下的消息通信!


基本功能外提供以下特性:


采用 Corosync(?)来保证集群环境下的Fault-tolerant(?) 特性

支持XML的Exchange,消息为XML时,彩用Xquery过滤

支持plugin

提供安全认证,可对producer/consumer提供身份认证

qpidd --port --no-data-dir --auth

port:端口

--no-data-dir:不指定数据目录

--auth:不启用安全身份认证



启动后自动创建一些Exchange,amp.topic,amp.direct,amp.fanout


tools:


Qpid-config:维护Queue,Exchange,内部配置

Qpid-route:配置broker Federation(联盟?集群?)

Qpid-tool:监控


咱们说完介绍了,这里就赶紧测试下。


服务器端的安装:


yum install qpid-cpp-server
yum install qpid-tools
/etc/init.d/qpidd start


发布端的测试代码

wKioL1MAxo6idPUMAAM2EoX3xTs484.jpg


一些测试代码转自: ibm 的开发社区 


#!/usr/bin/env python
#xiaorui.cc
#转自ibm开发社区
import optparse, time
from qpid.messaging import *
from qpid.util import URL
from qpid.log import enable, DEBUG, WARN
def nameval(st):idx = st.find("=")if idx >= 0:name = st[0:idx]value = st[idx+1:]else:name = stvalue = Nonereturn name, value
parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",description="Send messages to the supplied address.")
parser.add_option("-b", "--broker", default="localhost",help="connect to specified BROKER (default %default)")
parser.add_option("-r", "--reconnect", action="store_true",help="enable auto reconnect")
parser.add_option("-i", "--reconnect-interval", type="float", default=3,help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type="int",help="maximum number of reconnect attempts")
parser.add_option("-c", "--count", type="int", default=1,help="stop after count messages have been sent, zero disables (default %default)")
parser.add_option("-t", "--timeout", type="float", default=None,help="exit after the specified time")
parser.add_option("-I", "--id", help="use the supplied id instead of generating one")
parser.add_option("-S", "--subject", help="specify a subject")
parser.add_option("-R", "--reply-to", help="specify reply-to address")
parser.add_option("-P", "--property", dest="properties", action="append", default=[],metavar="NAME=VALUE", help="specify message property")
parser.add_option("-M", "--map", dest="entries", action="append", default=[],metavar="KEY=VALUE",help="specify map entry for message body")
parser.add_option("-v", dest="verbose", action="store_true",help="enable logging")
opts, args = parser.parse_args()
if opts.verbose:enable("qpid", DEBUG)
else:enable("qpid", WARN)
if opts.id is None:spout_id = str(uuid4())
else:spout_id = opts.id
if args:addr = args.pop(0)
else:parser.error("address is required")
content = None
if args:text = " ".join(args)
else:text = None
if opts.entries:content = {}if text:content["text"] = textfor e in opts.entries:name, val = nameval(e)content[name] = val
else:content = text
conn = Connection(opts.broker,reconnect=opts.reconnect,reconnect_interval=opts.reconnect_interval,reconnect_limit=opts.reconnect_limit)
try:conn.open()ssn = conn.session()snd = ssn.sender(addr)count = 0start = time.time()while (opts.count == 0 or count < opts.count) and \(opts.timeout is None or time.time() - start < opts.timeout):msg = Message(subject=opts.subject,reply_to=opts.reply_to,content=content)msg.properties["spout-id"] = "%s:%s" % (spout_id, count)for p in opts.properties:name, val = nameval(p)msg.properties[name] = valsnd.send(msg)count += 1print msg
except SendError, e:print e
except KeyboardInterrupt:pass
conn.close()


客户端的测试代码:

wKiom1MAxznzudhOAARm6bOwuRE241.jpg


#!/usr/bin/env python
#xiaorui.cc
##转自ibm开发社区
import optparse
from qpid.messaging import *
from qpid.util import URL
from qpid.log import enable, DEBUG, WARN
parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",description="Drain messages from the supplied address.")
parser.add_option("-b", "--broker", default="localhost",help="connect to specified BROKER (default %default)")
parser.add_option("-c", "--count", type="int",help="number of messages to drain")
parser.add_option("-f", "--forever", action="store_true",help="ignore timeout and wait forever")
parser.add_option("-r", "--reconnect", action="store_true",help="enable auto reconnect")
parser.add_option("-i", "--reconnect-interval", type="float", default=3,help="interval between reconnect attempts")
parser.add_option("-l", "--reconnect-limit", type="int",help="maximum number of reconnect attempts")
parser.add_option("-t", "--timeout", type="float", default=0,help="timeout in seconds to wait before exiting (default %default)")
parser.add_option("-p", "--print", dest="format", default="%(M)s",help="format string for printing messages (default %default)")
parser.add_option("-v", dest="verbose", action="store_true",help="enable logging")
opts, args = parser.parse_args()
if opts.verbose:enable("qpid", DEBUG)
else:enable("qpid", WARN)
if args:addr = args.pop(0)
else:parser.error("address is required")
if opts.forever:timeout = None
else:timeout = opts.timeout
class Formatter:def __init__(self, message):self.message = messageself.environ = {"M": self.message,"P": self.message.properties,"C": self.message.content}def __getitem__(self, st):return eval(st, self.environ)
conn = Connection(opts.broker,reconnect=opts.reconnect,reconnect_interval=opts.reconnect_interval,reconnect_limit=opts.reconnect_limit)
try:conn.open()ssn = conn.session()rcv = ssn.receiver(addr)count = 0while not opts.count or count < opts.count:try:msg = rcv.fetch(timeout=timeout)print opts.format % Formatter(msg)count += 1ssn.acknowledge()except Empty:break
except ReceiverError, e:print e
except KeyboardInterrupt:pass
conn.close()


Browse 模式的意思是,浏览的意思,一个特殊的需求,我访问了一次,别人也能访问。

Consume 模式的意思是,我浏览了一次后,删除这一条。别人就访问不到啦。

这个是浏览模式:

wKiom1MAx87zwLEFAAbCk_wE0PY041.jpg


sub-pub 通信的例子


Pub-sub 是另一种很有用的通信模型。恐怕它的名字就源于出版发行这种现实中的信息传递方式吧,publisher 就是出版商,subscriber 就是订阅者。


服务端
qpid-config add exchange topic news-service
./spout news-service/news xiaorui.cc
客户端:
./drain -t 120 news-service/#.news


PUB端,创建TOPIC点 !

wKiom1MAzGDg3nZbAAPRnFen8Y4988.jpg


SUB端,也就是接收端!

wKiom1MAzG2Tsf9YAAO76LbxAS8638.jpg


总结:

 qpid挺好用的,比rabbitmq要轻型,比zeromq保险点! 各方面的文档也都很健全,值得一用。    话说,这三个消息队列我也都用过,最一开始用的是redis的pubsub做日志收集和信息通知,后来在做集群相关的项目的时候,我自己搞了一套zeromq的分布式任务分发,和saltstack很像,当然了远没有万人用的salt强大。  rabbitmq的用法,更是看中他的安全和持久化,当然性能真的不咋地。

关于qpid的性能我没有亲自做大量的测试,但是听朋友说,加持久化可以到7k,不加持久化可以到1500左右。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/300229.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于 Blazor 打造一款实时字幕

早先在录制视频的时候一直使用的是 obs-auto-subtitle 作为实时字幕展示功能。不过这个是以 OBS 插件的形式存在&#xff0c;不管是语言和功能上都有一定的限制。故而使用 Blazor server 实现一个。总体思路 实时字幕自然需要语音转文字的功能。考察了一些服务之后&#xff0c;…

一个数学系毕业的物理学家,是怎么拿到诺贝尔化学奖的?

全世界只有3.14 % 的人关注了青少年数学之旅2019年10月9日&#xff0c;这个“特别好”教授&#xff0c;1940年&#xff0c;“特别好”考上了耶鲁&#xff0c;1943年&#xff0c;“特别好”终于拿到数学学士学位。“特别好”特别沮丧&#xff0c;▲ “特别好”在牛津大学&#x…

U盘安装Linux挂载cd,U盘安装Ubuntu Server CD-ROM挂载失败

U盘安装 Ubuntu Server 发生Failed to copy file from CD-ROM问题解决方案使用UltraISO制做Ubuntu Server安装盘&#xff0c;在安装过程当中出现[!!] Load installer components from CD警告&#xff0c;这一步应该是安装文件检查步骤&#xff0c;没有检测到完整镜像文件而提示…

三十五例网络故障排除方法

上网时&#xff0c;我们经常会碰到这样、那样的网络故障&#xff0c;如何应付呢?今天&#xff0c;我们就针对一些常见的故障给大家分析一下! 1.故障现象:网络适配器(网卡)设置与计算机资源有冲突。   分析、排除:通过调整网卡资源中的IRQ和I/O值来避开与计算机其它资源的冲突…

遭遇“烧钱瓶颈” 优酷成本结构堪忧

从奥运结束至今&#xff0c;视频网站的短暂喧嚣终于渐归沉寂。尽管各个视频网站通过拼命烧钱而维系出的“看上去很美”的表象让很多人甚至开始盲目高呼&#xff1a;视频网站的春天已经到来&#xff0c;然而&#xff0c;从之前媒体曝出的“优酷网获千万过桥贷款疑是成本逼迫”到…

XHTML教会我的一些东西-1

第一次写博客&#xff0c;虽然以前写作文是我的强项&#xff0c;我也很能说&#xff0c;但是似乎现在这种能力正在退化。不知为什么&#xff0c;到了大学之后我就变得跟以前不一样&#xff0c;似乎是回到了小学时的我。我在大学开始变得内向、沉默、不去主动和别人交谈。因为这…

杀鸡焉用牛刀!放下Windbg,让dotnet-stack来快速定位死锁原因

我们用来分析CPU过高、死锁问题的常见方案是使用Windbg分析dump文件。但是这种方式存在一些缺点&#xff0c;比如dump文件过大难以下载&#xff0c;windbg使用过于复杂难以掌握等。这里介绍一个小工具dotnet-stack&#xff0c;帮助我们检查托管代码调用堆栈&#xff0c;快速定位…

数学中那些非常奇葩的证明

全世界只有3.14 % 的人关注了青少年数学之旅一、费马大定理证明证&#xff1a;是无理数假设是有理数&#xff0c;p和q是互素正整数那么移项得又由费马大定理可知&#xff1a;与费马大定理(Fermats last therorem)矛盾, Q.E.D. &#xff08;也可易证2的n分之一次方且n属于大于2的…

linux代码段起始地址设置,Arch Linux安装后的一些初始设置简介

配置有线网络。没网络的时候&#xff0c;可以直接设定ip应急&#xff0c;后面 netctl 才是正规设置&#xff1a;复制代码代码如下:# ip addr add 192.168.0.100/24 dev enp0s4# ip link set dev enp0s4 up# ip route add default via 192.168.0.1# echo nameserver 208.67.222.…

安装Linux后的遗留问题

一些Linux用户经常询问这样的问题&#xff1a;Linux能兼容XXX卡么&#xff1f;其实&#xff0c;Linux是一个开放性的系统&#xff0c;只要通过Linux爱好者们的努力&#xff0c;Linux可以兼容任何硬件。 一&#xff0c;声卡 首先要知道声卡的类型&#xff0c;或者是某种声卡兼容…

使用 Minimal API 改造动态文件提供者

使用 Minimal API 改造动态文件提供者Intro之前介绍过一个基于动态文件提供者来实现静态网站的动态更新&#xff0c;可以参考 ASP.NET Core 实现一个简单的静态网站滚动更新&#xff0c;在 Minimal API 出现之后想改造成 Minimal API 的写法&#xff0c;但是由于之前版本的 Min…

[导入]体验Asp.Net Mvc Preview5(3)-探索ModelBinder的工作原理

摘要: 在前面的两篇文章中,我们研究了Asp.Net Mvc Preview5的ViewEingine的改进,从本篇开始,我们开始研究Preview5中的新特性:ModelBinder,首先我们来了解下什么是ModelBinder特性,这有什么用处,在以前的版本中,如果我们要在Action中获取数据,一般有三种方式,一是通过Action的参…

bcdedit

我的电脑装了双系统&#xff1a;Win2003 SP2&#xff08;C盘&#xff09;和Win2008 SP2&#xff08;D盘&#xff09;&#xff0c;最近2003一启动就蓝屏unknown hard error&#xff0c;安全模式也进不去&#xff0c;恢复注册表等方法试过也不行&#xff0c;但2008正常&#xff0…

一招教你舍友学会尤克里里 | 今日最佳

全世界只有3.14 % 的人关注了青少年数学之旅视频源 洋味铁汁联盟

linux cpu softirq,linux softirq机制

Copyright © 2003 by 詹荣开E-mail:zhanrksohu.comLinux-2.4.0Version 1.0.0&#xff0c;2003-2-14摘要&#xff1a;本文主要从内核实现的角度分析了Linux 2.4.0内核的Softirq机制。本文是为那些想要了解Linux I/O子系统的读者和Linux驱动程序开发人员而写的。关键词&…

复盘:我的三个月远程办公实践,有自由,也有代价

这是头哥侃码的第244篇原创有人说&#xff0c;人生就是一个不断尝试的过程。我觉得&#xff0c;有时候这个词其实不准确&#xff0c;因为每个人的性格不同&#xff0c;成长经历及运势不同&#xff0c;所以对 “尝试” 俩字的理解也就不同。在我还是孩子的时候&#xff0c;几乎所…

信息网络传播权保护条例(2006)

信息网络传播权保护条例(2006)[url]http://www.ncac.gov.cn/GalaxyPortal/inner/bqj/include/detail.jsp?articleid9400&boardpid175&boardid11501010111602[/url]转载于:https://blog.51cto.com/dgcnn/20682

Silverlight专题(10)- WatermarkedTextBox使用

问题&#xff1a; 之前的Silverlight版本都有一个WatermarkedTextBox控件 但是到了Silverlight 2 Beta2版本&#xff0c;由于和WPF兼容的考虑 WatermarkedTextBox被移除了 虽然之前我有看到消息说Silverlight 2正式Release的时候会给TextBox一个Watermark属性 但是Silverlight …

asp.net ajax检查用户名是否存在代码

原文 asp.net ajax检查用户名是否存在代码 用户注册时&#xff0c;我们经常需要检查用户名是否存在&#xff0c;本文就是实现无刷新验证用户名 打开开发环境VS 2005,新建项目(或打开现有项目),新建一个Web窗体,命名为 Default.aspx 创建 XMLHttpRequest 对象所有现代浏览器 (I…

90后一代人还能通过攒钱改变现状吗?

全世界只有3.14 % 的人关注了青少年数学之旅每次打开公号&#xff0c;扑面而来一阵阵焦虑&#xff1a;95后毕业3个月就买房&#xff0c;你的同龄人正在抛弃你毕业3年&#xff0c;年薪超100万&#xff1a;赚钱&#xff0c;是一种修行一线城市财务自由门槛2.9亿&#xff0c;看看你…