【rabbitmq基础】

RabbitMq基础

  • 1.概念
  • 2.数据隔离
  • 3.使用控制台向mq传递消息
    • 1.创建两个队列-“测试队列”,“测试队列2”
    • 2.创建一个交换机-"测试交换机"
    • 3.测试发送消息
      • 3.1让交换机和队列进行绑定
      • 3.2发送消息
      • 3.3查看消息
  • 4.创建虚拟主机
  • 5.java使用rabbitmq
    • 5.1 发送消息
    • 5.2 消费消息
  • 6.任务模型work queues
  • 7.交换机
    • 7.1.为什么使用交换机
    • 7.2.交换机模型
      • 7.2.1交换机模型Fanout(广播)
        • 7.2.1.1改造java代码
      • 7.2.2交换机模型Direct(订阅)
        • 7.2.2.1
      • 7.2.3交换机模型Topic()
    • 7.3.队列和交换机的申明
  • 8.消息转换器

1.概念

  • 消息发送者(publisher):生产消息
  • 交换机(exchange):负责路由消息,把消息路由给队列,可以路由给一个队列,也可以路由给多个队列,这取决于交换机的类型
  • 队列(queue):队列,存储消息
  • 消息消费者(coumsmser):消费消息
  • 虚拟主机(virtual-host):虚拟主机,数据隔离作用
    在这里插入图片描述

2.数据隔离

在实际工作中,公司一般是在一个指定的服务器上去搭建mq,或者多个机器上去搭建集群模式,那一个公司肯定不止一个项目组,多个项目组的情况下,不可能每个项目都搞一套自己的mq,费时费力不说,维护还麻烦,所以mq就有数据隔离,多个项目组用一个环境的mq,数据不一样而已
在这里插入图片描述

3.使用控制台向mq传递消息

1.创建两个队列-“测试队列”,“测试队列2”

在这里插入图片描述

2.创建一个交换机-“测试交换机”

在这里插入图片描述

3.测试发送消息

3.1让交换机和队列进行绑定

在这里插入图片描述
绑定成功之后在指定的"测试队列"中也可以看到他和交换机的绑定关系
在这里插入图片描述

3.2发送消息

在这里插入图片描述

在这里插入图片描述

3.3查看消息

在这里插入图片描述
当然你也可以使用这个交换机同时绑定创建的两个队列

4.创建虚拟主机

在这里插入图片描述
在这里插入图片描述

5.java使用rabbitmq

5.1 发送消息

接着之前的在common里面引入依赖(没看之前的文章的直接就创建一个单体的springboot项目引入这个依赖就行)
在这里插入图片描述

		<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在用户工程作为消息投递方,订单工程作为消费者,不通过交换机投递消息,并且消费
在这里插入图片描述

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

userController

 @AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/sendMassage")@ApiOperation(value = "不通过交换机发送消息")public void sendMassage( String queueName ,String msg ){rabbitTemplate.convertAndSend(queueName,msg);}

接口测试
在这里插入图片描述
查看消息
在这里插入图片描述
在这里插入图片描述

5.2 消费消息

order工程加配置

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

创建orderListen

@Component
public class orderListen {@RabbitListener(queues = "测试队列2")public void listenOrder(String msg){System.out.println("我已经接收到订单消息:"+msg);}
}

在这里插入图片描述

6.任务模型work queues

简单的说就是多个消费者绑定一个队列

  1. 创建一个队列work.queue
  2. 生产者(用户服务)向队列(work.queue)中发送消息,每秒钟100条记录
  3. 创建两个消费者(订单服务)监听队列,一个消费者一秒钟消费20条,一个消费者一秒钟消费30条记录
    生产者代码
 @GetMapping("/sendWorkQueueMassage")@ApiOperation(value = "发送到任务模型")public void sendWorkQueueMassage() throws InterruptedException {String queueName="work.queue";for (int i = 1; i <=100 ; i++) {String msg="msg_"+i;rabbitTemplate.convertAndSend(queueName,msg);//休眠20毫秒Thread.sleep(20);}}

消费者代码

 @RabbitListener(queues = "work.queue")public void listenWorkQueueOrder(String msg) throws InterruptedException {System.out.println("消费者1已经接收到订单消息:"+msg);// Thread.sleep(30);}@RabbitListener(queues = "work.queue")public void listenWorkQueueOrder2(String msg) throws InterruptedException {System.err.println("消费者2已经接收到订单消息:"+msg);//  Thread.sleep(40);}

结果:
1.队列在被多个消费者绑定的时候,队列会把消息轮询分配给每一个消费者
2.消息被消费方消费之后就消失了

在这里插入图片描述
产生的问题:
问题1.资源浪费:现实生活中,每个服务器的负载能力都是不一样的,假如B服务器一秒钟只能处理2个请求,A服务器一秒钟能处理20个,那在轮询消费的时候,假设时间过去0.3秒,B服务器还没消费完一个消息,按照A服务器的性能,他0.3秒都可以处理好几个了,他应该在0.05秒的时候就处理完毕一个了,但是由于轮询他只能处理一个,这个时候A就要等着B消费完,这样就很浪费A的服务器资源。
2.消息积压,以上代码,生产方发送消息到队列,休眠时间为20毫秒,消费者1消费一个消息要30毫秒,B需要40毫秒,时间长了。生产者发的消息消费者就消费不过来

问题1处理方案:
增加配置

spring:rabbitmq:listener:direct:prefetch: 1  #保证同一时刻最多投递一条消息给消费者

结果,因为消费者1的消费能力比消费者2要快,所有可以看到他没有等着
在这里插入图片描述
问题2处理方案:
很明显能看到,两个消费者的消费能力跟不上生产者的生产速度,所有只能再增加多个消费者,直到消费者的消费能力快过生产者的生产能力

7.交换机

7.1.为什么使用交换机

我们上面的代码,是生产者直接连接队列,然后消费者消费,实际业务中,你在网购平台买东西,购买成功你的订单微服务得知道,积分微服务得知道,购物车微服务得知道,如果按照不用交换机去做,那消息一旦被订单服务消费了,这条消息在队列认为就消费完毕了,直接就会删除,造成的结果就是积分微服务就不知道了。那咋搞,所以就可以用到交换机

7.2.交换机模型

7.2.1交换机模型Fanout(广播)

把消息放到交换机,然后交换机广播给多个队列(积分队列,购物车队列,订单队列),然后相应得微服务去跟相应得队列绑定,这种方式叫做广播
在这里插入图片描述

7.2.1.1改造java代码

在这里插入图片描述

  1. 使用之前创建的交换机,测试交换机,并且绑定"测试队列","测试队列2"两个队列
    在这里插入图片描述
  2. 编写两个消费者方法,分别监听两个队列
    创建积分微服务
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestlistener:direct:prefetch: 1  #保证同一时刻最多投递一条消息给消费者
@Component
public class PointsFanoutListen {@RabbitListener(queues = "测试队列2")public void listenPoints(String msg){System.out.println("积分服务已经接收到消息:"+msg);}
}

订单微服务微服务中监听另外一个队列

@Component
public class OrderFanoutListen {@RabbitListener(queues = "测试队列")public void listenOrder(String msg){System.out.println("订单服务已经接收到消息:"+msg);}}
  1. 编写生产者方法,向交换机发送消息
 @GetMapping("/sendFanoutMassage")@ApiOperation(value = "发送消息到广播交换机")public void sendFanoutMassage() throws InterruptedException {String exchangeName="测试交换机";String msg="用户成功下单了";rabbitTemplate.convertAndSend(exchangeName,null,msg);}

测试
本地调用接口: loclahost:8001/user/sendFanoutMassage
在这里插入图片描述
启动两个消费者

在这里插入图片描述
在这里插入图片描述

7.2.2交换机模型Direct(订阅)

实际业务中,我可能不需要把消息发送给每个队列,比如。我订单交易失败,我的积分微服务就不需要接收到这种,积分微服务只有在交易成功才做积分减少或者增加的操作,那就是我只订阅交易成功的订单消息

7.2.2.1
  1. 创建交换机
    在这里插入图片描述

  2. 创建队列
    在这里插入图片描述

  3. 交换机跟队列绑定
    在这里插入图片描述

  4. 创建消费者
    消费者1:订单服务监听队列1

@Component
public class PointsDirectListen {@RabbitListener(queues = "driect.queue2")public void listenPoints(String msg){System.out.println("积分服务已经接收到用户成功下单消息:"+msg);}
}

消费者2:积分服务监听队列2

@Component
public class PointsDirectListen {@RabbitListener(queues = "driect.queue2")public void listenPoints(String msg){System.out.println("积分服务已经接收到用户成功下单消息:"+msg);}
}

创建生产者用户服务

@GetMapping("/sendDirectMassage")@ApiOperation(value = "发送消息到订阅交换机")public void sendDirectMassage() throws InterruptedException {String exchangeName="work.dirice";String msg="用户成功下单了";rabbitTemplate.convertAndSend(exchangeName,"red",msg);}@GetMapping("/sendDirectMassageFaild")@ApiOperation(value = "发送消息到订阅交换机")public void sendDirectMassageFaild() throws InterruptedException {String exchangeName="work.dirice";String msg="用户下单失败了";rabbitTemplate.convertAndSend(exchangeName,"blue",msg);}@GetMapping("/sendDirectMassageWait")@ApiOperation(value = "发送消息到订阅交换机")public void sendDirectMassageWait() throws InterruptedException {String exchangeName="work.dirice";String msg="用户下单但是还未付款";rabbitTemplate.convertAndSend(exchangeName,"yellow",msg);}

分别调用三个接口.结果如下
sendDirectMassage接口两个消费者都能接收到
sendDirectMassageFaild只有消费者1能接收到
sendDirectMassageWait只有消费者2能接收到

7.2.3交换机模型Topic()

在这里插入图片描述
编写案例
在这里插入图片描述
创建绑定关系
在这里插入图片描述
在这里插入图片描述

7.3.队列和交换机的申明

在之前我们都是手动在控制台去创建队列或者交换机,但是在真实企业中,不可能手动在控制台去创建,而且这样创建的,一旦中间件出问题了,所有的队列和交换机就没了,一般是用代码处理。


/*** 注解的方式创建队列* 一般在消费方创建* 1.创建一个名字叫annotate.work的且类型为TOPIC的交换机* 2.交换机绑定的队列为annotate.queue,该队列持久化* 3.交换机绑定的key为"red","yellow"* @param msg* @throws InterruptedException*/@Componentpublic class orderListen {@RabbitListener(bindings = @QueueBinding(value = @Queue(name ="annotate.queue",declare = "true"), //队列名称叫annotate.queue,且需要持久化exchange = @Exchange(name = "annotate.work",type = ExchangeTypes.TOPIC),//交换机名称和类型key={"red","yellow"} //路由key))public void listenWorkAnnotateQueueOrder2(String msg) throws InterruptedException {System.err.println("注解方式生成的队列收到消息:"+msg);Thread.sleep(50);}}

项目启动之后就能直接创建相应的队列和交换机

8.消息转换器

1.创建一个队列,名字叫object.queue
在这里插入图片描述

2.创建生产者往这个队列发送一个消息,消息的类型为map或者java对象


import com.threesum.OderApplication;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.HashMap;@SpringBootTest(classes = OderApplication.class)
public class orderTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendObjectMsg(){HashMap<String, Object> msg = new HashMap<>();msg.put("name","aa");msg.put("age",21);rabbitTemplate.convertAndSend("object.queue",msg);}
}

3.观察队列中的消息
在这里插入图片描述
结论:会发现变成了一堆乱码(因为默认采用的是java的jdk序列化)

4.采用java序列化方式处理问题
4.1引入依赖

 <dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-yaml</artifactId></dependency>

4.2发送方和消费方都使用java序列化

package com.threesum.config.rabbit;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class JacksonDada {@Beanpublic MessageConverter JacksonJsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}

4.3再次获取,就转换正常了
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/75626.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

加固计算机厂家 | 工业加固笔记本电脑厂家

北京鲁成伟业科技发展有限公司&#xff08;以下简称“鲁成伟业”&#xff09;成立于2005年&#xff0c;是集研发、生产、销售与服务于一体的高新技术企业&#xff0c;专注于加固计算机、工业加固笔记本电脑及特种计算机的研发与制造。凭借20年的技术积累与行业深耕&#xff0c;…

链路聚合配置命令

技术信息 加入捆绑组&#xff0c;加大链路间带宽等 配置命令 华三 静态聚合 将接口加入聚合口后再进行配置 //创建静态链路聚合口1&#xff0c;不启用lacp[SWB]interface Bridge-Aggregation 1 [SWB-Bridge-Aggregation1]port link-type trunk [SWB-Bridge-Aggregation…

ekf-imu --- 四元数乘法符号 ⊗ 的含义

⊗ 表示四元数的乘法运算&#xff1a; 用于组合两个四元数代表的旋转。四元数乘法是非交换的&#xff08;即顺序不同结果不同&#xff09;&#xff0c;其定义如下&#xff1a; 若两个四元数分别为&#xff1a; qq0q1iq2jq3k, pp0p1ip2jp3k, 则它们的乘积为&#xff1a;4*1 …

论文阅读Diffusion Autoencoders: Toward a Meaningful and Decodable Representation

原文框架图&#xff1a; 官方代码&#xff1a; https://github.com/phizaz/diffae/blob/master/interpolate.ipynb 主要想记录一下模型的推理过程 &#xff1a; %load_ext autoreload %autoreload 2 from templates import * device cuda:1 conf ffhq256_autoenc() # pri…

OpenVLA-OFT——微调VLA的三大关键设计:并行解码、动作分块、连续动作表示以及L1回归目标

前言 25年3.26日&#xff0c;这是一个值得纪念的日子&#xff0c;这一天&#xff0c;我司「七月在线」的定位正式升级为了&#xff1a;具身智能的场景落地与定制开发商 &#xff0c;后续则从定制开发 逐步过渡到 标准产品化 比如25年q2起&#xff0c;在定制开发之外&#xff0…

【论文阅读】Dynamic Adversarial Patch for Evading Object Detection Models

一、介绍 这篇文章主要是针对目标检测框架的攻击&#xff0c;不同于现有的攻击方法&#xff0c;该论文主要的侧重点是考虑视角的变化问题&#xff0c;通过在车上布置多个显示器&#xff0c;利用视角动态选择哪一个显示器播放攻击内容&#xff0c;通过这种方法达到隐蔽与攻击的…

多模态技术概述(一)

1.1 多模态技术简介 1.1.1 什么是多模态 多模态(Multimodal)涉及多种不同类型数据或信号的处理和融合&#xff0c;每种数据类型或信号被称为一种模态。常见的模态包括文本、图像、音频、视频等。多模态技术旨在同时利用这些不同模态的数据&#xff0c;以实现更全面、更准确的理…

nginx2

Nginx反向代理(七层代理)、Nginx的TCP/UDP调度器(四层代理)、 一、Nginx反向代理(七层代理) 步骤&#xff1a; ​ 部署后端web服务器集群 ​ 配置Nginx代理服务器 ​ 配置upstream集群池 ​ 调节集群池权重比 <img src"/home/student/Deskt…

调用kimi api

官网支持python&#xff0c;curl和node.js 因为服务器刚好有php环境&#xff0c;所以先用curl调个普通的语音沟通api <?php // 定义 API Key 和请求地址 define(MOONSHOT_API_KEY, sk-PXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXgk1); define(MOONSHOT_API_URL, https://…

关于 UPDATE 语句 和 SELECT ... FOR UPDATE 的对比分析,包括语法、功能、锁机制、使用场景及示例代码

以下是关于 UPDATE 语句 和 SELECT ... FOR UPDATE 的对比分析&#xff0c;包括语法、功能、锁机制、使用场景及示例代码&#xff1a; 1. UPDATE 语句 功能 直接修改数据&#xff1a;立即更新表中的数据&#xff0c;并提交修改。无显式锁&#xff1a;虽然会自动加锁&#xff…

在航电系统中提高可靠性的嵌入式软件设计

1.总线余度设计 数据传输采用双余度总线设计&#xff0c;CANFD为主&#xff0c;RS485为备。发送方将相同的数据分别通过双总线来发送&#xff0c;接收方优先处理主线数据。由于总线上数据频率固定&#xff0c;可设置定时器监控主总线的数据&#xff0c;当定时器超时后&#xff…

第十五届蓝桥杯大赛软件赛省赛Python 大学 C 组:5.回文数组

题目1 回文数组 小蓝在无聊时随机生成了一个长度为 n 的整数数组&#xff0c;数组中的第 i 个数为 ai&#xff0c;他觉得随机生成的数组不太美观&#xff0c;想把它变成回文数组&#xff0c;也是就对于任意 i∈[1,n] 满足 a i a n − i 1 a_ia_{n−i}1 ai​an−i​1。 小蓝…

netty中的WorkerGroup使用详解

Netty中WorkerGroup的深度解析 WorkerGroup是Netty线程模型中的从Reactor线程组&#xff0c;负责处理已建立连接的I/O读写、编解码及业务逻辑执行。其设计基于主从多Reactor模型&#xff0c;与BossGroup分工协作&#xff0c;共同实现高并发网络通信的高效处理。 一、WorkerGro…

模运算核心性质与算法应用:从数学原理到编程实践

目录 &#x1f680;前言&#x1f31f;数学性质&#xff1a;模运算的理论基石&#x1f4af;基本定义&#xff1a;余数的本质&#x1f4af;四则运算规则&#xff1a;保持同余性的关键 &#x1f99c;编程实践&#xff1a;模运算的工程化技巧&#x1f4af;避免数值溢出&#xff1a;…

#Git 变基(Rebase)案例

适合学习理解的 Git 变基&#xff08;Rebase&#xff09;案例 为了帮助你更好地理解 Git 变基&#xff08;Rebase&#xff09;的操作和效果&#xff0c;下面通过一个简单的案例来演示变基的过程和影响。 案例背景 假设我们有一个 Git 仓库&#xff0c;包含两个分支&#xff1…

泰博云平台solr接口存在SSRF漏洞

免责声明&#xff1a;本号提供的网络安全信息仅供参考&#xff0c;不构成专业建议。作者不对任何由于使用本文信息而导致的直接或间接损害承担责任。如涉及侵权&#xff0c;请及时与我联系&#xff0c;我将尽快处理并删除相关内容。 漏洞描述 SSRF漏洞是一种在未能获取服务器…

MyBatis 动态SQL 详解!

目录 一、 什么是动态 SQL&#xff1f;二、 为什么需要动态 SQL&#xff1f;三、 MyBatis 动态 SQL 标签四、 标签详解及示例1、 if 标签2、 choose、when、otherwise 标签3、 where 标签4、 set 标签5、 foreach 标签6、 sql、include 标签 五、 总结 &#x1f31f;我的其他文…

阿里云服务器遭遇DDoS攻击有争议?

近年来&#xff0c;阿里云服务器频繁遭遇DDoS攻击的事件引发广泛争议。一方面&#xff0c;用户质疑其防御能力不足&#xff0c;导致服务中断甚至被迫进入“黑洞”&#xff08;清洗攻击流量的隔离机制&#xff09;&#xff0c;轻则中断半小时&#xff0c;重则长达24小时&#xf…

如何在Springboot的Mapper中轻松添加新的SQL语句呀?

在如今的软件开发界&#xff0c;Spring Boot可是非常受欢迎的框架哦&#xff0c;尤其是在微服务和RESTful API的构建上&#xff0c;真的是让人爱不释手&#xff01;今天&#xff0c;我们就来聊聊如何为Spring Boot项目中的Mapper添加新的SQL语句吧&#xff01;说起来&#xff0…

Qt 中 findChild和findChildren绑定自定义控件

在 Qt 中&#xff0c;findChild 和 findChildren 是两个非常实用的方法&#xff0c;用于在对象树中查找特定类型的子对象。这两个方法是 QObject 类的成员函数&#xff0c;因此所有继承自 QObject 的类都可以使用它们。当您需要查找并绑定自定义控件时&#xff0c;可以按照以下…