大数据之-Flink学习笔记

Flink

Apache Flink — 数据流上的有状态计算。

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算处理。

任何类型的数据都以事件流的形式生成。信用卡交易、传感器测量、机器日志或网站或移动应用程序 2上的用户交互,所有这些数据都以流的形式生成。

数据可以作为无界有界流进行处理。

无界数据流:有定义流的开始,但是没有定义结束。会一直提供数据,没有结束。所以要一直连续的处理无界流,所以一旦有数据到来就要立即处理,不能等数据都到再处理,因为输入是无限的。处理无界数据通常需要按特定顺序(如数据引入的顺序),以便能够推断结果的完整性。

有界数据流:有具体的开始和结束。有界流的处理也称为批处理。有界数据可以等待所有数据到达之后再进行计算处理。有界数据不需要按顺序引入,因为可以对有界的数据集进行排序。

在这里插入图片描述
在这里插入图片描述

# 添加完这些依赖就可以使用Java代码使用Flink的流处理功能。# 这个依赖项包含了Flink的流处理API和相关的类库。主要干活的
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version>
</dependency># Flink客户端库,用这个可以连接到Flink集群并提交或管理Flink作业。
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version>
</dependency>

Flink部署

在这里插入图片描述

flink部署模式

在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:会话模式(Session Mode)、单作业模式(Per-Job Mode)、应用模式(Application Mode)。

它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行——客户端(Client)还是JobManager。

会话模式(Session Mode)

请添加图片描述

*0)集群规划*

表3-1 集群角色分配

节点服务器*hadoop**102**hadoop**103**hadoop**104*
角色充当JobManager和TaskManagerTaskManagerTaskManager

具体安装部署步骤如下:

*1)下载并解压安装包*

(1)https://flink.apache.org/downloads/ 下载安装包flink-1.17.0-bin-scala_2.12.tgz,将该jar包上传到hadoop102节点服务器的/opt/software路径上。

(2)在/opt/software路径上解压flink-1.17.0-bin-scala_2.12.tgz到/opt/module路径上。

[atguigu@hadoop102 software]$ tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/

*2)修改集群配置*

(1)进入conf路径,修改flink-conf.yaml文件,指定hadoop102节点服务器为JobManager

[atguigu@hadoop102 conf]$ vim flink-conf.yaml

修改如下内容:

# JobManager节点地址.jobmanager.rpc.address: hadoop102jobmanager.bind-host: 0.0.0.0rest.address: hadoop102rest.bind-address: 0.0.0.0\# TaskManager节点地址.需要配置为当前机器名taskmanager.bind-host: 0.0.0.0taskmanager.host: hadoop102

(2)修改workers文件,指定hadoop102、hadoop103和hadoop104为TaskManager

[atguigu@hadoop102 conf]$ vim workers

修改如下内容:

hadoop102hadoop103hadoop104

(3)修改masters文件

[atguigu@hadoop102 conf]$ vim masters

修改如下内容:

hadoop102:8081

(4)另外,在flink-conf.yaml文件中还可以对集群中的JobManager和TaskManager组件进行优化配置,主要配置项如下:

l jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。

l taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1728M,可以根据集群规模进行适当调整。

l taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。

l parallelism.default:Flink任务执行的并行度,默认为1。优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。

关于Slot和并行度的概念,我们会在下一章做详细讲解。

*3)分发安装目录*

(1)配置修改完毕后,将Flink安装目录发给另外两个节点服务器。

[atguigu@hadoop102 module]$ xsync flink-1.17.0/

(2)修改hadoop103的 taskmanager.host

[atguigu@hadoop103 conf]$ vim flink-conf.yaml

修改如下内容:

# TaskManager节点地址.需要配置为当前机器名taskmanager.host: hadoop103

(3)修改hadoop104的 taskmanager.host

[atguigu@hadoop104 conf]$ vim flink-conf.yaml

修改如下内容:

# TaskManager节点地址.需要配置为当前机器名taskmanager.host: hadoop104

*4)启动集群*

(1)在hadoop102节点服务器上执行start-cluster.sh启动Flink集群:

[atguigu@hadoop102 flink-1.17.0]$ bin/start-cluster.sh

(2)查看进程情况:

[atguigu@hadoop102 flink-1.17.0]$ jpsall 

=============== hadoop102 ===============

4453 StandaloneSessionClusterEntrypoint

4458 TaskManagerRunner

4533 Jps

=============== hadoop103 ===============

2872 TaskManagerRunner

2941 Jps

=============== hadoop104 ===============

2948 Jps

2876 TaskManagerRunner

*5)访问Web UI*

启动成功后,同样可以访问http://hadoop102:8081对flink集群和任务进行监控管理。
请添加图片描述

这里可以明显看到,当前集群的TaskManager数量为3;由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为3。

向集群提交作业

在上一章中,我们已经编写读取socket发送的单词并统计单词的个数程序案例。本节我们将以该程序为例,演示如何将任务提交到集群中进行执行。具体步骤如下。

*1**)环境准备*

在hadoop102中执行以下命令启动netcat。

[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777

*2**)**程序**打包*

(1)在我们编写的Flink入门程序的pom.xml文件中添加打包插件的配置,具体如下:

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers combine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build>

(2)插件配置完毕后,可以使用IDEA的Maven工具执行package命令,出现如下提示即表示打包成功。

-------------------------------------------------------------------

[INFO] BUILD SUCCESS

-------------------------------------------------------------------

打包完成后,在target目录下即可找到所需JAR包,JAR包会有两个,FlinkTutorial-1.0-SNAPSHOT.jar和FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar,因为集群中已经具备任务运行所需的所有依赖,所以建议使用FlinkTutorial-1.0-SNAPSHOT.jar。比较大的带有依赖。

*3)**在Web* *UI上提交作业*

(1)任务打包完成后,我们打开Flink的WEB UI页面,在右侧导航栏点击“Submit New Job”,然后点击按钮“+ Add New”,选择要上传运行的JAR包,如下图所示。
在这里插入图片描述

JAR包上传完成,如下图所示:
在这里插入图片描述

(2)点击该JAR包,出现任务配置页面,进行相应配置。

主要配置程序入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,如下图所示,配置完成后,即可点击按钮“Submit”,将任务提交到集群运行。
在这里插入图片描述

(3)任务提交成功之后,可点击左侧导航栏的“Running Jobs”查看程序运行列表情况。
在这里插入图片描述

(4)测试

​ ①在socket端口中输入hello

[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777

hello

②先点击Task Manager,然后点击右侧的192.168.10.104服务器节点
在这里插入图片描述

​ ③点击Stdout,就可以看到hello单词的统计
在这里插入图片描述

​ 注意:如果hadoop104节点没有统计单词数据,可以去其他TaskManager节点查看。

(4)点击该任务,可以查看任务运行的具体情况,也可以通过点击“Cancel Job”结束任务运行。
在这里插入图片描述

*4**)命令行提交作业*

除了通过WEB UI界面提交任务之外,也可以直接通过命令行来提交任务。这里为方便起见,我们可以先把jar包直接上传到目录flink-1.17.0下

(1)首先需要启动集群。

[atguigu@hadoop102 flink-1.17.0]$ bin/start-cluster.sh

(2)在hadoop102中执行以下命令启动netcat。

[atguigu@hadoop102 flink-1.17.0]$ nc -lk 7777

(3)将flink程序运行jar包上传到/opt/module/flink-1.17.0路径。

(4)进入到flink的安装路径下,在命令行使用flink run命令提交作业。

[atguigu@hadoop102 flink-1.17.0]$ bin/flink run -m hadoop102:8081 -c com.atguigu.wc.SocketStreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar

这里的参数 -m指定了提交到的JobManager,-c指定了入口类。

(5)在浏览器中打开Web UI,http://hadoop102:8081查看应用执行情况。

用netcat输入数据,可以在TaskManager的标准输出(Stdout)看到对应的统计结果。
在这里插入图片描述

(6)在/opt/module/flink-1.17.0/log路径中,可以查看TaskManager节点。

[atguigu@hadoop102 log]$ cat flink-atguigu-standalonesession-0-hadoop102.out

(hello,1)

(hello,2)

(flink,1)

(hello,3)

(scala,1)

单作业模式(Per-Job Mode)

在这里插入图片描述

应用模式(Application Mode)

在这里插入图片描述

这里我们所讲到的部署模式,相对是比较抽象的概念。实际应用时,一般需要和资源管理平台结合起来,选择特定的模式来分配资源、部署应用。接下来,我们就针对不同的资源提供者的场景,具体介绍Flink的部署方式。

DataStream API

DataStream API是Flink的核心层API,使用API实现对数据流的计算和处理。

一个Flink程序,其实就是对数据流DataStream的各种转换。具体来说,代码基本上都由以下几部分构成:
在这里插入图片描述

/*** 计算单词出现个数** flink处理无界数据流* 程序会一直运行,一有数据来就处理** @author shkstart* @create 2023-09-10 16:44*/
public class SocketStreamWordCount {public static void main(String[] args) throws Exception {// 1.创建flink流式处理环境 StreamExecutionEnvironmentStreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();// 2.接收要待处理的数据DataStreamSource<String> dateStream = see.socketTextStream("192.168.239.128", 7777);// 3.处理数据 数据处理后格式:(word,2)单词和对应出现的次数/*** flatMap(FlatMapFunction<T, R> flatMapper)* 为数据流的每一个元素调用flatMapper*/System.out.println("原始数据流:" + dateStream);// FlatMapFunction转换,处理数据流元素FlatMapFunction<String, Tuple2<String, Integer>> flatMapFunction = new FlatMapFunctionImpl();SingleOutputStreamOperator<Tuple2<String, Integer>> transformedDataStream =dateStream.flatMap(flatMapFunction);System.out.println("处理后的数据流:" + transformedDataStream);// 按照word分组 按string分组 将Integer累加SingleOutputStreamOperator<Tuple2<String, Integer>> sum = transformedDataStream.keyBy(data -> data.f0).sum(1);// 4.展示sum.print();// 5.执行 开始处理// 代码末尾需要调用 流式处理环境 的execute方法,开始执行任务see.execute();}
}
public class FlatMapFunctionImpl implements FlatMapFunction<String , Tuple2<String,Integer>> {/*** 转换数据流元素* @param value 输入的元素* @param out 输出的元素* @throws Exception*/@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 切分String[] words = value.split(" ");// 收集for (String word : words) {out.collect(Tuple2.of(word,1));}}
}

1、执行环境

Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。

不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系。

DataStream API执行模式包括:流执行模式、批执行模式和自动模式。

流执行模式(Streaming)

这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。

// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

批执行模式(Batch)

专门用于批处理(处理有界数据)的执行模式。

自动模式(AutoMatic)

在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。

批执行模式的使用。主要有两种方式:

(1)通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。

(2)通过代码配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);

在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。

实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

2、数据源

从socket中读取数据

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。

我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStream<String> stream = env.socketTextStream("localhost", 7777);
从Kafka读取数据

Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。

所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>

代码如下:

public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("hadoop102:9092").setTopics("topic_1").setGroupId("atguigu").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");stream.print("Kafka");env.execute();}
}

3、转换算子

基本转换算子(map/** filter**/** flat**Map)****

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/84322.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何通过百度SEO优化提升网站排名(掌握基础概念,实现有效优化)

随着互联网的发展&#xff0c;搜索引擎优化&#xff08;SEO&#xff09;成为了网站优化中不可或缺的一部分。在中国&#xff0c;百度搜索引擎占据着主导地位&#xff0c;因此掌握百度SEO概念和优化技巧对网站的排名和曝光非常重要。 百度SEO排名的6个有效方法&#xff1a; 首…

CTF —— 网络安全大赛(这不比王者好玩吗?)

前言 随着大数据、人工智能的发展&#xff0c;人们步入了新的时代&#xff0c;逐渐走上科技的巅峰。 \ ⚔科技是一把双刃剑&#xff0c;网络安全不容忽视&#xff0c;人们的隐私在大数据面前暴露无遗&#xff0c;账户被盗、资金损失、网络诈骗、隐私泄露&#xff0c;种种迹象…

PostgreSQL设置主键为自增

1、创建自增序列 CREATE SEQUENCE table_name_id_seq START 1; 2、设置字段默认值 字段默认值中设置 nextval(table_name_id_seq) 3、常用查询 -- 查询所有序列 select * from information_schema.sequences where sequence_schema public; -- 查询自增序列的当前值 select cu…

2023年浙工商MBA新生奖学金名单公布,如何看待?

浙工商MBA项目官方最新公布了2023年的非全日制新生奖学金名单&#xff0c;按照政策约定&#xff0c;共分为特等奖学金1名&#xff0c;一等奖学金10名&#xff0c;二等奖学金15名&#xff0c;三等奖学金30名&#xff0c;额度对应3万、1万、0.8万、0.5万不等&#xff0c;主要名单…

Mac电脑安装Zulu Open JDK 8 使用 spring-kafka 消费不到Kafka Partition中的消息

一、现象描述 使用Mac电脑本地启动spring-kakfa消费不到Kafka的消息&#xff0c;监控消费组的消息偏移量发现存在Lag的消息&#xff0c;但是本地客户端就是拉取不到&#xff0c;通过部署到公司k8s容器上消息却能正常消费&#xff01; 本地启动的服务消费组监控 公司k8s容器服…

C语言入门Day_23 指针的使用

目录 前言&#xff1a; 1.指针运算符 2.指针的运算和使用 3.易错点 4.思维导图 前言&#xff1a; 上一篇博客中我们了解到指针的两个运算符号"&#xff1a; 一个是星号*&#xff0c;一个是&&#xff0c;他们的名字分别是指针运算符和取地址运算符。 1.指针运算…

QT-day5

1、添加注册功能到数据库 头文件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QMessageBox> //消息对话框类头文件 #include <QDebug> #include <QPushButton> #include <QSqlDatabase> //数据库管理类 #include…

Vue路由和Node.js环境搭建

文章目录 一、vue路由1.1 简介1.2 SPA1.3 实例 二、Node.js环境搭建2.1 Node.js简介2.2 npm2.3 环境搭建2.3.1 下载解压2.3.2 配置环境变量2.3.3 配置npm全局模块路径和cache默认安装位置2.3.4 修改npm镜像提高下载速度 2.4 运行项目 一、vue路由 1.1 简介 Vue 路由是 Vue.js…

C++项目:仿mudou库实现高性能高并发服务器

文章目录 一、实现目标二、前置知识&#xff08;一&#xff09;HTTP服务器1.概念2.Reactor模型&#xff1a;3.分类 一、实现目标 仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器&#xff1a; 通过咱们实现的高并发服务器组件&#xff0c;可以简洁快速的完成⼀…

【深度学习】卷积神经网络(LeNet)【文章重新修改中】

卷积神经网络 LeNet 前言LeNet 模型代码实现MINST代码分块解析1 构建 LeNet 网络结构2 加载数据集3 初始化模型和优化器4 训练模型5 训练完成 完整代码 Fashion-MINST代码分块解析1 构建 LeNet 网络结构2 初始化模型参数3 加载数据集4 定义损失函数和优化器5 训练模型 完整代码…

云服务器ECS_云主机_服务器托管_计算-阿里云

阿里云服务器是什么&#xff1f;云服务器ECS是一种安全可靠、弹性可伸缩的云计算服务&#xff0c;云服务器可以降低IT成本提升运维效率&#xff0c;免去企业或个人前期采购IT硬件的成本&#xff0c;阿里云服务器让用户像使用水、电、天然气等公共资源一样便捷、高效地使用服务器…

新手怎样快速上手接口测试?掌握这几个知识点直接起飞!

接口测试是测试系统组件间接口的一种方式&#xff0c;接口测试主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点。测试的重点是检查数据的增删改查操作&#xff0c;以及系统之间的逻辑关系等 接口的几种类型 接口的类型包括&#xff1a;post &#xff0c;get&am…

沉积物微体古生物鉴定

声明 本文是学习GB-T 42629.4-2023 国际海底区域和公海环境调查规程 第4部分&#xff1a;海洋沉积物物理特性调查. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 1 范围 本文件规定了国际海底区域和公海环境调查中的沉积物组成、物理力学性质、生物…

【C++】unordered_map与unorder_set的封装(哈希桶)

文章目录 前言一、模板参数的改造二、模板的特例化操作三、仿函数的妙用四、unordered迭代器基本操作1.const迭代器注意&#xff1a;2.HashTable与HTIterator的冲突 五、迭代器的构造问题六、完整代码1.hash_bucket.h2.unordered_set.h3.unordered_map.h 前言 我们开辟一个指针…

使用LDA(线性判别公式)进行iris鸢尾花的分类

线性判别分析((Linear Discriminant Analysis &#xff0c;简称 LDA)是一种经典的线性学习方法&#xff0c;在二分类问题上因为最早由 [Fisher,1936] 提出&#xff0c;亦称 ”Fisher 判别分析“。并且LDA也是一种监督学习的降维技术&#xff0c;也就是说它的数据集的每个样本都…

驱动开发--汇总

一&#xff0c;【驱动相关概念】 1&#xff0c;什么是驱动 能够驱使硬件实现特定功能的软件代码 根据驱动程序是否依赖于系统内核将驱动分为裸机驱动和系统驱动 2&#xff0c;逻辑驱动和系统驱动的区别 裸机驱动&#xff1a;编写的驱动代码中没有进行任何内核相关API的调用…

Flutter插件的制作和发布

Flutter制作插件有两种方式&#xff08;以下以android和ios为例&#xff09;&#xff1a; 目录 1.直接在主工程下的android和ios项目内写插件代码&#xff1a;2.创建独立Flutter Plugin项目&#xff0c;制作各端插件后&#xff0c;再引入项目&#xff1a;1. 创建Flutter Plugin…

Webpack打包CSS文件,解决You may need an appropriate loader to handle this file type报错

在项目文件夹下创建webpack.config.js文件&#xff0c;该文件就是Webpack的配置文件 注意&#xff1a;该文件中遵循Node.js的代码格式规范 &#xff0c;需要对导出配置文件中的内容 Webpack在默认情况下只能打包js文件&#xff0c;如果我们希望他能够打包其他类型的文件&#…

物联网安全优秀实践:2023年设备保护指南

物联网的发展可谓是革命性的&#xff0c;数十亿台设备实时互连、通信和共享数据。因此&#xff0c;考虑物联网安全的最佳实践至关重要。 物联网的重要性日益上升 在数字时代&#xff0c;物联网(IoT)已成为一股革命力量&#xff0c;重塑了企业运营和个人生活方式。从调节家庭温…

堆的OJ题

&#x1f525;&#x1f525; 欢迎来到小林的博客&#xff01;&#xff01;       &#x1f6f0;️博客主页&#xff1a;✈️林 子       &#x1f6f0;️博客专栏&#xff1a;✈️ 小林的算法笔记       &#x1f6f0;️社区 :✈️ 进步学堂       &am…