优先级队列
方式一:可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority
方式二:代码设置
Map<String,Object> args = new HashMap<String,Object>(); args.put("x-max-priority", 10); channel.queueDeclare("queue_priority", true, false, false, args); |
这里设置的是一个队列queue的最大优先级,之后要在发送的消息中设置消息本身的优先级,设置代码:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(5); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes()); |
完整代码:生产者
public class Producer { public static final String ip = "10.0.40.127"; public static final int port = 5672; public static final String username = "admin"; public static final String password = "123456";
public static void main(String[] args) throws IOException{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setPassword(password); connectionFactory.setUsername(username); connectionFactory.setPort(port); connectionFactory.setHost(ip);
/* Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();
//create exchange channel.exchangeDeclare("exchange_priority", "direct", true);
//create queue with priority Map<String, Object> params = new HashMap<>(); params.put("x-max-priority", 10); channel.queueDeclare("queue_priority", true, false, false, params); channel.queueBind("queue_priority", "exchange_priority", "rk_priority");
//send message with priority for (int i = 0; i < 10; i++) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); if (i % 2 == 0) { builder.priority(5); } AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange_priority", "rk_priority", properties, ("produce messages-" + i).getBytes()); }
channel.close(); connection.close();*/ } } |
消费者
public class Consumer { public static final String ip = "10.0.40.127"; public static final int port = 5672; public static final String username = "admin"; public static final String password = "123456";
public static void main(String[] args) throws IOException, InterruptedException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setPassword(password); connectionFactory.setUsername(username); connectionFactory.setPort(port); connectionFactory.setHost(ip);
/* Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume("queue_priority",consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println(msg); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }*/ } } |
打印输出:先输出偶数,后输出奇数
如何限流?
1、为什么要对消费端限流?
如果Rabbitmq 服务器积压了有上万条未处理的消息,如果这时候连上了一个消费端,那么巨量的消息瞬间全部推送过来,但是单个客户端无法同时处理这么多。当数据量特别大的时候对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。
2、限流的实现方式—限流api
RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException; |
- prefetchSize:0,单条消息大小限制,0代表不限制
- prefetchCount:一次性消费的消息数量。告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
- global:true、false 是否将上面设置应用于 channel,就是上面限制 channel 级别还是 consumer 级别。当我们设置为 false 的时候生效
- prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。
3、如何进行限流?
- 首先第一步,使用消费端限流需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
- 第二步设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
- 第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);
消息确认机制
1、如果没有开启ack消息确认,rabbitmq会认为这条消息没有被消费,会将消息再次放入到队列中,再次让你消费,形成死循环;
2、消费端配置了手动ack,但是在异常捕获中设置了消息重新入队,那么还是会出现死循环
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); |
因为最后一个参数requeue一般都会为true,此次没调用到数据,把这个消息返回到队列中再消费,如果代码中出现了int a=1/0,那么还是会造成死循环。
消息重试机制
当你开启了手动ack的时候再消费端如果在消费的时候出现异常也会导致循环消费,所以要启动消息重试机制,默认是3次重试去消费一条消息,如果没有消费完成,则丢弃(删除)该消息或者放入死信队列中或者进行人工补偿。
erver.port=8889
spring.rabbitmq.host=192.168.221.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=zl spring.rabbitmq.password=123 #开启消息确认机制 spring.rabbitmq.publisher-confirms=true #支持消息发送失败返回队列 spring.rabbitmq.publisher-returns=true
#设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除 spring.rabbitmq.template.mandatory=true
spring.rabbitmq.connection-timeout=15000 #用户虚拟机权限名称 spring.rabbitmq.virtual-host=/
#设置消费端手动 ack none不确认 auto自动确认 manual手动确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual #消费者最小数量 spring.rabbitmq.listener.simple.concurrency=1 #消费之最大数量 spring.rabbitmq.listener.simple.max-concurrency=1
#开启消费者重试机制(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息) spring.rabbitmq.listener.simple.retry.enabled=true #重试次数5 spring.rabbitmq.listener.simple.retry.max-attempts=5 #重试时间间隔 spring.rabbitmq.listener.simple.retry.initial-interval=5000
#重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列) spring.rabbitmq.listener.simple.default-requeue-rejected=true
#在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量) spring.rabbitmq.listener.simple.prefetch=2 |
1、触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。
2、对于重试之后仍然异常的消息,mq默认的处理类是RejectAndDontRequeueRecoverer
见名知意。
SimpleRabbitListenerContainerFactoryConfigurer——>>RejectAndDontRequeueRecoverer(实现了MessageRecoverer接口)
MessageRecoverer接口实现类 | RejectAndDontRequeueRecoverer |
RepublishMessageRecoverer | |
ImmediateRequeueMessageRecoverer |
优化处理一:对于重试之后仍然异常的消息,可以采用RepublishMessageRecoverer,将消息发送到其他的队列中,再专门针对新的队列进行处理。
优化处理二:采用死信队列的方式处理重试失败的消息。
/** * 死信交换机 * @return */ @Bean public DirectExchange dlxExchange(){ return new DirectExchange(dlxExchangeName); }
/** * 死信队列 * @return */ @Bean public Queue dlxQueue(){ return new Queue(dlxQueueName); }
/** * 死信队列绑定死信交换机 * @param dlxQueue * @param dlxExchange * @return */ @Bean public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){ return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey); } |
业务代码添加死信交换机、死信路由配置
/** * 业务队列 * @return */ @Bean public Queue queue(){ Map<String,Object> params = new HashMap<>(); params.put("x-dead-letter-exchange",dlxExchangeName);//声明当前队列绑定的死信交换机 params.put("x-dead-letter-routing-key",dlxRoutingKey);//声明当前队列的死信路由键 return QueueBuilder.durable(queueName).withArguments(params).build(); //return new Queue(queueName,true); } |
|
注意点:
1、消费者在重试5次后,由于MessageCover默认的实现类是RejectAndDontRequeueRecoverer,也就是requeue=false,因为业务队列绑定了死信队列,消息会从业务队列中删除,同时发送到死信队列中。
2、如果ack模式是手动ack,那么需要调用channe.nack方法,同时设置requeue=false才会将异常消息发送到死信队列中
重试使用场景:
对于消费端异常的消息,如果在有限次重试过程中消费成功是最好,如果有限次重试之后仍然失败的消息,不管是采用RejectAndDontRequeueRecoverer还是使用死信队列都是可以的,同时也可以采用折中的方法,先将消息从业务队列中ack掉,再将消息发送到另外的一个队列中,后续再单独处理异常数据的队列
考虑下面两个场景:
1、http下载视频或者图片或者调用第三方接口
2、空指针异常或者类型转换异常(其他的受检查的运行时异常)
第一种重试有意义,第二种重试无意义,需要记录日志以及人工处理或者轮询任务方式处理。
重试的使用方式:
1、自动ack模式,不能catch异常
2、手动ack模式,不能try—catch异常
建议自动ack模式使用重试机制,如果一定要在手动ack模式下使用retry功能,最好还是确认在有限次重试过程中可以重试成功,否则超过重试次数,又没办法执行nack,会出现消息一直unack死循环
消息幂等性
问题 | 解决方案 |
消息重复消费问题:消费者消息处理了,没来的及提交offset,再重启可能导致重复消费 | 方式一:使用全局MessageID判断消费方使用同一个,解决幂等性。 方式二:用一个消息消费表来记录每一条消息,给每个一个消息设置一个id(uuid),消费了就保存到表中去。消息过来的时候先查询是否已经消费。 |