JMS规范
- 一、JMS是什么
- 二、MQ中间件对比
- 三、JMS组成
- 1.JMS Provider
- 2.JMS Producer
- 3.JMS Consumer
- 4.JSM Message
- 4.1 消息头
- 4.2 消息体
- 4.2.1 生产者
- 4.2.2 消费者
- 4.3 消息属性
- 四、JMS可靠性
- 1.PERSISTENT - 持久化
- 1.1 参数设置
- 1.2 Queue持久化
- 1.3 Topic持久化
- 1.3.1 持久的发布主题生产者
- 1.3.2 持久的订阅主题消费者
- 2.Transaction - 事务
- 2.1 生产者开启事务
- 2.2 消费者开启事务
- 3.Acknowledge - 签收
- 3.1 案例-手动签收
- 4.签收和事务的关系
一、JMS是什么
- 首先需要区分JavaSE、JavaEE、JMS
- JavaSE:是一门编程语言
- JavaEE:
是一套使用Java进行企业级应用开发的大家一致遵循的13个核心规范工业标准
。JavaEE平台提供了一个基于组件的方法来加快设计,开发。装配及部署企业应用程序。- DBC(Java Databease)数据库连接
- JNDI(Java Naming and Directory Interfaces)Java的命令和目录接口
- EJB(Enterprise JavaBean)
- RMI(Remote Method Invoke)远程方法调用
- Java IDL(Interface Description Language)/CORBA(Common Object Broker Architecture)接口定义语言/共用对象请求代理程序体系结构
- JSP(Java Server Page)
- Servlet
- XML(Extensible Markup Language)可标记白标记语言
- JMS(Java Message Service)Java消息服务
- JTA(Java Transaction API)Java事务API
- JTS(Java Transaction Service)Java事务服务
- JavaMail
- JAF(JavaBean Activation Framework)
- JMS(Java Message Service)是JavaEE中的一个技术,是消息服务
- Java消息服务:指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
- Java消息服务:指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
二、MQ中间件对比
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
PRODUCER-CUMSUMER | 支持 | 支持 | 支持 | 支持 |
PUBLISH-SUBSCRIBE | 支持 | 支持 | 支持 | 支持 |
REQUEST-REPLY(请求-响应) | 支持 | 支持 | - | 支持 |
API完备性 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持,Java优先 | 语言无关 | 支持,Java优先 | 支持 |
单机吞吐量 | 万级 | 万级 | 十万级 | 单机万级 |
消息延迟 | - | 微秒级 | 毫秒级 | - |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 高 |
消息丢失 | - | 低 | 理论上不会丢失 | - |
消息重复 | - | 可控制 | 理论上会有重复 | - |
文档的完备性 | 高 | 高 | 高 | 中 |
提供快速入门 | 有 | 有 | 有 | 无 |
首次部署难度 | - | 低 | 中 | 高 |
三、JMS组成
1.JMS Provider
- 实现JMS接口和规范的消息中间件,也就是我们说的MQ服务器
2.JMS Producer
- 消息生产者,创建和发送JMS消息的客户端应用
3.JMS Consumer
- 消息消费者,接收和处理JMS消息的客户端应用
4.JSM Message
4.1 消息头
- JMSDestination:消息发送的目的地,主要是指Queue和Topic
- JMSDeliveryMode:持久模式和非持久模式。
- 一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
- 一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。
- JMSExpiration:可以设置消息在一定时间后过期,
默认是永不过期
- 消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。
- 如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。
- 如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。
- JMSPriority:消息优先级,从0-9十个级别,0-4是普通消息5-9是加急消息。
- JMS
不要求
MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。
- JMS
- JMSMessageID:唯一标识每个消息的标识由MQ产生。
4.2 消息体
- 封装具体的消息数据
- 5种消息格式
TxtMessage
:普通字符串消息,包含一个StringMapMessage
:一个Map类型的消息,key为Strng类型,而值为Java基本类型- BytesMessage:二进制数组消息,包含一个byte[]
- StreamMessage:Java数据流消息,用标准流操作来顺序填充和读取
- ObjectMessage:对象消息,包含一个可序列化的Java对象
注意:发送和接收的消息体类型必须一致对应
4.2.1 生产者
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProduce {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);// 5.创建消息的生产者MessageProducer producer = session.createProducer(queue);// 6.使用生产者生成3条消息发送到MQ队列for (int i = 1; i <= 3; i++) {// 7.1 创建文本消息TextMessage textMessage = session.createTextMessage("msg--" + i);// 最简单的字符串producer.send(textMessage);// 7.2 创建Map类型的消息MapMessage mapMessage = session.createMapMessage();mapMessage.setString("k1", "v1");producer.send(mapMessage);// 7.3 创建 BytesMessageBytesMessage bytesMessage = session.createBytesMessage();bytesMessage.writeBytes("hello".getBytes());bytesMessage.writeBytes("world".getBytes());producer.send(bytesMessage);// 7.4 创建 StreamMessageStreamMessage streamMessage = session.createStreamMessage();streamMessage.writeString("hello");streamMessage.writeInt(123);producer.send(streamMessage);// 7.5 创建 ObjectMessage// ObjectMessage objectMessage = session.createObjectMessage();// MyObject myObject = new MyObject();// myObject.setName("张三");// myObject.setAge(20);// objectMessage.setObject(myObject);// producer.send(objectMessage);}// 9.关闭资源producer.close();session.close();connection.close();System.out.println("MQ消息发布完成");}}
4.2.2 消费者
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class JmsConsumer {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException, IOException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Queue queue = session.createQueue(QUEUE_NAME);// 5.创建消息的消费者MessageConsumer consumer = session.createConsumer(queue);// 6.通过监听的方式来消费消息consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage) message;try {System.out.println("消费者消费消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}if (message instanceof MapMessage) {MapMessage mapMessage = (MapMessage) message;try {System.out.println("消费者消费map消息:" + mapMessage.getString("k1"));} catch (JMSException e) {e.printStackTrace();}}if (message instanceof BytesMessage) {BytesMessage bytesMessage = (BytesMessage) message;byte[] bytes = new byte[0];try {bytes = new byte[(int) bytesMessage.getBodyLength()];} catch (JMSException e) {e.printStackTrace();}try {bytesMessage.readBytes(bytes);} catch (JMSException e) {e.printStackTrace();}String content = new String(bytes);System.out.println("消费者消费 BytesMessage 消息:" + content);}if (message instanceof StreamMessage) {StreamMessage streamMessage = (StreamMessage) message;String text = null;try {text = streamMessage.readString();} catch (JMSException e) {e.printStackTrace();}int number = 0;try {number = streamMessage.readInt();} catch (JMSException e) {e.printStackTrace();}System.out.println("消费者消费 StreamMessage 消息:" + text + ", " + number);}// if (message instanceof ObjectMessage) {// ObjectMessage objectMessage = (ObjectMessage) message;// Serializable object = objectMessage.getObject();// if (object instanceof MyObject) {// MyObject myObject = (MyObject) object;// System.out.println("消费者消费 ObjectMessage 消息:" + myObject.getName() + ", " + myObject.getAge());// }//}}});// 保证控制台不关掉System.in.read();consumer.close();session.close();connection.close();}
}
4.3 消息属性
- 如果需要除消息字段以外的值,那么可以使用消息属性
- 识别/去重/重点标注等操作非常有用的方法
- 有以下API
四、JMS可靠性
1.PERSISTENT - 持久化
1.1 参数设置
- 非持久:当服务器宕机,消息不存在。
- messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
- 持久:持久化:当服务器宕机,消息依然存在。
- messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
1.2 Queue持久化
- Queue默认是持久
1.3 Topic持久化
- 场景:只要订阅了之后,离线了重新上线,就会继续消费
- 类似微信公众号
注意:先启动定阅消费者再启动定阅生产者
- 当所有的消息必须被接收,则用持久订阅。当消息丢失能够被容忍,则用非持久订阅
1.3.1 持久的发布主题生产者
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsTopicProduce {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String TOPIC_NAME = "topic01";public static void main(String[] args) throws JMSException {// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Topic topic = session.createTopic(TOPIC_NAME);// 5.创建消息的生产者MessageProducer producer = session.createProducer(topic);//6.设置生产者生产持久化的Topicproducer.setDeliveryMode(DeliveryMode.PERSISTENT);//7.启动连接connection.start();// 8.使用生产者生成3条消息发送到MQ主题for (int i = 1; i <= 3; i++) {// 9.创建消息TextMessage textMessage = session.createTextMessage("msg-topic--" + i);// 最简单的字符串// 10.通过producer发送给mqproducer.send(textMessage);}// 11.关闭资源producer.close();session.close();connection.close();System.out.println("MQ消息发布到topic完成");}}
1.3.2 持久的订阅主题消费者
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class JmsTopicConsumer {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String TOPIC_NAME = "topic01";public static void main(String[] args) throws JMSException, IOException {System.out.println("我是1号消费者");// 1.创建连接工厂, 采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);// 2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.setClientID("王五");// 3.创建会话session// 两个参数,第一个叫事务,第二个叫签收Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// 4.创建目的地(是队列还是主题)Topic topic = session.createTopic(TOPIC_NAME);//5.通过session创建持久化订阅TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "我是王五");//6.启动连接connection.start();// 7.通过监听的方式来消费消息topicSubscriber.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage){TextMessage textMessage = (TextMessage) message;try {System.out.println("我是1号消费者消费消息:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}});System.in.read();session.close();connection.close();}
}
订阅者在线
订阅者离线
- 这个必须订阅(启动过一次程序)过一次才行
2.Transaction - 事务
- 事务偏生产者
注意:如果生产者开了事务,那么签收默认就是自动签收(指定了其他签收类型,也是自动签收)
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
- createSession的第一个参数决定是否开启。如果开启了 需要提交事务才会起效
- 关闭事务:只要执行send,就进入到队列中。关闭事务,那第2个签收参数的设置需要有效
- 开启事务:先执行send再执行commit,消息才被真正提交到队列中。消息需要批量提交,需要缓冲处理。
- 如果开启了事务,那么级别是比签收更高一些。
- 事务场景:涉及到一次性发送两条及以上的消息,那么需要使用事务。
容易出现的生产事故:消费者开启了事务,但是没有commit,就会造成消息重复消费。
2.1 生产者开启事务
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsProduce {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException {//1.创建连接工厂,按照给定的URL,采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);//2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();//3.创建会话session//两个参数transacted=事务,acknowledgeMode=确认模式(签收)//开启事务需要commitSession session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//4.创建目的地(具体是队列queue还是主题topic)Queue queue = session.createQueue(QUEUE_NAME);//5.创建消息的生产者,并设置不持久化消息MessageProducer producer = session.createProducer(queue);//6.通过使用消息生产者,生产三条消息,发送到MQ的队列里面try {for (int i = 0; i < 3; i++) {TextMessage textMessage = session.createTextMessage("tx msg--" + i);producer.send(textMessage);}//7.提交事务session.commit();System.out.println("消息发送完成");} catch (Exception e) {System.out.println("出现异常,消息回滚");session.rollback();} finally {//8.关闭资源producer.close();session.close();connection.close();}}}
2.2 消费者开启事务
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class JmsConsumer {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException {//1.创建连接工厂,按照给定的URL,采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);//2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();//3.创建会话session//两个参数transacted=事务,acknowledgeMode=确认模式(签收)//消费者开启了事务就必须手动提交,不然会重复消费消息Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//4.创建目的地(具体是队列queue还是主题topic)Queue queue = session.createQueue(QUEUE_NAME);//5.创建消息的消费者,指定消费哪一个队列里面的消息MessageConsumer messageConsumer = session.createConsumer(queue);//6.通过监听的方式消费消息messageConsumer.setMessageListener(new MessageListener() {int a = 0;@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {if (a == 2) {System.out.println(1 / 0);}TextMessage textMessage = (TextMessage) message;System.out.println("***消费者接收到的消息: " + textMessage.getText());session.commit();a = a + 1;} catch (Exception e) {System.out.println("出现异常,消费失败,放弃消费");try {session.rollback();a=0;} catch (JMSException ex) {ex.printStackTrace();}}}}});//7.关闭资源}
}
3.Acknowledge - 签收
- 签收偏消费者
- 签收(Acknowledge)是指消费者告知消息代理(Broker)已经成功处理并消费了特定消息的操作。
- 签收的作用主要有以下几点:
- 1.确保可靠性:当消费者接收到一条消息时,可以对该消息进行签收。通过签收,消费者向 Broker 表示已经成功处理了该消息,并且要求 Broker 删除该消息或将其标记为已消费,以确保消息不会再次被传递给其他消费者。
- 2.消息顺序:签收还可以控制消息的消费顺序。在 ActiveMQ 中,可以使用消息的签收方式来控制消息的顺序性。例如,如果消费者使用手动签收模式(MANUAL_ACKNOWLEDGE),消费者可以在处理完当前消息后再进行签收,这样可以确保消息按照顺序进行消费。
- 3.事务支持:签收和事务密切相关。在使用事务模式时,消费者可以批量消费多条消息,并在事务提交时进行签收。如果事务回滚,则消息将被重新传递给消费者,以确保消息的可靠性处理。
签收是非事务的
- 签收类型(以下标红的是常用):
自动签收(默认):Session.AUTO_ACKNOWLEDGE
- 在这种模式下,当消费者从队列或主题中接收到消息后,消息代理会立即将该消息视为已经被消费,不需要消费者显式地调用acknowledge()方法进行确认。因此,存在一定的消息传递风险,因为如果消费者在处理消息期间发生故障,消息可能会丢失。
手动签收:Session.CLIENT_ACKNOWLEDGE
- 在这种模式下,消费者必须显式地调用acknowledge()方法来确认消息的接收。消费者可以在处理完消息后手动调用acknowledge()方法,以通知消息代理该消息已成功处理。这种方式能够确保消息在被消费后才会被视为已被消费,从而增加了消息传递的可靠性。
- 延迟确认:Session.DUPS_OK_ACKNOWLEDGE。
- 消费者不需要在接收消息时就立即确认。
消息代理会允许消息重复传递
,但是这种模式可以提高消息的传递性能。消费者可以在后续的某个时间点调用acknowledge()方法来确认消息的接收。
- 消费者不需要在接收消息时就立即确认。
- 事务签收:Session.SESSION_TRANSACTED(事务签收)
- 在使用会话(Session)进行消息接收时,可以选择开启事务,并在事务提交时进行消息的确认。这种方式可以确保一组消息要么全部被消费,要么全部不被消费,从而保证消息的一致性和可靠性。
3.1 案例-手动签收
package com.qingsi.activemq;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
import java.io.IOException;public class JmsConsumer {public static final String ACTIVEMQ_URL = "tcp://192.168.86.128:61616";public static final String QUEUE_NAME = "queue01";public static void main(String[] args) throws JMSException, IOException {//1.创建连接工厂,按照给定的URL,采用默认的用户名密码ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);//2.通过连接工厂,获得connection并启动访问Connection connection = activeMQConnectionFactory.createConnection();connection.start();//3.创建会话session//两个参数transacted=事务,acknowledgeMode=确认模式(签收)//消费者开启了事务就必须手动提交,不然会重复消费消息Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//4.创建目的地(具体是队列queue还是主题topic)Queue queue = session.createQueue(QUEUE_NAME);//5.创建消息的消费者,指定消费哪一个队列里面的消息MessageConsumer messageConsumer = session.createConsumer(queue);//6.通过监听的方式消费消息messageConsumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {if (message instanceof TextMessage) {try {TextMessage textMessage = (TextMessage) message;System.out.println("***消费者接收到的消息: " + textMessage.getText());// 手动签收textMessage.acknowledge();} catch (Exception e) {System.out.println("出现异常,消费失败,放弃消费");}}}});//7.关闭资源System.in.read();messageConsumer.close();session.close();connection.close();}
}
4.签收和事务的关系
- 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。
- 非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)