RaabitMQ 快速入门

🎉欢迎大家观看AUGENSTERN_dc的文章(o゜▽゜)o☆✨✨

🎉感谢各位读者在百忙之中抽出时间来垂阅我的文章,我会尽我所能向的大家分享我的知识和经验📖

🎉希望我们在一篇篇的文章中能够共同进步!!!

🌈个人主页:AUGENSTERN_dc

🔥个人专栏:C语言 |Java | 数据结构 | 算法 | MySQL | RabbitMQ | Redis

⭐个人格言:

一重山有一重山的错落,我有我的平仄

一笔锋有一笔锋的着墨,我有我的舍得


接下来,我会向大家介绍如何快速入门RabbitMQ,以及如何编写一个简单的RabbitMQ代码

1. 引入依赖

在编写我们的代码之前,我们需要引入RabbitMQ的依赖:

如果你使用的是Maven, 你可以使用以下依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.7.3</version>
</dependency>

 2. 生产者消费者模型

在编写代码之前,大家需要了解生产者消费者模型:

生产者-消费者模型(Producer-Consumer Model): 是一种经典的多线程同步问题,用于解决生产者线程和消费者线程之间的数据共享和同步问题。它在多线程编程、并发编程以及分布式系统中被广泛应用。

2.1 主要角色

生产者-消费者模型涉及两个主要角色:

1. 生产者(producer)

  • 负责生成数据并将其放入缓冲区(Buffer)。
  • 如果缓冲区已满,生产者需要等待,直到缓冲区有空间可以存放数据。

2. 消费者(consumer)

  • 负责从缓冲区中取出数据并消费。
  • 如果缓冲区为空,消费者需要等待,直到缓冲区中有数据可以消费。

缓冲区(Buffer)是一个共享资源,用于存储生产者生成的数据,供消费者消费。

2.2 关键问题

生产者-消费者模型需要解决以下两个关键问题:

1. 互斥访问

  • 多个线程(生产者和消费者)需要访问共享的缓冲区,因此需要确保对缓冲区的访问是互斥的,避免数据竞争和不一致。

2. 同步问题:

  • 生产者需要在缓冲区有空间时才能生产数据。
  • 消费者需要在缓冲区有数据时才能消费数据。
  • 需要一种机制来协调生产者和消费者之间的同步。

3. 编写生产者代码

3.1 创建连接

要想使用创建一个生产者,首先需要将生产者和RabbitMQ的服务器进行连接

// 1. 创建连接⼯⼚
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("你的RabbitMQ服务器IP");//ip 默认值localhost
factory.setPort(5672); //默认值5672
factory.setVirtualHost("test");//虚拟机名称, 默认 /
factory.setUsername("guest");//⽤⼾名,默认guest
factory.setPassword("guest");//密码, 默认guest
//3. 创建连接Connection
Connection connection = factory.newConnection();

RabbitMQ 默认的⽤于客⼾端连接的TCP 端⼝号是5672, 需要提前进⾏开放

3.2 创建Channel

//4. 创建channel通道
Channel channel = connection.createChannel();

⽣产者和消费者创建的channel并不是同⼀个

3.3 声明一个队列Queue

/*queueDeclare(String queue, boolean durable, boolean exclusive, booleanautoDelete, Map<String, Object> arguments)1.queue: 队列名称2.durable: 是否持久化.true-设置队列为持久化, 待久化的队列会存盘,服务器重启之后, 消息不丢失。3.exclusive:* 是否独占, 只能有⼀个消费者监听队列* 当Connection关闭时, 是否删除队列4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉5.arguments: ⼀些参数
*/
//如果没有⼀个hello_world 这样的⼀个队列, 会⾃动创建, 如果有, 则不创建
channel.queueDeclare("hello",true,false,false,null);

3.4 发送消息

当一个新的RabbitMQ节点启动时,他会预声明(declare)几个内置的交换机, 内置交换机名称是空字符串(""), 生产者发送的消息会根据队列名称直接路由到对应的队列.

例如: 如果有⼀个名为 "hello" 的队列, ⽣产者可以直接发送消息到 "hello" 队列, ⽽消费者可以从 "hello" 队列中接收消息, ⽽不需要关⼼交换机的存在. 这种模式⾮常适合简单的应⽤场景,其中⽣产者和消费者之间的通信是⼀对⼀的.
 

//6. 通过channel发送消息到队列中
/*basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)1.exchange: 交换机名称, 简单模式下, 交换机会使⽤默认的""2.routingKey: 路由名称, routingKey = 队列名称3.props: 配置信息4.body: 发送消息的数据
*/
String msg = "Hello World";
//使⽤的是内置交换机. 使⽤内置交换机时, routingKey要和队列名称⼀样, 才可以路由到对应的队列上去
channel.basicPublish("","hello",null,msg.getBytes());
System.out.println(msg + "消息发送成功");

3.5 释放资源

//显式地关闭Channel是个好习惯, 但这不是必须的, Connection关闭的时候,Channel也会⾃动关闭.
channel.close();
connection.close();

4. 编写消费者代码

4.1 创建连接

和生产者类似, 想要接收RabbtiMQ的消息, 首先需要和RabbitMQ建立一个连接

//1. 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的RabbitMQ服务器IP");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("test");

 这里要注意和生产者使用同一个虚拟机

4.2 创建Channel

//2. 创建Channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

4.3 声明队列

//3. 声明队列
channel.queueDeclare("hello", true, false, false, null);

 这里需要注意, 要和生产者使用同一个队列, 这样生产者发送的消息, 才能被消费者正常接收

4.4 消费资源

//4. 消费资源
/**
*  参数说明:
*  consumerTag : 消费者标签, 通常是消费者在订阅队列时指定的.
*  envelope : 包含消息的封包信息,如队列名称, 交换机等.
*  properties : ⼀些配置信息
*  body : 消息的具体内容
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {//从队列中收到消息就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}
};
/**
*  参数说明:
*  queue: 队列名称
*  autoAck: 是否自动确认
*  callback: 接收到消息后, 执行的逻辑是什么
*/
channel.basicConsume("hello", true, consumer);

这里的DefaultConsumer 是 RabbitMQ提供的⼀个默认消费者, 实现了Consumer 接⼝.

Consumer ⽤于定义消息消费者的⾏为. 当我们需要从RabbitMQ接收消息时, 需要提供⼀个实现了Consumer 接⼝的对象.

4.5 释放资源

// 5. 释放资源
channel.close();
connection.close();

当我们运行生产者代码时, 就会向RabbitMQ服务器发送一条消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ProducerDemo {public static void main(String[] args) throws IOException, TimeoutException {//1.建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("test");Connection connection = factory.newConnection();//2. 开启通道Channel channel = connection.createChannel();//3. 声明交换机  使用内置的交换机//4. 声明队列/***  参数说明:*  queue: 队列名称*  durable: 可持久化*  exclusive: 是否独占*  autoDelete: 是否自动删除*  arguments: 参数*/channel.queueDeclare("hello", true, false, false, null);//5. 发送消息/***  参数说明:*  exchange: 交换机名称*  routingKey: 路由的规则, 使用内置交换机, routingKey和队列名称保持一致*  props: 属性配置*  body: 要发送的消息*/String msg = "hello rabbitmq";channel.basicPublish("", "hello", null, msg.getBytes());System.out.println("消息发送成功!!");//6. 资源释放channel.close();connection.close();}
}

当我运行消费者代码时, 就会从RabbitMQ服务器中获取一条消息

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerDemo {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1. 创建连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setPort(5672);factory.setUsername("study");factory.setPassword("study");factory.setVirtualHost("test");//2. 创建ChannelConnection connection = factory.newConnection();Channel channel = connection.createChannel();//3. 声明队列channel.queueDeclare("hello", true, false, false, null);//4. 消费资源/***  参数说明:*  queue: 队列名称*  autoAck: 是否自动确认*  callback: 接收到消息后, 执行的逻辑是什么*/DefaultConsumer consumer = new DefaultConsumer(channel) {//从队列中收到消息就会执行的方法@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("Received: " + new String(body));}};channel.basicConsume("hello", true, consumer);//5. 关闭资源
//        Thread.sleep(1000);channel.close();connection.close();}
}

依次运行生产者消费者代码, 就能得到以下结果

以上就是本章的所有内容, 谢谢大家观看!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/77034.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

语音识别——根据声波能量、VAD 和 频谱分析实时输出文字

SenseVoiceSmall网络结构图 ASR(语音识别)是将音频信息转化为文字的技术。在实时语音识别中,一个关键问题是:如何决定将采集的音频数据输入大模型的最佳时机?固定时间间隔显然不够灵活,太短可能导致频繁调用模型,太长则会延迟文字输出。有没有更智能的方式?答案是肯定…

AI大模型如何重塑科研范式:从“假说驱动”到“数据涌现”

📝个人主页🌹:慌ZHANG-CSDN博客 🌹🌹期待您的关注 🌹🌹 一、引言:科研进入“模型共研”时代 传统科研范式通常以“假设→实验→验证→理论”的方式推进,这一经典路径建立在人类的认知能力与逻辑推理基础上。然而,随着数据规模的爆炸式增长与知识系统的高度复杂…

使用Python写入JSON、XML和YAML数据到Excel文件

在当今数据驱动的技术生态中&#xff0c;JSON、XML和YAML作为主流结构化数据格式&#xff0c;因其层次化表达能力和跨平台兼容性&#xff0c;已成为系统间数据交换的通用载体。然而&#xff0c;当需要将这类半结构化数据转化为具备直观可视化、动态计算和协作共享特性的载体时&…

面试题:Eureka和Nocas的区别

Eureka 与 Nacos 核心区别对比 一、功能定位与核心能力 ‌维度‌‌Eureka‌‌Nacos‌‌核心功能‌专注服务注册与发现&#xff0c;无配置管理功能‌:ml-citation{ref“1,3” data“citationList”}集成服务注册、发现、配置管理、动态DNS等‌:ml-citation{ref“1,3” data“c…

2025年4月15日 百度一面 面经

目录 1. 代理相关 从静态代理到动态代理 2. cglib可以代理被final修饰的类吗,为什么 3. JVM 体系结构 4. 垃圾回收算法 5. 什么是注解 如何使用 底层原理 6. synchronized和reentrantlock 7. 讲一下你项目中 redis的分布式锁 与java自带的锁有啥区别 8. post 请求和 ge…

AI改变生活

AI改变生活 人工智能&#xff08;AI&#xff09;在我们生活中的应用越来越广泛&#xff0c;深刻地改变了我们的工作和生活方式。以下是一些AI实际应用的实例&#xff0c;以及它们如何影响我们的日常生活。 1. 智能助手 智能助手如Siri、Alexa和Google Assistant等&#xff0…

信奥赛之c++基础(取模运算与数位分离)

🎮 数字拆解大冒险——取模运算与数位分离魔法课 🍬 第一章:糖果分装术——取模运算 🍭 分糖果游戏 7颗糖每人分3颗: 每人得到:7 / 3 = 2颗剩余糖果:7 % 3 = 1颗(%就是取模符号) 就像把糖果装袋后剩下的零散糖粒!🔧 取模运算说明书 算式比喻结果10 % 310颗糖分…

揭秘大数据 | 21、软件定义计算

老夫先将这个小系列的前两篇内容链接奉上&#xff0c;方便感兴趣的朋友一气读之。 揭秘大数据 | 19、软件定义的世界-CSDN博客 揭秘大数据 | 20、软件定义数据中心-CSDN博客 今天&#xff0c;书接上文&#xff0c;开聊软件定义计算的那些事儿&#xff01; 虚拟化是软件定义…

FPGA-DDS技术的波形发生器

1.实验目的 1.1掌握直接数字频率合成&#xff08;DDS&#xff09;的基本原理及其实现方法。 1.2在DE2-115 FPGA开发板上设计一个可调频率的正弦波和方波发生器&#xff0c;频率范围10Hz~5MHz&#xff0c;最小分辨率小于1kHz。 1.3使用Quartus II进行仿真&#xff0c;并通过S…

LeetCode[541]反转字符串Ⅱ

思路&#xff1a; 题目给我们加了几个规则&#xff0c;剩余长度小于2k&#xff0c;大于等于k就反转k个&#xff0c;小于k就全部反转&#xff0c;我们按照这个逻辑来就行。 第一就是大于等于k就反转k个&#xff0c;我们for循环肯定是i2k了&#xff0c;接下来就是判断是否大于等于…

实现定长的内存池

池化技术 所谓的池化技术&#xff0c;就是程序预先向系统申请过量的资源&#xff0c;然后自己管理起来&#xff0c;以备不时之需。这个操作的价值就是&#xff0c;如果申请与释放资源的开销较大&#xff0c;提前申请资源并在使用后并不释放而是重复利用&#xff0c;能够提高程序…

路由器原理与配置技术详解

一、路由基础原理 1.1 路由器的核心功能 网络层设备&#xff1a;工作在OSI参考模型第三层&#xff0c;实现不同网络间的互联互通智能路径选择&#xff1a;基于路由表为数据包选择最优传输路径协议转换&#xff1a;处理不同网络接口间的协议差异&#xff08;如以太网与PPP&…

Leetcode 3518. Smallest Palindromic Rearrangement II

Leetcode 3518. Smallest Palindromic Rearrangement II 1. 解题思路2. 代码实现 题目链接&#xff1a;Leetcode 3518. Smallest Palindromic Rearrangement II 1. 解题思路 这一题是题目Leetcode 3517. Smallest Palindromic Rearrangement I的升级版本&#xff0c;其主要的…

大模型——Crawl4AI 中的数据提取策略

大模型——Crawl4AI 中的数据提取策略 在本章中,将详细介绍在 Crawl4AI 中可用的数据提取策略。这些策略包括: LLMExtractionStrategy:用于详细内容提取。JsonCssExtractionStrategy:使用 CSS 选择器进行结构化数据检索。CosineStrategy:基于余弦相似性进行有效的语义分段…

职坐标解码互联网行业转型发展新动能

当前&#xff0c;互联网行业正以前所未有的速度重塑全球产业格局。工信部最新数据显示&#xff0c;我国互联网企业营收连续三年保持双位数增长&#xff0c;其中百强企业在人工智能、物联网等领域的投入强度同比提升40%&#xff0c;展现出强劲的技术引领力。与此同时&#xff0c…

linux多线(进)程编程——(4)进程间的传音术(命名管道)

前言&#xff08;前情回顾&#xff09; 进程君&#xff08;父进程&#xff09;在开发出匿名管道这门传音术后&#xff0c;解决了和自己孩子&#xff08;子进程&#xff09;间的沟通问题&#xff0c;父子关系趋于融洽。和孩子沟通后&#xff0c;进程君发现&#xff0c;自己脱离…

在IDEA里面建立maven项目(便于java web使用)

具体步骤&#xff1a; 第一次有的电脑你再创建项目的时候右下角会提醒你弹窗&#xff1a;让你下载没有的东西 一定要下载&#xff01;&#xff01;可能会很慢 运行结果&#xff1a; 因为他是默认的8080端口所以在运行的时候输入的url如下图&#xff1a; 新建了一个controller代…

【13】数据结构之树结构篇章

目录标题 树Tree树的定义树的基本概念树的存储结构双亲表示法孩子表示法孩子兄弟表示法 二叉树二叉树与度不超过&#xff12;的普通树的不同之处二叉树的基本形态二叉树的分类二叉树的性质 二叉树的顺序存储二叉树的链式存储二叉树的链式存储的结点结构树的遍历先序遍历中序遍历…

雷达生命探测仪,地震救援的生命探测先锋|鼎跃安全

在地震、山体滑坡、坍塌建筑等突发灾害中&#xff0c;会严重摧毁建筑物&#xff0c;造成倒塌和人员被困&#xff1b;在瓦砾堆、混凝土板层中&#xff0c;受困人员的生命安全常常面临严峻威胁。传统救援手段通常存在响应时间长、监测精度有限等不足。 救援现场往往环境复杂&…

512天,倔强生长:一位技术创作者的独白

亲爱的读者与同行者&#xff1a; 我是倔强的石头_&#xff0c;今天是我在CSDN成为创作者的第512天。当系统提示我写下这篇纪念日文章时&#xff0c;我恍惚间想起了2023年11月19日的那个夜晚——指尖敲下《开端——》的标题&#xff0c;忐忑又坚定地按下了“发布”键。那时的我…