Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?

优先级队列

方式一:可以通过RabbitMQ管理界面配置队列的优先级属性,如下图的x-max-priority

方式二:代码设置

Map<String,Object> args = new HashMap<String,Object>();

args.put("x-max-priority", 10);

channel.queueDeclare("queue_priority", true, false, false, args);

这里设置的是一个队列queue的最大优先级,之后要在发送的消息中设置消息本身的优先级,设置代码:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

builder.priority(5);

AMQP.BasicProperties properties = builder.build();

channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes());

完整代码:生产者

public class Producer {

    public static final String ip = "10.0.40.127";

    public static final int port = 5672;

    public static final String username = "admin";

    public static final String password = "123456";

 

    public static void main(String[] args) throws IOException{

        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setPassword(password);

        connectionFactory.setUsername(username);

        connectionFactory.setPort(port);

        connectionFactory.setHost(ip);

 

     /*   Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

 

        //create exchange

        channel.exchangeDeclare("exchange_priority", "direct", true);

 

        //create queue with priority

        Map<String, Object> params = new HashMap<>();

        params.put("x-max-priority", 10);

        channel.queueDeclare("queue_priority", true, false, false, params);

        channel.queueBind("queue_priority", "exchange_priority", "rk_priority");

 

        //send message with priority

        for (int i = 0; i < 10; i++) {

            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();

            if (i % 2 == 0) {

                builder.priority(5);

            }

            AMQP.BasicProperties properties = builder.build();

            channel.basicPublish("exchange_priority", "rk_priority", properties, ("produce messages-" + i).getBytes());

        }

 

        channel.close();

        connection.close();*/

    }

}

消费者

public class Consumer {

    public static final String ip = "10.0.40.127";

    public static final int port = 5672;

    public static final String username = "admin";

    public static final String password = "123456";

 

    public static void main(String[] args) throws IOException, InterruptedException {

        ConnectionFactory connectionFactory = new ConnectionFactory();

        connectionFactory.setPassword(password);

        connectionFactory.setUsername(username);

        connectionFactory.setPort(port);

        connectionFactory.setHost(ip);

 

/*        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

 

        QueueingConsumer consumer = new QueueingConsumer(channel);

        channel.basicConsume("queue_priority",consumer);

        while (true) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

            String msg = new String(delivery.getBody());

            System.out.println(msg);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        }*/

    }

}

打印输出:先输出偶数,后输出奇数

如何限流?

1、为什么要对消费端限流?

如果Rabbitmq 服务器积压了有上万条未处理的消息,如果这时候连上了一个消费端,那么巨量的消息瞬间全部推送过来,但是单个客户端无法同时处理这么多。当数据量特别大的时候对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。

2、限流的实现方式—限流api

RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

  • prefetchSize:0,单条消息大小限制,0代表不限制
  • prefetchCount:一次性消费的消息数量。告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。
  • global:true、false 是否将上面设置应用于 channel,就是上面限制 channel 级别还是 consumer 级别。当我们设置为 false 的时候生效
  • prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的

3、如何进行限流?

  • 首先第一步,使用消费端限流需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);
  • 第二步设置具体的限流大小以及数量。channel.basicQos(0, 15, false);
  • 第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);

 

 消息确认机制

1如果没有开启ack消息确认,rabbitmq会认为这条消息没有被消费,会将消息再次放入到队列中,再次让你消费,形成死循环;

2、消费端配置了手动ack,但是在异常捕获中设置了消息重新入队,那么还是会出现死循环

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

因为最后一个参数requeue一般都会为true,此次没调用到数据,把这个消息返回到队列中再消费,如果代码中出现了int a=1/0,那么还是会造成死循环。

 消息重试机制

当你开启了手动ack的时候再消费端如果在消费的时候出现异常也会导致循环消费,所以要启动消息重试机制,默认是3次重试去消费一条消息,如果没有消费完成,则丢弃(删除)该消息或者放入死信队列中或者进行人工补偿。

erver.port=8889

 

spring.rabbitmq.host=192.168.221.150

spring.rabbitmq.port=5672

spring.rabbitmq.username=zl

spring.rabbitmq.password=123

#开启消息确认机制

spring.rabbitmq.publisher-confirms=true

#支持消息发送失败返回队列

spring.rabbitmq.publisher-returns=true

 

#设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除

spring.rabbitmq.template.mandatory=true

 

spring.rabbitmq.connection-timeout=15000

#用户虚拟机权限名称

spring.rabbitmq.virtual-host=/

 

#设置消费端手动 ack   none不确认  auto自动确认  manual手动确认

spring.rabbitmq.listener.simple.acknowledge-mode=manual

#消费者最小数量

spring.rabbitmq.listener.simple.concurrency=1

#消费之最大数量

spring.rabbitmq.listener.simple.max-concurrency=1

 

#开启消费者重试机制(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)

spring.rabbitmq.listener.simple.retry.enabled=true

#重试次数5

spring.rabbitmq.listener.simple.retry.max-attempts=5

#重试时间间隔

spring.rabbitmq.listener.simple.retry.initial-interval=5000

 

#重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)

spring.rabbitmq.listener.simple.default-requeue-rejected=true

 

#在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)

spring.rabbitmq.listener.simple.prefetch=2

 

1、触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环

2、对于重试之后仍然异常的消息,mq默认的处理类是RejectAndDontRequeueRecoverer

见名知意。

SimpleRabbitListenerContainerFactoryConfigurer——>>RejectAndDontRequeueRecoverer(实现了MessageRecoverer接口

 

 

MessageRecoverer接口实现类

RejectAndDontRequeueRecoverer

RepublishMessageRecoverer

ImmediateRequeueMessageRecoverer

 

优化处理一对于重试之后仍然异常的消息,可以采用RepublishMessageRecoverer,将消息发送到其他的队列中,再专门针对新的队列进行处理

 

优化处理二:采用死信队列的方式处理重试失败的消息

/**

 * 死信交换机

 * @return

 */

@Bean

public DirectExchange dlxExchange(){

    return new DirectExchange(dlxExchangeName);

}

 

/**

 * 死信队列

 * @return

 */

@Bean

public Queue dlxQueue(){

    return new Queue(dlxQueueName);

}

 

/**

 * 死信队列绑定死信交换机

 * @param dlxQueue

 * @param dlxExchange

 * @return

 */

@Bean

public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){

    return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);

}

业务代码添加死信交换机、死信路由配置

/**

 * 业务队列

 * @return

 */

@Bean

public Queue queue(){

    Map<String,Object> params = new HashMap<>();

    params.put("x-dead-letter-exchange",dlxExchangeName);//声明当前队列绑定的死信交换机

    params.put("x-dead-letter-routing-key",dlxRoutingKey);//声明当前队列的死信路由键

    return QueueBuilder.durable(queueName).withArguments(params).build();

    //return new Queue(queueName,true);

}

 

注意点:

1消费者在重试5次后,由于MessageCover默认的实现类是RejectAndDontRequeueRecoverer,也就是requeue=false,因为业务队列绑定了死信队列,消息会从业务队列中删除,同时发送到死信队列中。

2如果ack模式是手动ack,那么需要调用channe.nack方法,同时设置requeue=false才会将异常消息发送到死信队列中

重试使用场景:

对于消费端异常的消息,如果在有限次重试过程中消费成功是最好,如果有限次重试之后仍然失败的消息,不管是采用RejectAndDontRequeueRecoverer还是使用死信队列都是可以的,同时也可以采用折中的方法,先将消息从业务队列中ack掉,再将消息发送到另外的一个队列中,后续再单独处理异常数据的队列

考虑下面两个场景:

1http下载视频或者图片或者调用第三方接口

2空指针异常或者类型转换异常(其他的受检查的运行时异常)

第一种重试有意义,第二种重试无意义,需要记录日志以及人工处理或者轮询任务方式处理。

重试的使用方式:

1、自动ack模式,不能catch异常

2、手动ack模式,不能try—catch异常

建议自动ack模式使用重试机制,如果一定要在手动ack模式下使用retry功能,最好还是确认在有限次重试过程中可以重试成功,否则超过重试次数,又没办法执行nack,会出现消息一直unack死循环

消息幂等性

问题

解决方案

消息重复消费问题:消费者消息处理了,没来的及提交offset,再重启可能导致重复消费

方式一:使用全局MessageID判断消费方使用同一个,解决幂等性。

方式二:用一个消息消费表来记录每一条消息,给每个一个消息设置一个id(uuid),消费了就保存到表中去。消息过来的时候先查询是否已经消费。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/419073.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Qt】Qt之进程间通信(Windows消息)【转】

简述 通过上一节的了解&#xff0c;我们可以看出进程通信的方式很多&#xff0c;今天分享下如何利用Windows消息机制来进行不同进程间的通信。 简述效果发送消息 自定义类型与接收窗体发送数据接收消息 设置标题重写nativeEvent效果 发送消息 自定义类型与接收窗体 包含所需库&…

启动nginx服务报错Job for nginx.service failed because the control process exited with error code.

nginx使用service nginx restart报错 启动nginx服务时如果遇到这个错误 Job for nginx.service failed because the control process exited with error code. See "systemctl status nginx.service" and "journalctl -xe" for details. 可能原因: 1、配…

27 | 递归树:如何借助树来求解递归算法的时间复杂度?

目的 借助递归树来分析递归算法的时间复杂度 递归树 递归的思想就是将大问题分解为小问题来求解&#xff0c;然后再将小问题分解为小小问题。这样一层一层地分解&#xff0c;直到问题的数据规模被分解得足够小&#xff0c;不用继续递归分解为止。 如果我们把这个一层一层的…

28 | 堆和堆排序:为什么说堆排序没有快速排序快?

如何理解“堆” 堆排序是一种原地的、时间复杂度为 O(nlogn) 的排序算法 堆的两个特点&#xff1a; 一颗完全二叉树堆中每个节点都必须大于等于&#xff08;或者小于等于&#xff09;其左右子节点的值&#xff1b; 对于每个节点的值都大于等于子树中每个节点值的堆&#xff…

29 | 堆的应用:如何快速获取到Top 10最热门的搜索关键词?

为什么评价算法性能是根据时间和空间复杂度&#xff0c;而不是别的参数&#xff1f;是因为计算机结构是冯诺依曼体系&#xff0c;除了输入输出设备和控制器&#xff0c;就剩下运算器和存储器了 问题引入 搜索引擎的热门搜索排行榜功能是如何实现的&#xff1f;搜索引擎每天会…

多线程——线程间的同步通信

1、概要 线程间的相互作用&#xff1a;线程之间需要一些协调通信&#xff0c;来共同完成一件任务。线程间的协调通信主要通过wait方法和notify方法来完成。因为wait和notify方法定义在Object类中&#xff0c;因此会被所有的类所继承。这些方法都是final的&#xff0c;即它们都是…

30 | 图的表示:如何存储微博、微信等社交网络中的好友关系?

列出功能需求->翻译成逻辑算法->抽象出数据结构->确定物理存储结构 后面的不会脱离前面的独立存在&#xff0c;只存在于工作流的运用中&#xff0c;所以不能把它们独立地看。 问题引入 在微博中&#xff0c;两个人可以互相关注&#xff1b;在微信中&#xff0c;两个…

部署OpenStack问题汇总(五)--openstack中删除虚拟主机,状态一直未deleting

【原创文章&#xff0c;转载请注明出处】 一、我重启了该机器&#xff0c;之后想删除没有创建成功的虚拟机(没有打开cpu的vt)&#xff0c;结果发现状态一直为deleting状态。在这个状态下创建虚拟机也失败。 二、分析&#xff1a;在/var/log/nova/nova-compute.log的log找到如下…