Flink作业骨架结构

前言

Flink 是大数据流计算引擎,开发者通过程序语言开发一个 Flink 作业,然后提交这个作业到服务端并执行,以完成对大数据流的处理。

Flink 作业有一个基本骨架,再复杂的 Flink 作业都离不开这个基本骨架,了解作业的基本骨架有助于我们更快上手 Flink 作业的开发。

骨架结构

Flink 作业的基本骨架包含三个部分:

  • 创建Flink执行环境
  • 定义数据处理逻辑
  • 提交并执行Flink作业

创建Flink执行环境

Flink 执行环境被封装成 StreamExecutionEnvironment 对象,通过该对象,我们可以给Flink作业添加数据源、添加数据处理和输出逻辑、配置Flink作业的并行度、故障重启策略等参数。

定义数据处理逻辑

Flink是大数据流计算引擎,本质上是对大数据的计算处理,那么首先要解决的问题是:数据从哪儿来?

解决这个问题,就是给Flink作业定义数据源,数据源被抽象成了 SourceFunction 接口,实现该接口重写 run 方法即可接收数据。

有了数据,接下来就是声明要对这些数据做哪些处理?对数据的处理被抽象成了 ProcessFunction 接口,实现该接口重写 processElement 方法即可处理一条条数据。常见的数据处理操作有:过滤、转换、聚合等。

数据计算以后,还需要把计算结果给保存下来,所以最后还需要一步数据汇 sink 操作,把计算结果保存到数据存储引擎,例如写入MySQL、Redis等存储引擎。

提交并执行Flink作业

到目前为止,我们只描述了Flink的数据处理逻辑,Flink作业不手动触发的前提下,是不会自动执行的。

所以最后,如果要让上述流程跑起来,还需要手动提交并触发Flink作业。开发环境下,可以直接在本地提交并执行,生产环境一般是提交到Flink集群执行。

执行Flink作业,对应的是 StreamExecutionEnvironment#execute 方法。

字数统计作业

这里以一个统计字数作业作为示例,它被称作是 Flink 版的 Hello World,虽然简单,但是很好的体现了 Flink 作业的流程。

如下代码所示,我们先是创建了Flink作业执行环境对象 StreamExecutionEnvironment,然后定义了数据源监听本地的8888端口读取文本数据。紧接着定义数据处理逻辑,先是过滤操作,只有接收到的字符串是单个英文字母时才处理,然后把单个英文字母映射为WordCount对象,用于统计次数。然后是根据英文字母分组,相同的字母会被分到同一组,最后统计所有相同字母的 count 字段,得到的结果就是字母出现的次数。最终的 sink 操作只是简单的把结果输出到控制台。

public class WordCountJob {public static void main(String[] args) throws Exception {// 创建Flink执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();// 1. 定义数据源 从Socket读取数据environment.socketTextStream("127.0.0.1", 8888)// 2. 定义数据处理逻辑// 2.1 过滤 接收到的数据必须是英文字母.filter(e -> e.length() == 1 && Character.isLetter(e.codePointAt(0)))// 2.2 映射 单个字符映射成WordCount对象.map(e -> new WordCount(e.toUpperCase(), 1))// 2.3 分组 相同字母分为一组.keyBy(WordCount::getWord)// 2.4 分组后相同字母聚合求和.sum("count")// 3. 定义数据汇sink 这里输出到控制台.addSink(new SinkFunction<WordCount>() {@Overridepublic void invoke(WordCount value, Context context) throws Exception {System.err.println(value.word + ":" + value.count);}});// 执行Flink作业environment.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class WordCount {public String word;public int count;}
}

Flink 作业启动时,就会去连接本地的8888端口,如果连接不上,会报错退出。所以启动作业前,需要先保证8888端口开启。所以Mac系统下先打开8888端口:

nc -l 8888

然后启动 Flink 作业,此时控制台什么也没有,因为数据源没有数据,Flink也就没法处理。然后我们往 8888 端口写点东西

nc -l 8888
1
a
b
c
a

控制台输出

A:1
B:1
C:1
A:2

分析一下结果,第一次发出的“1”,因为不是英文字母,所以会被filter算子过滤掉。发出第一个“a”时,因为符合条件,所以会被后续所有算子处理,最终到达sink算法,输出到控制台。发出第二个“a"时,因为之前已经有一个a了,所以sum算子求和结果是2。

尾巴

Flink作业的基本骨架包含三部分:创建作业执行环境、定义数据处理逻辑、提交并启动作业。执行环境主要用来对Flink作业进行一些设置,例如 故障重启策略、并行度等参数。定义数据处理逻辑是我们开发Flink作业最重要的部分,首先要定义数据源,告诉Flink数据从哪里来,然后声明要对数据做哪些处理,最终计算结果要保存到哪里等。Flink作业是懒执行的,前面的这些操作都只是对Flink作业的一个声明和描述,必须调用execute方法作业才会真正跑起来。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/882153.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

spring boot itext7 修改生成文档的作者、制作者、标题,并且读取相关的信息。

1、官方的example文件&#xff1a;iText GitHub itext-java-7.2.5\kernel\src\test\java\com\itextpdf\kernel\pdf\PdfStampingTest.java 2、修改代码&#xff1a; Testpublic void stamping1() throws IOException {String filename1 destinationFolder "stamping1_…

期货考核系统部署/配资net源码

关于期货考核系统部署与配资NET源码的问题&#xff0c;以下是一些相关的信息和建议&#xff1a; 一、期货考核系统部署 期货考核系统的部署是一个复杂的过程&#xff0c;涉及需求分析、系统设计、开发、测试、部署等多个环节。以下是一些关键步骤&#xff1a; 需求分析&…

如何替换OCP节点(二):使用 antman脚本 | OceanBase应用实践

前言&#xff1a; OceanBase Cloud Platform&#xff08;简称OCP&#xff09;&#xff0c;是 OceanBase数据库的专属企业级数据库管理平台。 在实际生产环境中&#xff0c;OCP的安装通常是第一步&#xff0c;先搭建OCP平台&#xff0c;进而依赖OCP来创建、管理和监控我们的生…

营销邮件软件:提升邮件营销效率必备工具!

营销邮件软件选择技巧&#xff1f;免费高效的邮件营销软件推荐&#xff1f; 如何高效地管理和优化邮件营销活动成为了企业面临的一大挑战。营销邮件软件成为提升邮件营销效率的必备工具。MailBing将深入探讨营销邮件软件的功能、优势以及如何选择合适的工具。 营销邮件软件&a…

element checkbox选框和文字分开点击---更改一列checkbox的顺序(进阶版)

选框和文字分开点击&#xff0c;找了很多&#xff0c;没有我想要的效果&#xff0c;但也借鉴了一下&#xff0c;实现了&#xff0c;记录一下 样式看起来倒是没多大区别&#xff0c;需求&#xff1a; 勾选了选框才可以点击文字 &#xff0c;一次只能点击一条数据&#xff0c;点…

Parameter-Efficient Fine-Tuning for Large Models: A Comprehensive Survey阅读笔记

Parameter-Efficient Fine-Tuning for Large Models: A Comprehensive Survey 综述阅读笔记 仅记录个人比较感兴趣的部分 基本知识 PEFT的三种分类&#xff1a;additive, selective, reparameterized, and hybrid fine-tuning selective fine-tuning 不需要任何额外的参数&am…

Axure横向菜单高级交互

亲爱的小伙伴&#xff0c;在您浏览之前&#xff0c;烦请关注一下&#xff0c;在此深表感谢&#xff01; 课程主题&#xff1a;横向菜单高级交互 主要内容&#xff1a;横向菜单左右拖动、选中效果 应用场景&#xff1a;app横向菜单、pc后台动态区域 案例展示&#xff1a; 演…

SpringBoot技术的车辆管理流程自动化

4系统概要设计 4.1概述 本系统采用B/S结构(Browser/Server,浏览器/服务器结构)和基于Web服务两种模式&#xff0c;是一个适用于Internet环境下的模型结构。只要用户能连上Internet,便可以在任何时间、任何地点使用。系统工作原理图如图4-1所示&#xff1a; 图4-1系统工作原理…

uniapp-实现天地图以及行政区划图层覆盖

前言&#xff1a; 在uniapp中&#xff0c;难免会遇到使用地图展示的功能&#xff0c;但是百度谷歌这些收费的显然对于大部分开源节流的开发者是不愿意接受的&#xff0c;所以天地图则是最佳选择。 此篇文章&#xff0c;详细的实现地图展示功能&#xff0c;并且可以自定义容器宽…

探索 Jupyter 笔记本转换的无限可能:nbconvert 库的神秘面纱

文章目录 探索 Jupyter 笔记本转换的无限可能&#xff1a;nbconvert 库的神秘面纱背景&#xff1a;为何选择 nbconvert&#xff1f;库简介&#xff1a;nbconvert 是什么&#xff1f;安装指南&#xff1a;如何安装 nbconvert&#xff1f;函数用法&#xff1a;简单函数示例应用场…

安装vue发生异常:npm ERR! the command again as root/Administrator.

一、异常 npm ERR! The operation was rejected by your operating system. npm ERR! Its possible that the file was already in use (by a text editor or antivirus), npm ERR! or that you lack permissions to access it. npm ERR! npm ERR! If you believe this might b…

安卓开发中轮播图和其指示器的设置

在安卓开发中&#xff0c;轮播图&#xff08;Carousel&#xff09;是一种常见的UI组件&#xff0c;用于展示一系列图片或内容&#xff0c;用户可以左右滑动来切换不同的视图。轮播图通常用于展示广告、新闻、产品图片等。 轮播图的指示器&#xff08;Indicator&#xff09;则是…

大模型生图安全疫苗注入赛题解析(DataWhale组队学习)

引言 大家好&#xff0c;我是GISer Liu&#x1f601;&#xff0c;一名热爱AI技术的GIS开发者。本系列文章是我跟随DataWhale 2024年10月实践赛的大模型生图安全疫苗注入赛道&#xff1b;本文主要整理本次赛事的基本流程和优化方法。&#x1f495;&#x1f495;&#x1f60a; 一…

IEC104规约的秘密之九----链路层和应用层

104规约从TCP往上&#xff0c;分成链路层和应用层。 如图&#xff0c;APCI就是链路层&#xff0c;ASDU的就是应用层 我们看到报文都是68打头的&#xff0c;因为应用层报文也要交给链路层发送&#xff0c;链路层增加了开头的6个字节再进行发送。 完全用于链路层的报文每帧都只有…

好用,易用,高效,稳定 基于opencv 的 图像模板匹配 - python 实现

在定位、搜索固定界面图块时&#xff0c;经常用到模板匹配&#xff0c;opencv自带的图像模板匹配好用&#xff0c;易用&#xff0c;高效&#xff0c;稳定&#xff0c;且有多种匹配计算方式。 具体示例如下&#xff1a; 模板图&#xff1a; 待搜索图&#xff1a; 具体实现代码…

FreeRTOS - 任务管理

在学习FreeRTOS过程中&#xff0c;结合韦东山-FreeRTOS手册和视频、野火-FreeRTOS内核实现与应用开发、及网上查找的其他资源&#xff0c;整理了该篇文章。如有内容理解不正确之处&#xff0c;欢迎大家指出&#xff0c;共同进步。 参考&#xff1a;https://rtos.100ask.net/zh/…

SpringColoud GateWay 核心组件

优质博文&#xff1a;IT-BLOG-CN 【1】Route路由&#xff1a; Gateway的基本构建模块&#xff0c;它由ID、目标URL、断言集合和过滤器集合组成。如果聚合断言结果为真&#xff0c;则匹配到该路由。 Route路由-动态路由实现原理&#xff1a; 配置变化Apollo 服务地址实例变化…

H3C设备连接方式

Console线本地连接 协议Serial&#xff0c;接口com口&#xff0c;波特率9600&#xff08;设备管理器中查看com口&#xff09; 适用于设备初次调试 使用Telnet远程访问 适用于设备上架配置好后的维护管理 使用SSH远程访问 数据传输过程加密&#xff0c;安全的远程访问

洞察数字化营销的本质

数字化营销&#xff0c;即借助互联网、移动互联网、社交媒体等数字技术与渠道实现营销目标。涵盖市场调研、品牌推广、产品销售到客户服务全过程。 其特点显著。精准定位是一大优势&#xff0c;利用大数据分析和人工智能&#xff0c;深入了解客户需求、兴趣和行为&#xff0c;精…

AdmX_new

0x00前言 因为环境问题&#xff0c;此次靶场都放在vm上。都为NAT模式。 靶机地址: https://download.vulnhub.com/admx/AdmX_new.7z 需要找到两个flag文件。 0x01信息搜集 搜集IP 确认目标IP为172.16.8.131&#xff0c;进一步信息搜集 获取端口开放情况&#xff0c;版本信…