Spring Cloud Stream实现数据流处理

1.什么是Spring Cloud Stream?

我看很多回答都是“为了屏蔽消息队列的差异,使我们在使用消息队列的时候能够用统一的一套API,无需关心具体的消息队列实现”。 这样理解是有些不全面的,Spring Cloud Stream的核心是Stream,准确来讲Spring Cloud Stream提供了一整套数据流走向(流向)的API, 它的最终目的是使我们不关心数据的流入和写出,而只关心对数据的业务处理 我们举一个例子:你们公司有一套系统,这套系统由多个模块组成,你负责其中一个模块。数据会从第一个模块流入,处理完后再交给下一个模块。对于你负责的这个模块来说,它的功能就是接收上一个模块处理完成的数据,自己再加工加工,扔给下一个模块。

module

我们很容易总结出每个模块的流程:

  1. 从上一个模块拉取数据
  2. 处理数据
  3. 将处理完成的数据发给下一个模块

其中流程1和3代表两个模块间的数据交互,这种数据交互往往会采用一些中间件(middleware)。比如模块1和模块2间数据可能使用的是kafka,模块1向kafka中push数据,模块2向kafka中poll数据。而模块2和模块3可能使用的是rabbitMQ。很明显,它们的功能都是一样的:提供数据的流向,让数据可以流入自己同时又可以从自己流出发给别人。但由于中间件的不同,需要使用不同的API。 为了消除这种数据流入(输入)和数据流出(输出)实现上的差异性,因此便出现了Spring Cloud Stream。

2.环境准备

采用docker-compose搭建kafaka环境

version: '3'networks:kafka:ipam:driver: defaultconfig:- subnet: "172.22.6.0/24"services:zookepper:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/zookeeper:latestcontainer_name: zookeeper-serverrestart: unless-stoppedvolumes:- "/etc/localtime:/etc/localtime"environment:ALLOW_ANONYMOUS_LOGIN: yesports:- "2181:2181"networks:kafka:ipv4_address: 172.22.6.11kafka:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka:3.4.1container_name: kafkarestart: unless-stoppedvolumes:- "/etc/localtime:/etc/localtime"environment:ALLOW_PLAINTEXT_LISTENER: yesKAFKA_CFG_ZOOKEEPER_CONNECT: zookepper:2181KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.11.68.77:9092ports:- "9092:9092"depends_on:- zookeppernetworks:kafka:ipv4_address: 172.22.6.12kafka-map:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/kafka-mapcontainer_name: kafka-maprestart: unless-stoppedvolumes:- "./kafka/kafka-map/data:/usr/local/kafka-map/data"environment:DEFAULT_USERNAME: adminDEFAULT_PASSWORD: 123456ports:- "9080:8080"depends_on:                         - kafkanetworks:kafka:ipv4_address: 172.22.6.13

run

docker-compose -f docker-compose-kafka.yml -p kafka up -d

kafka-map

https://github.com/dushixiang/kafka-map

  • 访问:http://127.0.0.1:9080
  • 账号密码:admin/123456

3.代码工程

stream

 实验目标

  1. 生成UUID并将其发送到Kafka主题batch-in
  2. batch-in主题接收UUID的批量消息,移除其中的数字,并将结果发送到batch-out主题。
  3. 监听batch-out主题并打印接收到的消息。

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springcloud-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>spring-cloud-stream-kafaka</artifactId><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target></properties><dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Spring Boot Starter Test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies></project>

处理流

/** Copyright 2023 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**      https://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.et;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.util.List;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;/*** @author Steven Gantz*/
@SpringBootApplication
public class CloudStreamsFunctionBatch {public static void main(String[] args) {SpringApplication.run(CloudStreamsFunctionBatch.class, args);}@Beanpublic Supplier<UUID> stringSupplier() {return () -> {var uuid = UUID.randomUUID();System.out.println(uuid + " -> batch-in");return uuid;};}@Beanpublic Function<List<UUID>, List<Message<String>>> digitRemovingConsumer() {return idBatch -> {System.out.println("Removed digits from batch of " + idBatch.size());return idBatch.stream().map(UUID::toString)// Remove all digits from the UUID.map(uuid -> uuid.replaceAll("\\d","")).map(noDigitString -> MessageBuilder.withPayload(noDigitString).build()).toList();};}@KafkaListener(id = "batch-out", topics = "batch-out")public void listen(String in) {System.out.println("batch-out -> " + in);}}
  • 定义一个名为stringSupplier的Bean,它实现了Supplier<UUID>接口。这个方法生成一个随机的UUID,并打印到控制台,表示这个UUID将被发送到batch-in主题。
  • 定义一个名为digitRemovingConsumer的Bean,它实现了Function<List<UUID>, List<Message<String>>>接口。这个方法接受一个UUID的列表,打印出处理的UUID数量,然后将每个UUID转换为字符串,移除其中的所有数字,最后将结果封装为消息并返回。
  • 使用@KafkaListener注解定义一个Kafka监听器,监听batch-out主题。当接收到消息时,调用listen方法并打印接收到的消息内容。

配置文件

spring:cloud:function:definition: stringSupplier;digitRemovingConsumerstream:bindings:stringSupplier-out-0:destination: batch-indigitRemovingConsumer-in-0:destination: batch-ingroup: batch-inconsumer:batch-mode: truedigitRemovingConsumer-out-0:destination: batch-outkafka:binder:brokers: localhost:9092bindings:digitRemovingConsumer-in-0:consumer:configuration:# Forces consumer to wait 5 seconds before polling for messagesfetch.max.wait.ms: 5000fetch.min.bytes: 1000000000max.poll.records: 10000000

参数解释

spring:cloud:function:definition: stringSupplier;digitRemovingConsumer
  • spring.cloud.function.definition:定义了两个函数,stringSupplierdigitRemovingConsumer。这两个函数将在应用程序中被使用。
    stream:bindings:stringSupplier-out-0:destination: batch-in
  • stream.bindings.stringSupplier-out-0.destination:将stringSupplier函数的输出绑定到Kafka主题batch-in
        digitRemovingConsumer-in-0:destination: batch-ingroup: batch-inconsumer:batch-mode: true
  • stream.bindings.digitRemovingConsumer-in-0.destination:将digitRemovingConsumer函数的输入绑定到Kafka主题batch-in
  • group: batch-in:指定消费者组为batch-in,这意味着多个实例可以共享这个组来处理消息。
  • consumer.batch-mode: true:启用批处理模式,允许消费者一次处理多条消息。
        digitRemovingConsumer-out-0:destination: batch-out
  • stream.bindings.digitRemovingConsumer-out-0.destination:将digitRemovingConsumer函数的输出绑定到Kafka主题batch-out

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springcloud-demo(Spring Cloud Stream)

4.测试

启动弄Spring Boot应用,可以看到控制台输出日志如下:

291ea6cc-1e5e-4dfb-92b6-5d5ea43d4277 -> batch-in
c746ba4e-835e-4f66-91c5-7a5cf8b01068 -> batch-in
a661145b-2dd9-4927-8806-919ad258ade5 -> batch-in
db150918-0f0b-49f6-b7bb-77b0f580de4c -> batch-in
b0d4917b-6777-4d96-a6d0-bb96715b5b20 -> batch-in
Removed digits from batch of 5
batch-out -> eacc-ee-dfb-b-dead
batch-out -> cbae-e-f-c-acfb
batch-out -> ab-dd---adade
batch-out -> db-fb-f-bbb-bfdec
batch-out -> bdb--d-ad-bbbb

5.引用

  • https://github.com/spring-cloud/spring-cloud-stream-samples
  • Spring Cloud Stream 3.X - coderZoe的博客
  • Spring Cloud Stream实现数据流处理 | Harries Blog™

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/886960.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Notepad++--在开头快速添加行号

原文网址&#xff1a;Notepad--在开头快速添加行号_IT利刃出鞘的博客-CSDN博客 简介 本文介绍Notepad怎样在开头快速添加行号。 需求 原文件 想要的效果 方法 1.添加点号 Alt鼠标左键&#xff0c;从首行选中首列下拉&#xff0c;选中需要添加序号的所有行的首列&#xff…

【ROS2】多传感器融合、实现精准定位:robot_localization

1、简述 robot_localization在SLAM建图、导航中常用于将多个传感器融合(IMU、里程计、深度相机、GPS等),以提高定位精度,为机器人提供了在三维空间中的非线性状态估计 robot_localization包含两个状态估计节点: ekf_localization_node:扩展卡尔曼滤波(EKF),缺点是非…

【智谱开放平台-注册_登录安全分析报告】

前言 由于网站注册入口容易被机器执行自动化程序攻击&#xff0c;存在如下风险&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露&#xff0c;不符合国家等级保护的要求。短信盗刷带来的拒绝服务风险 &#xff0c;造成用户无法登陆、注册&#xff0c;大量收到垃圾短信的…

永夜星河主题特效2(星河背景 + 闪烁文字+点击星星 + 文字弹出特效)

目录 图片展示 星河背景 闪烁文字点击星星 文字弹出特效 特效介绍&#xff1a; 使用方式&#xff1a; 图片展示 星河背景 闪烁文字点击星星 文字弹出特效 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8">&l…

SAP Ariba Contracts_Author the Main Agreement

SAP Ariba Contracts_Author the Main Agreement Workspace Documents 从SAP Ariba 项目模板中继承的文档将自动出现在控件的“文档”选项卡中项目&#xff0c;Workspace Documents 管理所有合同相关文件的地方&#xff0c;如主要协议&#xff0c;附录&#xff0c;合同条款等…

C++练习题(5)

//函数打印素数 #include <iostream> using namespace std; int is_prime(int n){ for(int j2;j<n;j){ if(n%j0) return 0; } return 1; } int main(){ int i0; for(int i100;i<200;i){ if(is_prime(i)1) pr…

编程之路,从0开始:动态内存管理

Hello&#xff0c;大家好&#xff01;很高兴我们又见面啦&#xff01;给生活添点passion&#xff0c;开始今天的编程之路。 我们今天来学习C语言中的动态内存管理。 目录 1、为什么要有动态内存管理&#xff1f; 2、malloc和free &#xff08;1&#xff09;malloc函数 &…

【题解】AT_joisc2007_mall ショッピングモール (Mall)

原题传送门 温馨提示&#xff1a;岛国题要换行&#xff01; 需要求一个矩阵的和&#xff0c;考虑二维前缀和。 题目中不允许矩阵中有负数&#xff0c;结合求和的最小值&#xff0c;我们把负数赋为最大值不就行了吗。 接下来就是求二维前缀和了。 基于容斥原理&#xff0c;二…

Apifox软件Mock前端数据,帮忙生成API接口文档

Apifox是一款功能强大的接口调试软件&#xff0c;其特色功能丰富&#xff0c;且在前端mock数据生成方面表现出色。以下是对Apifox软件特色功能的详解&#xff0c;以及如何进行前端mock数据生成的步骤&#xff1a; https://apifox.com/help/api-docs/exporting-api https://www…

入门pandas

pandas是本书后续内容的首选库。它含有使数据清洗和分析工作变得更快更简单的数据结构和操作工具。pandas经常和其它工具一同使用&#xff0c;如数值计算工具NumPy和SciPy&#xff0c;分析库statsmodels和scikit-learn&#xff0c;和数据可视化库matplotlib。pandas是基于NumPy…

Apple Vision Pro开发001-开发配置

一、Vision Pro开发硬件和软件要求 硬件要求软件要求 1、Apple Silicon Mac(M系列芯片的Mac电脑) 2、Apple vision pro-真机调试 XCode15.2及以上&#xff0c;调试开发和打包发布Unity开发者账号&&苹果开发者账号 二 、开启无线调试 1、Apple Vision Pro和Mac连接同…

无人机与低空经济:开启新质生产力的新时代

无人机技术作为低空经济的核心技术之一&#xff0c;正以其独特的优势在多个行业中发挥着重要作用&#xff0c;成为推动新质生产力革命的重要力量。无人机的应用范围广泛&#xff0c;从农业植保到物流配送&#xff0c;从城市监测到紧急救援&#xff0c;无人机的身影无处不在&…

故障排除-------K8s挂载集群外NFS异常

故障排除-------K8s挂载集群外NFS异常 1. 故障现象2. 原因梳理2.1 排查思路2.2 确认yaml内容2.3 创建k8s内的nfs测试2.3.1 创建nfs和svc2.3.2 测试创建pvc2.3.3 测试结果 2.4 NFS服务端故障排除2.4.1 网络阻断排除2.4.2 排除服务状态问题2.4.3 排查NFS权限问题 3. 故障排除 1. …

微服务即时通讯系统的实现(客户端)----(2)

目录 1. 将protobuf引入项目当中2. 前后端交互接口定义2.1 核心PB类2.2 HTTP接口定义2.3 websocket接口定义 3. 核心数据结构和PB之间的转换4. 设计数据中心DataCenter类5. 网络通信5.1 定义NetClient类5.2 引入HTTP5.3 引入websocket 6. 小结7. 搭建测试服务器7.1 创建项目7.2…

凸函数与深度学习调参

问题1&#xff1a;如何区分凸问题和凹问题&#xff1f; 问题2&#xff1a;深度学习如何区分调参&#xff1f;

使用可视化工具kafkatool连接docker的kafka集群,查看消息内容和offset

1、下载kafkatool 下载地址Offset Explorer&#xff0c;下载对应系统的offset explorer 下载完&#xff0c;傻瓜安装即可&#xff08;建议放D盘&#xff09;&#xff0c;在开始菜单输入offset找到该应用打开 打开 2、连接kafka 点击File > add new connection Bootstrap…

关于Java使用ueditor上传图片的一些总结

1.如何配置ueditor让上传的图片到项目之外&#xff1f; 因为图片上传到web项目中,重新部署项目可能会丢失图片。 解决方法&#xff1a;下载ueditor.1.1.2.jar. 地址&#xff1a;ueditor-1.1.2项目源码及jar包.zip 链接: https://pan.baidu.com/s/1Bhumfw8OX16n0MTO9ur73g 提…

React可以做全栈开发吗

React可以做全栈开发吗? 答案是肯定的&#xff0c;而且还比较完美 React可以用于全栈开发&#xff0c;以下是具体的介绍&#xff1a; 前端部分 构建用户界面 React是一个用于构建用户界面的JavaScript库&#xff0c;它通过组件化的方式让开发者能够高效地创建交互式的UI。例…

【前端学习笔记】Javascript学习二(运算符、数组、函数)

一、运算符 运算符&#xff08;operator&#xff09;也被称为操作符&#xff0c;是用于实现赋值、比较和执行算数运算等功能的符号。 JavaScript中常用的运算符有&#xff1a; 算数运算符、递增和递减运算符、比较运算符、逻辑运算符、赋值运算符 算数运算符&#xff1a; 、-…

Redis五大基本类型——List列表命令详解(命令用法详解+思维导图详解)

目录 一、List列表类型介绍 二、常见命令 1、LPUSH 2、LPUSHX 3、RPUSH 4、RPUSHX 5、LRANGE 6、LPOP 7、RPOP 8、LREM 9、LSET 10、LINDEX 11、LINSERT 12、LLEN 13、阻塞版本命令 BLPOP BRPOP 三、命令小结 相关内容&#xff1a; Redis五大基本类型——Ha…