如何在Linux环境中的Qt项目中使用ActiveMQ-CPP

文章目录

  • 代码1:消费者
  • 代码2:生成者

之前在Linux下的qt程序中使用activeMQ的时候也是用了很多时间去研究,本来想的是好好记录一下,但是当时顾着写代码。很多细节也不想再去走一遍了。大概写一下怎么使用就行了。注意:一定要先开启服务器。

代码1:消费者

代码;这个代码其实是官网的源码里面提供的例子
我来对这个代码做个解释,也是作为自己的笔记。

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleAsyncConsumer : public ExceptionListener,public MessageListener,public DefaultTransportListener {
private:Connection* connection;Session* session;Destination* destination;MessageConsumer* consumer;bool useTopic;std::string brokerURI;std::string destURI;bool clientAck;private:SimpleAsyncConsumer(const SimpleAsyncConsumer&);SimpleAsyncConsumer& operator=(const SimpleAsyncConsumer&);public:SimpleAsyncConsumer(const std::string& brokerURI,const std::string& destURI,bool useTopic = false,bool clientAck = false) :connection(NULL),session(NULL),destination(NULL),consumer(NULL),useTopic(useTopic),brokerURI(brokerURI),destURI(destURI),clientAck(clientAck) {}virtual ~SimpleAsyncConsumer() {this->cleanup();}void close() {this->cleanup();}void runConsumer() {try {// Create a ConnectionFactoryActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory(brokerURI);// Create a Connectionconnection = connectionFactory->createConnection();delete connectionFactory;ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection);if (amqConnection != NULL) {amqConnection->addTransportListener(this);}//如果不是使用的failover连接的话。如果服务器没有开启,执行这一步的时候会抛出错误//如果是使用failover连接,执行到这一步的时候,就不会往下继续执行了,就一直卡在这里,直到和服务器建立连接//failover连接是一个什么模式我也不是很清楚,我只知道在使用这个模式连接的时候,它是会自动重连的。即使服务器中途断了,服务器再		    	次开启的时候,它是能自动连接上的connection->start();//监听异常。有异常抛出的时候会执行相关的函数connection->setExceptionListener(this);// Create a Sessionif (clientAck) {session = connection->createSession(Session::CLIENT_ACKNOWLEDGE);} else {session = connection->createSession(Session::AUTO_ACKNOWLEDGE);}// Create the destination (Topic or Queue)//使用queue模式还是topic模式//queue模式是一对一连接:一个生成者一个消费者。//topic模式是多对多连接;可以有多个生成者和多个消费者//如果你需要既发送有接收的消息的话,这个情况大概率是应该使用topic模式的//在使用topic模式的时候,你自己通过生成者发出去的消息,你自己的消费者也是会收到这个消息的//这个时候,你可以设置消息过滤,或者使用消息属性识别。if (useTopic) {destination = session->createTopic(destURI);} else {destination = session->createQueue(destURI);}// Create a MessageConsumer from the Session to the Topic or Queueconsumer = session->createConsumer(destination);consumer->setMessageListener(this);} catch (CMSException& e) {e.printStackTrace();}}// Called from the consumer since this class is a registered MessageListener.virtual void onMessage(const Message* message) {static int count = 0;try {count++;const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);string text = "";if (textMessage != NULL) {text = textMessage->getText();} else {text = "NOT A TEXTMESSAGE!";}if (clientAck) {message->acknowledge();}printf("Message #%d Received: %s\n", count, text.c_str());} catch (CMSException& e) {e.printStackTrace();}}// If something bad happens you see it here as this class is also been// registered as an ExceptionListener with the connection.virtual void onException(const CMSException& ex AMQCPP_UNUSED) {printf("CMS Exception occurred.  Shutting down client.\n");exit(1);}virtual void onException(const decaf::lang::Exception& ex) {printf("Transport Exception occurred: %s \n", ex.getMessage().c_str());}virtual void transportInterrupted() {std::cout << "The Connection's Transport has been Interrupted." << std::endl;}virtual void transportResumed() {std::cout << "The Connection's Transport has been Restored." << std::endl;}private:void cleanup(){//*************************************************// Always close destination, consumers and producers before// you destroy their sessions and connection.//*************************************************// Destroy resources.try{if( destination != NULL ) delete destination;}catch (CMSException& e) {}destination = NULL;try{if( consumer != NULL ) delete consumer;}catch (CMSException& e) {}consumer = NULL;// Close open resources.try{if( session != NULL ) session->close();if( connection != NULL ) connection->close();}catch (CMSException& e) {}// Now Destroy themtry{if( session != NULL ) delete session;}catch (CMSException& e) {}session = NULL;try{if( connection != NULL ) delete connection;}catch (CMSException& e) {}connection = NULL;}
};
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {activemq::library::ActiveMQCPP::initializeLibrary();std::cout << "=====================================================\n";std::cout << "Starting the example:" << std::endl;std::cout << "-----------------------------------------------------\n";// Set the URI to point to the IPAddress of your broker.// add any optional params to the url to enable things like// tightMarshalling or tcp logging etc.  See the CMS web site for// a full list of configuration options.////  http://activemq.apache.org/cms///// Wire Format Options:// =====================// Use either stomp or openwire, the default ports are different for each//// Examples://    tcp://127.0.0.1:61616                      default to openwire//    tcp://127.0.0.1:61616?wireFormat=openwire  same as above//    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead//std::string brokerURI ="failover:(tcp://127.0.0.1:61616"
//        "?wireFormat=openwire"
//        "&connection.useAsyncSend=true"
//        "&transport.commandTracingEnabled=true"
//        "&transport.tcpTracingEnabled=true"
//        "&wireFormat.tightEncodingEnabled=true"")";//============================================================// This is the Destination Name and URI options.  Use this to// customize where the consumer listens, to have the consumer// use a topic or queue set the 'useTopics' flag.//============================================================std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";//============================================================// set to true to use topics instead of queues// Note in the code above that this causes createTopic or// createQueue to be used in the consumer.//============================================================bool useTopics = false;//============================================================// set to true if you want the consumer to use client ack mode// instead of the default auto ack mode.//============================================================bool clientAck = false;// Create the consumerSimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );// Start it up and it will listen forever.consumer.runConsumer();// Wait to exit.std::cout << "Press 'q' to quit" << std::endl;while( std::cin.get() != 'q') {}// All CMS resources should be closed before the library is shutdown.consumer.close();std::cout << "-----------------------------------------------------\n";std::cout << "Finished with the example." << std::endl;std::cout << "=====================================================\n";activemq::library::ActiveMQCPP::shutdownLibrary();
}

代码2:生成者

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
class SimpleProducer : public Runnable {
private:Connection* connection;Session* session;Destination* destination;MessageProducer* producer;bool useTopic;bool clientAck;unsigned int numMessages;std::string brokerURI;std::string destURI;private:SimpleProducer( const SimpleProducer& );SimpleProducer& operator= ( const SimpleProducer& );public:SimpleProducer( const std::string& brokerURI, unsigned int numMessages,const std::string& destURI, bool useTopic = false, bool clientAck = false ) :connection(NULL),session(NULL),destination(NULL),producer(NULL),useTopic(useTopic),clientAck(clientAck),numMessages(numMessages),brokerURI(brokerURI),destURI(destURI) {}virtual ~SimpleProducer(){cleanup();}void close() {this->cleanup();}virtual void run() {try {// Create a ConnectionFactoryauto_ptr<ActiveMQConnectionFactory> connectionFactory(new ActiveMQConnectionFactory( brokerURI ) );// Create a Connectiontry{connection = connectionFactory->createConnection();connection->start();} catch( CMSException& e ) {e.printStackTrace();throw e;}// Create a Sessionif( clientAck ) {session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );} else {session = connection->createSession( Session::AUTO_ACKNOWLEDGE );}// Create the destination (Topic or Queue)if( useTopic ) {destination = session->createTopic( destURI );} else {destination = session->createQueue( destURI );}// Create a MessageProducer from the Session to the Topic or Queueproducer = session->createProducer( destination );producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );// Create the Thread Id Stringstring threadIdStr = Long::toString( Thread::currentThread()->getId() );// Create a messagesstring text = (string)"Hello world! from thread " + threadIdStr;for( unsigned int ix=0; ix<numMessages; ++ix ){TextMessage* message = session->createTextMessage( text );//这里有一个setIntProperty,应该也会有一个对应的getIntProperty(我没有去看有没有)。你可以在消费者获取消息的时候调用这个函数,函数属性是否一致,如果一致说明是自己发送的消息,就可以不去处理它。//可以确认的是TextMessage是有一个setPropertyString和getPropertyString函数的,这个函数可以用了区别是否是自己的消息。message->setIntProperty( "Integer", ix );// Tell the producer to send the messageprintf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );producer->send( message );delete message;}}catch ( CMSException& e ) {e.printStackTrace();}}private:void cleanup(){// Destroy resources.try{if( destination != NULL ) delete destination;}catch ( CMSException& e ) { e.printStackTrace(); }destination = NULL;try{if( producer != NULL ) delete producer;}catch ( CMSException& e ) { e.printStackTrace(); }producer = NULL;// Close open resources.try{if( session != NULL ) session->close();if( connection != NULL ) connection->close();}catch ( CMSException& e ) { e.printStackTrace(); }try{if( session != NULL ) delete session;}catch ( CMSException& e ) { e.printStackTrace(); }session = NULL;try{if( connection != NULL ) delete connection;}catch ( CMSException& e ) { e.printStackTrace(); }connection = NULL;}
};
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {//这个函数只调用一次activemq::library::ActiveMQCPP::initializeLibrary();std::cout << "=====================================================\n";std::cout << "Starting the example:" << std::endl;std::cout << "-----------------------------------------------------\n";// Set the URI to point to the IPAddress of your broker.// add any optional params to the url to enable things like// tightMarshalling or tcp logging etc.  See the CMS web site for// a full list of configuration options.////  http://activemq.apache.org/cms///// Wire Format Options:// =====================// Use either stomp or openwire, the default ports are different for each//// Examples://    tcp://127.0.0.1:61616                      default to openwire//    tcp://127.0.0.1:61616?wireFormat=openwire  same as above//    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead//std::string brokerURI ="failover://(tcp://127.0.0.1:61616"
//        "?wireFormat=openwire"
//        "&connection.useAsyncSend=true"
//        "&transport.commandTracingEnabled=true"
//        "&transport.tcpTracingEnabled=true"
//        "&wireFormat.tightEncodingEnabled=true"")";//============================================================// Total number of messages for this producer to send.//============================================================unsigned int numMessages = 2000;//============================================================// This is the Destination Name and URI options.  Use this to// customize where the Producer produces, to have the producer// use a topic or queue set the 'useTopics' flag.//============================================================std::string destURI = "TEST.FOO";//============================================================// set to true to use topics instead of queues// Note in the code above that this causes createTopic or// createQueue to be used in the producer.//============================================================bool useTopics = false;// Create the producer and run it.SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );// Publish the given number of Messagesproducer.run();// Before exiting we ensure that all CMS resources are closed.producer.close();std::cout << "-----------------------------------------------------\n";std::cout << "Finished with the example." << std::endl;std::cout << "=====================================================\n";activemq::library::ActiveMQCPP::shutdownLibrary();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/59218.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt QCheckBox、QPushButton和QRadioButton详解

QCheckBox&#xff08;复选框&#xff09; 功能&#xff1a;QCheckBox用于创建一个复选框控件&#xff0c;允许用户从多个选项中选择多个。 属性&#xff1a; checkable&#xff1a;决定复选框是否可以被选中或取消选中。checked&#xff1a;表示复选框当前的选中状态&#…

6、显卡品牌分类介绍:技嘉 - 计算机硬件品牌系列文章

技嘉科技是一家以主板、‌显卡在业界缔造无以撼动的地位的科技公司&#xff0c;‌其核心理念是「‌技术创新、‌质量稳定」‌的高标准。‌技嘉专注于关键技术研发&#xff0c;‌其经营范围涵盖家用、‌商用、‌电竞等多元科技领域。‌通过应用突破性的专利技术&#xff0c;‌技…

自编以e为底的指数函数exp,性能接近标准库函数

算法描述&#xff1a; (1). 先做自变量x的范围检查&#xff0c;对于双精度浮点数&#xff0c;自变量不能超出(-1022ln2, 1024ln2)(-708.39, 709.78)&#xff0c;否则exp(x)会溢出。对于单精度浮点数&#xff0c;自变量不能超出(-126ln2, 128ln2)(-87.33, 88.72). 自己使用此函数…

es安装拼音分词后Kibana出现内存错误

出现错误 今天在安装es的拼音分词器&#xff0c;并重启es容器后&#xff0c;登录Kibana无法使用&#xff0c;查询日志发现如下报错 Waiting until all Elasticsearch nodes are compatible with Kibana before starting saved objects migrations... | typelog timestamp2024…

前端react面试基础知识(II)

这些问题涵盖了 React 的很多核心概念和实际应用场景。下面是针对每个问题的详细回答&#xff1a; 1. **React 项目中&#xff0c;如何动态改变组件的 class 来切换样式?** 可以通过条件判断或者状态&#xff08;state&#xff09;来动态改变组件的 class。例如&#xff0c;使…

Day 42 || 完全背包、518. 零钱兑换 II 、 377. 组合总和 Ⅳ、70. 爬楼梯 (进阶)

完全背包 题目链接&#xff1a;卡码网第52题 思路&#xff1a;和之前01背包一样&#xff0c;但是物品可以无限放置&#xff0c;所以之前二维数组中的背包容量是倒序遍历的&#xff0c;现在可以正序遍历即可重复放入。 import java.util.Scanner; public class Main {public …

数据结构-二叉树中的递归

目录 前言 简单手撕二叉树 二叉树节点的求解 二叉树叶子节点的求解 二叉树高度 二叉树第K层节点的个数 二叉树查找值为X的节点 结束语 前言 在这里说声抱歉&#xff0c;好久没更新数据结构了&#xff0c;二叉树的相关内容还没有更新完&#xff0c;是小编的失职&#xff…

在基于AWS EC2的云端k8s环境中 搭建开发基础设施

中间件下载使用helm,这里部署的都是单机版的 aws-ebs-storageclass.yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata:name: aws-ebs-storageclass provisioner: kubernetes.io/aws-ebs parameters:type: gp2 # 选择合适的 EBS 类型&#xff0c;如 gp2、io1…

Apache Calcite - 查询优化之自定义优化规则

RelOptRule简介 为了自定义优化规则&#xff0c;我们需要继承RelOptRule类。org.apache.calcite.plan.RelOptRule 是 Apache Calcite 中的一个抽象类&#xff0c;用于定义优化规则。优化规则是用于匹配查询计划中的特定模式&#xff0c;并将其转换为更优化的形式的逻辑。通过继…

2024网鼎杯青龙组wp:Crypto1

题目 附件内容如下 from Crypto.Util.number import * from secret import flag from Cryptodome.PublicKey import RSAp getPrime(512) q getPrime(512) n p * q d getPrime(299) e inverse(d,(p-1)*(q-1)) m bytes_to_long(flag) c pow(m,e,n) hint1 p >> (51…

Python 单元测试中的 Mocking 与 Stubbing:提高测试效率的关键技术

在软件开发过程中&#xff0c;单元测试是确保代码质量的重要环节。为了实现高效的单元测试&#xff0c;我们常常需要隔离待测试的代码与其外部依赖。这时候&#xff0c;Mocking&#xff08;模拟&#xff09;和 Stubbing&#xff08;桩&#xff09;技术就显得尤为重要。这两种技…

Golang | Leetcode Golang题解之第528题按权重随机选择

题目&#xff1a; 题解&#xff1a; type Solution struct {pre []int }func Constructor(w []int) Solution {for i : 1; i < len(w); i {w[i] w[i-1]}return Solution{w} }func (s *Solution) PickIndex() int {x : rand.Intn(s.pre[len(s.pre)-1]) 1return sort.Searc…

3D打印机 屏幕的固定挂钩断后的一次自己修复经历

引子 3D打印机的屏幕固定挂钩断了 这次确实不知道咋断的&#xff0c;这可咋办呢&#xff0c;到网上看了一下&#xff0c;一个屏幕要2佰多&#xff0c;有些小贵&#xff0c;要不就自己修修吧&#xff0c;打个挂钩按上&#xff0c;说干就干。 正文 freecad的设计图如下【其中各…

PHP合成图片,生成海报图,poster-editor使用说明

之前写过一篇使用Grafika插件生成海报图的文章&#xff0c;但是当我再次使用时&#xff0c;却发生了错误&#xff0c;回看Grafika文档&#xff0c;发现很久没更新了&#xff0c;不兼容新版的GD&#xff0c;所以改用了intervention/image插件来生成海报图。 但是后来需要对海报…

Java基于微信小程序的美食推荐系统(附源码,文档)

博主介绍&#xff1a;✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

Linux的IP网路命令: 用于显示和操作网络接口(网络设备)的命令ip link详解

目录 一、概述 二、用法 1、基本语法 2、常用选项 3、常用参数 4、获取帮助 三、示例 1. 显示所有网络接口的信息 &#xff08;1&#xff09;命令 &#xff08;2&#xff09;输出示例 &#xff08;3&#xff09;实际操作 2. 启动网络接口 3. 停止网络接口 4. 更改…

【驱动】地平线X3交叉编译工具搭建、源码下载

1、交叉编译工具搭建 1)安装依赖包 sudo apt-get install -y build-essential make cmake libpcre3 libpcre3-dev bc bison \ flex python3-numpy mtd-utils zlib1g-dev debootstrap \ libdata-hexdumper-perl libncurses5-dev zip qemu-user-static \ curl repo git liblz4…

服务器上清理Docker容器运行日志的正确方法

为啥要清理服务器上docker容器的日志 因为是服务器的磁盘空间资源有限&#xff0c;由于docker容器在启动的时候没有限制&#xff0c;导致运行的docker容器随着时间的推移产生的日志越来越多&#xff0c;最后把服务磁盘资源耗尽&#xff0c;服务器的磁盘满了会导致服务器的应用无…

C语言 | Leetcode C语言题解之第526题优美的排列

题目&#xff1a; 题解&#xff1a; int countArrangement(int n) {int f[1 << n];memset(f, 0, sizeof(f));f[0] 1;for (int mask 1; mask < (1 << n); mask) {int num __builtin_popcount(mask);for (int i 0; i < n; i) {if (mask & (1 <<…

SpringBoot篇(自动装配原理)

目录 一、自动装配机制 1. 简介 2. 自动装配主要依靠三个核心的关键技术 3. run()方法加载启动类 4. 注解SpringBootApplication包含了多个注解 4.1 SpringBootConfiguration 4.2 ComponentScan 4.3 EnableAutoConfiguration 5. SpringBootApplication一共做了三件事 …