RabbitMQ MQTT协议和AMQP协议
1 序言... 1
1.1 RabbitMq结构... 1
1.2 RabbitMq消息接收... 4
1.3 Exchange种类和消息发送模式... 4
1.4 RabbitMq的协议... 6
1.4.1 AMQP协议... 6
1.4.2 MQTT协议... 8
2 RabbitMq服务器安装和使用... 9
2.1 Windows下安装RabbitMQ.. 9
2.2 centos7 Linux安装RabbitMq. 11
3 MQTT协议C++开发... 12
3.1 库函数介绍... 12
3.2 MQTT协议实例代码... 19
4 AMQP协议开发... 25
4.1 AMQP库函数介绍... 25
4.2 AMQP实例代码... 26
4.2.1 生产者实例... 27
4.2.2 消费者实例代码... 29
1 序言
1.1 RabbitMq结构
RabbitMQ是一种异步通信机制,消息的发送者和接收者之间不建立直接的联系,而是通过RabbitMQ服务器去做中间代理,生产者向服务器发布消息,消费者向服务器去订阅消息;生产者与服务器建立连接,将消息发给服务器,服务器通过映射关系将消息缓存到指定队列中,消费者再与RabbitMQ服务器建立连接,队列中有消息时,服务器会将消息发给消费者;这样做可以降低发送者和接受者之间的耦合度,一方断开连接,消息也不会丢失,会在服务器中进行缓存;通过中间服务器代理可以做到负载均衡、集群扩展、 优先级分配等;
图1.1.1Rabbitmq通讯机制
图1.1.2 RabbitMQ服务网页界面
如图1.1.1所示,发布者和接收者都是作为客户端和RabbitMq服务器建立一个TCP Connection连接,也可以建立多个TCP connection连接;在一个TCP连接之上又可以创建多个通道Channel与一个Exchange建立连接;发布者就像是淘宝卖家一样,exchange就像是快递公司,卖家与多家快递公司建立合作连接,一个卖家的多个分店和快递公司建立多个通道,rootingkey就像是快递地址,queue就像是集散中心,接收者就像是收快递的买家;相关概念如下所示:
Producer:消息的发布者;相当于淘宝卖家;
Consumer:消息的接收者;相当于淘宝买家;
Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。相当于卖家和快递公司建立合作协议;
Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。相当于卖家的分店与快递公司之间的生意往来;建立和关闭TCP连接耗资源,影响性能,而且TCP的连接数也有限制,限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive;
Exchange:交换机,将消息路由到指定的消费者;相当于快递公司,将接到的快递集散之后发给各个城市的集散中心;有三种exchange,通过参数来设置;第一个模式是定向模式Direct exchange,只有当routing key 匹配时, 消息才会被传递到相应的queue中。第二种模式是广播模式Fanout exchange, 会向所有绑定的队列发送消息。第三种模式是模糊匹配模式Topic exchange,routing key由通配符构成,对routing key进行模式匹配,比如ab*可以传递到所有ab*的queue。
RootingKey:消息发送给谁的标示符,用来连接Exchange和queue;相当于快递中的地址,让快递公司知道将快递发给哪个集散中心;
Queue:消息队列,用于缓存消息的队,Consumer和Procuder都可以创建queue,队列的持久化也可以设置;相当于快递的集散中心,用来暂时存放快递;消费者从队列中取消息;相当于买家从集散中心取快递,多个买家可以从同一个集散中心取快递;相当于多个客户端从队列里取消息;一个客户端也可以创建多个通道从队列里取消息,相当于一个家庭的不同成员去取快递; 程序中就是开启多个线程,通过通道从队列中取消息,实现高并发;
Binding:绑定exchange和queue,建立联系;
1.2 RabbitMq消息接收
同一个客户端可以开启多个线程consumer从一个队列里获取消息,队列按照轮询的方式发给每个consumer,假如队列里有六条消息1、2、4、5、6,有两个consumer,consumer1接收到1、3、5,consumer2接收到2、4、6。如果希望每个consumer都获取完整的6条消息,需要进建立两个对列;为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。Consumer正确处理数据后给Rabbit发送确认细信息,然后Rabbit再从queue中删除消息;
1.3 Exchange种类和消息发送模式
(1)default exchange
default exchange是一个没有名称的(空字符串)被broker预先申明的direct exchange。它所拥有的一个特殊属性使它对于简单的应用程序很有作用:每个创建的queue会与它自动绑定,使用queue名称作为routing key。举例说,当你申明一个名称为“search-indexing-online”的queue时,AMQP broker使用“search-indexing-online”作为routing key将它绑定到default exchange。因此,一条被发布到default exchange并且routing key为"search-indexing-online"将被路由到名称为"search-indexing-online"的queue。
(2)定向模式direct exchange
direct exchange严格根据消息的routing key来传送消息。direct exchange是单一传播路由消息的最佳选择,routing key将queue与exchange进行绑定,消息根据routing key分配到指定的queue中;一个rooting key可以绑定多个queue,多个rooting key也可以绑定到同一个queue上面;
图1.3.1 定向模式队列绑定示意图
(3)广播模式Fanout exchange
fanout exchange路由消息到所有的与其绑定的queue中,忽略routing key。如果N个queue被绑定到一个fanout exchange,当一条新消息被发布到exchange时,消息会被复制并且传送到这N个queue。fanout exchange是广播路由的最佳选择。
因为一个fanout exchange传送消息的副本到每一个与其绑定的queue,它的使用情况很相似:
1)大量的多用户在线(multi-player online MMO)游戏使用它更新排行榜或者其他的全体事件
2)体育新闻网站使用fanout exchange向手机客户端实时发送比分更新
3)分布式系统可以广播各种状态与配置更新
4)群聊可以使用fanout exchange让消息在参与者之间传输
图1.3.2广播模式示意图
(4)模糊匹配模式Topic exchange
Topic exchange路由消息到一个或者多个queue, routing key可以是包含通配符的字符串,用于模糊匹配多个rooting key。Topic exchange经常被用于实现各种发布/订阅模式的变化。Topic exchanges通常被用于多路广播路由消息。
topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:
1)routing key与binding key是用一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
2)binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
图1.3.3广播模式示意图
1.4 RabbitMq的协议
1.4.1 AMQP协议
AMQP是由各种数据帧组成的协议,包括方法帧、内容帧,心跳帧等,每一帧数据所有的帧都由一个头(header,7个字节),任意大小的负载(payload),和一个检测错误的帧结束(frame-end)字节组成:
图1.4.1 AMQP协议结构
要读取一个帧,我们必须:
(1)读取header,检查帧类型(frame type)和通道(channel)。
(2) 根据帧类型,我们读取负载并进行处理。
(3)读取帧结束字节。
在实际实现中,如果性能很关键的话,我们应该使用读前缓冲(read-ahead buffering)”或“收集读取(gathering reads)”,以避免为了读一个帧而做三次独立的系统调用。
AMQP是一套标准的底层协议,加入了许多其他特征来支持互用性,具有跨语言和跨平台的特性。AMQP协议的主要特性如下:
•独立于平台的底层消息传递协议
•消费者驱动消息传递
•跨语言和平台的互用性
•它是底层协议的
•有5种交换类型direct,fanout,topic,headers,system
•面向缓存的
•可实现高性能
•支持长周期消息传递
•支持经典的消息队列,循环,存储和转发
•支持事务(跨消息队列)
•支持分布式事务(XA,X/OPEN,MS DTC)
•使用SASL和TLS确保安全性
•支持代理安全服务器
•元数据可以控制消息流
•不支持LVQ
•客户端和服务端对等
•可扩展
1.4.2 MQTT协议
传输协议实际上是一种消息通讯机制,通讯的双方进行约定消息内容的格式,一方发送,另一方接收,按照协议格式进行解析,获取到正确的消息内容。传输协议一般包括协议头,版本号,消息长度,消息内容等。不同的协议头代表不同的消息类型。MQTT协议结构体如下:
typedef struct
{
/** The eyecatcher for this structure. must be MQTM.识别码*/
char struct_id[4];
/** The version number of this structure. Must be 0 版本号*/
int struct_version;
/** The length of the MQTT message payload in bytes. 消息长度*/
int payloadlen;
/** A pointer to the payload of the MQTT message. 消息数据*/
void* payload;
int qos;
int retained;
int dup;
int msgid;
} MQTTAsync_message;
MQTT协议是它是专门为小设备设计的。计算性能不高的设备不能适应AMQP上的复杂操作,它们需要一种简单而且可互用的方式进行通信。这是MQTT的基本要求,而如今,MQTT是物联网(IOT)生态系统中主要成分之一。
MQTT的主要特性:
•面向流,内存占用低
•为小型无声设备之间通过低带宽发送短消息而设计
•不支持长周期存储和转发
•不允许分段消息(很难发送长消息)
•支持主题发布-订阅
•消息实际上是短暂的(短周期)
•简单用户名和密码,基于没有足够信息熵的安全
•不支持安全连接
•消息不透明
•Topic是全局的(一个全局的命名空间)
•支持最新值队列(Last Value Queue (LVQ) )
•客户端和服务端不对称
•不能扩展
2 RabbitMq服务器安装和使用
2.1 Windows下安装RabbitMQ
(1):下载erlang,原因在于RabbitMQ服务端代码是使用并发式语言erlang编写的,下载地址:http://www.erlang.org/downloads,双击.exe文件进行安装就好,安装完成之后创建一个名为ERLANG_HOME的环境变量,其值指向erlang的安装目录,同时将%ERLANG_HOME%\bin加入到Path中,最后打开命令行,输入erl,如果出现erlang的版本信息就表示erlang语言环境安装成功;
图2.1.1 环境变量设置
(2):下载RabbitMQ,下载地址:http://www.rabbitmq.com/,同样双击.exe进行安装就好(这里需要注意一点,默认的安装目录是C:/Program Files/....,这个目录中是存在空格符的,我们需要改变安装目录,貌似RabbitMQ安装目录中是不允许有空格的,我之前踩过这个大坑);
(3):安装RabbitMQ-Plugins,这个相当于是一个管理界面,方便我们在浏览器界面查看RabbitMQ各个消息队列以及exchange的工作情况,安装方法是:打开命令行cd进入rabbitmq的sbin目录(我的目录是:E:\software\rabbitmq\rabbitmq_server-3.6.5\sbin),输入:rabbitmq-plugins enable rabbitmq_management命令,稍等会会发现出现plugins安装成功的提示,默认是安装6个插件,如果你在安装插件的过程中出现了下面的错误:
图2.1.2错误示意图
解决方法是:首先在命令行输入:rabbitmq-service stop,接着输入rabbitmq-service remove,再接着输入rabbitmq-service install,接着输入rabbitmq-service start,最后重新输入rabbitmq-plugins enable rabbitmq_management试试,我是这样解决的;
(4):插件安装完之后,本机可以在浏览器输入http://localhost:15672进行验证,其他电脑需要输入http://IP地址:15672进行访问,你会看到下面界面,输入用户名:guest,密码:guest你就可以进入管理界面,当然用户名密码你都可以变的;
图2.1.3 RabbitMq服务器登陆界面
2.2 centos7 Linux安装RabbitMq
(1).首先需要安装erlang
#rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
#yum install erlang
安装过程中会有提示,一路输入“y”即可。
(2).完成后安装RabbitMQ:
先下载rpm:
#wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
下载完成后安装:
#yum install rabbitmq-server-3.6.6-1.el7.noarch.rpm
完成后启动服务:
#service rabbitmq-server start
可以查看服务状态:
#service rabbitmq-server status
3 MQTT协议C++开发
3.1 库函数介绍
在http://mqtt.org/网站下载MQTT协议的软件开发包,paho.mqtt.c-master.rar;下载CMAKE编译工具并安装,编译生成paho-mqtt3a.dll和paho-mqtt3a.lib文件,复制头文件MQTTAsync.h到工程,进行软件开发。函数的调用顺序如下图所示。
图3.1.1 MQTT协议函数调用流程图
RabbitMQ MQTT的函数调用步骤为:
(1) 创建MQTT客户端
MQTTAsync_create输入参数服务器的地址url,标识客户端的clientid和持久化参数,输出一个客户端的句柄;这个句柄作为其他函数的的输入参数;
int MQTTAsync_create(MQTTAsync* handle, const char* serverURI, const char* clientId,int persistence_type, void* persistence_context);
/**
功能:创建客户端句柄的函数
* @param handle out 返回的客户端句柄
* @param serverURI in 服务器的URL tcp://localhost:1883
* @param clientId in 连接服务器时提供给服务器用来标识客户端的id,需要是null结尾的UTF-8格式的字符串
* @param persistence_type in 持久化的类型
1)MQTTCLIENT_PERSISTENCE_NONE:无持久化,客户端运行失败或者关闭时,消息会丢失,persistence_context参数无效且应该被设置为NULL;
2)MQTTCLIENT_PERSISTENCE_DEFAULT:默认持久化,消息保存在硬盘中,有保护机制防止消息丢失,persistence_context指定保存数据的目录路径,如果persistence_context是NULL,采用默认的了路径
3)MQTTCLIENT_PERSISTENCE_USER:指定的消息持久化实现,将持久化的控制交给程序,程序要实现MQTTClient_persistence 接口,persistence_context指定一个有效的MQTTClient_persistence结构体指针;
* @param persistence_context 持久化参数的上下文指针
* @return ::成功返回MQTTASYNC_SUCCESS,失败返回错误码;
*/
(2)设置回调函数
MQTTAsync_setCallbacks是设置回调函数的函数,handle为MQTTAsync_create创建的句柄,三个回调函数分别是连接断开回调函数、接收到消息回调函数,发送完成回调函数;三个回调函数根据需要添加,不需要的直接设置为NULL;
int MQTTAsync_setCallbacks (MQTTAsync handle, void* context, MQTTAsync_connectionLost* cl,MQTTAsync_messageArrived* ma, MQTTAsync_deliveryComplete* dc);
/**功能:设置回到函数
* @param handle in 客户端句柄
* @param context in 回调函数返回时,上下文指针,一般传入this指针
* @param cl in 断开连接的回调函数指针
* @param ma in 接收到消息的回调函数指针
* @param dc in 发送成功的回调函数指针
* @return 成功返回MQTTASYNC_SUCCESS 失败返回MQTTASYNC_FAILURE
*/
(3)连接到服务器
MQTTAsync_connect向服务器发起连接请求,返回成功后并不一定立刻连接成功,需要通过MQTTAsync_connectOptions中设置的成功或者失败的回调函数来显示是否连接成功;所以要等到回调函数onSuccess返回时,才可以调用其他的函数,例如发送消息,订阅消息,测试是否连接正常;
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions * options);
/**功能:异步连接到服务器,成功失败会调用参数中提供的指针
* @param handle in MQTTAsync_create()创建的有效的客户端句柄
* @param options A pointer to a valid MQTTAsync_connectOptions
* structure.连接参数
* @return ::MQTTASYNC_SUCCESS if the client connect request was accepted.
* If the client was unable to connect to the server, an error code is
* returned via the onFailure callback, if set.返回值,以及错误码
* Error codes greater than 0 are returned by the MQTT protocol:<br><br>
* <b>1</b>: Connection refused: Unacceptable protocol version<br>
* <b>2</b>: Connection refused: Identifier rejected<br>
* <b>3</b>: Connection refused: Server unavailable<br>
* <b>4</b>: Connection refused: Bad user name or password<br>
* <b>5</b>: Connection refused: Not authorized<br>
* <b>6-255</b>: Reserved for future use<br>
*/
MQTTAsync_connectOptions是连接结构体,里面可以设置很多连接参数;可以设置是否使用SSL,内部重连机制,内部重连时间间隔,用户名,密码等;具体看下面的结构体介绍;
typedef struct
{
/** The eyecatcher for this structure. must be MQTC. */
char struct_id[4];
/** The version number of this structure. Must be 0, 1, 2, 3 4 or 5.
* 0 signifies no SSL options and no serverURIs 0 表示没有SSL选项和服务url
* 1 signifies no serverURIs 1表示没有服务器UIRL
* 2 signifies no MQTTVersion 2表示没有MQTT版本
* 3 signifies no automatic reconnect options 3表示没有自动重连选项
* 4 signifies no binary password option (just string) 4表示没有二进制密码选项,只有字符串
*/
int struct_version;
/** 内部心跳间隔,以秒为单位;在心跳间隔内,客户端至少发送一条
消息给服务器,如果没有则发送默认的ping消息给服务器,这样是为了检测
客户端和服务器之间的连接,避免时间较长的TCP/IP超时;设置为0时,表示没有心跳机制
int keepAliveInterval;
/**
这是一个布尔值,当客户端和服务器连接和断开连接时起作用;客户端和服务器都保存着session状态信息,session值用来确定通讯消息至少发送一次,且仅发送一次;session状态包括客户端创建的订阅,你可以选择保存或者放弃session之间的状态信息设置为true时,连接和断开连接时清除session,为false时,保留session;客户端连接服务器,客户端通过客户端标识和服务器地址识别本次连接服务器检查这个客户端连接的session是否存在;如果为ture,则清除之前的session,如果为false,则之前的session会被重新使用;之前没有的则重新创建
int cleansession;
/**这个值设置最大的在飞行中的消息数量 */
int maxInflight;
/**这个参数设置断开连接时的临终遗嘱消息,断开连接时发送消息到LWP topic*/
MQTTAsync_willOptions* will;
/**服务器的用户名*/
const char* username;
/**密码*/
const char* password;
/**连接超时时间*/
int connectTimeout;
/***以秒为单位的内部时钟间隔*/
int retryInterval;
/**指向ssl设置参数的结构体指针,为NULL时表示不使用SSL*/
MQTTAsync_SSLOptions* ssl;
/**连接成功的回调函数
MQTTAsync_onSuccess* onSuccess;
/**连接失败时的回调函数
MQTTAsync_onFailure* onFailure;
/**上下文信息,一般传入this指针
void* context;
/**服务器的url数组数量
int serverURIcount;
/**服务器url数组,null结尾的字符串数组,地址格式为protocol://host:port
protocol可以是tcp或者ssl,host可以是ip地址或者域名
char* const* serverURIs;
/**MQTT的版本
* Sets the version of MQTT to be used on the connect.
* MQTTVERSION_DEFAULT (0) = default: start with 3.1.1, and if that fails, fall back to 3.1 先尝试.1.1版本,失败在尝试.1
* MQTTVERSION_3_1 (3) = only try version 3.1只尝试.1
* MQTTVERSION_3_1_1 (4) = only try version 3.1.1只尝试.1.1
*/
int MQTTVersion;
/**自动重新连接
int automaticReconnect;
/**最小的重试间隔,以秒为单位,没失败一次,值增加一倍
int minRetryInterval;
/**最大的重连时间间隔,最小重连时间间隔超过这个值时停止
int maxRetryInterval;
/**如果password为NULL时,才会采用这个结构体,二进制格式的密码
struct {
int len; /**< binary password length */
const void* data; /**< binary password data */
} binarypwd;
} MQTTAsync_connectOptions;
(4)发送消息;
回调函数onSuccess返回连接成功后,就可以向服务器发送消息了;这个函数的功能是发布消息,输入参数为客户端句柄、发送的目的topic、发送消息结构体;发送消息也是异步的过程,函数返回成功并不代表消息已经传递到消费者,只有回调函数MQTTAsync_deliveryComplete回调函数返回的时候才表示传递成功,回调函数中返回参数有个token值,发送消息的结构体MQTTAsync_responseOptions中也有一个token值,回调返回token值,表示这个token值代表的这个消息已经传递成功;
DLLExport int MQTTAsync_sendMessage(MQTTAsync handle, const char* destinationName, const MQTTAsync_message* msg, MQTTAsync_responseOptions* response);
/**发送信息,
* @param handle A valid client handle from a successful call to
* MQTTAsync_create().
* @param destinationName in 发送消的topic
* @param msg in 发送消息MQTTAsync_message结构体
* @param response 设置回调函数的结构体MQTTAsync_responseOptions
* @return :成功返回MQTTASYNC_SUCCESS 失败返回错误码
*/
(5)订阅消息
DLLExport int MQTTAsync_subscribe(MQTTAsync handle, const char* topic, int qos, MQTTAsync_responseOptions* response);
/**订阅消息函数,topic可能包含通配符
* @param handle in MQTTAsync_create().客户端句柄
* @param topic in topic名称,可能包含通配符
* @param qos in 服务器处理重复消息的机制0表示可能会丢失,1至少发一次,可能发多次;2严格只发一次;
* @param response out 返回结果函数
* @return 成功返回MQTTASYNC_SUCCESS 失败返回错误码
*/
(6)接收消息
订阅消息之后就可以接收消息了,通过设置的回调函数接收消息;context就是设置回调函数时设置的上下文指针,传入的一般是this,可以用这个指针来获取消息接收的对象;topic名称则是用于接收多个topic消息时,区分消息的作用;message则是接收到消息的结构体,
int MQTTAsync_messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message);
(7)判断是否正常连接
/**测试连接是否正常
* @param handle A valid client handle from a successful call to MQTTAsync_create().
* @return Boolean true if the client is connected, otherwise false.
*/
DLLExport int MQTTAsync_isConnected(MQTTAsync handle);
(8)重新连接函数
/**调用该函数前必须连接成功过,用之前的参数了重新连接,所以一定要等到成功连接一次后才能重连,否则会报错;
DLLExport int MQTTAsync_reconnect(MQTTAsync handle);
(9)销毁句柄函数
DLLExport void MQTTAsync_destroy(MQTTAsync* handle);
3.2 MQTT协议实例代码
(1)初始化参数,设置回调函数,连接服务器
HPR_INT32 Init()
{
HPR_INT32 iRetVal = HPR_ERROR;
m_bStart=HPR_FALSE;
//从配置文件获取参数
m_strAddress=CConfig::instance()->GetRabbitMqMQTTRecvInfo().m_strAddress;
m_strTopic=CConfig::instance()->GetRabbitMqMQTTRecvInfo().m_strTopic;
m_strUserName=CConfig::instance()->GetRabbitMqMQTTRecvInfo().m_strUserName;
m_strPsw=CConfig::instance()->GetRabbitMqMQTTRecvInfo().m_strPassword;
m_strClientId=CConfig::instance()->GetRabbitMqMQTTRecvInfo().m_strClientId;
do
{
//传入参数,创建客户端句柄
int rc=MQTTAsync_create(&m_RmqMqttAClient, m_strAddress.c_str(),m_strClientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (rc!=MQTTASYNC_SUCCESS||m_RmqMqttAClient==NULL)
{
FIRE_ERROR("MQTTAsync_create failed");
break;
}
//设置回调函数
rc=MQTTAsync_setCallbacks(m_RmqMqttAClient, this,OnConnectLost, OnMessageArrived, NULL);
if (MQTTASYNC_SUCCESS!=rc)
{
FIRE_ERROR("MQTTAsync_setCallbacks failed errorcode %d",rc);
//break;
}
//连接到服务器
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
conn_opts.keepAliveInterval = 5;
conn_opts.cleansession = 1;
conn_opts.onSuccess = onConnectSuccess;
conn_opts.onFailure = onConnectFailure;
conn_opts.context = this;
conn_opts.username=m_strUserName.c_str();
conn_opts.password=m_strPsw.c_str();
conn_opts.retryInterval=1;
conn_opts.automaticReconnect=1;
int rc=0;
if ((rc = MQTTAsync_connect(m_RmqMqttAClient, &conn_opts)) != MQTTASYNC_SUCCESS)
{
FIRE_ERROR("Failed to start connect, return code %d\n", rc);
break;
}
}
(2)连接失败的回调函数
static void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
if (context==NULL||response==NULL)
{
FIRE_ERROR("input para is null!");
return;
}
CRabbitMqQTRecv *p=(CRabbitMqQTRecv*)context;
if (p==NULL)
{
FIRE_ERROR("onConnectFailure input para is NULL");
return ;
}
FIRE_ERROR("Connect failed, rc %d errror message%s\n", response->code ,response->message==NULL? "no msg":response->message);
return ;
}
(3)连接成功的回调函数
static void onConnectSuccess(void* context, MQTTAsync_successData* response)
{
if (context==NULL||response==NULL)
{
FIRE_ERROR("onConnectSuccess input para is NULL");
return ;
}
CRabbitMqQTRecv *p=(CRabbitMqQTRecv*)context;
if (p==NULL)
{
FIRE_ERROR("onConnectFailure input para is NULL");
return ;
}
p->SetConnectState(HPR_TRUE);
//订阅消息函数
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
int rc=0;
opts.onSuccess = NULL;
opts.onFailure = NULL;
opts.context = p->this;
if ((rc = MQTTAsync_subscribe(p->m_RmqMqttAClient, p-> m_strTopic.c_str(), 1, &opts)) != MQTTASYNC_SUCCESS)
{
FIRE_ERROR("MQTTAsync_subscribe failed return code %d",rc);
}
FIRE_INFO("Connect rabbitmq MQTT success!");
}
(4)连接断开的回调函数
static void OnConnectLost(void* context, char* cause)
{
if (context==NULL||cause==NULL)
{
FIRE_ERROR("MQTTAsync_connectionLost input para is NULL");
return ;
}
CRabbitMqQTRecv *p=(CRabbitMqQTRecv*)context;
if (p==NULL)
{
FIRE_ERROR("onConnectFailure input para is NULL");
return ;
}
p->SetConnectState(HPR_FALSE);
FIRE_INFO("Connect rabbitmq MQTT success!");
}
(5)接收到消息的回调函数
static int OnMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message)
{
if (context==NULL||topicName==NULL||message==NULL)
{
FIRE_ERROR("OnMessageArrived input para is NULL");
return false ;
}
CRabbitMqQTRecv *p=(CRabbitMqQTRecv*)context;
if (p==NULL)
{
FIRE_ERROR("onConnectFailure input para is NULL");
return false ;
}
char* ReciveBuff=g_MemPool.MemAlloc((message->payloadlen)+1);
memmove_s(ReciveBuff,(message->payloadlen)+1,message->payload,message->payloadlen);
p->ProcessMsg(ReciveBuff);
g_MemPool.MemRstore(ReciveBuff);
return true;
}
(6)发送消息的函数
HPR_INT32 CRabbitMqQT::RabbitMqMQTTPublish(char* msg)
{
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
int rc=0;
opts.onSuccess =NULL ;
opts.context = this;
pubmsg.payload = msg;
pubmsg.payloadlen =strlen(msg);
pubmsg.qos = 1;
pubmsg.retained = 0;
if ((rc = MQTTAsync_sendMessage(m_RmqMqttAClient, m_strTopic.c_str() ,&pubmsg, &opts)) != MQTTASYNC_SUCCESS)
{
FIRE_ERROR("Failed to start sendMessage, return code %d\n", rc);
return HPR_ERROR;
}
else
{
return HPR_OK;
}
}
(7)重新连接的函数,一般定义在定时器中
HPR_INT32 CRabbitMqQT::ReConnect()
{
if (MQTTAsync_isConnected(m_RmqMqttAClient)==false)
{
if (MQTTASYNC_SUCCESS==MQTTAsync_reconnect(m_RmqMqttAClient))
{
m_bConnectSuccess=HPR_TRUE;
FIRE_INFO("MQTTAsync_reconnect sucess!");
return HPR_OK;
}
else
{
m_bConnectSuccess=HPR_FALSE;
FIRE_ERROR("MQTTAsync_reconnect ERROR!");
return HPR_ERROR;
}
}
}
(8)结束连接,销毁句柄
if(m_RmqMqttAClient!=NULL)
{
MQTTAsync_destroy(&m_RmqMqttAClient);
m_RmqMqttAClient=NULL;
}
4 AMQP协议开发
4.1 AMQP库函数介绍
下载 rabbitmq-c源码包 : https://github.com/alanxz/rabbitmq-c需要用到SSL库文件和头文件。下载CMAKE编译工具并安装,编译生成rabbitmq.4.dll和rabbitmq.4.lib文件,和四个头文件amqp.h、amqp_framing.h、amqp_tcp_socket.h、stdint.h一起复制到工程。AMQP协议的库函数调用流程图如下图所示。
图4.1库函数调用流程图
4.2 AMQP实例代码
4.2.1 生产者实例
(1)建立连接
HPR_INT32 CRabbitMqQP::creat_amqp_publisher()
{
HPR_INT32 iRetVal = HPR_ERROR;
m_mqPublishConn = amqp_new_connection();
m_mqPublishSocket = amqp_tcp_socket_new(m_mqPublishConn);
if(!m_mqPublishSocket)
{
FIRE_ERROR("New amqp socket error");
return iRetVal;
}
int iStatus = amqp_socket_open(m_mqPublishSocket, m_strMqIp.c_str(), m_iMqPort);
if(iStatus)
{
FIRE_ERROR("amqp_socket open failed");
m_bConnectSuccess = HPR_FALSE;
return iRetVal;
}
string strRabbitMqName = CConfig::instance()->GetRabbitMqAMQPSendInfo().m_strUserName;
string strRabbitMqPwd = CConfig::instance()->GetRabbitMqAMQPSendInfo().m_strPassword;
m_mqReply = amqp_login(m_mqPublishConn,"/",AMQP_DEFAULT_MAX_CHANNELS,AMQP_DEFAULT_FRAME_SIZE,AMQP_DEFAULT_HEARTBEAT,AMQP_SASL_METHOD_PLAIN,strRabbitMqName.c_str(),strRabbitMqPwd.c_str());
if(m_mqReply.reply_type != AMQP_RESPONSE_NORMAL)
{
FIRE_ERROR("amqp login failed");
return iRetVal;
}
amqp_channel_open(m_mqPublishConn,1);
m_mqReply = amqp_get_rpc_reply(m_mqPublishConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("open channel faild");
return iRetVal;
}
string strExchange = CConfig::instance()->GetRabbitMqAMQPSendInfo().m_strExchange;
amqp_exchange_declare(m_mqPublishConn, 1, amqp_cstring_bytes(strExchange.c_str()), amqp_cstring_bytes("direct"),
0, 1, 0, 0, amqp_empty_table);
m_mqReply = amqp_get_rpc_reply(m_mqPublishConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("declare exchange faild\n");
return iRetVal;
}
return HPR_OK;
}
(2)发布消息
HPR_VOID CRabbitMqQP::DoPublishAlarm()
{
int iStatus = 0;
string strBindingKey = CConfig::instance()->GetRabbitMqAMQPSendInfo().m_strRountKey;
string strExchange = CConfig::instance()->GetRabbitMqAMQPSendInfo().m_strExchange;
std::string strData= "hello Rabbit";
while (!m_bExit)
{
if ((m_bConnectSuccess == HPR_TRUE&&m_mqPublishConn !=NULL))
{
char* msg=PopMq();
if (msg!=NULL)
{//发布消息到服务器
iStatus = amqp_basic_publish(m_mqPublishConn,1,amqp_cstring_bytes(strExchange.c_str()),\
amqp_cstring_bytes(strBindingKey.c_str()),0,0,NULL,amqp_cstring_bytes(msg));
if(iStatus<0)
{
FIRE_ERROR("send data %s faild\n",msg);
PushMq(msg);
continue;
}
g_MemPool.MemRstore(msg);
FIRE_INFO("send data sucess %s\n",msg);
}
}
else
{
FIRE_ERROR("RabbitMq is not connected !");
Sleep(1000);
}
}
}
(3)结束清理资源
HPR_VOID CRabbitMqQP::destory_amqp_publisher()
{
amqp_channel_close(m_mqPublishConn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(m_mqPublishConn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(m_mqPublishConn);
}
4.2.2 消费者实例代码
(1) 建立连接
HPR_INT32 CRabbitMqQPRecv::creat_amqp_consumer()
{
HPR_INT32 iRetVal = HPR_ERROR;
//创建新的连接句柄
m_mqConsumConn = amqp_new_connection();
if (m_mqConsumConn==NULL)
{
FIRE_ERROR("New mq connecttion error");
return iRetVal;
}
//创建tcp socket
m_mqSocket = amqp_tcp_socket_new(m_mqConsumConn);
if(!m_mqSocket)
{
FIRE_ERROR("New amqp socket error");
return iRetVal;
}
//建立socket连接
int iStatus = amqp_socket_open(m_mqSocket, m_strMqIp.c_str(), m_iMqPort);
if(iStatus)
{
FIRE_ERROR("amqp_socket open failed");
m_bConnectSuccess = HPR_FALSE;
return iRetVal;
}
string strRabbitMqName = CConfig::instance()->GetRabbitMqAMQPRecvInfo().m_strUserName;
string strRabbitMqPwd = CConfig::instance()->GetRabbitMqAMQPRecvInfo().m_strPassword;
//登陆服务器
m_mqReply = amqp_login(m_mqConsumConn,"/",AMQP_DEFAULT_MAX_CHANNELS,AMQP_DEFAULT_FRAME_SIZE,AMQP_DEFAULT_HEARTBEAT,AMQP_SASL_METHOD_PLAIN,strRabbitMqName.c_str(),strRabbitMqPwd.c_str());
if(m_mqReply.reply_type != AMQP_RESPONSE_NORMAL)
{
FIRE_ERROR("amqp login failed");
return iRetVal;
}
//在连接上创建一个通道
amqp_channel_open(m_mqConsumConn, 1);
//异步查看是否调用成功
m_mqReply = amqp_get_rpc_reply(m_mqConsumConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("ConnectRabbitmq::amqp_channel_open error\n");
return iRetVal;
}
string strExchange = CConfig::instance()->GetRabbitMqAMQPRecvInfo().m_strExchange;
string strBindingKey =CConfig::instance()->GetRabbitMqAMQPRecvInfo().m_strRountKey;
//创建队列queue
amqp_queue_declare_ok_t *r = amqp_queue_declare(m_mqConsumConn, 1, amqp_empty_bytes, 0, 0, 0, 1,
amqp_empty_table);
FIRE_INFO("The test name is %s",r->queue.bytes);
m_mqReply = amqp_get_rpc_reply(m_mqConsumConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("ConnectRabbitmq::amqp_queue_declare error\n");
return iRetVal;
}
m_mqQueuename = amqp_bytes_malloc_dup(r->queue);
if (m_mqQueuename.bytes == NULL) {
FIRE_ERROR("Out of memory while copying queue name\n");
return iRetVal;
}
//绑定队列到exchange
amqp_queue_bind(m_mqConsumConn,
1,
amqp_empty_bytes,
amqp_cstring_bytes(strExchange.c_str()),
amqp_cstring_bytes(strBindingKey.c_str()),
amqp_empty_table);
m_mqReply = amqp_get_rpc_reply(m_mqConsumConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("ConnectRabbitmq::amqp_queue_bind error\n");
return iRetVal;
}
//消费消息
amqp_basic_consume(m_mqConsumConn, 1, m_mqQueuename, amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
m_mqReply = amqp_get_rpc_reply(m_mqConsumConn);
if(m_mqReply.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION)
{
FIRE_ERROR("amqp_basic_consume \n");
return iRetVal;
}
return HPR_OK;
}
(2) 接收消息
HPR_VOID CRabbitMqQPRecv::DoRevAlarm()
{
char* ReciveBuff=NULL;
while (!m_bExit)
{
if(HPR_FALSE == m_bConnectSuccess)
{
Sleep(1000);
continue;
}
if(m_mqConsumConn !=NULL)
{
amqp_rpc_reply_t ret;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(m_mqConsumConn);
ret = amqp_consume_message(m_mqConsumConn, &envelope, NULL, 0);
if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
amqp_destroy_envelope(&envelope);
FIRE_ERROR("amqp_consume_message fail");
Sleep(1000);
continue;
}
ReciveBuff=g_MemPool.MemAlloc(MSG_LEN);
if (ReciveBuff==NULL)
{
FIRE_ERROR("g_MemPool MemAlloc fail");
return;
}
HPR_ZeroMemory(ReciveBuff, MSG_LEN);
memmove_s(ReciveBuff,MSG_LEN,envelope.message.body.bytes,envelope.message.body.len);
m_ReciveCallback(ReciveBuff,m_pDialog);
FIRE_INFO("Receieve the alarm msg is%s",ReciveBuff);
g_MemPool.MemRstore(ReciveBuff);
ReciveBuff=NULL;
amqp_destroy_envelope(&envelope);
}
//HPR_Sleep(1);
}
}
(3) 结束清除资源
HPR_VOID CRabbitMqQPRecv::destory_amqp_consumer()
{
if (m_mqConsumConn!=NULL)
{
amqp_maybe_release_buffers(m_mqConsumConn);
amqp_channel_close(m_mqConsumConn, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(m_mqConsumConn, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(m_mqConsumConn);
}
}
自己编了一个股票监控软件,有如下功能,有兴趣的朋友可以下载;
(1) 个股监测。监测个股实时变化,可以监测个股大单交易、急速拉升和下降、主力入场和出场、股票最高点和最低点提醒。检测到最高点、最低点、主力进场点、主力退场点、急速拉升点、急速下跌点,给出语音或者声音提醒,不用再时刻看着大盘了,给你更多自由的时间;
(2) 大盘监测。监测大盘的走势,采用上证、深证、创业三大指数的综合指数作为大盘走势。并实时监测大盘的最高点和最低点、中间的转折点。
(3) 股票推荐。还能根据历史数据长期或短期走势进行分析,对股市3千多个股票进行分析对比,选出涨势良好的股票,按照增长速度从大到小排序,推荐给你涨势良好的股票;
下载地址:
1.0.3版本(修复大盘指数崩溃缺陷)下载地址:
链接:https://pan.baidu.com/s/1BJcTp-kdniM7VE9K5Kd3vg 提取码:003h
更新链接:
https://www.cnblogs.com/bclshuai/p/10621613.html