JMS 消息队列接口基本使用指南

概述

介绍

JMS(Java Message Service)即 Java 消息服务应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对JMS 提供支持。

简短来说,JMS 是一种与厂商无关的 API,是 sun 公司为了统一厂商的接口规范,而定义出的一组api接口,用来访问消息收发系统消息。它类似于 JDBC(Java Database Connectivity),提供了应用程序之间异步通信的功能。


JMS 体系结构

  • JMS 提供者(JMS 的实现者,比如 activemq、jbossmq、tonglinkmq 等)
  • JMS 客户(使用提供者发送消息的程序或对象,例如在 12306 中,负责发送一条购票消息到处理队列中,用来解决购票高峰问题,那么,发送消息到队列的程序和从队列获取消息的程序都叫做客户)
  • JMS 生产者(producer、sender):负责创建并发送消息的客户
  • JMS 消费者(customer、listener):负责接收并处理消息的客户
  • JMS 消息(message):在 JMS 客户之间传递数据的对象
  • JMS 队列(queue):一个容纳那些被发送的等待阅读的消息的区域
  • JMS 主题(topic):一种支持发送消息给多个订阅者的机制

JMS 对象模型

  • 连接工厂(connectionFactory)客户端使用 JNDI 查找连接工厂,然后利用连接工厂创建一个 JMS 连接
  • JMS 连接:表示 JMS 客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的
  • JMS 会话:session 标识 JMS 客户端和服务端的会话状态。会话建立在 JMS 连接上,标识客户与服务器之间的一个会话进程。
  • JMS 目的(Destination): 又称为消息队列,是实际的消息源
  • 生产者和消费者
  • 消息类型:分为队列类型(优先先进先出)以及订阅类型

消息监听器

MessageListener

MessageListener 是最原始的消息监听器,它是 JMS 规范中定义的一个接口。其中定义了一个用于处理接收到的消息的 onMessage() 方法,该方法只接收一个 Message 参数。

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;public class ConsumerMessageListener implements MessageListener {public void onMessage(Message message) {// 若生产者发送的是一个纯文本消息,可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessageTextMessage textMsg = (TextMessage) message;System.out.println("接收到一个纯文本消息。");try {System.out.println("消息内容是:" + textMsg.getText());} catch (JMSException e) {e.printStackTrace();}}
}

SessionAwareMessageListener

SessionAwareMessageListener 是 Spring 提供的,它不是标准的 JMS MessageListener。

MessageListener 的设计只是纯粹用来接收消息的,假如在使用 MessageListener 处理接收到的消息时需要发送一个消息通知对方已经收到这个消息了,那么这个时候就需要在代码里面去重新获取一个 Connection 或 Session。而 SessionAwareMessageListener 的设计就是为了方便在接收到消息后发送一个回复的消息,它同样提供了一个处理接收到的消息的 onMessage() 方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的 Session 对象。

使用 SessionAwareMessageListener 监听器,可以在监听并消费了消息后,不用重新获取一个 Connection 或 Session,而是直接向原 Connection 或 Session 的某一个队列发送消息。

代码示例:

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.listener.SessionAwareMessageListener;public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener {private Destination destination;public void onMessage(TextMessage message, Session session) throws JMSException {System.out.println("收到一条消息");System.out.println("消息内容是:" + message.getText());MessageProducer producer = session.createProducer(destination);Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");producer.send(textMessage);}public Destination getDestination() {returndestination;}public void setDestination(Destination destination) {this.destination = destination;}
}

说明:定义了一个 SessionAwareMessageListener,在这个 Listener 中在接收到了一个消息之后,利用对应的 Session 创建了一个到 destination 的生产者和对应的消息,然后利用创建好的生产者发送对应的消息。


MessageListenerAdapter

MessageListenerAdapter 类实现了 MessageListener 接口和 SessionAwareMessageListener 接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的 Java 类进行处理。

  • MessageListenerAdapter 会把接收到的消息做如下转换:

    • TextMessage 转换为 String 对象
    • BytesMessage 转换为 byte 数组
    • MapMessage 转换为 Map 对象
    • ObjectMessage 转换为对应的 Serializable 对象

    代码示例:

    // 目标处理器类
    public class ConsumerListener {  public void handleMessage(String message) {  System.out.println("ConsumerListener通过handleMessage接收到一个纯文本消息,消息内容是:" + message);  }  public void receiveMessage(String message) {  System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" + message);  }  
    }  
    
    <!-- 消息监听适配器 -->  
    <bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">  <property name="delegate">  <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>  </property>  <property name="defaultListenerMethod" value="receiveMessage"/>  
    </bean>  <!-- 消息监听适配器对应的监听容器 -->  
    <bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  <property name="connectionFactory" ref="connectionFactory"/>  <property name="destination" ref="adapterQueue"/>  <!-- 使用MessageListenerAdapter来作为消息监听器 -->  <property name="messageListener" ref="messageListenerAdapter"/>
    </bean>  
    

    注意:

    • MessageListenerAdapter 会把接收到的消息做一个类型转换,然后利用反射把它交给真正的目标处理器:一个普通的 Java 类(ConsumerListener)进行处理。

      如果真正的目标处理器是一个 MessageListener 或者是一个 SessionAwareMessageListener,那么 Spring 将直接使用接收到的Message 对象作为参数调用它们的 onMessage 方法,而不会再利用反射去进行调用。

      故在定义一个 MessageListenerAdapter 的时候就需要为它指定这样一个目标类。这个目标类可以通过 MessageListenerAdapter 的构造方法参数指定,也可以通过它的 delegate 属性来指定。

  • MessageListenerAdapter 另外一个主要的功能是可以通过 MessageListenerAdapter 注入的 handleMessage 方法自动的发送返回消息。

    当用于处理接收到的消息的方法(默认是 handleMessage)的返回值不为空(null或者void)的时候,Spring 会自动将它封装为一个 JMS Message,然后自动进行回复。这个回复消息将发送到的地址主要有两种方式可以指定:

    • 可以通过发送的 Message 的 setJMSReplyTo 方法指定该消息对应的回复消息的目的地
    • 通过 MessageListenerAdapter 的 defaultResponseDestination 属性来指定

基本使用

依赖

<!-- jms -->
<dependency><groupId>javax.jms</groupId><artifactId>javax.jms-api</artifactId>
</dependency>
<!-- spring jms -->
<dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId>
</dependency><!-- tonglinkMq jms api -->
<dependency><groupId>com.tongtech.tlq</groupId><artifactId>TongJMS-without-atomikos</artifactId><version>8.1.0-SNAPSHOT</version>
</dependency>

SpringBoot 集成 jms

jms 配置类

import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.tongtech.tmqi.ConnectionFactory;@EnableJms	// 声明对 JMS 注解的支持
@Configuration
public class TestCreator {private String host;private Integer port;private String queueManager;private String channel;private String username;private String password;private int ccsid;private String queueName;private long receiveTimeout;// 配置连接工厂(tonglinkMq)@Beanpublic ConnectionFactory connectionFactory() throws JMSException {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setProperty("tmqiAddressList", "tlq://127.0.0.1:10024");connectionFactory.setProperty("tmqiDefaultUsername", "admin");connectionFactory.setProperty("tmqiDefaultPassword", "123456");return connectionFactory;}// 配置缓存连接工厂 不配置该类则每次与MQ交互都需要重新创建连接,大幅降低速度。@Bean@Primarypublic CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();cachingConnectionFactory.setTargetConnectionFactory(connectionFactory);cachingConnectionFactory.setSessionCacheSize(500);cachingConnectionFactory.setReconnectOnException(true);return cachingConnectionFactory;}// 配置DefaultJmsListenerContainerFactory, 用@JmsListener注解来监听队列消息时,尤其存在多个监听的时候,通过实例化配置DefaultJmsListenerContainerFactory来控制消息分发@Bean(name = "jmsQueueListenerCF")public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setConnectionFactory(cachingConnectionFactory);// 设置连接数。如果对消息消费有顺序要求,这里建议设置为"1-1"// 注:使用同一个监听工厂类监听多个队列时,连接数需大于等于监听队列数factory.setConcurrency("3-10");	// 下限-上限// 重连间隔时间factory.setRecoveryInterval(1000L);// factory.setPubSubDomain(true);	// 支持发布订阅功能(topic)// factory.setConcurrency("1"); 	// topic 模式,并发必须设置为1,不然一条消息可能会被消费多次return factory;}// 配置JMS模板,实例化jmsTemplate后,可以在方法中通过@autowired的方式注入模板,用方法调用发送/接收消息// 注:如果只是接收消息,可以不配置此步@Beanpublic JmsTemplate jmsQueueTemplate(CachingConnectionFactory cachingConnectionFactory) {JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);jmsTemplate.setReceiveTimeout(receiveTimeout); // 设置超时时间// jmsTemplate.setPubSubDomain(true);	// 开启发布订阅功能(topic)return jmsTemplate;}
}

发送消息

public class jmsUtil {@Autowiredprivate JmsTemplate jmsQueueTemplate;/*** 发送原始消息 Message*/public void send(){jmsQueueTemplate.send("queue1", new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {return session.createTextMessage("我是原始消息");}});}/*** 发送消息自动转换成原始消息* 注:关于消息转换,还可以通过实现MessageConverter接口来自定义转换内容*/public void convertAndSend(){jmsQueueTemplate.convertAndSend("queue1", "我是自动转换的消息");}
}

监听接收消息

采用注解 @JmsListener 来设置监听方法

@Slf4j
@Component
// 此处继承MessageListenerAdapter非必需。但若只使用@JmsListener注解监听,可能会出现监听消息获取不及时或者获取不到消息的情况,加上继承MessageListenerAdapter后便不会出现
public class MdxpMessageListener extends MessageListenerAdapter {/*** 消息队列监听器* destination 队列地址,此处使用静态变量,支持配置化详见下文* containerFactory 监听器容器工厂(包含配置源), 若存在2个以上的监听容器工厂,需进行指定*/@Override@JmsListener(destination = "TEST_QUEUE",containerFactory = "jmsQueueListenerCF")public void onMessage(Message message) {// JmsListener收到消息后,会自动封装成自己特有的数据格式,需要自行根据消息类型解析原始消息String msgText = ""; double d = 0; try { if (msg instanceof TextMessage) {    msgText = ((TextMessage) msg).getText(); } else if (msg instanceof StreamMessage) {    msgText = ((StreamMessage) msg).readString();    d = ((StreamMessage) msg).readDouble();    } else if (msg instanceof BytesMessage) {    byte[] block = new byte[1024];    ((BytesMessage) msg).readBytes(block);    msgText = String.valueOf(block);    } else if (msg instanceof MapMessage) {    msgText = ((MapMessage) msg).getString("name");    }log.info("接收消息={}", msgText);} catch (JMSException e) { log.error("消息接收异常!", e);}}@JmsListener(destination = "TEST_QUEUE2",containerFactory = "jmsQueueListenerCF")// @Payload是消费者接受生产者发送的队列消息,将队列中的json字符串变成对象的注解,注意填充类需要实现序列化接口public void messageListener2(@payload User user){log.info("message={}", user)}
}

@JmsListener 注解 destination 支持配置化

注入配置读取类

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** 队列名称配置* 这里切记要@Data,或手动set和get*/
@Component
@Data
public class QueueNameConfig {@Value("${ibmmq.queue-test}")private String testQueue;}

队列监听类

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import javax.jms.Message;
import javax.jms.TextMessage;/*** MQ消费者*/
@Component
@Slf4j
public class ReceiveMessage extends MessageListenerAdapter {/*** destination:监听的队列名称,使用SpEL表达式写入* containerFactory:监听的工厂类,为配置类中所配置的名字*/@Override@JmsListener(destination = "#{@queueNameConfig.testQueue}", containerFactory = "jmsListenerContainerFactory")public void onMessage(Message message) {TextMessage textMessage = (TextMessage) message;  //转换成文本消息try {String text = textMessage.getText();log.info("接收信息:{}", text);} catch (Exception e) {e.printStackTrace();}}
}

javax 原生 jms

public class jmstest {public static void main(String[] args) throws Exception { // 配置工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setProperty("tmqiAddressList", "tlq://127.0.0.1:10024");connectionFactory.setProperty("tmqiDefaultUsername", "admin");connectionFactory.setProperty("tmqiDefaultPassword", "123456");// 获取连接和会话Connection mqConn = connectionFactory.createConnection(); // 创建会话。CLIENT_ACKNOWLEDGE:手动应答,AUTO_ACKNOWLEDGE:自动应答Session mqSession = mqConn.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE); // 创建队列Queue queuemq = Session.createQueue(queueName);// 获取消费者MessageConsumer consumer = mqSession.createConsumer(mqSession.createQueue(queueName)); // 设置监听器consumer.setMessageListener(new MessageListener() { public void onMessage(Message msg) { // JmsListener收到消息后,会自动封装成自己特有的数据格式,需要自行根据消息类型解析原始消息String msgText = ""; double d = 0; try { if (msg instanceof TextMessage) {msgText = ((TextMessage) msg).getText(); } else if (msg instanceof StreamMessage) {    msgText = ((StreamMessage) msg).readString();    d = ((StreamMessage) msg).readDouble();    } else if (msg instanceof BytesMessage) {    byte[] block = new byte[1024];    ((BytesMessage) msg).readBytes(block);    msgText = String.valueOf(block);    } else if (msg instanceof MapMessage) {    msgText = ((MapMessage) msg).getString("name");    }log.info("接收消息={}", msgText);// 手动应答textMessage.acknowledge();} catch (JMSException e) { log.error("消息接收异常!", e);}}}); // 启动连接mqConn.start(); }// 获取生产者MessageProducer producer = mqSession.createProducer(mqSession.createQueue(queueName)); // topic(广播)模式// Topic topic = Session.createTopic(queueName);// MessageProducer producer = mqSession.createProducer(topic); producer.setDeliveryMode(DeliveryMOde.NON_PERSISTENT);producer.send(mqSession.createTexttMessage("这是一条消息"));// 关闭资源producer.close();// 断开连接connection.close();
} 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/36076.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[保研/考研机试] KY103 2的幂次方 上海交通大学复试上机题 C++实现

题目链接&#xff1a; KY103 2的幂次方 https://www.nowcoder.com/share/jump/437195121691999575955 描述 Every positive number can be presented by the exponential form.For example, 137 2^7 2^3 2^0。 Lets present a^b by the form a(b).Then 137 is present…

k8s containerd 配置 http访问harbor image【最新--官方文档】

不看官方文档的代价&#xff1a;在搜索了很多中文资料发现配置了都不起作用&#xff0c;浪费了很多时间。 https://github.com/containerd/containerd/blob/main/docs/cri/config.md#registry-configuration The old CRI config pattern for specifying registry.mirrors and…

MySQL8安装和删除教程 保姆级(Windows)

下载 官网: mysql官网点击Downloads->MySQL Community(GPL) Downloads->MySQL Community Server(或者点击MySQL installer for Windows) Windows下有两种安装方式 在线安装 一般带有 web字样 这个需要联网离线安装 一般没有web字样 安装 下载好之后,版本号可以不一样&…

Postman中,既想传递文件,还想传递多个参数(后端)

需求:既想传文件又想传多个参数可以用以下方式实现

Django rest_framework Serializer中的create、Views中的create/perform_create的区别

Django rest_framework Serializer中的create、Views中的create/perform_create的区别 对于后端来说&#xff0c;前后端分离的方式能让前后端的开发都爽。和所有的爽一样&#xff0c;每爽一次都要付出一定的代价。而前后端分离的代价&#xff0c;就是后端要面对巨量的模块化的功…

C语言实现插入排序

什么是插入排序&#xff1f; 插入排序&#xff08;Insertion Sort&#xff09; 是一种简单且逐步构建有序序列的排序算法。它的思想是将数组分为两部分&#xff1a;已排序的部分和未排序的部分。初始时&#xff0c;已排序部分只包含数组的第一个元素&#xff0c;然后逐步将未排…

Process.Start 报错

Process.Start 报错 System.Diagnostics.Process.StartWithShellExecuteEx Process.Start 为什么会引发“系统找不到指定的文件”异常 Process.Start 报错 找不到路径 ,System.ComponentModel.Win32Exception:“系统找不到指定的文件。 问题1、 在WinForm中可能是权限问题&…

做了这件事,精准拿捏企业资产管理!

资产管理系统是一种为组织和个人提供管理各类资产的重要工具。无论是金融资产还是实物资产&#xff0c;这些都构成了一个实体或个人财务状况的重要组成部分。 无论是企业寻求优化其固定资产维护&#xff0c;还是个人希望更好地管理他们的投资组合&#xff0c;资产管理系统在现代…

NZ系列工具NZ02:VBA读取PDF使用说明

【分享成果&#xff0c;随喜正能量】时光绽放并蒂莲&#xff0c;更是一份殷殷嘱托&#xff0c;更是一份诚挚祝福&#xff0c;是一份时光馈赠&#xff0c;又是一份时光陪伴。。 我的教程一共九套及VBA汉英手册一部&#xff0c;分为初级、中级、高级三大部分。是对VBA的系统讲解…

“深入解析JVM:探索Java虚拟机的工作原理与优化技巧“

标题&#xff1a;深入解析JVM&#xff1a;探索Java虚拟机的工作原理与优化技巧 摘要&#xff1a;本文将深入探讨Java虚拟机&#xff08;JVM&#xff09;的工作原理、内部结构以及如何优化Java应用程序的性能。我们将介绍JVM的主要组件&#xff0c;包括类加载器、运行时数据区域…

关于openssl SM2 ECC以及密钥生成和签名验签

SM2是基于ECC的国密算法,本身也是ECC算法。 openssl生成ECC公私钥并签名验签 #!/bin/sh openssl ecparam -genkey -name prime256v1 -out private.pem #print pri #openssl ec -in private.pem -text -noout openssl ec -in private.pem -pubout -out public.pem #gen test.…

uniapp+uview封装小程序请求

提要&#xff1a; uniapp项目引入uview库 此步骤不再阐述 1.创建环境文件 env.js&#xff1a; let BASE_URL;if (process.env.NODE_ENV development) {// 开发环境BASE_URL 请求地址; } else {// 生产环境BASE_URL 请求地址; }export default BASE_URL; 2.创建请求文件 该…

QLExpress动态脚本引擎解析工具

介绍 QLExpress脚本引擎 1、线程安全&#xff0c;引擎运算过程中的产生的临时变量都是threadlocal类型。 2、高效执行&#xff0c;比较耗时的脚本编译过程可以缓存在本地机器&#xff0c;运行时的临时变量创建采用了缓冲池的技术&#xff0c;和groovy性能相当。 3、弱类型脚本…

广西Geotrust单位多域名https证书推荐

Geotrust是国际知名CA认证机构&#xff0c;根证书是Digicert&#xff0c;还有RapidSSL、QuickSSL等子品牌&#xff0c;拥有多种类型的多域名https证书&#xff0c;比如OV企业型https证书和EV增强型多域名https证书。那么&#xff0c;哪种多域名https证书更适合企事业单位使用呢…

SpringBoot复习:(43)如何以war包的形式运行SpringBoot程序

一、.pom.xml配置packging为war <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven…

Android 内存泄漏

名词解释 内存泄漏:即memory leak。是指内存空间使用完毕后无法被释放的现象&#xff0c;虽然Java有垃圾回收机制&#xff08;GC&#xff09;&#xff0c;但是对于还保持着引用&#xff0c; 该内存不能再被分配使用&#xff0c;逻辑上却已经不会再用到的对象&#xff0c;垃圾回…

react如何实现数据渲染

React数据渲染是指将组件中的数据映射到页面上&#xff0c;以展示出来。在React中&#xff0c;数据渲染通常是通过JSX和组件的state或props完成的。 JSX是一个类似HTML的语法&#xff0c;可以在其中嵌入JavaScript表达式。在JSX中&#xff0c;可以使用{}包裹JavaScript表达式&…

解决C语言中使用scanf输入字符串导致for循环失效的问题

在C语言编程中&#xff0c;使用scanf函数输入字符串是一项基本操作。然而&#xff0c;当我们尝试在for循环中使用scanf输入字符串时&#xff0c;可能会遇到意外的问题&#xff0c;导致循环无法正常执行。本文将深入探讨这个问题&#xff0c;并提供解决方案&#xff0c;让你能够…

考公-判断推理-定义判断

第九节课 例题 例题 例题 例题 例题 例题 脚一滑&#xff0c;就是工伤&#xff0c;这难道不是操作不当吗 例题 不要较真&#xff0c;公务员&#xff0c;把没有全局观念的人排除在公务员队伍之外 例题 例题 下次看到不字&#xff0c;先给我画上 例题 例题 例题 例题…

微信群聊微信机器人实现流程

1.注册微信账号 要使用一个微信机器人账号来实现在微信群聊中的自动回复功能&#xff0c;你需要注册一个专门用于机器人的微信账号。 注册微信机器人账号的步骤如下&#xff1a; 下载微信&#xff1a;在手机或者电脑上下载并安装微信应用程序。创建新账号&#xff1a;打开微信…