Flink---1、概述、快速上手

1、Flink概述

1.1 Flink是什么

Flink的官网主页地址:https://flink.apache.org/
Flink的核心目标是“数据流上有状态的计算”(Stateful Computations over Data Streams)。
具体说明:Apache Flink是一个“框架和分布式处理引擎”,用于对无界有界数据流进行有状态计算。
在这里插入图片描述

1.1.1 无界数据流

  • 有定义流的开始,但是没有定义流的结束
  • 它们会无休止的产生数据
  • 无界流的数据必须持续处理,即数据被摄取后需要立即处理。我们不能等到所有数据都到达再处理,因为输入时无限的。

1.1.2 有界数据流

  • 有定义流的开始,也有定义流的结束
  • 有界流可以在摄取所有数据后再进行计算
  • 有界流所有的数据可以被排序,所有并不需要有序摄取
  • 有界流处理通常被称为批处理

1.1.3 有状态流处理

把流处理需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态,这就是所谓的“有状态的流处理”。
在这里插入图片描述

  • 状态在内存中:优点:速度快;缺点:可靠性差
  • 状态在分布式系统中:优点:可靠性高;缺点:速度慢

1.1.4 Flink发展历史

在这里插入图片描述

1.2 Flink特点

我们处理数据的目标是:低延迟、高吞吐、结果的准确性和良好的容错性。
Flink主要特点如下:

  • 高吞吐和低延迟:每秒处理数百万个事件,毫秒级延迟
  • 结果的准确性:Flink提供了事件时间(event-time)和处理时间(processing-time)语义。对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。
  • 精确一次(exactly-once)的状态一致性保证
  • 可以连接到最常用的外部系统,如kafka、Hive、JDBC、HDFS、Redis等
  • 高可用:本身高可用的设置,加上K8S,Yarn和Mesos的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力,Flink能做到以极少的停机时间7x24全天候运行。

1.3 Flink和SparkStreaming(说实话没有比较的必要)

1、Spark是以批处理为根本。
2、Flink是以流处理为根本。

1.4 Flink的应用场景

1、电商和市场营销
2、物联网(IOT)
3、物流配送和服务业
4、银行和金融业

1.5 Flink分层API

在这里插入图片描述

  • 有状态流处理:通过底层API(处理函数),对原始数据加工处理。底层API和DataStreamAPI相集成,可以处理复杂的计算。
  • DataStreamAPI(流处理)和DataSetAPI(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transformations,包括map,flatMap等),连接(joins),聚合(aggregations),窗口(Windows)操作等。注意:Flink1.12后,DataStreamAPI已经实现真正的流批一体,所以DataSetAPI已经过时。
  • TableAPI是以表为中心的声明式编程,其中表可能会动态变化。TableAPI遵循关系模型;表有二维数据结构,类似于关系数据库中的表,同时API提供可比较的操作,例如select、project、join、group by、aggregate等。我们可以在表与DataStream/DataSet之间无缝切换,以允许程序将TableAPI与DataStream以及DataSet混用。
  • SQL这一层在语法与表达能力上与TableAPI类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与TableAPI交互密切,同时SQL查询可以直接在TableAPI定义的表上执行。

2、Flink快速上手

2.1 创建项目

在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink程序了。首先我们要做的,就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。
1、创建工程
(1)打开IntelliJ IDEA,创建一个Maven工程。
在这里插入图片描述
2、添加项目依赖

<properties><flink.version>1.17.0</flink.version>
</properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency>
</dependencies>

2.2 WordCount代码编写(大数据常用的例子)

需求:统计一段文字中,每个单词出现的频次
环境准备:创建一个com.zhm.wordcount包

2.2.1 批处理

批处理的基本思路:先逐行读入文件数据,然后将每一行文子拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
1、数据准备
(1)在工程根目录下新建一个data文件夹,并在下面创建文本文件words.txt
(2)在文件中输入一些单词

hello hello hello
world world
hello world

2、代码编写
(1)在com.zhm.wordcount包下新建一个Demo01_BatchProcess类


/*** @ClassName Batch* @Description 利用Flink批处理单词统计* @Author Zouhuiming* @Date 2023/9/3 9:58* @Version 1.0*/import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/**计算的套路:(1) 计算的环境Spark:SparkContextMR:DriverFlink:ExecutionEnvironment(2) 把要计算的数据封装为计算模型Spark:RDD(Spark Core)DateFrame|DataSet(SparkSQL)DStream(SparkStream)MR:k-VFlink:DataSource(3)调用计算APIRDD.转换算子()MR:自己去编写Mapper、ReducerFlink:DataSource.算子()*/
public class Demo01_BatchProcess {public static void main(String[] args) throws Exception {//创建支持Flink计算的环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//使用环境去读取数据,封装为计算模型DataSource<String> dataSource = env.readTextFile("data/words.txt");//调用计算APIdataSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] split = s.split(" ");for (String s1 : split) {collector.collect(new Tuple2<String,Integer>(s1,1));}}}).groupBy(0).sum(1).print();}
}

运行结果:
在这里插入图片描述
注意:这种实现是基于DataSetAPI的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink1.12开始,官方推荐的做法是直接使用DataStreamAPI,在提交任务时通过将执行模式设为BATCH来进行批处理;

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

这样,DataSetAPI就没有用了,在实际应用中我们只要维护一套DataStreamAPI就可以。这里只是为了方便大家理解,我们依然用DataSetAPI做了批处理的实现。

2.2.2 流处理

对于Flink而言,流才是整个处理逻辑的底层核心,所以流批一体之后的DataStreamAPI更加强大,可以直接处理批处理和流处理的所有场景。
下面我们就针对不同类型的的输入数据源,用具体的代码来实现流处理。
1、读取文件(有界流)
我们同样试图读取文档words.txt中的数据,并统计每个单词出现的频次。整体思路与之前的批处理非常类似,代码模式也基本一致。
在com.zhm.wordcount包下新建一个Demo02_BoundedStreamProcess类

package com.zhm.wordcount;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @ClassName Demo02_BoundedStreamProcess* @Description 有界流* @Author Zouhuiming* @Date 2023/9/3 10:26* @Version 1.0*/public class Demo02_BoundedStreamProcess {public static void main(String[] args) throws Exception {//1、创建支持Flink计算的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//1.1 设置一个线程处理这个流(默认是根据你的cpu数和单词种类个数,取最小值)
//        env.setParallelism(1);//2、获取数据源FileSource<String> source = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/words.txt")).build();//3、利用环境将数据源的数据封装为计算模型DataStreamSource<String> streamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "myfile");//4、调用API对数据进行计算//4.1 将每行数据按照给定的分割符拆分为Tuple2类型的数据模型(word,1)streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] split = s.split(" ");for (String s1 : split) {collector.collect(new Tuple2<>(s1,1));}}//4.2 根据word分组}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}//4.3 根据分组之后,按照元组中的第二列聚相加}).sum(1)// 4.4 打印结果.print();//5、提交jobenv.execute();}
}

运行结果:
在这里插入图片描述
和批处理程序BatchWordCount的不同:

  • 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。
  • 转换处理之后,得到的数据对象类型不同
  • 分组操作调用的方法是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
  • 代码末尾需要调用env的execute方法,开始执行任务。

2、读取Socket文本流(无界流)
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要持续的处理捕获的数据。为了模拟这种场景,可以监听Socket端口,然后向该端口不断地发生数据。
(1)将StreamWordCount代码中读取文件数据的readTextFile方法,替换成读取Socket文本流的方法socketTextStream。具体代码实现如下:

package com.zhm.wordcount;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @ClassName Demo03_UnBoundedStreamProcess* @Description 无界流* @Author Zouhuiming* @Date 2023/9/3 10:39* @Version 1.0*/
public class Demo03_UnBoundedStreamProcess {public static void main(String[] args) throws Exception {//1、创建支持Flink计算的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//1.1 设置一个线程处理这个流env.setParallelism(1);//2、获取数据源DataStreamSource<String> streamSource = env.socketTextStream("hadoop102", 9999);//3.1 将每行数据按照给定的分割符拆分为Tuple2类型的数据模型(word,1)streamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {String[] split = s.split(" ");for (String s1 : split) {collector.collect(new Tuple2<>(s1,1));}}//3.2 根据word分组}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {return stringIntegerTuple2.f0;}//3.3 根据分组之后,按照元组中的第二列聚相加}).sum(1)// 3.4 打印结果.print();//4、提交jobenv.execute();}
}

(2)在Linux环境的主机hadoop102上,执行下列命令,发送数据进行测试(前提是要安装netcat)

nc -lk hadoop102 9999

(3)启动Demo03_UnBoundedStreamProcess程序
我们会发现程序启动之后没有任何输出、也不会退出。这是正常的,因为Flink的流处理是事件驱动的,当前程序会一直处于监听状态,只有接受数据才会执行任务、输出统计结果。
在这里插入图片描述

(4)从hadoop102发送数据
在这里插入图片描述
(5)观察idea控制台
在这里插入图片描述
说明:Flink还具有一个类型提前系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于java中泛型擦除的存在,在某些特殊情况下(比如Lambda表达式中),自动提取的信息是不够精细的–只告诉Flink当前的元素由“船头、船身、船尾”构成,根本无法重建出“大船”的模样;这时就需要显示地提供类型信息,才能使得应用程序正常工作或提高其性能。
因为对于flatMap里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple<String,Long>。只有显示地告诉系统当前的返回类型,才能正确的解析出完整数据。

2.2.3 执行模式

从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。

// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamAPI执行模式包括:流执行模式、批执行模式和自动模式。

  • 流执行模式(Streaming)
    这是DataStreamAPI最经典的模式,一边用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是Streaming执行模式。
  • 批执行模式(Batch)
    专门用于批处理的执行模式
  • 自动模式
    在这种模式下,将由程序根据输入数据源是否有界来自动选择执行模式。
    批执行模式的使用:主要有两种方式:
    (1)通过命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。

(2)通过代码设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。
实际应用中一般不会在代码中配置,而是使用命令行,这样更加灵活。

2.2.4 本地WebUI

在Idea本地运行程序时,可以通过添加本地WebUI依赖,使用WebUI界面查看Job的运行情况。

  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>

添加后,在代码中可以指定绑定的端口:

Configuration conf = new Configuration();conf.setInteger("rest.port", 3333);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

之后,在程序启动后,打开本地浏览器,访问localhost:3333即可查看job的运行情况。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/67262.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

故障分析 | OceanBase 频繁更新数据后读性能下降的排查

以下文章来源于爱可生开源社区 &#xff0c;作者张乾 爱可生开源社区. 爱可生开源社区&#xff0c;提供稳定的MySQL企业级开源工具及服务&#xff0c;每年1024开源一款优良组件&#xff0c;并持续运营维护。 测试在做 OceanBase 纯读性能压测的时候&#xff0c;发现对数据做过…

Redis 7 第八讲 集群模式(cluster)架构篇

集群架构 Redis 集群架构图 集群定义 Redis 集群是一个提供在多个Redis节点间共享数据的程序集;Redis集群可以支持多个master 应用场景 Redis集群支持多个master,每个master又可以挂载多个slave读写分离支持数据的高可用支持海量数据的读写存储操作集群自带Sentinel的故障…

等保测评各个级别的详细内容

等保测评是指信息系统安全等级保护测评&#xff0c;是我国信息安全领域中的一项重要工作。根据国家标准《信息系统安全等级保护基本要求》(GB/T 22239-2008)和《信息系统安全等级保护测评技术要求》(GB/T 25070-2010)。 等保测评分为五个级别&#xff0c;分别是&#xff1a;一级…

C#FreeSql分库分表

using FreeSql; using FreeSql.DataAnnotations;namespace FreeSqlMaster {internal class Program{static IFreeSql freeSql null;static void Main(string[] args){// 读写分离// FreeSql 实现了第3种方案&#xff0c;支持一个【主库】多个【从库】&#xff0c;【从库】的查…

栈和队列OJ

一、括号的匹配 题目介绍&#xff1a; 思路&#xff1a; 如果 c 是左括号&#xff0c;则入栈 push&#xff1b;否则通过哈希表判断括号对应关系&#xff0c;若 stack 栈顶出栈括号 stack.pop() 与当前遍历括号 c 不对应&#xff0c;则提前返回 false。栈 stack 为空&#xff1…

(10)(10.8) 固件下载

文章目录 ​​​​​​​前言 10.8.1 固件 10.8.2 Bootloader 10.8.3 APM2.x Autopilot 10.8.4 许可证 10.8.5 安全 前言 固件服务器(firmware server)可提供所有飞行器的最新固件。其中包括&#xff1a; CopterPlaneRoverAntennaTrackerSub 本页提供了一些被视为&quo…

JavaWeb | 常用的HTML(JavaWeb)标签

目录&#xff1a; HTML简介HTML的基本结构HTML的常用标签&#xff1a;“标题” 标签“换行” 标签“段落” 标签“水平线” 标签“文字” 标签“粗体” 标签“下划线” 标签“斜体” 标签“上标” 标签“下标” 标签“闪烁” 标签表示 “空格”“列表” 标签&#xff1a;无序列…

window 常用基础命令

0、起步 0-1) 获取命令的参数指引 netstat /? 0-2) 关于两个斜杠&#xff1a; window 文件路径中使用反斜杠&#xff1a;\ linux 文件路径中使用&#xff1a;/ 1、开关机类指令 shutdown /s # 关机shutdown /r # 重启shutdown /l …

C# void 关键字学习

C#中void关键字是System.Void的别名&#xff1b; 可以将 void 用作方法&#xff08;或本地函数&#xff09;的返回类型来指定该方法不返回值&#xff1b; 如果C&#xff03;方法中没有参数&#xff0c;则不能将void用作参数&#xff1b;这是与C语言不同的&#xff0c;C语言有…

机器人制作开源方案 | 桌面级全向底盘--本体说明+驱动控制

一、本体说明 1. 底盘概述 该底盘是一款模块化的桌面级应用型底盘&#xff0c;基于应用级软件架构设计、应用级硬件系统设计、典型应用型底盘机械系统设计。 底盘本体为一个采用半独立刚性悬挂的四驱全向底盘。 2. 软件环境介绍 操作系统&#xff1a;Ubuntu18.04系统。基于Deb…

【STM32】学习笔记(串口通信)

串口通信 通信接口硬件电路电平标准USARTUSART框图 通信接口 串口是一种应用十分广泛的通讯接口&#xff0c;串口成本低、容易使用、通信线路简单&#xff0c;可实现两个设备的互相通信 单片机的串口可以使单片机与单片机、单片机与电脑、单片机与各式各样的模块互相通信&#…

深入浅出AXI协议(5)——数据读写结构读写响应结构

目录 一、前言 二、写选通&#xff08;Write strobes&#xff09; 三、窄传输&#xff08;Narrow transfers&#xff09; 1、示例1 2、示例2 四、字节不变性&#xff08;Byte invariance&#xff09; 五、未对齐的传输&#xff08;Unaligned transfers&#xff09; 六…

MySQL基本查询

MySQL基本查询 表的增删查改Create(增)Retrieve&#xff08;查&#xff09;select列全列查询指定列查询查询字段为表达式为查询结果指定别名结果去重 where 条件英语不及格的同学的英语成绩语文成绩在 [80, 90] 分的同学及语文成绩数学成绩是 58 或者 59 或者 98 或者 99 分的同…

华为云服务

【计算】 弹性云服务器ECS 弹性云服务器&#xff08;Elastic Cloud Server&#xff0c;ECS&#xff09;是由CPU、内存、操作系统、云硬盘组成的基础的计算组件。弹性云服务器创建成功后&#xff0c;您就可以像使用自己的本地PC或物理服务器一样&#xff0c;在云上使用弹性云服…

【C#项目实战】控制台游戏——勇士斗恶龙(1)

君兮_的个人主页 即使走的再远&#xff0c;也勿忘启程时的初心 C/C 游戏开发 Hello,米娜桑们&#xff0c;这里是君兮_&#xff0c;最近开始正式的步入学习游戏开发的正轨&#xff0c;想要通过写博客的方式来分享自己学到的知识和经验&#xff0c;这就是开设本专栏的目的。希望…

因为axios请求后端,接收不到token的问引出的问题

vue axios请求后端接受不到token的问题。 相关概念 什么是跨域&#xff1f; 跨域指的是在浏览器环境下&#xff0c;当发起请求的域&#xff08;或者网站&#xff09;与请求的资源所在的域之间存在协议、主机或端口中的任何一个条件不同的情况。换句话说&#xff0c;只要协议、…

ubuntu下Anaconda安装与使用教程

前言 好久没用anaconda了&#xff0c;还记得之前用anaconda的欢乐时光。pytorch和paddlepaddle(飞浆)&#xff0c;怀念&#xff0c;可生活&#xff08;换了ubuntu系统之后&#xff09;教会了我残忍&#xff08;可能很难有机会再用windows的anaconda了&#xff09;。找个时间&a…

爬虫源码---爬取小猫猫交易网站

前言&#xff1a; 本片文章主要对爬虫爬取网页数据来进行一个简单的解答&#xff0c;对与其中的数据来进行一个爬取。 一&#xff1a;环境配置 Python版本&#xff1a;3.7.3 IDE:PyCharm 所需库&#xff1a;requests &#xff0c;parsel 二&#xff1a;网站页面 我们需要…

Java设计模式:四、行为型模式-07:状态模式

文章目录 一、定义&#xff1a;状态模式二、模拟场景&#xff1a;状态模式2.1 状态模式2.2 引入依赖2.3 工程结构2.4 模拟审核状态流转2.4.1 活动状态枚举2.4.2 活动信息类2.4.3 活动服务接口2.4.4 返回结果类 三、违背方案&#xff1a;状态模式3.0 引入依赖3.1 工程结构3.2 活…

欧科云链研究院探析Facebook稳定币发行经历会不会在PayPal重演

引言 作者最近的报告-探析PayPal发行稳定币是否会重蹈Facebook覆辙-近期被英国的金融时报&#xff08;中文版&#xff09;刊登。由于该报告在欧科云链研究院内部反响较好&#xff0c;下面就带大家简单的剖析这篇报告的主要内容。 *这篇文章主要由对比分析&#xff08;已删减&a…