消费者pull和push
pull 为主动从broker获取消息
Push为broker主动推送消息个consumer 实时性更高,但流量要自己控制
PullBatchSize,代表的是每次从broker的一个队列上拉取的最大消息数。
consumeThreadMax 和 consumeThreadMin 代表消费者pull消息时需要的线程最大和最小数量
广播模式和集群模式
广播把消息发送给订阅这个主题的所有消费者
广播消息不支持消息重试
集群是消息只要有一个消费者消费后变算为消费成功
顺序消息必须为集群模式
顺序消息:
Rocketmq全局顺序消息和局部顺序
全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费。
部分顺序:一个部分内所有的消息按照先进先出的顺序进行发布和消费。
三个阶段保证消息顺序:
生产顺序性:单一的生产者 并且 串行发送消息
存储时保持和发送的顺序一致
消费时保持和存储的顺序一致
关于重试:
顺序消息可以设置最大重试次数,若不设置则可以认为是无限次
若设置,则达到最大重试次数时消息会变为已消费,会执行后序消息,可能无法保证消息的顺序性,所以要做好顺序验证
顺序消息的重试间隔为固定时间
无序消息的重试时间为阶梯时间
重试次数可通过maxReconsumeTimes 参数进行设置
广播消息不可重试
全局顺序
生产者
创建topic时只创建一个queue,所有的消息都保存在同一个broker里就可以保证顺序
或者选择其中一个队列也可以
生产者就和普通的没区别
producer.send(message);
消费者
@PostConstruct
public void init{consumer = new DefaultMQPushConsumer("group");consumer.setNamesrvAddr("1270.0.1:9876");consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setPullInterval(2000);consumer.setPullBatchSize(100);// 顺序消息设置为1,多个其他的会被空置consumer.setConsumeThreadMin(1);try {consumer.subscribe("topic", "tag");} catch (MQClientException e) {throw new RuntimeException(e);}consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {if (CollectionUtils.isEmpty(msgs)) {return ConsumeOrderlyStatus.SUCCESS;}for (MessageExt msg : msgs) {try {// 处理message消息} catch (Exception e) {log.info("Consumer message failed", e);throw new RuntimeException(e);}}return ConsumeOrderlyStatus.SUCCESS;});try {consumer.start();} catch (Exception e) {throw new RuntimeException(e);}
}
局部顺序
假设场景,一个订单的不同操作需要保证顺序,比如订单生成->支付->完成
此时方法中arg参数传订单号即可,保证需要顺序的消息有一个统一的标识可以进入到同一个队列中
顺序消息的逻辑就是通过统一标识的hashcode和队列数量size进行取余操作
所以顺序消息有个前提是这个topic的队列数量不可随意修改(倍数可以),否则顺序消息会出现异常(可提前设置此topic的队列数量为最大值16)
生产者
public void sendMsg(MessageInfo info,Object arg){String json = JSON.toJSONString(info);Message message = new Message("messageType", json.getBytes());try {producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {int value = arg.hashCode() % list.size();if (value < 0) {value = Math.abs(value);}return list.get(value);}}, arg);} catch (Exception e) {throw new RuntimeException(e);}
}
消费者
同全局顺序消费