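Option 1:
Local message table + scheduled task
Local message table: stores the business data, exchange, queue, routing key, and retry count.
Scheduled task: periodically scans the local message table and re-delivers messages to the business queue.
Approach: when the business queue's consumer fails, it saves the business data, exchange, queue, routing key, and retry count (recalculated on every failure) to the local message table. The scheduled task then scans the table, selects the rows that qualify (retries remaining and retry time reached), and delivers them a second time. The consumer processes the message as usual and writes it back to the table if consumption fails again.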
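The selection step of the scheduled task can be sketched in plain Java. This is a minimal illustration, not the author's implementation: the `FailedMessage` class, its fields, and `selectDue` are hypothetical names, and an in-memory list stands in for the database table.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for one row of the local message table. */
final class FailedMessage {
    final String body;
    final String exchange;
    final String routingKey;
    final int retryCount;         // retries already attempted
    final int maxRetries;         // retry limit for this message
    final long nextRetryAtMillis; // earliest time the next retry may run

    FailedMessage(String body, String exchange, String routingKey,
                  int retryCount, int maxRetries, long nextRetryAtMillis) {
        this.body = body;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.retryCount = retryCount;
        this.maxRetries = maxRetries;
        this.nextRetryAtMillis = nextRetryAtMillis;
    }
}

final class LocalMessageTableSketch {

    /** Returns the rows that still have retries left and whose retry time has arrived. */
    static List<FailedMessage> selectDue(List<FailedMessage> table, long nowMillis) {
        List<FailedMessage> due = new ArrayList<>();
        for (FailedMessage m : table) {
            boolean retriesLeft = m.retryCount < m.maxRetries;
            boolean timeReached = m.nextRetryAtMillis <= nowMillis;
            if (retriesLeft && timeReached) {
                due.add(m);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        List<FailedMessage> table = new ArrayList<>();
        table.add(new FailedMessage("{\"id\":1}", "", "service.queue", 1, 3, 1_000L));
        table.add(new FailedMessage("{\"id\":2}", "", "service.queue", 3, 3, 1_000L)); // retries exhausted
        table.add(new FailedMessage("{\"id\":3}", "", "service.queue", 0, 3, 9_000L)); // not due yet
        for (FailedMessage m : selectDue(table, 5_000L)) {
            // A real job would republish here, for example through RabbitTemplate.
            System.out.println("re-deliver " + m.body + " to " + m.routingKey);
        }
    }
}
```

With the sample rows, only the first message is selected. In a real project the same filter would more likely be a WHERE clause on the table, run from a scheduled method.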
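Option 2:
Implement a delay queue with the rabbitmq_delayed_message_exchange plugin
Approach: when the business queue's consumer fails, it sends a message to the delay queue containing the business data, exchange, queue, retry count, maximum retry count, and so on. Once the delay elapses, the delay queue's consumer re-delivers the message to the business queue. If consumption fails again on redelivery, the consumer checks the maximum retry count to decide whether to retry once more.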
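The retry decision described above can be sketched without any RabbitMQ dependency. The `RetryEnvelope` class and `onFailure` method are hypothetical names introduced for illustration. In a real consumer, the returned envelope would be published to the delayed exchange with an `x-delay` header.

```java
import java.util.Optional;

/** Hypothetical payload sent to the delay queue after a failed consumption. */
final class RetryEnvelope {
    final String body;       // original business data
    final String exchange;   // exchange to re-deliver to
    final String routingKey; // routing key to re-deliver with
    final int count;         // retries scheduled so far
    final int maxCount;      // maximum number of retries

    RetryEnvelope(String body, String exchange, String routingKey, int count, int maxCount) {
        this.body = body;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.count = count;
        this.maxCount = maxCount;
    }
}

final class DelayRetrySketch {

    /**
     * Called when consumption fails. Returns the envelope to send to the delay
     * queue, or empty when the maximum retry count has been reached.
     */
    static Optional<RetryEnvelope> onFailure(RetryEnvelope current) {
        if (current.count >= current.maxCount) {
            return Optional.empty();
        }
        return Optional.of(new RetryEnvelope(
                current.body, current.exchange, current.routingKey,
                current.count + 1, current.maxCount));
    }

    public static void main(String[] args) {
        // Simulate a message whose consumption always fails.
        RetryEnvelope msg = new RetryEnvelope("{\"id\":-1}", "", "service.queue", 0, 3);
        Optional<RetryEnvelope> next = onFailure(msg);
        while (next.isPresent()) {
            System.out.println("retry #" + next.get().count + " scheduled");
            next = onFailure(next.get());
        }
        System.out.println("retries exhausted, give up");
    }
}
```

Carrying the counter inside the message keeps the consumer stateless: each failure only needs the envelope it just received to decide whether another retry is allowed.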
Implementation
- pom.xml
<dependencies>
    <dependency>
        <groupId>run.siyuan</groupId>
        <artifactId>siyuan-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
- application.yml
server:
  port: 8080
spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: siyuan
    password: siyuan123456
    virtual-host: /
- PluginDelayRabbitConfig.java
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @className: PluginDelayRabbitConfig
 * @Description: Declares the delayed exchange, its queue, and the binding
 * @author: wzq
 * @date: 2022/5/17 10:50 AM
 */
@Configuration
public class PluginDelayRabbitConfig {

    @Bean("pluginDelayExchange")
    public CustomExchange pluginDelayExchange() {
        Map<String, Object> argMap = new HashMap<>();
        // Required: the underlying routing type; can be direct, topic, or fanout
        argMap.put("x-delayed-type", "direct");
        // The exchange type (second argument) must be x-delayed-message
        return new CustomExchange("PLUGIN_DELAY_EXCHANGE", "x-delayed-message", false, false, argMap);
    }

    @Bean("pluginDelayQueue")
    public Queue pluginDelayQueue() {
        return new Queue("PLUGIN_DELAY_QUEUE");
    }

    @Bean
    public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue,
                                      @Qualifier("pluginDelayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with("delay").noargs();
    }
}
- RabbitmqConsumer.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @className: RabbitmqConsumer
 * @Description: Consumer for the plugin-based delay queue
 * @author: wzq
 * @date: 2022/5/17 10:52 AM
 */
@Slf4j
@Component
public class RabbitmqConsumer {

    // Listens on the delay queue
    @RabbitListener(queues = "PLUGIN_DELAY_QUEUE")
    public void fanoutConsumer(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("[plugin delay queue] [" + sdf.format(new Date()) + "] received message: " + msg);
    }
}
- RabbitMqController.java
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @className: RabbitMqController
 * @Description: Test endpoint that publishes a delayed message
 * @author: wzq
 * @date: 2022/5/17 10:54 AM
 */
@RestController
public class RabbitMqController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/plugin/send")
    public String pluginMsgSend(@RequestParam Integer time) {
        JSONObject json = new JSONObject();
        json.set("name", "plugin delayed message");
        json.set("time", System.currentTimeMillis());
        json.set("delayTime", time);
        MessageProperties messageProperties = new MessageProperties();
        // Mark the body as text so that SimpleMessageConverter hands the listener a String
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        // x-delay is in milliseconds: the message reaches the queue after `time` seconds
        messageProperties.setHeader("x-delay", 1000 * time);
        Message message = new Message(JSONUtil.toJsonStr(json).getBytes(StandardCharsets.UTF_8), messageProperties);
        // The exchange and routing key must match those declared in the configuration class
        rabbitTemplate.convertAndSend("PLUGIN_DELAY_EXCHANGE", "delay", message);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Message sent [" + sdf.format(new Date()) + "] delay (seconds): " + time);
        return "succ";
    }
}
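Option 3:
Implement a delay queue with TTL messages + a DLX (dead-letter exchange)
Approach: when the business queue's consumer fails, it sends a TTL message containing the business data, exchange, queue, retry count, maximum retry count, and so on. When the TTL message expires, it is routed to the dead-letter queue. A listener on the dead-letter queue receives it, checks whether the retry limit has been reached, and re-delivers it to the business queue. If consumption fails again on redelivery, the maximum retry count is checked to decide whether to retry once more. Messages that exceed the maximum are persisted for manual intervention.
Implementation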
- pom.xml
<dependencies>
    <dependency>
        <groupId>run.siyuan</groupId>
        <artifactId>siyuan-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
- application.yml
server:
  port: 8080
spring:
  rabbitmq:
    addresses: 127.0.0.1
    port: 5672
    username: siyuan
    password: siyuan123456
    virtual-host: /
- Constants.java
public interface Constants {

    // ------------------------------ delay -------------------------------------
    // Delay exchange
    String DELAY_EXCHANGE = "delay.exchange";
    // Queue bound to the delay exchange
    String DELAY_EXCHANGE_QUEUE = "delay.exchange.queue";
    // Routing key for the delay exchange
    String DELAY_EXCHANGE_ROUTE_KEY = "delay.exchange.route.key";

    // ------------------------------ dead.letter.fanout -------------------------------------
    // Dead-letter exchange
    String DELAY_LETTER_EXCHANGE = "dead.letter.exchange";
    // Queue bound to the dead-letter exchange
    String DELAY_LETTER_EXCHANGE_QUEUE = "dead.letter.exchange.queue";
    // Routing key for the dead-letter exchange
    String DELAY_LETTER_EXCHANGE_ROUTE_KEY = "dead.letter.exchange.route.key";

    // ------------------------------ business queue -------------------------------------
    String SERVICE_QUEUE = "service.queue";
}
- RetryRabbitConfig.java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import run.siyuan.common.rabbitmq.Constants;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RetryRabbitConfig {

    // ------------------- Delay queue -------------------

    /**
     * Delay exchange.
     */
    @Bean
    public DirectExchange ttlDelayExchangeRetry() {
        return new DirectExchange(Constants.DELAY_EXCHANGE);
    }

    /**
     * Queue bound to the delay exchange. It has no consumer, so messages wait here until they expire.
     */
    @Bean
    public Queue ttlDelayExchangeQueueRetry() {
        Map<String, Object> map = new HashMap<String, Object>();
        // Optional queue-level TTL: every message in the queue would expire after 5 minutes
        // map.put("x-message-ttl", 1000 * 60 * 5);
        // Expired messages are routed to the dead-letter exchange
        map.put("x-dead-letter-exchange", Constants.DELAY_LETTER_EXCHANGE);
        return new Queue(Constants.DELAY_EXCHANGE_QUEUE, false, false, false, map);
    }

    /**
     * Binds the delay queue to the (direct) delay exchange.
     */
    @Bean
    public Binding bindTtlExchangeAndQueueRetry() {
        return BindingBuilder.bind(ttlDelayExchangeQueueRetry()).to(ttlDelayExchangeRetry()).with(Constants.DELAY_EXCHANGE_ROUTE_KEY);
    }

    // ------------------- Dead-letter queue -------------------

    /**
     * Fanout dead-letter exchange.
     */
    @Bean
    public FanoutExchange deadLetterExchange() {
        return new FanoutExchange(Constants.DELAY_LETTER_EXCHANGE);
    }

    /**
     * Dead-letter queue.
     */
    @Bean
    public Queue deadLetterQueue() {
        return new Queue(Constants.DELAY_LETTER_EXCHANGE_QUEUE);
    }

    /**
     * Regular business queue.
     */
    @Bean
    public Queue serviceQueue() {
        return new Queue(Constants.SERVICE_QUEUE);
    }

    /**
     * Binds the dead-letter queue to the dead-letter exchange.
     */
    @Bean
    public Binding deadLetterBind() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }
}
- MessageRetryVo
package run.siyuan.rabbitmq.retry.message.model;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

import java.io.Serializable;
import java.util.Date;

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryVo implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Original message body */
    private String bodyMsg;

    /** Exchange */
    private String exchangeName;

    /** Routing key */
    private String routingKey;

    /** Queue */
    private String queueName;

    /** Maximum number of retries */
    private Integer maxTryCount = 3;

    /** Current retry count */
    private Integer currentRetryCount = 0;

    /** Failure information */
    private String errorMsg;

    /** Creation time */
    private Date createTime;

    /** Message type: 0 = delay message, 1 = retry message */
    private Integer type;

    @Override
    public String toString() {
        return "MessageRetryVo{" +
                "bodyMsg='" + bodyMsg + '\'' +
                ", exchangeName='" + exchangeName + '\'' +
                ", routingKey='" + routingKey + '\'' +
                ", queueName='" + queueName + '\'' +
                ", maxTryCount=" + maxTryCount +
                ", currentRetryCount=" + currentRetryCount +
                ", errorMsg='" + errorMsg + '\'' +
                ", createTime=" + createTime +
                '}';
    }

    /**
     * Checks whether the retry count is still within the maximum.
     * For delay messages (type 0) the count is also incremented.
     *
     * @return true if the message may be sent again
     */
    public boolean checkRetryCount(Integer type) {
        if (this.currentRetryCount <= this.maxTryCount) {
            if (type.equals(0)) {
                retryCountCalculate();
            }
            return true;
        }
        return false;
    }

    /**
     * Increments the retry count.
     */
    private void retryCountCalculate() {
        this.currentRetryCount = this.currentRetryCount + 1;
    }
}
- ServiceConsumer.java
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import run.siyuan.common.rabbitmq.Constants;
import run.siyuan.rabbitmq.retry.message.service.CommonMessageDelayService;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Regular consumer of the business queue.
 */
@Slf4j
@Component
public class ServiceConsumer extends CommonMessageDelayService {

    @RabbitListener(queues = Constants.SERVICE_QUEUE, ackMode = "MANUAL", concurrency = "1")
    public void consumer(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
        try {
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("[business queue] [{}] received message: {}", System.currentTimeMillis(), msg);
            JSONObject json = JSONUtil.parseObj(msg);
            // Simulated business failure: a negative id cannot be processed
            if (json.getInt("id") < 0) {
                throw new Exception("id is less than 0");
            }
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.info("Consumption failed: {}", e.getMessage());
            // Reject without requeue, then schedule a retry through the delay queue
            channel.basicNack(deliveryTag, false, false);
            sendDelayMessage(message, e);
        }
    }
}
- DeadLetterConsumer
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import run.siyuan.common.rabbitmq.Constants;
import run.siyuan.rabbitmq.retry.message.service.CommonMessageRetryService;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * @className: DeadLetterConsumer
 * @Description: Dead-letter queue consumer
 * @author: wzq
 * @date: 2022/5/13 3:05 PM
 */
@Slf4j
@Component
public class DeadLetterConsumer extends CommonMessageRetryService {

    @RabbitListener(queues = Constants.DELAY_LETTER_EXCHANGE_QUEUE, ackMode = "MANUAL", concurrency = "1")
    public void consumer(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
        try {
            // The original log call had no {} placeholder, so the body was never printed
            log.info("[dead-letter queue] [{}] received message: {}", System.currentTimeMillis(), new String(message.getBody(), StandardCharsets.UTF_8));
            retryMessage(message);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.warn("Failed to process the dead-lettered message", e);
            channel.basicNack(deliveryTag, false, false);
        }
    }
}
- CommonMessageDelayService.java
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import run.siyuan.common.rabbitmq.Constants;
import run.siyuan.rabbitmq.retry.message.model.MessageRetryVo;

/**
 * @className: CommonMessageDelayService
 * @Description: RabbitMQ compensation mechanism: sends the delay message
 * @author: wzq
 * @date: 2022/5/13 3:05 PM
 */
@Slf4j
public abstract class CommonMessageDelayService extends AbstractCommonMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a delay message for a message that failed to be consumed.
     *
     * @param message the failed message
     * @param e       the exception thrown while consuming it
     */
    protected void sendDelayMessage(Message message, Exception e) {
        try {
            // Wrap the message in a retry entity
            MessageRetryVo delayMessageVo = buildMessageRetryInfo(message);
            log.info("Delay message: {}", delayMessageVo);
            // Get the full stack trace
            StackTraceElement[] stackTraceElements = e.getStackTrace();
            // Default to the first stack frame
            String exceptionClassTotalName = stackTraceElements[0].toString();
            // Prefer the first frame that contains "com.central" (adjust to your own base package)
            for (StackTraceElement stackTraceElement : stackTraceElements) {
                if (stackTraceElement.toString().contains("com.central")) {
                    exceptionClassTotalName = stackTraceElement.toString();
                    break;
                }
            }
            log.info("Exception info: {}", exceptionClassTotalName);
            delayMessageVo.setErrorMsg(exceptionClassTotalName);
            delayMessageVo.setType(0);
            prepareAction(delayMessageVo);
        } catch (Exception exception) {
            log.warn("Failed to handle the message, error:", exception);
        }
    }

    /**
     * Publishes the retry entity to the delay exchange as a TTL message.
     *
     * @param retryVo the retry entity
     */
    @Override
    protected void sendMessage(MessageRetryVo retryVo) {
        // The retry entity travels in a header; it already carries the original body in bodyMsg
        MessageProperties messageProperties = new MessageProperties();
        // Fixed TTL of 10 seconds; custom per-message delays are not used
        messageProperties.setExpiration(String.valueOf(1000 * 10 * 1));
        messageProperties.setHeader("message_retry_info", JSONUtil.toJsonStr(retryVo));
        Message ttlMessage = new Message(JSONUtil.toJsonStr(retryVo).getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(Constants.DELAY_EXCHANGE, Constants.DELAY_EXCHANGE_ROUTE_KEY, ttlMessage);
        log.info("Delay message sent at: {}", System.currentTimeMillis());
    }
}
- CommonMessageRetryService.java
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import run.siyuan.rabbitmq.retry.message.model.MessageRetryVo;

/**
 * @className: CommonMessageRetryService
 * @Description: RabbitMQ compensation mechanism: re-sends the business message
 * @author: wzq
 * @date: 2022/5/13 3:05 PM
 */
@Slf4j
public abstract class CommonMessageRetryService extends AbstractCommonMessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Re-delivers a dead-lettered message to its original business queue.
     *
     * @param message the dead-lettered message
     */
    public void retryMessage(Message message) {
        try {
            // Wrap the message in a retry entity
            MessageRetryVo retryMessageVo = buildMessageRetryInfo(message);
            log.info("Retry message: {}", retryMessageVo);
            retryMessageVo.setType(1);
            prepareAction(retryMessageVo);
        } catch (Exception exception) {
            log.warn("Failed to handle the message, error:", exception);
        }
    }

    /**
     * Publishes the original business message to its original exchange and routing key.
     *
     * @param retryVo the retry entity
     */
    @Override
    protected void sendMessage(MessageRetryVo retryVo) {
        // Put the retry entity in a header; the original message body stays unchanged
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("message_retry_info", JSONUtil.toJsonStr(retryVo));
        Message message = new Message(retryVo.getBodyMsg().getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(retryVo.getExchangeName(), retryVo.getRoutingKey(), message);
        log.info("Business message re-sent at: {}", System.currentTimeMillis());
    }
}
- AbstractCommonMessageService
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import run.siyuan.rabbitmq.retry.message.model.MessageRetryVo;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.Objects;

/**
 * @className: AbstractCommonMessageService
 * @Description: RabbitMQ compensation mechanism: abstract base class
 * @author: wzq
 * @date: 2022/5/13 3:05 PM
 */
@Slf4j
public abstract class AbstractCommonMessageService {

    /**
     * Sends the message.
     *
     * @param retryVo the retry entity
     */
    protected abstract void sendMessage(MessageRetryVo retryVo);

    /**
     * Builds the retry entity for a message.
     *
     * @param message the received message
     * @return the retry entity
     */
    protected MessageRetryVo buildMessageRetryInfo(Message message) {
        // If the headers already carry a retry entity, return it directly
        Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
        if (messageHeaders.containsKey("message_retry_info")) {
            Object retryMsg = messageHeaders.get("message_retry_info");
            if (Objects.nonNull(retryMsg)) {
                return JSONUtil.toBean(JSONUtil.parseObj(retryMsg), MessageRetryVo.class);
            }
        }
        // Otherwise wrap the business message in a new retry entity
        MessageRetryVo messageVo = new MessageRetryVo();
        messageVo.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
        messageVo.setExchangeName(message.getMessageProperties().getReceivedExchange());
        messageVo.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
        messageVo.setQueueName(message.getMessageProperties().getConsumerQueue());
        messageVo.setCreateTime(new Date());
        return messageVo;
    }

    /**
     * Sends the message if retries remain; otherwise runs the failure callback.
     *
     * @param messageVo the retry entity
     */
    protected void prepareAction(MessageRetryVo messageVo) {
        if (messageVo.checkRetryCount(messageVo.getType())) {
            this.sendMessage(messageVo);
        } else {
            if (log.isWarnEnabled()) {
                log.warn("The task has reached the maximum number of retries, business data: " + messageVo.toString());
            }
            doFailCallBack(messageVo);
        }
    }

    /**
     * Callback invoked once all retries have failed.
     *
     * @param messageVo the retry entity
     */
    protected void doFailCallBack(MessageRetryVo messageVo) {
        try {
            saveRetryMessageInfo(messageVo);
        } catch (Exception e) {
            log.warn("Failure callback threw an exception, cause: {}", e.getMessage());
        }
    }

    /**
     * Persists the failed message. This sample only logs it.
     *
     * @param messageVo the retry entity
     */
    protected void saveRetryMessageInfo(MessageRetryVo messageVo) {
        try {
            log.info("Retry count: {} message_retry_info: {}", messageVo.getCurrentRetryCount(), messageVo);
        } catch (Exception e) {
            log.error("Failed to store the failed message in MongoDB, message data: " + messageVo.toString(), e);
        }
    }
}
- RetryController.java
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import run.siyuan.common.rabbitmq.Constants;

@Slf4j
@RestController
@RequestMapping("/retry")
public class RetryController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value = "/service/message")
    public String consumerFailQueue(@RequestParam(required = false, defaultValue = "1") Integer id) {
        JSONObject json = new JSONObject();
        json.set("id", id);
        json.set("name", "message name");
        json.set("time", System.currentTimeMillis());
        String msg = StrUtil.format("Message send time: {} message data: {}", System.currentTimeMillis(), json);
        log.info(msg);
        // Send the JSON as a String so the consumer can parse the body as text
        // (assumes the default SimpleMessageConverter is in use)
        rabbitTemplate.convertAndSend(Constants.SERVICE_QUEUE, json.toString());
        log.info("Message send completed at: {}", System.currentTimeMillis());
        return "success";
    }
}
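PS: Option 3 can cause messages to block one another. For example, send a first delayed message that expires after 10 minutes, then a second that expires after 5 minutes. The second message should expire before the first, but RabbitMQ only expires messages once they reach the head of the queue. As long as the first message has not expired and left the queue, the second has to wait behind it, and that is how the blocking arises. Use this approach only where the business can tolerate it.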
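The blocking can be reproduced with a small simulation. This is a simplified model rather than broker code: it assumes a FIFO queue in which a message is dead-lettered only once it is at the head and its own TTL has elapsed. `TtlBlockingSketch` and `deadLetterTimes` are hypothetical names.

```java
final class TtlBlockingSketch {

    /**
     * Returns, in FIFO order, the time at which each message leaves the queue.
     * A message leaves at its own expiry time, or when the message in front of
     * it has left, whichever is later.
     */
    static long[] deadLetterTimes(long[] publishedAt, long[] ttl) {
        long[] leavesAt = new long[publishedAt.length];
        long headFreeAt = 0; // time at which the previous message left the queue
        for (int i = 0; i < leavesAt.length; i++) {
            long ownExpiry = publishedAt[i] + ttl[i];
            leavesAt[i] = Math.max(ownExpiry, headFreeAt);
            headFreeAt = leavesAt[i];
        }
        return leavesAt;
    }

    public static void main(String[] args) {
        // Times are in minutes. Both messages are published at minute 0.
        long[] publishedAt = {0, 0};
        long[] ttl = {10, 5}; // first message: 10 minutes, second message: 5 minutes
        long[] leavesAt = deadLetterTimes(publishedAt, ttl);
        for (int i = 0; i < leavesAt.length; i++) {
            System.out.println("message " + (i + 1)
                    + " own expiry: minute " + (publishedAt[i] + ttl[i])
                    + ", dead-lettered: minute " + leavesAt[i]);
        }
    }
}
```

In this model the second message is dead-lettered at minute 10 even though its own TTL ends at minute 5. When every message uses the same TTL, as in the fixed 10-second expiration above, messages expire in publication order and the problem does not appear.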