springboot使用rabbitmq

使用springboot创建rabbitMQ的链接。

整个项目结构如下:

img

1.maven依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.4.1</version>
</dependency>

application.yaml的配置如下

spring:application:name: rabbitMQrabbitmq:host: 192.168.142.128  #rabbitmq的主机名port: 5672			   #端口,默认5672username: itheima      #rabbitmq的账号password: 123321	   #密码
server:port: 8081               #项目启动端口

2.创建rabbitMQ配置类 – RabbitConfig。

@Configuration
@Slf4j
public class RabbitConfig {@Bean("directExchange")public DirectExchange directExchange() {return new DirectExchange(MQConstant.DIRECT_EXCHANGE);}@Bean("directQueue")public Queue directQueue() {return new Queue(MQConstant.DIRECT_QUEUE);}@Bean("bindingDirectExchange")public Binding bindingDirectExchange(@Qualifier("directExchange") DirectExchange directExchange,@Qualifier("directQueue") Queue directQueue) {return BindingBuilder.bind(directQueue).to(directExchange).with(MQConstant.ROUTING_KEY);}}

3.创建RabbitMQ客户端类,主要是用来发送消息用的。

@Component
@Slf4j
public class RabbitMqClient {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(MessageBody messageBody){try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.DIRECT_EXCHANGE, MQConstant.ROUTING_KEY, JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}
}

4.创建接收消息类 —RabbitMqServer

@Component
@Slf4j
public class RabbitMqServer {@RabbitListener(queues = MQConstant.DIRECT_QUEUE)public void receive(String message) {try {log.info("receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);switch (messageBody.getTopic()) {case QueueTopic.USER_LOGIN:User user = JSON.parseObject(messageBody.getData(), User.class);log.info("receive user:{}", user);break;default:log.info("no need hanndle message:{},topic:{}", message, messageBody.getTopic());break;}}catch (Exception e){log.error("rabbitmq receive message error:{}", e);}}
}

5.有了以上准备后就可以开始向mq里面发送消息了,在单元测试编写测试代码。

@SpringBootTest(classes = RabbitMqApplication.class)
class RabbitMqApplicationTests {@Autowiredprivate RabbitMqClient rabbitMqClient;@Testvoid testDirectSend() {//数据User user = new User();user.setId(123);user.setName("Lewin-jie2");user.setPassword("123");MessageBody messageBody = new MessageBody();messageBody.setData(JSON.toJSONString(user));long time = new Date().getTime();messageBody.setSendTime(time);//添加主题messageBody.setTopic(QueueTopic.USER_LOGIN);rabbitMqClient.send(messageBody);}}

6.运行后,可以看到后台的日志,证明我们消息发送已经成功了。

image-20241109151751035

我们打开rabbitmq的控制台(http://你的主机名:15672),可以开到队列里面也收到了消息,但是还没有被消费。

image-20241109152401671

以上出现结果就证明rabbit已经是配置好了。那么我们来了解一下啊rabbitmq

简介:rabbitmq是基于amqp协议,用elang语言开发的一个高级的消息队列,以高性能,高可靠,高吞吐量而被大量应用到应用系统作为第三方消息中间件使用,为应用系统实现应用解耦削峰减流,异步消息

rabbitmq主要构造有,producter,consumer,exchange,queue组成

1.直连交换机(direct_exchange)。

刚刚配置的时候就是演示的producter发消息到直连交换机,然后再发送到queue中的过程。

2.广播交换机(fanout_exchange).

顾名思义,就是绑定该交换机的所有队列都可以收到这个交换机的消息

	@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return new FanoutExchange(MQConstant.FANOUT_EXCHANGE);}@Bean("aQueue")public Queue aQueue(){return new Queue(MQConstant.FANOUT_QUEUE_A);}@Bean("bQueue")public Queue bQueue(){return new Queue(MQConstant.FANOUT_QUEUE_B);}@Bean("cQueue")public Queue cQueue(){return new Queue(MQConstant.FANOUT_QUEUE_C);}/*** 绑定队列aQueue bQueue cQueue*/@Bean("bindingFanoutExchange1")public Binding bindingFanoutExchange1(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("aQueue") Queue aQueue){return BindingBuilder.bind(aQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange2")public Binding bindingFanoutExchange2(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("bQueue") Queue bQueue){return BindingBuilder.bind(bQueue).to(fanoutExchange);}@Bean("bindingFanoutExchange3")public Binding bindingFanoutExchange3(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("cQueue") Queue cQueue){return BindingBuilder.bind(cQueue).to(fanoutExchange);}

编写controller类,再postman上面测试[http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag](http://localhost:8081/mq/sendFanoutMsg?msg=hi i am a fanoutmag)

@Controller
@RequestMapping("/mq")
@Slf4j
public class SendMessageController {@Autowiredprivate RabbitMqClient rabbitMqClient;@PostMapping("/sendFanoutMsg")public String sendFanoutMsg(@RequestParam("msg") String msg){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send1(messageBody);}catch (Exception e){log.error("sendFanoutMsg error{}", e);}return "send fanout msg success";}
}

结果:控制台收到消息了!!!

image-20241109213739205

3.主题交换机(topic_exchange)

topic_exchange和direct_exchange很像,topic有通配符。direct没有。

image-20241109214721827
  1. china.news 代表只关心中国新闻

  2. china.weather 代表只关心中国天气

  3. japan.news 代表只关心日本的新闻

  4. japan.weather 代表只关心日本的天气

    controller接口

    @PostMapping("/sendTopicMsg")public String sendTopicMsg(@RequestParam("msg") String msg,@RequestParam("type") String type){try {MessageBody messageBody = new MessageBody();messageBody.setData(msg);rabbitMqClient.send3(messageBody,type);}catch (Exception e){log.error("sendTopicMsg error{}", e);}return "send topic msg success";}
    

    利用postman测试。

    1.msg: “祖国75岁生日快乐”,type:“china.news”

    image-20241110102452558

    预测:queue1,queue4会接收到消息。

    image-20241110102659758

    2.msg: “日本大量排核废水,导致哥斯拉出现”,type:“japan.news”

    image-20241110102738974

    预测:queue2,queue4会接收到消息。

    image-20241110103638277

    3.msg: “今日日本出现大暴雨,怀疑是哥斯拉来了”,type:“Japan.weather”

    image-20241110103727452

    预测:queue2,queue3会接收到消息

    image-20241110103839382

topic-exchange在代码中如何使用。首先创建交换机,和队列,绑定交换机。

/*============================topic===========================*/@Bean("topicExchange")public TopicExchange topicExchange() {return new TopicExchange(MQConstant.TOPIC_EXCHANGE);}@Bean("queue1")public Queue queue1(){return new Queue(MQConstant.QUEUE1);}@Bean("queue2")public Queue queue2(){return new Queue(MQConstant.QUEUE2);}@Bean("queue3")public Queue queue3(){return new Queue(MQConstant.QUEUE3);}@Bean("queue4")public Queue queue4(){return new Queue(MQConstant.QUEUE4);}@Bean("bingTopicExchange1")public Binding bingTopicExchange1(@Qualifier("queue1") Queue queue1,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue1).to(topicExchange).with(MQConstant.CHINA_);}@Bean("bingTopicExchange2")public Binding bingTopicExchange2(@Qualifier("queue2") Queue queue2,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue2).to(topicExchange).with(MQConstant.JAPAN_);}@Bean("bingTopicExchange3")public Binding bingTopicExchange3(@Qualifier("queue3") Queue queue3,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue3).to(topicExchange).with(MQConstant._WEATHER);}@Bean("bingTopicExchange4")public Binding bingTopicExchange4(@Qualifier("queue4") Queue queue4,@Qualifier("topicExchange") TopicExchange topicExchange) {return BindingBuilder.bind(queue4).to(topicExchange).with(MQConstant._NEWS);}

消息发送

 public void send3(MessageBody messageBody,String routingKey) {try{String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);rabbitTemplate.convertAndSend(MQConstant.TOPIC_EXCHANGE, routingKey , JSON.toJSONString(messageBody),new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);log.info("message send,{}", message);return message;}},correlationData);log.info("message send successful");}catch (Exception e){log.info("send message error:{}",e);}}

消息接收

@RabbitListener(queues = MQConstant.QUEUE1)public void receive4(String message) {log.info("topic exchange");try {log.info("queue1 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE2)public void receive5(String message) {log.info("topic exchange");try {log.info("queue2 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE3)public void receive6(String message) {log.info("topic exchange");try {log.info("queue3 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}@RabbitListener(queues = MQConstant.QUEUE4)public void receive7(String message) {log.info("topic exchange");try {log.info("queue4 receive message:{}", message);MessageBody messageBody = JSON.parseObject(message, MessageBody.class);log.info("receive message:{}", messageBody.getData());}catch (Exception e){log.error("rabbitmq receive a message error:{}", e);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/69637.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文阅读(九):通过概率图模型建立连锁不平衡模型和进行关联研究:最新进展访问之旅

1.论文链接&#xff1a;Modeling Linkage Disequilibrium and Performing Association Studies through Probabilistic Graphical Models: a Visiting Tour of Recent Advances 摘要&#xff1a; 本章对概率图模型&#xff08;PGMs&#xff09;的最新进展进行了深入的回顾&…

php twig模板引擎详细使用教程

php twig模板引擎 1. 什么是Twig模板引擎 Twig是一个强大且灵活的PHP模板引擎&#xff0c;它提供了一种更简洁和可扩展的方法来创建PHP应用程序的视图层。Twig模板引擎旨在将设计与业务逻辑分离&#xff0c;并为开发人员提供一种更加清晰和易于维护的方式来构建网页。Twig由S…

蓝桥杯之c++入门(一)【C++入门】

目录 前言5. 算术操作符5.1 算术操作符5.2 浮点数的除法5.3 负数取模5.4 数值溢出5.5 练习练习1&#xff1a;计算 ( a b ) ⋆ c (ab)^{\star}c (ab)⋆c练习2&#xff1a;带余除法练习3&#xff1a;整数个位练习4&#xff1a;整数十位练习5&#xff1a;时间转换练习6&#xff…

设想中的计算机语言:可执行对象的构造函数和析构函数

经典 C语言的内存管理&#xff0c;是一块一块的&#xff0c;用malloc分配内存&#xff0c;用free释放内存。 C有对象&#xff0c;一个对象是好几片内存&#xff0c;用指针连接起来&#xff0c;用构造函数和析构函数管理对象。 创意 如图&#xff0c;是一个“可执行对象”&am…

SAP系统中的主要采购类型/采购模式总结

在 SAP 系统中,采购类型主要有以下几种: 一、标准采购订单(Standard Purchase Order) 描述:这是最常用的采购类型,用于一次性采购货物或服务。采购部门根据需求部门提出的采购申请,向供应商发出采购订单,明确规定了采购的物料、数量、价格、交货日期等详细信息。 应…

SpringCloud系列教程:微服务的未来(十七)监听Nacos配置变更、更新路由、实现动态路由

前言 在微服务架构中&#xff0c;API 网关是各个服务之间的入口点&#xff0c;承担着路由、负载均衡、安全认证等重要功能。为了实现动态的路由配置管理&#xff0c;通常需要通过中心化的配置管理系统来实现灵活的路由更新&#xff0c;而无需重启网关服务。Nacos 作为一个开源…

pycharm(2)

conda 我下载安装conda的时候产生了各种问题&#xff0c;最终我发现&#xff0c;打开杀毒软件会有阻碍 cuda的版本问题很大&#xff0c;我尝试多个版本之后&#xff0c;发现anaconda3-2024.06.1-windows-x86_64安装了之后不会报错&#xff0c;另外pycharm的版本也一直有问题&a…

DeepSeek-R1:通过强化学习激励大型语言模型(LLMs)的推理能力

摘要 我们推出了第一代推理模型&#xff1a;DeepSeek-R1-Zero和DeepSeek-R1。DeepSeek-R1-Zero是一个未经监督微调&#xff08;SFT&#xff09;作为初步步骤&#xff0c;而是通过大规模强化学习&#xff08;RL&#xff09;训练的模型&#xff0c;展现出卓越的推理能力。通过强…

Maven的下载安装配置

maven的下载安装配置 maven是什么 Maven 是一个用于 Java 平台的 自动化构建工具&#xff0c;由 Apache 组织提供。它不仅可以用作包管理&#xff0c;还支持项目的开发、打包、测试及部署等一系列行为 Maven的核心功能 项目构建生命周期管理&#xff1a;Maven定义了项目构建…

< OS 有关 > 阿里云 几个小时前 使用密钥替换 SSH 密码认证后, 发现主机正在被“攻击” 分析与应对

信息来源&#xff1a; 文件&#xff1a;/var/log/auth.log 因为在 sshd_config 配置文件中&#xff0c;已经定义 LogLevel INFO 部分内容&#xff1a; 2025-01-27T18:18:55.68272708:00 jpn sshd[15891]: Received disconnect from 45.194.37.171 port 58954:11: Bye Bye […

解决幂等问题的4种方案

幂等问题引入与准备工作 幂等概念&#xff1a;幂等指多次操作影响仅与首次执行结果相同&#xff0c;重复执行不会对系统造成额外变化。业务场景问题&#xff1a;以网站金币充值为例&#xff0c;因网络不稳定&#xff0c;支付宝支付成功的异步通知可能多次发送&#xff0c;若商家…

LitServe - 闪电般快速服务AI模型⚡

文章目录 一、关于 LitServe二、快速启动定义服务器测试服务器LLM 服务小结 三、特色示例功能特点 四、性能表现五、托管选项 一、关于 LitServe LitServe是一个易于使用、灵活的服务引擎&#xff0c;适用于基于FastAPI构建的AI模型。批处理、流式传输和GPU自动缩放等功能消除…

小程序电商运营内容真实性增强策略及开源链动2+1模式AI智能名片S2B2C商城系统源码的应用探索

摘要&#xff1a;随着互联网技术的不断发展&#xff0c;小程序电商已成为现代商业的重要组成部分。然而&#xff0c;如何在竞争激烈的市场中增强小程序内容的真实性&#xff0c;提高用户信任度&#xff0c;成为电商运营者面临的一大挑战。本文首先探讨了通过图片、视频等方式增…

【HarmonyOS之旅】基于ArkTS开发(三) -> 兼容JS的类Web开发(三)

目录 1 -> 生命周期 1.1 -> 应用生命周期 1.2 -> 页面生命周期 2 -> 资源限定与访问 2.1 -> 资源限定词 2.2 -> 资源限定词的命名要求 2.3 -> 限定词与设备状态的匹配规则 2.4 -> 引用JS模块内resources资源 3 -> 多语言支持 3.1 -> 定…

Linux网络 | 理解TCP面向字节流、打通socket与文件的关系

前言&#xff1a;我们经常说TCP是面向字节流的&#xff0c; TCP是面向字节流的。 但是&#xff0c; 到底是什么事面向字节流呢&#xff1f; 另外&#xff0c; 我们知道sockfd其实就是文件fd。 但是&#xff0c;为什么sockfd是文件fd呢&#xff1f; 这些问题都在本节内容中的到回…

FireFox | Google Chrome | Microsoft Edge 禁用更新 final版

之前的方式要么失效&#xff0c;要么对设备有要求&#xff0c;这次梳理一下对设备、环境几乎没有要求的通用方式&#xff0c;universal & final 版。 1.Firefox 方式 FireFox火狐浏览器企业策略禁止更新_火狐浏览器禁止更新-CSDN博客 这应该是目前最好用的方式。火狐也…

大数据学习之Kafka消息队列、Spark分布式计算框架一

Kafka消息队列 章节一.kafka入门 4.kafka入门_消息队列两种模式 5.kafka入门_架构相关名词 Kafka 入门 _ 架构相关名词 事件 记录了世界或您的业务中 “ 发生了某事 ” 的事实。在文档中 也称为记录或消息。当您向 Kafka 读取或写入数据时&#xff0c;您以事件的 形式执行…

深度学习指标可视化案例

TensorBoard 代码案例&#xff1a;from torch.utils.tensorboard import SummaryWriter import torch import torchvision from torchvision import datasets, transforms# 设置TensorBoard日志路径 writer SummaryWriter(runs/mnist)# 加载数据集 transform transforms.Comp…

Linux文件原生操作

Linux 中一切皆文件&#xff0c;那么 Linux 文件是什么&#xff1f; 在 Linux 中的文件 可以是&#xff1a;传统意义上的有序数据集合&#xff0c;即&#xff1a;文件系统中的物理文件 也可以是&#xff1a;设备&#xff0c;管道&#xff0c;内存。。。(Linux 管理的一切对象…

基于springboot+vue的流浪动物救助系统的设计与实现

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…