Flink随笔 20241129 流数据处理:以生产线烤鸡为例理解 Flink

        流数据(streaming data)就像是一条永不停歇的生产线,源源不断地向前推进,带来新的数据。而 Apache Flink 就是这条生产线的核心,它负责对数据进行处理、分类、聚合和存储。为了更好地理解 Flink 的流处理,我们可以通过一个简单的类比来解释:流数据就像是流水线上的烤鸡,而 Flink 就是这条流水线上的工作部,负责对这些烤鸡进行加工、分拣、统计和存储。

下面,我们将通过这个类比一步步讲解 Flink 的核心概念,并展示如何在 Flink 中实现这些操作。

类比:生产线的烤鸡

        想象一下,你在一个生产线的末端,看到源源不断的烤鸡从流水线上流过。你需要对这些鸡进行以下操作:

  1. 去掉烤糊的鸡肉(过滤):生产线上有些鸡被烤得过焦,不能食用,需要将其剔除。
  2. 分拣出火鸡还是普通鸡肉(映射):在生产线的下一步,你需要把鸡肉分成不同的类别,比如火鸡和普通鸡肉。
  3. 统计有多少只火鸡,多少只普通鸡肉(聚合):你需要统计每种鸡肉的数量,以便后续处理。
  4. 将鸡肉存储(存储):最后,所有的合格鸡肉需要被存储,或者被送到市场。

        这些操作正是 Flink 中流数据处理的核心功能。下面我们将逐一介绍如何在 Flink 中实现这些步骤。

1. 去掉烤糊的鸡肉(过滤)

        在生产线的第一步,你需要对烤鸡进行筛选,去掉那些已经烤糊或者质量不合格的鸡肉。在 Flink 中,这就是 过滤操作(Filter)。Flink 提供了 filter() 函数,可以根据条件筛选出符合要求的数据。

        例如,假设我们有一个数据流,其中包含不同类型的鸡肉,而我们只关心健康的鸡肉。你可以通过过滤操作去除那些不符合条件的数据。

DataStream<String> chickenStream = env.fromElements("Turkey", "Burnt Chicken", "Chicken", "Turkey");DataStream<String> validChickenStream = chickenStream.filter(chicken -> !chicken.equals("Burnt Chicken"));
validChickenStream.print();
2. 分拣出火鸡还是普通鸡肉(映射)

        在生产线的第二步,你需要把鸡肉分成火鸡和普通鸡肉两类,这就是 映射操作(Map)。在 Flink 中,map() 函数可以将输入数据转换成不同的输出数据。在本例中,我们可以将鸡肉类型映射成不同的标签,例如“火鸡”和“普通鸡肉”。

DataStream<String> sortedChickenStream = chickenStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {return value.equals("Turkey") ? "Turkey" : "Chicken";}});sortedChickenStream.print();
3. 统计有多少只火鸡,多少只普通鸡肉(聚合)

        接下来,你需要统计每种类型的鸡肉有多少只,这就是 聚合操作(Aggregation)。在 Flink 中,keyBy()reduce() 函数常常一起使用,按指定的键(比如鸡肉类型)对数据进行分组,并对每个组进行聚合计算。

        例如,你可以按鸡肉类型分组,并统计每种鸡肉的数量:

DataStream<Tuple2<String, Integer>> mappedStream = chickenStream.map(chicken -> new Tuple2<>(chicken, 1));  // 每个鸡肉事件映射为类型和数量的元组DataStream<Tuple2<String, Integer>> aggregatedStream = mappedStream.keyBy(0)  // 按鸡肉类型分组.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);  // 累加数量}});aggregatedStream.print();
4. 将鸡肉存储(存储)

        在生产线的最后一步,合格的鸡肉需要被存储或发送到市场。Flink 提供了多种 Sink 操作,可以将处理结果输出到外部系统,如 Kafka、MySQL、Elasticsearch 或 HDFS。你可以根据需要将聚合结果写入数据库或消息队列,便于后续的处理和存储。

        例如,将处理后的数据输出到 Kafka:

aggregatedStream.addSink(new FlinkKafkaProducer<>("kafka-broker",  // Kafka 服务器地址"chicken-topic",  // Kafka 主题new SimpleStringSchema()  // 数据序列化方式
));
Flink 流数据处理的完整示例

        通过上述步骤,我们可以编写一个完整的 Flink 程序,来实现流数据的处理,类似于生产线上的烤鸡加工过程。以下是一个简单的例子,展示如何使用 Flink 进行数据流的过滤、映射、聚合和存储:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;public class ChickenProcessing {public static void main(String[] args) throws Exception {// 1. 设置流处理环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 输入数据流:每个事件包含一个鸡肉的类型DataStream<String> chickenStream = env.fromElements("Turkey", "Chicken", "Turkey", "Chicken", "Turkey");// 3. 映射:将每个事件映射为一个元组(鸡肉类型, 数量)DataStream<Tuple2<String, Integer>> mappedStream = chickenStream.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return new Tuple2<>(value, 1);  // 每个事件都标记为1个鸡肉}});// 4. 聚合:按类型统计每种鸡肉的数量DataStream<Tuple2<String, Integer>> aggregatedStream = mappedStream.keyBy(0)  // 按鸡肉类型分组.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return new Tuple2<>(value1.f0, value1.f1 + value2.f1);  // 累加数量}});// 5. 输出结果到控制台aggregatedStream.print();// 6. 启动流处理作业env.execute("Chicken Processing");}
}

输出示例

(Turkey, 3)
(Chicken, 2)
结论

        通过上面的例子,我们可以看到 Flink 在流数据处理中的应用。它不仅可以对数据进行过滤、分类和聚合,还能够将处理结果存储或发送到其他系统。流数据处理就像生产线上的烤鸡加工一样,Flink 作为流水线的核心,不仅对数据进行有效处理,还确保了高效、实时地传递和存储处理结果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/62374.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Langchain 实现 RAG

RAG 实现包括三部分,文档向量化、相似度搜索和大模型回答,本文将使用 LangChain 进行 RAG 的实现。RAG 中最重要的是向量,向量模型的好坏直接反映到最终结果的好坏,如果不能搜索到相对准确的文档,RAG 就没有起到该有的效果。文章将分为两部分,首先是向量索引以及搜索,然…

Next.js -服务端组件如何渲染

#题引&#xff1a;我认为跟着官方文档学习不会走歪路 服务器组件渲染到客户端发生了什么&#xff1f; 请求到达服务器 用户在浏览器中请求一个页面。 Next.js 服务器接收到这个请求&#xff0c;并根据路由找到相应的页面组件。服务器组件的渲染 Next.js 识别出请求的页面包含…

如何构建一个高效安全的图书管理系统

文章目录 技术栈功能需求实现步骤1. 准备开发环境2. 创建项目结构3. 配置数据库4. 创建实体类5. 创建仓库接口6. 创建服务类7. 创建控制器8. 创建前端页面9. 运行项目 技术栈 前端&#xff1a;HTML5、CSS3、JavaScript后端&#xff1a;Java&#xff08;Spring Boot框架&#x…

MongoDB注入攻击测试与防御技术深度解析

MongoDB注入攻击测试与防御技术深度解析 随着NoSQL数据库的兴起&#xff0c;MongoDB作为其中的佼佼者&#xff0c;因其灵活的数据模型和强大的查询能力&#xff0c;受到了众多开发者的青睐。然而&#xff0c;与任何技术一样&#xff0c;MongoDB也面临着安全威胁&#xff0c;其…

架构03-事务处理

零、文章目录 架构03-事务处理 1、本地事务实现原子性和持久性 &#xff08;1&#xff09;事务类型 **本地事务&#xff1a;**单个服务、单个数据源**全局事务&#xff1a;**单个服务、多个数据源**共享事务&#xff1a;**多个服务、单个数据源**分布式事务&#xff1a;**多…

基于深度学习的手势识别算法

基于深度学习的手势识别算法 概述算法原理核心逻辑效果演示使用方式参考文献 概述 本文基于论文 [Simple Baselines for Human Pose Estimation and Tracking[1]](ECCV 2018 Open Access Repository (thecvf.com)) 实现手部姿态估计。 手部姿态估计是从图像或视频帧集中找到手…

硬件基础22 反馈放大电路

目录 一、反馈的基本概念与分类 1、什么是反馈 2、直流反馈与交流反馈 3、正反馈与负反馈 4、串联反馈与并联反馈 5、电压反馈与电流反馈 二、负反馈四种组态 1、电压串联负反馈放大电路 2、电压并联负反馈放大电路 3、电流串联负反馈放大电路 4、电流并联负反馈放大…

亚马逊开发视频人工智能模型,The Information 报道

根据《The Information》周三的报道&#xff0c;电子商务巨头亚马逊&#xff08;AMZN&#xff09;已开发出一种新的生成式人工智能&#xff08;AI&#xff09;&#xff0c;不仅能处理文本&#xff0c;还能处理图片和视频&#xff0c;从而减少对人工智能初创公司Anthropic的依赖…

Spring Boot教程之十二: Spring – RestTemplate

Spring – RestTemplate 由于流量大和快速访问服务&#xff0c;REST API越来越受欢迎。REST 不是一种协议或标准方式&#xff0c;而是一组架构约束。它也被称为 RESTful API 或 Web API。当发出客户端请求时&#xff0c;它只是通过 HTTP 将资源状态的表示传输给请求者或端点。传…

el-table 根据屏幕大小 动态调整max-height 的值

<template><div><p>窗口高度&#xff1a;{{ windowHeight }} px</p></div> </template><script> export default {data() {return {// 下面的 -250 表示减去一些表单元素高度 这个值需要自己手动调整windowHeight: document.docume…

通过 JNI 实现 Java 与 Rust 的 Channel 消息传递

做纯粹的自己。“你要搞清楚自己人生的剧本——不是父母的续集&#xff0c;不是子女的前传&#xff0c;更不是朋友的外篇。对待生命你不妨再大胆一点&#xff0c;因为你好歹要失去它。如果这世上真有奇迹&#xff0c;那只是努力的另一个名字”。 一、crossbeam_channel 参考 cr…

SQL EXISTS 子句的深入解析

SQL EXISTS 子句的深入解析 引言 SQL&#xff08;Structured Query Language&#xff09;作为一种强大的数据库查询语言&#xff0c;广泛应用于各种数据库管理系统中。在SQL查询中&#xff0c;EXISTS子句是一种非常实用的工具&#xff0c;用于检查子查询中是否存在至少一行数…

Python 3 教程第22篇(数据结构)

Python3 数据结构 本章节我们主要结合前面所学的知识点来介绍Python数据结构。 列表 Python中列表是可变的&#xff0c;这是它区别于字符串和元组的最重要的特点&#xff0c;一句话概括即&#xff1a;列表可以修改&#xff0c;而字符串和元组不能。 以下是 Python 中列表的方…

构建现代Web应用:FastAPI、SQLModel、Vue 3与Axios的结合使用

FastAPI介绍 FastAPI是一个用于构建API的现代、快速&#xff08;高性能&#xff09;的Web框架&#xff0c;使用Python并基于标准的Python类型提示。它的关键特性包括快速性能、高效编码、减少bug、智能编辑器支持、简单易学、简短代码、健壮性以及标准化。FastAPI自动提供了交互…

CSS笔记(一)炉石传说卡牌设计1

目标 我要通过html实现一张炉石传说的卡牌设计 问题 其中必须就要考虑到各个元素的摆放&#xff0c;形状的调整来达到满意的效果。通过这个联系来熟悉一下CSS的基本操作。 1️⃣ 基本概念 在CSS里面有行元素&#xff0c;块元素&#xff0c;内联元素&#xff0c;常见的行元…

文本搜索程序(Qt)

头文件 #ifndef TEXTFINDER_H #define TEXTFINDER_H#include <QWidget> #include <QFileDialog> #include <QFile> #include <QTextEdit> #include <QLineEdit> #include <QTextStream> #include <QPushButton> #include <QMess…

GAMES101:现代计算机图形学入门-笔记-09

久违的101图形学回归咯 今天的话题应该是比较轻松的&#xff1a;聊一聊在渲染中比较先进的topics Advanced Light Transport 首先是介绍一系列比较先进的光线传播方法&#xff0c;有无偏的如BDPT&#xff08;双向路径追踪&#xff09;&#xff0c;MLT&#xff08;梅特罗波利斯…

【C++篇】排队的艺术:用生活场景讲解优先级队列的实现

文章目录 须知 &#x1f4ac; 欢迎讨论&#xff1a;如果你在学习过程中有任何问题或想法&#xff0c;欢迎在评论区留言&#xff0c;我们一起交流学习。你的支持是我继续创作的动力&#xff01; &#x1f44d; 点赞、收藏与分享&#xff1a;觉得这篇文章对你有帮助吗&#xff1…

【unity】WebSocket 与 EventSource 的区别

WebSocket 也是一种很好的选择&#xff0c;尤其是在需要进行 双向实时通信&#xff08;例如聊天应用、实时数据流等&#xff09;时。与 EventSource 不同&#xff0c;WebSocket 允许客户端和服务器之间建立一个持久的、全双工的通信通道。两者的区别和适用场景如下&#xff1a;…

Oracle 数据库 IDENTITY 列

IDENTITY列是Oracle数据库12c推出的新特性。之所以叫IDENTITY列&#xff0c;是由于其支持ANSI SQL 关键字 IDENTITY&#xff0c;其内部实现还是使用SEQUENCE。 不过推出这个新语法也是应该的&#xff0c;毕竟MyQL已经有 AUTO_INCREMENT列&#xff0c;而SQL Server也已经有IDENT…