【初始RabbitMQ】工作队列的实现

工作队列

工作队列(又称为任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

轮训分发消息

我们启动两个工作线程,一个消息发送线程,一个用来接受线程,我们来看看它们两个工作线程是如何工作的

抽取工具类

我们将获取信道这个重复的代码封装为一个类,当时用的时候直接调用

/*** 连接工厂创建信道工具类*/
public class RabbitMqUtils {public static Channel getChannel(){ConnectionFactory factory = new ConnectionFactory();factory.setHost("118.31.6.132");factory.setUsername("admin");factory.setPassword("123");Connection connection = null;try {try {connection = factory.newConnection();} catch (IOException e) {e.printStackTrace();}} catch (TimeoutException e) {e.printStackTrace();}Channel channel = null;try {channel = connection.createChannel();} catch (IOException e) {e.printStackTrace();}return channel;}
}

这里使用try...catch防止之后每次调用都需要抛出异常

生产者代码

/*** 生产者 发送大量消息*/
public class Task01 {//队列名称public static final String QUEUE_NAME = "hello";//发送大量的消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();/**生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化(磁盘)默认情况时在内存* 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许* 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数 延迟消息等*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台接受消息Scanner scanner = new Scanner(System.in);/*** 发送一个消息* 1.发送到那个交换机* 2.路由的KEY值是哪个 本次是队列的名称* 3.其他参数信息* 4.发送消息的消息体*/while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送"+message+"完成");}}
}

消费者代码

/*** 这是一个工作线程(消费者)*/
public class Worker01 {//队列名称public static final String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();//消息接受DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody()));};//消息接受被取消时CancelCallback cancelCallback = (consumerTag)->{System.out.println(consumerTag+"消息取消消费接口回调");};/*** 消费者信息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true自动应答 false手动应答* 3.消费者微车才能更改消费的回调* 4.消费者取消消费回调*/System.out.println("C2等待接受消息......................");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

启动两个工作线程

在运行之前我们要修改一个选项,这样我们就不需要重复的写消费者2了

启动一个发送线程 

启动程序,用生产者发送四条消息,

结果分析

通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且 是按照有序的一个接收一次消息

 消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

消息应答的方法

  1. Channel.basicAck(用于肯定确认)
    1. RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  2. Channel.basicNack(用于否定确认)
  3. Channel.basicReject(用于否定确认)
    1. 与 Channel.basicNack 相比少一个参数(批量应答参数) 不处理该消息了直接拒绝,可以将其丢弃了

Multiple(批量应答)的解释

手动应答的一个好处就是可以批量应答并且减少网络拥堵

multiple 的 true 和 false 代表不同意思:

true:代表批量应答channel 上未应答的消息,比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答

false:同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息

消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改 为手动应答,消费者在上面代码的基础上增加下面画红色部分代码

睡眠工具类:

public class SleepUtils {public static void sleep(int second){try {Thread.sleep(1000*second);} catch (InterruptedException e) {e.printStackTrace();}}
}

生产者:

/*
* 消息再手动应答不丢失、放回消息队列重新消费*/
public class Task2 {//队列名称public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMqUtils.getChannel();//声明队列channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);//从控制台中输入信息Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));System.out.println("生产者发出消息:"+message);}}
}

消费者01:

public class Work01 {private static final String ACK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1等待接收消息短");//消息消费的时候如何处置消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody());SleepUtils.sleep(1);System.out.println("接收到消息:"+message);/*** 1.消息标记tag* 2.是否批量应答未应答的消息*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//取消消息的回调CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};/*** 消费者信息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true自动应答 false手动应答* 3.消费者微车才能更改消费的回调* 4.消费者取消消费回调*/boolean autoAck = false;channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

消费者02:

public class Work02 {private static final String ACK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws IOException {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2等待接收消息长");//消息消费的时候如何处置消息DeliverCallback deliverCallback = (consumerTag,delivery)->{String message = new String(delivery.getBody());SleepUtils.sleep(30);System.out.println("接收到消息:"+message);/*** 1.消息标记tag* 2.是否批量应答未应答的消息*/channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};//取消消息的回调CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};/*** 消费者信息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true自动应答 false手动应答* 3.消费者微车才能更改消费的回调* 4.消费者取消消费回调*/boolean autoAck = false;channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);}
}

手动应答效果演示

在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/686360.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【NI-DAQmx入门】调整数据记录长度再进行数据处理

需要注意的是,初学者很容易造成一个大循环,导致采集循环的执行时间过长,最佳操作是采集循环只干采集的事,另起一个循环做数据拆解或分析。 有时需要以一定的采样率获取数据并记录所需的长度。然而,在处理这些数据时&am…

2-17作业

#!/bin/bash read zifu case $zifu in [[:lower:]]) echo "小写字母" ;; [0-9]) echo "是数字字符" ;; [[:upper:]]) echo "是大写字母" ;; esac #!/bin/bash arr(ls ~) len${#arr…

把Activity当做dialog对话框使用

1、引言 在安卓开发中对话框的使用是不可避免的,但是原生的对话框用起来总感觉差点意思,而且位置不好控制,在与界面的交互上也不够灵活,没有像activity那样的生命周期方法,以至于某些特殊的功能无法实现。此时我们就希…

Android Studio新建EMPTY,提示Gradle下载失败,Connect timed out

Android Studio新建EMPTY,提示Gradle下载失败,Connect timed out 找到gradle-wrapper.properites文件,替换distributeionUrl为腾讯镜像,就好了,网上找了好久,就这个有效,是在这里Android导入项…

【分享】图解ADS+JLINK调试ARM

文章是对LPC2148而写的,但是对三星的44B0芯片同样适用,只需要在选择时将相应的CPU选择的S3C44B0就可以了。 JLINK在ADS下调试心得 前两天一个客户用jlink在ADS下调试LPC2148总报错,这个错误我之前在调试LPC2200的时候也碰到过,后…

ESP32学习(2)——点亮LED灯

1.前期准备 开发板原理图如下: 可见LED灯接在了GPIO2口 那么要如何编写代码控制GPIO口的电平高低呢? 我们可以参考micropython的官方文档Quick reference for the ESP32 — MicroPython latest documentation 可见,需要导入machine包 若要…

[嵌入式系统-24]:RT-Thread -11- 内核组件编程接口 - 网络组件 - TCP/UDP Socket编程

目录 一、RT-Thread网络组件 1.1 概述 1.2 RT-Thread支持的网络协议栈 1.3 RT-Thread如何选择不同的网络协议栈 二、Socket编程 2.1 概述 2.2 UDP socket编程 2.3 TCP socket编程 2.4 TCP socket收发数据 一、RT-Thread网络组件 1.1 概述 RT-Thread 是一个开源的嵌入…

Springboot 配置使用 Elasticsearch

一、安装Elasticsearch 1、Windows安装 Windows安装比较简单,ES官网Download Elasticsearch | Elastic下载压缩包,解压出来, bin 目录下有个elasticsearch.bat,双击,就运行起来了。 然后在浏览器输入localhost:9200…

php基础学习之作用域和静态变量

作用域 变量(常量)能够被访问的区域,变量可以在常规代码中定义,也可以在函数内部定义 变量的作用域 在 PHP 中作用域严格来说分为两种,但是 PHP内部还定义一些在严格意义之外的一种,所以总共算三种—— 局部…

《Go 简易速速上手小册》第1章:Go 语言基础(2024 最新版)

文章目录 1.1 Go 语言的安装与环境配置1.1.1 基础知识讲解案例 Demo:简单的 Go 程序 1.1.2 重点案例:搭建一个 Go Web 服务准备工作步骤 1:创建项目目录步骤 2:编写 Web 服务代码步骤 3:运行你的 Web 服务步骤 4&#…

RM电控工程讲义

HAL_CAN_RxFifo0MsgPendingCallback(CAN_HandleTypeDef *hcan) 是一个回调函数,通常在STM32的HAL库中用于处理CAN(Controller Area Network)接收FIFO 0中的消息。当CAN接口在FIFO 0中有待处理的消息时,这个函数会被调用。 HAL库C…

斯坦福大学全能家政服务机器人Mobile ALOHA以及“小群体大智慧”Zooids集群机器人

斯坦福大学成功研发出低成本自主进化克隆人类行为和任务的能力全能型家政服务机器人。 原文标题: 【Mobile ALOHA-Learning Bimanual Mobile Manipulation with Low-Cost Whole-Body Teleoperation】 论文链接:【Mobile ALOHA (mobile-aloha.github.io)】。 以及由斯坦福大学…

【Linux】进程信号的保存 | 自定义捕捉

文章目录 三、信号的阻塞(信号的保存)1. 信号相关其他常见概念2. 在内核中的表示3. sigset_t类型4. 信号集操作函数函数列表注意事项 5. 读取/修改block位图 - sigprocmask6. 读取pending位图 - sigpending 四、信号捕捉1. 信号捕捉的初步认识自定义捕捉…

A股上市公司绿色化转型指数(2007-2022)

数据来源:上市公司年报、上市公司网站信息、上市公司社会责任报告 时间跨度:2007-2022年 数据范围:中国A股上市公司 数据指标 参考Loughran & Mcdonald(2011)的研究,利用年报中披露的文本信息测量企业…

心律守护 基于机器学习的心脏病预测

心律守护 基于机器学习的心脏病预测 心律守护 基于机器学习的心脏病预测项目背景与意义项目数据与特征数据分析与预处理机器学习模型建立与评估结语 心律守护 基于机器学习的心脏病预测 在当今数字化时代,机器学习的应用已经渗透到了医疗保健领域的各个层面。其中&…

什么是PAGA系统

PAGA系统是一种公共广播和通用报警系统,它在船舶、海上钻井平台、石油化工、天然气开采等行业的应用非常广泛。当遇到紧急情况或其他特殊情况时,PAGA系统能够在大范围内进行喊话广播或报警。这种系统通过自动电话系统(如PABX,即自…

【Python--网络编程之Ping命令的实现】

🚀 作者 :“码上有前” 🚀 文章简介 :Python开发技术 🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬 Python网络编程之Ping命令的实现 往期内容代码见资源,效果图如下一、实验要求二、协…

巴伦周刊:全球最赚钱对冲基金的成功秘诀是什么?

对冲基金界的明星们过去常常大起大落,有时甚至会实现大满贯式的全面成功,比如约翰•保尔森(John Paulson)在2007-09年金融危机前做空美国房地产市场赚了200亿美元。 但摇摆不定的对冲基金近年来并不是大赢家,相反最好的回报来自于多策略、多…

深度学习系列53:大模型微调概述

参考系列文章:https://zhuanlan.zhihu.com/p/635152813 github链接:https://github.com/liguodongiot/llm-action 1 训练范式 下面这种instructive learning,在模型参数达到80亿后,会发生质的提升: 类似的还有手写pr…

鸿蒙开发系列教程(二十三)--List 列表操作(2)

列表样式 1、设置内容间距 在列表项之间添加间距,可以使用space参数,主轴方向 List({ space: 10 }) { … } 2、添加分隔线 分隔线用来将界面元素隔开,使单个元素更加容易识别。 startMargin和endMargin属性分别用于设置分隔线距离列表侧…