RocketMQ使用

RocketMQ角色
在这里插入图片描述

RocketMQ的基本概念
在这里插入图片描述

同步发送消息

/*** @author* @create 2023-04-08 17:24* 发送同步消息* 适用于重要的消息 例如消息通知,短信通知*/
public class SyncProducer {public static void main(String[] args) throws Exception {String topic="testRocketMQ";String tag="Tag1";String body="Hello RocketMQ";//创建消息生产者Producer,并制订生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//指定NameServer地址producer.setNamesrvAddr("localhost:9876");//启动producerproducer.start();//创建消息对象,指定主题topic,tag和消息体for (int i = 0; i < 10; i++) {/*** 主题 topic* 消息的tag* 消息内容*/Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息SendResult result = producer.send(message,1000);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送状态->"+status);String msgId = result.getMsgId();System.out.println("消息id->"+msgId);int queueId = result.getMessageQueue().getQueueId();System.out.println("消息接收队列id"+queueId);TimeUnit.SECONDS.sleep(1);//线程休息1s}//关闭生产者producerproducer.shutdown();}
}

异步发送

/*** @author* @create 2023-04-08 19:52* 发送异步消息,异步监听,通过回调接口的方式接受服务端的响应* 适用于数据量较大,不能容忍阻塞场景的*/
public class ASyncProducer {public static void main(String[] args) throws Exception {
//创建消息生产者Producer,并制订生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//指定NameServer地址producer.setNamesrvAddr("localhost:9876");//启动producerproducer.start();//创建消息对象,指定主题topic,tag和消息体for (int i = 0; i < 10; i++) {/*** 主题 topic* 消息的tag* 消息内容*/Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));int index=i;//发送消息producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());System.out.println("发送成功"+sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.printf("%-10d Exception %s %n", index, throwable);}});TimeUnit.SECONDS.sleep(1);//线程休息1s}//关闭生产者producerproducer.shutdown();}}

单向发送

/*** 单向发送,只管发送,不管响应* 例如日志收集场景* @author* @create 2023-04-08 20:22*/
public class OneWayProducer {public static void main(String[] args) throws Exception {//创建消息生产者Producer,并制订生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");//指定NameServer地址producer.setNamesrvAddr("localhost:9876");//启动producerproducer.start();//创建消息对象,指定主题topic,tag和消息体for (int i = 0; i < 10; i++) {/*** 主题 topic* 消息的tag* 消息内容*/Message message = new Message("testRocketMQ","Tag1",("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));int index=i;//发送消息producer.sendOneway(message);TimeUnit.SECONDS.sleep(1);//线程休息1s}//关闭生产者producerproducer.shutdown();}
}

集群消费 /负载均衡模式消费

/*** @author* @create 2023-04-08 20:29*/
public class Consumer {public static void main(String[] args) throws Exception {// 实例化消息生产者,指定组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");// 指定Namesrv地址信息.consumer.setNamesrvAddr("localhost:9876");// 订阅Topicconsumer.subscribe("testRocketMQ","*");
//        consumer.subscribe("base","Tag1 || Tag2 || Tag3");
//        consumer.subscribe("testRocketMQ","Tag1");//负载均衡模式消费  集群模式消费consumer.setMessageModel(MessageModel.CLUSTERING);
//        consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式// 注册回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt ext : list) {String str = new String(ext.getBody());System.out.println(str);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
}

广播模式

        consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式

顺序消息的生产与消费

/*** 顺序消息*/
public class Producer {public static void main(String[] args) throws Exception {
//        1.创建消息生产者producer ,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");
//        producer.setSendMsgTimeout(10000);
//        3.启动Producerproducer.start();List<OrderStep> orderSteps = OrderStep.buildOrders();for (int i=0;i<=orderSteps.size()+11;i++) {//4.创建消息Message message = new Message("OrderTopic","Order",String.valueOf(orderSteps.get(i)).getBytes());/*** 参数一 消息对象* 参数二 消息队列的选择器* 参数三 选择队列的业务标识(订单ID)*///5.发送消息SendResult sendResult = producer.send(message, new MessageQueueSelector() {/*** @param list  队列集合* @param message  消息对象* @param obj  业务标识参数* @return*/@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object obj) {long orderId = (long) obj;int size = list.size();int index = (int) (orderId % size);return list.get(index);}}, orderSteps.get(i).getOrderId());  //orderSteps.get(i).getOrderId() 订单ID}
//        6.关闭生产者Producerproducer.shutdown();}
}
/*** 顺序消息*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("OrderTopic","*");//4.注册消息监听器consumer.registerMessageListener(new MessageListenerOrderly() {//使用监听器消费消息@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt message : list) {System.out.println("消息内容"+new String(message.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});//5.启动消费者consumer.start();System.out.println("消费者启动");}
}
/*** 订单构建者*/
public class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}public static List<OrderStep> buildOrders() {//  1039L   : 创建    付款 推送 完成//  1065L   : 创建   付款//  7235L   :创建    付款List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("创建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}

延时消息

/*** 延时消息* @author* @create 2022-02-19 23:46*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定NameServer地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("DelayTopic","*");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt message : list) {
//                    System.out.println(new String(message.getBody()));// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
/*** 延时消息* @author* @create 2022-02-19 22:52*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1.创建消息生产者producer ,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");
//        3.启动Producerproducer.start();
//        4.创建消息对象,指定主题Topic、Tag和消息体for (int i = 0; i < 10; i++) {/*** 消息主题 topic* 消息tag* 消息内容*/Message message = new Message("DelayTopic","Tag2",("渔阳+"+i).getBytes());//延时级别   设置延时时间   //从1到18 依次设定message.setDelayTimeLevel(2);//        5.发送消息producer.send(message);System.out.println("=====================");
//            System.out.printf("%s%n", result);TimeUnit.SECONDS.sleep(1);  //线程休眠}
//        6.关闭生产者Producerproducer.shutdown();}
}

批量消息

/*** 批量消息* @author* @create 2022-02-19 23:46*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定NameServer地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("BatchTest","*");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt message : list) {
//                    System.out.println(new String(message.getBody()));// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
/*** 批量消息* 注意点:每次只发送不超过4MB的消息* 如果消息的总长度可能超过4MB时,这时候最好把消息进行分割* @author* @create 2022-02-19 22:52*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1.创建消息生产者producer ,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");
//        3.启动Producerproducer.start();
//        4.创建消息对象,指定主题Topic、Tag和消息体String topic = "BatchTest";List<Message> messageList = new ArrayList<>();messageList.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));messageList.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));messageList.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));try {//5.发生消息   //批量producer.send(messageList);} catch (Exception e) {e.printStackTrace();//处理error}
//        6.关闭生产者Producerproducer.shutdown();System.out.println("over");}
}

过滤消息
sql模式

/*** 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致* 可以使用 "*"  代表所有tag* @author* @create 2022-02-20 14:52*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定NameServer地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
/*** 过滤消息* 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致* @author* @create 2022-02-19 20:26*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1.创建消息生产者producer ,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");
//        producer.setSendMsgTimeout(10000);
//        3.启动Producerproducer.start();
//        4.创建消息对象,指定主题Topic、Tag和消息体for (int i = 0; i < 10; i++) {/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/Message message = new Message("FilterSQLTopic","Tag1",("渔阳+"+i).getBytes());message.putUserProperty("i",String.valueOf(i));//5.发送消息SendResult result = producer.send(message);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:" + result);//线程睡1秒TimeUnit.SECONDS.sleep(2);}
//        6.关闭生产者Producerproducer.shutdown();}}

tag模式

/*** 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致* 可以使用 "*"  代表所有tag* @author* @create 2022-02-20 14:52*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定NameServer地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("FilterTagTopic","Tag1  || Tag2");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt message : list) {System.out.println(new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
/*** 过滤消息* 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致* @author* @create 2022-02-19 20:26*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1.创建消息生产者producer ,并制定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");
//        producer.setSendMsgTimeout(10000);
//        3.启动Producerproducer.start();//String [] tags = {"Tag1","Tag2","Tag3"};
//        4.创建消息对象,指定主题Topic、Tag和消息体for (int i = 0; i < 10; i++) {Message message = new Message("FilterTagTopic","Tag1",("渔阳+"+i).getBytes());//可以写正则表达式//Message message = new Message("FilterTagTopic",tags[i%tags.length],("渔阳+"+i).getBytes());
//        5.发送消息SendResult result = producer.send(message);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:"+result);System.out.println("=====================");
//            System.out.printf("%s%n", result);}
//        6.关闭生产者Producerproducer.shutdown();}
}

事务消息

/*** 过滤消息* 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致* @author* @create 2022-02-19 20:26*/
public class Producer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1.创建消息生产者producer ,并制定生产者组名TransactionMQProducer producer = new TransactionMQProducer("group5");
//        2.制定NameServer地址producer.setNamesrvAddr("192.168.40.147:9876");//添加事务监听器producer.setTransactionListener(new TransactionListener() {/*** 在该方法中执行本地事务* @param message* @param o* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {
//                if (message.getTags().equals("TAGA")){if (StringUtils.equals("TAGA", message.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.equals("TAGB", message.getTags())) {return LocalTransactionState.ROLLBACK_MESSAGE;} else if (StringUtils.equals("TAGC", message.getTags())) {return LocalTransactionState.UNKNOW;}return LocalTransactionState.UNKNOW;}/*** LocalTransactionState.UNKNOW* 该方法时MQ进行消息事务状态回查* @param messageExt* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("消息的Tag:" + messageExt.getTags());return LocalTransactionState.COMMIT_MESSAGE;}});//        3.启动Producerproducer.start();String[] tags={"TAGA","TAGB","TAGC"};
//        4.创建消息对象,指定主题Topic、Tag和消息体/*** 参数一:消息主题Topic* 参数二:消息Tag* 参数三:消息内容*/for (int i = 0; i < 3; i++) {Message message = new Message("TransactionTopic",tags[i],("渔阳+"+i).getBytes());//        5.发送消息SendResult result = producer.sendMessageInTransaction(message,null);//发送状态SendStatus status = result.getSendStatus();System.out.println("发送结果:"+result);System.out.println("=====================");
//            System.out.printf("%s%n", result);}//6.关闭生产者producerproducer.shutdown();}
}
/*** 使用tag  过滤  本质就是  消费者的tag要和生产者的tag一致* 可以使用 "*"  代表所有tag* @author* @create 2022-02-20 14:52*/
public class Consumer {public static void main(String[] args) throws MQClientException {//1.创建消费者Consumer,制定消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定NameServer地址consumer.setNamesrvAddr("192.168.40.147:9876");//3.订阅主题Topic和Tagconsumer.subscribe("TransactionTopic","*");//4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息内容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt message : list) {System.out.println(new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}

刷盘机制
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时 候,有两种写磁盘方式,分布式同步刷盘和异步刷盘。
在这里插入图片描述

1)同步刷盘

在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态。

2)异步刷盘

在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。

####3)配置

同步刷盘还是异步刷盘,都是通过Broker配置文件里的flushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。

RocketMQ生产者,消费者,Broker,NameServer高可用机制
Broker 高可用性:

主从复制: 每个 Broker 可以配置多个副本(Replica),其中一个副本为主副本(Master),其余的为从副本(Slave)。主副本负责处理消息的写入和读取请求,从副本通过复制主副本的数据来实现数据的冗余备份。
数据持久化: RocketMQ 使用磁盘持久化消息数据,确保即使在 Broker 重启或故障发生时,数据也能够安全地恢复。

NameServer 高可用性:

集群部署: RocketMQ 支持多个 NameServer 的集群部署,客户端可以连接到任意一个可用的 NameServer,从而实现故障转移和负载均衡。

心跳检测: NameServer 会定期发送心跳检测给 Broker 和其他 NameServer,以检测它们的健康状态。

消费者 天生就是一个高可用的保证
生产者 通过将broker搭建成双主双重的方式(投递消息时保证一定能投递成功)

消息存储(持久化机制)
存储介质
关系型数据库 DB
Apache下的ActiveMQ采用的就是JDBC的方式
文件系统
RocketMQ/Kafka/RabbitMQ均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘分为同步刷盘和异步刷盘两种方式)

RocketMQ写操作利用了顺序写的方式保证了消息存储的速度
读操作利用零拷贝 提高消息存盘和网络发送的速度

消息重试

顺序消息的重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。

无序消息的重试

对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。

无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

1)重试次数

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

死信队列

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

死信特性

死信消息具有以下特性

  • 不会再被消费者正常消费。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

死信队列具有以下特性:

  • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
  • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
  • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

消费幂等

消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。

消费幂等的必要性

在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

  • 发送时消息重复

    当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 投递时消息重复

    消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

    当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

处理方式

因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据

ROCKETMQ事务消息

事务消息默认补偿15次

事务消息大致流程
生产者发送一个半消息,然后执行本地事务,成功进行提交让消费者消费,反之回滚

SpringBoot整合RocketMQ

将rocketmq-spring安装到本地仓库

mvn install -Dmaven.skip.test=true
        <!--RocketMQ--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq-spring-boot-starter-version}</version></dependency>

生产者配置文件

# application.yamlrocketmq:consumer:group: springboot_consumer_group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 10name-server: 192.175.25.135:9876;192.175.25.138:9876producer:# 发送同一类消息的设置为同一个group,保证唯一group: my-group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false

测试类

    @Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void test1(){rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq");//Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build();//SendResult sendResult = rocketMQTemplate.send(topic, msg);}

消费者同消息生产者
消费者同消息生产者配置文件
测试类

@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1")
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("Receive message:"+message);}
}

实际生产中

	@Overridepublic Result confirmOrder(TradeOrder order) {//1.校验订单//2.生成预订单try {//3.扣减库存//4.扣减优惠券//5.使用余额//6.确认订单//7.返回成功状态return new Result(Success,Code);} catch (Exception e) {//1.确认订单失败,发送消息MQEntity mqEntity = new MQEntity();mqEntity.setOrderId(orderId);mqEntity.setUserId(order.getUserId());mqEntity.setUserMoney(order.getMoneyPaid());mqEntity.setGoodsId(order.getGoodsId());mqEntity.setGoodsNum(order.getGoodsNumber());mqEntity.setCouponId(order.getCouponId());//2.返回订单确认失败消息try {sendCancelOrder(topic,tag,order.getOrderId().toString(), JSON.toJSONString(mqEntity));} catch (Exception e1) {e1.printStackTrace();}return new Result(Fail,Code);}}public class MQEntity {private Long orderId;private Long couponId;private Long userId;private BigDecimal userMoney;private Long goodsId;private Integer goodsNum;//set  get}/*** 发送订单确认失败消息* @param topic* @param tag* @param keys* @param body*/private void sendCancelOrder(String topic, String tag, String keys, String body) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {//判断Topic是否为空if (StringUtils.isEmpty(topic)) {CastException.cast(ShopCode.SHOP_MQ_TOPIC_IS_EMPTY);}//判断消息内容是否为空if (StringUtils.isEmpty(body)) {CastException.cast(ShopCode.SHOP_MQ_MESSAGE_BODY_IS_EMPTY);}//消息体Message message = new Message(topic,tag,keys,body.getBytes());rocketMQTemplate.getProducer().send(message);}

消费者

rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876
mq.order.consumer.group.name=order_orderTopic_cancel_group
mq.order.topic=orderTopic
@Slf4j
@Component
@RocketMQMessageListener(topic = "${mq.pay.topic}",consumerGroup = "${mq.pay.consumer.group.name}",messageModel = MessageModel.BROADCASTING)
public class CancelOrderConsumer implements RocketMQListener<MessageExt>{@Overridepublic void onMessage(MessageExt messageExt) {try {//1.解析消息内容String body = new String(messageExt.getBody(),"UTF-8");//TradePay tradePay = JSON.parseObject(body,TradePay.class);MQEntity mqEntity = JSON.parseObject(body, MQEntity.class);//2.业务内容} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}

线程池方式优化

 executorService.submit(new Runnable() {@Overridepublic void run() {try {SendResult sendResult = sendMessage(topic, tag, finalTradePay.getPayId(), JSON.toJSONString(finalTradePay));log.info(JSON.toJSONString(sendResult));if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {mqProducerTempMapper.deleteByPrimaryKey(mqProducerTemp.getId());System.out.println("删除消息表成功");}} catch (Exception e) {e.printStackTrace();}}});} else {CastException.cast(ShopCode.SHOP_PAYMENT_IS_PAID);}}
@Bean
public ThreadPoolTaskExecutor getThreadPool() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(4);executor.setMaxPoolSize(8);executor.setQueueCapacity(100);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix("Pool-A");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}

雪花算法

时间搓+机器号+序列号
时间戳: 雪花算法的64位ID中,首先占用了41位用于存储时间戳。这个时间戳记录了生成ID的时间,精确到毫秒级。

机器ID: 接下来的10位用于存储机器的唯一ID。在分布式系统中,不同机器需要有唯一的标识符,以避免产生重复的ID。

序列号: 最后的13位用于存储序列号。在同一毫秒内,可以生成多个ID。序列号是递增的,可以用来解决同一毫秒内生成多个ID时的冲突问题。

雪花算法的特点:

可以在分布式环境中生成唯一ID,避免了重复ID的问题。
生成的ID是递增的,有序的,方便数据库索引和查询。
生成ID的速度很快,基本上可以达到每毫秒生成数万个ID。
雪花算法的实现相对简单,不需要依赖其他外部组件。
需要注意的是,雪花算法并不保证绝对的全局唯一性,因为时钟回拨、机器ID重用等问题都可能导致ID重复。在实际使用中,应该根据具体情况来进行适当的配置和处理,以确保ID的唯一性。另外,随着分布式系统的扩展,机器ID的分配和管理也需要注意,避免冲突和重复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/26872.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

re学习(29)攻防世界-CatFly(复原反汇编)

因为这是一个.dll文件&#xff0c;在Linux上运行一下&#xff1a; 找到主要函数&#xff1a;&#xff08;以及由上面三部分对应的代码部分&#xff09; __int64 __fastcall main(int a1, char **a2, char **a3) {size_t v3; // rbx__int16 v5[4]; // [rsp10h] [rbp-4B0h] B…

【MCU学习】GD32F427VG开发

&#xff08;一&#xff09;学习文档和例程 兆易创新GD32 MCU参考资料下载 1.GD232F4xx的Keil芯片支持包 2.标准固件库和示例程序 3.GD32F4xx_固件库使用指南_Rev1.2 4.用户手册&#xff1a;GD32F4xx_User_Manual_Rev2.8_CN 5.数据手册&#xff1a;GD32F427xx_Datasheet_Rev…

通过MySQL删除Hive元数据信息

之前遇到过一个问题&#xff0c;在进行Hive的元数据采集时&#xff0c;因为Hive表的文件已经被删除了&#xff0c;当时是无法删除表&#xff0c;导致元数据采集也发生了问题&#xff0c;所以希望通过删除Hive表的元数据解决上述问题。 之前安装时&#xff0c;经过特定的配置后…

SpringBoot使用@Autowired将实现类注入到List或者Map集合中

前言 最近看到RuoYi-Vue-Plus翻译功能 Translation的翻译模块配置类TranslationConfig&#xff0c;其中有一个注入TranslationInterface翻译接口实现类的写法让我感到很新颖&#xff0c;但这种写法在Spring 3.0版本以后就已经支持注入List和Map&#xff0c;平时都没有注意到这…

WEB集群——http、tomcat

1. 简述静态网页和动态网页的区别。 2. 简述 Webl.0 和 Web2.0 的区别。 3. 安装tomcat8&#xff0c;配置服务启动脚本&#xff0c;部署jpress应用。 1. 简述静态网页和动态网页的区别。 1&#xff09;、静态网页 &#xff08;1&#xff09;、什么是静态网页 请求响应信息&…

基于fpga的电子时钟

文章目录 前言实验手册一、实验目的二、实验原理1&#xff0e;理论原理2&#xff0e;硬件原理 三、系统架构设计四、模块说明1&#xff0e;模块端口信号列表按键消抖模块&#xff08;key&#xff09;计数器模块&#xff08;counter&#xff09;蜂鸣器乐谱模块(music)蜂鸣器发声…

GD32F103VE侵入事件

GD32F103VE的TAMPER引脚(PC13)&#xff0c;当PC13输入低电平时&#xff0c;会产生一个侵入检测事件。它会将所有“数据备份寄存器”内容清除。 这个功能有什么用&#xff1f; 一是防止被人开壳&#xff0c;抄袭。二是自毁功能。 直奔主题&#xff0c;多一句就是浪费时间。测试…

flutter开发实战-flutter_spinkit实现多种风格进度指示器

flutter开发实战-flutter_spinkit实现多种风格进度指示器 最近开发过程中flutter_spinkit&#xff0c;这个拥有多种种风格加载指示器 一、flutter_spinkit 引入flutter_spinkit # 多种风格的模糊进度指示器flutter_spinkit: ^5.1.0效果示例 const spinkit SpinKitRotatingC…

如何找到死锁的线程?_java都学什么

在Java中&#xff0c;死锁是指两个或多个线程被无限地阻塞&#xff0c;等待彼此持有的资源&#xff0c;从而导致程序无法继续执行的情况。死锁通常是由于线程之间循环等待资源而产生的。要找到死锁的线程&#xff0c;可以采用以下方法&#xff1a; 1.线程转储(Thread Dump) 通过…

6.6 实现卷积神经网络LeNet训练并预测手写体数字

模型架构 代码实现 import torch from torch import nn from d2l import torch as d2lnet nn.Sequential(nn.Conv2d(1,6,kernel_size5,padding2),nn.Sigmoid(),#padding2补偿5x5卷积核导致的特征减少。nn.AvgPool2d(kernel_size2,stride2),nn.Conv2d(6,16,kernel_size5),nn.S…

OpenStreetMap数据转3D场景【Python + PostgreSQL】

很长一段时间以来&#xff0c;我对 GIS 和渲染感兴趣&#xff0c;在分别尝试这两者之后&#xff0c;我决定最终尝试以 3D 方式渲染 OpenStreetMap 中的地理数据&#xff0c;重点关注不超过城市的小规模。 在本文中&#xff0c;我将介绍从建筑形状生成三角形网格、以适合 Blend…

iperf3-性能测试

iperf3-性能测试 安装1.apt安装2.源码安装 使用方法iperf原理测试参考文档性能测试客户端服务端 官方文档&#xff1a;https://iperf.fr/iperf-doc.php 安装 1.apt安装 sudo apt-get install iperf32.源码安装 # 按照官方说明安装 ./configure make sudo make install执行编…

GATK ApplyBQSRSpark 过程中因No space left on device终止

Error&#xff1a; GATK ApplyBQSRSpark 过程中因No space left on device终止 执行命令&#xff1a; nohup time ./gatk --java-options "-Xmx128G" ApplyBQSRSpark --spark-master local[20] -R ../../alignment/hg38/hg38.fa -I ../../alignment/bam/P368T.s…

微信小程序nodejs+vue+uniapp高校食堂线上预约点餐系统

本次设计任务是要设计一个食堂线上预约点餐系统&#xff0c;通过这个系统能够满足管理员及学生的食堂线上预约点餐分享功能。系统的主要包括首页、个人中心、学生管理、菜品分类管理、菜品管理、关于我们管理、意见反馈、系统管理、订单管理等功能。 开发语言 node.js 框架&am…

【论文阅读】对抗溯源图主机入侵检测系统的模仿攻击(NDSS-2023)

作者&#xff1a;伊利诺伊大学芝加哥分校-Akul Goyal、Gang Wang、Adam Bates&#xff1b;维克森林大学-Xueyuan Han、 引用&#xff1a;Goyal A, Han X, Wang G, et al. Sometimes, You Aren’t What You Do: Mimicry Attacks against Provenance Graph Host Intrusion Detect…

BenchmarkSQL 支持 TiDB 驱动以及 tidb-loadbalance

作者&#xff1a; GangShen 原文来源&#xff1a; https://tidb.net/blog/3c274180 使用 BenchmarkSQL 对 TiDB 进行 TPC-C 测试 众所周知 TiDB 是一个兼容 MySQL 协议的分布式关系型数据库&#xff0c;用户可以使用 MySQL 的驱动以及连接方式连接 TiDB 进行使用&#xff0…

Git从远程仓库中删除文件,并上传新文件

目录 删除&#xff1a; 拉取远程分支的更新&#xff1a; ​编辑 首先查看git状态&#xff1a; ​编辑 删除文件并提交版本库&#xff1a; 提交&#xff1a; 上传新文件&#xff1a; 首先查看git状态&#xff1a; 提交到暂存区&#xff1a; 提交到版本库&#xff1a; 上…

基于Spring Boot的在线视频教育培训网站设计与实现(Java+spring boot+MySQL)

获取源码或者论文请私信博主 演示视频&#xff1a; 基于Spring Boot的在线视频教育培训网站设计与实现&#xff08;Javaspring bootMySQL&#xff09; 使用技术&#xff1a; 前端&#xff1a;html css javascript jQuery ajax thymeleaf 微信小程序 后端&#xff1a;Java sp…

skywalking日志收集

文章目录 一、介绍二、添加依赖三、修改日志配置1. 添加链路表示traceId2. 添加链路上下文3. 异步日志 四、收集链路日志 一、介绍 在上一篇文章skywalking全链路追踪中我们介绍了在微服务项目中使用skywalking进行服务调用链路的追踪。 本文在全链路追踪的基础上&#xff0c…

gradle项目Connection timed out,build时先下载gradle问题download gradle-x.x-bin.zip

IDEA 导入 Gradle 项目&#xff0c;编译的时候会默认下载 配置版本的Gradle.zip问题&#xff0c;一般会下载失败&#xff0c;提示Connection timed out&#xff0c;连接超时。 解决办法&#xff1a; 修改项目根目录下gradle目录下的gradle-wrapper.properties文件&#xff0c;…