全面解剖 消息中间件 RocketMQ-(5)
一、RocketMQ :过滤消息的两种方式
1、Tag 过滤
在大多数情况下,TAG 是一个简单而有用的设计,其可以来选择您想要的消息。
例如:
DefaultMoPushconsumer consumer = new DefaultMoPushconsumer(“CID EXAMPLE”).consumer.subscribe(“TOPIC”,“TAGA II TAGB II TAGC”):
消费者将接收包含 TAGA 或 TAGB 或 TAGC 的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。
在这种情况下,可以使用 SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。在 RocketMQ 定义的语法下,可以实现一些简单的逻辑。
2、SQL 语法过滤
2.1 RocketMQ 只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:**>,>=,<,<=,BETWEEN,=;活活
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
2.2 常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来:
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
2.3 只有使用 push 模式的消费者才能使用 SQL92 标准的 sql 语句,接口如下:
public void subscribe(finalstring topic, final Messageselector messageselector)
二、RocketMQ :Tag 过滤
1、在工程 rocketmq_demo (模块)中,创建 Tag 过滤 生产 类 Producer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\filter\tag\Producer.java** 2024-5-24 创建 Tag 过滤 生产 类 Producer.java*/
package djh.it.mq.rocketmq.filter.tag;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.TimeUnit;public class Producer {public static void main(String[] args) throws Exception {//1.创建消息生产者 producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址(集群配置)//producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)producer.setNamesrvAddr("172.18.30.110:9876");//3.启动 producerproducer.start();for(int i=0; i<3; i++){//4.创建消息对象,指定主题 Topic、Tag 和消息体//参数一:消息主题 Topic, 参数二:消息 Tag, 参数三:消息内容//Message msg = new Message("FilterTagTopic", "Tag1", ("Hello World"+i).getBytes());Message msg = new Message("FilterTagTopic", "Tag2", ("Hello World"+i).getBytes());//5.发送消息SendResult result = producer.send(msg);System.out.println("发送结果:"+result);TimeUnit.SECONDS.sleep(1); //线程睡1秒}//6.关闭生产者 producer。producer.shutdown();}
}
2、在工程 rocketmq_demo (模块)中,创建 Tag 过滤 消费 类 Consumer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\filter\tag\Consumer.java** 2024-5-24 创建 Tag 过滤 消费 类 Consumer.java*/
package djh.it.mq.rocketmq.filter.tag;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者 Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定 Nameserver 地址(集群配置)//consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)consumer.setNamesrvAddr("172.18.30.110:9876");//3.订阅主题 Topic 和 Tag//consumer.subscribe("FilterTagTopic", "Tag1"); //接收同步消息//consumer.subscribe("FilterTagTopic", "Tag2"); //接收异步消息前,可以让先发送异步消息。consumer.subscribe("FilterTagTopic", "Tag1 || Tag2"); //接收同步消息 和 异步消息//consumer.subscribe("FilterTagTopic", "*"); //接收所有消息。//添加消费模式//consumer.setMessageModel(MessageModel.CLUSTERING); //默认是负载均衡模式消费//consumer.setMessageModel(MessageModel.BROADCASTING); //广播模式消费//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently(){//接受消息内容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){//System.out.println(msgs); //接收到的消息是未转换的字节码for(MessageExt msg : msgs){System.out.println("consumeThread="+ Thread.currentThread().getName() + ", " + new String(msg.getBody())); //转换为字符串消息}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者 consumer。consumer.start();System.out.println("消费者启动");}
}
3、启动 消费 类 Consumer.java 和 发送 类 Producer.java 进行测试。
三、RocketMQ :SQL 语法过滤
1、RocketMQ :SQL 语法过滤
1.1 RocketMQ 只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:**>,>=,<,<=,BETWEEN,=;活活
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
1.2 常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:‘abc’,必须用单引号包裹起来:
- NULL,特殊的常量
- 布尔值,TRUE或 FALSE
1.3 只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,接口如下:
public void subscribe(finalstring topic, final Messageselector messageselector)
1.4 注意:使用 SQL92 标准的 sql 语句,需要在 broker 配置文件 添加支持 SQL92 标准的 sql 语句。
# 单机模式下,在 ./conf/broker.conf 中,添加支持 SQL92 标准的 sql 语句
enablePropertyFilter=true# 集群模式下,修改 broker-m.conf 和 broker-s.conf。然后重启 broker 。
enablePropertyFilter=true
2、在工程 rocketmq_demo (模块)中,创建 SQL 过滤 生产 类 Producer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\filter\sql\Producer.java** 2024-5-24 创建 SQL 过滤 生产 类 Producer.java*/
package djh.it.mq.rocketmq.filter.sql;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.TimeUnit;public class Producer {public static void main(String[] args) throws Exception {//1.创建消息生产者 producer,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定 Nameserver 地址(集群配置)//producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)producer.setNamesrvAddr("172.18.30.110:9876");//3.启动 producerproducer.start();for(int i=0; i<10; i++){//4.创建消息对象,指定主题 Topic、Tag 和消息体//参数一:消息主题 Topic, 参数二:消息 Tag, 参数三:消息内容//Message msg = new Message("FilterTagTopic", "Tag1", ("Hello World"+i).getBytes());Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World"+i).getBytes());//设置一个用户属性msg.putUserProperty("i", String.valueOf(i));//5.发送消息SendResult result = producer.send(msg);System.out.println("发送结果:"+result);TimeUnit.SECONDS.sleep(1); //线程睡1秒}//6.关闭生产者 producer。producer.shutdown();}
}
3、在工程 rocketmq_demo (模块)中,创建 SQL 过滤 消费 类 Consumer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\filter\sql\Consumer.java** 2024-5-24 创建 SQL 过滤 消费 类 Consumer.java*/
package djh.it.mq.rocketmq.filter.sql;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者 Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定 Nameserver 地址(集群配置)//consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)consumer.setNamesrvAddr("172.18.30.110:9876");//3.订阅主题 Topic 和 Tag//注意:需要在 broker 配置文件 ./conf/broker.conf 中,# 添加支持 SQL92 标准的 sql 语句 //enablePropertyFilter=trueconsumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5")); //接收同步消息 和 异步消息//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently(){//接受消息内容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){//System.out.println(msgs); //接收到的消息是未转换的字节码for(MessageExt msg : msgs){System.out.println("consumeThread="+ Thread.currentThread().getName() + ", " + new String(msg.getBody())); //转换为字符串消息}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者 consumer。consumer.start();System.out.println("消费者启动");}
}
4、启动 消费 类 Consumer.java 和 发送 类 Producer.java 进行测试。
四、RocketMQ :事务消息的流程分析
1、事务消息的大致方案分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1.1 事务消息发送及提交
- 发送消息 ( half 消息 )。
- 服务端响应消息写入结果。
- 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
- 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)。
1.2 事务补偿
- 对没有 Commit/Rollback 的事务消息 ( pending 状态的消息),从服务端发起一次“回查。
- Producer 收到回查消息,检查回查消息对应的本地事务的状态。
- 根据本地事务状态,重新 Commit 或者 Rollback。
其中,补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况。
2、事务消息共有三种状态,提交状态、回滚状态中间状态。
- TransactionStatus.commitTransaction : 提交事务,它允许消费者消费此消息不允许被消费。
- TransactionStatus.RollbackTransaction : 回滚事务,它代表该消息将被删除,
- TransactionStatus.Unknown : 中间状态,它代表需要检查消息队列来确定状态。
五、RocketMQ :事务消息的实现
1、在工程 rocketmq_demo (模块)中,创建 事务消息 生产 类 Producer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\transaction\Producer.java** 2024-5-24 创建 事务消息 生产 类 Producer.java*/
package djh.it.mq.rocketmq.transaction;import io.netty.util.internal.StringUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;import java.util.concurrent.TimeUnit;public class Producer {public static void main(String[] args) throws Exception {//1.创建消息生产者 producer,并制定生产者组名//DefaultMQProducer producer = new DefaultMQProducer("group1");TransactionMQProducer producer = new TransactionMQProducer("group2");//2.指定 Nameserver 地址(集群配置)//producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)producer.setNamesrvAddr("172.18.30.110:9876");//添加事务监听器producer.setTransactionListener(new TransactionListener() {//在该方法中执行本地事务public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {if(StringUtils.equals("TAGA", msg.getTags())){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.equals("TAGB", msg.getTags())){return LocalTransactionState.ROLLBACK_MESSAGE;}else if(StringUtils.equals("TAGC", msg.getTags())){return LocalTransactionState.UNKNOW;}return LocalTransactionState.UNKNOW;}//在该方法中 MQ 进行消息事务状态的回查public LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("消息的Tag:"+msg.getTags());return LocalTransactionState.COMMIT_MESSAGE;}});//3.启动 producerproducer.start();String[] tags = {"TAGA", "TAGB", "TAGC"};for(int i=0; i<3; i++){//4.创建消息对象,指定主题 Topic、Tag 和消息体//参数一:消息主题 Topic, 参数二:消息 Tag, 参数三:消息内容//Message msg = new Message("FilterTagTopic", "Tag1", ("Hello World"+i).getBytes());Message msg = new Message("TransactionTopic", tags[i], ("Hello World"+i).getBytes());//5.发送消息SendResult result = producer.sendMessageInTransaction(msg, null);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:"+result);TimeUnit.SECONDS.sleep(1); //线程睡1秒}//6.关闭生产者 producer。//producer.shutdown();}
}
2、在工程 rocketmq_demo (模块)中,创建 事务消息 消费 类 Consumer.java
/*** rocketmq_demo\src\main\java\djh\it\mq\rocketmq\transaction\Consumer.java** 2024-5-24 创建 事务消息 消费 类 Consumer.java*/
package djh.it.mq.rocketmq.transaction;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {//1.创建消费者 Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定 Nameserver 地址(集群配置)//consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//2.指定 Nameserver 地址(非集群配置)consumer.setNamesrvAddr("172.18.30.110:9876");//3.订阅主题 Topic 和 Tag//consumer.subscribe("FilterTagTopic", "Tag1"); //接收同步消息//consumer.subscribe("FilterTagTopic", "Tag2"); //接收异步消息前,可以让先发送异步消息。//consumer.subscribe("TransactionTopic", "Tag1 || Tag2"); //接收同步消息 和 异步消息consumer.subscribe("TransactionTopic", "*"); //接收所有消息。//添加消费模式//consumer.setMessageModel(MessageModel.CLUSTERING); //默认是负载均衡模式消费//consumer.setMessageModel(MessageModel.BROADCASTING); //广播模式消费//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently(){//接受消息内容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){//System.out.println(msgs); //接收到的消息是未转换的字节码for(MessageExt msg : msgs){System.out.println("consumeThread="+ Thread.currentThread().getName() + ", " + new String(msg.getBody())); //转换为字符串消息}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者 consumer。consumer.start();System.out.println("消费者启动");}
}
3、启动 消费 类 Consumer.java 和 发送 类 Producer.java 进行测试。
1) Producer.java 输出 三条记录 和 消息的Tag:TAGC
3) Consumer.java 输出 2 条记录,其中一条回滚了。
上一节关联链接请点击:
# 全面解剖 消息中间件 RocketMQ-(4)