Springboot集成Rabbitmq实现请求异步处理
一、Docker部署rabbitmq
1. docker pull rabbitmq:3.7.8
rabbitmq为需要拉取的镜像名称,3.7.8为版本号
2. docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.7.8
运行拉取到的镜像文件;-d表示后台运行镜像;-p指将容器的端口映射到主机中;–name为设置容器的名称
3. docker ps -a
查看镜像是否运行成功
4. docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management
启动rabbitmq_management,可以在浏览器访问web页面,默认账号密码为guest/guest
5. docker exec -it rabbitmq rabbitmqctl add_user admin xxx
docker exec -it rabbitmq rabbitmqctl set_user_tags admin administrator
为rabbitmq创建用户并设置权限,admin为用户名,xxx为密码
二、Springboot集成rabbitmq
1.maven依赖添加
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.properties配置
#rabbitmq
#常规配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=433430
#特定需求配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
3.rabbitmq配置文件类
@Configuration
public class RabbitConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);//设置rabbitTemplate确认机制rabbitTemplate.setConfirmCallback(new RabbitConfirm());rabbitTemplate.setReturnCallback(new RabbitReturn());return rabbitTemplate;}
}
4.生产者发送数据
@RestController
public class SendMessageController {@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/send")public String sendDirectMessage(){String messageId = String.valueOf(UUID.randomUUID());String messageData = "test message, hello!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map=new HashMap<>();map.put("messageId",messageId);map.put("messageData",messageData);map.put("createTime",createTime);//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchangerabbitTemplate.convertAndSend("test", "a", map);return "ok";}
}
4.消费者消费数据
@Component
@RabbitListener(queues = "test-que") //配置需要监听的队列名
public class DirectReceiver {@RabbitHandlerpublic void process(Map test) throws InterruptedException {System.out.println("DirectReceiver消费者收到消息 : " + test.toString());}
}
5.ConfirmCallback确认机制配置(确认消息是否发送到交换机上)
在application.properties中配置:spring.rabbitmq.publisher-confirm-type=correlated
publisher-confirm-type 新版发布确认属性有三种确认类型:
- NONE值是禁用发布确认模式,是默认值
- CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
- SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
public class RabbitConfirm implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (b){System.out.println("接收到了来自生产者的消息!");}else {System.out.println("未接收到");}//可自行定义业务逻辑}
}
6.ReturnCallback确认机制配置(未投递到队列上的数据将退回)
在application.properties中配置:
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true(将 mandatory设置为true,如果消息不可路由那么rabbitmq会把完整的消息退回到发布者中)
public class RabbitReturn implements RabbitTemplate.ReturnCallback {@Overridepublic void returnedMessage(Message message, int i, String s, String s1, String s2) {System.out.println("消息主体:"+message.toString());System.out.println("回复编码:"+i);System.out.println("回复内容:"+s);System.out.println("交换器:"+s1);System.out.println("路由键:"+s2);//可自行定义业务逻辑}
}
7.发送自定义对象(两种方法)
rabbitTemplate源码默认采用SimpleMessageConverter来序列化消息的,只接受byte数组,string字符串,可序列化对象,如果传入费序列化的对象,会报错:java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: com.rabbitmq.producer.model.xxx
(1).自定义MessageConverter
public static class SelfConverter extends AbstractMessageConverter {@Overrideprotected Message createMessage(Object object, MessageProperties messageProperties) {messageProperties.setContentType("application/json");return new Message(JSON.toJSONBytes(object), messageProperties);}@Overridepublic Object fromMessage(Message message) throws MessageConversionException {return JSON.parse(message.getBody());}
}
然后在rabbitmq的配置类代码中添加
rabbitTemplate.setMessageConverter(new SelfConverter());
(2).Jackson2JsonMessageConverter
直接在rabbitmq的配置类代码中添加
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
8.设置消费者应答机制
//在rabbit配置文件类中添加如下代码
@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置手动ack模式 要和yml配置保持一致,不然会覆盖yml文件的配置factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);return factory;}
//消费者消费数据
@Component
public class DirectReceiver {@RabbitListener(queues = "test-que")@RabbitHandlerpublic void process(String test, Channel channel, Message message) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println("DirectReceiver消费者收到消息 : " + test.toString());channel.basicAck(deliveryTag,false);System.out.println("消息回退");}
}//应答机制共有三种:
/**
首先需要在SimpleRabbitListenerContainerFactory中配置 “factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);”
AcknowledgeMode共有三种:
1.NONE:默认情况下消息消费者是NONE模式,默认所有消息消费成功,会不断的向消费者推送消息。因为rabbitMq认为所有消息都被消费成功,所以队列中不在存有消息,消息存在丢失的危险
2.AUTO(自动确认):在自动确认模式下,消息发送后即被认为成功投递,不管消费者端是否成功处理本次投递
3.MANUAL(手动确认):消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功(1)basicAck(long deliveryTag, boolean multiple):表示成功确认,使用此回执方法后,消息会被rabbitmq broker删除。deliveryTag:消息投递序号。multiple:是否批量确认,true->将一次性拒绝所有小于deliveryTag的消息。(2)basicNack(long deliveryTag, boolean multiple, boolean requeue):表示失败确认,一般在消费消息异常时用到此方法,可以将消息重新投递入队列。deliveryTag:表示消息投递序号。multiple:是否批量;true->将一次性拒绝所有小于deliveryTag的消息。requeue: 表示消息是否重新入队列,true表示重新投入队列中。(3)basicReject(long deliveryTag, boolean requeue):拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。deliveryTag:消息投递序号。requeue:值为true表示消息重新入队列。
**/