ActiveMQ 在java中的使用,通过单例模式、工厂实现
Jms规范里的两种message传输方式Topic和Queue,两者的对比如下表():
Topic | Queue | |
概要 | Publish Subscribe messaging 发布订阅消息 | Point-to-Point 点对点 |
有无状态 | topic数据默认不落地,是无状态的。 | Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。 |
完整性保障 | 并不保证publisher发布的每条数据,Subscriber都能接受到。 | Queue保证每条数据都能被receiver接收。 |
消息是否会丢失 | 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 | Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。 |
消息发布接收策略 | 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 | 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。 |
一、导jar包
activemq的依赖包
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>jul-to-slf4j</artifactId><version>1.6.1</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.13.3</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>4.3.1.RELEASE</version><exclusions><exclusion><groupId>org.springframework</groupId><artifactId>spring-messaging</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-context</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-tx</artifactId></exclusion><exclusion><groupId>org.springframework</groupId><artifactId>spring-core</artifactId></exclusion></exclusions></dependency>
二、java代码
创建一下四个java文件,成为mq的公共数据连接池
1、连接工厂 配置
package com.broadsense.iov.base.jms;import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.jms.connection.CachingConnectionFactory; /*** 连接工厂 配置* * @author flm* 2017年10月13日*/ public class ConnectionFactory {private static final String URL = "tcp://10.10.1.1:61616";private static final String USERNAME = "hkadmin";private static final String PASSWORD = "hk667";private static final int SESSIONCACHESIZE = 20;private javax.jms.ConnectionFactory factory;public static synchronized javax.jms.ConnectionFactory getInstance(){if (SingletonHolder.INSTANCE.factory == null) {SingletonHolder.INSTANCE.build();}return SingletonHolder.INSTANCE.factory;}private void build(){AMQConfigBean bean = loadConfigure();this.factory = buildConnectionFactory(bean);}private javax.jms.ConnectionFactory buildConnectionFactory(AMQConfigBean bean) {javax.jms.ConnectionFactory targetFactory = new ActiveMQConnectionFactory(bean.getUserName(), bean.getPassword(), bean.getBrokerURL());CachingConnectionFactory connectoryFacotry = new CachingConnectionFactory();connectoryFacotry.setTargetConnectionFactory(targetFactory);connectoryFacotry.setSessionCacheSize(bean.getSessionCacheSize());return connectoryFacotry;}private AMQConfigBean loadConfigure() {if ("tcp://10.10.1.1:61616" != null) {try {return new AMQConfigBean("tcp://10.10.1.1:61616", "hkadmin", "hk667", 20);} catch (Exception e) {throw new IllegalStateException("load amq config error!");}}throw new IllegalStateException("load amq config error!");}private static class AMQConfigBean{private String brokerURL;private String userName;private String password;private int sessionCacheSize;public AMQConfigBean() {}public AMQConfigBean(String brokerURL, String userName, String password, int sessionCacheSize) {this.brokerURL = brokerURL;this.userName = userName;this.password = password;this.sessionCacheSize = sessionCacheSize;}public String getBrokerURL() {return this.brokerURL;}public void setBrokerURL(String brokerURL) {this.brokerURL = brokerURL;}public String getUserName() {return this.userName;}public void setUserName(String userName) {this.userName = userName;}public String getPassword() {return this.password;}public void setPassword(String password) {this.password = password;}public int getSessionCacheSize() {return this.sessionCacheSize;}public void setSessionCacheSize(int sessionCacheSize) {this.sessionCacheSize = sessionCacheSize;}}private static class SingletonHolder{static ConnectionFactory INSTANCE = new ConnectionFactory(null);} }
2、模版
package com.broadsense.iov.base.jms;import org.springframework.jms.core.JmsTemplate;
/**
* 模板厂
*
* @author flm
* 2017年10月13日
*/
public class JmsTemplateFactory {private final javax.jms.ConnectionFactory factory;private JmsTemplate topicJmsTemplate;private JmsTemplate queueJmsTemplate;public static JmsTemplateFactory getInstance(){return SingletonHolder.INSTANCE;}private JmsTemplateFactory(){this.factory = ConnectionFactory.getInstance();}public synchronized JmsTemplate getTopicJmsTemplate() {if (this.topicJmsTemplate == null) {this.topicJmsTemplate = createTemplate(this.factory, true);}return this.topicJmsTemplate;}public synchronized JmsTemplate getQueueJmsTemplate() {if (this.queueJmsTemplate == null) {this.queueJmsTemplate = createTemplate(this.factory, false);}return this.queueJmsTemplate;}private JmsTemplate createTemplate(javax.jms.ConnectionFactory factory, boolean pubSubDomain) {JmsTemplate template = new JmsTemplate(factory);template.setPubSubDomain(pubSubDomain);return template;}public static class SingletonHolder{static JmsTemplateFactory INSTANCE = new JmsTemplateFactory(null);} }
3、消费者 模版
package com.broadsense.iov.base.jms;import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.jms.Destination; import javax.jms.MessageListener; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.listener.SimpleMessageListenerContainer; /*** JMS监听器 创建消费者* * @author flm* 2017年10月13日*/ public class JMSListener {private static final Logger LOGGER = LoggerFactory.getLogger(JMSListener.class);private static final Map<String, Destination> MQDESTS = new ConcurrentHashMap();
/**
* 开启一个 点对点的 消息队列监听 的消费者
*
* @param queueName 队列名称
* @param subName 订阅者的名字
* @param listener 监听
*/
public static synchronized void startJmsQueueListener(String queueName, MessageListener listener)
{startJmsQueueListener(queueName, null, listener);}public static synchronized void startJmsQueueListener(String queueName, String subName, MessageListener listener) {Destination dst = (Destination)MQDESTS.get("QUEUE_" + queueName);if (dst == null) {ActiveMQQueue mq = new ActiveMQQueue(queueName);startJmsListener(mq, subName, listener);MQDESTS.put("QUEUE_" + queueName, mq);} else {LOGGER.warn(queueName + " already started");}}
/**
* 开启 一对多 主题的 消息监听的消费者
*
* @param topicName 主题消息名称
* @param subName 订阅者的名字
* @param listener 监听
*/
public static synchronized void startJmsTopicListener(String topicName, MessageListener listener){startJmsTopicListener(topicName, null, listener);}public static synchronized void startJmsTopicListener(String topicName, String subName, MessageListener listener) {ActiveMQTopic mq = new ActiveMQTopic(topicName);startJmsListener(mq, subName, listener);MQDESTS.put("QUEUE_" + topicName, mq);}
/**
* 开始 消息监听器 消费者
*
* @param dest 目的地
* @param subName 持久订阅的名字
* @param msgListener 消息监听器
*/
private static void startJmsListener(Destination dest, String subName, MessageListener msgListener){javax.jms.ConnectionFactory factory = ConnectionFactory.getInstance();SimpleMessageListenerContainer listener = new SimpleMessageListenerContainer();listener.setConnectionFactory(factory);listener.setDestination(dest);listener.setMessageListener(msgListener);if ((subName != null) && (subName != "")) {listener.setDurableSubscriptionName(subName);}listener.start();} }
4、生产者 模版
package com.broadsense.iov.base.jms;import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator;/*** 创建 jms生产者* * @author flm* 2017年10月13日*/ public class JMSPublisher {
/**
* 发送消息
* Topic 生产者
*
* @param dest 目的地
* @param msg 消息内容
*/
public static void sendTopicMessage(String dest, String msg){JmsTemplateFactory.getInstance().getTopicJmsTemplate().send(dest, new MessageCreator(msg){public Message createMessage(Session session) throws JMSException {return session.createTextMessage(this.val$msg);}});}
/**
* 发送消息
* Queue 生产者
*
* @param dest 目的地
* @param msg 消息内容
*/
public static void sendQueueMessage(String dest, String msg){JmsTemplateFactory.getInstance().getQueueJmsTemplate().send(dest, new MessageCreator(msg){public Message createMessage(Session session) throws JMSException {return session.createTextMessage(this.val$msg);}});} }
三、activemq的使用
1、创建一个junit测试,@Test 发布、接受、即可看到消息,mq管理后台也可以看到
package com.broadsense.iov.base.jms;import com.broadsense.iov.base.jms.JMSListener; import com.broadsense.iov.base.jms.JMSPublisher; import java.io.IOException; import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.junit.Test;/**** @author flm*/ public class JMSPublisherTest {public JMSPublisherTest() {}/*** 生产者 发布消息* @throws */@Testpublic void testSendMessage() throws InterruptedException {for (int idx = 1; idx < 3; idx++) {/** 生产者 发布 消息到 queue/queue_b 的队列中*/JMSPublisher.sendQueueMessage("queue/queue_b", String.valueOf(idx * 1111));/** 生产者 发布消息 到 topic/send 的Topic 主题中 *///JMSPublisher.sendTopicMessage("topic/send", String.valueOf(idx * 1111)); }}/*** 消费者 订阅接受消息*/@Testpublic void receiver() {/** 消费者 订阅主题 topic/send 是否有消息发布,有侧打印出来 (通过 onMessage 监听)*//*JMSListener.startJmsTopicListener("topic/send", new MessageListener() {@Overridepublic void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage msg = (TextMessage) message;System.out.println("== 收到一个JMS消息..." + msg.getText());} } catch (JMSException e) {e.printStackTrace();}}});*//** 消费者 订阅队列 queue/queue_b 是否有消息发布,有侧打印出来 (通过 onMessage 监听)*/JMSListener.startJmsQueueListener("queue/queue_b" ,new MessageListener() {@Overridepublic void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage msg = (TextMessage) message;System.out.println("== 收到一个JMS消息..." + msg.getText());} } catch (JMSException e) {e.printStackTrace();}}});try {System.in.read();} catch (IOException ex) {Logger.getLogger(JMSPublisherTest.class.getName()).log(Level.SEVERE, null, ex);}}}
2、真正的项目实现
在项目的中具体实现,是加载一个类来实现订阅消息
加载启动一个订阅的主题,给一个类MQ()处理
package com.ifengSearch.track.dao;import org.springframework.stereotype.Repository;import com.broadsense.iov.base.jms.JMSListener;/*** 项目启动即 开启* 通过 spring 依赖加载 Lister 订阅topic/send* @author flm* @2017年10月16日*/ @Repository public class Lister {public Lister(){try {JMSListener.startJmsTopicListener("topic/send",new QM());// QM() 订阅 主题 topic/send} catch (Exception e) {}} }
MQ()订阅消息的处理类,通过实现
package com.ifengSearch.track.dao;import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;/*** 通过 实现 MessageListener 的 onMessage 来监听消息 * 接受、处理消息* @author flm* @2017年10月16日*/ public class MQ implements MessageListener {@Overridepublic void onMessage(Message message) {try {if (message instanceof TextMessage) {TextMessage msg = (TextMessage) message;System.out.println("== 收到一个JMS消息..." + msg.getText());} } catch (JMSException e) {e.printStackTrace();}} }