RabbitMQ的安装使用

RabbitMQ是什么?

MQ全称为Message Queue,消息队列,在程序之间发送消息来通信,而不是通过彼此调用通信。
RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

为什么使用RabbitMQ?

优点:
1、实现应用系统的解耦,客户端只关心发送消息,而不关心处理。
2、异步提升效率,在主业务逻辑发送消息,异步去处理消息
3、流量削峰,将请求放到mq消息队列中,mysql每秒去拉取请求消费,避免请求全部一下子全部打到mysql,请求过多而崩溃

怎么使用RabbitMQ?

1.安装windows的客户端,参考链接3

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.java 代码引入相关jar包
		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.2.3.RELEASE</version></dependency>
3.编写发送,接收消息的工具类
延迟队列配置
package com.next.mq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @desc 延迟队列配置*/
@Configuration
public class RabbitDelayMqConfig {@Bean("delayDirectExchange")public DirectExchange delayDirectExchange() {DirectExchange directExchange = new DirectExchange(QueueConstants.DELAY_EXCHANGE, true, false);//交换机开启延迟设置true,延迟才会生效directExchange.setDelayed(true);return directExchange;}@Bean("delayNotifyQueue")public Queue delayNotifyQueue() {return new Queue(QueueConstants.DELAY_QUEUE);}@Bean("delayBindingNotify")public Binding delayBindingNotify(@Qualifier("delayDirectExchange") DirectExchange delayDirectExchange,@Qualifier("delayNotifyQueue") Queue delayNotifyQueue) {return BindingBuilder.bind(delayNotifyQueue).to(delayDirectExchange).with(QueueConstants.DELAY_ROUTING);}
}
队列配置
package com.next.mq;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/*** @desc 队列配置*/
@Configuration
public class RabbitMqConfig {@Bean("directExchange")@Primarypublic DirectExchange directExchange() {return new DirectExchange(QueueConstants.COMMON_EXCHANGE, true, false);}@Bean("notifyQueue")@Primarypublic Queue notifyQueue() {return new Queue(QueueConstants.COMMON_QUEUE);}@Bean("bindingNotify")@Primarypublic Binding bindingNotify(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("notifyQueue") Queue notifyQueue) {return BindingBuilder.bind(notifyQueue).to(directExchange).with(QueueConstants.COMMON_ROUTING);}
}
发送消息工具类
package com.next.mq;import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.UUID;/*** @desc 客户端工具类 -- 发送消息*/
@Component
@Slf4j
public class RabbitMqClient {@Resourceprivate RabbitTemplate rabbitTemplate;//发送同步消息public void send(MessageBody messageBody) {try {//生成唯一的消息idString uuid = UUID.randomUUID().toString();//初始话消息CorrelationData correlationData = new CorrelationData(uuid);//使用模板工具类rabbitTemplate 来发消息rabbitTemplate.convertAndSend(QueueConstants.COMMON_EXCHANGE, QueueConstants.COMMON_ROUTING,JsonMapper.obj2String(messageBody), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//记录日志log.info("message send, {}", message);return message;}}, correlationData);} catch (Exception e) {//日志打印,以便定位问题log.error("message send exception, msg:{}", messageBody.toString(), e);}}/*** @desc 发送延迟消息*/public void sendDelay(MessageBody messageBody, int delayMillSeconds) {try {//设置消息延迟时间messageBody.setDelay(delayMillSeconds);String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);//延迟交换机和路由rabbitTemplate.convertAndSend(QueueConstants.DELAY_EXCHANGE, QueueConstants.DELAY_ROUTING,JsonMapper.obj2String(messageBody), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化//设置消息延迟的时间(毫秒值)message.getMessageProperties().setDelay(delayMillSeconds);log.info("delay message send, {}", message);return message;}}, correlationData);} catch (Exception e) {log.error("delay message send exception, msg:{}", messageBody.toString(), e);}}
}
接收消息工具类
package com.next.mq;import com.next.dto.RollbackSeatDto;
import com.next.model.TrainOrder;
import com.next.service.TrainOrderService;
import com.next.service.TrainSeatService;
import com.next.util.JsonMapper;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.type.TypeReference;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** @desc rabbitmq的server端 - 延迟接收消息* 用处:在主流程里面发送消息,异步流程里面接收消息,处理。提升代码性能*/@Component
@Slf4j
public class RabbitDelayMqServer {@Resourceprivate TrainSeatService trainSeatService;@Resourceprivate TrainOrderService trainOrderService;@RabbitListener(queues = QueueConstants.DELAY_QUEUE)public void receive(String message) {log.info("delay queue receive message, {}", message);try {MessageBody messageBody = JsonMapper.string2Obj(message, new TypeReference<MessageBody>() {});if (messageBody == null) {return;}switch (messageBody.getTopic()) {case QueueTopic.SEAT_PLACE_ROLLBACK:RollbackSeatDto dto = JsonMapper.string2Obj(messageBody.getDetail(), new TypeReference<RollbackSeatDto>() {});trainSeatService.batchRollbackSeat(dto.getTrainSeat(), dto.getFromStationIdList(), messageBody.getDelay());break;case QueueTopic.ORDER_PAY_DELAY_CHECK:TrainOrder trainOrder = JsonMapper.string2Obj(messageBody.getDetail(), new TypeReference<TrainOrder>() {});trainOrderService.delayCheckOrder(trainOrder);break;default:log.warn("delay queue receive message, {}, no need handle", message);}} catch (Exception e) {log.error("delay queue message handle exception, msg:{}", message, e);}}
}

参考链接:
1.rabbitMQ到底是个啥东西?
2.超详细!!!Windows下安装RabbitMQ的步骤详解
3.windows安装rabbitmq和环境erlang(最详细版,包括对应关系,安装错误解决方法)
4.RabbitMQ安装或启动后,无法访问http://localhost:15672/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/639853.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Webpack5入门到原理22:提升打包构建速度

HotModuleReplacement 为什么 开发时我们修改了其中一个模块代码&#xff0c;Webpack 默认会将所有模块全部重新打包编译&#xff0c;速度很慢。 所以我们需要做到修改某个模块代码&#xff0c;就只有这个模块代码需要重新打包编译&#xff0c;其他模块不变&#xff0c;这样…

kafka入门(九):副本

副本 副本&#xff08;Replica&#xff09;&#xff0c;指的是分布式系统对数据和服务提供的一种冗余方式。 Kafka通过多副本机制实现故障自动转移&#xff0c;在Kafka集群中某个broker节点失效的情况下仍然保证服务可用。 kafka 副本之间是 一主多从的关系。 其中 leader 副…

打开json文件,读取里边的每一行数据,每一行数据是一个字典,使用matplotlib画图

这段代码的目的是读取 JSON 文件&#xff0c;提取关键信息&#xff0c;然后使用 Matplotlib 绘制四个子图&#xff0c;分别显示不同的指标随着 iter 变化的情况。这种图形化分析有助于直观地了解模型的性能。 画图结果如下&#xff1a; json文件格式如下&#xff1a;下面只粘贴…

大模型镜像打包实战:CodeGeeX2为例

资源地址 docker torch镜像地址 CodeGeeX2 github 构建思路 查看CodeGeeX2项目&#xff0c;官方已经提供好启动脚本&#xff0c;配置好各种依赖应该就可以运行。 python ./demo/run_demo.pyusage: run_demo.py [-h] [--model-path MODEL_PATH] [--example-path EXAMPLE_PAT…

计算机网络学习The next day

在计算机网络first day中&#xff0c;我们了解了计算机网络这个科目要学习什么&#xff0c;因特网的概述&#xff0c;三种信息交换方式等&#xff0c;在今天&#xff0c;我们就来一起学习一下计算机网络的定义和分类&#xff0c;以及计算机网络中常见的几个性能指标。 废话不多…

yarn集群datanode无法启动问题排查

一、问题场景 hdfs无法访问&#xff0c;通过jps命令查看进程&#xff0c;发现namenode启动成功&#xff0c;但是所有datanode都没有启动&#xff0c;重启集群&#xff08;start-dfs.sh&#xff09;后仍然一样 二、原因分析 先看下启动的日志有无报错。打开Hadoop的日志目录 …

C#,入门教程(24)——类索引器(this)的基础知识

上一篇&#xff1a; C#&#xff0c;入门教程(23)——数据类型转换的一点基础知识https://blog.csdn.net/beijinghorn/article/details/124187182 工业软件首先要求高可靠性、高可维护性。 作为工业软件的开发者&#xff0c;我们对语言重载的需求是&#xff1a;“不可或缺”。 …

创建数组(数组基本方法)

组相同类型数据的集合 java中数组特点&#xff1a; 1.数组在内存中是连续分配的 2.在创建数组时&#xff0c;要指明数组的长度 3.访问数组&#xff0c;通过索引&#xff0c;从0开始&#xff0c;到数组长度-1 功能&#xff1a; 1.插入&#xff1a;向索引位置插入一个元素&#…

Gitee Reward让开源作者不再为爱发电

一、什么是Gitee Reward&#xff1f; Gitee Reward是Gitee为改善开源开发生命周期提出的新策略。开源项目的支持者们可以更轻松地为其喜爱的项目提供资金&#xff0c;贡献者们也可以因为其不懈的开源贡献得到奖励。 二、Gitee Reward上允许哪些类型的项目&#xff1f; 允许任…

stable diffuison的安装和使用

stable diffuison的安装和使用 简单介绍 Stable Diffusion是一个深度学习文本到图像的生成模型&#xff0c;它可以根据文本描述生成详细的图像。这个模型主要应用于文本生成图像的场景中&#xff0c;通过给定的文本提示词&#xff0c;模型会输出一张与提示词相匹配的图片。 S…

Cadence——布局部分相关教程

本文章基于【凡亿】Cadence Allegro 17.4零基础入门66讲PCB Layout设计实战加个人理解写出 &#xff08;一&#xff09;中英文切换 注意&#xff1a;只是将选项卡部分切换中文 1&#xff0c;设置中文 a,打开PCB Editor 17.4以后&#xff0c;点击Help和About b,可以看到与下…

服务器或服务器主板中的BIOS更新详解

BIOS更新总共有三种方式&#xff1a;DOS、UEFI Shell以及BMC网页更新&#xff0c;而其中&#xff0c;DOS与Shell的更新方式类似&#xff0c;因此以下为统一描述。 一、UEFI Shell或DOS下更新 当我们下载了官网的BIOS更新包并解压后可以获得一些更新文件&#xff0c;在更新文件…

舞动微服务的安全舞伴:服务熔断与服务降级的精妙演绎

目录 引言 1、服务熔断&#xff1a;避免连锁反应的舞姿 1.1 什么是服务熔断&#xff1f; 1.2服务熔断的工作原理 场景&#xff1a;支付服务的重要性 1. 监控支付服务 2. 设定阈值 3. 熔断器状态 4. 触发熔断 5. 定时检测 6. 自动恢复 1.3解析 2、服务降级&#xf…

Python基础第七篇(Python的文件操作)

文章目录 一、文件编码二、文件的读取操作1.操作代码2.读出结果 三、文件的写出操作1.源代码2.读出结果 四、文件的追加操作1.源代码2.读出结果 这篇文章旨在深入浅出地介绍Python在文件操作上的能力&#xff0c;包括文件的编码、读取和写入等基本操作。内容丰富、易于理解&…

Homicide+Reports1980-2014连环凶案数据,CSV格式

这个数据集提供了关于谋杀案件的详细信息&#xff0c;包括涉及的机构、受害者和肇事者的信息&#xff0c;以及犯罪的其他细节。下面是每个字段的详细解释&#xff1a; Record ID: 记录的唯一标识符。Agency Code: 执行机构的代码。Agency Name: 执行机构的名称。Agency Type: …

公网环境调试本地配置的Java支付宝沙箱环境模拟支付场景

文章目录 前言1. 下载当面付demo2. 修改配置文件3. 打包成web服务4. 局域网测试5. 内网穿透6. 测试公网访问7. 配置二级子域名8. 测试使用固定二级子域名访问 前言 在沙箱环境调试支付SDK的时候&#xff0c;往往沙箱环境部署在本地&#xff0c;局限性大&#xff0c;在沙箱环境…

java SSM项目预算生成管理系统myeclipse开发mysql数据库springMVC模式java编程计算机网页设计

一、源码特点 java SSM项目预算生成管理系统是一套完善的web设计系统&#xff08;系统采用SSM框架进行设计开发&#xff0c;springspringMVCmybatis&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的 源代码和数据库&#xff0c;系统主…

浅析Java虚拟机中的ZGC

引言 为什么需要垃圾回收&#xff08;Garbage Collection&#xff09; 垃圾回收是Java开发中的关键机制&#xff0c;负责自动管理内存&#xff0c;防止内存泄漏&#xff0c;提高开发效率和应用程序的稳定性。 Java中主要的垃圾回收方法 标记-清除算法&#xff08;Mark and …

盘点好用内容合规监测工具

网页敏感内容监测 Web Purify 由 WebPurify 提供&#xff0c;这是一个专门从事内容审核和过滤服务的公司。 核心功能 ● 文本审核&#xff1a;加强脏话过滤&#xff0c;标记仇恨言论、偏执、性挑逗等 ● 图片审核&#xff1a;让个人资料照片、社交应用程序、产品定制远离令…

python windows和linux 文件同步

在Python中&#xff0c;可以使用paramiko库来实现Windows和Linux之间的文件同步。paramiko是一个用于SSH连接的Python库&#xff0c;可以用于在Windows和Linux之间进行文件传输。 以下是一个简单的示例代码&#xff0c;演示如何使用paramiko库在Windows和Linux之间同步文件&am…