JMS 在 SpringBoot 中的使用

当前环境

  1. Mac OS 10.11.x
  2. docker 1.12.1
  3. JDK 1.8
  4. SpringBoot 1.5

前言

基于之前一篇“一个故事告诉你什么是消息队列”,了解了消息队列的使用场景以及相关的特性。本文主要讲述消息服务在 JAVA 中的使用。

市面上的有关消息队列的技术选型非常多,如果我们的代码框架要支持不同的消息实现,在保证框架具有较高扩展性的前提下,我们势必要进行一定的封装。

在 JAVA 中,大可不必如此。因为 JAVA 已经制定了一套标准的 JMS 规范。该规范定义了一套通用的接口和相关语义,提供了诸如持久、验证和事务的消息服务,其最主要的目的是允许Java应用程序访问现有的消息中间件。就和 JDBC 一样。

基本概念

在介绍具体的使用之前,先简单介绍一下 JMS 的一些基本知识。这里我打算分为 3 部分来介绍,即 消息队列(MQ)的连接、消息发送与消息接收。

这里我们的技术选型是 SpringBoot、JMS、ActiveMQ

为了更好的理解 JMS,这里没有使用 SpringBoot 零配置来搭建项目

MQ 的连接

使用 MQ 的第一步一定是先连接 MQ。因为这里使用的是 JMS 规范,对于任何遵守 JMS 规范的 MQ 来说,都会实现相应的ConnectionFactory接口,因此我们只需要创建一个ConnectionFactory工厂类,由它来实现 MQ 的连接,以及封装一系列特性的 MQ 参数。

例子:这里我们以 ActiveMQ 为例,

maven 依赖:

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.3.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> </dependencies>

创建 ActiveMQ 连接工厂:

@Bean
public ConnectionFactory connectionFactory(){ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(ActiveMQ_URL); connectionFactory.setUserName(ActiveMQ_USER); connectionFactory.setPassword(ActiveMQ_PASSWORD); return connectionFactory; }

消息发送

关于消息的发送,是通过 JMS 核心包中的JmsTemplate类来实现的,它简化了 JMS 的使用,因为在发送或同步接收消息时它帮我们处理了资源的创建和释放。从它的作用也不难推测出,它需要引用我们上面创建的连接工厂,具体代码如下:

@Bean
public JmsTemplate jmsQueueTemplate(){return new JmsTemplate(connectionFactory()); }

JmsTemplate创建完成后,我们就可以调用它的方法来发送消息了。这里有两个概念需要注意:

  1. 消息会发送到哪里?-> 即需要指定发送队列的目的地(Destination),是可以在 JNDI 中进行存储和提取的 JMS 管理对象。
  2. 发送的消息体具体是什么?-> 实现了javax.jms.Message的对象,类似于 JAVA RMI 的 Remote 对象。

代码示例:

@Autowired
private JmsTemplate jmsQueueTemplate;/**
 * 发送原始消息 Message  */ public void send(){ jmsQueueTemplate.send("queue1", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage("我是原始消息"); } }); }

优化:当然,我们不用每次都通过MessageCreator匿名类的方式来创建Message对象,JmsTemplate类中提供了对象实体自动转换为Message对象的方法,convertAndSend(String destinationName, final Object message)

优化代码示例:

/**
 * 发送消息自动转换成原始消息
 */
public void convertAndSend(){ jmsQueueTemplate.convertAndSend("queue1", "我是自动转换的消息"); }

注:关于消息转换,还可以通过实现MessageConverter接口来自定义转换内容

消息接收

讲完了消息发送,我们最后来说说消息是如何接收的。消息既然是以Message对象的形式发送到指定的目的地,那么消息的接收势必会去指定的目的地上去接收消息。这里采用的是监听者的方式来监听指定地点的消息,采用注解@JmsListener来设置监听方法。

代码示例:

@Component
public class Listener1 {@JmsListener(destination = "queue1") public void receive(String msg){ System.out.println("监听到的消息内容为: " + msg); } }

有了监听的目标和方法后,监听器还得和 MQ 关联起来,这样才能运作起来。这里的监听器可能不止一个,如果每个都要和 MQ 建立连接,肯定不太合适。所以需要一个监听容器工厂的概念,即接口JmsListenerContainerFactory,它会引用上面创建好的与 MQ 的连接工厂,由它来负责接收消息以及将消息分发给指定的监听器。当然也包括事务管理、资源获取与释放和异常转换等。

代码示例:

@Bean
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() {DefaultJmsListenerContainerFactory factory =new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); //设置连接数 factory.setConcurrency("3-10"); //重连间隔时间 factory.setRecoveryInterval(1000L); return factory; }

场景

代码地址:https://github.com/jasonGeng88/springboot-jms

对 JMS 有了基本的理解后,我们就来在具体的场景中使用一下。

首先,我们需要先启动 ActiveMQ,这里我们以 Docker 容器化的方式进行启动。

启动命令:

docker run -d -p 8161:8161 -p 61616:61616 --name activemq webcenter/activemq

启动成功后,在 ActiveMQ 可视化界面查看效果(http://localhost:8161):


点对点模式(单消费者)

下面介绍消息队列中最常用的一种场景,即点对点模式。基本概念如下:

  1. 每个消息只能被一个消费者(Consumer)进行消费。一旦消息被消费后,就不再在消息队列中存在。
  2. 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。
  3. 接收者在成功接收消息之后需向队列应答成功。

代码实现(为简化代码,部分代码沿用上面所述):

  • 启动文件(Application.java)
@SpringBootApplication
@EnableJms
public class Application {... /**  * JMS 队列的模板类  * connectionFactory() 为 ActiveMQ 连接工厂  */ @Bean public JmsTemplate jmsQueueTemplate(){ return new JmsTemplate(connectionFactory()); } public static void main(String[] args) { SpringApplication.run(Application.class, args); } }

注解@EnableJms设置在@Configuration类上,用来声明对 JMS 注解的支持。

  • 消息生产者(PtpProducer.java)
@Component
public class PtpProducer {@Autowiredprivate JmsTemplate jmsQueueTemplate; /**  * 发送消息自动转换成原始消息  */ public void convertAndSend(){ jmsQueueTemplate.convertAndSend("ptp", "我是自动转换的消息"); } }
  • 生产者调用类(PtpController.java)
@RestController
@RequestMapping(value = "/ptp") public class PtpController { @Autowired private PtpProducer ptpProducer; @RequestMapping(value = "/convertAndSend") public Object convertAndSend(){ ptpProducer.convertAndSend(); return "success"; } }
  • 消息监听容器工厂
@SpringBootApplication
@EnableJms
public class Application {... /**  * JMS 队列的监听容器工厂  */ @Bean(name = "jmsQueueListenerCF") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); //设置连接数 factory.setConcurrency("3-10"); //重连间隔时间 factory.setRecoveryInterval(1000L); return factory; } ... }
  • 消息监听器
@Component
public class PtpListener1 {/**  * 消息队列监听器  * destination 队列地址  * containerFactory 监听器容器工厂, 若存在2个以上的监听容器工厂,需进行指定  */ @JmsListener(destination = "ptp", containerFactory = "jmsQueueListenerCF") public void receive(String msg){ System.out.println("点对点模式1: " + msg); } }

演示

启动项目启动后,通过 REST 接口的方式来调用消息生产者发送消息,请求如下:

curl -XGET 127.0.0.1:8080/ptp/convertAndSend

消费者控制台信息:

ActiveMQ 控制台信息:

列表说明:

  • Name:队列名称。
  • Number Of Pending Messages:等待消费的消息个数。
  • Number Of Consumers:当前连接的消费者数目,因为我们采用的是连接池的方式连接,初始连接数为 3,所以显示数字为 3。
  • Messages Enqueued:进入队列的消息总个数,包括出队列的和待消费的,这个数量只增不减。
  • Messages Dequeued:出了队列的消息,可以理解为是已经消费的消息数量。

点对点模式(多消费者)

基于上面一个消费者消费的模式,因为生产者可能会有很多,同时像某个队列发送消息,这时一个消费者可能会成为瓶颈。所以需要多个消费者来分摊消费压力(消费线程池能解决一定压力,但毕竟在单机上,做不到分布式分布,所以多消费者是有必要的),也就产生了下面的场景。

代码实现

  • 添加新的监听器
@Component
public class PtpListener2 {@JmsListener(destination = Constant.QUEUE_NAME, containerFactory = "jmsQueueListenerCF") public void receive(String msg){ System.out.println("点对点模式2: " + msg); } }

演示

这里我们发起 10 次请求,来观察消费者的消费情况:

这里因为监听容器设置了线程池的缘故,在实际消费过程中,监听器消费的顺序会有所差异。

发布订阅模式

除了点对点模式,发布订阅模式也是消息队列中常见的一种使用。试想一下,有一个即时聊天群,你在群里发送一条消息。所有在这个群里的人(即订阅了该群的人),都会收到你发送的信息。

基本概念:

  1. 每个消息可以有多个消费者。
  2. 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  3. 为了消费消息,订阅者必须保持运行的状态。

代码实现

  • 修改 JmsTemplate 模板类,使其支持发布订阅功能
@SpringBootApplication
@EnableJms
public class Application {... @Bean public JmsTemplate jmsTopicTemplate(){ JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory()); jmsTemplate.setPubSubDomain(true); return jmsTemplate; } ... }
  • 消息生产者(PubSubProducer.java)
@Component
public class PtpProducer {@Autowiredprivate JmsTemplate jmsTopicTemplate; public void convertAndSend(){ jmsTopicTemplate.convertAndSend("topic", "我是自动转换的消息"); } }
  • 生产者调用类(PubSubController.java)
@RestController
@RequestMapping(value = "/pubsub") public class PtpController { @Autowired private PubSubProducer pubSubProducer; @RequestMapping(value = "/convertAndSend") public String convertAndSend(){ pubSubProducer.convertAndSend(); return "success"; } }
  • 修改 DefaultJmsListenerContainerFactory 类,使其支持发布订阅功能
@SpringBootApplication
@EnableJms
public class Application {... /**  * JMS 队列的监听容器工厂  */ @Bean(name = "jmsTopicListenerCF") public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrency("1"); factory.setPubSubDomain(true); return factory; } ... }
  • 消息监听器(这里设置2个订阅者)
@Component
public class PubSubListener1 {@JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF") public void receive(String msg){ System.out.println("订阅者1 - " + msg); } } @Component public class PubSubListener2 { @JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF") public void receive(String msg){ System.out.println("订阅者2 - " + msg); } }

演示

curl -XGET 127.0.0.1:8080/pubSub/convertAndSend

消费者控制台信息:

ActiveMQ 控制台信息:

总结

这里只是对 SpringBoot 与 JMS 集成的简单说明与使用,详细的介绍可以查看 Spring 的官方文档,我这里也有幸参与 并发编程网 发起的 Spring 5 的翻译工作,我主要翻译了 Spring 5 的 JMS 章节,其内容对于上述 JMS 的基本概念,都有详细的展开说明,有兴趣的可以看一下,当然翻译水平有限,英文好的建议看原文。

转载于:https://www.cnblogs.com/niit-soft-518/p/6957384.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/400779.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MSP432P401R TI Drivers 库函数学习笔记(二)认识TI-RTOS (TI-POSIX)

目录简介TI-RTOSFreeRTOSPOSIX运行时对象查看器 (Runtime Object Viewer)TI-POSIX 介绍在源代码中使用 POSIXTI-POSIX支持的函数摘要线程函数调用的前后关系线程管理线程属性线程同步障碍属性条件变量条件变量属性互斥锁互斥属性读写锁定读写锁属性辅助函数调用的前后关系时钟消…

arcgis创建postgre企业级数据库

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

[floyd+路径输出]HDU1385 Minimum Transport Cost

题目链接 题目翻译: 有N个城市&#xff0c;然后直接给出这些城市之间的邻接矩阵&#xff0c;矩阵中-1代表那两个城市无道路相连&#xff0c;其他值代表路径长度。 如果一辆汽车经过某个城市&#xff0c;必须要交一定的钱(可能是过路费)。 现在要从a城到b城&#xff0c;花费为路…

MSP432P401R TI Drivers 库函数学习笔记(三)认识任务的创建及图形化配置

目录任务的简单创建图形化配置上手简简单单点个灯配置引脚建立任务实验结果整体代码main.cmyTask.cmyTask.h平台&#xff1a;Code Composer Studio 10.4.0 MSP432P401R SimpleLink™ 微控制器 LaunchPad™ 开发套件 (MSP-EXP432P401R) 任务的简单创建 根据上篇机翻的函数作用…

JavaScript实现自适应宽度的瀑布流

摘要: 主要介绍瀑布流的一种实现方法&#xff1a;绝对定位(css)javascriptajaxjson。简单一点如果不做滚动加载的话就是绝对定位(css)javascript了&#xff0c;ajax和json是滚动加载更多内容的时候用到的。 这样的布局并不陌生&#xff0c;从2011年Pinterest创立以来&#xff0…

.net web 开发平台- 表单设计器 一(web版)

如今为了适应需求的不断变化&#xff0c;动态表单设计器应运而生。它主要是为了满足界面的不断变化和提高开发速度。比如&#xff1a;一些页面客户可能也无法确定页面的终于布局&#xff0c;控件的位置&#xff0c;在哪种情况下显示或不显示等可能须要随时改动。为了应对这些需…

.NET程序配置文件操作(ini,cfg,config)

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

MSP432P401R TI Drivers 库函数学习笔记(四)GPIO

目录API头文件函数 (机翻)宏GPIO驱动程序api返回的通用状态代码GPIO_PinConfig输出引脚配置宏GPIO_PinConfig输入引脚配置宏GPIO_PinConfig中断配置宏特殊的GPIO_PinConfig配置宏类型别名示例配置引脚示例配置外部中断示例上机实战配置引脚main.cmyTask.cmyTask.h实验结果平台&…

# Vue3 toRef 和 toRefs 函数

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

二分检索函数lower_bound()和upper_bound()

二分检索函数lower_bound()和upper_bound() 一、说明 头文件&#xff1a;<algorithm> 二分检索函数lower_bound()和upper_bound() lower_bound()&#xff1a;找到大于等于某值的第一次出现upper_bound()&#xff1a;找到大于某值的第一次出现必须从小到大排序后才能用 内…

MSP432P401R TI Drivers 库函数学习笔记(五)PWM

目录API (机翻)函数上机实战配置引脚PWM初始化&#xff0c;实现简易呼吸灯的效果实验结果完整代码myPWM.cmyPWM.hmyTask.cmyTask.hmain.cmain.h平台&#xff1a;Code Composer Studio 10.4.0 MSP432P401R SimpleLink™ 微控制器 LaunchPad™ 开发套件 (MSP-EXP432P401R) API …

防火墙配置十大任务之十,构建虚拟防火墙

防火墙配置任务十构建虚拟防火墙任务拓扑图10.11.inside区域的交换机的基本配置&#xff0c;在交换机上开启vlan2&#xff0c;vlan3&#xff0c;vlan4.三个vlan。图10.22.outside区域的Internet基本配置。图10.33.交换机上连接防火墙接口的配置。图10.44.inside区域各个PC的主机…

使用纯 CSS 实现超酷炫的粘性气泡效果

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

MSP432P401R TI Drivers 库函数学习笔记(六)UART 串口

目录API (机翻)上机实战配置初始化和实验实验结果整体代码myUart.cmyUart.hmyTask.cmyTask.hmain.cmain.h平台&#xff1a;Code Composer Studio 10.4.0 MSP432P401R SimpleLink™ 微控制器 LaunchPad™ 开发套件 (MSP-EXP432P401R) API (机翻) 官方UART API 手册 void UA…

Linux 批量杀死进程(详细版本)

&#x1f680; 优质资源分享 &#x1f680; 学习路线指引&#xff08;点击解锁&#xff09;知识定位人群定位&#x1f9e1; Python实战微信订餐小程序 &#x1f9e1;进阶级本课程是python flask微信小程序的完美结合&#xff0c;从项目搭建到腾讯云部署上线&#xff0c;打造一…

MSP432 库函数实现 PID 电机调角度、调速

目录引脚配置PWM引脚外部中断测量编码器引脚配置代码部分初始化编码器解读Encoder.cEncoder.h测速和控制部分卡尔曼滤波器&#xff0c;用于对所测速度进行滤波kalman.ckalman.h实验效果速度滤波效果控速效果控角效果平台&#xff1a;Code Composer Studio 10.4.0 MSP432P401R …

20.网页卷去的距离与偏移量

我们先来看看下面的图&#xff1a; scrollLeft:设置或获取位于给定对象左边界与窗口中目前可见内容的最左端之间的距离 &#xff0c;即左边灰色的内容。 scrollTop:设置或获取位于对象最顶端与窗口中可见内容的最顶端之间的距离 &#xff0c;即上边灰色的内容。 offsetLeft:获取…

【电赛】一阶卡尔曼滤波器 滤波效果良好

目录代码kalman.ckalman.h滤波效果很久以前抄的&#xff0c;忘了是从哪弄的了 我把它改成了这种结构体指针传参的形式&#xff0c;方便在比赛中应用。应用举例见MSP432 PID 电机调角度、调速。 它曾助力笔者获2020年电赛省一等奖。 代码 Q:过程噪声协方差 Q参数调滤波后的曲线…

计算机是如何启动的?

从打开电源到开始操作&#xff0c;计算机的启动是一个非常复杂的过程。 我一直搞不清楚&#xff0c;这个过程到底是怎么回事&#xff0c;只看见屏幕快速滚动各种提示......这几天&#xff0c;我查了一些资料&#xff0c;试图搞懂它。下面就是我整理的笔记。 零、boot的含义 先问…

hdu 1536(博弈)

传送门&#xff1a;S-Nim 题意&#xff1a;给n个数的集合s&#xff0c; 再给m 组数据&#xff0c;每组表示 k 堆石子&#xff0c;每次可以取的个数只能是集合s中的数量。问先手胜还是输&#xff1f; 分析&#xff1a;sg函数的经典运用&#xff0c;先预处理出所有数量为0~10000的…