SpringAMQP使用

说明:SpringAMQP(官网:https://spring.io/projects/spring-amqp)是基于RabbitMQ封装的一套模板,并利用了SpringBoot对其实现了自动装配,使用起来非常方便。安装和原始使用参考:http://t.csdn.cn/51qyD

基础操作

创建两个模块,一个用于发送消息(sender),一个用于接收消息(receiver),两个模块拥有共同的父模块

第一步:添加依赖

在父模块的pom.xml文件中,添加依赖,如下:

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version><relativePath/></parent><dependencies><!--lombok依赖,用于生成set、get、toString方法--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies>

第二步:创建配置文件

配置文件(application.yml)内容如下,两个模块的内容一样

spring:rabbitmq:# MQ ip地址host: XXX.XXX.XXX.XXX# MQ的端口号port: 5672# 虚拟主机 每个用户单独对应一个 不同用户之间无法访问彼此的虚拟主机virtual-host: /# 用户名username: root# 密码password: 123456

第三步:创建Listener类

在接收方,创建监听类,用来接收消息,如下:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitListenerDemo {@RabbitListener(queues = "demo.queue")public void listenDemoQueueMessage(String msg){System.out.println("msg = " + msg);}
}

第四步:编写发送端代码

在发送方的测试类中,写测试代码,发消息给接收方,其中RunWith()注解用于构建程序运行的上下文环境;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SenderTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSender() {rabbitTemplate.convertAndSend("demo.queue","hello rabbit mq!");}
}

第五步:启动

先启动接收方(这是因为,如果队列在RabbitMQ管理平台上不存在的话,先启动发送方会造成消息丢失,而先启动接收方,RabbitMQ会根据队列名先创建出队列),再启动发送方;

可以看到,测试完成,接收方可以接收到消息

在这里插入图片描述

工作队列

实际的业务情况是一个发送方,可能会有多个接收方来接收,而且接收方处理效率可能各不相同。这样,接收方的代码可以写成这样,使用线程休眠模拟接收方执行的效率,再设置变量用于统计各个接收方执行的次数:

(RabbitListenerDemo.java)

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class RabbitListenerDemo {private static int count1 = 0;private static int count2 = 0;private static int count3 = 0;@RabbitListener(queues = "demo.queue")public void listenDemoQueueMessage1(String msg) throws InterruptedException {System.out.println("msg1 = " + msg + "======= count1 =" + (++count1));Thread.sleep(10);}@RabbitListener(queues = "demo.queue")public void listenDemoQueueMessage2(String msg) throws InterruptedException {System.out.println("msg2 = " + msg + "======= count2 =" + (++count2));Thread.sleep(20);}@RabbitListener(queues = "demo.queue")public void listenDemoQueueMessage3(String msg) throws InterruptedException {System.out.println("msg3 = " + msg + "======= count3 =" + (++count3));Thread.sleep(50);}
}

(SenderTest:循环发送200次,休眠10毫秒)

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class SenderTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSender() throws InterruptedException {for (int i = 0; i < 200; i++) {rabbitTemplate.convertAndSend("demo.queue","hello rabbit mq!======>" + i);Thread.sleep(10);}}
}

启动,可以看到执行效率最低的3号,也和1号、2号接收到了等量的消息量,

在这里插入图片描述

这是因为RabbitMQ有默认的分配策略,使每个接收方都可以接收到等量的消息量,而不是处理越快的处理越多。可以在接收方的配置文件中,添加这个配置,表示每个接收方只能一个消息一个消息处理(可以推测默认是先按照接收方数量,把请求都平均分配好之后,再让它们各自处理的);

spring:rabbitmq:listener:simple:prefetch: 1

重启测试,可以看到,达到了“能者多劳”的效果

在这里插入图片描述

发布/订阅

发布/订阅,是指在消息发给队列前,对消息所绑定的队列信息做判断,然后按照绑定的队列对消息进行分发;

在这里插入图片描述

根据分发的情况,可分为以下三种:

  • 广播(Fanout):消息分发给所有队列;

  • 路由(Direct):消息只分发给拥有关键字(RoutingKey)的队列;

  • 主题(Topic):消息只分发给符合条件的队列;

Fanout(广播)

创建一个广播配置类,用于绑定队列与广播交换机(FanoutExchange);

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 广播配置类*/
@Configuration
public class FanoutConfig {/*** 声明交换机* @return*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("essay.fanout");}/*** 生成第一个队列* @return*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机** @return*/@Beanpublic Binding bindingQueue1(){return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}/*** 生成第二个队列* @return*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机** @return*/@Beanpublic Binding bindingQueue2(){return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}
}

接收方代码

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg){System.out.println("接收者1接收到了消息:" + msg);}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg){System.out.println("接收者2接收到了消息:" + msg);}

发送方代码:消息并不直接发送给队列,而是发送个交换机;

    @Testpublic void fanoutExchangeTest(){// 第二个参数是routeKey(路由转发关键字)不能不加,可以为空字符串rabbitTemplate.convertAndSend("essay.fanout","", "hello everyone!");}

测试结果,每个队列都接收到了消息,并发给各自的接收方

在这里插入图片描述

Direct(路由)

在接收方的接收方法上,创建对应的队列、路由交换机,并设置routeKey(路由关键字),接收者1号(group1, group2),接收者2号(group1, group3)

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "essay.direct",type = ExchangeTypes.DIRECT),key = {"group1", "group2"}))public void listenDirectQueue1(String msg){System.out.println("接收者1号接收到了消息:" + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "essay.direct",type = ExchangeTypes.DIRECT),key = {"group1", "group3"}))public void listenDirectQueue2(String msg){System.out.println("接收者2号接收到了消息:" + msg);}

发送方发送消息,routeKey = group1

	rabbitTemplate.convertAndSend("essay.direct","group1", "hello group1!");

在这里插入图片描述


发送方发送消息,routeKey = group2

	rabbitTemplate.convertAndSend("essay.direct","group2", "hello group2!");

只有接收者1号拥有group2,故只有接收者1号接收到消息

在这里插入图片描述

Topic(主题)

与路由类似,不同的是ExchangeTypes的类型key的组成,key由通配符和关键字组成

  • #:表示一个或多个字符;

  • *:表示一个字符;

如下面的三个key分别表示:

  • group.#:表示以“group”开头的消息都发过来;

  • #.class:表示以“class”结尾的消息都发过来;

  • *.person:表示两个字符,并以“person”结尾的消息都发过来;

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),key = "group.#"))public void listenTopicQueue1(String msg){System.out.println("接收者1号接收到了消息:" + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),key = "#.class"))public void listenTopicQueue2(String msg){System.out.println("接收者2号接收到了消息:" + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue3"),exchange = @Exchange(name = "essay.topic",type = ExchangeTypes.TOPIC),key = "*.person"))public void listenTopicQueue3(String msg){System.out.println("接收者3号接收到了消息:" + msg);}

发送方测试

	// 1号接收rabbitTemplate.convertAndSend("essay.topic","group.b.c.d", "hello NO.1!");// 2号接收rabbitTemplate.convertAndSend("essay.topic","b.c.d.class", "hello NO.2!");// 3号接收rabbitTemplate.convertAndSend("essay.topic","b.person", "hello NO.3!");

启动,测试结果如下,可以看到达到了预期结果

在这里插入图片描述

总结

RabbitMQ是一门异步通信的技术,SpringAMQP是基于RabbitMQ的模版,可以省去原始操作RabbitMQ的繁琐(建立连接、设置连接参数、创建通道、创建队列、发送消息/接收消息)。

另外,可以使用SpringAMQP建立工作队列、发布/订阅等模式,其中工作队列可设置spring.rabbitmq.listener.simple.prefetch=1,达到“能者多劳”的效果;

而发布/订阅模式又分为广播、路由和主题,广播模式需要手动建立队列和路由交换机的关联,路由与主题的区别在于路由交换机的类型和路由关键字的格式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/4715.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

视频做成GIF动图怎么做?分享超简单的制作方法

将视频制作GIF动图的好处在于它可以将原本较长的视频压缩成一个简短、易于分享的图像文件。这使得它们非常适合用于社交媒体、博客、电子邮件等场景&#xff0c;可以当做表情包来使用&#xff0c;尤其是看到一段搞笑的视频&#xff0c;想要把它做成GIF动图该怎么做呢&#xff1…

常见java知识点1

目录 1 什么是Spring框架&#xff1f;Spring框架有哪些主要模块&#xff1f; 2 使用Spring框架有什么好处&#xff1f; 3 Java常用的包&#xff08;列举六个&#xff09; 4 Arraylist 和 Linkedlist 的区别 5 HashMap和Hashtable的区别 6 Java中常见的…

小白到运维工程师的自学之路 第五十四集 (ansible自动化运维工具)

一、概述 Ansible是一种开源的自动化工具&#xff0c;用于自动化任务的执行、配置管理和应用部署。它采用基于Python编写的简单、轻量级的语法&#xff0c;可以通过SSH协议远程管理和配置多台计算机。 Ansible的主要特点包括&#xff1a; 1、简单易用&#xff1a;设计简单&a…

LCD—STM32液晶显示(1.显示器简介及LCD显示原理)(6000字详细介绍)

目录 显示器简介 液晶显示器 液晶 像素 液晶屏缺点 LED显示器 OLED显示器 显示器的基本参数 STM32板载液晶控制原理&#xff08;不带微控制器&#xff09; 液晶控制原理 控制信号线(不带液晶控制器) 液晶数据传输时序 显存 总结 3.2寸液晶屏介绍&#xff08;搭载…

基于单片机的智能鞋柜的设计与实现

功能介绍 以51单片机作为主控系统&#xff1b;通过DHT11温湿度采集&#xff1b;通过按键设置逻辑处理&#xff1b;通过LED紫外线消毒&#xff1b;通过继电器控制风扇进行换气除湿&#xff1b;通过继电器控制加热片进行加热&#xff1b;整个电路以5v供电; 电路图 PCB 源代码 #i…

cocosCreator 3.6以上接入腾迅Bugly 捕捉JS错误 Android

cocosCreator3.6以上接入Bugly上报其实很简单&#xff0c;不需要网上那么多弯弯绕&#xff0c;三须三步走。 1. 按照官网方式接入android的bugly 2. android端写一个Bugly上报管理类 3. 修改你工程目录下native\engine\common\Classes\目录下的Game.h, Game.cpp两个文件&…

大语言模型的预训练[2]:GPT、GPT2、GPT3、GPT3.5、GPT4相关理论知识和模型实现、模型应用以及各个版本之间的区别详解

大语言模型的预训练[2]:GPT、GPT2、GPT3、GPT3.5、GPT4相关理论知识和模型实现、模型应用以及各个版本之间的区别详解 1.GPT 模型 1.1 GPT 模型简介 在自然语言处理问题中&#xff0c;可从互联网上下载大量无标注数据&#xff0c;而针对具体问题的有标注数据却非常少&#x…

【JavaEE】Tomcat的安装和使用、创建Mevan项目使用Servlet写一个程序

目录 前言 一、Tomcat的下载和安装 二、写一个简单的Servlet项目 1、创建一个Maven项目 2、引入依赖 3、创建目录 4、编写Servlet代码。 5、打包程序 6、将程序部署到Tomcat上 7、验证程序运行结果 三、在IDEA上安装Smart Tomcat插件 四、Servlet中的一些常见错误 …

基于timegan扩增技术,进行多维度数据扩增(Python编程,数据集为瓦斯浓度气体数据集)

1.数据集介绍 瓦斯是被预测气体&#xff0c;其它列为特征列,原始数据一共有472行数据&#xff0c;因为原始数据比较少&#xff0c;所以要对原始数据&#xff08;总共8列数据&#xff09;进行扩增。 开始数据截图 截止数据截图 2. 文件夹介绍 lstm.py是对未扩增的数据进行训练…

ChatGLM-6B+LangChain实战

目标&#xff1a;原始使用ChatGLM-6B可接受的文字长度有限&#xff0c;打算结合LangChain实现长文本生成摘要. 方法&#xff1a; step1&#xff1a;自定义一个GLM继承LangChain中的langchain.llms.base.LLM&#xff0c;load自己的模型. step2&#xff1a;使用LangChain的mapred…

electron globalShortcut 快捷键与系统全局快捷键冲突

用 electron 开发自己的接口测试工具&#xff08;Post Tools&#xff09;&#xff0c;在设置了 globalShortcut 快捷键后&#xff0c;发现应用中的快捷键与系统全局快捷键冲突了&#xff0c;导致系统快捷键不可正常使用。 快捷键配置 export function initGlobalShortcut(main…

MySQL数据库(一)

目录 一、MySQL安装与配置 1.1什么是数据库 1.2数据库的分类 二、MySQL服务器安装 2.1Windows绿色安装 2.2配置环境 一、MySQL安装与配置 1.1什么是数据库 存储数据用文件就可以了&#xff0c;为什么还要弄个数据库? 文件保存数据有以下几个缺点&#xff1a; 文件的安全性问…

typescript manual

这里写目录标题 throw new Error在浏览器中调试Json定义类型定义数组 functionNamed functionanonymous function Axios经典片段 错误及解决ref valuebecause it is a constantAPI 和 客户端定义的数据结构不一样ServerClient throw new Error throw new Error(“Get data err…

leetcode 1218. Longest Arithmetic Subsequence of Given Difference(给定差值的最长算术子序列)

给数组arr和一个差值difference, 不打乱arr中数字的顺序&#xff0c;抽取最长的子序列&#xff0c;使序列中每相邻两个元素的差值为difference. 求满足条件的最长子序列的长度。 思路&#xff1a; DP 因为差值difference是固定的&#xff0c;每抽取一个元素&#xff0c;它前…

【lesson2】Linux基本指令1

文章目录 touch创建文件更新文件最新修改时间 lslsls -lls -als -i pwd...cdcd 路径法一&#xff1a;cd 绝对路径法二&#xff1a;cd 相对路径 cd - stattreemkdirmkdir创建一个目录mkdir -p创建一串路径目录 ~/rmdirrmrmrm -frm -rrm -i mancpcpcp -r mvnaocatcatcat -n ta…

信息泄露与大数据:隐私安全的挑战与对策

随着大数据时代的到来&#xff0c;我们生活的方方面面都与数据息息相关。然而&#xff0c;随之而来的信息泄露问题也日益严重&#xff0c;给个人隐私和数据安全带来了巨大挑战。本文将围绕信息泄露与大数据展开讨论&#xff0c;探讨其中的问题、原因以及如何应对。 山海鲸大屏 …

16 | 视图:如何实现服务和数据在微服务各层的协作?

目录 服务的协作 1. 服务的类型 2. 服务的调用 微服务内跨层 微服务之间的服务调用 领域事件驱动 3. 服务的封装与组合 基础层 领域层 应用层 用户接口层 4. 两种分层架构的服务依赖关系 松散分层架构的服务依赖 严格分层架构的服务依赖 数据对象视图 基础层 领…

【Linux】分布式存储系统 Ceph

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 分布式存储系统 Ceph Ceph 概述1、Ceph 简介2、存储基础3、Ceph 优势4、Ceph 架构4、Ceph 核心组件5、OSD 存储后端6、Ceph 数据的存储过程7、Ceph 版本发行生命周期 Ceph 集…

向量检索增强chatglm生成

背景&#xff1a; 基于chatglm构建agnet&#xff1a;chatglm实现Agent控制 - 知乎 前面一篇文章已经介绍了如何去搭建LLM Agent控制系统&#xff0c;也简单介绍了如何去构建Toolset和构建Action。但是在上篇文章中Toolset其实是基于搜索api构建的&#xff0c;从这篇文章开始后…

svn迁移到git实际操作

1.到svn项目目录右键选中gitbash打开窗口&#xff0c;执行获取用户并映射成git样式账号命令如下: svn log -q | awk -F | /^r/ {sub("^ ", "", $2); sub(" $", "", $2); print $2" "$2" <"$2"163.cn>…