RabbitMq深度学习

什么是RabbitMq?

RabbitMQ是一个开源的消息队列中间件,它实现了高级消息队列协议(AMQP)。它被广泛用于分布式系统中的消息传递和异步通信。RabbitMQ提供了一种可靠的、可扩展的机制来传递消息,使不同的应用程序能够相互之间进行通信。它支持多种编程语言和平台,并且具有灵活的路由和队列配置选项。

同步调用 

同步调用的优点:

  • 时效性较强,可以立即得到结果

同步调用的问题:

  • 耦合度高

  • 性能和吞吐能力下降

  • 有额外的资源消耗

  • 有级联失败问题

异步调用

好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题

  • 调用间没有阻塞,不会造成无效的资源占用

  • 耦合度极低,每个服务都可以灵活插拔,可替换

  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理

  • 需要依赖于Broker的可靠、安全、性能

MQ的种类 

 RabbitMq安装和使用 

 云服务器安装Rabbitmq。

 在docker 中拉去Ribbitmq镜像。

在docker 中运行ribbitmq。

docker run -d -p 5672:5672 -p 15672:15672 -p 25672:25672 --name rabbitmq rabbitmq

 查看rabbitmq的状态。

rabbitmqctl status

接着我们还可以将Rabbitmq的管理面板开启,这样就可以在浏览器上进行实时访问和监控了。 

我们需要先进入rabbitmq容器。

docker exec -it [在docker中对应的ID] [进入容器的路径] #路径一般为/bin/bash

开启rabbitmq的控制面板设置。

rabbitmq-plugins enable rabbitmq_management

打开rabbitmq的控制面板,就是对应的控制面板端口为15672。

账号和密码都是:guest

 消息队列模型

 SpringAMQP

 什么是springAMQP?

Spring AMQP 是一个基于 Spring 框架的 AMQP(高级消息队列协议)的开发框架。它提供了一种简化和抽象化的方式来使用 AMQP,使得在应用程序中使用消息队列变得更加容易。

springAMQP的使用

导入依赖

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

编写发送者

编写applcation.yml文件

spring:rabbitmq:host: 119.9.212.171 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: guest # 用户名password: guest # 密码

进行测试

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.io.IOException;
import java.util.concurrent.TimeoutException;@RunWith(SpringRunner.class) #如果不加此注解,spring容器无法自动注入RabbitTemplate
@SpringBootTest
public class PublisherTest {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void tess1() {String queueName = "queueName";String message = "hello, tolen";rabbitTemplate.convertAndSend(queueName, message);}
}

测试结果为下:

 可能会出现没有队列生成的情况,这是因为@Test无法自动一个 queue,我们手动创建一个即可。

编写消费者

编辑application.yml文件

spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: test # 用户名password: 123456 # 密码

创建消息监听者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitMqListener {@RabbitListener(queues = "queueName")public void getMessage(String message) {System.out.println("获取的消息是:" + message);}
}

直接配置即可,在后续的项目中消费者会监听对应的消息进行操作。

WorkQueue

我们可以对一个消息标签设置多个监听者,并且默认的设置是预取,也就是即使服务模块处理能力差的情况也会分配到相同个数的信息,不能达到能者多劳的效果,为了到达此效果,我们可以在application.yml中进行设置。

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

发布与订阅

FanoutExchange的使用

在消费者模块编写:新建交换机,新建队列,交换机和队列绑定操作。

在配置类中完成上述操作

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MQConfiguration {//声明交换机FanoutExchange@Beanpublic FanoutExchange fanoutExchange() {
//        设置交换机的名字return new FanoutExchange("tolen.fanout");}
//    创建一个信息队列1@Beanpublic Queue fanoutQueue1() {return new Queue("fanout.queue1");}
//    创建信息队列2@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}//将交换机和队列1进行绑定@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {//绑定队列给对应的交换机return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//将交换机和队列2进行绑定@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

在消费者模块中创建两个队列的监听器

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitMqListener {@RabbitListener(queues = "fanout.queue1")public void getMessage1(String message) {System.out.println("消息队列1中获取的消息是:" + message);}@RabbitListener(queues = "fanout.queue2")public void getMessage2(String message) {System.out.println("消息队列2中获取的消息是:" + message);}}

接下来不信消息发送模块,这里需要注意的是,此时我们是向对应的交换机发送消息,通过交换机发送消息给两个消息队列。

发送消息的代码为下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.io.IOException;
import java.util.concurrent.TimeoutException;@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void tess1() {String queueName = "queueName";String message = "hello, tolen";rabbitTemplate.convertAndSend(queueName, message);}@Testpublic void fanoutTest() {String exchangeName = "tolen.fanout";String message = "hi, tolen!";//routingKey不进行设置rabbitTemplate.convertAndSend(exchangeName, "", message);}
}

如果不设置routingKey的话,就会默认将消息发送到使用绑定的消息队列上。 

测试结果为下:

交换机状态

监听器接收到的消息 

 DirectExchange

可以设置routingKey,交换机可以向指定的队列发送消息。

配置监听器

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitMqListener {//使用注解进行绑定, 不再需要configuration配置@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "directQueue1"),exchange = @Exchange(name = "direct"), //默认使用的交换机类型就是directExchangekey = {"red", "blue"}))public void directQueue1(String message) {System.out.println("directQueue2:" + message);}//使用注解进行绑定, 不再需要configuration配置@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "directQueue2"),exchange = @Exchange(name = "direct"), //默认使用的交换机类型就是directExchangekey = {"red"}))public void directQueue2(String message) {System.out.println("directQueue2:" + message);}
}

编写消息发布模块

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void fanoutTest() {String exchangeName = "direct";String message = "hi, tolen!";//设置routingKeyrabbitTemplate.convertAndSend(exchangeName, "blue", message);}
}

测试结果为下:

此时就只有routingKey=blue的监听器才会接收到消息。

TopicExchage

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:

#:匹配一个或多个词

*:匹配不多不少恰好1个词

修改编写监听器的配置

//使用注解进行绑定, 不再需要configuration配置@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "directQueue2"),exchange = @Exchange(name = "direct", type = ExchangeTypes.TOPIC), //默认使用的交换机类型就是directExchangekey = {"#.new"}))public void directQueue2(String message) {System.out.println("directQueue2:" + message);}

只要发送的消息中的routingKey中尾部为新闻的消息全部会被监听。(routingKey使用"."作间隔)

消息转换器

在springboot中默认使用JDK的序列化,为了提高使用性,我们可以使用json转换器。

在消费者和发送者中都导入对应的依赖。

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

在configuration中配置信息转换器。(消费者和发布者都需要配置)

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MQConfiguration {@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}

进行测试,在发送一个对象类型的消息。

对应的监听器

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.Objects;@Component
public class RabbitMqListener {//使用注解进行绑定, 不再需要configuration配置@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "directQueue2"),exchange = @Exchange(name = "direct"), //默认使用的交换机类型就是directExchangekey = {"blue"}))public void directQueue2(Map<String, String> message) {System.out.println("directQueue2:" + message);}
}

对应的发送代码

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import java.util.LinkedHashMap;
import java.util.Map;@RunWith(SpringRunner.class)
@SpringBootTest
public class PublisherTest {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void fanoutTest() {String exchangeName = "direct";Map<String, String> message = new LinkedHashMap<>();message.put("name", "tolen");message.put("age", "19");//设置routingKeyrabbitTemplate.convertAndSend(exchangeName, "blue", message);}
}

测试效果为下:

接收到的数据 。

 消息队列中的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/59225.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Llama-2大模型本地部署研究与应用测试

最近在研究自然语言处理过程中&#xff0c;正好接触到大模型&#xff0c;特别是在年初chatgpt引来的一大波AIGC热潮以来&#xff0c;一直都想着如何利用大模型帮助企业的各项业务工作&#xff0c;比如智能检索、方案设计、智能推荐、智能客服、代码设计等等&#xff0c;总得感觉…

C语言网络编程实现广播

1.概念 如果同时发给局域网中的所有主机&#xff0c;称为广播 我们可以使用命令查看我们Linux下当前的广播地址&#xff1a;ifconfig 2.广播地址 以192.168.1.0 (255.255.255.0) 网段为例&#xff0c;最大的主机地址192.168.1.255代表该网段的广播地址&#xff08;具体以ifcon…

开源的经济影响:商业与社区的平衡

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

ChatGPT 一条命令总结Mysql所有知识点

想学习Mysql的同学,可以使用ChatGPT直接总结mysql所有的内容与知识点大纲 输入 总结Mysql数据库所有内容大纲与大纲细分内容 ChatGPT不光生成内容,并且直接完成了思维导图。 AIGC ChatGPT ,BI商业智能, 可视化Tableau, PowerBI, FineReport, 数据库Mysql Oracle, Offi…

K 次取反后最大化的数组和【贪心算法】

1005 . K 次取反后最大化的数组和 给你一个整数数组 nums 和一个整数 k &#xff0c;按以下方法修改该数组&#xff1a; 选择某个下标 i 并将 nums[i] 替换为 -nums[i] 。 重复这个过程恰好 k 次。可以多次选择同一个下标 i 。 以这种方式修改数组后&#xff0c;返回数组 可能…

word 调整列表缩进

word 调整列表缩进的一种方法&#xff0c;在试了其他方法无效后&#xff0c;按下图所示顺序处理&#xff0c;编号和文字之间的空白就没那么大了。 即右键word上方样式->点击修改格式->定义新编号格式->字体->取消勾选 “……对齐到网格”->确定

AndroidStudio3.5.2修改项目项目包名

公司项目要打造成产品进行演示&#xff0c;需要更换不同的包名进行安装在同一设备上&#xff0c;即所谓的马甲包 更改步骤基本一样 https://blog.csdn.net/qq_35270692/article/details/78336049 需要注意的是&#xff0c;按照上边的步骤修改完后&#xff0c;如果项目中有数据…

机器学习基础之《分类算法(4)—案例:预测facebook签到位置》

一、背景 1、说明 2、数据集 row_id&#xff1a;签到行为的编码 x y&#xff1a;坐标系&#xff0c;人所在的位置 accuracy&#xff1a;定位的准确率 time&#xff1a;时间戳 place_id&#xff1a;预测用户将要签到的位置 3、数据集下载 https://www.kaggle.com/navoshta/gr…

TCP数据报结构分析(面试重点)

在传输层中有UDP和TCP两个重要的协议&#xff0c;下面将针对TCP数据报的结构进行分析 关于UDP数据报的结构分析推荐看UDP数据报结构分析&#xff08;面试重点&#xff09; TCP结构图示 TCP报头结构的分析 一.16位源端口号 源端口表示发送数据时&#xff0c;发送方的端口号&am…

Flutter开发- iOS 问题CocoaPods not installed or not in valid state

解决问题方案&#xff1a; 1、先检查本机CocoaPods是否安装&#xff0c;通过gem list 查看是否安装 打开终端&#xff0c;执行gem list&#xff0c;出现图中的数据即为已安装。未安装看第4 步 2、已经安装了CocoaPods&#xff0c;还出现了图中的提示&#xff0c;你可能已经猜…

java内存模型讨论及案例分析

常用内存选项 -Xmx&#xff1a; 最大堆大小 -Xms&#xff1a;最小堆大小 -Xss &#xff1a;线程堆栈大小&#xff0c;默认1M 生产环境最好保持 Xms Xmx java内存研究 内存布局 可见&#xff1a; 堆大小 新生代 老年代&#xff0c;新生代EFrom SurvivorTo Survivor。新…

Particle Life粒子生命演化的MATLAB模拟

Particle Life粒子生命演化的MATLAB模拟 0 前言1 基本原理1.1 力影响-吸引排斥行为1.2 距离rmax影响 2 多种粒子相互作用2.1 双种粒子作用2.1 多种粒子作用 3 代码 惯例声明&#xff1a;本人没有相关的工程应用经验&#xff0c;只是纯粹对相关算法感兴趣才写此博客。所以如果有…

【已解决】Java 后端使用数组流 Array.stream() 将数组格式的 Cookie 转换成字符串格式

&#x1f389;工作中遇到这样一个场景&#xff1a;远程调用某个接口&#xff0c;该接口需要用户的 Cookie 信息进行权限认证&#xff0c;认证通过之后才可以打通并返回数据。 在后端拿到 httpServletRequest 后&#xff0c;调用 getCookies() 方法&#xff0c;返回的是一个 Coo…

WPF基础入门-Class6-WPF通知更改

WPF基础入门 Class6-WPF通知 1、显示页面&#xff1a; <Grid><StackPanel><TextBox Text"{Binding Name}"></TextBox><TextBox Text"{Binding Title}"></TextBox><Button Command"{Binding ShowCommand}&qu…

el-table动态生成多级表头的表格(js + ts)

展示形式&#xff1a; 详细代码&#xff1a; &#xff08;js&#xff09; <template><div><el-table :data"tableData" style"width: 100%"><el-table-column label"题目信息" align"center"><el-table-…

【C++】C++11的新特性(上)

引入 C11作为C标准的一个重要版本&#xff0c;引入了许多令人振奋的新特性&#xff0c;极大地丰富了这门编程语言的功能和表达能力。本章将为您介绍C11的一些主要变化和改进&#xff0c;为接下来的章节铺垫。 文章目录 引入 一、列表初始化 1、1 {} 初始化 1、2 std::initiali…

java 桥接模式

桥接模式 桥接模式简介桥接模式的实现总结 桥接模式简介 桥接模式&#xff08;Bridge&#xff09;是将抽象部分与它的实现部分分离&#xff0c;使它们都可以独立地变化。它是一种对象结构型模式&#xff0c;又称为柄体(Handle and Body)模式或接口(Interfce)模式。 桥接模式基于…

正则表达式 之 断言详解

正则表达式的先行断言和后行断言一共有 4 种形式&#xff1a; (?pattern) 零宽正向先行断言(zero-width positive lookahead assertion)(?!pattern) 零宽负向先行断言(zero-width negative lookahead assertion)(?<pattern) 零宽正向后行断言(zero-width positive lookb…

NEOVIM学习笔记

GitHub - blogercn/nvim-config: A pretty epic NeoVim setup 一直使用vim&#xff0c;每次到了新公司都要配置半天&#xff0c;而且常常配置失败&#xff0c;很多插件过期不好用。偶然看到别人的NEO VIM&#xff0c;就试着用了一下&#xff0c;感觉还不错。 用来开发和阅读C代…

Kubernetes(K8s)基本环境部署

此处只做学习使用&#xff0c;配置单master环境。 一、环境准备 1、ip主机规划&#xff08;准备五台新机&#xff09;>修改各个节点的主机名 注意&#xff1a;关闭防火墙与selinux 节点主机名ip身份joshua1 kubernetes-master.openlab.cn 192.168.134.151masterjoshua2k…