ActiveMQ——activemq的使用java代码实例(精选)

ActiveMQ 在java中的使用,通过单例模式、工厂实现

Jms规范里的两种message传输方式Topic和Queue,两者的对比如下表():

 

 

 

 TopicQueue
概要Publish  Subscribe messaging 发布订阅消息Point-to-Point  点对点
有无状态topic数据默认不落地,是无状态的。

Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。

完整性保障并不保证publisher发布的每条数据,Subscriber都能接受到。Queue保证每条数据都能被receiver接收。
消息是否会丢失一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。
消息发布接收策略一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

 

一、导jar包

 activemq的依赖包

<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>jul-to-slf4j</artifactId><version>1.6.1</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.13.3</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>4.3.1.RELEASE</version><exclusions><exclusion><groupId>org.springframework</groupId><artifactId>spring-messaging</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-context</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-tx</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion></exclusions></dependency>

 

 

二、java代码

 创建一下四个java文件,成为mq的公共数据连接池

1、连接工厂 配置

package com.broadsense.iov.base.jms;import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
/*** 连接工厂 配置* * @author flm* 2017年10月13日*/
public class ConnectionFactory
{private static final String URL = "tcp://10.10.1.1:61616";private static final String USERNAME = "hkadmin";private static final String PASSWORD = "hk667";private static final int SESSIONCACHESIZE = 20;private javax.jms.ConnectionFactory factory;public static synchronized javax.jms.ConnectionFactory getInstance(){if (SingletonHolder.INSTANCE.factory == null) {SingletonHolder.INSTANCE.build();}return SingletonHolder.INSTANCE.factory;}private void build(){AMQConfigBean bean = loadConfigure();this.factory = buildConnectionFactory(bean);}private javax.jms.ConnectionFactory buildConnectionFactory(AMQConfigBean bean) {javax.jms.ConnectionFactory targetFactory = new ActiveMQConnectionFactory(bean.getUserName(), bean.getPassword(), bean.getBrokerURL());CachingConnectionFactory connectoryFacotry = new CachingConnectionFactory();connectoryFacotry.setTargetConnectionFactory(targetFactory);connectoryFacotry.setSessionCacheSize(bean.getSessionCacheSize());return connectoryFacotry;}private AMQConfigBean loadConfigure() {if ("tcp://10.10.1.1:61616" != null) {try {return new AMQConfigBean("tcp://10.10.1.1:61616", "hkadmin", "hk667", 20);} catch (Exception e) {throw new IllegalStateException("load amq config error!");}}throw new IllegalStateException("load amq config error!");}private static class AMQConfigBean{private String brokerURL;private String userName;private String password;private int sessionCacheSize;public AMQConfigBean() {}public AMQConfigBean(String brokerURL, String userName, String password, int sessionCacheSize) {this.brokerURL = brokerURL;this.userName = userName;this.password = password;this.sessionCacheSize = sessionCacheSize;}public String getBrokerURL() {return this.brokerURL;}public void setBrokerURL(String brokerURL) {this.brokerURL = brokerURL;}public String getUserName() {return this.userName;}public void setUserName(String userName) {this.userName = userName;}public String getPassword() {return this.password;}public void setPassword(String password) {this.password = password;}public int getSessionCacheSize() {return this.sessionCacheSize;}public void setSessionCacheSize(int sessionCacheSize) {this.sessionCacheSize = sessionCacheSize;}}private static class SingletonHolder{static ConnectionFactory INSTANCE = new ConnectionFactory(null);}
}

 

2、模版

package com.broadsense.iov.base.jms;import org.springframework.jms.core.JmsTemplate;

/**
* 模板厂
*
* @author flm
* 2017年10月13日
*/

public class JmsTemplateFactory
{private final javax.jms.ConnectionFactory factory;private JmsTemplate topicJmsTemplate;private JmsTemplate queueJmsTemplate;public static JmsTemplateFactory getInstance(){return SingletonHolder.INSTANCE;}private JmsTemplateFactory(){this.factory = ConnectionFactory.getInstance();}public synchronized JmsTemplate getTopicJmsTemplate() {if (this.topicJmsTemplate == null) {this.topicJmsTemplate = createTemplate(this.factory, true);}return this.topicJmsTemplate;}public synchronized JmsTemplate getQueueJmsTemplate() {if (this.queueJmsTemplate == null) {this.queueJmsTemplate = createTemplate(this.factory, false);}return this.queueJmsTemplate;}private JmsTemplate createTemplate(javax.jms.ConnectionFactory factory, boolean pubSubDomain) {JmsTemplate template = new JmsTemplate(factory);template.setPubSubDomain(pubSubDomain);return template;}public static class SingletonHolder{static JmsTemplateFactory INSTANCE = new JmsTemplateFactory(null);}
}

 

3、消费者 模版

package com.broadsense.iov.base.jms;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.SimpleMessageListenerContainer;
/*** JMS监听器  创建消费者* * @author flm* 2017年10月13日*/
public class JMSListener
{private static final Logger LOGGER = LoggerFactory.getLogger(JMSListener.class);private static final Map<String, Destination> MQDESTS = new ConcurrentHashMap();


  

  /**
  * 开启一个 点对点的 消息队列监听 的消费者
  *
  * @param queueName 队列名称
  * @param subName 订阅者的名字
  * @param listener 监听
  */

   public static synchronized void startJmsQueueListener(String queueName, MessageListener listener)

  {startJmsQueueListener(queueName, null, listener);}public static synchronized void startJmsQueueListener(String queueName, String subName, MessageListener listener) {Destination dst = (Destination)MQDESTS.get("QUEUE_" + queueName);if (dst == null) {ActiveMQQueue mq = new ActiveMQQueue(queueName);startJmsListener(mq, subName, listener);MQDESTS.put("QUEUE_" + queueName, mq);} else {LOGGER.warn(queueName + " already started");}}


  /**
  * 开启 一对多 主题的 消息监听的消费者
  *
  * @param topicName 主题消息名称
  * @param subName 订阅者的名字
  * @param listener 监听
  */

public static synchronized void startJmsTopicListener(String topicName, MessageListener listener){startJmsTopicListener(topicName, null, listener);}public static synchronized void startJmsTopicListener(String topicName, String subName, MessageListener listener) {ActiveMQTopic mq = new ActiveMQTopic(topicName);startJmsListener(mq, subName, listener);MQDESTS.put("QUEUE_" + topicName, mq);}

  

  /**
  * 开始 消息监听器 消费者
  *
  * @param dest 目的地
  * @param subName 持久订阅的名字
  * @param msgListener 消息监听器
  */

private static void startJmsListener(Destination dest, String subName, MessageListener msgListener){javax.jms.ConnectionFactory factory = ConnectionFactory.getInstance();SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(factory);listener.setDestination(dest);listener.setMessageListener(msgListener);if ((subName != null) && (subName != "")) {listener.setDurableSubscriptionName(subName);}listener.start();}
}

 

4、生产者 模版

package com.broadsense.iov.base.jms;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;/*** 创建 jms生产者* * @author flm* 2017年10月13日*/
public class JMSPublisher
{

  


  /**
  * 发送消息
  * Topic 生产者
  *
  * @param dest 目的地
  * @param msg 消息内容
  */

public static void sendTopicMessage(String dest, String msg){JmsTemplateFactory.getInstance().getTopicJmsTemplate().send(dest, new MessageCreator(msg){public Message createMessage(Session session) throws JMSException {return session.createTextMessage(this.val$msg);}});}


  /**
  * 发送消息
  * Queue 生产者
  *
  * @param dest 目的地
  * @param msg 消息内容
  */

public static void sendQueueMessage(String dest, String msg){JmsTemplateFactory.getInstance().getQueueJmsTemplate().send(dest, new MessageCreator(msg){public Message createMessage(Session session) throws JMSException {return session.createTextMessage(this.val$msg);}});}
}

 

 

三、activemq的使用

1、创建一个junit测试,@Test 发布、接受、即可看到消息,mq管理后台也可以看到

package com.broadsense.iov.base.jms;import com.broadsense.iov.base.jms.JMSListener;
import com.broadsense.iov.base.jms.JMSPublisher;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.junit.Test;/**** @author flm*/
public class JMSPublisherTest {public JMSPublisherTest() {}/*** 生产者 发布消息* @throws */@Testpublic void testSendMessage() throws InterruptedException {for (int idx = 1; idx < 3; idx++) {/** 生产者 发布 消息到 queue/queue_b 的队列中*/JMSPublisher.sendQueueMessage("queue/queue_b", String.valueOf(idx * 1111));/** 生产者 发布消息 到  topic/send 的Topic 主题中 *///JMSPublisher.sendTopicMessage("topic/send", String.valueOf(idx * 1111));
        }}/*** 消费者 订阅接受消息*/@Testpublic void receiver() {/** 消费者 订阅主题  topic/send 是否有消息发布,有侧打印出来  (通过 onMessage 监听)*//*JMSListener.startJmsTopicListener("topic/send", new MessageListener() {@Overridepublic void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage msg = (TextMessage) message;System.out.println("== 收到一个JMS消息..." + msg.getText());} } catch (JMSException e) {e.printStackTrace();}}});*//** 消费者 订阅队列  queue/queue_b 是否有消息发布,有侧打印出来  (通过 onMessage 监听)*/JMSListener.startJmsQueueListener("queue/queue_b" ,new MessageListener() {@Overridepublic void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage msg = (TextMessage) message;System.out.println("== 收到一个JMS消息..." + msg.getText());} } catch (JMSException e) {e.printStackTrace();}}});try {System.in.read();} catch (IOException ex) {Logger.getLogger(JMSPublisherTest.class.getName()).log(Level.SEVERE, null, ex);}}}
View Code

 

 2、真正的项目实现

在项目的中具体实现,是加载一个类来实现订阅消息

加载启动一个订阅的主题,给一个类MQ()处理

package com.ifengSearch.track.dao;import org.springframework.stereotype.Repository;import com.broadsense.iov.base.jms.JMSListener;/*** 项目启动即 开启* 通过 spring 依赖加载 Lister 订阅topic/send* @author flm* @2017年10月16日*/
@Repository
public class Lister {public Lister(){try {JMSListener.startJmsTopicListener("topic/send",new QM());// QM() 订阅 主题  topic/send} catch (Exception e) {}}
}
View Code

 

MQ()订阅消息的处理类,通过实现

package com.ifengSearch.track.dao;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;/*** 通过 实现 MessageListener 的 onMessage 来监听消息 * 接受、处理消息* @author flm* @2017年10月16日*/
public class MQ implements MessageListener {@Overridepublic void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage msg = (TextMessage) message;System.out.println("== 收到一个JMS消息..." + msg.getText());} } catch (JMSException e) {e.printStackTrace();}}
}
View Code

 

转载于:https://www.cnblogs.com/lemon-flm/p/7668076.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/368979.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Eclipse+GitHub 提交代码错误 -“rejected - non-fast-forward”

Eclipse Push出现rejected - non-fast-forward错误 在 Push到服务器时有时会出现 rejected - non-fast-forward 错误&#xff0c;这是由于远端发生改变&#xff0c;此时再提交之前你需要将远端的改变合并到本地上错误原因&#xff1a;文件冲突&#xff0c;本地的代码和远程Repo…

两天学会css基础(一)

什么是css&#xff1f;css的作用是什么&#xff1f; CSS 指层叠样式表 (Cascading Style Sheets)主要作用就是给HTML结构添加样式&#xff0c;搭建页面结构&#xff0c;比如设置元素的宽高大小&#xff0c;颜色&#xff0c;位置等等。 学习css之前先了解一下css代码在HTML中的…

LVM 逻辑卷 (logica volume manager)

逻辑卷轴管理员 (Logical Volume Manager) 想像一个情况&#xff0c;你在当初规划主机的时候将 /home 只给他 50G &#xff0c;等到使用者众多之后导致这个 filesystem 不够大&#xff0c; 此时你能怎么作&#xff1f; 多数的朋友都是这样&#xff1a;再加一颗新硬盘&#xff0…

sql查询语句for xml path语法

【原地址】 for xml path作用&#xff1a;将多行的查询结果&#xff0c;根据某一些条件合并到一行。 例&#xff1a;现有一张表 执行下面语句 select Department,(SELECT Employee, FROM People b WHERE b.Departmenta.Department For XML Path()) Student from People as a g…

Android Studio打包以及Gradle配置构建

本文转载 郭霖公众号 https://mp.weixin.qq.com/s?__bizMzA5MzI3NjE2MA&mid2650241610&idx1&snb8af73f6c288b6617d9fe0ab3618118d&pass_ticketQK4j37kpmGNlsYcECWMb64HxKHEVJG5mSJubQEQguKI%3D 生成签名文件手动打包 首先生成签名文件&#xff0c;点击 Build…

java重载方法math_Java语言程序设计(十二)Math数学类,方法重载及变量作用域...

1.重载方法上一篇文章用到的max方法只能用于int型数据类型&#xff0c;但是如果需要决定两个浮点数中哪个较大&#xff0c;解决方法是创建另一个方法名相同但参数不同的方法&#xff0c;代码如下&#xff1a;public static double max(double num1, double num2){if(num1>nu…

编码(转)

https://www.zhihu.com/question/28164512 关于编码和乱码的问题&#xff0c;我简单讲一下。 通常问这类问题的人是混淆了若干个不同的概念&#xff0c;并且他们自己也没有意识到自己混淆了这些概念的。 终端显示字符的编码&#xff08;windows下终端是cmd&#xff0c;linux下是…

Spring MVC:测试简介

测试是软件开发中最重要的部分之一。 井井有条的测试有助于使应用程序代码保持良好状态&#xff0c;并且处于工作状态。 有很多不同类型的测试和方法。 在本文中&#xff0c;我想对基于Spring MVC的应用程序进行单元测试进行介绍。 不要希望在这里阅读有关Spring MVC测试的全部…

试验ConcurrentHashmap

我正在研究我最近的一个项目中的内存问题&#xff0c;该项目将数据保留在内存中以进行快速访问&#xff0c;但是应用程序的内存占用量非常大。 该应用程序大量使用CHM&#xff08;即Concurrenthashmap&#xff09; &#xff0c;因此&#xff0c;无需再费脑筋地猜测CHM是问题所…

java线程池任务失败_ThreadPoolExecutor线程池任务执行失败的时候会怎样

1. 任务执行失败时的处理逻辑1.1. WorkerWorker相当于线程池中的线程可以看到&#xff0c;Worker有几个重要的属性&#xff1a;thread &#xff1a; 这是Worker运行的线程&#xff0c;可以理解为一个Worker就是一个线程firstTask &#xff1a; 初始任务&#xff0c;可能为为n…

转:HttpModule与HttpHandler详解

ASP.NET对请求处理的过程&#xff1a;当请求一个*.aspx文件的时候&#xff0c;这个请求会被inetinfo.exe进程截获&#xff0c;它判断文件的后缀&#xff08;aspx&#xff09;之后&#xff0c;将这个请求转交给 ASPNET_ISAPI.dll&#xff0c;ASPNET_ISAPI.dll会通过http管道&…

mysql时间函数总结_MySQL 日期时间函数常用总结

获得当前日期时间(date time)1.1 函数&#xff1a;now()相关函数&#xff1a;current_timestamp()&#xff0c;localtime()&#xff0c;localtimestamp()举例说明&#xff1a;2. 获得当前日期(date)函数&#xff1a;curdate()相关函数&#xff1a;current_date()&#xff0…

Apache CXF – JAX-WS –简单教程

许多Java开发人员认为Web Service实现的任务艰巨-没人能真正责怪他们&#xff0c;尤其是在企业应用程序开发的多年中&#xff0c;这给开发和设计带来了很多复杂性。 对于某些人来说&#xff0c;了解它是构建完整的企业应用程序的下一步-Web服务-是实现面向服务设计的关键方案之…

Java StringBuilder神话被揭穿

神话 用加号运算符连接两个字符串是万恶之源 -匿名Java开发人员 注意 &#xff1a;此处讨论的测试的源代码可以在Github上找到 从大学时代起&#xff0c;我就学会了使用运算符将Java中的String连接视为致命的性能缺陷。 最近&#xff0c;在Backbase R&#xff06;D上进行了一…

java3d创建立方体_Opengl创建几何实体——四棱锥和立方体

//#include #include #include using namespace std;float rtri;float rquad;GLfloat points0[5][3] { {0,1,0},{-1,-1,1},{1,-1,1},{1,-1,-1},{-1,-1,-1} };GLfloat points1[8][3] { {1,1,-1},{-1,1,-1},{-1,1,1},{1,1,1},{1,-1,1},{-1,-1,1},{-1,-1,-1},{1,-1,-1} };//四棱…

WSO2 ESB的一种消息传递方式

正如我之前在WSO2 ESB工作时所发布的那样。 为了更好地理解此ESB&#xff0c;我一直在浏览示例 &#xff08;尚未完成所有示例 &#xff09;。 示例12是关于与ESB的单向消息传递&#xff0c;并使用TCP监视器使其可见。 我之前已经介绍过如何设置类似的工具“ TcpTunnelGUI”&am…

Eclipse-Java代码规范和质量检查插件-Checkstyle

CheckStyle是SourceForge下的一个项目&#xff0c;提供了一个帮助JAVA开发人员遵守某些编码规范的工具。它能够自动化代码规范检查过程&#xff0c;从而使得开发人员从这项重要但枯燥的任务中解脱出来。它可以根据设置好的编码规则来检查代码。比如符合规范的变量命名&#xff…

介绍一款好用 mongodb 可视化工具

最近想自己搭建一个个人博客&#xff0c;所以学了下mongodb&#xff0c;mongodb是用命令行输入的&#xff0c;有些人可能不太习惯&#xff0c;我自己找了下mongodb的一些可视化工具&#xff0c;发现了一款adminmongo很好用&#xff0c;这里介绍给你们用一下。 github地址&#…

用CSS3来代替JS实现交互

【CSS3和JS】 对于CSS了解的同学都知道&#xff0c;CSS的实现是最底层的&#xff0c;在实现方式和性能上都不是&#xff0c;JS这种提供接口的脚本可比的&#xff1b;从CSS3的动画和JS动画对比角度来看两者&#xff0c;会更清晰&#xff1b;而且随着前端框架的使用&#xff0c;…

php 如何宏定义,php – 在html中实现宏定义的方法

也许显而易见,但C预处理器可以完成这项工作.index._html#define _em(a) a #define _image(a, b) #define _list(a, b, c) a \\ b \ c \#define _theTile The Bar Title#include "head._html"_list(foo, bar, bean)This is really _em(great)_image(media/cat.jpg, …