生产者发送消息,在消息不可达指定队列时,可以借助扇出类型交换机(之前写过消息回退的处理方案,扇出交换机处理的方案优先级高于消息回退)处理不可达消息,然后放置一个备份队列,供消费者处理不可达消息,同时也加一个报警队列,对于不能走正常流程的消息进行消费者告警。
先用方法配置类把各个组件声明:
package com.esint.configs;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class BackupConfig {/*** 定义组件常量名字*///交换机- 确认交换机额public static final String EXCHANGE_SURE = "sure.ex";//交换机- 备份交换机额public static final String EXCHANGE_BACK = "backup.ex";//队列- 正常确认队列public static final String QUEUE_SURE = "sure.queue";//队列-备份队列public static final String QUEUE_BACKUP = "backup.queue";//队列-警告队列public static final String QUEUE_WARN = "warn.queue";//routing-keypublic static final String ROUTING_KEY_SURE = "key1";/*** 声明组件*///确认交换机@Bean("sureExchange")public DirectExchange sureExchange(){Map<String, Object> arguments = new HashMap<>();arguments.put("alternate-exchange",EXCHANGE_BACK);return ExchangeBuilder.directExchange(EXCHANGE_SURE).durable(true).withArguments(arguments).build();}//备份交换机@Bean("backExchange")public FanoutExchange backExchange(){return new FanoutExchange(EXCHANGE_BACK);}//确认队列@Bean("sureQueue")public Queue sureQueue(){return QueueBuilder.durable(QUEUE_SURE).build();}//备份队列@Bean("backupQueue")public Queue backupQueue(){return QueueBuilder.durable(QUEUE_BACKUP).build();}//警告队列@Bean("warnQueue")public Queue warnQueue(){return QueueBuilder.durable(QUEUE_WARN).build();}/*** 绑定组件 确认队列 绑定 确认交换机 with key1*/@Beanpublic Binding sureQueueBindingSureExchange(@Qualifier("sureQueue") Queue sureQueue,@Qualifier("sureExchange")DirectExchange sureExchange){return BindingBuilder.bind(sureQueue).to(sureExchange).with(ROUTING_KEY_SURE);}/*** 绑定组件 备份队列 绑定 备份交换机*/@Beanpublic Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,@Qualifier("backExchange")FanoutExchange backExchange){return BindingBuilder.bind(backupQueue).to(backExchange);}/*** 绑定组件 警告队列 绑定 备份交换机*/@Beanpublic Binding warnQueueBindingBackupExchange(@Qualifier("warnQueue") Queue warnQueue,@Qualifier("backExchange")FanoutExchange backExchange){return BindingBuilder.bind(warnQueue).to(backExchange);}
}
生产者: 我们做出两个方法,一个可正常进行流程,一个routingKey异常无法路由到指定队列
package com.esint.controller;import com.esint.configs.BackupConfig;
import com.esint.constants.ResponseCode;
import com.esint.entity.ResponseEntity;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.kafka.clients.producer.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.ExecutionException;@Api(value = "rabbitMQ-备份队列测试")
@RestController
@RequestMapping("/rabbit")
public class BackUpExchangeController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ApiOperation(value = "routingKey正常测试",httpMethod = "GET",tags = {"去正常流程"})@ApiImplicitParams({@ApiImplicitParam(name="str",value="消息体",required = false,dataType = "String")})@ResponseBody@RequestMapping(value = "/test1", method = RequestMethod.GET)public ResponseEntity test1(String str ) {rabbitTemplate.convertAndSend(BackupConfig.EXCHANGE_SURE, BackupConfig.ROUTING_KEY_SURE,str);return new ResponseEntity(ResponseCode.SUCCESS).addData("routingKeyOk:"+str);}@ApiOperation(value = "routingKey非正常测试",httpMethod = "GET",tags = {"去备份-警告"})@ApiImplicitParams({@ApiImplicitParam(name="str",value="消息体",required = false,dataType = "String")})@ResponseBody@RequestMapping(value = "/test2", method = RequestMethod.GET)public ResponseEntity test2(String str ) {rabbitTemplate.convertAndSend(BackupConfig.EXCHANGE_SURE, BackupConfig.ROUTING_KEY_SURE+"wrong",str);return new ResponseEntity(ResponseCode.SUCCESS).addData("routingKeyWrong:"+str);}}
三个消费者分别监听正常队列 备份队列 警告队列
确认队列消费者:
package com.esint.consumer;import com.esint.configs.BackupConfig;
import com.esint.configs.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class BackUpConsumer01 {@RabbitListener(queues = BackupConfig.QUEUE_SURE)public void reveiver(Message message){log.info("正常消费者C1:" + new String(message.getBody()),"UTF-8");}}
备份队列消费者:
package com.esint.consumer;import com.esint.configs.BackupConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class BackUpConsumer02 {@RabbitListener(queues = BackupConfig.QUEUE_BACKUP)public void reveiver(Message message){log.info("备份消费者C2:" + new String(message.getBody()),"UTF-8");}
}
警告队列消费者:
package com.esint.consumer;import com.esint.configs.BackupConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class BackUpConsumer03 {@RabbitListener(queues = BackupConfig.QUEUE_WARN)public void reveiver(Message message){log.info("警告消费者C3:" + new String(message.getBody()),"UTF-8");}}
测试:
1.正常流程测试
com.esint.consumer.BackUpConsumer01 : 正常消费者C1:你好啊 正常队列
2.路由不达消息测试
com.esint.consumer.BackUpConsumer03 : 警告消费者C3:这个消息不可达 routing-key不对 它去哪里了?
com.esint.consumer.BackUpConsumer02 : 备份消费者C2:这个消息不可达 routing-key不对 它去哪里了?
测试达到预期结果!