SpringBoot整合消息中间件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)

消息中间件

  • 消息
    • 消息队列
      • JMS
    • AMQP
    • MQTT
    • Kafka
  • Spring整合消息队列
    • 模拟消息队列的工作流程
    • Spring整合ActiveMQ
    • Spring整合RabbitMQ
      • 直连交换机模式
      • 主题交换机模式
    • Spring整合RocketMQ
    • Spring整合kafka

消息

消息的发送方:生产者
消息的接收方:消费者
同步消息:发送方发送消息到接收方,接收方有所回应后才能够进行下一次的消息发送
异步消息:不需要接收方回应就可以进行下一步的发送

消息队列

什么是消息队列?
消息队列
当此时有很多个用户同时访问服务器,需要服务器进行操作,但此时由于操作太多服务器运转不过来,这时将非常多的操作转换成消息的格式储存器来,所有的子服务器从中获取到消息进行操作分担主服务器的压力,而这个中间存储消息的容器我们一般称为消息队列

  • 企业级应用中广泛使用的三种异步消息传递技术(实现高并发的有效处理):
  1. JMS
  2. AMQP
  3. MQTT

JMS

(java Message Service):一个规范,等同于JDBC规范,提供了与消息服务相关的API接口

  • JMS消息模型
  1. peer-2-peer: 点对点模型,消息发送到一个队列中,队列保存信息,队列的消息只能被一个消费者消费,或超时
  2. publish-subscribe:发布订阅模式,消息可以被多个消费者消费,生产者和消费者完全独立,不需要感知对方存在
  • JMS消息种类

TextMessage,MapMessage, BytesMessage,StreamMessage,ObjectMessage,Message(Message只有消息头和属性)

  • 实现JMS规范的MQ

ActiveMQ,Redis,HornetMQ,RabbitMQ,RocketMQ(RocketMQ并未完全遵守JMS规范)

AMQP

AMQP(advanced message queuing protocol):一种协议(高级队列协议,消息代理规范),规范了网络交换的数据格式,兼容JMS

JMS存在一定的问题,JMS规范对对应的语言进行了规范,但若是我使用不是规范语言进行操作的时候就会出现问题,这时我们推出AMQP,这更像是一种协议,规范消息的格式,就是无论用什么语言什么环境都无所谓,它只人消息的格式

优点:跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现

  • AMQP的消息模型

direct exchange,fanout exchange,topic exchange,headers exchange,system exchange

AMQP的消息种类:byte[]

  • 实现AMQP的MQ:

RabbitMQ,StormMQ,RocketMQ

MQTT

(Message Queueing Telemetry Transport)消息队列遥测传输,专为小设备设计,是物联网(IOT)生态系统中主要成分之一

Kafka

kafka,一种高吞吐量的分布式订阅消息系统,提供实时消息功能

Spring整合消息队列

模拟消息队列的工作流程

模拟消息队列的处理过程

import java.util.ArrayList;@Service
public class Messageservice implements MessageService {private ArrayList<String> megList=new ArrayList<String>();@Overridepublic void sendMessage(String id) {System.out.println("将待发送的消息订单纳入到处理队列.id:"+id);megList.add(id);}@Overridepublic String doMessage() {String remove = megList.remove(0);System.out.println("已完成短信业务的发送,id:"+remove);return remove;}
}

模拟将消息导入到消息队列

@Service
public class orderserviceimpl implements orderService {@Autowiredprivate MessageService messageService;@Overridepublic void order(String id) {//发送消息队列messageService.sendMessage(id);}
}

Spring整合ActiveMQ

首先下载activeMQ
下载地址:https://activemq.apache.org/components/classic/download/
下载之后进行解压缩

  • 启动服务

打开x64的bin目录下执行activemq.bat命令启动服务
在这里插入图片描述
在这里插入图片描述
启动成功 ,其中给出其web控制台的访问地址:
在这里插入图片描述
进入其管理界面:
在这里插入图片描述
默认用户名&密码:admin

  • SpringBoot进行整合activemq

添加依赖:

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency>

添加配置:
配置Spirng连接的地址,以及后边消息存入的位置

server:port: 80
spring:activemq:# 说明spring连接的active的端口地址broker-url: tcp://localhost:61616

进行消息队列的操作:


@Service
public class Messageservice implements MessageService {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;@Overridepublic void sendMessage(String id) {System.out.println("将待发送的消息订单纳入到处理队列.id:"+id);jmsMessagingTemplate.convertAndSend(id);}@Overridepublic String doMessage() {//将消息队列中的类型转移出来,并在参数中规定转移出来的消息类型String s = jmsMessagingTemplate.receiveAndConvert(String.class);System.out.println("已完成短信业务的发送,id:"+s);return s;}
}

在发送和获取期间也可以规定名称

 jmsMessagingTemplate.convertAndSend("order.shishi.id",id);String s = jmsMessagingTemplate.receiveAndConvert("order.shishi.id",String.class);

上述之中也有一个小问题,就是在并不是每次消费都需要进行访问,而是当消息队列中有消息就开始消费我们可以创建一个Listener


@Component
public class MessageListener {@JmsListener(destination = "order.shishi.id")public void receive(String id){System.out.println("已完成的短信业务:id:"+id);}
}

这样就自动监听指定位置下的消息,一有消息就自动开始消费,从服务开始就一直存在
还有一个消息转发的操作:

@Component
public class MessageListener {@JmsListener(destination = "order.shishi.id")@SendTo("order.bushi.id")public void receive(String id){System.out.println("已完成的短信业务:id:"+id);}
}

注解 @SendTo的作用是将监听到的消息消费之后将返回值返回到对应的消息中去
上述使用的都是点对点的模型,如果要使用发布订阅的模型,可以在配置文件中进行配置:

spring:activemq:broker-url: tcp://localhost:61616jms:template:default-destination: shishipub-sub-domain: true

Spring整合RabbitMQ

rabbitMQ基于Erlang语言编写,需要安装Erlang
首先需要下载Erlang:
下载地址:https://www.erlang.org/downloads
下载完成之需要重启一下操作系统(重启电脑)
配置环境变量
在这里插入图片描述
添加path:
在这里插入图片描述
安装完成后下载RabbitMQ
下载地址:https://rabbitmq.com/install-windows.html

  • 启动rabbitMQ

在这里插入图片描述
注意:要启动rabbitMQ服务需要命令行进入到管理员身份运行
rabbitMQ的控制台界面(需要手动配置插件):
在sbin目录下找到:rabbitmq:plugins.bat命令
在这里插入图片描述
执行命令展示其插件列表,通过命令开启插件
在这里插入图片描述
这样就可以访问它的控制台界面,端口号是15672,地址:http://localhost:15672
在这里插入图片描述
输入默认密码:guest

Spring进行整合rabbitMQ首先添加依赖:

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

在配置文件中进行rabbits的配置:

spring:activemq:broker-url: tcp://localhost:61616jms:template:default-destination: shishipub-sub-domain: truerabbitmq:host: localhostport: 5672

直连交换机模式

使用直连模式的交换机进行消息队列的开发:
首先需要在配置类中进行直连交换机与消息队列的绑定


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfigQM {@Beanpublic Queue directQueue(){//第一个是消息队列的名称,第一个true表示消息持久化,第二个表示当前的消息队列是否是连接专用(连接一关消息队列就关闭),第三个参数是是否删除(当消费者生产者都不使用就删除)return new Queue("direct_queue",true,true,true);}//我们需要一个交换机去绑定消息队列,此处设置一个交换机@Beanpublic DirectExchange directExchange(){return new DirectExchange("directexchange");}@Beanpublic Binding binding(){//将消息队列与交换机进行绑定return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");}
}

绑定之后通过直连交换机进行消息的存储

@Service
public class amqpservice implements MessageService{@Autowiredprivate AmqpTemplate amqpTemplate;@Overridepublic void sendMessage(String id) {//使用直连交换机amqpTemplate.convertAndSend("directExchange","direct",id);}@Overridepublic String doMessage() {return null;}
}

然后从消息队列中读取消息写在rabbitMQ监听器下面:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {@RabbitListener(queues = "direct_queue")public void reveive(String id){System.out.println("已完成短信发送业务 id:"+id);}
}

主题交换机模式

主题交换机可以模糊设置交换机绑定的名称来达到分发的目的
例如:

    @Beanpublic Binding binding(){//将消息队列与交换机进行绑定return BindingBuilder.bind(directQueue()).to(directExchange()).with("topic_*_id");}

在消息进入消息队列的时候:

        amqpTemplate.convertAndSend("directExchange","topic_ni_id",id);amqpTemplate.convertAndSend("directExchange","topic_bu_id",id);

这个两种消息都可以进入到消息队列中去,而且通过这种方式也可以使消息进入到不同的消息队列中去

  • 绑定案件的规则:

*(星号):用来表示一个单词,且该单词必须出现
#(井号):用来表示任意数量
在这里插入图片描述

Spring整合RocketMQ

下载地址:https://rocketmq.apache.org/
默认服务端口:9876
配置环境变量:ROCKETMQ_HOME,PATH,NAMESER_ADDR(建议):127.0.0.1:9876

  • 命名服务器与broker

在这里插入图片描述
当后期的业务服务器增多时,就需要不停的进行服务器之间的连接,会变得非常繁琐,但是如果我们有一台服务器将所有的业务服务器注册进行,消费者与生产者只需要连接命名服务器即可

  • 首先启动命名服务器
    在这里插入图片描述
    双击文件启动命名服务器
    在这里插入图片描述
    然后双击mqbroker文件启动服务器:
    在这里插入图片描述
    如何测试服务器是否正常启动:
    在bin目录下启动cmd:
    在这里插入图片描述

首先使用第一个命名生成对应的消息:
在这里插入图片描述
再使用第二个命令对生成的消息进行消费:
在这里插入图片描述
进行整合:
首先导入依赖坐标:

        <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.1</version></dependency>

在配置文件中配置其命名服务器:
rocketmq是与spring在同一层次下

rocketmq:name-server: localhost:9876producer:group: group_rocketmq

进行消息队列的相关操作:

@Service
public class MessageRocketmqimpl implements MessageService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Overridepublic void sendMessage(String id) {rocketMQTemplate.convertAndSend("sdasda",id);}@Overridepublic String doMessage() {return null;}
}

消费者监听器:

 
@Component
@RocketMQMessageListener(topic = "sdasda",consumerGroup = "group_rocketmq")
public class MessageRocketmqListener implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("id:"+s);}
}

使用异步方式进行发送:


@Service
public class MessageRocketmqimpl implements MessageService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Overridepublic void sendMessage(String id) {SendCallback callback=new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息发送成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("消息发送失败");}}
//        rocketMQTemplate.convertAndSend("sdasda",id);rocketMQTemplate.asyncSend("sdasda",id,callback);}
  • 同步发送与异步发送的区别:

同步发送和异步发送是两种不同的消息发送方式。在同步发送中,发送线程会等待消息发送完成并收到发送结果后继续执行,而在异步发送中,发送线程不会阻塞,可以立即执行后续逻辑。选择哪种方式取决于业务需求和对消息发送结果的要求。

Spring整合kafka

下载地址:https://kafka.apache.org/downloads
下载之后进行解压缩文件
解压之后首先需要运行:zookeeper-server-start.bat文件
这个文件相当于一个注册中心,需要先进行注册才能够启动kafka服务器,作用相当于RocketMQ中的命名服务器,需要在对应目录下cmd命令携带参数进行启动:
在这里插入图片描述
启动注册服务器后,然后开启kafka服务器:
在这里插入图片描述
在这里插入图片描述
spring进行整合kafka:
导入依赖坐标:

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

在配置文件中进行配置,配置注册服务器的地址:

  kafka:bootstrap-servers: localhost:9092consumer:group-id: order
@Service
public class kafka implements MessageService {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@Overridepublic void sendMessage(String id) {kafkaTemplate.send("adad",id);}@Overridepublic String doMessage() {return null;}
}

创建消费者监听器:


@Component
public class kafkaListener {@KafkaListener(topics = "adad")public void onMessage(ConsumerRecord<String,String> consumerRecord){System.out.println("id:"+consumerRecord.value());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/819093.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SSM项目高校在线请假与审批系统

采用技术 基于SpringBoot框架实现的web的智慧社区系统的设计与实现~ 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SpringMVCMyBatis 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 简介 本系统实现了管理员&#xff0c;教师&#xff0c;学生三个模…

OSPF动态路由实验(华为)

思科设备参考&#xff1a;OSPF动态路由实验&#xff08;思科&#xff09; 一&#xff0c;技术简介 OSPF&#xff08;Open Shortest Path First&#xff09;是一种内部网关协议&#xff0c;主要用于在单一自治系统内决策路由。它是一种基于链路状态的路由协议&#xff0c;通过…

Linux下SPI设备驱动实验:实现SPI发送/接收数据的函数

一. 简介 前面文章介绍了SPI设备数据收发处理流程&#xff0c;后面几篇文章实现了SPI设备驱动框架&#xff0c;加入了字符设备驱动框架代码。文章如下&#xff1a; SPI 设备驱动编写流程&#xff1a;SPI 设备数据收发处理流程中涉及的结构体与函数-CSDN博客 SPI 设备驱动编写…

【洛谷 P8802】[蓝桥杯 2022 国 B] 出差 题解(带权无向图+单源最短路+Dijkstra算法+链式前向星+最小堆)

[蓝桥杯 2022 国 B] 出差 题目描述 A \mathrm{A} A 国有 N N N 个城市&#xff0c;编号为 1 … N 1 \ldots N 1…N 小明是编号为 1 1 1 的城市中一家公司的员工&#xff0c;今天突然接到了上级通知需要去编号为 N N N 的城市出差。 由于疫情原因&#xff0c;很多直达的交…

MXXE利用XXE漏洞快速获取服务器敏感文件工具

https://github.com/MartinxMax/MXXE 关于 MXXEV1.2升级版,快速获取服务器敏感文件 获取Windows服务器敏感文件 把数据包复制到payload.txt进行自动注入 $ python3 MXXE.py -lh 10.10.16.5 -user Daniel -server windows 幸运的是我们找到了服务器的私匙 获取Linux服务器敏感…

LeetCode-706. 设计哈希映射【设计 数组 哈希表 链表 哈希函数】

LeetCode-706. 设计哈希映射【设计 数组 哈希表 链表 哈希函数】 题目描述&#xff1a;解题思路一&#xff1a;超大数组解题思路二&#xff1a;拉链法解题思路三&#xff1a; 题目描述&#xff1a; 不使用任何内建的哈希表库设计一个哈希映射&#xff08;HashMap&#xff09;。…

数字孪生与企业

数字孪生技术&#xff0c;简而言之&#xff0c;就是创造一个物理实体的数字双胞胎&#xff0c;在虚拟世界中精确模拟现实世界的行为、过程和系统。这种技术的核心在于&#xff0c;它允许我们在数字环境中实时地监控、分析和优化其物理对应物的性能和效率。数字孪生的应用场景极…

【深入理解】width 的默认值,2024年最新面试复盘

先自我介绍一下&#xff0c;小编浙江大学毕业&#xff0c;去过华为、字节跳动等大厂&#xff0c;目前阿里P7 深知大多数程序员&#xff0c;想要提升技能&#xff0c;往往是自己摸索成长&#xff0c;但自己不成体系的自学效果低效又漫长&#xff0c;而且极易碰到天花板技术停滞…

Win 运维 | Windows Server 系统事件日志浅析与日志审计实践

[ 重剑无锋&#xff0c;大巧不工。] 大家好&#xff0c;我是【WeiyiGeek/唯一极客】一个正在向全栈工程师(SecDevOps)前进的技术爱好者 作者微信&#xff1a;WeiyiGeeker 公众号/知识星球&#xff1a;全栈工程师修炼指南 主页博客: 【 https://weiyigeek.top 】- 为者常成&…

leetcode代码记录(Z 字形变换

目录 1. 题目&#xff1a;2. 我的代码&#xff1a;小结&#xff1a; 1. 题目&#xff1a; 将一个给定字符串 s 根据给定的行数 numRows &#xff0c;以从上往下、从左到右进行 Z 字形排列。 比如输入字符串为 “PAYPALISHIRING” 行数为 3 时&#xff0c;排列如下&#xff1a;…

《QT实用小工具·二十五》日志重定向输出

1、概述 源码放在文章末尾 日志重定向输出&#xff0c;包含如下功能&#xff1a; 支持动态启动和停止。支持日志存储的目录。支持网络发出打印日志。支持输出日志上下文信息比如所在代码文件、行号、函数名等。支持设置日志文件大小限制&#xff0c;超过则自动分文件&#xf…

Unity笔记之下拉刷新列表

这样的效果&#xff1b; 代码&#xff1a; using System; using System.Collections; using System.Collections.Generic; using Sirenix.OdinInspector; using UnityEngine; using UnityEngine.EventSystems; using UnityEngine.UI;public class ScrollRectUpdateView : Mon…

NLP中的Transformer,一文掌握

Transformer变压器模型的出现 2017 年&#xff0c;Vaswani 等人在关键论文“Attention is All You Need”中介绍了 Transformer 模型&#xff0c;它标志着与以前占主导地位的基于递归神经网络的模型&#xff08;如 LSTM&#xff08;长短期记忆&#xff09;和 GRU&#xff08;门…

北京大学快手发布统一的图文视频生成大模型Video-LaVIT

随着多模态大语言模型&#xff08;LLMs&#xff09;的新发展&#xff0c;人们越来越关注如何将它们从图像-文本数据扩展到更具信息量的真实世界视频。与静态图像相比&#xff0c;视频为有效的大规模预训练带来了独特的挑战&#xff0c;因为需要对其时空动态进行建模。 针对视频…

【JavaEE初阶系列】——网络原理之进一步了解应用层以及传输层的UDP协议

目录 &#x1f6a9;进一步讲应用层 &#x1f388;自定义应用层协议 &#x1f388;用什么格式组织 &#x1f469;&#x1f3fb;‍&#x1f4bb;xml(远古的数据组织格式) &#x1f469;&#x1f3fb;‍&#x1f4bb;json(当下最流行得一种数据组织格式) &#x1f469;&…

[lesson31]完善的复数类

完善的复数类 完善的复数类 复数类应该具有的操作 运算&#xff1a;&#xff0c;-&#xff0c;*&#xff0c;/比较&#xff1a;&#xff0c;!赋值&#xff1a;求模&#xff1a;modulus 利用操作符重载 统一复数与实数的运算方式统一复数与实数的比较方式 注意事项 C规定赋…

【max材质addtive叠加模式特效渲染不出通道的解决办法】

max材质addtive叠加模式特效渲染不出通道的解决办法 2021-12-22 18:15 max的scanline扫描线&#xff0c;vray渲染可以&#xff0c;红移不行(只支持它自己的材质&#xff0c;它自己的材质没有additive模式)。据说mr是可以的。 右侧的球体使用附加不透明度。 附加不透明度通过将…

C++内存管理与模版(用法详解)

C/C中程序内存区域划分 内核空间&#xff08;用户代码不能读写&#xff09;栈&#xff08;函数中存放的变量&#xff09;内存映射段堆&#xff08;重点&#xff09;数据段&#xff08;静态区&#xff09;全局变量 / 静态变量代码段&#xff08;常量区&#xff09; 试分析下列…

Opencv3.4+FFMpeg3.4+pkg-config交叉编译arm开发板

Ubuntu16.04 64位 FFmpeg3.4 OpenCv3.4 一、下载FFmpeg https://github.com/FFmpeg/FFmpeg 1.配置 ./configure --prefix/home/zeng/ffmpeg_install --enable-cross-compile --cross-prefixarm-linux-gnueabihf- --ccarm-linux-gnueabihf-gcc --target-oslinux --cpuco…

负载均衡器如何工作,为什么如此重要?

现代应用程序和网站处理大量流量。负载均衡器是保证大型系统平稳运行的主要工具之一。 负载平衡器负责跨多个服务器路由客户端请求以分配负载并防止出现瓶颈。 这有助于最大限度地提高吞吐量、减少响应时间并优化资源使用。 负载均衡器的运行情况&#xff1a; (1).客户端请…