RabbitMQ笔记

RabbitMQ

安装MQ
docker run \-e RABBITMQ_DEFAULT_USER=itheima \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hmall \-d \rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://192.168.150.101:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在配置文件中已经指定了。

  • **publisher**:生产者,也就是发送消息的一方
  • **consumer**:消费者,也就是消费消息的一方
  • **queue**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • **exchange**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • **virtual host**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue
SpringAMQP
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package com.itheima.publisher.amqp;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}
消息接收

首先配置MQ地址,在consumer服务的application.yml中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:

package com.itheima.consumer.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class SpringRabbitListener {// 利用RabbitListener来声明要监听的队列信息// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。// 可以看到方法体中接收的就是消息体的内容@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}
work模型

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量
交换机

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。
Fanout交换机

在publisher服务的SpringAmqpTest类中添加测试方法生产者:

@Test
public void testFanoutExchange() {// 交换机名称String exchangeName = "hmall.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}

在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:

@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
Direct交换机
  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。
只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu
声明队列和交换机

由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

fanout示例

在consumer中创建一个类,声明队列和交换机:

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hmall.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
direct示例

direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

例如,我们同样声明Direct模式的交换机和队列:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

是不是简单多了。
再试试Topic模式:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
消息转换器

Spring的消息发送代码接收的消息体是一个Object,而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

publisherconsumer两个服务中都引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

消息转换器中添加的messageId可以便于我们将来做幂等性判断。

消费者接收Object

我们在consumer服务中定义一个新的消费者,publisher是用Map发送,那么消费者也一定要用Map接收,格式如下:

@RabbitListener(queues = "object.queue")
public void listenSimpleQueueMessage(Map<String, Object> msg) throws InterruptedException {System.out.println("消费者接收到object.queue消息:【" + msg + "】");
}
业务改造

案例需求:改造余额支付功能,将支付成功后基于OpenFeign的交易服务的更新订单状态接口的同步调用,改为基于RabbitMQ的异步通知。

我们只关注交易服务,步骤如下:

  • 定义topic类型交换机,命名为pay.topic
  • 定义消息队列,命名为mark.order.pay.queue
  • mark.order.pay.queuepay.topic绑定,BindingKeypay.success
  • 支付成功时不再调用交易服务更新订单状态的接口,而是发送一条消息到pay.topic,发送消息的RoutingKeypay.success,消息内容是订单id
  • 交易服务监听mark.order.pay.queue队列,接收到消息后更新订单状态为已支付
配置MQ

不管是生产者还是消费者,都需要配置MQ的基本信息。分为两步:
1)添加依赖:

  <!--消息发送--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2)配置MQ地址:

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码
接收消息

在trade-service服务中定义一个消息监听类:
其代码如下:

package com.hmall.trade.listener;import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "mark.order.pay.queue", durable = "true"),exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),key = "pay.success"))public void listenPaySuccess(Long orderId){orderService.markOrderPaySuccess(orderId);}
}
发送消息

修改pay-service服务下的com.hmall.pay.service.impl.PayOrderServiceImpl类中的tryPayOrderByBalance方法:

private final RabbitTemplate rabbitTemplate;@Override
@Transactional
public void tryPayOrderByBalance(PayOrderDTO payOrderDTO) {// 1.查询支付单PayOrder po = getById(payOrderDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常throw new BizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额userClient.deductMoney(payOrderDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success = markPayOrderSuccess(payOrderDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或关闭!");}// 5.修改订单状态// tradeClient.markOrderPaySuccess(po.getBizOrderNo());try {rabbitTemplate.convertAndSend("pay.topic", "pay.success", po.getBizOrderNo());} catch (Exception e) {log.error("支付成功的消息发送失败,支付单id:{}, 交易单id:{}", po.getId(), po.getBizOrderNo(), e);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/609138.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

线程与UI操作

子线程中不能执行UI操作。 UI 操作指的是与用户界面&#xff08;User Interface&#xff09;相关的操作&#xff0c;包括但不限于以下几种&#xff1a; 更新视图&#xff1a;例如更改 TextView 的文本内容、设置 ImageView 的图片等。处理用户输入&#xff1a;例如响应按钮点…

银联扫码第三方支付接口申请:开启便捷支付新时代

随着移动支付的普及&#xff0c;越来越多的商家开始接受微信、支付宝等第三方支付平台的付款方式。然而&#xff0c;作为国内最大的银行卡组织&#xff0c;银联也在不断拓展其业务范围&#xff0c;推出了自己的扫码支付接口。本文将为您详细介绍银联扫码第三方支付接口的申请流…

GO语言笔记3-指针

指针的概念 先看一段代码的输出 package main import "fmt" func main(){ var age int 18fmt.Println("age的内存地址值是:",&age)//age的内存地址值是: 0xc000012090// 定义一个指针变量// *int 是一个指针类型&#xff0c;可以理解为指向int类型的…

Python数据分析:入门到实践

一、引言 &#xff08;用手机写的&#xff0c;明天重新排版。&#xff09; 在当今数据驱动的时代&#xff0c;数据分析已经成为各行各业不可或缺的一部分。Python作为一种高效、易学的编程语言&#xff0c;在数据分析领域具有广泛的应用。本文将带你从Python数据分析的入门知…

两个视频怎么合并成一个视频?教你合并视频

两个视频怎么合并成一个视频&#xff1f;如果你是一名视频爱好者&#xff0c;或者是一名自媒体创作者&#xff0c;那么你一定遇到过需要将两个视频合并为一个的情况。有时候&#xff0c;你可能需要将一个长视频切割成多个片段&#xff0c;或者将多个视频片段合并成一个完整的视…

Spring MVC的RequestMapping注解、controller方法返回值

1.使用说明 作用&#xff1a;用于建立请求URL和处理请求方法之间的对应关系。 出现位置&#xff1a; 类上&#xff1a; 请求 URL的第一级访问目录。此处不写的话&#xff0c;就相当于应用的根目录。写的话需要以/开头。它出现的目的是为了使我们的 URL 可以按照模块化管理&…

我的1827创作纪念日

机缘 习惯性早上打开电脑&#xff0c;看看CSDN上的资讯&#xff0c;了解行业动态、当前新的技术和大佬的分享。自己动手写应该是2019 年 01 月 08 日&#xff0c;当时应该是在用安装和使用Oracle&#xff0c;遇到一些问题&#xff0c;写下第一篇博客 Oracle存储过程常见问题及…

一、Mybatis 简介

本章概要 简介持久层框架对比快速入门&#xff08;基于Mybatis3方式&#xff09; 1.1 简介 https://mybatis.org/mybatis-3/zh/index.html MyBatis最初是Apache的一个开源项目iBatis, 2010年6月这个项目由Apache Software Foundation迁移到了Google Code。随着开发团队转投G…

2024.1.9 Spark SQL day06 homework

目录 一. Spark SQL中数据清洗的API有哪些&#xff0c;各自作用是什么&#xff1f; 二. 设置Spark SQL的shuffle分区数的方式有哪几种 三. 数据写出到数据库需要注意什么? 四. Spark程序运行集群分类 一. Spark SQL中数据清洗的API有哪些&#xff0c;各自作用是什么&#x…

【解决方案】 无法将“pip“项识别为 cmdlet、函数、脚本文件

在当今的软件开发和运维领域&#xff0c;Python已经成为了一个不可或缺的工具。而pip&#xff0c;作为Python的包管理工具&#xff0c;更是Python生态系统中不可或缺的一部分。然而&#xff0c;有时候我们可能会遇到一个令人困扰的问题&#xff1a;无法将“pip”项识别为cmdlet…

zookeeper 与eureka区别

CAP定理 在分布式系统的发展中&#xff0c;影响最大的莫过于CAP定理了&#xff0c;是分布式系统发展的理论基石。 2000年&#xff0c;加州大学的计算机科学家 Eric Brewer提出了CAP猜想 2002 年&#xff0c;麻省理工学院的 Seth Gilbert 和 Nancy Lynch 从理论上证明了 CAP 猜…

c++实现支持动态扩容的栈(stack)

1.在栈容量满时自动扩容: 支持自动扩容栈实现: // // myStack.hpp // algo_demo // // Created by Hacker X on 2024/1/9. //#ifndef myStack_hpp #define myStack_hpp #include <stdio.h> #include <string.h> //栈实现 //1.入栈 //2.出栈 //3.空栈 //4.满栈 …

栈的模拟实现

栈的模拟实现 一:什么是栈二:IStack 接口三:MyStack类:1:push(int x):2:pop()3:peek()4:size(),empty(),full() 三:四:栈的时间复杂度: 一:什么是栈 栈是以先进后出(后进先出)的形式来组织数据结构 比如: 先装入的子弹后射出,后装入的子弹先射出,这就是一种典型的栈. 二:ISta…

扩展欧几里得算法总结

知识概览 裴蜀定理&#xff1a;对于任意正整数a&#xff0c;b&#xff0c;一定存在非零整数x&#xff0c;y&#xff0c;使得 而且(a, b)是a和b能凑出来的最小的正整数。 通过扩展欧几里得算法可以求得裴蜀定理中x和y的值&#xff0c;x和y的通解为 &#xff0c; 例题展示 扩展欧…

ChatGPT扩展系列之网易数帆ChatBI

在当今数字化快速发展的时代,数据已经成为业务经营与管理决策的核心驱要素。无论是跨国大企业还是新兴创业公司,正确、迅速地洞察数据已经变得至关重要。然而,传统的BI工具往往对用户有一定的技术门槛,需要熟练的操作技能和复杂的查询语句,这使得大部分的企业员工难以深入…

2023,半路转行程序员的第一年

键盘敲着总结&#xff0c;抬头看桌面的日期&#xff0c;转眼间来到了 2024 年&#xff0c;时间就这么悄悄的流逝。本来想 12 月底就把总结给写完的&#xff0c;结果一拖&#xff0c;拖到了 2024&#x1f602;。 我本科专业是材料&#xff0c;当时属于生环化材“天坑”专业&…

QT DAY1作业

1.QQ登录界面 头文件代码 #ifndef MYWIDGET_H #define MYWIDGET_H#include <QWidget> #include <QIcon> #include <QLabel> #include <QPushButton> #include <QMovie> #include <QLineEdit>class MyWidget : public QWidget {Q_OBJECTpu…

nn网络层-卷积层

一、1d/2d/3d Convolution 卷积运算&#xff1a;卷积核在输入信号&#xff08;图像&#xff09;上滑动&#xff0c;相应位置上进行乘加卷积核&#xff1a;又称为滤波器&#xff0c;过滤器&#xff0c;可认为是某种模式&#xff0c;某种特征。卷积过程类似于用一个模版去图像上…

将Llama2上下文长度扩展100倍;效率更高的SeTformer;LLM准确度基本不变加速1.56×;FreeTalker

本文首发于公众号&#xff1a;机器感知 将Llama2上下文长度扩展100倍&#xff1b;效率更高的SeTformer&#xff1b;LLM准确度基本不变加速1.56&#xff1b;FreeTalker Latte: Latent Diffusion Transformer for Video Generation 本文使用Latent Diffusion Transformer(Latte…

JS入门笔记整理:函数

函数一般用来实现某种重复使用的功能&#xff0c;在需要使用该功能的时候&#xff0c;直接调用函数就可以了&#xff0c;不需要再重复地编写一大堆代码。并且在需要修改该函数功能的时候&#xff0c;也只需要修改和维护这一个函数就行。函数一般会在两种情况下使用&#xff1a;…