RocketMQ角色
RocketMQ的基本概念
同步发送消息
/*** @author* @create 2023-04-08 17:24* 发送同步消息* 适用于重要的消息 例如消息通知,短信通知*/
public class SyncProducer {public static void main(String[] args) throws Exception {String topic="testRocketMQ";String tag="Tag1";String body="Hello RocketMQ";//创建消息生产者Producer,并制订生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//指定NameServer地址producer.setNamesrvAddr("localhost:9876");//启动producerproducer.start();//创建消息对象,指定主题topic,tag和消息体for (int i = 0; i < 10; i++) {/*** 主题 topic* 消息的tag* 消息内容*/Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息SendResult result = producer.send(message,1000);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送状态->"+status);String msgId = result.getMsgId();System.out.println("消息id->"+msgId);int queueId = result.getMessageQueue().getQueueId();System.out.println("消息接收队列id"+queueId);TimeUnit.SECONDS.sleep(1);//线程休息1s}//关闭生产者producerproducer.shutdown();}
}
异步发送
/*** @author* @create 2023-04-08 19:52* 发送异步消息,异步监听,通过回调接口的方式接受服务端的响应* 适用于数据量较大,不能容忍阻塞场景的*/
public class ASyncProducer {public static void main(String[] args) throws Exception {
//创建消息生产者Producer,并制订生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//指定NameServer地址producer.setNamesrvAddr("localhost:9876");//启动producerproducer.start();//创建消息对象,指定主题topic,tag和消息体for (int i = 0; i < 10; i++) {/*** 主题 topic* 消息的tag* 消息内容*/Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));int index=i;//发送消息producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());System.out.println("发送成功"+sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.printf("%-10d Exception %s %n", index, throwable);}});TimeUnit.SECONDS.sleep(1);//线程休息1s}//关闭生产者producerproducer.shutdown();}}
单向发送
/*** 单向发送,只管发送,不管响应* 例如日志收集场景* @author* @create 2023-04-08 20:22*/
public class OneWayProducer {public static void main(String[] args) throws Exception {//创建消息生产者Producer,并制订生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//指定NameServer地址producer.setNamesrvAddr("localhost:9876");//启动producerproducer.start();//创建消息对象,指定主题topic,tag和消息体for (int i = 0; i < 10; i++) {/*** 主题 topic* 消息的tag* 消息内容*/Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));int index=i;//发送消息producer.sendOneway(message);TimeUnit.SECONDS.sleep(1);//线程休息1s}//关闭生产者producerproducer.shutdown();}
}
集群消费 /负载均衡模式消费
/*** @author* @create 2023-04-08 20:29*/
public class Consumer {public static void main(String[] args) throws Exception {// 实例化消息生产者,指定组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");// 指定Namesrv地址信息.consumer.setNamesrvAddr("localhost:9876");// 订阅Topicconsumer.subscribe("testRocketMQ","*");
// consumer.subscribe("base","Tag1 || Tag2 || Tag3");
// consumer.subscribe("testRocketMQ","Tag1");//负载均衡模式消费 集群模式消费consumer.setMessageModel(MessageModel.CLUSTERING);
// consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt ext : list) {String str = new String(ext.getBody());System.out.println(str);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}
广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
顺序消息的生产与消费
/*** 顺序消息*/
public class Producer {public static void main(String[] args) throws Exception {
// 1.创建消息生产者producer ,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");
// producer.setSendMsgTimeout(10000);
// 3.启动Producerproducer.start();List<OrderStep> orderSteps = OrderStep.buildOrders();for (int i=0;i<=orderSteps.size()+11;i++) {//4.创建消息Message message = new Message("OrderTopic","Order",String.valueOf(orderSteps.get(i)).getBytes());/*** 参数一 消息对象* 参数二 消息队列的选择器* 参数三 选择队列的业务标识(订单ID)*///5.发送消息SendResult sendResult = producer.send(message, new MessageQueueSelector() {/*** @param list 队列集合* @param message 消息对象* @param obj 业务标识参数* @return*/@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object obj) {long orderId = (long) obj;int size = list.size();int index = (int) (orderId % size);return list.get(index);}}, orderSteps.get(i).getOrderId()); //orderSteps.get(i).getOrderId() 订单ID}
// 6.关闭生产者Producerproducer.shutdown();}
}
/*** 顺序消息*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("OrderTopic","*");//4.注册消息监听器consumer.registerMessageListener(new MessageListenerOrderly() {//使用监听器消费消息@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt message : list) {System.out.println("消息内容"+new String(message.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});//5.启动消费者consumer.start();System.out.println("消费者启动");}
}
/*** 订单构建者*/
public class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}public static List<OrderStep> buildOrders() {// 1039L : 创建 付款 推送 完成// 1065L : 创建 付款// 7235L :创建 付款List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}
延时消息
/*** 延时消息* @author* @create 2022-02-19 23:46*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定NameServer地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("DelayTopic","*");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt message : list) {
// System.out.println(new String(message.getBody()));// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
/*** 延时消息* @author* @create 2022-02-19 22:52*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer ,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");
// 3.启动Producerproducer.start();
// 4.创建消息对象,指定主题Topic、Tag和消息体for (int i = 0; i < 10; i++) {/*** 消息主题 topic* 消息tag* 消息内容*/Message message = new Message("DelayTopic","Tag2",("渔阳+"+i).getBytes());//延时级别 设置延时时间 //从1到18 依次设定message.setDelayTimeLevel(2);// 5.发送消息producer.send(message);System.out.println("=====================");
// System.out.printf("%s%n", result);TimeUnit.SECONDS.sleep(1); //线程休眠}
// 6.关闭生产者Producerproducer.shutdown();}
}
批量消息
/*** 批量消息* @author* @create 2022-02-19 23:46*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定NameServer地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("BatchTest","*");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt message : list) {
// System.out.println(new String(message.getBody()));// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
/*** 批量消息* 注意点:每次只发送不超过4MB的消息* 如果消息的总长度可能超过4MB时,这时候最好把消息进行分割* @author* @create 2022-02-19 22:52*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer ,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");
// 3.启动Producerproducer.start();
// 4.创建消息对象,指定主题Topic、Tag和消息体String topic = "BatchTest";List<Message> messageList = new ArrayList<>();messageList.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messageList.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messageList.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));try {//5.发生消息 //批量producer.send(messageList);} catch (Exception e) {e.printStackTrace();//处理error}
// 6.关闭生产者Producerproducer.shutdown();System.out.println("over");}
}
过滤消息
sql模式
/*** 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致* 可以使用 "*" 代表所有tag* @author* @create 2022-02-20 14:52*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定NameServer地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
/*** 过滤消息* 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致* @author* @create 2022-02-19 20:26*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer ,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");
// producer.setSendMsgTimeout(10000);
// 3.启动Producerproducer.start();
// 4.创建消息对象,指定主题Topic、Tag和消息体for (int i = 0; i < 10; i++) {/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/Message message = new Message("FilterSQLTopic","Tag1",("渔阳+"+i).getBytes());message.putUserProperty("i",String.valueOf(i));//5.发送消息SendResult result = producer.send(message);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:" + result);//线程睡1秒TimeUnit.SECONDS.sleep(2);}
// 6.关闭生产者Producerproducer.shutdown();}}
tag模式
/*** 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致* 可以使用 "*" 代表所有tag* @author* @create 2022-02-20 14:52*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定NameServer地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("FilterTagTopic","Tag1 || Tag2");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt message : list) {System.out.println(new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
/*** 过滤消息* 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致* @author* @create 2022-02-19 20:26*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer ,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");
// producer.setSendMsgTimeout(10000);
// 3.启动Producerproducer.start();//String [] tags = {"Tag1","Tag2","Tag3"};
// 4.创建消息对象,指定主题Topic、Tag和消息体for (int i = 0; i < 10; i++) {Message message = new Message("FilterTagTopic","Tag1",("渔阳+"+i).getBytes());//可以写正则表达式//Message message = new Message("FilterTagTopic",tags[i%tags.length],("渔阳+"+i).getBytes());
// 5.发送消息SendResult result = producer.send(message);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:"+result);System.out.println("=====================");
// System.out.printf("%s%n", result);}
// 6.关闭生产者Producerproducer.shutdown();}
}
事务消息
/*** 过滤消息* 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致* @author* @create 2022-02-19 20:26*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1.创建消息生产者producer ,并制定生产者组名TransactionMQProducer producer = new TransactionMQProducer("group5");
// 2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");//添加事务监听器producer.setTransactionListener(new TransactionListener() {/*** 在该方法中执行本地事务* @param message* @param o* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {
// if (message.getTags().equals("TAGA")){if (StringUtils.equals("TAGA", message.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.equals("TAGB", message.getTags())) {return LocalTransactionState.ROLLBACK_MESSAGE;} else if (StringUtils.equals("TAGC", message.getTags())) {return LocalTransactionState.UNKNOW;}return LocalTransactionState.UNKNOW;}/*** LocalTransactionState.UNKNOW* 该方法时MQ进行消息事务状态回查* @param messageExt* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("消息的Tag:" + messageExt.getTags());return LocalTransactionState.COMMIT_MESSAGE;}});// 3.启动Producerproducer.start();String[] tags={"TAGA","TAGB","TAGC"};
// 4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/for (int i = 0; i < 3; i++) {Message message = new Message("TransactionTopic",tags[i],("渔阳+"+i).getBytes());// 5.发送消息SendResult result = producer.sendMessageInTransaction(message,null);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:"+result);System.out.println("=====================");
// System.out.printf("%s%n", result);}//6.关闭生产者producerproducer.shutdown();}
}
/*** 使用tag 过滤 本质就是 消费者的tag要和生产者的tag一致* 可以使用 "*" 代表所有tag* @author* @create 2022-02-20 14:52*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定NameServer地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("TransactionTopic","*");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt message : list) {System.out.println(new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
1)同步刷盘
在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。
2)异步刷盘
在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
####3)配置
同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。
RocketMQ生产者,消费者,Broker,NameServer高可用机制
Broker 高可用性:
主从复制
: 每个 Broker 可以配置多个副本(Replica),其中一个副本为主副本(Master),其余的为从副本(Slave)。主副本负责处理消息的写入和读取请求,从副本通过复制主副本的数据来实现数据的冗余备份。
数据持久化
: RocketMQ 使用磁盘持久化消息数据,确保即使在 Broker 重启或故障发生时,数据也能够安全地恢复。
NameServer 高可用性:
集群部署
: RocketMQ 支持多个 NameServer 的集群部署,客户端可以连接到任意一个可用的 NameServer,从而实现故障转移和负载均衡。
心跳检测
: NameServer 会定期发送心跳检测给 Broker 和其他 NameServer,以检测它们的健康状态。
消费者 天生就是一个高可用的保证
生产者 通过将broker搭建成双主双重的方式(投递消息时保证一定能投递成功)
消息存储(持久化机制)
存储介质
关系型数据库 DB
Apache下的ActiveMQ采用的就是JDBC的方式
文件系统
RocketMQ/Kafka/RabbitMQ均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘分为同步刷盘和异步刷盘两种方式)
RocketMQ写操作利用了顺序写的方式保证了消息存储的速度
读操作利用零拷贝 提高消息存盘和网络发送的速度
消息重试
顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
无序消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
1)重试次数
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
死信特性
死信消息具有以下特性
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性:
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
- 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
- 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
消费幂等
消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。
消费幂等的必要性
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
-
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
-
投递时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
-
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
处理方式
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据
ROCKETMQ事务消息
事务消息默认补偿15次
事务消息大致流程
生产者发送一个半消息,然后执行本地事务,成功进行提交让消费者消费,反之回滚
SpringBoot整合RocketMQ
将rocketmq-spring安装到本地仓库
mvn install -Dmaven.skip.test=true
<!--RocketMQ--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq-spring-boot-starter-version}</version></dependency>
生产者配置文件
# application.yamlrocketmq:consumer:group: springboot_consumer_group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 10name-server: 192.175.25.135:9876;192.175.25.138:9876producer:# 发送同一类消息的设置为同一个group,保证唯一group: my-group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false
测试类
@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void test1(){rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq");//Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();//SendResult sendResult = rocketMQTemplate.send(topic, msg);}
消费者同消息生产者
消费者同消息生产者配置文件
测试类
@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1")
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("Receive message:"+message);}
}
实际生产中
@Overridepublic Result confirmOrder(TradeOrder order) {//1.校验订单//2.生成预订单try {//3.扣减库存//4.扣减优惠券//5.使用余额//6.确认订单//7.返回成功状态return new Result(Success,Code);} catch (Exception e) {//1.确认订单失败,发送消息MQEntity mqEntity = new MQEntity();mqEntity.setOrderId(orderId);mqEntity.setUserId(order.getUserId());mqEntity.setUserMoney(order.getMoneyPaid());mqEntity.setGoodsId(order.getGoodsId());mqEntity.setGoodsNum(order.getGoodsNumber());mqEntity.setCouponId(order.getCouponId());//2.返回订单确认失败消息try {sendCancelOrder(topic,tag,order.getOrderId().toString(), JSON.toJSONString(mqEntity));} catch (Exception e1) {e1.printStackTrace();}return new Result(Fail,Code);}}public class MQEntity {private Long orderId;private Long couponId;private Long userId;private BigDecimal userMoney;private Long goodsId;private Integer goodsNum;//set get}/*** 发送订单确认失败消息* @param topic* @param tag* @param keys* @param body*/private void sendCancelOrder(String topic, String tag, String keys, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {//判断Topic是否为空if (StringUtils.isEmpty(topic)) {CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);}//判断消息内容是否为空if (StringUtils.isEmpty(body)) {CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);}//消息体Message message = new Message(topic,tag,keys,body.getBytes());rocketMQTemplate.getProducer().send(message);}
消费者
rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}",consumerGroup = "${mq.pay.consumer.group.name}",messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{@Overridepublic void onMessage(MessageExt messageExt) {try {//1.解析消息内容String body = new String(messageExt.getBody(),"UTF-8");//TradePay tradePay = JSON.parseObject(body,TradePay.class);MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);//2.业务内容} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}
线程池方式优化
executorService.submit(new Runnable() {@Overridepublic void run() {try {SendResult sendResult = sendMessage(topic, tag, finalTradePay.getPayId(), JSON.toJSONString(finalTradePay));log.info(JSON.toJSONString(sendResult));if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());System.out.println("删除消息表成功");}} catch (Exception e) {e.printStackTrace();}}});} else {CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);}}
@Bean
public ThreadPoolTaskExecutor getThreadPool() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(4);executor.setMaxPoolSize(8);executor.setQueueCapacity(100);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("Pool-A");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}
雪花算法
时间搓+机器号+序列号
时间戳: 雪花算法的64位ID中,首先占用了41位用于存储时间戳。这个时间戳记录了生成ID的时间,精确到毫秒级。
机器ID: 接下来的10位用于存储机器的唯一ID。在分布式系统中,不同机器需要有唯一的标识符,以避免产生重复的ID。
序列号: 最后的13位用于存储序列号。在同一毫秒内,可以生成多个ID。序列号是递增的,可以用来解决同一毫秒内生成多个ID时的冲突问题。
雪花算法的特点:
可以在分布式环境中生成唯一ID,避免了重复ID的问题。
生成的ID是递增的,有序的,方便数据库索引和查询。
生成ID的速度很快,基本上可以达到每毫秒生成数万个ID。
雪花算法的实现相对简单,不需要依赖其他外部组件。
需要注意的是,雪花算法并不保证绝对的全局唯一性,因为时钟回拨、机器ID重用等问题都可能导致ID重复。在实际使用中,应该根据具体情况来进行适当的配置和处理,以确保ID的唯一性。另外,随着分布式系统的扩展,机器ID的分配和管理也需要注意,避免冲突和重复。