RabbitMQ的安装与使用

RabbitMQ的安装与使用

  • 介绍
  • 一、RabbitMQ的安装
    • 1 查找镜像
    • 2 拉取镜像
    • 3 查看镜像
    • 4 创建容器
    • 5 查看容器
    • 6 访问测试
  • 二、RabbitMQ的使用
    • 1 创建项目
    • 2 配置文件
    • 3 队列配置文件
    • 4 消费者
    • 5 生产者
    • 6 测试
  • 三、交换器
  • 四、普通队列Demo
  • 五、死信队列Demo
    • 1 介绍
    • 2 示例
      • 2.1 配置
      • 2.2 生产者
      • 2.3 消费者
      • 2.4 死信消费者
      • 2.5 结果
  • 六、延时队列Demo
    • 1 安装延迟插件
      • 1.1 下载插件
      • 1.2 将插件拷贝到RabbitMQ容器的插件目录
      • 1.3 进入到容器
      • 1.4 开启插件
      • 1.5 查看
    • 2 示例
      • 2.1 配置
      • 2.2 生产者
      • 2.3 消费产者
      • 2.4 结果

介绍

RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。开发语言为Erlang。
linux系统中安装RabbitMQ比较繁琐,这里使用的是Docker安装。

一、RabbitMQ的安装

1 查找镜像

docker search rabbitmq:management

在这里插入图片描述

2 拉取镜像

docker pull macintoshplus/rabbitmq-management

在这里插入图片描述

3 查看镜像

docker images

在这里插入图片描述

4 创建容器

docker run -d --hostname mzw-rabbitmq --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 c20

命令解读:

  • 运行一个镜像
  • -d 后台守护运行
  • –hostname mzw-rabbitmq 指定主机名称
  • –name 指定容器名称
  • -e RABBITMQ_DEFAULT_USER=admin 指定用户名
  • -e RABBITMQ_DEFAULT_PASS=admin 指定密码
  • -p 15672:15672 -p 5672:5672 端口映射
  • c20 镜像ID 简写
    在这里插入图片描述

5 查看容器

docker ps

在这里插入图片描述

6 访问测试

访问地址:http://192.168.2.xx:15672/
在这里插入图片描述
输入启动容器时设置的用户密码登录
在这里插入图片描述
这就表示RabbitMQ安装成功了

二、RabbitMQ的使用

1 创建项目

创建SpringBoot项目并引入相关依赖
在这里插入图片描述

2 配置文件

# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin# 自定义一个属性,设置队列的名称
mq.queue.name=hello-queue

3 队列配置文件

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;// 添加@Configuration 注解,表示一个注解类
@Configuration
public class QueueConfig {@Value("${mq.queue.name}")private String queueName;/*** 初始化短信队列* @return*/@Beanpublic Queue delayedSmsQueueInit() {return new Queue(queueName);}
}

4 消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 创建一个rabbitmq消费者*/
@Component
public class Receiver {// 接受MQ消息 并 处理消息@RabbitListener(queues = {"${mq.queue.name}"})public void process(String msg){// 处理消息System.out.println("我是MQ消费者,我接收到的消息是:" + msg );}
}

5 生产者

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** 消息提供者*/
@Component
public class Sender {@Autowiredprivate AmqpTemplate template;@Value("${mq.queue.name}")private String queueName;// 发送消息public void send(String msg){// 队列名,消息内容template.convertAndSend(queueName,msg);}}

6 测试

  • 发送消息
    @Autowired
    private Sender sender;
    @Test
    void contextLoads() {sender.send("你好啊......");
    }
    
  • 接收消息
    在这里插入图片描述

三、交换器

RabbitMQ中有五种主要的交互器分别如下

交换器说明
direct发布与订阅 完全匹配
fanout广播
topic主体,规则匹配
fanout转发
custom自定义

四、普通队列Demo

上边已经演示,这里不重复演示。

五、死信队列Demo

1 介绍

死信队列就是在某种情况下,导致消息无法被正常消费(异常,过期,队列已满等),存放这些未被消费的消息的队列即为死信队列

2 示例

2.1 配置

  • 配置文件
# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin###死信队列
mq.dlx.exchange=mq_dlx_exchange
mq.dlx.queue=mq_dlx_queue
mq.dlx.routingKey=mq_dlx_key
###备胎交换机
mq.exchange=mq_exchange
mq.queue=mq_queue
mq.routingKey=routing_key
  • 配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;@Configuration
public class MQConfig {/*** 普通交换机*/@Value("${mq.exchange}")private String mqExchange;/*** 普通队列*/@Value("${mq.queue}")private String mqQueue;/*** 普通路由key*/@Value("${mq.routingKey}")private String mqRoutingKey;/*** 死信交换机*/@Value("${mq.dlx.exchange}")private String dlxExchange;/*** 死信队列*/@Value("${mq.dlx.queue}")private String dlxQueue;/*** 死信路由*/@Value("${mq.dlx.routingKey}")private String dlxRoutingKey;/*** 声明死信交换机* @return DirectExchange*/@Beanpublic DirectExchange dlxExchange() {return new DirectExchange(dlxExchange);}/*** 声明死信队列* @return Queue*/@Beanpublic Queue dlxQueue() {return new Queue(dlxQueue);}/*** 声明普通业务交换机* @return DirectExchange*/@Beanpublic DirectExchange mqExchange() {return new DirectExchange(mqExchange);}/*** 声明普通队列* @return Queue*/@Beanpublic Queue mqQueue() {// 普通队列绑定我们的死信交换机Map<String, Object> arguments = new HashMap<>(2);//死信交换机arguments.put("x-dead-letter-exchange", dlxExchange);//死信队列arguments.put("x-dead-letter-routing-key", dlxRoutingKey);return new Queue(mqQueue, true, false, false, arguments);}/*** 绑定死信队列到死信交换机* @return Binding*/@Beanpublic Binding binding(Queue dlxQueue,DirectExchange dlxExchange) {return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);}/*** 绑定普通队列到普通交换机* @return Binding*/@Beanpublic Binding mqBinding(Queue mqQueue,DirectExchange mqExchange) {return BindingBuilder.bind(mqQueue).to(mqExchange).with(mqRoutingKey);}
}

2.2 生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;/*** 生产者*/
@RestController
@Slf4j
public class MQProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 普通交换机*/@Value("${mq.exchange}")private String mqExchange;/*** 普通路由key*/@Value("${mq.routingKey}")private String mqRoutingKey;@RequestMapping("/sendMsg")public String sendMsg() {String msg = "Hello RabbitMQ ......";//发送消息  参数一:交换机 参数二:路由键(用来指定发送到哪个队列)rabbitTemplate.convertAndSend(mqExchange, mqRoutingKey, msg, message -> {// 设置消息过期时间 10秒过期    如果过期时间内还没有被消费 就会发送给死信队列message.getMessageProperties().setExpiration("10000");return message;});log.info("生产者发送消息:{}", msg);return "success";}
}

2.3 消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Component
@Slf4j
public class MQConsumer {/*** 监听队列回调的方法** @param msg*/@RabbitListener(queues = {"${mq.queue}"})public void mqConsumer(String msg) {log.info("正常普通消费者消息MSG:{}", msg);}
}

2.4 死信消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 死信消费者*/
@Component
@Slf4j
public class MQDlxConsumer {/*** 死信队列监听队列回调的方法** @param msg*/@RabbitListener(queues = {"${mq.dlx.queue}"})public void mqConsumer(String msg) {log.info("死信队列消费普通消息:msg{}", msg);}}

2.5 结果

访问:http://127.0.0.1:9023/sendMsg 会被 消费者 消费掉
消费者 代码注释掉,在访问http://127.0.0.1:9023/sendMsg,等待10秒钟后会被死信队列接收到。
在这里插入图片描述

六、延时队列Demo

  • 两种方式:
    • 第一种:使用死信队列,将消息放入一个没有被监听的队列上,设置TTL(一条消息的最大存活时间)为延迟的时间,时间到了没有被消费,直接成为死信。监听死信队列来进行操作。
    • 第二种:使用rabbitmq官方提供的delayed插件来真正实现延迟队列。本文对第二种进行详解

1 安装延迟插件

官网下载:https://www.rabbitmq.com/community-plugins.html
我的RabbitMQ是3.12 b版本的,下载此插件
在这里插入图片描述

1.1 下载插件

wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez

在这里插入图片描述

1.2 将插件拷贝到RabbitMQ容器的插件目录

docker cp ./rabbitmq_delayed_message_exchange-3.12.0.ez de24369edeb4:/plugins

在这里插入图片描述

1.3 进入到容器

docker exec -it de24369edeb4 /bin/bash

1.4 开启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

1.5 查看

rabbitmq-plugins list

E* 或 e* 代表 插件已启用
在这里插入图片描述
在RabbitMQ控制台可以看到
在这里插入图片描述

2 示例

2.1 配置

  • 配置文件
# RabbitMQ 配置
spring.rabbitmq.name=rabbitmq-demo01
spring.rabbitmq.host=192.168.2.22
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin# 自定义一个属性,设置队列的名称
mq.queue.name=hello-queue
  • 配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 使用x-delayed-message 延时队列插件*/
@Configuration
public class QueueConfig {@Value("${mq.queue.name}")private String queueName;/*** 初始化短信队列* @return*/@Beanpublic Queue delayedSmsQueueInit() {return new Queue(queueName);}/*** 初始化延迟交换机* @return*/@Beanpublic CustomExchange delayedExchangeInit() {Map<String, Object> args = new HashMap<>();// 设置类型,可以为fanout、direct、topicargs.put("x-delayed-type", "direct");// 第一个参数是延迟交换机名字,第二个是交换机类型,第三个设置持久化,第四个设置自动删除,第五个放参数return new CustomExchange("delayed_exchange","x-delayed-message", true,false,args);}/*** 短信队列绑定到交换机* @param delayedSmsQueueInit* @param customExchange* @return*/@Beanpublic Binding delayedBindingSmsQueue(Queue delayedSmsQueueInit, CustomExchange customExchange) {// 延迟队列绑定延迟交换机并设置RoutingKey为smsreturn BindingBuilder.bind(delayedSmsQueueInit).to(customExchange).with("sms").noargs();}
}

2.2 生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** 生产者*/
@RestController
@Slf4j
public class Sender {@Autowiredprivate AmqpTemplate template;@Value("${mq.queue.name}")private String queueName;// 发送消息@RequestMapping("/sendMsg")public void send(){String msg = "Hello RabbitMQ ......";// 队列名,消息内容template.convertAndSend(queueName,msg);log.info("生产者发送消息:{}", msg);}@RequestMapping("/sendDelayedMsg")public void sendDelayedMsg(){String msg = "Hello RabbitMQ Delayed ......";// 第一个参数是延迟交换机名称,第二个是Routingkey,第三个是消息主题,第四个是X,并设置延迟时间,单位		是毫秒template.convertAndSend("delayed_exchange","sms",msg,a -> {a.getMessageProperties().setDelay(2000);return a;});log.info("生产者发送延时消息:{}", msg);}
}

2.3 消费产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消费者*/
@Component
@Slf4j
public class Receiver {// 接受MQ消息 并 处理消息@RabbitListener(queues = {"${mq.queue.name}"})public void process(String msg){// 处理消息log.info("我是MQ消费者,我接收到的消息是:{}", msg);}
}

2.4 结果

访问:http://127.0.0.1:9022/sendMsg
访问:http://127.0.0.1:9022/sendDelayedMsg
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/691095.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

10_Java泛型

一、为什么要有泛型 1.泛型的设计背景 集合容器类在设计阶段/声明阶段不能确定这个容器到底实际存的是什么类型的对象&#xff0c;所以在JDK1.5之前只能把元素类型设计为Object&#xff0c;JDK1.5之后使用泛型来解决。因为这个时候除了元素的类型不确定&#xff0c;其他的部分…

Qt C++春晚刘谦魔术约瑟夫环问题的模拟程序

什么是约瑟夫环问题&#xff1f; 约瑟夫问题是个有名的问题&#xff1a;N个人围成一圈&#xff0c;从第一个开始报数&#xff0c;第M个将被杀掉&#xff0c;最后剩下一个&#xff0c;其余人都将被杀掉。例如N6&#xff0c;M5&#xff0c;被杀掉的顺序是&#xff1a;5&#xff…

14. UE5 RPG使用曲线表格设置回复血量值

之前的文章中&#xff0c;我使用的都是固定的数值来设置血量回复或者蓝量回复&#xff0c;在这篇文章里面&#xff0c;介绍一下使用曲线表格。通过曲线表格我们可以设置多个数值&#xff0c;然后通过去通过修改索引对应的数值去修改回复的血量或者蓝量。 创建曲线表格 首先创…

林浩然与杨凌芸的Java奇遇记:字节流世界的二进制爱情

林浩然与杨凌芸的Java奇遇记&#xff1a;字节流世界的二进制爱情 The Java Adventure of Lin Haoran and Yang Lingyun: Binary Love in the Byte Stream World 在编程宇宙中&#xff0c;有一对程序员CP——林浩然和杨凌芸&#xff0c;他们共同编织着Java王国里那些神秘而又充满…

MySQL--SQL解析顺序

前言&#xff1a; 一直是想知道一条SQL语句是怎么被执行的&#xff0c;它执行的顺序是怎样的&#xff0c;然后查看总结各方资料&#xff0c;就有了下面这一篇博文了。 本文将从MySQL总体架构—>查询执行流程—>语句执行顺序来探讨一下其中的知识。 一、MySQL架构总览&a…

Swift Combine 使用从 PassthroughSubject 预定好的发送的事件测试订阅者 从入门到精通二十三

Combine 系列 Swift Combine 从入门到精通一Swift Combine 发布者订阅者操作者 从入门到精通二Swift Combine 管道 从入门到精通三Swift Combine 发布者publisher的生命周期 从入门到精通四Swift Combine 操作符operations和Subjects发布者的生命周期 从入门到精通五Swift Com…

MSS与cwnd的关系,rwnd又是什么?

慢启动算法是指数递增的 这种指数增长的方式是慢启动算法的一个核心特点&#xff0c;它确保了TCP连接在开始传输数据时能够快速地探测网络的带宽容量&#xff0c;而又不至于过于激进导致网络拥塞。具体来说&#xff1a; 初始阶段&#xff1a;当TCP连接刚建立时&#xff0c;拥…

ubuntu屏幕小的解决办法

1. 安装vmware tools , 再点自适应客户机 执行里面的vmware-install.pl这个文件 &#xff1a;sudo ./vmware-install.pl 执行不了可以放到家目录&#xff0c;我放在了/home/book 里面 最后点这个自适应客户机 然后我这里点不了是因为我点了控制台视图和拉伸客户机&#xff0c…

【Java中23种设计模式-单例模式2--懒汉式2线程安全】

加油&#xff0c;新时代打工人&#xff01; 简单粗暴&#xff0c;学习Java设计模式。 23种设计模式定义介绍 Java中23种设计模式-单例模式 Java中23种设计模式-单例模式2–懒汉式线程不安全 package mode;/*** author wenhao* date 2024/02/19 09:38* description 单例模式…

鸿蒙开发 之 工具安装和环境搭建

DevEco Studio 面向HarmonyOS应用及元服务开发者提供的集成开发环境(IDE)&#xff0c; 助力高效开发。 ArkTS 语言 ArkTS是鸿蒙生态的应用开发语言。它在保持TypeScript&#xff08;简称TS&#xff09;基本语法风格的基础上&#xff0c;对TS的动态类型特性施加更严格的约束&…

Flask 学习99-Flask-SocketIO 快速入门与使用

前言 flask-socketio 为flask应用提供了一个客户端与服务器之间低延迟的双向通讯 官网地址:https://flask-socketio.readthedocs.io/en/latest/intro.html 环境准备 先安装flask-socketio pip install flask-socketio说明Flask-SocketIO 与 js版本客户端不匹配,二者不能正…

机器学习之梯度下降法直观理解

形象化举例&#xff0c;由上图所示&#xff0c;假如最开始&#xff0c;我们在一座大山上的某处位置&#xff0c;因为到处都是陌生的不知道下山的路&#xff0c;所以只能摸索着根据直觉&#xff0c;走一步算一步。在此过程中&#xff0c;每走到一个位置的时候&#xff0c;都会求…

五步解决 Ubuntu 18.04 出现GLIBC_2.28 not found的解决方法

Ubuntu 18.04 出现GLIBC_2.28 not found的解决方法 参考debian网址https://packages.debian.org/buster/并搜索想要的软件或者工具等&#xff0c;如libc6,有结果如下&#xff1a; 具体就不介绍了&#xff0c;请浏览官网了解。 第一步&#xff1a;添加软件源&#xff0c;在/et…

STM32-点亮 LED

目录 1 、电路构成及原理图 2 、编写实现代码 3、代码讲解 4、烧录到开发板调试、验证代码 5、检验效果 本人使用的是朗峰 STM32F103 系列开发板&#xff0c;此笔记基于这款开发板记录。 1 、电路构成及原理图 首先&#xff0c;通过朗峰 F1 开发板 LED 部分原理图看到…

第三十六天| 435. 无重叠区间、763.划分字母区间、56. 合并区间

Leetcode 435. 无重叠区间 题目链接&#xff1a;435 无重叠区间 题干&#xff1a;给定一个区间的集合 intervals &#xff0c;其中 intervals[i] [starti, endi] 。返回 需要移除区间的最小数量&#xff0c;使剩余区间互不重叠 。 思考&#xff1a;贪心法。和452 用最少数量的…

Gin框架: HTML模板渲染之配置与语法详解

Gin的HTML模板配置 1 &#xff09;单一目录的配置 配置模板目录&#xff0c;在与main.go同级下, 新建目录&#xff0c;下面二选一&#xff0c;仅作举例, 这里选择 tpls templatestpls 在 tpls 目录下新建 news.html <!-- 最简单的 --> <h1>News Page</h1>&l…

如何在nginx增加健康检查接口

在docker中部署的nginx或者在nginx部署的nginx一般是需要一个健康检查接口的 这样的话&#xff0c;就可以确定容器当前的状态是否是健康的 那么&#xff0c;如何给nginx增加一个健康检查的接口呢&#xff1f; 接下来呢&#xff0c;我们就演示一个在nginx中如何增加健康检查的…

ArcGIS API for JavaScript 4.X 本地部署(js,字体)

0 目录&#xff08;4.19&#xff09; /4.19/ 1 修改文件 1.1 init.js 编辑器打开/4.19/init.js搜索文本[HOSTNAME_AND_PATH_TO_JSAPI]&#xff0c;然后将其连同前面的https://替换为http://ip地址/4.19&#xff0c;可以是localhost&#xff0c;只能本机引用 替换后&#xff…

Minio Server + Minio Client 数据迁移、备份

文章目录 1、概要2、mc安装3、添加minio集群4、数据同步4、cp 和 mirror 区别5、效果 1、概要 Minio Server Minio Client 实现minio 不同集群之间的数据迁移、数据备份 2、mc安装 $ wget http://dl.minio.org.cn/client/mc/release/linux-amd64/mc -P /usr/local/bin/ $ c…

Camunda快速入门(三):设计一个人工任务流程并配置表单

接上一篇文章&#xff1a;Camunda快速入门&#xff08;二&#xff09;&#xff1a;设计并执行第一个BPMN流程 在本节中&#xff0c;您将学习如何使用 BPMN 2.0 用户任务让人类参与到您的流程中。 1、添加用户任务活动节点 我们想修改我们的流程&#xff0c;以便我们可以让人…