这一篇讲解消费者
文章目录
- 一、依赖配置
- 1. 引入依赖
- 2. 配置文件
- 3. 主配置
- 二、代码Conding
- 2.1. 消费者代码
一、依赖配置
1. 引入依赖
<!--springboot整合RabbitMQ依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2. 配置文件
spring.rabbitmq.addresses=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=/admin
spring.rabbitmq.connection-timeout=15000#手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
3. 主配置
package com.gblfy.springboot.config;import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;@Configuration
@ComponentScan({"com.gblfy.springboot.*"})
public class MainConfig {
}
二、代码Conding
2.1. 消费者代码
package com.gblfy.springboot.consumer;import com.gblfy.springboot.entity.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class RabbitReceiver {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue-1",durable = "true"),exchange = @Exchange(value = "exchange-1",durable = "true",type = "topic",ignoreDeclarationExceptions = "true"),key = "springboot.*"))@RabbitHandlerpublic void onMessage(Message message, Channel channel) throws Exception {System.err.println("--------------------------------------");System.err.println("消费端Payload: " + message.getPayload());Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);//手工ACKchannel.basicAck(deliveryTag, false);}/*** 接收Order对象监听处理器** @param order* @param channel* @param headers* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",durable = "${spring.rabbitmq.listener.order.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",durable = "${spring.rabbitmq.listener.order.exchange.durable}",type = "${spring.rabbitmq.listener.order.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.order.key}"))@RabbitHandlerpublic void onOrderMessage(@Payload Order order,Channel channel,@Headers Map<String, Object> headers) throws Exception {System.err.println("--------------------------------------");System.err.println("消费端order: " + order.getId());Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);//手工ACKchannel.basicAck(deliveryTag, false);}
}