Flink入门实战详解

Flink入门实战

Flink项目构建

1)基于Maven+Idea创建项目:

使用maven进行项目构建,如图1所示。

图-34 构建maven项目

输入项目中的maven的坐标和存储坐标,如图2所示。

图2 maven坐标和存储位置

2)Maven依赖:

    <properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.11.12</scala.version><scala.compat.version>2.11</scala.compat.version><hadoop.version>2.6.0</hadoop.version><flink.version>1.9.1</flink.version>
</properties>
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.10_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version></dependency><!-- flink-2-hadoop--><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2</artifactId><version>2.7.5-9.0</version></dependency><!-- lombok --><!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table</artifactId><type>pom</type><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep-scala_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.58</version></dependency><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion></exclusions></dependency>
</dependencies>
<build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>2.5.1</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target><!--<encoding>${project.build.sourceEncoding}</encoding>--></configuration></plugin><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.0</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><!--<arg>-make:transitive</arg>--><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><!--zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF--><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>chapter1.BatchWordCount</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins>
</build>

Flink基础API概念

Flink编程是在分布式集合的基础的规律的编程模型(比如,执行filtering,mapping,updating,state,joining,grouping,defining,windows,aggregating)。这些集合可以通过外部数据源(比如从文件,kafka的topics、本地或者内存的集合)。通过下沉算子返回结果,比如将数据写入到一个分布式的文件中,或者控制台。Flink程序可以基于各种context、stanalone或者嵌入其他程序进行运行。可以在本地的jvm或者在多台机器间分布式运行。

基于外部的数据源,比如有界或者无界的数据源,我们可能会选择使用批处理的DataSet API或者流处理的DataStream API来处理。

需要注意的是,在DataStream和DataSet中的绝大多数的API是一致的,只需要替换对应的ExecutionEnvironment或者StreamExecutionEnvironment即可。

Flink在编程的过程中使用特定类——DataSet和DataStream来体现数据,类似Spark中的RDD。可以将其认为是一个可以拥有重复的不可变的集合。其中DataSet表示的是一个有界的数据集,DataStream则表示的是无界的集合。

这些集合在一些关键的地方和Java中的普通集合不同。首先,DataSet和DataStream是不可变的,这就意味着一旦被创建,便不能进行add或者remove的操作。同样也不能简单的查看集合内部的元素。

Flink可以通过外部的数据源来创建DataSet或者DataStream,也可以通过在一个已知的集合上面执行一系列的Transformation操作来转换产生新的集合。

Flink程序看起来就是一个普通的程序,进行数据的转换,每一个程序包含如下相同的集合基础概念,通用编程步骤如下:

1)创建一个执行环境ExecutionEnvironment。

2)加载或者创建初始化数据——DataSet或者DataStream。

3)在此数据基础之上进行特定的转化操作。

4)将计算的结果输出到特定的目的地。

5)触发作业的执行。

Flink编程的入口,便是ExecutionEnvironment,不同之处在于,DataSet和DataStream使用的ExecutionEnvironment不同。DataSet使用ExecutionEnviroment,而DataStream使用StreamExectionEnvironment。

获得ExecutionEnvironment可以通过ExecutionEnvironment的如下方法:

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

通常情况下,我们只需要使用getExecutionEnvironment()即可,因为这种方式会自动选择正确的context。如果我们在IDE中执行,则会创建一个Local的Context,如果打包到集群中执行,会返回一个Cluster的Context。

加载数据源的方式有多种。可以一行一个的读入,比如CSV文件,或者自定义格式。如果只是从一个文本文件中按顺序读取行数据。只需要如下操作即可。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = env.readTextFile("file:///path")

创建了一个DataStream或者DataSet,接下来便可以执行各种transformation转换操作。比如执行一个map操作。

创建一个新的DataStream,类型为Integer的集合。

DataStream中包含了最终的结果,我们可以将结果通过创建一个sink操作,写入外部系统中,比如:writeAsText(path)

一旦我们完成整个程序,我们需要通过调用StreamExecutionEnvironment的execute()方法来触发作业的执行。基于ExecutionEnvironment会在本地或者集群中执行。

execute()方法返回值为JobExecutionResult,包含本次执行时间或者累加器结果信息。

与Spark中的Transformation操作相同,Flink中的Transformation操作是Lazy懒加载的,需要execute()去触发。基于此,我们可以创建并添加程序的执行计划。进行任务调度和数据分离,执行更加高效。

目前Flink支持7种数据类型,分别为:

1)Java Tuples和Scala Case Classes。

2)Java POJOS(一种数据结构类型)。

3)Primitive Types(Java的基本数据类型)。

4)Regular Classes(普通类)。

5)Values。

6)Hadoop Writables。

7)SpecialTypes。

DataSet批处理API

Flink中的DataSet程序是实现数据集转换的常规程序(例如,Filter,映射,连接, 分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从本地集合创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

import org.apache.flink.api.scala._object WordCountOps {def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.fromElements("Who's there?","I think I hear them. Stand, ho! Who's there?")val wordCounts:DataSet[(String, Int)] = text.flatMap(line => line.split("\\s+")).map((_, 1)).groupBy(0).sum(1)wordCounts.print()}}

Streaming流式处理API

Flink中的DataStream程序是实现数据流转换的常规程序(例如 filtering, updating state, defining windows, aggregating)。最初从各种源(例如, message queues, socket streams, files)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
object StreamDemo {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val file = env.socketTextStream("localhost", 9999)
        val spliFile: DataStream[String] = file.flatMap(_.split(" "))
        val wordAndOne: DataStream[(String, Int)] = spliFile.map((_, 1))
        val keyed = wordAndOne.keyBy(data=>data._1)
        val wordAndCount: DataStream[(String, Int)] = keyed.sum(1)
        wordAndCount.print()
        env.execute()
    }
}

要运行示例程序,首先从终端使用netcat启动输入流:

nc -lk 9999

只需键入一些单词就可以返回一个新单词。这些将是字数统计程序的输入。

Flink程序提交到集群

1)Web提交方式:

图3 web提交方式

1)脚本方式:

#!/bin/shFLINK_HOME=/home/bigdata/apps/flink$FLINK_HOME/bin/flink run \-c BatchDemo \/root/wc.jar \hdfs://hadoop101:8020/wordcount/words.txt \hdfs://hadoop101:8020/wordcount/output3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/36899.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开源API网关-ApacheShenYu首次按照启动遇到的问题

一.背景 公司有API网关产品需求&#xff0c;希望有图形化的后台管理功能。看到了ApacheShenYu&#xff0c;作为Apache的顶级项目&#xff0c;直接认可了。首先&#xff0c;感谢各位大神的付出&#xff0c;初步看这个项目是国内大厂中的大神创立的&#xff0c;在此表示膜拜&…

Zookeeper集群安装部署

简介 ZooKeeper是一个分布式的&#xff0c;开放源码的分布式应用程序协调服务&#xff0c;是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件&#xff0c;提供的功能包括&#xff1a;配置维护、域名服务、分布式同步、组服务等。 除了为Hadoop和HBase提供…

区块链开发基础知识及应用

区块链开发基础知识及应用 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;在本文中&#xff0c;我们将深入探讨区块链技术的基础知识及其在应用开发中的实际应用…

阿里云服务器入门使用教程——购买及操作系统选择并进行远程连接

文章目录 一、首先选择一个你自己要买的云服务器类型二、能选的就一个地域和一个操作系统&#xff0c;其他都是固定的三、创建完实例并使用finalshell连接的效果(要在完成后续步骤后才能连接)四、购买之后进入阿里云控制台&#xff0c;开通资源中心五、然后就可以看到已经帮你创…

耐磨材料元宇宙:探索未来科技的无限可能

随着科技的不断发展&#xff0c;我们正逐渐进入一个全新的时代——元宇宙。在这个虚拟世界中&#xff0c;人们可以自由地创造、探索和交流。而在元宇宙中&#xff0c;耐磨材料作为一种重要的基础资源&#xff0c;将为我们的虚拟世界带来更多的可能性。 一、耐磨材料在元宇宙中…

星戈瑞Sulfo Cy3 NHS Ester的水溶性与生物相容性

在生物医学研究领域&#xff0c;荧光标记技术已经成为一种科研工具。其中&#xff0c;Sulfo Cy3 NHS Ester作为一种荧光染料&#xff0c;因其水溶性和生物相容性而受应用。 Sulfo Cy3 NHS Ester的水溶性 Sulfo Cy3 NHS Ester在水中的溶解性较好&#xff0c;能够快速溶解并形成…

JS面试题5——JS继承有哪些方式

1. ES6 /* 此时的Child上只有name属性&#xff0c;没有age属性 */ <script> // 父 class Parent{constructor(){this.age 18;} } // 子 class Child{constructor(){this.name 张三;} } let o1 new Child(); console.log(o1, o1.name, o1.age); // 打印出&#xff1a;C…

Vue基础了解

目录 1、什么是Vue.js 2、Vue的优点 3、Vue的安装 4、Vue程序 5、Vue指令 代码演示&#xff1a; 6、Vue实例的生命周期 1、什么是Vue.js Vue (读音 /vjuː /&#xff0c;类似于 view) 是一套用于构建用户界面的渐进式框架。Vue 的核心库只关注视图层&#xff0c;不仅易…

Python28 十大机器学习算法之线性回归和逻辑回归

1.三类广义上的机器学习算法 监督学习。工作原理&#xff1a;该算法由一个目标/结果变量&#xff08;或因变量&#xff09;组成&#xff0c;该变量将从一组给定的预测变量&#xff08;自变量&#xff09;进行预测。使用这组变量&#xff0c;我们生成了一个将输入数据映射到所…

常见的工业信号指示灯及按钮开关代表什么?如何辨认?

信号指示灯&#xff0c;是用灯光监视电路和电气设备工作或位置状态的器件。也是自动化设备中最直观&#xff0c;唯一的监视元器件。主要的作用是通常用于反映电路的工作状态&#xff08;有电或无电&#xff09;、电气设备的工作状态&#xff08;运行、停运或试验&#xff09;和…

小程序web-view无法打开该页面的解决方法

问题&#xff1a;开发者工具可以正常打开&#xff0c;正式上线版小程序使用 web-view 组件测试时提示&#xff1a;“无法打开该页面&#xff0c;不支持打开 https://xxxxxx&#xff0c;请在“小程序右上角更多->反馈与投诉”中和开发者反馈。” 解决方法&#xff1a;需要配…

市场拓展招聘:完整指南

扩大招聘业务会给你带来很多挑战&#xff0c;更不用说你已经在处理的问题了。助教专业人士每周花近13个小时为一个角色寻找候选人。此外&#xff0c;客户的需求也在不断变化&#xff0c;招聘机构之间的竞争也在加剧。毫无疑问&#xff0c;对增长有战略的方法会有很大的帮助。一…

大数据面试题之Kafka(4)

目录 Kafka如何保证数据的ExactlyOnce? Kafka消费者怎么保证ExactlyOnce Kafka监控实现? Kafka中的数据能彻底删除吗? Kafka复制机制? Kafka分区多副本机制? Kafka分区分配算法 Kafka蓄水池机制 Kafka如何实现幂等性? Kafka的offset存在哪? Kafka中如何…

jeecg导入excel 含图片(嵌入式,浮动式)

jeecgboot的excel导入 含图片&#xff08;嵌入式&#xff0c;浮动式&#xff09; 一、啰嗦二、准备三、 代码1、代码&#xff08;修改覆写的ExcelImportServer&#xff09;2、代码&#xff08;修改覆写的PoiPublicUtil&#xff09;3、代码&#xff08;新增类SAXParserHandler&a…

【数学建模】——【python库】——【Pandas学习】

专栏&#xff1a;数学建模学习笔记 pycharm专业版免费激活教程见资源&#xff0c;私信我给你发 python相关库的安装&#xff1a;pandas,numpy,matplotlib&#xff0c;statsmodels 总篇&#xff1a;【数学建模】—【新手小白到国奖选手】—【学习路线】 第一卷&#xff1a;【数学…

总结一下Linux、Windows、Ubuntu、Debian、CentOS等到底是啥?及它们的区别是什么

小朋友你总是有很多问好 你是否跟我一样&#xff0c;不是计算机科班出身&#xff0c;很多东西都是拿着在用&#xff0c;并不知道为什么&#xff0c;或者对于它们的概念也是稀里糊涂的&#xff0c;比如今天说的这个。先简单描述下&#xff0c;我先前的疑问&#xff1a; Linux是…

layui+jsp项目中实现table单元格嵌入下拉选择框功能,下拉选择框可手动输入内容或选择默认值,修改后数据正常回显。

需求 table列表中的数据实现下拉框修改数据&#xff0c;当默认的下拉框不符合要求时&#xff0c;可手动输入内容保存。内容修改后表格显示修改后的值同时表格不刷新。 实现 layui框架下拉框组件只能选择存在的数据&#xff0c;不支持将输入的内容显示在input中的功能&#x…

学会这7种方法让你的Python代码更容易维护

随着软件项目进入“维护模式”&#xff0c;对可读性和编码标准的要求很容易落空&#xff08;甚至从一开始就没有建立过那些标准&#xff09;。然而&#xff0c;在代码库中保持一致的代码风格和测试标准能够显著减轻维护的压力&#xff0c;也能确保新的开发者能够快速了解项目的…

【leetcode--三数之和】

这道题记得之前做过&#xff0c;但是想不起来了。。总结一下&#xff1a; 函数的主要步骤和关键点&#xff1a; 排序&#xff1a;对输入的整数数组nums进行排序。这是非常重要的&#xff0c;因为它允许我们使用双指针技巧来高效地找到满足条件的三元组。初始化&#xff1a;定…

【C++进阶9】异常

一、C语言传统的处理错误的方式 终止程序&#xff0c;如assert 如发生内存错误&#xff0c;除0错误时就会终止程序返回错误码 需要程序员自己去查找对应的错误 z如系统的很多库的接口函数都是通 过把错误码放到errno中&#xff0c;表示错误 二、C异常概念 异常&#xff1a;函…