RabbitMQ入门案例

RabbitMQ 是目前比较主流的MQ消息队列中间件,下面简单总结RabbitMQ入门时所做的一些笔记

1.RabbitMQ 入门案例

需求:用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者

1.1 添加依赖

<!--rabbitmq 依赖客户端-->
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version>
</dependency>

1.2 消息生产者

创建一个类作为生产者,最终生产消息到 RabbitMQ 的队列里

步骤:

  1. 创建 RabbitMQ 连接工厂
  2. 进行 RabbitMQ 工厂配置信息
  3. 创建 RabbitMQ 连接
  4. 创建 RabbitMQ 信道
  5. 生成一个队列
  6. 发送一个消息到交换机,交换机发送到队列。“” 代表默认交换机
/*** <p>Class: Producer </p>* <p>Description: 生产者:发消息</p>** @author zhouyi* @version 1.0* @date 2023/7/9*/
public class Producer {//对列名称public static final String QUEUE_NAME = "hello";//发消息public static void main(String[] args) throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP 连接RabbitMQ对列factory.setHost("8.219.165.36");//用户名factory.setUsername("admin");//密码factory.setPassword("admin123");//创建连接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();/*** 生产一个对列* 1.对列名称* 2.对列里面的消息是否持久化,默认情况下,消息存储在内存中* 3.该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费 false:只能一个消费者消费* 4.是否自动删除,最后一个消费者端开链接以后,该队列是否自动删除,true表示自动删除* 5.其他参数*/channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发消息String message = "Hello,world";/*** 发送一个消息* 1.发送到哪个交换机* 2.路由的key值是哪个本次是队列的名称* 3.其他参数信息* 4.发送消息的消息体*/channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("消息发送完毕");}
}

运行代码发现如下错误,即读写权限没设置好

Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method<connection.close>(reply-code=530, reply-text=NOT_ALLOWED - access to vhost '/' refused for user 'admin', class-id=10, method-id=40)

image-20230709180328444

再次运行看到消息队列中已存在消息

image-20230709180630502

image-20230709180852356

方法解释

  • 声明队列:
channel.queueDeclare(队列名/String, 持久化/boolean, 共享消费/boolean, 自动删除/boolean, 配置参数/Map);

配置参数现在是 null,后面死信队列、延迟队列等会用到,如:队列的优先级

队列里的消息如果没有被消费,何去何从?(死信队列)

Map<String, Object> params = new HashMap();
// 设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
params.put("x-max-priority", 10);
// 声明当前队列绑定的死信交换机
params.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 声明当前队列的死信路由 key
params.put("x-dead-letter-routing-key", "YD");
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
  • 发布消息:

    channel.basicPublish(交换机名/String, 队列名/String, 配置参数/Map, 消息/String);
    

    配置参数现在是 null,后面死信队列、延迟队列等会用到,如:发布的消息优先级

    发布的消息标识符 id

// 给消息赋予 优先级 ID 属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(10).messageId("1")build();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());

1.3 消息消费者

创建一个类作为消费者,消费 RabbitMQ 队列的消息,消息消费是通过Channel来完成的

public class Consumer {//队列的名称public static final String QUEUE_NAME = "hello";//接受消息public static void main(String[] args) throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂IP 连接RabbitMQ对列factory.setHost("8.219.165.36");//用户名factory.setUsername("admin");//密码factory.setPassword("admin123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();//声明接收消息DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println(new String(message.getBody()));};//取消消息时的回调CancelCallback cancelCallback = consumerTag -> {System.out.println("消息消费被中断");};/*** 消费者消费消息* 1.消费哪个队列* 2.消费成功之后是否要自动应答true:代表自动应答false:代表手动应答* 3.消费者未成功消费的回调* 4.消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}

运行结果如下

image-20230709182737671

此时队列里面的消息都被全部消费了

image-20230709182819693

说明消息已被消费掉了

2.Work Queues(工作队列)

Work Queues 是工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

2.1 轮询消费

轮询消费消息指的是轮流消费消息,即每个工作队列都会获取一个消息进行消费,并且获取的次数按照顺序依次往下轮流。

案例中生产者叫做 Task,一个消费者就是一个工作队列,启动两个工作队列消费消息,这个两个工作队列会以轮询的方式消费消息。

轮询案例

  • 首先把 RabbitMQ 的配置参数封装为一个工具类:`RabbitMQUtils,创建信道的工具类
public class RabbitMqUtils {//得到一个连接的 channelpublic static Channel getChannel() throws Exception{//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("182.92.234.71");factory.setUsername("admin");factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}
}

创建一个工作线程,相当于一个消费者

public class Work01 {//队列的名称public static final String QUEUE_NAME = "hello";//接收消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();//消息的接受DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("接收到的消息:" + new String(message.getBody()));};//消息接受被取消时,执行下面的内容CancelCallback cancelCallback = consumerTag -> {System.out.println(consumerTag + "消息被消费者取消消费接口回调逻辑");};//消息的接受channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);}
}

为了演示多个工作线程,可以在IDEA中设置允许同时运行多次

image-20231215224904321

创建好一个工作队列,只需要以多线程方式启动两次该 main 函数即可,以 first、second 区别消息队列。

要开启多线程功能,首先启动该消息队列,然后如图开启多线程:

先运行第一个工作线程:

image-20230709200835846

在运行第二个工作线程,此时已经开了两个工作线程,如下

image-20230709201135491

  • 创建一个生产者,发送消息进程,为了方便消息直接从控制台输入

    public class Task01 {//队列名称public static final String QUEUE_NAME="hello";//发送大量消息public static void main(String[] args) throws IOException, TimeoutException {Channel channel = RabbitMQUtils.getChannel();//队列的声明channel.queueDeclare(QUEUE_NAME,false,false,false,null);//发送消息//从控制台当中接受信息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()) {String message = scanner.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送完成:"+message);}}
    }
    

结果验证

假设生产者生产 AA BB CC DD这四条消息,理论上工作线程C1和工作线程C2轮询接收到消息,期望测试结果如下

  • 生产者(Task01) 生产AA消息,工作线程C1接收到AA消息;
  • 生产者(Task01) 生产BB消息,工作线程C2接收到BB消息;
  • 生产者(Task01) 生产CC消息,工作线程C1接收到CC消息;
  • 生产者(Task01) 生产DD消息,工作线程C2接收到DD消息;

实际运行结果如下,和期望一致

image-20230709201946579

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/224371.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++中的reverse函数

1.实现反转数组。 //头文件 #include <algorithm> //使用方法 reverse(a, an);//n为数组中的元素个数 #include<cstdio> #include<iostream> #include<algorithm> using namespace std; int main() {int a[100];int n,k;cin >> n >> k; …

八大排序(插入排序 | 选择排序 | 冒泡排序)

在我们内存中我们一般会有一些没有顺序的数据&#xff0c;我们成为内排序&#xff0c;而今天分享八大排序的是时间复杂度为O&#xff08;N^2&#xff09;的插入排序&#xff0c;选择排序和教学意义比较强的冒泡排序。 插入排序 这是插入排序的动图&#xff0c;通过动图我们也…

Python3 中常见的数据类型

目录 数字(Number)总结 字符串(String)字符串运算符字符串格式化字符串的截取总结 List&#xff08;列表&#xff09;更新列表删除列表元素列表函数&方法总结 Tuple&#xff08;元组&#xff09;修改元组删除元组总结 Set&#xff08;集合&#xff09;Dictionary&#xff0…

3D点云广义零样本分类的递归循环对比生成网络笔记

1 Title Contrastive Generative Network with Recursive-Loop for 3D point cloud generalized zero-shot classification(Yun Hao, Yukun Su, Guosheng Lin, Hanjing Su, Qingyao Wu)【Pattern Recognition】 2 Conclusion This work aims to facilitate research on 3D poi…

【Spring Boot】内网穿透实现远程调用调试

文章目录 1. 本地环境搭建1.1 环境参数1.2 搭建springboot服务项目 2. 内网穿透2.1 安装配置cpolar内网穿透2.1.1 windows系统2.1.2 linux系统 2.2 创建隧道映射本地端口2.3 测试公网地址 3. 固定公网地址3.1 保留一个二级子域名3.2 配置二级子域名3.2 测试使用固定公网地址 4.…

机器学习入门笔记

文章目录 背景具体步骤1.环境搭建2.写个demo1.数据处理2.分割数据集3.用模型训练数据&#xff0c;并得到预测结果4.绘制结果5.评估 背景 最近学习了一些关于机器学习的内容&#xff0c;做个笔记。 具体步骤 1.环境搭建 需要用到的工具&#xff1a;pycharm&#xff0c;anaco…

如何了解蜘蛛池蚂蚁SEO

蜘蛛池是一种基于搜索引擎优化的技术手段&#xff0c;通过模拟蜘蛛爬行行为来提高网站在搜索引擎中的排名&#xff0c;从而增加网站的流量和曝光率。 编辑搜图 如何联系蚂蚁seo&#xff1f; baidu搜索&#xff1a;如何联系蚂蚁SEO&#xff1f; baidu搜索&#xff1a;如何联…

【Pytorch】Fizz Buzz

文章目录 1 数据编码2 网络搭建3 网络配置&#xff0c;训练4 结果预测5 翻车现场 学习参考来自&#xff1a; Fizz Buzz in Tensorflowhttps://github.com/wmn7/ML_Practice/tree/master/2019_06_10Fizz Buzz in Pytorch I need you to print the numbers from 1 to 100, excep…

牛客网BC92逆序输出

答案&#xff1a; #include <stdio.h>int main() {int i0, j0;int arr[10]{0};for(i0;i<10;i) //将10个整数存进数组里{scanf("%d",&arr[i]);}for(j9;j>0;j--) //逆序打印{printf("%d ",arr[j]); //若要求最后一个数后面不打印空格…

【Hive】——CLI客户端(bin/beeline,bin/hive)

1 HiveServer、HiveServer2 2 bin/hive 、bin/beeline 区别 3 bin/hive 客户端 hive-site.xml 配置远程 MateStore 地址 XML <?xml version"1.0" encoding"UTF-8" standalone"no"?> <?xml-stylesheet type"text/xsl" hre…

C# WPF上位机开发(利用tcp/ip网络访问plc)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 c# wpf如果是用来开发非标上位机的&#xff0c;那么和plc的通信肯定是少不了的。而且&#xff0c;大部分plc都支持modbus协议&#xff0c;所以这个…

neo4j安装报错:neo4j.bat : 无法将“neo4j.bat”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。

neo4j安装报错&#xff1a; neo4j.bat : 无法将“neo4j.bat”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。请检查名称的拼写&#xff0c;如果包括路径&#xff0c;请确 保路径正确&#xff0c;然后再试一次。 解决办法&#xff1a; 在环境变量中的&#xff0c;用户…

Shopee ERP:提升电商管理效率的终极解决方案

Shopee ERP&#xff08;Enterprise Resource Planning&#xff0c;企业资源规划&#xff09;是一款专为Shopee卖家设计的集成化电商管理软件。通过使用Shopee ERP系统&#xff0c;卖家可以更高效地管理他们的在线商店&#xff0c;实现库存管理、订单处理、物流跟踪、财务管理、…

优先考虑类型安全的异构容器

在Java中&#xff0c;异构容器是一种可以存储不同类型元素的容器。为了提高类型安全性&#xff0c;可以使用泛型和类型安全的异构容器&#xff0c;而不是传统的非类型安全容器。下面是一个例子&#xff0c;演示如何使用类型安全的异构容器 import java.util.HashMap; import j…

alpine linux 之嵌入式搭建

目录 序启动修改源安装 openssh设置开机网络 ip参考 序 最近发现了 alpine linux 这个文件系统&#xff0c;这是一个基于 musl libc 和 busybox 的面向安全的轻量级 Linux 发行版。 下载了他的文件系统&#xff0c;只有 3M 多的压缩包&#xff0c;非常适合嵌入式系统。 地址…

AIGC专题报告:ChatGPT的工作原理

今天分享的AIGC系列深度研究报告&#xff1a;《AIGC专题报告&#xff1a;ChatGPT的工作原理》。 &#xff08;报告出品方&#xff1a;省时查&#xff09; 报告共计&#xff1a;107页 前言 ChatGPT 能够自动生成一些读起来表面上甚至像人写的文字的东西&#xff0c;这非常了不…

计算机毕业设计 基于SpringBoot的日常办公用品直售推荐系统的设计与实现 Java实战项目 附源码+文档+视频讲解

博主介绍&#xff1a;✌从事软件开发10年之余&#xff0c;专注于Java技术领域、Python人工智能及数据挖掘、小程序项目开发和Android项目开发等。CSDN、掘金、华为云、InfoQ、阿里云等平台优质作者✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精…

AWTK 串口屏开发(2) - 数据绑定高级用法

AWTK 串口屏 智能家居示例 1. 功能 这个例子稍微复杂一点&#xff0c;界面这里直接使用了 立功科技 ZDP1440 HMI 显示驱动芯片 例子中的 UI 文件和资源&#xff0c;重点关注数据绑定。在这里例子中&#xff0c;模型&#xff08;也就是数据&#xff09;里包括一台空调和一台咖…

申请香港高才通计划有什么好处和优势?

申请香港高才通计划有什么好处和优势&#xff1f; 据香港特区政府最新消息&#xff0c;截至今年11月底&#xff0c;各项输入人才计划共收到超过20万宗申请&#xff0c;是2022年申请数目的接近四倍。 在香港特区政府积极引进人才的推动下&#xff0c;今年首11个月&#xff0c;超…

FreeModbus--学习函数指针

目录 函数指针 最简单的例子 稍作修改例子 引入协议栈的函数指针 引入协议栈第二处函数指针 函数指针 该协议栈中使用到函数指针&#xff0c;现开展一篇专门存放函数指针的文章。 C语言的函数指针是指向函数的指针变量&#xff0c;可以用来存储和调用函数的地址。在C语言中…