Flink随笔 20241129 流数据处理:以生产线烤鸡为例理解 Flink

        流数据(streaming data)就像是一条永不停歇的生产线,源源不断地向前推进,带来新的数据。而 Apache Flink 就是这条生产线的核心,它负责对数据进行处理、分类、聚合和存储。为了更好地理解 Flink 的流处理,我们可以通过一个简单的类比来解释:流数据就像是流水线上的烤鸡,而 Flink 就是这条流水线上的工作部,负责对这些烤鸡进行加工、分拣、统计和存储。

下面,我们将通过这个类比一步步讲解 Flink 的核心概念,并展示如何在 Flink 中实现这些操作。

类比:生产线的烤鸡

        想象一下,你在一个生产线的末端,看到源源不断的烤鸡从流水线上流过。你需要对这些鸡进行以下操作:

  1. 去掉烤糊的鸡肉(过滤):生产线上有些鸡被烤得过焦,不能食用,需要将其剔除。
  2. 分拣出火鸡还是普通鸡肉(映射):在生产线的下一步,你需要把鸡肉分成不同的类别,比如火鸡和普通鸡肉。
  3. 统计有多少只火鸡,多少只普通鸡肉(聚合):你需要统计每种鸡肉的数量,以便后续处理。
  4. 将鸡肉存储(存储):最后,所有的合格鸡肉需要被存储,或者被送到市场。

        这些操作正是 Flink 中流数据处理的核心功能。下面我们将逐一介绍如何在 Flink 中实现这些步骤。

1. 去掉烤糊的鸡肉(过滤)

        在生产线的第一步,你需要对烤鸡进行筛选,去掉那些已经烤糊或者质量不合格的鸡肉。在 Flink 中,这就是 过滤操作(Filter)。Flink 提供了 filter() 函数,可以根据条件筛选出符合要求的数据。

        例如,假设我们有一个数据流,其中包含不同类型的鸡肉,而我们只关心健康的鸡肉。你可以通过过滤操作去除那些不符合条件的数据。

DataStream<String> chickenStream = env.fromElements("Turkey", "Burnt Chicken", "Chicken", "Turkey");DataStream<String> validChickenStream = chickenStream.filter(chicken -> !chicken.equals("Burnt Chicken"));
validChickenStream.print();
2. 分拣出火鸡还是普通鸡肉(映射)

        在生产线的第二步,你需要把鸡肉分成火鸡和普通鸡肉两类,这就是 映射操作(Map)。在 Flink 中,map() 函数可以将输入数据转换成不同的输出数据。在本例中,我们可以将鸡肉类型映射成不同的标签,例如“火鸡”和“普通鸡肉”。

DataStream<String> sortedChickenStream = chickenStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value.equals("Turkey") ? "Turkey" : "Chicken";}});sortedChickenStream.print();
3. 统计有多少只火鸡,多少只普通鸡肉(聚合)

        接下来,你需要统计每种类型的鸡肉有多少只,这就是 聚合操作(Aggregation)。在 Flink 中,keyBy()reduce() 函数常常一起使用,按指定的键(比如鸡肉类型)对数据进行分组,并对每个组进行聚合计算。

        例如,你可以按鸡肉类型分组,并统计每种鸡肉的数量:

DataStream<Tuple2<String, Integer>> mappedStream = chickenStream.map(chicken -> new Tuple2<>(chicken, 1));  // 每个鸡肉事件映射为类型和数量的元组DataStream<Tuple2<String, Integer>> aggregatedStream = mappedStream.keyBy(0)  // 按鸡肉类型分组.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);  // 累加数量}});aggregatedStream.print();
4. 将鸡肉存储(存储)

        在生产线的最后一步,合格的鸡肉需要被存储或发送到市场。Flink 提供了多种 Sink 操作,可以将处理结果输出到外部系统,如 Kafka、MySQL、Elasticsearch 或 HDFS。你可以根据需要将聚合结果写入数据库或消息队列,便于后续的处理和存储。

        例如,将处理后的数据输出到 Kafka:

aggregatedStream.addSink(new FlinkKafkaProducer<>("kafka-broker",  // Kafka 服务器地址"chicken-topic",  // Kafka 主题new SimpleStringSchema()  // 数据序列化方式
));
Flink 流数据处理的完整示例

        通过上述步骤,我们可以编写一个完整的 Flink 程序,来实现流数据的处理,类似于生产线上的烤鸡加工过程。以下是一个简单的例子,展示如何使用 Flink 进行数据流的过滤、映射、聚合和存储:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;public class ChickenProcessing {public static void main(String[] args) throws Exception {// 1. 设置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 输入数据流:每个事件包含一个鸡肉的类型DataStream<String> chickenStream = env.fromElements("Turkey", "Chicken", "Turkey", "Chicken", "Turkey");// 3. 映射:将每个事件映射为一个元组(鸡肉类型, 数量)DataStream<Tuple2<String, Integer>> mappedStream = chickenStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return new Tuple2<>(value, 1);  // 每个事件都标记为1个鸡肉}});// 4. 聚合:按类型统计每种鸡肉的数量DataStream<Tuple2<String, Integer>> aggregatedStream = mappedStream.keyBy(0)  // 按鸡肉类型分组.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);  // 累加数量}});// 5. 输出结果到控制台aggregatedStream.print();// 6. 启动流处理作业env.execute("Chicken Processing");}
}

输出示例

(Turkey, 3)
(Chicken, 2)
结论

        通过上面的例子,我们可以看到 Flink 在流数据处理中的应用。它不仅可以对数据进行过滤、分类和聚合,还能够将处理结果存储或发送到其他系统。流数据处理就像生产线上的烤鸡加工一样,Flink 作为流水线的核心,不仅对数据进行有效处理,还确保了高效、实时地传递和存储处理结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/62374.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Next.js -服务端组件如何渲染

#题引&#xff1a;我认为跟着官方文档学习不会走歪路 服务器组件渲染到客户端发生了什么&#xff1f; 请求到达服务器 用户在浏览器中请求一个页面。 Next.js 服务器接收到这个请求&#xff0c;并根据路由找到相应的页面组件。服务器组件的渲染 Next.js 识别出请求的页面包含…

架构03-事务处理

零、文章目录 架构03-事务处理 1、本地事务实现原子性和持久性 &#xff08;1&#xff09;事务类型 **本地事务&#xff1a;**单个服务、单个数据源**全局事务&#xff1a;**单个服务、多个数据源**共享事务&#xff1a;**多个服务、单个数据源**分布式事务&#xff1a;**多…

基于深度学习的手势识别算法

基于深度学习的手势识别算法 概述算法原理核心逻辑效果演示使用方式参考文献 概述 本文基于论文 [Simple Baselines for Human Pose Estimation and Tracking[1]](ECCV 2018 Open Access Repository (thecvf.com)) 实现手部姿态估计。 手部姿态估计是从图像或视频帧集中找到手…

硬件基础22 反馈放大电路

目录 一、反馈的基本概念与分类 1、什么是反馈 2、直流反馈与交流反馈 3、正反馈与负反馈 4、串联反馈与并联反馈 5、电压反馈与电流反馈 二、负反馈四种组态 1、电压串联负反馈放大电路 2、电压并联负反馈放大电路 3、电流串联负反馈放大电路 4、电流并联负反馈放大…

亚马逊开发视频人工智能模型,The Information 报道

根据《The Information》周三的报道&#xff0c;电子商务巨头亚马逊&#xff08;AMZN&#xff09;已开发出一种新的生成式人工智能&#xff08;AI&#xff09;&#xff0c;不仅能处理文本&#xff0c;还能处理图片和视频&#xff0c;从而减少对人工智能初创公司Anthropic的依赖…

Spring Boot教程之十二: Spring – RestTemplate

Spring – RestTemplate 由于流量大和快速访问服务&#xff0c;REST API越来越受欢迎。REST 不是一种协议或标准方式&#xff0c;而是一组架构约束。它也被称为 RESTful API 或 Web API。当发出客户端请求时&#xff0c;它只是通过 HTTP 将资源状态的表示传输给请求者或端点。传…

通过 JNI 实现 Java 与 Rust 的 Channel 消息传递

做纯粹的自己。“你要搞清楚自己人生的剧本——不是父母的续集&#xff0c;不是子女的前传&#xff0c;更不是朋友的外篇。对待生命你不妨再大胆一点&#xff0c;因为你好歹要失去它。如果这世上真有奇迹&#xff0c;那只是努力的另一个名字”。 一、crossbeam_channel 参考 cr…

CSS笔记(一)炉石传说卡牌设计1

目标 我要通过html实现一张炉石传说的卡牌设计 问题 其中必须就要考虑到各个元素的摆放&#xff0c;形状的调整来达到满意的效果。通过这个联系来熟悉一下CSS的基本操作。 1️⃣ 基本概念 在CSS里面有行元素&#xff0c;块元素&#xff0c;内联元素&#xff0c;常见的行元…

GAMES101:现代计算机图形学入门-笔记-09

久违的101图形学回归咯 今天的话题应该是比较轻松的&#xff1a;聊一聊在渲染中比较先进的topics Advanced Light Transport 首先是介绍一系列比较先进的光线传播方法&#xff0c;有无偏的如BDPT&#xff08;双向路径追踪&#xff09;&#xff0c;MLT&#xff08;梅特罗波利斯…

Oracle 数据库 IDENTITY 列

IDENTITY列是Oracle数据库12c推出的新特性。之所以叫IDENTITY列&#xff0c;是由于其支持ANSI SQL 关键字 IDENTITY&#xff0c;其内部实现还是使用SEQUENCE。 不过推出这个新语法也是应该的&#xff0c;毕竟MyQL已经有 AUTO_INCREMENT列&#xff0c;而SQL Server也已经有IDENT…

前端学习笔记之文件下载(1.0)

因为要用到这样一个场景&#xff0c;需要下载系统的使用教程&#xff0c;所以在前端项目中就提供了一个能够下载系统教程的一个按钮&#xff0c;供使用者进行下载。 所以就试着写一下这个功能&#xff0c;以一个demo的形式进行演示&#xff0c;在学习的过程中也发现了中文路径…

【阅读记录-章节4】Build a Large Language Model (From Scratch)

文章目录 4. Implementing a GPT model from scratch to generate text4.1 Coding an LLM architecture4.1.1 配置小型 GPT-2 模型4.1.2 DummyGPTModel代码示例4.1.3 准备输入数据并初始化 GPT 模型4.1.4 初始化并运行 GPT 模型 4.2 Normalizing activations with layer normal…

Python PDF转JPG图片小工具

Python PDF转JPG图片小工具 1.简介 将单个pdf装换成jpg格式图片 Tip: 1、软件窗口默认最前端&#xff0c;不支持调整窗口大小&#xff1b; 2、可通过按钮选择PDF文件&#xff0c;也可以直接拖拽文件到窗口&#xff1b; 3、转换质量有5个档位&#xff0c;&#xff08;0.25&a…

使用SOAtest进行功能回归测试

持续集成是将所有开发人员的工作副本合并到共享的主线上。这个过程使软件开发对开发人员来说更容易访问、更快、风险更小。 阅读这篇文章&#xff0c;让我们了解如何配置Parasoft SOAtest作为持续集成过程的一部分&#xff0c;来执行功能测试和回归测试。我们将介绍如何使用主…

ais_server 学习笔记

ais_server 学习笔记 一前序二、ais init1、时序图如下2. 初始化一共分为以下几个重要步骤&#xff1a;2.1.1、在ais_server中启动main函数&#xff0c;然后创建AisEngine&#xff0c;接着初始化AisEngine2.1.2、解析/var/camera_config.xml 文件&#xff0c;获取相关配置参数。…

L1G3000 任务-浦语提示词工程

基础任务 (完成此任务即完成闯关) 背景问题&#xff1a;近期相关研究指出&#xff0c;在处理特定文本分析任务时&#xff0c;语言模型的表现有时会遇到挑战&#xff0c;例如在分析单词内部的具体字母数量时可能会出现错误。任务要求&#xff1a;利用对提示词的精确设计&#xf…

Unity之一键创建自定义Package包

内容将会持续更新&#xff0c;有错误的地方欢迎指正&#xff0c;谢谢! Unity之一键创建自定义Package包 TechX 坚持将创新的科技带给世界&#xff01; 拥有更好的学习体验 —— 不断努力&#xff0c;不断进步&#xff0c;不断探索 TechX —— 心探索、心进取&#xff01; …

python的Flask框架使用

python的Flask框架使用 python环境搭建conda安装python自带的虚拟环境&#xff1a;venv python环境搭建 官网地址 点击downloads 选择你需要的版本&#xff0c;我这里使用的3.12.6 选择Windows installer (64-bit) 选择自定义安装&#xff0c;勾选以管理员权限安装&#xff0…

网络原理(一)—— http

什么是 http http 是一个应用层协议&#xff0c;全称为“超文本传输协议”。 http 自 1991 年诞生&#xff0c;目前已经发展为最主流使用的一种应用层协议。 HTTP 往往基于传输层的 TCP 协议实现的&#xff0c;例如 http1.0&#xff0c;http1.0&#xff0c;http2.0 http3 是…

103.【C语言】数据结构之建堆的时间复杂度分析

1.向下调整的时间复杂度 推导 设树高为h 发现如下规律 按最坏的情况考虑(即调整次数最多) 第1层,有个节点,最多向上调整h-1次 第2层,有个节点,最多向上调整h-2次 第3层,有个节点,最多向上调整h-3次 第4层,有个节点,最多向上调整h-4次 ... 第h-1层,有个节点,最多向上调整1次 第…