RabbitMQ的介绍和使用

1.同步通讯和异步通讯

        举个例子,同步通讯就像是在打电话,因此它时效性较强,可以立即得到结果,但如果你正在和一个MM打电话,其他MM找你的话,你们之间是不能进行消息的传递和响应的

        异步通讯就像是微信,你可以和多个MM进行消息的传递,对方可以立即响应或者有空了在”回“你

        同步调用问题

        微服务间基于Feign的调用就属于同步方式,存在一些问题。

        1.代码耦合:如果后续需要添加业务,需要不断修改支付服务的代码

        2.性能下降,吞吐量下降:支付服务一直在等待,订单等服务的响应,cpu资源一直在占用。

        3.级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题。

2.异步调用方案

        异步调用常见的就是事件驱动模式:Broker是事件代理者,一旦用户支付成功,这就是一个事件,这个事件就会被Broker管理,订单服务、仓储服务、短信服务。就会找Broker,这个叫做订阅事件。一旦用户支付成功之后,Broker就会通知被订阅过事件的服务,支付服务完成事件发布之后,就结束了服务,返回给用户

        优点:

        解决代码解耦:添加业务时不需要再更改支付服务的代码,支付服务只需要发布事件就行。至于后面的业务支付服务可以不用考虑。

        性能提升,吞吐量提供:相比较同步服务,业务处理时间的累加,支付服务还需要等待其他服务完成并响应,通过异步的方式,支付服务发布时间之后就结束服务,无需等待其他服务响应。提升了性能,和吞吐量

        服务没有依赖关系,不用担心级联失败问题

        流量削峰: 有多个事件发布,可以囤积到broker上,订阅该事件的服务可以按自己的处理能力来稳步进行。broker起到缓冲作用

        缺点:

        依赖Broker的可靠性、安全性、吞吐能力

        架构复杂了,业务没有明显的流程线,不好追踪管理

3.什么是MQ

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker

常见的MQ

RabbitMQ,适用于中小型企业开发,如果对性能要求比较高的并且需要定制服务的大型企业推荐使用Kafka。下面会介绍RabbitMQ的使用。

4.RabbitMQ概述和安装

RabbitMQ概述

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com

RabbitMQ的部署

环境:centos7,docker在线拉取的方式部署。

#拉取RabbitMQ镜像

输入:docker pull rabbitmq:3-management

#设置默认用户名密码并启动容器

docker run --name rabbitmq -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management

端口15672是rabbitMQ的ui界面,5672是服务端口

ui界面展示:

RabbitMQ结构和概念

PubLisher是我们的消息(事件)发送者,consumer是消息的消费者,PubLisher将来会把消息发送到我们的exchange(交换机)上,交换机负责路由,并把消息投射到queue(队列),queue负责暂存消息,consumer负责从queue里面获取消息处理消息。

 

5.RabbitMQ的常见消息模型

一、基本消息队列(BasicQueue)

HelloWorld是最基本的消息队列模型,实现的话,包含三个角色

publisher:消息发布者,将消息发送到队列queue

queue:消息队列,负责接受并缓存消息

consumer:订阅队列,处理队列中的消息

官方提供的编码方式非常麻烦,下面我们介绍学习一下SpringAMQP,它可以大大简化我们消息发送和接收API。

SpringAMQP简介

AMQP:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言的平台无关,更符合微服务中独立性的要求。

Spring AMQP:是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

利用SpringAMQP实现基础消息队列功能

通过rabbitTemplate提供的convertAndSend就可以实现消息的发送

引入相关依赖

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><scope>test</scope>
</dependency>

publisher(消息发布者)的application.yml的配置

test的测试代码:

@RunWith(SpringRunner.class)
@SpringBootTest
class PublisherApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid contextLoads() {String queuename="simple queue";String message="hello,spring AMQP";rabbitTemplate.convertAndSend(queuename,message);}}

通过rabbitTemplate实现对队列消息的监听

引入依赖

<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><scope>test</scope>
</dependency>

配置consumer(消息接收者),的application.yml文件

spring:rabbitmq:host: 192.168.10.8 #主机名port: 5672         #端口username: root      #用户名password: 123       #密码virtual-host: /     #虚拟主机名

监听类的代码

@Component
public class SimpleListener {@RabbitListener(queues = "simple queue")public void ListenSimpleQueue(String msg){System.out.println("消费者接收到simple queue的消息:"+msg);}
}

ps:消息一旦消费就会从队列中删除,RabbitMQ没有消息回溯功能。

二、工作消息队列(WorkQueue)

工作队列的结构如下:

工作消息队列的结构,相比于基础消息队列多了个消费者,因为rabbitMQ阅后即焚的特性这两个消费者属于共同工作的关系,如果有50个消息,他们两个消费者就会一人分一半,也就是一人25条,为啥会多一个消费者?这是因为如果一个消费者每次处理40个消息,但是publisher一次发布50个消息,多出来的消息会存储在queue里面,又因为queue是占用内存的假以时日,内存就会爆满,新的消息就存不进去了,多一个消费者每次就可以处理80条消息,可以有效解决这个问题。

work queue,工作队列,可以提高消费处理速度,避免队列消息堆积。但是这里有一个消息预取机制 ,消费者会提前把消息拿过来,因此消息是平局分配,并不是“能者多劳”的模式,通过设置prefetch的值来实现每次只能获取一条消息,处理完成才能接取下一个消息。实现“能者多劳”的模式。

  1. 发布订阅(Publish、Subscribe)

基础消息队列和工作消息队列都是一条信息只被一个消费者消费,消费完就删除,显然不能实现我们之前预想的完成支付之后,通知仓储、短信等服务。这就需要我们了解学习发布订阅模式。发布订阅模式与之前案例的区别就是允许同一消息发送给多个消费者,实现方式是加入exchange(交换机),结构如下:

publisher将消息发送给exchange(交换机),交换机把这个消息转发给队列,因此,发布者(publisher)并不需要知到转发给了那个 队列或多个队列,转发给多个队列,这种方式就能实现被多个消费者消费,那么交换机到底是发给一个还是多个呢?这是由交换机类型来决定的。常见的exchange的类型包括:

广播:Fanout

路由:Direct

主题:Topic

注意: exchange负责消息路由,而不是储存,路由失败则消息丢失。

广播-Fanout Exchange

Fanout Exchange 会将接收到的消息路由到每一个绑定的queue。

实现思路:

1.在consumer服务中,利用代码声明队列、交换机、并将两者绑定。

在consumer服务上添加@Configuration注解,并声明FanoutExchange、Queue和关系对象Binding,代码如下:

@Configuration
public class FanoutConfig {//声明交换机对象@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("fanout");}//声明队列1@Beanpublic Queue fanoutqueue1(){return new Queue("fanoutqueue1");}//绑定队列一到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutqueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutqueue1).to(fanoutExchange);}//声明队列2@Beanpublic Queue fanoutqueue2(){return new Queue("fanoutqueue2");}//绑定队列二到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutqueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutqueue2).to(fanoutExchange);}
}

2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@RabbitListener(queues = "fanoutqueue1")
public void ListenSimpleQueue3(String msg) throws InterruptedException {System.out.println("消费者222接收到fanoutqueue1的消息:"+msg);Thread.sleep(100);
}
@RabbitListener(queues = "fanoutqueue2")
public void ListenSimpleQueue4(String msg) throws InterruptedException {System.out.println("消费者222接收到fanoutqueue2的消息:"+msg);Thread.sleep(100);
}

3.在publisher中编写测试方法,向fanout发送消息。

@Test
public void contectFonoutExchange()
{//交换机名称String exchangeName="fanout";//消息内容String message="hello everyone";//发送消息rabbitTemplate.convertAndSend(exchangeName,"",message);
}

路由-DirectExchange

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)

每一个Queue都与Exchange设置一个BindingKey,一个队列可以绑定多个BindingKey。

发布者发送消息时,指定消息的RoutingKey

Exchange(交换机)将消息路由到Bindingkey与消息RoutingKey一致的队列

实现思路:

1.利用@RabbitListenner声明Exchange、Queue、RountingKey

2.在consumer服务中编写两个消费者方法,分别监听queue1和queue2

在我们的监听类里面增加两个方法,用来声明交换机、队列和RountingKey

//发布订阅DirectExchange
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "queue1"),exchange = @Exchange(name = "direct",type= ExchangeTypes.DIRECT),key = {"red","blue"}
))
public void listenerDirectqueue1(String msg)
{System.out.println("消费者接收到direct.queue1的消息"+msg);
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "queue2"),exchange = @Exchange(name = "direct",type= ExchangeTypes.DIRECT),key = {"red","yellow"}
))
public void listenerDirectqueue2(String msg)
{System.out.println("消费者接收到direct.queue2的消息"+msg);
}

3.在publisher中编写测试方法,向Exchange发送消息。

@Test
public void contextDirectExchange()
{String exchangeName="direct";String msgblue="hello blue";String msgRed="hello red";String msgYellow="hello yellow";rabbitTemplate.convertAndSend(exchangeName,"blue",msgblue);
}

@Test
public void contextDirectExchange()
{String exchangeName="direct";String msgblue="hello blue";String msgRed="hello red";String msgYellow="hello yellow";rabbitTemplate.convertAndSend(exchangeName,"red",msgRed);
}

话题-TopicExchange

TopicExchange与DirectExchange类似,区别在于rountingKey必须是多个单词的列表,并且以"."分割。Queue与Exchange指定BindingKey可以使用通配符

#:代表0个或多个单词

*:代表一个单词

我们使用Direct的时候一个队列如果绑定了很多key,会非常麻烦,通配符的引入就把key的绑定简化许多,原来绑定多个key现在只需要绑定一个key。

实现思路:

1.利用@RabbitListenter声明Exchange、Queue、RoutingKey

2.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2

//topic话题
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),key ="china.#"
))
public void listennertopicqueue1(String msg)
{System.out.println("消费者接收到topic.queue1的消息"+msg);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "topic",type = ExchangeTypes.TOPIC),key ="#.news"
))
public void listennertopicqueue2(String msg)
{System.out.println("消费者接收到topic.queue2的消息"+msg);
}

3.在publisher中编写测试方法,向交换机topic发送消息

@Test
public void contextTopicExchange()
{String exchangeName="topic";String msg="我是懒大王";rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);
}

6.消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

Spring的消息对象处理是由MessageConcerter来处理的,而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化,这种序列化方式,比较浪费内存资源,如果需要修改,只需要定义一个MessageConverter类型的Bean即可。推荐用JSON方式序列化,步骤如下:

我们在publisher服务引入依赖

我们在publisher服务中声明MessageConverter

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/7568.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

流畅的python-学习笔记_一等函数

函数对象 函数也是对象&#xff0c;操作可像对象一般操作 高阶函数 高阶函数指接受参数为函数&#xff0c;或返回函数的函数 不少高阶函数在py3已经有了替代品。map&#xff0c; filter可通过生成式实现&#xff0c;reduce&#xff08;在functools里&#xff09;可通过sum实…

解决 git克隆拉取代码报SSL certificate problem错误

问题&#xff1a;拉取代码时报错&#xff0c;SSL证书问题:证书链中的自签名证书问题 解决&#xff1a;只需要关闭证书验证&#xff0c;执行下面代码即可&#xff1a; git config --global http.sslVerify "false" 再次拉取代码就可以了

使用代理IP时,如何预防未知的风险?

在使用代理IP时&#xff0c;预防未知的风险是至关重要的。代理IP虽然提供了诸多便利&#xff0c;如匿名浏览、访问控制和内容过滤等&#xff0c;但如果不加以妥善管理和使用&#xff0c;可能会面临数据泄露、隐私暴露、恶意活动关联等风险。以下是一些建议&#xff0c;以帮助您…

使用mxnet中的img2rec.py制作rec数据集

源码链接&#xff1a;mxnet/tools/im2rec.py at master apache/mxnet GitHub 重点关注入参函数即可&#xff0c; def parse_args():"""Defines all arguments.Returns-------args object that contains all the params"""parser argparse.A…

一键实现在VS Code中绘制流程图

VS Code是一款常用的IDE&#xff0c;受到许多用户的欢迎和喜爱。而其较为出众的一点&#xff0c;就是较好的可拓展性&#xff0c;即丰富的插件应用&#xff0c;这些应用可以极大地提高生产效率&#xff0c;并优化日常使用。 流程图是一种直观的图示方法&#xff0c;可以用简明…

BIGRU、CNN-BIGRU、CNN-BIGRU-ATTENTION、TCN-BIGRU、TCN-BIGRU-ATTENTION合集

&#xff08;BIGRU、CNN-BIGRU、CNN-BIGRU-ATTENTION、TCN-BIGRU、TCN-BIGRU-ATTENTION&#xff09;时&#xff0c;我们可以从它们的基本结构、工作原理、应用场景以及优缺点等方面进行详细介绍和分析。 BIGRU、CNN-BIGRU、CNN-BIGRU-ATTENTION、TCN-BIGRU等&#xff08;matlab…

层级实例化静态网格体组件:开启大量模型处理之门

前言 在数字孪生的世界里&#xff0c;我们常常需要构建大量的模型来呈现真实而丰富的场景。然而&#xff0c;当使用静态网格体 &#xff08;StaticMesh &#xff09;构建大量模型时&#xff0c;可能会遇到卡顿的问题&#xff0c;这给我们带来了不小的困扰&#x1f623;。那么&…

Spring Data JPA自定义Id生成策略、复合主键配置、Auditing使用

前言 在Spring Data JPA系列的第一篇文章 SpringBoot集成JPA及基本使用-CSDN博客 中讲解了实体类的Id生成策略可以通过GeneratedValue注解进行配置&#xff0c;该注解的strategy为GenerationType类型&#xff0c;GenerationType为枚举类&#xff0c;支持四种Id的生成策略&…

STM32F407VET6 学习笔记2:定时器、串口、自定义串口打印函数

今日继续学习使用嘉立创购买的 立创梁山派天空星&#xff0c;芯片是 STM32F407VET6 因为已经有学习基础了&#xff0c;所以学习进度十分快&#xff0c;这次也是直接一块学习配置定时器与串口了&#xff0c;文章也愈来愈对基础的解释越来越少了...... 文章提供测试代码讲解、完…

【如此简单!数据库入门系列】之思想地图 -- 系列目录

文章目录 1 前言2 基本概念3 基本原理4 数据库历史5 数据模型6 数据库规范化7 数据存储8 总结 1 前言 目录是思想地图&#xff0c;指引我们穿越文字的森林。 为了方便系统性阅读&#xff0c;将【如此简单&#xff01;数据库入门系列】按照模块划分了目录结构。 2 基本概念 【…

Jetpack Compose(一

Intellij IDEA构建Android开发环境 IntelliJ IDEA 2023.2.1 Android开发变化 IDEA配置使用Gradle 新建Compose工程&#xff0c;取名ComposeStudy 可以看到的是IDEA为项目初始化了部分代码 使用Compose开发不再需要使用xml文件来设计布局了 Compose中的Text也不同于Android V…

ogv转mp4怎么转?只需3个步骤~

OGV&#xff08;Ogg Video&#xff09;是一种开源的视频文件格式&#xff0c;起源于对数字媒体领域的开放标准的需求。作为Ogg多媒体容器的一部分&#xff0c;OGV的设计初衷是提供一种自由、高质量的视频编码方案&#xff0c;以满足多样化的应用需求。 支持打开MP4文件格式的软…

119. 再谈接口幂等性

文章目录 0. 前言1. insert前先select2. 加悲观锁3. 加乐观锁5. 加唯一索引【配合 &#xff08;1. insert前先select &#xff09;最常用 】6. 建防重表6. 根据状态机7. 加分布式锁8. 获取token 0. 前言 在 93. 通用防重幂等设计 一文中&#xff0c;已经介绍过幂等的使用。该文…

【一看就懂】UART、IIC、SPI、CAN四种通讯协议对比介绍

UART、IIC、SPI、CAN四种通信协议对比 通信方式传输线通讯方式标准传输速度使用场景UARTTX(发送数据线)、RX(接收数据线)串行、异步、全双工115.2 kbit/s(常用)计算机和外部设备通信&#xff08;打印机&#xff09;IICSCL(时钟线)、SDA(数据线)串行、同步、半双工100 kbit/s(标…

一种由RSOA和PIC集成的宽可调激光器

----翻译自Nouman Zia, Samu-Pekka Ojanen, Jukka Viheriala, Eero Koivusalo, Joonas Hilska, Heidi Tuorila, and Mircea Guina在optics letter上发的文章vol.48, Issue 5, pp. 1319-1322(2023) 摘要&#xff1a;通过光子集成方式实现的2-3μm波长的可调激光器&#xff0c;在…

Adobe Acrobat Pro DC 2022一款高效强大的PDF阅读编辑专业软件(240506)

01 软件介绍 Adobe Acrobat Pro DC 2022&#xff0c;作为一款专业的PDF处理工具&#xff0c;它集成了强大的制作功能&#xff0c;能够与Adobe Photoshop的高级图像编辑能力无缝衔接&#xff0c;进而将各类纸质文档高效转换成可编辑的电子格式&#xff0c;以便于文件的传输和签…

跨越语言界限,多语言盲盒小程序带你领略全球风情

在全球化的今天&#xff0c;我们生活在一个多元文化的世界中&#xff0c;不同的语言、风俗、习惯共同构成了这个五彩斑斓的地球村。为了让每个人都能轻松体验到世界各地的独特风情&#xff0c;一款创新的多语言盲盒小程序应运而生&#xff0c;它跨越了语言的界限&#xff0c;让…

老阳分享:跨境选品师普通人做能否借此赚钱?

在跨境电商日益繁荣的今天&#xff0c;选品师这一职业逐渐进入大众视野。老阳&#xff0c;作为业内知名的跨境选品师&#xff0c;经常分享他的选品经验和心得。那么&#xff0c;对于普通人来说&#xff0c;成为跨境选品师是否真的能赚钱呢? 首先&#xff0c;我们需要明确什么是…

mySQL (基础面试)实物四属性 ACID属性,以及开启事务

mySQL具备四个基本属性 原子性atomicity 事务是一个完整的操作&#xff0c;事务的各个步骤是不可分的&#xff08;原子的&#xff09;&#xff0c;要么执行要么不执行 一致性consistency 当事务完成时&#xff0c;数据处于一致状态 隔离性isolation 并发事物之间彼此隔离…

基于springboot+vue+Mysql的租房网站

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…