Flink DataStream API详解

DataStream API

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html

Data Sources

Source是程序读取其输入的位置,您可以使用env.addSource(sourceFunction)将Source附加到程序中。Flink内置了许多预先实现的SourceFunction,但是您始终可以通过实现SourceFunction(non-parallel sources)来编写自定义Source,或者通过继承RichParallelSourceFunction或实现ParallelSourceFunction接口来实现并行Source.

File-based

readTextFile(path) - 逐行读取文本文件,底层使用TextInputFormat规范读取文件,并将其作为字符串返回

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.readTextFile("file:///E:\\demo\\words")
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(仅仅读取一次,类似批处理)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputFormat=new TextInputFormat(null)
val lines:DataStream[String]=env.readFile(inputFormat,"file:///E:\\demo\\words")
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

readFile(fileInputFormat, path, watchType, interval, pathFilter) - 这是前两个内部调用的方法。它根据给定的FileInputFormat读取指定路径下的文件,可以根据watchType定期检测指定路径下的文件,其中watchType的可选值为FileProcessingMode.PROCESS_CONTINUOUSLY或者FileProcessingMode.PROCESS_ONCE,检查的周期由interval参数决定。用户可以使用pathFilter参数排除该路径下需要排除的文件。如果指定watchType的值被设置为PROCESS_CONTINUOUSLY,表示一旦文件内容发生改变,整个文件内容会被重复处理。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputFormat=new TextInputFormat(null)
val lines:DataStream[String]=env.readFile(inputFormat,"file:///E:\\demo\\words",FileProcessingMode.PROCESS_CONTINUOUSLY,5000,new FilePathFilter {override def filterPath(filePath: Path): Boolean = {filePath.getPath.endsWith(".txt")}})
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

Socket-based

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.socketTextStream("centos",9999)
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

Collection-based(测试)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.fromCollection(List("this is a demo","good good"))
//val lines:DataStream[String]=env.fromElements("this is a demo","good good")
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

Custom Source

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import scala.util.Randomclass CustomSourceFunction extends ParallelSourceFunction[String]{@volatilevar isRunning:Boolean = trueval lines:Array[String] = Array("this is a demo","hello word","are you ok")override def run(ctx: SourceFunction.SourceContext[String]): Unit = {while(isRunning){Thread.sleep(1000)ctx.collect(lines(new Random().nextInt(lines.length)))//将数据输出给下游}}override def cancel(): Unit = {isRunning=false}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines:DataStream[String]=env.addSource[String](new CustomSourceFunction)
lines.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

FlinkKafkaConsumer√

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.version}</artifactId><version>${flink.version}</version>
</dependency>
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val lines=env.addSource(new FlinkKafkaConsumer("topic01",new SimpleStringSchema(),props))lines.flatMap(_.split("\\s+")).map((_,1)).keyBy(t=>t._1).sum(1).print()
env.execute("wordcount")

如果使用SimpleStringSchema,仅仅能获取value,如果用户希望获取更多信息,比如 key/value/partition/offset ,用户可以通过继承KafkaDeserializationSchema类自定义反序列化对象。

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.streaming.api.scala._class UserKafkaDeserializationSchema extends KafkaDeserializationSchema[(String,String)] {//这个方法永远返回falseoverride def isEndOfStream(nextElement: (String, String)): Boolean = {false}override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {var key=""if(record.key()!=null && record.key().size!=0){key=new String(record.key())}val value=new String(record.value())(key,value)}//告诉Flink tuple元素类型override def getProducedType: TypeInformation[(String, String)] = {createTypeInformation[(String, String)]}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val lines:DataStream[(String,String)]=env.addSource(new FlinkKafkaConsumer("topic01",new UserKafkaDeserializationSchema(),props))
lines.map(t=>t._2).flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(t=>t._1)
.sum(1)
.print()
env.execute("wordcount")

如果Kafka存储的都是json格式的字符串数据,用户可以使用系统自带的一些支持json的Schema,推荐使用:

  • JsonNodeDeserializationSchema:要求value必须是json格式的字符串
  • JSONKeyValueDeserializationSchema(meta):要求key、value都必须是josn格式数据,同时可以携带元数据(分区、 offset等)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val jsonData:DataStream[ObjectNode]=env.addSource(new FlinkKafkaConsumer("topic01",new JSONKeyValueDeserializationSchema(true),props))
jsonData.map(on=> (on.get("value").get("id").asInt(),on.get("value").get("name")))
.print()
env.execute("wordcount")

Data Sinks

Data Sinks接收DataStream数据,并将其转发到指定文件,socket,外部存储系统或者print它们,Flink预定义一些输出Sink。

File-based

write*:writeAsText|writeAsCsv(…)|writeUsingOutputFormat,请注意DataStream上的write*()方法主要用于调试目的。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.writeAsText("file:///E:/results/text",WriteMode.OVERWRITE)
env.execute("wordcount")

以上写法只能保证at_least_once的语义处理,如果是在生产环境下,推荐使用flink-connector-filesystem将数据写到外围系统,可以保证exactly-once语义处理。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val bucketingSink = new BucketingSink[(String,Int)]("hdfs://centos:9000/BucketingSink")
bucketingSink.setBucketer(new DateTimeBucketer("yyyyMMddHH"))//文件目录
bucketingSink.setBatchSize(1024)
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(bucketingSink)
.setParallelism(6)
env.execute("wordcount")

print() | printToErr()

val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
fsEnv.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.print("测试") //输出前缀  当有多个流输出到控制台时,可以添加前缀加以区分
.setParallelism(2)
env.execute("wordcount")

Custom Sink

class  CustomSinkFunction extends  RichSinkFunction[(String,Int)]{override def open(parameters: Configuration): Unit = {println("初始化连接")}override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {println(value)}override def close(): Unit = {println("关闭连接")}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new CustomSinkFunction)
env.execute("wordcount")

RedisSink√

  • 添加
<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>
class UserRedisMapper extends RedisMapper[(String,Int)]{// 设置数据类型override def getCommandDescription: RedisCommandDescription = {new RedisCommandDescription(RedisCommand.HSET,"wordcount")}override def getKeyFromData(data: (String, Int)): String = {data._1}override def getValueFromData(data: (String, Int)): String = {data._2.toString}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "centos:9092")
props.setProperty("group.id", "g1")
val jedisConfig=new FlinkJedisPoolConfig.Builder()
.setHost("centos")
.setPort(6379)
.build()
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new RedisSink[(String, Int)](jedisConfig,new UserRedisMapper))
env.execute("wordcount")

FlinkKafkaProducer√

class UserKeyedSerializationSchema extends KeyedSerializationSchema[(String,Int)]{
Intoverride def serializeKey(element: (String, Int)): Array[Byte] = {element._1.getBytes()}override def serializeValue(element: (String, Int)): Array[Byte] = {element._2.toString.getBytes()}//可以覆盖 默认是topic,如果返回值为null,表示将数据写入到默认的topic中override def getTargetTopic(element: (String, Int)): String = {null}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props1 = new Properties()
props1.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos:9092")
props1.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g1")
val props2 = new Properties()
props2.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos:9092")
props2.setProperty(ProducerConfig.BATCH_SIZE_CONFIG,"100")
props2.setProperty(ProducerConfig.LINGER_MS_CONFIG,"500")
props2.setProperty(ProducerConfig.ACKS_CONFIG,"all")
props2.setProperty(ProducerConfig.RETRIES_CONFIG,"2")
env.addSource(new FlinkKafkaConsumer[String]("topic01",new SimpleStringSchema(),props1))
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.sum(1)
.addSink(new FlinkKafkaProducer[(String, Int)]("topic02",new UserKeyedSerializationSchema,props2))
env.execute("wordcount")

DataStream Transformations

Map

Takes one element and produces one element.

dataStream.map { x => x * 2 }

FlatMap

Takes one element and produces zero, one, or more elements.

dataStream.flatMap { str => str.split(" ") }

Filter

Evaluates a boolean function for each element and retains those for which the function returns true.

dataStream.filter { _ != 0 }

Union

Union of two or more data streams creating a new stream containing all the elements from all the streams.

dataStream.union(otherStream1, otherStream2, ...)

Connect

“Connects” two data streams retaining their types, allowing for shared state between the two streams.

val stream1 = env.socketTextStream("centos",9999)
val stream2 = env.socketTextStream("centos",8888)
stream1.connect(stream2).flatMap(line=>line.split("\\s+"),line=>line.split("\\s+"))
.map(Word(_,1))
.keyBy("word")
.sum("count")
.print()

Split

Split the stream into two or more streams according to some criterion.

val split = someDataStream.split((num: Int) =>(num % 2) match {case 0 => List("even")case 1 => List("odd")}
)               

Select

Select one or more streams from a split stream.

val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd")
val lines = env.socketTextStream("centos",9999)
val splitStream: SplitStream[String] = lines.split(line => {if (line.contains("error")) {List("error") //分支名称} else {List("info") //分支名称}
})
splitStream.select("error").print("error")
splitStream.select("info").print("info")

Side Out

val lines = env.socketTextStream("centos",9999)
//设置边输出标签 
val outTag = new OutputTag[String]("error") 
val results = lines.process(new ProcessFunction[String, String] {override def processElement(value: String, ctx: ProcessFunction[String,                 String]#Context, out: Collector[String]): Unit = {if (value.contains("error")) {ctx.output(outTag, value)} else {out.collect(value)}}
})
results.print("正常结果")
//获取边输出
results.getSideOutput(outTag)
.print("错误结果")

KeyBy

Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning.

dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce

A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.reduce((t1,t2)=>(t1._1,t1._2+t2._2))
.print()

Fold

A “rolling” fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.fold(("",0))((t1,t2)=>(t2._1,t1._2+t2._2))
.print()

Aggregations

Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

zs 001 1200
ww 001 1500
zl 001 1000
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toDouble))
.keyBy(1)
.minBy(2)//输出含有最小值的记录
.print()
1> (zs,001,1200.0)
1> (zs,001,1200.0)
1> (zl,001,1000.0)
env.socketTextStream("centos",9999)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1),ts(2).toDouble))
.keyBy(1)
.min(2)
.print()
1> (zs,001,1200.0)
1> (zs,001,1200.0)
1> (zs,001,1000.0)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/19665.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安全狗V3.512048版本绕过

安全狗安装 安全狗详细安装、遇见无此服务器解决、在windows中命令提示符中进入查看指定文件夹手动启动Apache_安全狗只支持 glibc_2.14 但是服务器是2.17_黑色地带(崛起)的博客-CSDN博客 安全狗 safedogwzApacheV3.5.exe 右键电脑右下角安全狗图标-->选择插件-->安装…

Jvm实际运行情况-JVM(十七)

上篇文章说jmap和jstat的命令&#xff0c;如何查看youngGc和FullGc耗时和次数。 Jmap-JVM&#xff08;十六&#xff09; Jvm实际运行情况 背景&#xff1a; 机器配置&#xff1a;2核4G JVM内存大小&#xff1a;2G 系统运行天数&#xff1a;7天 期间发生FULL GC次数和耗时…

untiy代码打压缩包,可设置密码

1、简单介绍&#xff1a; 用的是一个插件SharpZipLib&#xff0c;在vs的Nuget下载&#xff0c;也可以去github下载https://github.com/icsharpcode/SharpZipLib 用这个最主要的是因为&#xff0c;这个不用请求windows的文件读写权限&#xff0c;关于这个权限我搞了好久&#…

【设计模式——学习笔记】23种设计模式——命令模式Command(原理讲解+应用场景介绍+案例介绍+Java代码实现)

案例引入 有一套智能家电&#xff0c;其中有照明灯、风扇、冰箱、洗衣机&#xff0c;这些智能家电来自不同的厂家&#xff0c;我们不想针对每一种家电都安装一个手机App来分别控制&#xff0c;希望只要一个app就可以控制全部智能家电要实现一个app控制所有智能家电的需要&…

Jenkins 自动化部署实例讲解,另附安装教程!

【2023】Jenkins入门与安装_jenkins最新版本_丶重明的博客-CSDN博客 也可以结合这个互补看 前言 你平常在做自己的项目时&#xff0c;是否有过部署项目太麻烦的想法&#xff1f;如果你是单体项目&#xff0c;可能没什么感触&#xff0c;但如果你是微服务项目&#xff0c;相…

okhttp异步get和post请求,实现读取获取、增加http文件数据

Okhttp类&#xff0c;封装方法 package com.example.httptest;import android.content.ContentValues; import android.content.Context; import android.database.sqlite.SQLiteDatabase; import android.util.Log;import androidx.annotation.NonNull;import com.google.gso…

JVM的组件、自动垃圾回收的工作原理、分代垃圾回收过程、可用的垃圾回收器类型

详细画的图片 https://www.processon.com/diagraming/64c8aa11c07d99075d934311 官方网址 https://www.oracle.com/webfolder/technetwork/tutorials/obe/java/gc01/index.html 相关概念 年轻代是所有新对象被分配和老化的地方。当年轻代填满时&#xff0c;这会导致minor …

【Rust 基础篇】Rust Never类型:表示不会返回的类型

导言 Rust是一种以安全性和高效性著称的系统级编程语言&#xff0c;其设计哲学是在不损失性能的前提下&#xff0c;保障代码的内存安全和线程安全。在Rust中&#xff0c;Never类型是一种特殊的类型&#xff0c;它表示一个函数永远不会返回。Never类型在Rust中有着重要的应用场…

Rust dyn - 动态分发 trait 对象

dyn - 动态分发 trait 对象 dyn是关键字&#xff0c;用于指示一个类型是动态分发&#xff08;dynamic dispatch&#xff09;&#xff0c;也就是说&#xff0c;它是通过trait object实现的。这意味着这个类型在编译期间不确定&#xff0c;只有在运行时才能确定。 practice tr…

《Spring Boot源码解读与原理分析》书籍推荐

Spring Boot 1.0.0 早在2014年就已经发布&#xff0c;只不过到了提倡“降本增效”的今天&#xff0c;Spring Boot才引起了越来越多企业的关注。Spring Boot是目前Java EE开发中颇受欢迎的框架之一。依托于底层Spring Framework的基础支撑&#xff0c;以及完善强大的特性设计&am…

设计模式之中介者模式

中介者模式 用一个中介对象来封装一系列的对象交互。中介者使得各对象不需要显示地相互引用&#xff0c;从而使其耦合松散&#xff0c;而且可以独立地改变他们之间的交互。 电脑主板的功能就类似于一个中介者 经典中介者模式UML 例子 经典的中介者模式 package com.tao.Ya…

Linux用户权限信息、chmod以及chown命令

权限修改 权限信息chmod命令chown命令 权限信息 在Linux系统中&#xff0c;每个文件和目录都包含了权限信息&#xff0c;用于控制对其的访问权限。 文件权限&#xff1a;Linux系统中的文件权限由三组权限表示&#xff0c;分别是所有者权限、组权限和其他用户权限。 所有者权…

Excel快捷键F1-F9详解:掌握实用快捷操作,提升工作效率

Excel是广泛应用于办公场景的优质电子表格软件&#xff0c;然而&#xff0c;许多人只是使用鼠标点击菜单和工具栏来完成操作&#xff0c;而忽略了快捷键的威力。在本文中&#xff0c;我们将详解Excel中的F1-F9快捷键&#xff0c;帮助您掌握实用的快捷操作&#xff0c;提升工作效…

leetcode(力扣)剑指 Offer 16. 数值的整数次方 (快速幂)

文章目录 题目描述思路分析完整代码 题目描述 实现 pow(x, n) &#xff0c;即计算 x 的 n 次幂函数&#xff08;即&#xff0c;xn&#xff09;。不得使用库函数&#xff0c;同时不需要考虑大数问题。 示例 1&#xff1a; 输入&#xff1a;x 2.00000, n 10 输出&#xff1a;10…

通向架构师的道路之Apache整合Tomcat

一、先从J2EE工程的通用架构说起 这是一个通用的Web即B/S工程的架构&#xff0c;它由&#xff1a; Web Server App Server DB Server 三大部分组成&#xff0c;其中&#xff1a; Web Server 置于企业防火墙外&#xff0c;这个防火墙&#xff0c;大家可以认为是…

c语言(函数)

目录 何为函数 库函数 自定义函数 二分查找数组下标 链式访问 函数的声明 函数定义 递归 正向打印数字 打印字符个数 使用临时变量 递归(不使用临时变量) n的阶乘 一般形式 递归 斐波那契数 递归 正常做法 何为函数 在计算机科学中&#xff0c;子程序是一个…

150. 逆波兰表达式求值

150. 逆波兰表达式求值 题目-中等难度示例1. 字典存储function2. if-else 题目-中等难度 给你一个字符串数组 tokens &#xff0c;表示一个根据 逆波兰表示法 表示的算术表达式。 请你计算该表达式。返回一个表示表达式值的整数。 注意&#xff1a; 有效的算符为 ‘’、‘-’…

华为数通HCIA-实验环境ensp简介

ensp 路由器&#xff1a;AR系列、NE系列&#xff1b; 模拟器中使用AR2220&#xff1b; 交换机&#xff1a;S系列、CE系列&#xff1b; 模拟器中使用S5700&#xff1b; 线缆&#xff1a;copper——以太网链路&#xff1b; serial——串行链路&#xff0c;在模拟器中用于模…

单片机外部晶振故障后自动切换内部晶振——以STM32为例

单片机外部晶振故障后自动切换内部晶振——以STM32为例 作者日期版本说明Dog Tao2023.08.02V1.0发布初始版本 文章目录 单片机外部晶振故障后自动切换内部晶振——以STM32为例背景外部晶振与内部振荡器STM32F103时钟系统STM32F407时钟系统 代码实现系统时钟设置流程时钟源检测…

spring AOP学习

概念 面向切面编程横向扩展动态代理 相关术语 动态代理 spring在运行期&#xff0c;生成动态代理对象&#xff0c;不需要特殊的编译器 Spring AOP的底层就是通过JDK动态代理或者CGLIb动态代理技术为目标Bean执行横向织入 目标对象实现了接口&#xff0c;spring使用JDK的ja…