项目实战--Spring Boot 3整合Flink实现大数据文件处理

一、应用背景

公司大数据项目中,需要构建和开发高效、可靠的数据处理子系统,实现大数据文件处理、整库迁移、延迟与乱序处理、数据清洗与过滤、实时数据聚合、增量同步(CDC)、状态管理与恢复、反压问题处理、数据分库分表、跨数据源一致性以及实时异常检测与告警等功能,确保数据的准确性、一致性和实时性。采用Spring Boot 3.+和Flink平台上进行数据治理的方案。

二、方案优势

由于是大数据项目,因此在处理大规模数据集时,文件处理能力直接影响到数据驱动决策的效果,高效的大数据文件处理既要能保证数据的时效性和准确性,也要能提升整体系统的性能和可靠性。
Spring Boot 3.+和Flink结合使用,在处理大数据文件时有不少独特的优势。
首先,这两者能够相互补充,带来高效和便捷的文件处理能力的原因在于:

1)统一的开发体验:
Spring Boot 3.+和Flink结合使用,可以在同一项目中综合应用两者的优势。Spring Boot可以负责微服务的治理、API的管理和调度,而Flink则专注于大数据的实时处理和分析。两者的结合能够提供一致的开发体验和简化的集成方式。(2)动态扩展和高可用性:
微服务架构下,Spring Boot提供的良好扩展性和Flink的高可用性,使得系统可以在需求增长时动态扩展,确保系统稳定运行。Flink的容错机制配合Spring Boot的服务治理能力,可以有效提高系统的可靠性。(3)灵活的数据传输和处理:
通过Spring Boot的REST API和消息队列,可以轻松地将数据传输到Flink进行处理,Flink处理完毕后还可以将结果返回到Spring Boot处理的后续业务逻辑中。这种灵活的处理方式使得整个数据处理流程更为高效且可控。
三、实现步骤

1.首先配置Spring Boot 3.x和Flink的开发环境。在pom.xml中添加必要的依赖:

<dependencies><!-- Spring Boot 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Apache Flink 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.14.0</version></dependency><!-- 其他必要依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.11</artifactId><version>1.14.0</version></dependency>
</dependencies>

2.数据的读取、处理和写入流程
2.1 数据读取
数据源选择:(项目中使用的是HDFS,故后续文档展示从HDFS中并行读取数据)

(1)本地文件系统:适用于中小规模数据处理,开发和调试方便。
(2)分布式文件系统(HDFS):适用于大规模数据处理,具备高扩展性和容错能力。
(3)云存储(S3):适用于云环境下的数据处理,支持弹性存储和高可用性。

为提高读取性能,采用多线程并行读取和数据分片等策略。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class HDFSDataReader {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从 HDFS 中读取数据,并通过并行流的方式对数据进行处理和统计。DataStream<String> text = env.readTextFile("hdfs://localhost:9000/resources/datafile");DataStream<Tuple2<String, Integer>> wordCounts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {for (String word : value.split("\\s")) {out.collect(new Tuple2<>(word, 1));}}}).keyBy(0).sum(1);wordCounts.writeAsText("hdfs:///path/to/output/file", FileSystem.WriteMode.OVERWRITE);env.execute("HDFS Data Reader");}
}

2.2 数据处理
数据清洗和预处理是大数据处理中重要的一环,包括步骤:

数据去重:移除重复的数据,确保数据唯一性。
数据过滤:排除不符合业务规则的无效数据。
数据转换:将数据格式转换为统一的规范格式,便于后续处理。

进行简单的数据清洗操作:

DataStream<String> cleanedData = inputStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) {// 过滤空行和不符合格式的数据return value != null && !value.trim().isEmpty() && value.matches("regex");}}).map(new MapFunction<String, String>() {@Overridepublic String map(String value) {// 数据格式转换return transformData(value);}});

在数据清洗之后,需要对数据进行各种聚合和分析操作,如统计分析、分类聚类等。这是大数据处理的核心部分,Flink 提供丰富的内置函数和算子来帮助实现这些功能。

对数据进行简单的聚合统计:

DataStream<Tuple2<String, Integer>> aggregatedData = cleanedData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {for (String word : value.split("\\s+")) {out.collect(new Tuple2<>(word, 1));}}}).keyBy(0).sum(1);

2.3 数据写入
处理后的数据需要高效地写入目标存储系统,常见的数据存储包括文件系统、数据库和消息队列等。选择合适的存储系统不仅有助于提升整体性能,同时也有助于数据的持久化和后续分析。

文件系统:适用于批处理结果的落地存储。
数据库:适用于结构化数据的存储和查询。
消息队列:适用于实时流处理结果的传输和消费。

为提高写入性能,可以采取分区写入、批量写入和压缩等策略。
使用分区写入和压缩技术将处理后的数据写入文件系统:

outputStream.map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) {// 数据转换为字符串格式return value.f0 + "," + value.f1;}}).writeAsText("file:output/tag/datafile", FileSystem.WriteMode.OVERWRITE).setParallelism(4) // 设置并行度.setWriteModeWriteParallelism(FileSystem.WriteMode.NO_OVERWRITE); // 设置写入模式和压缩

3.性能优化
3.1 并行度设置
Flink 支持高度并行的数据处理,通过设置并行度可以提高整体处理性能。
设置Flink的全局并行度和算子级并行度:

env.setParallelism(8); // 设置全局并行度DataStream<Tuple2<String, Integer>> result = inputStream.flatMap(new Tokenizer()).keyBy(0).sum(1).setParallelism(4); // 设置算子级并行度

3.2 资源管理
合理管理计算资源,避免资源争用,可以显著提高数据处理性能。在实际开发中,可以通过配置Flink的TaskManager资源配额(如内存、CPU)来优化资源使用:

# Flink 配置文件 (flink-conf.yaml)
taskmanager.memory.process.size: 2048m
taskmanager.memory.framework.heap.size: 512m
taskmanager.numberOfTaskSlots: 4

3.3 数据切分和批处理
对于大文件处理,可以采用数据切分技术,将大文件拆分为多个小文件进行并行处理,避免单个文件过大导致的处理瓶颈。同时,使用批处理可以减少网络和I/O操作,提高整体效率。

DataStream<String> partitionedStream = inputStream.rebalance() // 重新分区.mapPartition(new MapPartitionFunction<String, String>() {@Overridepublic void mapPartition(Iterable<String> values, Collector<String> out) {for (String value : values) {out.collect(value);}}}).setParallelism(env.getParallelism());

3.4 使用缓存和压缩

对于高频访问的数据,可将中间结果缓存到内存中,以减少重复计算和I/O操作。此外,在写入前对数据进行压缩(如 gzip)可以减少存储空间和网络传输时间。

四、完整示例

通过一个完整的示例来实现Spring Boot 3.+和Flink大数据文件的读取和写入。涵盖上述从数据源读取文件、数据处理、数据写入到目标文件的过程。

首先,通过Spring Initializer创建一个新的Spring Boot项目(spring boot 3需要jdk17+),添加以下依赖:

<dependencies><!-- Spring Boot 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Apache Flink 依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.14.0</version></dependency><!-- 其他必要依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.11</artifactId><version>1.14.0</version></dependency>
</dependencies>

定义一个配置类来管理文件路径和其他配置项:

import org.springframework.context.annotation.Configuration;@Configuration
public class FileProcessingConfig {// 输入文件路径public static final String INPUT_FILE_PATH = "fhdfs://localhost:9000/resources/datafile";// 输出文件路径public static final String OUTPUT_FILE_PATH = "file:output/tag/datafile";
}

在业务逻辑层定义文件处理操作:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.springframework.stereotype.Service;@Service
public class FileProcessingService {public void processFiles() throws Exception {// 创建Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置数据源,读取文件DataStream<String> inputStream = env.readTextFile(FileProcessingConfig.INPUT_FILE_PATH);// 数据处理逻辑,将数据转换为大写DataStream<String> processedStream = inputStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) {return value.toUpperCase();}});// 将处理后的数据写入文件processedStream.writeAsText(FileProcessingConfig.OUTPUT_FILE_PATH, FileSystem.WriteMode.OVERWRITE);// 启动Flink任务env.execute("File Processing Job");}
}

在主应用程序类中启用Spring调度任务:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.beans.factory.annotation.Autowired;@EnableScheduling
@SpringBootApplication
public class FileProcessingApplication {@Autowiredprivate FileProcessingService fileProcessingService;public static void main(String[] args) {SpringApplication.run(FileProcessingApplication.class, args);}// 定时任务,每分钟执行一次@Scheduled(fixedRate = 60000)public void scheduleFileProcessingTask() {try {fileProcessingService.processFiles();} catch (Exception e) {e.printStackTrace();}}
}

优化数据处理部分,加入更多处理步骤,包括数据校验和过滤来确保数据的质量和准确性。

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class EnhancedFileProcessingService {public void processFiles() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> inputStream = env.readTextFile(FileProcessingConfig.INPUT_FILE_PATH);// 数据预处理:数据校验和过滤DataStream<String> filteredStream = inputStream.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) {// 过滤长度小于5的字符串return value != null && value.trim().length() > 5;}});// 数据转换:将每行数据拆分为单词DataStream<Tuple2<String, Integer>> wordStream = filteredStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {for (String word : value.split("\\W+")) {out.collect(new Tuple2<>(word, 1));}}});// 数据聚合:统计每个单词的出现次数DataStream<Tuple2<String, Integer>> wordCounts = wordStream.keyBy(value -> value.f0).sum(1);// 将结果转换为字符串并写入输出文件DataStream<String> resultStream = wordCounts.map(new MapFunction<Tuple2<String, Integer>, String>() {@Overridepublic String map(Tuple2<String, Integer> value) {return value.f0 + ": " + value.f1;}});resultStream.writeAsText(FileProcessingConfig.OUTPUT_FILE_PATH, FileSystem.WriteMode.OVERWRITE);env.execute("Enhanced File Processing Job");}
}

增加以下步骤:

数据校验和过滤:过滤掉长度小于5的行,确保数据质量。
数据转换:将每行数据拆分为单词,并为每个单词附加计数1。
数据聚合:统计每个单词的出现次数。
结果写入:将统计结果写入输出文件。

对Flink的资源配置进行优化,有效管理 TaskManager 的内存和并行度,以确保文件处理任务的高效执行:

# Flink 配置文件 (flink-conf.yaml)
taskmanager.memory.process.size: 4096m
taskmanager.memory.framework.heap.size: 1024m
taskmanager.numberOfTaskSlots: 4
parallelism.default: 4

好,ok,刹国!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/44389.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

把 .py 文件编译成 .pyd 文件

将 .py 文件编译成 .pyd 文件&#xff08;在Windows上&#xff09;或 .so 文件&#xff08;在Linux或macOS上&#xff09;&#xff0c;实际上是将Python代码编译成一种可以被Python解释器直接加载的二进制模块。这种编译过程通常使用cython、pyinstaller的钩子&#xff08;hook…

JRT打印药敏报告

最近没写jrt系列博客&#xff0c;不是中途而废了。而是在写微生物系统。今天终于把微生物大体完成了&#xff0c;伴随着业务的实现&#xff0c;框架趋于完善和稳定。构建一套完美而强大的打印体系一直是我的理想&#xff0c;从最开始C#的winform打印控件到刚接触bs时候用js打印…

day11:文件处理

一、文件与文件模式介绍 1、什么是文件 文件是操作系统提供给用户/应用程序操作硬盘的一种虚拟的概念/接口 用户/应用程序(open()) 操作系统&#xff08;文件&#xff09; 计算机硬件&#xff08;硬盘&#xff09;2、为何要用文件 ①用户/应用程序可以通过文件将数据永久保存…

【最强八股文 -- 计算机网络】【快速版】DNS 解析过程

步骤一&#xff1a;查询浏览器及计算机本地 HOST 文件中是否有对应的 缓存 步骤二&#xff1a;请求本地 DNS 服务器 (无则请求 根级 获取能提供信息的权威 DNS 服务器 ) 步骤三&#xff1a;逐级返回对应的 IP 至浏览器 步骤四&#xff1a;浏览器发起连接并缓存 参考资料&#x…

通过Arcgis从逐月平均气温数据中提取并计算年平均气温

通过Arcgis快速将逐月平均气温数据生成年平均气温数据。本次用2020年逐月平均气温数据操作说明。 一、准备工作 &#xff08;1&#xff09;准备Arcmap桌面软件&#xff1b; &#xff08;2&#xff09;准备2020年逐月平均气温数据&#xff08;NC格式&#xff09;、范围图层数据&…

JAVA分布式事务详情分布式事务的解决方案Java中的分布式事务实现

本人详解 作者:王文峰,参加过 CSDN 2020年度博客之星,《Java王大师王天师》 公众号:JAVA开发王大师,专注于天道酬勤的 Java 开发问题中国国学、传统文化和代码爱好者的程序人生,期待你的关注和支持!本人外号:神秘小峯 山峯 转载说明:务必注明来源(注明:作者:王文峰…

数一140+上岸|七月强化一定要避开这3个雷区!

当然可以&#xff0c;强化阶段的主要任务就是做题&#xff01; 但是不用一刀切&#xff0c;强化阶段听课和做题可以二八原则&#xff0c;就是听课占20%&#xff0c;做题占80%。 因为自己去自学讲义的话&#xff0c;比如张宇18讲&#xff0c;会漏掉一些重点&#xff0c;有的技…

进程间的通信--管道

文章目录 一、进程通信的介绍1.1进程间为什么需要通信1.2进程如何通信 二、管道2.1匿名管道2.1.1文件描述符理解管道2.1.2接口使用2.1.3管道的4种情况2.1.4管道的五种特征 2.2管道的使用场景2.2.1命令行中的管道2.2.2进程池 2.命名管道2.1.1原理2.2.2接口2.2.3代码实例 一、进程…

scipy库中,不同应用滤波函数的区别,以及FIR滤波器和IIR滤波器的区别

一、在 Python 中&#xff0c;有多种函数可以用于应用 FIR/IIR 滤波器&#xff0c;每个函数的使用场景和特点各不相同。以下是一些常用的 FIR /IIR滤波器应用函数及其区别&#xff1a; from scipy.signal import lfiltery lfilter(fir_coeff, 1.0, x)from scipy.signal impo…

【Docker-compose】搭建php 环境

文章目录 Docker-compose容器编排1. 是什么2. 能干嘛3. 去哪下4. Compose 核心概念5. 实战 &#xff1a;linux 配置dns 服务器&#xff0c;搭建lemp环境&#xff08;Nginx MySQL (MariaDB) PHP &#xff09;要求6. 配置dns解析配置 lemp Docker-compose容器编排 1. 是什么 …

springer latex模板参考文献不显示

原因 his is BibTeX, Version 0.99d (TeX Live 2024) The top-level auxiliary file: sn-article.aux I couldn’t open style file sn-mathphys-num.bst —line 2 of file sn-article.aux : \bibstyle{sn-mathphys-num : } I’m skipping whatever remains of this command I…

【智能算法改进】一种混合多策略改进的麻雀搜索算法

目录 1.算法原理2.改进点3.结果展示4.参考文献5.代码获取 1.算法原理 【智能算法】麻雀搜索算法&#xff08;SSA&#xff09;原理及实现 2.改进点 精英反向学习策略 将精英反向学习策略应用到初始化阶段, 通过反向解的生成与精英个体的选择, 不仅使算法搜索范围得到扩大, 提…

Gunicorn的预分叉架构:快速启动与高效资源利用

Gunicorn的预分叉架构&#xff1a;快速启动与高效资源利用 引言 Gunicorn&#xff08;Green Unicorn&#xff09;是一个Python WSGI HTTP服务器&#xff0c;设计用来为Python web应用提供高效、可靠的服务。Gunicorn支持同步和异步工作模式&#xff0c;并且兼容多种Web框架。…

从零开学C++:类和对象(上)

引言&#xff1a;在学习了C的入门级知识之后&#xff0c;现在就让我们一起进入类和对象的学习吧&#xff0c;该知识点我将分为上&#xff0c;中&#xff0c;下三个部分对其进行讲解。 更多有关C语言和数据结构的知识详解可前往个人主页&#xff1a;计信猫 目录 一&#xff0c;类…

通过实例说明.NET Autofac依赖注入的多种方式

Autofac提供了多种灵活的依赖注入方式&#xff0c;包括但不限于构造函数注入、属性注入、方法注入、字段注入、元数据注入和动态参数注入。根据具体的需求选择合适的注入方式&#xff0c;可以有效地管理对象的依赖关系和生命周期 构造函数注入 构造函数注入&#xff08;Const…

CSS 【详解】自定义属性(又名 CSS 变量)

声明变量 - - 变量命名规则 支持数字命名支持使用短横线和空格命名支持中文等CJK文字不支持包含$、[、]、^、(、)、%、"等特殊字符的命名&#xff0c;要使用这些特殊字符&#xff0c;需要使用反斜杠转义。 变量值 可以是任意值或表达式 --direction: to top;--gradient…

Android liveData 监听异常,fragment可见时才收到回调记录

背景&#xff1a;在app的fragment不可见的情况下使用&#xff0c;发现注册了&#xff0c;但是没有回调导致数据一直未更新&#xff0c;只有在fragment可见的时候才收到回调 // 观察通用信息mLightNaviTopViewModel.getUpdateCommonInfo().observe(this, new Observer<Common…

python 之修改host配置

背景&#xff1a;生产环境登录 test.ai.com &#xff0c;如果要登录验收的 test.ai.com 要改host配置&#xff0c;所以写了个python脚本 host生产环境配置为 # 192.163.0.0 test.ai.com host验收环境为 192.163.0.0 test.ai.com 不加host配置默认是生成哈 import os # C:…

[嵌入式 C 语言] 按位与、或、取反、异或

若协议中如下图所示&#xff1a; 注意&#xff1a; 长度为1&#xff0c;表示1个字节&#xff0c;也就是0xFF&#xff0c;也就是 1111 1111 &#xff08;这里0xFF只是单纯表示一个数&#xff0c;也可以是其他数&#xff0c;这里需要注意的是1个字节的意思&#xff09; 一、按位…