Flink常见流处理API

Flink 流处理API的编程可以分为environment,source,transform,sink四大部分

1 Flink支持的数据类型

  在Flink底层因为要对所有的数据序列化,反序列化对数据进行传输,以便通过网络传送它们,或者从状态后端、检查点和保存点读取它们。所以Flink要有一套自己的类型提取系统,就是TypeInformation机制。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。这里其实就是说在转换过程中必须是他支持的数据类型才能转换成TypeInformation。

基本上我们一般能够用到的数据类型常见的都支持,如下:

  (1)Flink支持所有的Java和Scala基础数据类型,Int, Double, Long, String, …

  (2)Java和Scala元组(Tuples),最多25个字段,不支持空字段

  (3)cala样例类(case classes),最多22个字段,不支持空字段

  (4) Java简单对象(POJOs)

  (5)Row具有任意数量字段的元组并支持空字段

  (6)Arrays, Lists, Maps, Enums, 等等

2 执行环境Environment

  Flink编程的第一步首先是创建一个执行环境,表示当前执行程序的上下文。Environment可以通过以下几种方式构建

  (1)getExecutionEnvironment

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
或
val env: ExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。

  (2)createLocalEnvironment

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

  返回本地执行环境,需要在调用时指定默认的并行度。

  (3)createRemoteEnvironment

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", YOURJobManagerHOST,"YOURPATH//wordcount.jar")

  返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

3 Source

  (1)从集合读取数据

val stream = env.fromCollection(List("a","b","c"))

  (2)从文件读取数据

val stream = env.readTextFile("YOUR_FILE_PATH")

  (3)kafka消息队列的数据作为来源

  需要引入kafka连接器的依赖:

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.10.0</version>
</dependency>

  具体代码如下:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")val stream = env.addSource(new FlinkKafkaConsumer011[String]("topic_name", new SimpleStringSchema(), properties))

  (4)自定义Source

    除了以上的source数据来源,我们还可以自定义source。需要做的,只是传入一个SourceFunction就可以。具体调用如下:

val stream = env.addSource( new MySource() )case class CustomSource(id:String,times:String)
class MySource extends SourceFunction[CustomSource]{// running表示数据源是否还在正常运行var running: Boolean = trueoverride def cancel(): Unit = {running = false}override def run(ctx: SourceFunction.SourceContext[CustomSource]): Unit = {while(running){ctx.collect(CustomSource(UUID.randomUUID().toString,System.currentTimeMillis().toString))Thread.sleep(100)}}
}

4 Transform

  (1)map:输入一个元素,输出一个元素,可以用来做一些清洗,转换工作。DataStream → DataStream

val streamMap = stream.map { x => x * 2 }

  (2)flatMap:和Map相似,可以理解为将输入的元素压平,从而对输出结果的数量不做要求,可以为0、1或者多个,多用于拆分操作。DataStream → DataStream

val streamFlatMap = stream.flatMap{x => x.split(" ")
}

  (3)filter:过滤筛选,将所有符合判断条件的结果集输出,DataStream → DataStream

val streamFilter = stream.filter{x => x > 1
}

  (4)KeyBy:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的,返回KeyedStream。DataStream -> KeyedStream

注意:以下类型无法作为key①POJO类,且没有实现hashCode函数②任意形式的数组类型

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

  (5)滚动聚合算子(Rolling Aggregation)

对KeyedStream按指定字段滚动聚合并输出每一次滚动聚合后的结果,常见的有sum(),min(),max(),minBy(),maxBy()等,KeyedStream → DataStream

min(),max(), minBy(),maxBy()这些算子可以针对KeyedStream的每一个支流做聚合

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

  min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含的最小值的元素(同样元原理适用于max和maxBy)

  (6)fold:用一个初始的一个值,与其每个元素进行滚动合并操作。KeyedStream → DataStream

val result: DataStream[String] =keyedStream.fold("start")((str, i) => { str + "-" + i })

当应用于序列(1,2,3,4,5)时,发出序列“start-1”、“start-1-2”、“start-1-2”,…

  (6)reduce:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。KeyedStream → DataStream

case class WC(val word: String, val count: Int)val wordCounts = stream.groupBy("word").reduce {(w1, w2) => new WC(w1.word, w1.count + w2.count)
}

  (7) Split 和 Select

  Split :根据某些特征把一个DataStream拆分成两个或者多个DataStream。DataStream → SplitStream

  Select:从一个SplitStream中获取一个或者多个DataStream。SplitStream→DataStream

val split = someDataStream.split((num: Int) =>(num % 2) match {case 0 => List("even")case 1 => List("odd")}
)val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")

  (8) Connect和 CoMap、CoFlatMap

  Connect:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了同一个流中,内部依然保持各自的数据形式不发生任何变化,两个流相互独立。DataStream,DataStream → ConnectedStreams

  CoMap,CoFlatMap:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。ConnectedStreams → DataStream

someStream : DataStream[Int] = ...
otherStream : DataStream[String] = ...
val connectedStreams = someStream.connect(otherStream)connectedStreams.map((_ : Int) => true,(_ : String) => false
)
connectedStreams.flatMap((_ : Int) => true,(_ : String) => false
)

  Connect与 Union 区别:①Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。② Connect只能操作两个流,Union可以操作多个。

  (9)iterate

  在流程中创建一个反馈循环,将一个操作的输出重定向到之前的操作。DataStream --> IterativeStream --> DataStream

initialStream.iterate {iteration => {val iterationBody = iteration.map {/*do something*/}(iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))}
}

  (10)extract timestamps

  提取记录中的时间戳来跟需要事件时间的window一起发挥作用。DataStream --> DataStream

stream.assignTimestamps { timestampExtractor }

5 Sink

  官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink。

Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)

  (1)kafka

  需要添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.10.0</version>
</dependency>

  主函数中添加sink

datastream.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "test", new SimpleStringSchema()))

  (2)redis

  添加依赖

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>

  定义一个redis的mapper类,用于定义保存到redis时调用的命令:

class RedisExampleMapper extends RedisMapper[(String, String)]{override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME")}override def getKeyFromData(data: (String, String)): String = data._1override def getValueFromData(data: (String, String)): String = data._2
}

  在主函数中调用:

val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build()
stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))

  (3)Elasticsearch

  添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.11</artifactId><version>1.10.0</version>
</dependency>

  在主函数中调用:

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkimport org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requestsimport java.util.ArrayList
import java.util.Listval input: DataStream[String] = ...val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))val esSinkBuilder = new ElasticsearchSink.Builer[String](httpHosts,new ElasticsearchSinkFunction[String] {def createIndexRequest(element: String): IndexRequest = {val json = new java.util.HashMap[String, String]json.put("data", element)return Requests.indexRequest().index("my-index").type("my-type").source(json)}}
)// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1)// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(restClientBuilder -> {restClientBuilder.setDefaultHeaders(...)restClientBuilder.setMaxRetryTimeoutMillis(...)restClientBuilder.setPathPrefix(...)restClientBuilder.setHttpClientConfigCallback(...)}
)// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build)

  (4)JDBC 自定义sink

  以mysql为例,添加依赖

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version>
</dependency>

  添加MysqlJdbcSink

class MysqlJdbcSink() extends RichSinkFunction[(String, String)]{var conn: Connection = _var insertStmt: PreparedStatement = _var updateStmt: PreparedStatement = _// open 主要是创建连接override def open(parameters: Configuration): Unit = {super.open(parameters)conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root")insertStmt = conn.prepareStatement("INSERT INTO mysqljdbcsink (id, name) VALUES (?, ?)")updateStmt = conn.prepareStatement("UPDATE mysqljdbcsink SET id = ? WHERE name = ?")}// 调用连接,执行sqloverride def invoke(value: (String, String), context: SinkFunction.Context[_]): Unit = {updateStmt.setString(1, value._1)updateStmt.setString(2, value._2)updateStmt.execute()if (updateStmt.getUpdateCount == 0) {insertStmt.setString(1, value._1)insertStmt.setString(2, value._2)insertStmt.execute()}}override def close(): Unit = {insertStmt.close()updateStmt.close()conn.close()}
}

  主函数中调用

dataStream.addSink(new MysqlJdbcSink())

6 UDF函数

6.1 函数类(Function Classes)

  函数类:就是在Flink里面每一步运算,转换,包括source和sink。每一个算子里面的参数都可以传入一个所谓的函数类。就提供了更多更灵活的实现自己功能的方法。Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等

  实现了FilterFunction接口如下:

class MyFilter extends FilterFunction[String] {override def filter(value: String): Boolean = {value.contains("flink")}
}
val filterStream = stream.filter(new FlinkFilter)

  将函数实现成匿名类

val filterStream = stream.filter(new RichFilterFunction[String] {override def filter(value: String): Boolean = {value.contains("flink")}}
)

6.2 富函数(Rich Functions)

  “富函数”是DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。如RichMapFunction, RichFlatMapFunction,RichFilterFunction

  Rich Function有一个生命周期的概念。典型的生命周期方法有:

  ①open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。

  ②close()方法是生命周期中的最后一个调用的方法,做一些清理工作。

  ③getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {var subTaskIndex = 0override def open(configuration: Configuration): Unit = {subTaskIndex = getRuntimeContext.getIndexOfThisSubtask// 以下可以做一些初始化工作,
}override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {if (in % 2 == subTaskIndex) {out.collect((subTaskIndex, in))}
}override def close(): Unit = {// 以下做一些清理工作}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473570.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flask框架项目实例:**租房网站(二)

Flask是一款MVC框架&#xff0c;主要是从模型、视图、模板三个方面对Flask框架有一个全面的认识&#xff0c;通过完成作者-读书功能&#xff0c;先来熟悉Flask框架的完整使用步骤。 操作步骤为&#xff1a; 1.创建项目2.配置数据库3.定义模型类4.定义视图并配置URL 5.定义模板…

Android中的APK,TASK,PROCESS,USERID之间的关系

开发Android已经有一段时间了&#xff0c;今天接触到底层的东西&#xff0c;所以对于进程&#xff0c;用户的id以及Android中的Task,Apk之间的关系&#xff0c;要做一个研究&#xff0c;下面就是研究结果: apk一般占一个dalvik,一个进程,一个task。当然通过通过设置也可以多个进…

天池 在线编程 插入五

文章目录1. 题目2. 解题1. 题目 描述 给定一个数字&#xff0c;在数字的任意位置插入一个5&#xff0c;使得插入后的这个数字最大 示例 样例 1: 输入: a 234 输出: 5234 来源&#xff1a;https://tianchi.aliyun.com/oj/141758389886413149/160295184768372892 2. 解…

Flink的Window

1 Window概述 streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎&#xff0c;而无限数据集是指一种不断增长的本质上无限的数据集&#xff0c;而window是一种切割无限数据为有限块进行处理的手段。 Window是无限数据流处理的核心&#xff0c;Window将一个无限的s…

标记语言Markdown介绍以及日常使用

Markdown介绍 Markdown是一种文本标记语言&#xff0c;用于快速文档排版Markdown文件为纯文本文件&#xff0c;后缀名为 .mdMarkdown介于Word和HTML之间 比起Word&#xff0c;Markdown是纯文本&#xff0c;排版文档轻量、方便、快速。比起HTML&#xff0c;Markdown简单直观&…

Everyday is an Opportunity

Quote Of The Day: “Everyday is an Opportunity to Learn and Grow, Don’t Waste Your Opportunity.” – Alan THE END 转载于:https://www.cnblogs.com/lan1x/p/3572914.html

天池 在线编程 有效的字符串

文章目录1. 题目2. 解题1. 题目 描述 如果字符串的所有字符出现的次数相同&#xff0c;则认为该字符串是有效的。 如果我们可以在字符串的某1个索引处删除1个字符&#xff0c;并且其余字符出现的次数相同&#xff0c;那么它也是有效的。 给定一个字符串s&#xff0c;判断它是否…

Flink的时间语义和Watermark

1 时间语义 数据迟到的概念是&#xff1a;数据先产生&#xff0c;但是处理的时候滞后了 在Flink的流式处理中&#xff0c;会涉及到时间的不同概念&#xff0c;如下图所示&#xff1a; Event Time&#xff1a;是事件创建的时间。它通常由事件中的时间戳描述&#xff0c;例如采集…

数据分析案例:亚洲国家人口数据计算

数据截图: 数据下载地址&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1dGHwAC5 密码&#xff1a;nfd2 该数据包含了2006年-2015年10年间亚洲地区人口数量数据&#xff0c;共10行50列数据。我们需要使用Numpy完成如下数据任务: 计算2015年各个国家人口数据计算朝鲜历…

LeetCode 1646. 获取生成数组中的最大值

文章目录1. 题目2. 解题1. 题目 给你一个整数 n 。按下述规则生成一个长度为 n 1 的数组 nums &#xff1a; nums[0] 0nums[1] 1当 2 < 2 * i < n 时&#xff0c;nums[2 * i] nums[i]当 2 < 2 * i 1 < n 时&#xff0c;nums[2 * i 1] nums[i] nums[i 1]…

【CentOS 6.5】QtCreator启动时关于dbus-1的错误解决方法

关于上篇文章留下的启动QtCreator提示:dbus_connection_can_send_type的错误,解决办法: 更新dbus版本来解决.. 首先去 http://dbus.freedesktop.org/releases/dbus/ 下载dbus的最新版本... 解压后,进入 文件夹:dbus-1.8.0运行如下命令: ./configure --prefix/usrmakesudo make …

TotoiseSVN的基本使用方法

一、签入源代码到SVN服务器 假如我们使用Visual Studio在文件夹StartKit中创建了一个项目&#xff0c;我们要把这个项目的源代码签入到SVN Server上的代码库中里&#xff0c;首先右键点击StartKit文件夹&#xff0c;这时候的右键菜单如下图所示&#xff1a; 图2-2-1 点击Import…

Flink的ProcessFunction API

1 ProcessFunction ProcessFunction是一个低阶的流处理操作&#xff0c;可以访问事件(event)(流元素)&#xff0c;状态(state)(容错性&#xff0c;一致性&#xff0c;仅在keyed stream中)&#xff0c;定时器(timers)(event time和processing time&#xff0c; 仅在keyed stream…

LeetCode 1647. 字符频次唯一的最小删除次数(贪心)

文章目录1. 题目2. 解题1. 题目 如果字符串 s 中 不存在 两个不同字符 频次 相同的情况&#xff0c;就称 s 是 优质字符串 。 给你一个字符串 s&#xff0c;返回使 s 成为 优质字符串 需要删除的 最小 字符数。 字符串中字符的 频次 是该字符在字符串中的出现次数。 例如&am…

分享Db4o的便捷封装类源码

导言 大家好&#xff0c;话说真是好久好久没写文章了&#xff0c;哈哈。 最近在写网站&#xff0c;个人对传统数据库天然抵触&#xff0c;感觉非常繁冗&#xff0c;即便是Entity Framework也过于庞杂了&#xff0c;Db4o这种轻量级且读写、配置都极其方便的新型数据库非常适合我…

Flink中的状态管理

1 Flink中的状态 当数据流中的许多操作只查看一个每次事件(如事件解析器)&#xff0c;一些操作会跨多个事件的信息(如窗口操作)。这些操作称为有状态。状态由一个任务维护&#xff0c;并且用来计算某个结果的所有数据&#xff0c;都属于这个任务的状态。可以简单的任务状态就是…

Python之日志处理(logging模块)

主要内容 日志相关概念logging模块简介使用logging提供的模块级别的函数记录日志logging模块日志流处理流程使用logging四大组件记录日志配置logging的几种方式向日志输出中添加上下文信息参考文档 一、日志相关概念 日志是一种可以追踪某些软件运行时所发生事件的方法。软件开…

LeetCode 514. 自由之路(记忆化递归 / DP)

文章目录1. 题目2. 解题1. 题目 电子游戏“辐射4”中&#xff0c;任务“通向自由”要求玩家到达名为“Freedom Trail Ring”的金属表盘&#xff0c;并使用表盘拼写特定关键词才能开门。 给定一个字符串 ring&#xff0c;表示刻在外环上的编码&#xff1b;给定另一个字符串 ke…

thinkpad s3 安装win8 kali双系统笔记

前段时间入手了一台thinkpad s3(i7,8G),预装了win8系统,windows下写代码,环境配置比较麻烦,就想安装kali linux做双系统,官方参考文档:http://docs.kali.org/installation/dual-boot-kali-with-windows但s3预装了64bit win8,采用的是UEFI启动,官方文档并不完全适用,所以折腾了一…

Flink中的容错机制

1 checkpoint Flink 故障恢复机制的核心&#xff0c;就是应用状态的一致性检查点checkpoint。 在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint&#xff0c;处理的是当前时间点所有分区当前数据的状态。在Flink中不能把当前所有分区的数据直接存下…