1、配置ActiveMQ连接工厂、JmsTemplate等
注意:需要开启@EnableJms。
注解@EnableJms
自动扫描带有@JmsListener
的Bean方法,并为其创建一个MessageListener
把它包装起来
import javax.jms.ConnectionFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;/*** ActiveMQ配置.<br>* @author gqltt<br>*/
@Configuration
@EnableJms
public class ActiveMQConfig {/*** 队列JmsTemplate的名称.*/public static final String JMS_TEMPLATE_QUEUE = "innerJmsQueueTemplate";/*** 订阅JmsTemplate的名称.*/public static final String JMS_TEMPLATE_TOPIC = "innerJmsTopicTemplate";/*** 队列JmsListenerContainerFactory的名称.*/public static final String JMS_CONTAINER_FACTORY_QUEUE = "innerJmsQueueListenerContainerFactory";/*** 订阅JmsListenerContainerFactory的名称.*/public static final String JMS_CONTAINER_FACTORY_TOPIC = "innerJmsTopicListenerContainerFactory";/*** 获取brokerURL.*/public static String getBrokerURL() {return PropertyConfigUtil.getProperty("spring.activemq.brokerURL");}/*** 获取userName.* @return*/public static String getUserName() {return PropertyConfigUtil.getProperty("spring.activemq.userName");}/*** 获取password.* @return*/public static String getPassword() {return PropertyConfigUtil.getProperty("spring.activemq.password");}/*** ActiveMQConnectionFactory.* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = "innerActiveMQConnectionFactory")public ActiveMQConnectionFactory getActiveMQConnectionFactory() {final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();connectionFactory.setBrokerURL(getBrokerURL());connectionFactory.setUserName(getUserName());connectionFactory.setPassword(getPassword());return connectionFactory;}/*** CachingConnectionFactory.* @param targetConnectionFactory* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = "innerCachingConnectionFactory")public CachingConnectionFactory getCachingConnectionFactory(@Qualifier("abcInnerActiveMQConnectionFactory") ActiveMQConnectionFactory targetConnectionFactory) {final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();cachingConnectionFactory.setTargetConnectionFactory(targetConnectionFactory);cachingConnectionFactory.setSessionCacheSize(20);return cachingConnectionFactory;}/*** 队列JmsTemplate.* @param connectionFactory* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = JMS_TEMPLATE_QUEUE)public JmsTemplate getJmsQueueTemplate(@Qualifier("innerCachingConnectionFactory") ConnectionFactory connectionFactory) {final JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setConnectionFactory(connectionFactory);jmsTemplate.setPubSubDomain(false);jmsTemplate.setReceiveTimeout(30_000);return jmsTemplate;}/*** 订阅JmsTemplate.* @param connectionFactory* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = JMS_TEMPLATE_TOPIC)public JmsTemplate getJmsTopicTemplate(@Qualifier("innerCachingConnectionFactory") ConnectionFactory connectionFactory) {final JmsTemplate jmsTemplate = new JmsTemplate();jmsTemplate.setConnectionFactory(connectionFactory);jmsTemplate.setPubSubDomain(true);jmsTemplate.setReceiveTimeout(30_000);return jmsTemplate;}/*** 队列模式JmsListenerContainerFactory.* @param connectionFactory* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = JMS_CONTAINER_FACTORY_QUEUE)public JmsListenerContainerFactory<?> getJmsQueueListenerContainerFactory(@Qualifier("innerCachingConnectionFactory") ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setPubSubDomain(false);factory.setReceiveTimeout(30_000L);return factory;}/*** 订阅模式JmsListenerContainerFactory.* @param connectionFactory* @return*/@Conditional(ActiveMQCondition.class)@Bean(name = JMS_CONTAINER_FACTORY_TOPIC)public JmsListenerContainerFactory<?> getJmsTopicListenerContainerFactory(@Qualifier("innerCachingConnectionFactory") ConnectionFactory connectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置为发布订阅方式, 默认情况下使用的生产消费者方式factory.setPubSubDomain(true);factory.setReceiveTimeout(30_000L);return factory;}}
注意:使用条件判断是否初始化ActiveMQ相关对象
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;/*** 是否初始化ActiveQM的判断.<br>* @author gqltt<br>*/
public class ActiveMQCondition implements Condition {/*** {@inheritDoc}*/@Overridepublic boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {final String brokerUrl = ActiveMQConfig.getBrokerURL();return StringUtil.isNotEmpty(brokerUrl);}}
2、发送消息
/*** 发送保存成功【确认】信息.* @param gid*/private void sendPay(long gid) {try {final String msg = CastUtil.getString(gid);final JmsTemplate jmsTemplate = JmsTemplateUtil.getJmsQueueTemplate();jmsTemplate.convertAndSend(OrderListener.ORDER_PAYED_QUEUE, msg);} catch (Exception e) {if (LOG.isErrorEnabled()) {LOG.error("发送消息:" + OrderListener.ORDER_PAYED_QUEUE + "失败。", e);}}}
3、@JmsListener配置消息监听
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;@Component
public class OrderListener {/*** 订单支付队列.*/public static final String ORDER_PAYED_QUEUE = "order_payed_queue";@Autowiredprivate IOrderService orderService;/*** 订单已支付,通知改订单状态.* @param msg*/@JmsListener(containerFactory = ActiveMQConfig.JMS_CONTAINER_FACTORY_QUEUE, destination = ORDER_PAYED_QUEUE)public void payed(final String msg) {if (StringUtil.isNullOrTrimEmptyString(msg)) {return;}final long gid = CastUtil.getLong(msg);orderService.updatePayed(gid);}
}
@JmsListener 接收元素的Message对象
@Component
public class MailMessageListener {final Logger logger = LoggerFactory.getLogger(getClass());@Autowired ObjectMapper objectMapper;@Autowired MailService mailService;@JmsListener(destination = "jms/queue/mail", concurrency = "10")public void onMailMessageReceived(Message message) throws Exception {logger.info("received message: " + message);if (message instanceof TextMessage) {String text = ((TextMessage) message).getText();MailMessage mm = objectMapper.readValue(text, MailMessage.class);mailService.sendRegistrationMail(mm);} else {logger.error("unable to process non-text message!");}}
}
参考:集成JMS - 廖雪峰的官方网站