spring Cloud Stream 实战应用深度讲解

springCloudStream

简介

Spring Cloud Stream是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。

该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的 Spring 习惯用语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。

核心模块

  • Destination Binders: 负责提供与外部消息系统集成的组件
  • Destination Bindings: 外部消息系统和用户程序代码之间的桥梁(生产者-使用者之间的桥梁)
  • Message:生产者和消费者用于与Destination Binders(以及通过外部消息系统与其他应用程序)通信的规范数据结构。

历史

Spring 的数据集成之旅始于 Spring Integration。通过其编程模型,它提供了一致的开发人员体验来构建应用程序,这些应用程序可以采用企业集成模式来连接外部系统,例如数据库、消息代理等。

快进到云时代,微服务在企业环境中变得突出。Spring Boot 改变了开发人员构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,可以无缝开发独立的、基于 Spring 的生产级微服务。

为了将其扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被放在一个新项目中。Spring Cloud Stream 诞生了。

架构模型

在这里插入图片描述

这张图是spring-stream官网的,里面的Middleware指的就是RabbitMQ或者KafKa这些消息队列。

下图是我们原来和消息队列通信的方式。我们的程序直接发送数据给MQ或者监听到MQ的数据。

在这里插入图片描述

通过spring stream来做的话,就增加了Binder层来做统一调度,我们的程序只需要和Binder层通信,不需要关注底层的MQ是RabbitMQ还是Kafka

目前官方提供了两个Binder,分别是RabbitMQ的和Kafka的,其余队列的有一些第三方维护的。同时我们也可以自己实现Binder

一开始图中的InputOutput是对于spring stream来说的,input就是输入消息到stream中,output就是输出消息到我们的程序中。

简单介绍一下Binder,其实就是策略模式,统一接口实现,比如MQ1里面发送消息到MQ的方法叫Publish,MQ2里面发送消息到MQ的方法叫Release,但是在Binder接口里面提供了一个方法,就叫做add。也只需要提供一个Message消息。

public interface Binder{function add(Message msg);
}// 连接MQ1的Binder
public class Binder1 implements Binder{public function add(Message msg){// 消息处理// 发送到MQ1publish(msg);}
}// 连接MQ2的Binder
public class Binder2 implements Binder{public function add(Message msg){// 消息处理// 发送到MQ2release(msg);}
}

当我们使用的时候只需要自己决定使用哪个Binder就可以了。就是就和连接数据库一样,不需要关心连接的是Mysql还是PostgreSql。

public class main{public static function main() {Binder binder = new Binder1();Message msg = new Message();binder.add(msg);}
}
Bindings

Bindings作为一个桥梁,负责连接MQ和用户代码。比如绑定一个代码作为input往某一个Queue里面输入信息,绑定一个代码作为output从某个Queue里面接收信息。然后我们使用Binder来实现推送消息到MQ和消费消息。

这里是官网原文:The application communicates with the outside world by establishing bindings between destinations exposed by the external brokers and input/output arguments in your code. Broker specific details necessary to establish bindings are handled by middleware-specific Binder implementations.

下图为Bindings和Binder的关系

在这里插入图片描述

source 和 sink

source其实就是发送方的发送的Message. sink就是接收方接受的Message

注解实现

注解的实现已经被彻底删除,只有之前低版本的还能使用

函数式编程实现示例

依赖引入

将下面的代码加入pom文件,然后使用maven导入相关依赖即可。

// 引入spring cloud stream依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId>
</dependency>
// 引入spring cloud stream的rabbit binder依赖
// 如果是kafka,那么把这个换成kafka的binder
// 在这个binder里面已经引入了 rabbit MQ依赖,所以不需要再单独引入rabbit MQ了
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

配置文件

server:port: 8801spring:application:name: cloud-stream-providercloud:stream: # stream的配置binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest

生产者

配置文件修改

对于函数式编程来说,spring cloud stream有一些约定或者说规定。比如我们注册了一个logPubBean,那么它对应的bindings配置的名称就是logPub-in-0或者logPub-out-0,前面是我们的方法名,中间表示生产者或消费者,in表示消费者,out表示生产者。这里的in or out是对于我们的代码来说的。后面的0就是一个序号。

写生产者之前我们需要加上对应的bindings配置。如果注册了多个Bean作为生产者或消费者,那么还需要配置哪些Bean是生产者和消费者。


spring:cloud:function: # 配置哪些Bean是Stream可以用的definition: log;logPub;sendLogstream: # stream的配置bindings: # 服务的整合处理logPub-out-0:destination: log # 表示要使用的Exchange名称定义,不存在会自动创建content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
写代码

随便新建一个类,并标记为@Component,主要是要让spring知道这个类。类名可以随便起。

@Component
public class logProducer {}

然后开始编写生产者的代码。加入主要的方法log,方法名可以随便起,只需要记得把这个方法注册为一个Bean就可以了。一定要在上面加@Bean注解。

方法的返回值只能是Supplier函数接口类型。不能是其他的。

方法里面可以写生产者的具体代码。会注册一个名为logPubBean作为生产者。

@Component
public class logListener {@Beanpublic Supplier<logListener.Person> logPub() {return () -> {Person person = new Person();person.setName("张三");System.out.println("生产者:"+person);return person;};}public static class Person {private String name;public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic String toString() {return this.name;}}
}

关于Supplier,这个是java提供的函数式编程的接口。从java8开始提供的,java8里面的stream功能也用到了函数式编程。

下面是Supplier的注释和定义

//Represents a supplier of results.
//There is no requirement that a new or distinct result be returned each time the supplier is invoked.
//This is a functional interface whose functional method is get().public interface Supplier<T>

翻译过来大概就是:一个结果的提供者或者一个结果的生产者。正好对应我们的生产者。该接口只有一个方法T get(),没有参数并且仅返回一个结果。

运行

运行的话会发现控制台一直在打印。我们的队列里面也一直在新增。

在这里插入图片描述

StreamBridge

当前的运行方式是当写完生产者以后,spring cloud stream会1/s次来调用我们的生产者,但是我们一般是自己来控制生产者的调用。就可以使用下面的方法。

我们可以通过StreamBridge来做到这一点。他有四个send方法。

  • public boolean send(String bindingName, Object data):第一个参数是bindingName,我们输入的是sendLog,就需要增加sendLog的配置,我们也可以用之前的logPub-out-0。第二个参数是发送的数据。
  • public boolean send(String bindingName, Object data, MimeType outputContentType):比上面的多了一个数据类型。
  • public boolean send(String bindingName, @Nullable String binderName, Object data):还可以指定Binder的name
  • public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType): 四个参数放在一起了。
@RestController
public class logController {@Autowiredprivate StreamBridge streamBridge;@GetMapping("/sendLog")public void sendLog() {logListener.Person person = new logListener.Person();person.setName("李四");System.out.println("生产者发送消息"+person);streamBridge.send("sendLog", person);}
}

消费者

随便新建一个类,并标记为@Component,主要是要让spring知道这个类。类名可以随便起。

@Component
public class logListener {}

然后开始编写消费者的代码。加入主要的方法log,方法名可以随便起,只需要记得把这个方法注册为一个Bean就可以了。一定要在上面加@Bean注解。

方法的返回值可以是Consumer,也可以是Function。不能是其他的。

方法里面就可以写消费的具体代码了。

@Component
public class logListener {@Beanpublic Consumer<logListener.Person> log() {return person -> {System.out.println("Received: " + person);};}public static class Person {private String name;public String getName() {return name;}public void setName(String name) {this.name = name;}@Overridepublic String toString() {return this.name;}}
}

关于ConsumerFunction,这两个是java提供的函数式编程的接口。从java8开始提供的,java8里面的stream功能也用到了函数式编程。

下面是Consumer接口的注释和接口的定义。

//Represents an operation that accepts a single input argument and returns no result. Unlike most other functional interfaces, Consumer is expected to operate via side-effects.
//This is a functional interface whose functional method is accept(Object).public interface Consumer<T>

翻译过来大概就是说Consumer接口仅接收一个参数并且没有返回值,我们的代码里面也可以看到,接收了一个person参数,没有return。

该接口只有一个方法void accept(T t),T类型就是我们的Person类型。

下面是Function接口的注释和定义

//Represents a function that accepts one argument and produces a result.
//This is a functional interface whose functional method is apply(Object).
public interface Function<T, R>

翻译过来大概就是说Function接口仅接收一个参数并且返回一个结果。该接口只有一个方法R apply(T t),接收一个T类型的参数,返回一个R类型的结果。

在这里插入图片描述

手动ACK

通过禁止使用死信队列来执行手动的ACK,这个时候如果抛出异常,则会重试。如果开启了死信队列,那么抛出异常以后则会进入死信队列。

log-in-0:consumer:auto-bind-dlq: false

队列持久化

上面可以看出来,创建的都是匿名队列,当程序启动的时候自动创建,当程序关闭的时候自动删除。

但是正常开发中,很少使用这种,都会指定一个持久化的队列,不管程序是否运行,队列都存在。

我们可以在bindings的配置里面增加group配置来显式指定哪个队列,我们指定log123队列。

log-in-0:destination: logcontent-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”group: log123
sendLog:destination: logcontent-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”group: log123

再次运行程序,可以看到该队列被创建。接下来停止程序,可以看到队列还存在那里。

bindings重命名

默认约定的名称为log-in-0这种形式

但是我们也可以将它重命名。通过配置文件可以将log-in-0重命名为input,不过这样的话,所有的log-in-0的bindings配置都需要修改成input,使用上也是。注意官方并不推荐这种做法,他们认为在大多数情况下,这有点矫枉过正。

spring:cloud:stream:function:bindings:log-in-0: input

显式绑定创建

默认约定的是log-in-0负责输入,log-out-0负责输出,我们也可以显式的创建这些。

通过配置文件

spring:cloud:stream:input-bindings: login;fooinoutput-bindings: logout;fooout

轮询配置属性

spring:integration:poller:# 全局配置fixedDelay: 1000L # 默认轮询器的延迟 单位毫秒,默认1000L maxMessagesPerPoll: 1L # 默认轮询器的每个轮询事件的最大消息数。默认 1Lcron: none # Cron 触发器的 Cron 表达式值。默认 noneinitialDelay: 0 # 周期性触发的初始延迟。 默认0timeUnit: MILLISECONDS # 要应用于延迟值的 TimeUnit。默认 MILLISECONDS

也可以单独为某个bindings来配置

spring:cloud:stream:bindings:log-out-0:producer:poller:# log-out-0的单独配置fixedDelay: 1000L # 默认轮询器的延迟 单位毫秒,默认1000L maxMessagesPerPoll: 1L # 默认轮询器的每个轮询事件的最大消息数。默认 1Lcron: none # Cron 触发器的 Cron 表达式值。默认 noneinitialDelay: 0 # 周期性触发的初始延迟。 默认0timeUnit: MILLISECONDS # 要应用于延迟值的 TimeUnit。默认 MILLISECONDS

函数组合

假设我们有两个处理Bean,enrich负责检查header,如果缺少foo,就添加为foo,bar。然后第二个echo则负责检查是否包含foo这个Header然后输出消息内容。

@Bean
public Function<Message<String>, Message<String>> enrich() {return message -> {Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();};
}@Bean
public Function<Message<String>, Message<String>> echo() {return message -> {Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");System.out.println("Incoming message " + message);return message;};
}

通过配置将这两个bean组合起来,组合之后,这个bean名称就编程了enrich|echo,后续的配置都需要这种冗长的名称,所以这里官方推荐使用重命名的方式将它变成简单的名称。

spring:cloud:function:definition: enrich|echo # 函数组合stream:function: bindings:enrich|echo-in-0: input # 重命名

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/644737.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法基础学习|双指针算法

双指针算法 代码模板 for (int i 0, j 0; i < n; i ){while (j < i && check(i, j)) j ;// 具体问题的逻辑 } 常见问题分类&#xff1a;(1) 对于一个序列&#xff0c;用两个指针维护一段区间(2) 对于两个序列&#xff0c;维护某种次序&#xff0c;比如归并…

Transformer and Pretrain Language Models3-1

content transformer attention mechanism transformer structure​​​​​​​ pretrained language models language modeling pre-trained langue models(PLMs&#xff09; fine-tuning approaches PLMs after BERT applications of masked LM frontiers of PLMs …

RocketMQ的一万字全面总结,带你快速入门消息队列

前言 近日偶然聊起消息队列&#xff0c;发现知识模糊又破碎&#xff0c;遂广泛查询资料&#xff0c;做了这么一篇非常浅显的总结&#xff0c;聊以充作入门参考资料吧。 下面几个问题&#xff0c;如果不能回答地很好&#xff0c;可以试着在文中找寻一下答案。&#xff08;答案…

时间序列大模型:TimeGPT

论文&#xff1a;https://arxiv.org/pdf/2310.03589.pdf TimeGPT&#xff0c;这是第一个用于时间序列的基础模型&#xff0c;能够为训练期间未见过的多样化数据集生成准确的预测。 大规模时间序列模型通过利用当代深度学习进步的能力&#xff0c;使精确预测和减少不确定性成为…

VSCode 更换默认的 terminal(终端)

Win10 中 VSCode 默认的 terminal 为 PowerShell, 想要更换为 Git bash。 1. 按快捷键&#xff1a;Ctrl Shift P 2. 搜索&#xff1a;“erminal: Select Default Profile” 3. 你会看到可选的终端列表&#xff0c;然后选择 Git Bash

3.Eureka注册中心

3.Eureka注册中心 假如我们的服务提供者user-service部署了多个实例&#xff0c;如图&#xff1a; 大家思考几个问题&#xff1a; order-service在发起远程调用的时候&#xff0c;该如何得知user-service实例的ip地址和端口&#xff1f;有多个user-service实例地址&#xff0…

JVM对象创建与内存回收机制

对象的创建过程有如下步骤&#xff1a; 1.类加载检查&#xff1a; 虚拟机遇到一个new指令时&#xff0c;首先将去检查这个指令的参数是否能在常量池中定位到一个类的符号引用&#xff0c;并且检查这个符号引用代表的类是否已被加载、解析和初始化过&#xff0c;如果没…

长城资产信息技术岗24届校招面试面经

本文介绍2024届秋招中&#xff0c;中国长城资产管理股份有限公司的信息技术岗岗位一面的面试基本情况、提问问题等。 10月投递了中国长城资产管理股份有限公司的信息技术岗岗位&#xff0c;所在部门为长城新盛信托有限责任公司。目前完成了一面&#xff0c;在这里记录一下一面经…

函数递归(Recursion)一篇便懂

递归的概念 在 C 语言中&#xff0c;递归&#xff08;Recursion&#xff09;是一种函数调用自身的编程技术。当一个函数在其定义中调用自身时&#xff0c;就称为递归函数。 了解递归思想 把⼀个大型复杂问题层层转化为⼀个与原问题相似&#xff0c;但规模较小的子问题来求解…

3.chrony服务器

目录 1. 简介 1.1. 重要性 1.2. Linux的两个时钟 1.3. 设置日期时间 1.3.1. timedatectl命令设置 1.3.2. date命令设置 1.4. NTP 1.5. Chrony介绍 2. 安装与配置 2.1. 安装&#xff1a; 2.2. Chrony配置文件分析 2.3. 同步时间服务器 2.3.1. 授时中心 2.3.2. 实验…

制造业中的数据治理

随着信息技术的飞速发展&#xff0c;数据已经成为现代制造业的核心资产。数据治理作为确保数据质量、安全性、可靠性和一致性的关键过程&#xff0c;对于提高生产效率和质量控制具有不可忽视的影响。本文将深入探讨制造业中数据治理的重要性、挑战和实践&#xff0c;以揭示其对…

HCIP:不同VLAN下实现网络互相通信

配置pc1 配置pc2 配置pc3 将sw1划分到vlan3 将sw3划分到vlan3 在sw1上进行缺省 将sw1上&#xff08;g0/0/1&#xff09;的untagged改成 1 3 则在pc1上ping pc2可通 在sw1上进行缺省 在sw3上&#xff08;e0/0/1&#xff09;打标记 则在pc1上ping pc3可通&#xff08;实现互通&am…

新特性Record最全用法总结---动力节点总结

目录 0、有用的新特性 一、Record 1.1、Record的介绍&#xff1a; 1.2、Record的声明&#xff1a; 1.3、Record的创建&#xff1a; 1.4、Record使用举例&#xff1a; 1.5、Record-实例方法、静态方法 1.6、Record-三类构造方法 1.6.1、紧凑型构造、定制构造方法&#…

服务器的组成

服务器的重要结构组成 家用电脑组成&#xff1a; CPU、主板、内存条、显卡、硬盘、电源、风扇、网卡、显示器、机箱、键盘鼠标等等。 CPU CPU是电脑的大脑&#xff0c; CPU发展史&#xff1a; 32 位CPU&#xff1a;最大的内存寻址地址2^32&#xff0c;大约4G的大小。 CP…

爬虫进阶之selenium模拟浏览器

爬虫进阶之selenium模拟浏览器 简介环境配置1、建议先安装conda2、创建虚拟环境并安装对应的包3、下载对应的谷歌驱动以及与驱动对应的浏览器 代码setting.py配置scrapy脚本参考中间件middlewares.py 附录&#xff1a;selenium教程 简介 Selenium是一个用于自动化浏览器操作的…

CSS 楼梯弹弹球

<template><view class="loader"></view> </template><script></script><style>body {background-color: #212121;/* 设置背景颜色为 #212121 */}.loader {position: relative;/* 设置定位为相对定位 */width: 120px;/* 设…

38-WEB漏洞-反序列化之PHPJAVA全解(下)

WEB漏洞-反序列化之PHP&JAVA全解&#xff08;下&#xff09; 一、Java中API实现二、序列化理解三、案例演示3.1、本地3.2、Java 反序列化及命令执行代码测试3.3、WebGoat_Javaweb 靶场反序列化测试3.4、2020-网鼎杯-朱雀组-Web-think_java 真题复现 四、涉及资源 一、Java中…

springboot118共享汽车管理系统

简介 【毕设源码推荐 javaweb 项目】基于springbootvue 的共享汽车管理系统 适用于计算机类毕业设计&#xff0c;课程设计参考与学习用途。仅供学习参考&#xff0c; 不得用于商业或者非法用途&#xff0c;否则&#xff0c;一切后果请用户自负。 看运行截图看 第五章 第四章 获…

『论文阅读|2024 WACV 多目标跟踪Deep-EloU|纯中文版』

论文题目&#xff1a; Iterative Scale-Up ExpansionIoU and Deep Features Association for Multi-Object Tracking in Sports 论文特点&#xff1a; 作者提出了一种迭代扩展的 ExpansionIoU 和深度特征关联方法Deep-EIoU&#xff0c;用于体育场景中的多目标跟踪&#xff0c;旨…

基于springboot家政服务管理平台源码和论文

随着家政服务行业的不断发展&#xff0c;家政服务在现实生活中的使用和普及&#xff0c;家政服务行业成为近年内出现的一个新行业&#xff0c;并且能够成为大众广为认可和接受的行为和选择。设计家政服务管理平台的目的就是借助计算机让复杂的销售操作变简单&#xff0c;变高效…