RocketMQ系列文章
RocketMQ(一):基本概念和环境搭建
RocketMQ(二):原生API快速入门
RocketMQ(三):集成SpringBoot
目录
- 一、搭建环境
- 二、不同类型消息
- 1、同步消息
- 2、异步消息
- 3、单向消息
- 4、延迟消息
- 5、顺序消息
- 6、带tag消息
- 7、带key消息
一、搭建环境
- 需要创建两个服务,消息生产服务和消息消费者服务
- 生产消息存在多个服务,消费则统一由一个服务处理
- 这样可以做到
解耦
pom.xml
- 生产者和消费者都需要
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version>
</dependency>
生产者配置文件
- 设置统一的
生产者组
,这样发送消息时就不用指定了
rocketmq:name-server: 127.0.0.1:9876 # rocketMq的nameServer地址producer:group: boot-producer-group # 生产者组别send-message-timeout: 3000 # 消息发送的超时时间retry-times-when-send-async-failed: 2 # 异步消息发送失败重试次数max-message-size: 4194304 # 消息的最大长度
生产者配置文件
- 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
rocketmq:name-server: localhost:9876
二、不同类型消息
直接引入即可
@Autowired
private RocketMQTemplate rocketMQTemplate;
1、同步消息
生产消息
- 消息由消费者发送到broker后,会得到一个确认,是具有可靠性的
- 比如:重要的消息通知,短信通知等
rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");
消费消息
- RocketMQListener的泛型类型即
消息类型
- MessageExt类型是消息的所有内容
- 其他类型则就只是
消息体内容
,没有消息头内容(keys、msgId、延迟时间、重试次数、主题名称...)
- onMessage方法内没有报错就是签收了,报错就是拒收会重试
@Component
@RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println(new String(message.getBody()));}
}
2、异步消息
- 发送异步消息,发送完以后会有一个异步通知
- 不影响程序往下执行
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("失败" + throwable.getMessage());}
});
3、单向消息
- 不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险
- 例如日志信息的发送
rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");
4、延迟消息
- RocketMQ不支持任意时间的延时
- 只支持以下
18
个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟 - private String messageDelayLevel = “
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
”; - 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 4);
5、顺序消息
生产消息
- 根据syncSendOrderly方法的第三个参数计算
hash值
决定消息放入哪个队列
// 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去 消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1, "下单"),new MsgModel("qwer", 1, "短信"),new MsgModel("qwer", 1, "物流"),new MsgModel("zxcv", 2, "下单"),new MsgModel("zxcv", 2, "短信"),new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {// 发送 一般都是以json的方式进行处理// 根据第三个参数计算hash值决定消息放入哪个队列rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
});
消费消息
- 默认是并发消费模式,可以设置为单线程顺序模式
- 设置消费重试次数
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);System.out.println(msgModel);}
}
6、带tag消息
- tag带在
主题
后面用:
来携带
rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
7、带key消息
Message<String> message = MessageBuilder.withPayload("我是一个带key的消息").setHeader(RocketMQHeaders.KEYS, "10086").build();
rocketMQTemplate.syncSend("bootKeyTopic", message);
获取带key和tag的消费者
- 过滤模式有两种:正则表达式和sql92方式
- keys从MessageExt对象中获取
@Component
@RocketMQMessageListener(topic = "bootTagTopic",consumerGroup = "boot-tag-consumer-group",selectorType = SelectorType.TAG,// tag过滤模式selectorExpression = "tagA || tagB"
// selectorType = SelectorType.SQL92,// sql92过滤模式
// selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println("获取keys: " + message.getKeys());System.out.println("消息内容: " + new String(message.getBody()));}
}
查看源码
- destination目标 = 主题 : 标签
- keys从消息头里面获取