一文弄懂SpringCloud Stream

目录

    • SpringCloud Stream
    • SpringCloud Stream相关概念
    • SpringCloud Stream使用

SpringCloud Stream

Spring Cloud Stream 是一个构建消息驱动微服务的框架,Spring Cloud Stream 提供了一个抽象层,屏蔽了不同消息中间件之间的差异,使得开发人员可以不再关注具体的消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务。

程序模型
在这里插入图片描述
Spring Cloud Stream的核心与中间件实现无关。Stream应用通过输入输出通道(channel)来与外界交互。通道(channel)通过与外部中间件对应的绑定器(Binder)具体实现,来与外部的中间件产品进行通信。

Spring Cloud Stream提供了对 Kafka, RabbitMQ等中间件的绑定实现。

SpringCloud Stream相关概念

Channel(通道):Channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到Destination,消费者通过输入通道从Destination接收消息。

在Spring Cloud Stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。

  1. Input(输入):Input通道用于消费者从消息代理接收消息。消费者可以通过监听Input通道来实时接收传入的消息。在应用程序中,可以使用@StreamListener注解将方法标记为Input通道的监听器,并在方法参数中指定接收到的消息类型。

  2. Output(输出):Output通道用于生产者向消息代理发送消息。生产者可以通过向Output通道发送消息来发布新的消息。在应用程序中,可以使用@Output注解定义一个Output通道,然后在需要发送消息的方法上使用MessageChannel或OutputStream参数来将消息发送到Output通道。

Destination(目标):Destination是消息的目的地,通常对应于消息代理中的Topic或Queue。生产者将消息发送到特定的Destination,消费者从其中接收消息。

Binder(绑定器):Binder是Spring Cloud Stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。Binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。Spring Cloud Stream提供了多个可用的Binder实现,包括RabbitMQ、Kafka等。

消费者组:在Spring Cloud Stream中,消费组(Consumer Group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。

注:对于一个消息来说,每一个消费者组只会有一个消费者消费消息

分区:Spring Cloud Stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理

SpringCloud Stream使用

1、添加依赖

根据我们使用的中间件来选择我们的依赖,因为我使用的是kafka,所以使用的是spring-cloud-stream-binder-kafka依赖,该依赖会帮我们引入Spring Cloud Stream 和kafka的相关依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId><version>${binder.version}</version>
</dependency>

如果你使用的是RabbitMQ,那么请使用以下依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId><version>${binder.version}</version>
</dependency>

2、配置文件配置

下面是kafka的简单配置,关于配置的详细介绍,我会在下一篇文章进行介绍

spring:cloud:stream:kafka:binder:brokers: <kafka_broker_address>  # Kafka broker地址bindings:myInput:destination: <input_topic>  # 输入通道对应的Kafka主题名称myOutput:destination: <output_topic>  # 输出通道对应的Kafka主题名称

3、创建绑定接口类,定义输入和输出通道

/*** 定义输出和输入通道*/
public interface MyProcessor {String INPUT = "myInputChannel";String OUTPUT = "myOutputChannel";/***监听一个通道,通道名为TNPUT的值*/@Input(INPUT)SubscribableChannel myInputChannel();/*** 发送消息到输出通道,通道门为OUTPUT的值* @return*/@Output(OUTPUT)MessageChannel myOutputChannel();}

在上述示例中,MyProcessor是一个绑定接口,定义了一个名为myInputChannel的输入通道和一个名为myOutputChannel的输出通道。通过@Input和@Output注解来标识通道的名称。

输入和输出通道的定义我们可以定义在一个接口,也可以输入通道一个接口,输出通道一个接口

4、创建绑定接类

通过@EnableBinding注解将绑定接口绑定到应用程序的逻辑处理器

使用@EnableBinding时,需要指定一个或多个接口类作为参数,这些接口类包含表示可绑定组件(通常是消息通道)的方法。Spring Cloud Stream会自动扫描这些接口类,并根据配置创建相应的消息代理中间件和应用程序之间的连接。

然后我们可以使用@StreamListener注解,指定一个方法作为消息的监听器,当消息到达时会自动调用该方法进行处理。

/*** 指定接口来绑定消息通道,多个接口使用逗号分隔*/
@EnableBinding(MyProcessor.class)
public class MyProcessorHandler {/*** 处理输入通道的消息* @param message*/@StreamListener(INPUT)public void handleInputMessage(String message) {System.out.println(message);}/*** 处理输出通道的消息* @param message*/@StreamListener(OUTPUT)public void handleOutputMessage(String message) {System.out.println(message);}
}

在上述示例中,MyProcessor是一个绑定接口,INPUT和OUTPUT分别是输入和输出通道的名称。handleInputMessage和handleOutputMessage方法用于处理从输入通道和输出通道接收到的消息。

当使用了@EnableBinding注解,Spring Cloud Stream会自动创建与绑定接口中定义的通道相关的Bean,并将其添加到应用程序的上下文中。这些Bean就可以通过自动装配(@Autowired)来进行访问和使用了,下面我们就使用通道来发送消息

5、发送消息

@Component
public class SendUtil {@Autowiredprivate MyProcessor myProcessor;/*** 发送消息* @param msg*/public void sendMsg(String msg){myProcessor.myOutputChannel().send(MessageBuilder.withPayload(msg).build());}}

上述例子是通过myOutputChannel通道来发送消息,因为@EnableBinding注解帮我们生成了MyProcessor 的bean,所以我们可以直接注入使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/608448.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java学习笔记-day05-响应式编程初探-自定义实现Reactive Streams规范

最近在学响应式编程&#xff0c;这里先记录下&#xff0c;响应式编程的一些基础内容 1.名词解释 Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系&#xff0c;它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。 Reactive Streams…

3D人体姿态估计

3D人体姿态估计是指通过算法对输入的图像或视频进行分析&#xff0c;推断出人体的三维姿态信息。该技术可以应用于许多领域&#xff0c;如虚拟现实、运动分析、人机交互等。 1. 算法原理&#xff1a; 3D人体姿态估计利用深度学习模型作为算法的核心&#xff0c;通过网络学习人…

html js加载本地文件报错处理,跨域问题

这个问题是怎么来的&#xff1f;我写了一个本地html文件&#xff0c;里面通过three.js加载并显示一个本地三维模型&#xff0c;结果报错了。 报错如下&#xff1a; Access to XMLHttpRequest at file:///C:/model/quater.mtl from origin null has been blocked by CORS poli…

是面试官放水,还是公司实在是太缺人?这都没挂,字节原来这么容易进....

“字节是大企业&#xff0c;是不是很难进去啊&#xff1f;” “在字节做软件测试&#xff0c;能得到很好的发展吗&#xff1f; 一进去就有11.5K&#xff0c;其实也没有想的那么难” 直到现在&#xff0c;心情都还是无比激动&#xff01; 本人211非科班&#xff0c;之前在字节和…

uni-app发版及分包要求

uni-app发版及分包要求 发版 注意&#xff0c;小程序的接口不允许http&#xff0c;只支持https。仅仅是https还不够&#xff0c;正式版和体验版上的接口功能实现还需要将接口地址添加到开发管理——开发设置——服务器域名——request合法域名中去。否则&#xff0c;手机预览…

Spark---RDD(双值类型转换算子)

文章目录 1.RDD双值类型算子1.1 intersection1.2 union1.3 subtract1.4 zip 1.RDD双值类型算子 RDD双Value算子就是对两个RDD进行操作或行动&#xff0c;生成一个新的RDD。 1.1 intersection 对源 RDD 和参数 RDD 求交集后返回一个新的 RDD 函数定义&#xff1a; def inters…

解读 Sobit v2:铭文资产跨链更注重安全、易用性

铭文市场的发展正在从早期的“无序”进入到“有序”阶段&#xff0c;我们看到从 12 月份以来&#xff0c;比特币生态内的多个应用纷纷宣布获得融资。这表明&#xff0c;目前仍旧有大量的资金有意向铭文领域&#xff0c;同样铭文赛道新一轮浪潮或许正在酝酿。 另一方面&#xff…

【设计模式-01】Singleton单利模式

一、方式1(最常用&#xff0c;推荐使用) 单例实现方式一: 饿汉式 类加载到内存后&#xff0c;就实例化一个单例&#xff0c;JVM保证线程安全 简单实用&#xff0c;推荐使用。 唯一缺点: 不管用到与否&#xff0c;类装载时就完成加载。 /*** description: 单例实现方式一: 饿汉…

Java 求2个整数,3个整数 的 10等分比例值

10等份取整比 比如 1.5 &#xff1a; 4 &#xff1a; 4.5 会变成 1&#xff1a;4&#xff1a;5 &#xff0c;当然小数后一位的四舍五入是向上还是向下去整&#xff0c;这个根据自己需要调整即可。 代码 &#xff1a; public static Integer getIntTenPerNum(Integer nu…

YOLOv8改进 | Neck篇 | 利用ASF-YOLO改进特征融合层(适用于分割和目标检测)

一、本文介绍 本文给大家带来的改进机制是ASF-YOLO(发布于2023.12月份的最新机制),其是特别设计用于细胞实例分割。这个模型通过结合空间和尺度特征,提高了在处理细胞图像时的准确性和速度。在实验中,ASF-YOLO在2018年数据科学竞赛数据集上取得了卓越的分割准确性和速度,…

Java项目:115SSM宿舍管理系统

博主主页&#xff1a;Java旅途 简介&#xff1a;分享计算机知识、学习路线、系统源码及教程 文末获取源码 一、项目介绍 宿舍管理系统基于SpringSpringMVCMybatis开发&#xff0c;系统主要功能如下&#xff1a; 学生管理班级管理宿舍管理卫生管理维修登记访客管理 二、技术框…

网络安全新形势下的动态防御体系研究(上)

文章目录 前言一、网络安全的趋势二、网络安全背景&#xff08;一&#xff09;整体形势对网络安全防护提出新挑战&#xff08;二&#xff09;发展对网络安全防护提出新目标 三、网络安全现状分析&#xff08;一&#xff09;国外网络安全现状分析&#xff08;二&#xff09;国内…

短视频实景直播源码+短视频矩阵+多平台分发技术搭建

建立一个短视频实景直播平台&#xff0c;需要以下几个关键组成部分&#xff1a; 短视频实景直播源码&#xff1a;需要开发或购买适用于短视频实景直播的源码。这个源码可以包括实时视频流的采集和传输、直播界面的展示、弹幕功能、礼物打赏等特色功能。可以使用常见的开发框架如…

【教程】代码混淆详解

【教程】代码混淆详解 本文将对代码混淆进行详细解释&#xff0c;并介绍ProGuard代码混淆器以及Ipa Guard工具的使用方法。首先&#xff0c;我们将了解代码混淆的概念和作用&#xff0c;然后深入讨论ProGuard混淆文件的参数设置以及代码混淆的方法。接着&#xff0c;我们将介绍…

解决spring-session-data-redis包redis的session失效时间设置失败问题

这个属于是本人问题&#xff0c;小脑萎缩了 我使用了 EnableRedisHttpSession 这个注解 经过查询这个注解是需要过期时间的 EnableRedisHttpSession(maxInactiveIntervalInSeconds 3600,redisNamespace "tl") 像这样 可以在参数中设置过期时间&#xff0c;只要你…

Java_Swing程序设计

swing组件允许编程人员在跨平台时指定统一的外观和风格。 Swing组件通常被称为轻量级组件&#xff0c; JFrame在程序中的语法格式&#xff1a; JFrame jfnew JFrame(title); Container containerjf.getContentPane(); jf:JFrame类的对象 container:Container类的对象。 J…

腾讯云优惠券怎么获取(腾讯云优惠券在哪领取)

随着云计算技术的快速发展&#xff0c;越来越多的企业开始选择使用云服务来降低成本、提高效率。腾讯云作为国内领先的云服务提供商之一&#xff0c;也提供了丰富的优惠券政策来吸引更多的用户。本文将介绍如何获取腾讯云的优惠券&#xff0c;以及如何使用这些优惠券来获得更好…

基于SpringBoot的康复中心管理系统 JAVA简易版

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 普通用户模块2.2 护工模块2.3 管理员模块 三、系统展示四、核心代码4.1 查询康复护理4.2 新增康复训练4.3 查询房间4.4 查询来访4.5 新增用药 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBootMySQL的康复中…

A借助AI工具提升电子邮件营销内容效果

随着互联网的普及和电子邮件的广泛应用&#xff0c;邮件营销已成为企业推广产品和服务的重要手段之一。为了提高邮件营销的效果&#xff0c;我们需要关注邮件内容的质量和吸引力。而百度文言一心等AI工具作为一款强大的在线写作工具&#xff0c;可以帮助我们提升邮件营销内容的…

MySql01:初识

1.mysql数据库2.配置环境变量3. 列的类型和属性&#xff0c;索引&#xff0c;注释3.1 类型3.2 属性3.3 主键(主键索引)3.4 注释 4.结构化查询语句分类&#xff1a;5.列类型--表列类型设置 1.mysql数据库 数据库&#xff1a; ​ 数据仓库&#xff0c;存储数据&#xff0c;以前我…