工作中常用的RabbitMQ实践

目录

1.前置

2.导入依赖

3.生产者

4.消费者

5.验证

验证Direct

验证Fanout

验证Topic


1.前置

安装了rabbitmq,并成功启动

2.导入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.生产者

生产端项目结构:
 

逻辑:生产者只对交换机进行生产,至于队列绑定等放在消费端进行执行

BusinessConfig

定义了三个不同类型的交换机

direct类型:(当生产者往该交换机发送消息时,他必须指定固定的routingkey,当routingkey值为空,他也会匹配routingkey为空的队列)

fanout类型:(当生产者往该交换机发送消息时,他所绑定的队列都会收到消息,routingkey即使写了也会忽略,一般为空字符串)

Topic类型:(当生产者往该交换机发送消息时,他并不像direct指定固定的routingkey,可以进行模糊匹配,当该routingkey为空时,他会匹配routingkey为空的队列)

package com.zsp.quartz.queue;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;/*** @Author: ZhangSP* @Date: 2023/12/7  14:05*/
@Slf4j
@Configuration
public class BusinessConfig {// 声明direct交换机public static final String EXCHANGE_DIRECT= "exchange_direct_inform";// 声明fanout交换机public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";// 声明topic交换机public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
}

TestProducer

生产消息

package com.zsp.quartz.queue;import com.alibaba.fastjson.JSON;
import com.zsp.quartz.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest
@RunWith(SpringRunner.class)
public class TestProducer {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void Producer_topics_springbootTest() {//使用rabbitTemplate发送消息String message = "";User user = new User();user.setName("张三");user.setEmail("anjduahsd");message = JSON.toJSONString(user);// directrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_DIRECT,"",message);// fanoutrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_FANOUT,"",message);// topicrabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_TOPIC,"",message);}
}

4.消费者

消费者目录结构:

BusinessConfig:定义交换机类型,配置交换机与队列的绑定关系,通过容器工厂声明队列

package com.zsp.consumer.queue;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;/*** @Author: ZhangSP* @Date: 2023/12/7  14:05*/
@Slf4j
@Configuration
public class BusinessConfig {// 声明directpublic static final String EXCHANGE_DIRECT= "exchange_direct_inform";public static final String QUEUE_DIRECT_EMAIL = "queue_direct_inform_email";public static final String QUEUE_DIRECT_SMS = "queue_direct_inform_sms";public void BindDirectEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);channel.queueDeclare(QUEUE_DIRECT_EMAIL, true, false, false, null);channel.queueBind(QUEUE_DIRECT_EMAIL, EXCHANGE_DIRECT, "");} catch (Exception e) {log.error("声明Direct->email队列时失败", e);}}public void BindDirectSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);channel.queueDeclare(QUEUE_DIRECT_SMS, true, false, false, null);channel.queueBind(QUEUE_DIRECT_SMS, EXCHANGE_DIRECT, "123");} catch (Exception e) {log.error("声明Direct->sms失败", e);}}// 声明fanoutpublic static final String EXCHANGE_FANOUT= "exchange_fanout_inform";public static final String QUEUE_FANOUT_EMAIL = "queue_fanout_inform_email";public static final String QUEUE_FANOUT_SMS = "queue_fanout_inform_sms";public void BindFanoutEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);channel.queueDeclare(QUEUE_FANOUT_EMAIL, true, false, false, null);channel.queueBind(QUEUE_FANOUT_EMAIL, EXCHANGE_FANOUT, "");} catch (Exception e) {log.error("声明Fanout->email队列时失败", e);}}public void BindFanoutSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);channel.queueDeclare(QUEUE_FANOUT_SMS, true, false, false, null);channel.queueBind(QUEUE_FANOUT_SMS, EXCHANGE_FANOUT,"");} catch (Exception e) {log.error("声明Fanout->sms失败", e);}}// 声明topicpublic static final String EXCHANGE_TOPIC= "exchange_topic_inform";public static final String QUEUE_TOPIC_EMAIL = "queue_topic_inform_email";public static final String QUEUE_TOPIC_SMS = "queue_topic_inform_sms";public static final String ROUTINGKEY_EMAIL="inform.#.email.#";public static final String ROUTINGKEY_SMS="inform.#.sms.#";public void BindTopicEmail(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);channel.queueDeclare(QUEUE_TOPIC_EMAIL, true, false, false, null);channel.queueBind(QUEUE_TOPIC_EMAIL, EXCHANGE_TOPIC, ROUTINGKEY_EMAIL);} catch (Exception e) {log.error("声明Topic->email队列时失败", e);}}public void BindTopicSms(Channel channel) {try {channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);channel.queueDeclare(QUEUE_TOPIC_SMS, true, false, false, null);channel.queueBind(QUEUE_TOPIC_SMS, EXCHANGE_TOPIC,"");} catch (Exception e) {log.error("声明Topic->sms失败", e);}}// 声明队列@Autowired@Qualifier(value = "zspConnectionFactory")private ConnectionFactory connectionFactory;@PostConstructpublic void shengmingQueue() {try {Connection connection = connectionFactory.createConnection();Channel channel = connection.createChannel(false);BindDirectEmail(channel);BindDirectSms(channel);BindFanoutEmail(channel);BindFanoutSms(channel);BindTopicEmail(channel);BindTopicSms(channel);} catch (Exception e) {log.error("业务实例声明绑定队列报错:",e);}}
}

RabbitFactory:自定义容器工厂

package com.zsp.consumer.queue;import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@EnableRabbit
public class RabbitFactory {@Bean("zspConnectionFactory")public ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();// 设置RabbitMQ的连接信息,如主机名、端口号、用户名和密码等connectionFactory.setHost("localhost");connectionFactory.setPort(5672);connectionFactory.setUsername("root");connectionFactory.setPassword("root");return connectionFactory;}@Bean("rabbitListenerContainerFactory")public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("zspConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setConcurrentConsumers(5);factory.setMaxConcurrentConsumers(10);return factory;}
}

ReceiveHandler:队列监听

package com.zsp.consumer.queue;import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class ReceiveHandler {//监听自定义的Direct队列@RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_SMS, containerFactory = "rabbitListenerContainerFactory")public void directSMS(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Direct队列->sms队列" + jsonObject);}@RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_EMAIL, containerFactory = "rabbitListenerContainerFactory")public void directEmail(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Direct队列->email队列" + jsonObject);}//监听自定义的Fanout队列@RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_SMS, containerFactory = "rabbitListenerContainerFactory")public void FanoutSMS(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Fanout队列->sms队列" + jsonObject);}@RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_EMAIL, containerFactory = "rabbitListenerContainerFactory")public void FanoutEmail(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Fanout队列->email队列" + jsonObject);}//监听自定义的Topic队列@RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_SMS, containerFactory = "rabbitListenerContainerFactory")public void TopicSMS(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Topic队列->sms队列" + jsonObject);}@RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_EMAIL, containerFactory = "rabbitListenerContainerFactory")public void TopicEmail(String msg, Message message, Channel channel) {JSONObject jsonObject = JSONObject.parseObject(msg);System.out.println("Topic队列->email队列" + jsonObject);}
}

5.验证

先启动消费者端,然后执行TestProducer

验证Direct

1.向routingkey为空的队列发消息

我们在消费者端配置了routingkey为空的队列,叫做 QUEUE_DIRECT_EMAIL

因此会打印出下面这条记录

2.向routingkey为123的队列发消息

我们在消费者端配置了routingkey为123的队列,叫做 QUEUE_DIRECT_SMS

因此会打出下面这条记录

验证Fanout

谁跟我绑定了,我都发

验证Topic

模糊匹配routingkey

匹配sms队列

会把下面这个打印出来

需要注意的是如果我们没有自定义容器工厂的话,这个containerFactory可以不写

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/205839.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uni-app 微信小程序之好看的ui登录页面(五)

文章目录 1. 页面效果2. 页面样式代码 更多登录ui页面 uni-app 微信小程序之好看的ui登录页面&#xff08;一&#xff09; uni-app 微信小程序之好看的ui登录页面&#xff08;二&#xff09; uni-app 微信小程序之好看的ui登录页面&#xff08;三&#xff09; uni-app 微信小程…

力扣面试150题 | 88.合并两个有序数组

力扣面试150题 &#xff5c; 88.合并两个有序数组 题目描述解题思路代码实现复杂度分析 题目描述 88.合并两个有序数组 给你两个按 非递减顺序 排列的整数数组 nums1 和 nums2&#xff0c;另有两个整数 m 和 n &#xff0c;分别表示 nums1 和 nums2 中的元素数目。 请你 合并…

【Flink on k8s】- 12 - Flink kubernetes operator 的高级特性

目录 1、自动伸缩 1.1 工作原理 1.2 Job 要求和限制 1.2.1 要求 1.2.2 限制

Ray构建GPU隔离的机器学习平台

Ray框架介绍 Ray 是一个开源分布式计算框架,在 机器学习基础设施中发挥着至关重要的作用。Ray 促进分布式机器学习训练,使机器学习从业者能够有效利用多个 GPU 的能力。 Ray可以在集群上分布式地运行任务,并且可以指定任务运行时需要使用的GPU数量。Ray可与Nvidia-docker等…

异常检测 | MATLAB实现基于支持向量机和孤立森林的数据异常检测(结合t-SNE降维和DBSCAN聚类)

异常检测 | MATLAB实现基于支持向量机和孤立森林的数据异常检测(结合t-SNE降维和DBSCAN聚类) 目录 异常检测 | MATLAB实现基于支持向量机和孤立森林的数据异常检测(结合t-SNE降维和DBSCAN聚类)效果一览基本介绍模型准备模型设计参考资料 效果一览 基本介绍 提取有用的特征&…

GEE——利用Landsat系列数据集进行1984-2023EVI指数趋势分析

简介: 利用Landsat系列数据集进行1984-2023EVI指数趋势分析其主要目的是进行长时序的分析,这里我们选用EVI指数,然后进行了4个月的分析,查看其最后的线性趋势以及分布状况。 EVI指数: EVI指数(Enhanced Vegetation Index,增强型植被指数)是一种反映植被生长状态的遥…

springboot项目使用Layui作为前端UI的一系列前后端交互的解决方法

背景&#xff1a; 因为比较喜欢Layui&#xff0c;因为多个项目都是从零开始就使用的layui开发的&#xff0c;并且开发过程中借鉴了很多其他项目&#xff08;如Ruoyi、Pear Admin&#xff09;&#xff0c;因此最终选用大部分Pear Admin的项目中使用的一系列解决方案&#xff0c;…

Java的三种代理模式实现

代理模式的定义&#xff1a; Provide a surrogate or placeholder for another object to control access to it.&#xff08;为其他对象提供一种代理以控制对这个对象的访问。&#xff09; 简单说&#xff0c;就是设置一个中间代理来控制访问原目标对象&#xff0c;达到增强原…

领域驱动架构(DDD)建模

一、背景 常见的软件开发方式是拿到产品需求后&#xff0c;直接考虑数据库中表应该如何设计&#xff0c;这种方式已经将设计与业务需求脱节&#xff0c;而更多的是直接考虑应该如何实现了&#xff0c;这有点本末倒置。而DDD是从领域(问题域)为出发点进行的设计方法。 领域驱动…

记录 | centos源码编译bazel

tensorflow的源码编译依赖于 bazel 这里进行 bazel 的源码编译 1、安装依赖 sudo yum install -y java-11-openjdk sudo yum install -y java-11-openjdk-devel sudo yum install -y protobuf-compiler zip unzip2、知悉要安装的 bazel 的版本 务必安装受支持的 Bazel 版本…

Linux下c开发

编程环境 Linux 下的 C 语言程序设计与在其他环境中的 C 程序设计一样&#xff0c; 主要涉及到编辑器、编译链接器、调试器及项目管理工具。编译流程 编辑器 Linux 中最常用的编辑器有 Vi。编译连接器 编译是指源代码转化生成可执行代码的过程。在 Linux 中&#xff0c;最常用…

网络安全行业大模型调研总结

随着人工智能技术的发展&#xff0c;安全行业大模型SecLLM&#xff08;security Large Language Model&#xff09;应运而生&#xff0c;可应用于代码漏洞挖掘、安全智能问答、多源情报整合、勒索情报挖掘、安全评估、安全事件研判等场景。 参考&#xff1a; 1、安全行业大模…

uniapp 打开文件管理器上传(H5、微信小程序、android app三端)文件

H5跟安卓APP 手机打开的效果图&#xff1a; Vue页面&#xff1a; <template><view class"content"><button click"uploadFiles">点击上传</button></view> </template><script>export default {data() {return…

java将List<Object>转为List<Map<String, Object>>,可复用

在开发时我们有时候需要将将List转为List<Map<String, Object>>&#xff0c;可以参考下面的方法 ObjectToMapConverter.java import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.M…

【开放集检测OSR】开放集检测和闭集检测的区别和联系:从模型角度进行理解

定义一个分类器&#xff1a; D t r a i n { ( x i , y i ) } i 1 N ⊂ X C D_{train} \{(x_i, y_i)\}^N _{i1} ⊂ X C Dtrain​{(xi​,yi​)}i1N​⊂XC X&#xff1a;输入空间 ( x i , y i ) (x_i, y_i) (xi​,yi​): 输入的图像x以及其对象的类别标签yC &#xff1a;已知…

web:[GXYCTF2019]BabyUpload(文件上传、一句话木马、文件过滤)

题目 页面显示为文件上传 随便上传一个文件看看 上传一个文本文件显示 上传了一个图片显示 上传包含一句话木马的图片 上传了一个包含php一句话木马的文件&#xff0c;显示如上 换一个写法 上传成功 尝试上传.htaccess&#xff0c;上传失败&#xff0c;用抓包修改文件后缀 …

LangChain 23 Agents中的Tools用于增强和扩展智能代理agent的功能

LangChain系列文章 LangChain 实现给动物取名字&#xff0c;LangChain 2模块化prompt template并用streamlit生成网站 实现给动物取名字LangChain 3使用Agent访问Wikipedia和llm-math计算狗的平均年龄LangChain 4用向量数据库Faiss存储&#xff0c;读取YouTube的视频文本搜索I…

Apache Flink(四):Flink 其他实时计算框架对比

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 根据前文描述我们知道Flink主要处…

LNMP网站架构分布式搭建部署(编译安装)

目录 一、数据库编译安装 二、nginx编译安装 三、php编译安装 三、通过nfs将三台不同的主机资源共享 四、基础测试 五、完成WordPress站点部署 六、完成bbs论坛站点部署 一、数据库编译安装 1、先下载安装包到/opt目录中&#xff0c;最好选择mysql-boost-5.7.44.tar.gz版…