Flink入门实战详解

Flink入门实战

Flink项目构建

1)基于Maven+Idea创建项目:

使用maven进行项目构建,如图1所示。

图-34 构建maven项目

输入项目中的maven的坐标和存储坐标,如图2所示。

图2 maven坐标和存储位置

2)Maven依赖:

    <properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.11.12</scala.version><scala.compat.version>2.11</scala.compat.version><hadoop.version>2.6.0</hadoop.version><flink.version>1.9.1</flink.version>
</properties>
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version></dependency><!-- flink-2-hadoop--><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2</artifactId><version>2.7.5-9.0</version></dependency><!-- lombok --><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table</artifactId><type>pom</type><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion></exclusions></dependency>
</dependencies>
<build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>2.5.1</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target><!--<encoding>${project.build.sourceEncoding}</encoding>--></configuration></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><!--<arg>-make:transitive</arg>--><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><!--zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF--><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>chapter1.BatchWordCount</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>

Flink基础API概念

Flink编程是在分布式集合的基础的规律的编程模型(比如,执行filtering,mapping,updating,state,joining,grouping,defining,windows,aggregating)。这些集合可以通过外部数据源(比如从文件,kafka的topics、本地或者内存的集合)。通过下沉算子返回结果,比如将数据写入到一个分布式的文件中,或者控制台。Flink程序可以基于各种context、stanalone或者嵌入其他程序进行运行。可以在本地的jvm或者在多台机器间分布式运行。

基于外部的数据源,比如有界或者无界的数据源,我们可能会选择使用批处理的DataSet API或者流处理的DataStream API来处理。

需要注意的是,在DataStream和DataSet中的绝大多数的API是一致的,只需要替换对应的ExecutionEnvironment或者StreamExecutionEnvironment即可。

Flink在编程的过程中使用特定类——DataSet和DataStream来体现数据,类似Spark中的RDD。可以将其认为是一个可以拥有重复的不可变的集合。其中DataSet表示的是一个有界的数据集,DataStream则表示的是无界的集合。

这些集合在一些关键的地方和Java中的普通集合不同。首先,DataSet和DataStream是不可变的,这就意味着一旦被创建,便不能进行add或者remove的操作。同样也不能简单的查看集合内部的元素。

Flink可以通过外部的数据源来创建DataSet或者DataStream,也可以通过在一个已知的集合上面执行一系列的Transformation操作来转换产生新的集合。

Flink程序看起来就是一个普通的程序,进行数据的转换,每一个程序包含如下相同的集合基础概念,通用编程步骤如下:

1)创建一个执行环境ExecutionEnvironment。

2)加载或者创建初始化数据——DataSet或者DataStream。

3)在此数据基础之上进行特定的转化操作。

4)将计算的结果输出到特定的目的地。

5)触发作业的执行。

Flink编程的入口,便是ExecutionEnvironment,不同之处在于,DataSet和DataStream使用的ExecutionEnvironment不同。DataSet使用ExecutionEnviroment,而DataStream使用StreamExectionEnvironment。

获得ExecutionEnvironment可以通过ExecutionEnvironment的如下方法:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

通常情况下,我们只需要使用getExecutionEnvironment()即可,因为这种方式会自动选择正确的context。如果我们在IDE中执行,则会创建一个Local的Context,如果打包到集群中执行,会返回一个Cluster的Context。

加载数据源的方式有多种。可以一行一个的读入,比如CSV文件,或者自定义格式。如果只是从一个文本文件中按顺序读取行数据。只需要如下操作即可。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path")

创建了一个DataStream或者DataSet,接下来便可以执行各种transformation转换操作。比如执行一个map操作。

创建一个新的DataStream,类型为Integer的集合。

DataStream中包含了最终的结果,我们可以将结果通过创建一个sink操作,写入外部系统中,比如:writeAsText(path)

一旦我们完成整个程序,我们需要通过调用StreamExecutionEnvironment的execute()方法来触发作业的执行。基于ExecutionEnvironment会在本地或者集群中执行。

execute()方法返回值为JobExecutionResult,包含本次执行时间或者累加器结果信息。

与Spark中的Transformation操作相同,Flink中的Transformation操作是Lazy懒加载的,需要execute()去触发。基于此,我们可以创建并添加程序的执行计划。进行任务调度和数据分离,执行更加高效。

目前Flink支持7种数据类型,分别为:

1)Java Tuples和Scala Case Classes。

2)Java POJOS(一种数据结构类型)。

3)Primitive Types(Java的基本数据类型)。

4)Regular Classes(普通类)。

5)Values。

6)Hadoop Writables。

7)SpecialTypes。

DataSet批处理API

Flink中的DataSet程序是实现数据集转换的常规程序(例如,Filter,映射,连接, 分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从本地集合创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

import org.apache.flink.api.scala._object WordCountOps {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.fromElements("Who's there?","I think I hear them. Stand, ho! Who's there?")val wordCounts:DataSet[(String, Int)] = text.flatMap(line => line.split("\\s+")).map((_, 1)).groupBy(0).sum(1)wordCounts.print()}}

Streaming流式处理API

Flink中的DataStream程序是实现数据流转换的常规程序(例如 filtering, updating state, defining windows, aggregating)。最初从各种源(例如, message queues, socket streams, files)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
object StreamDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val file = env.socketTextStream("localhost", 9999)
        val spliFile: DataStream[String] = file.flatMap(_.split(" "))
        val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
        val keyed = wordAndOne.keyBy(data=>data._1)
        val wordAndCount: DataStream[(String, Int)] = keyed.sum(1)
        wordAndCount.print()
        env.execute()
    }
}

要运行示例程序,首先从终端使用netcat启动输入流:

nc -lk 9999

只需键入一些单词就可以返回一个新单词。这些将是字数统计程序的输入。

Flink程序提交到集群

1)Web提交方式:

图3 web提交方式

1)脚本方式:

#!/bin/shFLINK_HOME=/home/bigdata/apps/flink$FLINK_HOME/bin/flink run \-c BatchDemo \/root/wc.jar \hdfs://hadoop101:8020/wordcount/words.txt \hdfs://hadoop101:8020/wordcount/output3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/36899.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开源API网关-ApacheShenYu首次按照启动遇到的问题

一.背景 公司有API网关产品需求&#xff0c;希望有图形化的后台管理功能。看到了ApacheShenYu&#xff0c;作为Apache的顶级项目&#xff0c;直接认可了。首先&#xff0c;感谢各位大神的付出&#xff0c;初步看这个项目是国内大厂中的大神创立的&#xff0c;在此表示膜拜&…

阿里云服务器入门使用教程——购买及操作系统选择并进行远程连接

文章目录 一、首先选择一个你自己要买的云服务器类型二、能选的就一个地域和一个操作系统&#xff0c;其他都是固定的三、创建完实例并使用finalshell连接的效果(要在完成后续步骤后才能连接)四、购买之后进入阿里云控制台&#xff0c;开通资源中心五、然后就可以看到已经帮你创…

Vue基础了解

目录 1、什么是Vue.js 2、Vue的优点 3、Vue的安装 4、Vue程序 5、Vue指令 代码演示&#xff1a; 6、Vue实例的生命周期 1、什么是Vue.js Vue (读音 /vjuː /&#xff0c;类似于 view) 是一套用于构建用户界面的渐进式框架。Vue 的核心库只关注视图层&#xff0c;不仅易…

Python28 十大机器学习算法之线性回归和逻辑回归

1.三类广义上的机器学习算法 监督学习。工作原理&#xff1a;该算法由一个目标/结果变量&#xff08;或因变量&#xff09;组成&#xff0c;该变量将从一组给定的预测变量&#xff08;自变量&#xff09;进行预测。使用这组变量&#xff0c;我们生成了一个将输入数据映射到所…

常见的工业信号指示灯及按钮开关代表什么?如何辨认?

信号指示灯&#xff0c;是用灯光监视电路和电气设备工作或位置状态的器件。也是自动化设备中最直观&#xff0c;唯一的监视元器件。主要的作用是通常用于反映电路的工作状态&#xff08;有电或无电&#xff09;、电气设备的工作状态&#xff08;运行、停运或试验&#xff09;和…

小程序web-view无法打开该页面的解决方法

问题&#xff1a;开发者工具可以正常打开&#xff0c;正式上线版小程序使用 web-view 组件测试时提示&#xff1a;“无法打开该页面&#xff0c;不支持打开 https://xxxxxx&#xff0c;请在“小程序右上角更多->反馈与投诉”中和开发者反馈。” 解决方法&#xff1a;需要配…

市场拓展招聘:完整指南

扩大招聘业务会给你带来很多挑战&#xff0c;更不用说你已经在处理的问题了。助教专业人士每周花近13个小时为一个角色寻找候选人。此外&#xff0c;客户的需求也在不断变化&#xff0c;招聘机构之间的竞争也在加剧。毫无疑问&#xff0c;对增长有战略的方法会有很大的帮助。一…

jeecg导入excel 含图片(嵌入式,浮动式)

jeecgboot的excel导入 含图片&#xff08;嵌入式&#xff0c;浮动式&#xff09; 一、啰嗦二、准备三、 代码1、代码&#xff08;修改覆写的ExcelImportServer&#xff09;2、代码&#xff08;修改覆写的PoiPublicUtil&#xff09;3、代码&#xff08;新增类SAXParserHandler&a…

【数学建模】——【python库】——【Pandas学习】

专栏&#xff1a;数学建模学习笔记 pycharm专业版免费激活教程见资源&#xff0c;私信我给你发 python相关库的安装&#xff1a;pandas,numpy,matplotlib&#xff0c;statsmodels 总篇&#xff1a;【数学建模】—【新手小白到国奖选手】—【学习路线】 第一卷&#xff1a;【数学…

总结一下Linux、Windows、Ubuntu、Debian、CentOS等到底是啥?及它们的区别是什么

小朋友你总是有很多问好 你是否跟我一样&#xff0c;不是计算机科班出身&#xff0c;很多东西都是拿着在用&#xff0c;并不知道为什么&#xff0c;或者对于它们的概念也是稀里糊涂的&#xff0c;比如今天说的这个。先简单描述下&#xff0c;我先前的疑问&#xff1a; Linux是…

layui+jsp项目中实现table单元格嵌入下拉选择框功能,下拉选择框可手动输入内容或选择默认值,修改后数据正常回显。

需求 table列表中的数据实现下拉框修改数据&#xff0c;当默认的下拉框不符合要求时&#xff0c;可手动输入内容保存。内容修改后表格显示修改后的值同时表格不刷新。 实现 layui框架下拉框组件只能选择存在的数据&#xff0c;不支持将输入的内容显示在input中的功能&#x…

【C++进阶9】异常

一、C语言传统的处理错误的方式 终止程序&#xff0c;如assert 如发生内存错误&#xff0c;除0错误时就会终止程序返回错误码 需要程序员自己去查找对应的错误 z如系统的很多库的接口函数都是通 过把错误码放到errno中&#xff0c;表示错误 二、C异常概念 异常&#xff1a;函…

传神论文中心|第14期人工智能领域论文推荐

在人工智能领域的快速发展中&#xff0c;我们不断看到令人振奋的技术进步和创新。近期&#xff0c;开放传神&#xff08;OpenCSG&#xff09;社区发现了一些值得关注的成就。传神社区本周也为对AI和大模型感兴趣的读者们提供了一些值得一读的研究工作的简要概述以及它们各自的论…

steam搬砖

​   CS2/Steam游戏拆砖项目如何赚钱&#xff0c;利润在哪里&#xff1f;    1、利润主要来自于汇差。例如&#xff0c;今天美元的汇率是1美元7.3人民币&#xff0c;100美元730人民币。但事实上&#xff0c;通过某些特定渠道&#xff08;如TB&#xff09;充值100美元仅需55…

Meet AI4S 直播预告丨房价分析新思路:神经网络直击复杂地理环境中的空间异质性

近年来&#xff0c;房地产市场起起落落&#xff0c;房价已经成为了扰动居民幸福感的重要影响因素。大多数家庭都需要面对「买不买房、何时买房、在哪儿买房、买什么房」的艰难抉择&#xff0c;每一个问题的答案都在某种程度上与房价的波动息息相关。 近年来&#xff0c;我国各…

RocketMq源码解析九:刷盘机制及过期文件删除

一、刷盘机制 刷盘策略在不同时间进行刷写磁盘。RocketMQ的存储是基于JDK NIO的内存映射机制(MappedByteBuffer)的,消息存储首先将消息追加到内存,再根据配置的刷写磁盘 同步刷盘表示消息追加到内存后,立即将数据刷写到文件系统中。代码的调用链如下: submi…

【新版本来袭】ONLYOFFICE桌面编辑器8.1 —— 重塑办公效率与体验

文章目录 一、功能完善的PDF编辑器&#xff1a;重塑文档处理体验编辑文本插入和修改各种对象&#xff0c;如表格、形状、文本框、图像、艺术字、超链接、方程式等添加、旋转和删除页面添加文本注释和标注 二、幻灯片版式设计&#xff1a;创意展示的无限舞台三、改进从右至左显示…

OCR训练和C#部署英文字符训练

PaddleOCR是一个基于飞桨开发的OCR&#xff08;Optical Character Recognition&#xff0c;光学字符识别&#xff09;系统。其技术体系包括文字检测、文字识别、文本方向检测和图像处理等模块。以下是其优点&#xff1a; 高精度&#xff1a;PaddleOCR采用深度学习算法进行训练…

Web渗透:php反序列化漏洞

反序列化漏洞&#xff08;Deserialization Vulnerability&#xff09;是一种在应用程序处理数据的过程中&#xff0c;因不安全的反序列化操作引发的安全漏洞&#xff1b;反序列化是指将序列化的数据&#xff08;通常是字节流或字符串&#xff09;转换回对象的过程&#xff0c;如…

【MySQL备份】lvm-snapshot篇

目录 1.简介 1.1.如何工作 1.2.应用场景 1.3.注意事项 1.4.优缺点 2.为什么选择lvm快照备份&#xff1f; 3.创建LVM 3.1.操作流程 3.2.正常安装MySQL后进行备份 3.3.MySQL运行一段时间后进行备份 3.3.1.准备lvm及文件系统//先添加一块磁盘 3.3.2.将数据迁移到LVM …