深入探索Apache Flink:流处理的艺术与实践

在当今的大数据时代,流处理已成为处理实时数据的关键技术。Apache Flink,作为一个开源的流处理框架,以其高吞吐量、低延迟和精确一次(exactly-once)的语义处理能力,在众多流处理框架中脱颖而出。本文将深入探讨如何使用Apache Flink进行流处理,并通过详细的代码示例帮助新手快速上手。

1. Apache Flink简介

Apache Flink是一个分布式处理引擎,支持批处理和流处理。它提供了DataStream API和DataSet API,分别用于处理无界和有界数据集。Flink的核心优势在于其能够以事件时间(event-time)处理数据,确保即使在乱序或延迟数据的情况下,也能得到准确的结果。

2. 环境搭建

在开始编写代码之前,我们需要搭建Flink的开发环境。以下是步骤:

  1. 下载并安装Flink

    wget https://archive.apache.org/dist/flink/flink-1.14.3/flink-1.14.3-bin-scala_2.12.tgz
    tar -xzf flink-1.14.3-bin-scala_2.12.tgz
    cd flink-1.14.3
    

  2. 启动Flink集群

    ./bin/start-cluster.sh
    

  3. 验证Flink集群: 打开浏览器,访问http://localhost:8081,确保Flink的Web UI正常运行。

3. 第一个Flink流处理程序

我们将从一个简单的WordCount程序开始,该程序从一个文本流中读取数据,并计算每个单词的出现次数。

3.1 创建Flink项目

使用Maven创建一个新的Flink项目:

mvn archetype:generate \-DarchetypeGroupId=org.apache.flink \-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.14.3

3.2 编写WordCount程序

src/main/java目录下创建一个新的Java类WordCount.java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class WordCount {public static void main(String[] args) throws Exception {// 创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从Socket读取数据DataStream<String> text = env.socketTextStream("localhost", 9999);// 进行单词计数DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);// 打印结果counts.print();// 执行程序env.execute("Socket WordCount");}// 自定义FlatMapFunction,用于分割单词public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {// 分割单词String[] words = value.toLowerCase().split("\\W+");for (String word : words) {if (word.length() > 0) {out.collect(new Tuple2<>(word, 1));}}}}
}

3.3 运行WordCount程序

  1. 启动Socket服务器

    nc -lk 9999
    

  2. 运行Flink程序: 在IDE中运行WordCount类,或者使用Maven打包并提交到Flink集群:

    mvn clean package
    ./bin/flink run target/your-project-name-1.0-SNAPSHOT.jar
    

  3. 输入数据: 在启动的Socket服务器中输入一些文本,例如:

    Hello World
    Hello Flink
    

  4. 查看结果: 在Flink的Web UI中查看输出结果,或者在控制台中查看打印的输出。

4. 高级特性与实践

4.1 事件时间与水印

Flink支持事件时间(event-time)处理,这意味着可以按照事件发生的时间进行处理,而不是数据到达的时间。为了处理乱序数据,Flink引入了水印(watermark)的概念。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;import java.time.Duration;public class EventTimeWordCount {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, timestamp) -> event.f1)).keyBy(0).timeWindow(Time.seconds(10)).sum(1);counts.print();env.execute("EventTime WordCount");}public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {String[] words = value.toLowerCase().split("\\W+");for (String word : words) {if (word.length() > 0) {out.collect(new Tuple2<>(word, 1));}}}}
}

4.2 状态管理与容错

Flink提供了强大的状态管理机制,可以轻松处理有状态的计算。以下是一个简单的例子,展示了如何使用Flink的状态API。

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class StatefulWordCount {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> text = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Integer>> counts = text.flatMap(new StatefulTokenizer());counts.print();env.execute("Stateful WordCount");}public static class StatefulTokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {private transient ValueState<Integer> countState;@Overridepublic void open(Configuration config) {ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<>("wordCount", // 状态名称TypeInformation.of(Integer.class)); // 状态类型countState = getRuntimeContext().getState(descriptor);}@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.toLowerCase().split("\\W+");for (String word : words) {if (word.length() > 0) {Integer currentCount = countState.value();if (currentCount == null) {currentCount = 0;}currentCount += 1;countState.update(currentCount);out.collect(new Tuple2<>(word, currentCount));}}}}
}

4.3 容错与恢复

Flink通过检查点(checkpoint)机制实现容错。以下是一个简单的例子,展示了如何启用检查点。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FaultTolerantWordCount {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 启用检查点env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);DataStream<String> text = env.socketTextStream("localhost", 9999);DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);counts.print();env.execute("Fault Tolerant WordCount");}public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) {String[] words = value.toLowerCase().split("\\W+");for (String word : words) {if (word.length() > 0) {out.collect(new Tuple2<>(word, 1));}}}}
}

5. 总结

本文详细介绍了如何使用Apache Flink进行流处理,并通过多个代码示例展示了Flink的基本用法和高级特性。从简单的WordCount程序到事件时间处理、状态管理和容错机制,Flink提供了丰富的功能来应对各种流处理场景。

通过深入学习和实践,你将能够更好地利用Flink处理实时数据,构建高效、可靠的流处理应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/44142.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在树莓派设备上导出系统镜像

镜像导出 前提条件&#xff1a; 已获取可以正常使用的设备。已获取鼠标、键盘和电源适配器。已将设备接入可正常使用的网络。 操作步骤&#xff1a; 连接适配器给设备上电&#xff0c;正常启动设备&#xff0c;连接鼠标和键盘。在终端命令窗格执行如下命令&#xff0c;安装…

数据模型-ER图在数据模型设计中的应用

ER图在数据模型设计中的应用 1. ER图概述&#xff1a;起源与发展​ 实体-关系图&#xff08;Entity Relationship Diagram&#xff0c;简称ER图&#xff09;起源于1970年代&#xff0c;由Peter Chen首次提出&#xff0c;作为描述数据和信息间关系的图形化语言。随着数据库技术…

[PM]流程与结构设计

流程图 流程就是为了达到特定目标, 进行的一系列有逻辑性的操作步骤, 由两个及已上的步骤, 完成一个完整的行为过程, 即可称为流程, 流程图就是对这个过程的图形化展示 分类 业务流程图 概念: 描述业务流程的一种图, 通过特定符号和连线表示具体某个业务的处理步骤和过程作…

MyBatis与JDBC相比,有哪些优势

MyBatis与JDBC&#xff08;Java Database Connectivity&#xff09;相比&#xff0c;在多个方面展现出显著的优势。这些优势使得MyBatis在现代软件开发中成为一个非常受欢迎的选择&#xff0c;特别是在处理数据库交互时。以下是MyBatis相比JDBC的主要优势&#xff1a; 1. 简化…

极狐GitLab亮相世界人工智能大会,开启开源大模型赋能软件研发新时代

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab &#xff1a;https://gitlab.cn/install?channelcontent&utm_sourcecsdn 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署…

285个地级市-胡焕庸线数据

全国285个地级市-胡焕庸线数据.zip资源-CSDN文库 胡焕庸线&#xff1a;中国人口与生态的分界线 胡焕庸线&#xff0c;一条在中国地理学界具有划时代意义的分界线&#xff0c;由著名地理学家胡焕庸于1935年提出。这条线从黑龙江省的瑷珲&#xff08;现黑河市&#xff09;延伸至…

json-server总结

Json-server 是一个专门用于模拟 RESTful API 的工具&#xff0c;它允许前端开发人员在不依赖后端 API 的情况下进行开发&#xff0c;通过本地搭建一个 JSON 服务来快速生成 REST API 风格的后端服务。 一、主要特点与功能 快速搭建&#xff1a;Json-server 使用 JSON 文件作…

HippoRAG如何从大脑获取线索以改进LLM检索

知识存储和检索正在成为大型语言模型(LLM)应用的重要组成部分。虽然检索增强生成(RAG)在该领域取得了巨大进步&#xff0c;但一些局限性仍然没有克服。 俄亥俄州立大学和斯坦福大学的研究团队推出了HippoRAG&#xff0c;这是一种创新性的检索框架&#xff0c;其设计理念源于人类…

数学建模美赛论文文档

目录 1. 摘要&#xff1a;1.1 阅读并理解题目1.2 背景介绍1.3 问题提出 2. 目录&#xff1a;2.1 引言&#xff08;Introduction&#xff09;2.2 假设与合理性说明&#xff08;Assumptions and Justifications&#xff09;2.3 符号说明&#xff08;Notations&#xff09;2.4 模型…

2.Date类型的请求参数

前端 <el-form-item label"结束日期" prop"endTime"><el-date-pickerv-model"dataForm.endTime"type"date"value-format"yyyy-MM-dd HH:mm:ss"placeholder"选择日期"></el-date-picker></el…

线下线上游戏电竞陪伴APP小程序H5同城线下约玩APP开发,语聊约玩平台搭建游戏陪玩APP源码

开发一款线下陪玩约玩APP的实际意义和在生活中的应用场景 1、满足社交需求:现代社会人们的社交圈往往受到时间、地点和其他限制的影响。线下陪玩约玩APP可以提供一个平台&#xff0c;让用户通过约玩的方式结识新朋友、扩大社交圈 2、解决孤独感:有些人由于工作忙碌、居住环境单…

论文阅读2-《Dynamic Multimodal Fusion》

摘要 &#xff08;DynMM&#xff09;&#xff0c;一种新的方法&#xff0c;自适应融合多模态数据和 d在推理过程中生成依赖于数据的前向路径。为此&#xff0c;我们提出了一种门控功能来提供基于多模态特征和一个的模态级或融合级决策提高计算效率的源感知损失函数。 细节 模…

185240-00G 同轴连接器

型号简介 185240-00G是Southwest Microwave的2.92 mm连接器。该连接器采用铍铜合金、工具钢和不锈钢等优质材料&#xff0c;并经过金镀层和钝化处理&#xff0c;确保其稳定可靠&#xff0c;经久耐用。它还兼容欧盟 RoHS 和 WEEE 指令&#xff0c;是一位环保使者&#xff0c;致力…

AI绘画Midjourney从入门到实战应用

大家好&#xff0c;我是爱编程的喵喵。双985硕士毕业&#xff0c;现担任全栈工程师一职&#xff0c;热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。…

概率论习题

泊松分布习题 假设你在医院值班&#xff0c;每天需要安保人员出动的次数N~P(1),则关于任一天安保人员出动次数&#xff1a; A&#xff1a;出动一次的概率是多少 B&#xff1a;出动次数小于等于一次的概率为 C&#xff1a;出动次数小于一次的概率为 D&#xff1a;若随机事件发生…

C# 装饰器模式(Decorator Pattern)

装饰器模式动态地给一个对象添加一些额外的职责。就增加功能来说&#xff0c;装饰器模式相比生成子类更为灵活。 // 组件接口 public interface IComponent { void Operation(); } // 具体组件 public class ConcreteComponent : IComponent { public void Opera…

AI推荐系统落地的实现与应用

目录 一、推荐系统的基础二、推荐系统的设计与实现三、推荐系统落地的挑战四、推荐系统的成功案例五、结语 AI推荐系统近年来在各个领域得到了广泛应用&#xff0c;从电子商务到娱乐&#xff0c;再到个性化学习平台。它们通过分析用户行为、偏好和历史数据&#xff0c;为用户提…

【NOI-题解】1108 - 正整数N转换成一个二进制数1290 - 二进制转换十进制1386 - 小丽找半个回文数1405 - 小丽找潜在的素数?

文章目录 一、前言二、问题问题&#xff1a;1108 - 正整数N转换成一个二进制数问题&#xff1a;1290 - 二进制转换十进制问题&#xff1a;1386 - 小丽找半个回文数问题&#xff1a;1405 - 小丽找潜在的素数&#xff1f; 三、感谢 一、前言 本章节主要对进制转换的题目进行讲解…

ubuntu下aarch64-linux-gnu(交叉编译) gdb/gdbserver

ubuntu下aarch64-linux-gnu(交叉编译) gdb/gdbserver gdb是一款开源的、强大的、跨平台的程序调试工具。主要用于在程序运行时对程序进行控制和检查&#xff0c;如设置断点、单步执行、查看变量值、修改内存数据等&#xff0c;从而帮助开发者定位和修复代码中的错误。 gdbserve…

密态计算,大模型商用数据瓶颈的新解法?

大数据产业创新服务媒体 ——聚焦数据 改变商业 大模型迈向产业的深度应用&#xff0c;首要挑战是高质量数据供给和安全流通。正如在今年的世界人工智能大会上&#xff0c;产学研届多位专家达成的共识是&#xff0c;数据决定了AI能力的上限。 在实践中&#xff0c;行业大模型难…