DataStream编程模型之数据源、数据转换、数据输出

Flink之DataStream数据源、数据转换、数据输出(scala)

0.前言–数据源

在进行数据转换之前,需要进行数据读取。
数据读取分为4大部分:

(1)内置数据源;

又分为文件数据源;在这里插入图片描述
socket数据源;
在这里插入图片描述

集合数据源三类
在这里插入图片描述

(2)Kafka数据源

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
第二个参数用到的SimpleStringSchema对象是一个内置的DeserializationSchema对象,可以把字节数据反序列化程一个String对象。
另外,FlinkKafkaConsumer开始读取Kafka消息时,可以配置他的 读 起始位置,有如下四种。
在这里插入图片描述

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.windowing.time.Time
object KafkaWordCount {def main(args: Array[String]): Unit = {val kafkaProps = new Properties()//Kafka的一些属性kafkaProps.setProperty("bootstrap.servers", "localhost:9092")//所在的消费组kafkaProps.setProperty("group.id", "group1")//获取当前的执行环境val evn = StreamExecutionEnvironment.getExecutionEnvironment
//创建Kafka的消费者,wordsendertest是要消费的Topicval kafkaSource = new FlinkKafkaConsumer[String]("wordsendertest",new SimpleStringSchema,kafkaProps)//设置从最新的offset开始消费kafkaSource.setStartFromLatest()//自动提交offset
kafkaSource.setCommitOffsetsOnCheckpoints(true)//绑定数据源val stream = evn.addSource(kafkaSource)//设置转换操作逻辑val text = stream.flatMap{ _.toLowerCase().split("\\W+")filter{ _.nonEmpty} }.map{(_,1)}.keyBy(0).timeWindow(Time.seconds(5)).sum(1)//打印输出text.print()//程序触发执行evn.execute("Kafka Word Count")}
}

(3)HDFS数据源

在这里插入图片描述

(4)自定义数据源

在这里插入图片描述
一个例子:

import java.util.Calendar
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.util.Randomcase class StockPrice(stockId:String,timeStamp:Long,price:Double)
object StockPriceStreaming {def main(args: Array[String]) { //设置执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度    
env.setParallelism(1)    
//股票价格数据流val stockPriceStream: DataStream[StockPrice] = env//该数据流由StockPriceSource类随机生成.addSource(new StockPriceSource)//打印结果stockPriceStream.print()//程序触发执行env.execute("stock price streaming")}class StockPriceSource extends RichSourceFunction[StockPrice]{ var isRunning: Boolean = trueval rand = new Random()//初始化股票价格var priceList: List[Double] = List(10.0d, 20.0d, 30.0d, 40.0d, 50.0d)var stockId = 0var curPrice = 0.0d
override def run(srcCtx: SourceContext[StockPrice]): Unit = {while (isRunning) {//每次从列表中随机选择一只股票stockId = rand.nextInt(priceList.size)val curPrice =  priceList(stockId) + rand.nextGaussian() * 0.05priceList = priceList.updated(stockId, curPrice)val curTime = Calendar.getInstance.getTimeInMillis//将数据源收集写入SourceContextsrcCtx.collect(StockPrice("stock_" + stockId.toString, curTime, curPrice))Thread.sleep(rand.nextInt(10))}
} override def cancel(): Unit = {isRunning = false}}
}

1.数据转换之map操作

1.数据转换算子的四种类型
基于单条记录:fliter、map
基于窗口:window
合并多条数据流:union,join,connect
拆分多条数据流:split

2.map(func)操作将一个DataStream中的每个元素传递到函数func中,并将结果返回为一个新的DataStream。输出的数据流DataStream[OUT]类型可能和输入的数据流DataStream[IN]不同
理解:一 一对应的关系,一个x得到一个y

val dataStream = env.fromElements(1,2,3,4,5)
val mapStream = dataStream.map(x=>x+10)

在这里插入图片描述
3.演示代码

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentcase class StockPrice(stockId:String,timeStamp:Long,price:Double) 
object MapFunctionTest {def main(args: Array[String]): Unit = {//设定执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设定程序并行度env.setParallelism(1)//创建数据源val dataStream: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7)//设置转换操作逻辑val richFunctionDataStream = dataStream.map {new MyMapFunction()}//打印输出richFunctionDataStream.print()//程序触发执行env.execute("MapFunctionTest")}//自定义函数,继承RichMapFunctionclass MyMapFunction extends RichMapFunction[Int, String] {override def map(input: Int): String =("Input : " + input.toString + ", Output : " + (input * 3).toString)}
}

2.数据转换之flatMap操作

1.flatMap和map相似,每个输入元素都可以映射到0或多个输出结果。

val dataStream = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val flatMapStream = dataStream.flatMap(line => line.split(" "))

在这里插入图片描述
可以理解为flatMap比map多了flat操作。如图。map是将输入数据映射成数组,flat是将数据拍扁,成为一个个元素。把元素映射成了多个。

2.代码演示

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collectorcase class StockPrice(stockId:String,timeStamp:Long,price:Double) 
object FlatMapFunctionTest {def main(args: Array[String]): Unit = {//设定执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment//设定程序并行度
env.setParallelism(1)
//设置数据源
val dataStream: DataStream[String] = env.fromElements("Hello Spark", "Flink is excellent“) //针对数据集的转换操作逻辑
val result = dataStream.flatMap(new WordSplitFlatMap(15)) //打印输出
result.print() 
//程序触发执行env.execute("FlatMapFunctionTest")} //使用FlatMapFunction实现过滤逻辑,只对字符串长度大于threshold的内容进行切词class WordSplitFlatMap(threshold: Int) extends FlatMapFunction[String, String] {override def flatMap(value: String, out: Collector[String]): Unit = {if (value.size > threshold) {value.split(" ").foreach(out.collect)}}}
}

预计输出:

Flink
is
excellent

这里只对字符长度超过15的做切割。threshold是阈值,少于15的不做切割。

3.数据转换之filter和keyBy操作

1.filter(func)操作会筛选出满足函数func的元素,并返回一个新的数据集
2.代码举例

val dataStream = env.fromElements("Hadoop is good","Flink is fast","Flink is better")
val filterStream = dataStream.filter(line => line.contains("Flink"))

如图所示
在这里插入图片描述

3.keyBy(注意方法里k小写B大写):将相同Key的数据放置在相同的分区中。
keyBy算子根据元素的形状对数据进行分组,相同形状的元素被分到了一起,可被后续算子统一处理

比如在词频统计时:

				hello flink hello hadoophello zhangsan

这里 词频(hello,1),(hello,1),(hello,1)统计出来之后,通过keyBy,就可以聚合,放在了相同的分区里进行统一计算。

在这里插入图片描述
通过聚合函数后又可以吧KeyedStream转换成DataStream。

4.在使用keyBy算子时,需要向keyBy算子传递一个参数, 可使用数字位置来指定Key
比如刚才词频统计时,keyBy(0)就是hello这个单词。

val dataStream: DataStream[(Int, Double)] =env.fromElements((1, 2.0), (2, 1.7), (1, 4.9), (3, 8.5), (3, 11.2))
//使用数字位置定义Key 按照第一个字段进行分组
val keyedStream = dataStream.keyBy(0)

这里keyby 是第一个字段1或者2或者3分组(分类)。

5.keyBy代码举例:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double)object KeyByTest{def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)
//创建数据源val stockList = List(StockPrice("stock_4",1602031562148L,43.4D),StockPrice("stock_1",1602031562148L,22.9D),StockPrice("stock_0",1602031562153L,8.2D),StockPrice("stock_3",1602031562153L,42.1D),StockPrice("stock_2",1602031562153L,29.2D),StockPrice("stock_0",1602031562159L,8.1D),StockPrice("stock_4",1602031562159L,43.7D),StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList) //设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId“) //打印输出keyedStream.print() //程序触发执行env.execute("KeyByTest")}
}

在这里插入图片描述
这里看起来没什么变换 ,因为没进行聚合操作,所以什么变化都没有,原样输出。
我加上聚合函数,看起来就有变化了。

//简写上面的代码 加上聚合函数val keyedStream = dataStream.keyBy("stockId")val aggre = keyedStream.sum(2) //这里相加的是价格price(第三个字段)// keyedStream.print()aggre.print()//聚合后打印

结果
在这里插入图片描述
对比上面哪里变化了呢?
stcok_id顺序,4-1-0-3-2-0(这里之前也有0,就会加上之前的0,变为16.299,后面的4也在累加前面的price了

4.数据转换之reduce操作和聚合操作

1.reduce:reduce算子将输入的KeyedStream通过传入的用户自定义函数滚动地进行数据聚合处理,处理以后得到一个新的DataStream,如下实例

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double)object ReduceTest{def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)//创建数据源val stockList = List(StockPrice("stock_4",1602031562148L,43.4D),StockPrice("stock_1",1602031562148L,22.9D),StockPrice("stock_0",1602031562153L,8.2D),StockPrice("stock_3",1602031562153L,42.1D),StockPrice("stock_2",1602031562153L,29.2D),StockPrice("stock_0",1602031562159L,8.1D),StockPrice("stock_4",1602031562159L,43.7D),StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList)//设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId")val reduceStream = keyedStream.reduce((t1,t2)=>StockPrice(t1.stockId,t1.timeStamp,t1.price+t2.price))//打印输出reduceStream.print()//程序触发执行env.execute("ReduceTest")}
}

reduce结果和上面的一样,就是累加
在这里插入图片描述

2.flink也支持自定义的reduce函数

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment//声明一个样例类,包含三个字段:股票ID,交易时间,交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double)object MyReduceFunctionTest{def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)//创建数据源val stockList = List(StockPrice("stock_4",1602031562148L,43.4D),StockPrice("stock_1",1602031562148L,22.9D),StockPrice("stock_0",1602031562153L,8.2D),StockPrice("stock_3",1602031562153L,42.1D),StockPrice("stock_2",1602031562153L,29.2D),StockPrice("stock_0",1602031562159L,8.1D),StockPrice("stock_4",1602031562159L,43.7D),StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList) //设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId")val reduceStream = keyedStream.reduce(new MyReduceFunction)//打印输出reduceStream.print()//程序触发执行env.execute("MyReduceFunctionTest")}class MyReduceFunction extends ReduceFunction[StockPrice] {override def reduce(t1: StockPrice,t2:StockPrice):StockPrice = {StockPrice(t1.stockId,t1.timeStamp,t1.price+t2.price)}}
}

主要不同的就是创建了MyReduceFunction ().
3.聚合算子
在这里插入图片描述
和excel一样。
代码举例:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment//声明一个样例类,包含三个字段:股票ID、交易时间、交易价格
case class StockPrice(stockId:String,timeStamp:Long,price:Double) 
object AggregationTest{def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//设置程序并行度env.setParallelism(1)    
//创建数据源val stockList = List(StockPrice("stock_4",1602031562148L,43.4D),StockPrice("stock_1",1602031562148L,22.9D),StockPrice("stock_0",1602031562153L,8.2D),StockPrice("stock_3",1602031562153L,42.1D),StockPrice("stock_2",1602031562153L,29.2D),StockPrice("stock_0",1602031562159L,8.1D),StockPrice("stock_4",1602031562159L,43.7D),StockPrice("stock_4",1602031562169L,43.5D))val dataStream = env.fromCollection(stockList)//设定转换操作逻辑val keyedStream = dataStream.keyBy("stockId")val aggregationStream = keyedStream.sum(2)  //区别在这里   sum聚合 2表示第三个字段//打印输出aggregationStream.print()//执行操作env.execute(" AggregationTest")}
}

运行结果
在这里插入图片描述

5.数据输出

1.基本数据输出包括:文件输出,客户端输出,socket网络端口输出。
文件输出具体代码

val dataStream = env.fromElements("hadoop","spark","flink")
//文件输出
dataStream.writeAsText("file:///home/hadoop/output.txt")
//hdfs输出//把数据写入HDFS
dataStream.writeAsText("hdfs://localhost:9000/output.txt“) //通过writeToSocket方法将DataStream数据集输出到指定socket端口
dataStream.writeToSocket(outputHost,outputPort,new SimpleStringSchema())

2.输出到kafka
代码举例:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerobject SinkKafkaTest{def main(args: Array[String]): Unit = {//获取执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//加载或创建数据源val dataStream = env.fromElements("hadoop","spark","flink")//把数据输出到Kafka
dataStream.addSink(new FlinkKafkaProducer [String]("localhost:9092", "sinkKafka", new SimpleStringSchema()))//程序触发执行env.execute()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/61120.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS盒子的定位>(上篇)#定位属性#相对定位-附练习

一、定位属性 1.定位方式 position属性可以选择4种不同类型的定位方式。 语法格式:position:relation | absolute | fixed参数:①relative生成相对定位的元素,相对于其正常位置进行定位。 ②absolute生成绝对定位的…

Redis/Codis性能瓶颈揭秘:网卡软中断的影响与优化

目录 现象回顾 问题剖析 现场分析 解决方案 总结与反思 1.调整中断亲和性(IRQ Affinity): 2.RPS(Receive Packet Steering)和 RFS(Receive Flow Steering): 近期,…

WordPress设置自动更新CSS版本号

WordPress 通常会在引用 CSS 文件时添加版本号参数(?verx.x.x)。如果版本号未更新,浏览器可能继续加载旧的文件。 解决方法:确保你在 functions.php 文件中正确加载了 CSS 文件,并动态更新版本号。例如在functions.p…

若依权限控制

springbootvue2项目中的权限控制(若依项目) 步骤: 1.登录管理员账号,为普通用户增加权限按钮 绿色部分为权限控制字符 2.在后端对应的方法上增加权限控制(这里以删除操作为例):PreAuthorize(“ss.hasPermi(‘area:store:remove’)”) 3.在前端对应的按钮上增加权限控制:v-ha…

【机器学习】如何配置anaconda环境(无脑版)

马上就要上机器学习的实验,这里想写一下我配置机器学习的anaconda环境的二三事 一、首先,下载安装包: Download Now | Anaconda 二、打开安装包,一直点NEXT进行安装 这里要记住你要下载安装的路径在哪,后续配置环境…

OceanBase 升级过程研究(4.2.1.6-4.2.1.8)

模拟业务 使用benchmark加载10仓数据模拟业务场景 升级方法 使用滚动升级方式来进行OB升级。该方法前提是OB集群必须满足官方规定的高可用架构(如果 Zone 个数小于 3,滚动升级时则无法构成多数派), 滚动升级的原理就是轮流完成每个ZONE的升级工作,由于…

微知-DOCA ARGP参数模块的相关接口和用法(config单元、params单元,argp pipe line,回调)

文章目录 1. 背景2. 设置参数的主要流程2.1 初始化2.2 注册某个params的处理方式以及回调函数2.4 定义好前面的params以及init指定config地点后start处理argv 3. 其他4. DOCA ARGP包相关4.1 主要接口4.2 DOCA ARGP的2个rpm包4.2.1 doca-sdk-argp-2.9.0072-1.el8.x86_64.rpm4.2.…

C#.Net筑基-字符串超全总结

字符串是日常编码中最常用的引用类型了,可能没有之一,加上字符串的不可变性、驻留性,很容易产生性能问题,因此必须全面了解一下。 01、字符与字符编码 1.1、字符Char 字符 char 表示为 Unicode字符,在C#中用 UTF-16 …

苍穹外卖-后端部分

软件开发整体介绍 前端搭建 在非中文目录中双击nginx.exe然后浏览器访问localhost即可 后端搭建 基础准备 导入初始文件 使用git进行版本控制 创建本地仓库和远程仓库,提交Git 连接数据库 连接数据库把资料中的文件放入运行即可 前后端联调测试 苍穹外卖项目接口文档…

剧本杀门店预约小程序,解锁沉浸式推理体验

一、开发背景 剧本杀作为一种热门娱乐游戏,深受大众的欢迎,但随着市场的快速发展,竞争也在不断加大,对于剧本杀线下商家来说面临着发展创新。 剧本杀线下门店数量目前正在逐渐增加,竞争激烈,而门店的获客…

【WPF】Prism学习(二)

Prism Commands 1.命令(Commanding) 1.1. ViewModel的作用: ViewModel不仅提供在视图中显示或编辑的数据,还可能定义一个或多个用户可以执行的动作或操作。这些用户可以通过用户界面(UI)执行的动作或操作…

学者观察 | 元计算、人工智能和Web 3.0——山东大学教授成秀珍

导语 成秀珍教授提出元计算是在开放的零信任环境下整合算力资源打通数据壁垒构建自进化智能的新质生产力技术,是一种新计算范式;区块链是Web3.0的核心技术之一,有助于保障开放零信任环境下,用户、设备和服务间去中心化数据流通的…

学习笔记022——Ubuntu 安装 MySQL8.0版本踩坑记录

目录 1、查看可安装 MySQL 版本 2、Ubuntu安装 MySQL8.0 3、MySQL8.0 区分大小写问题 4、MySQL8.0 设置sql_mode 5、MySQL8.0 改端口33060(个人遇到问题) 1、查看可安装 MySQL 版本 ## 列出可用的MySQL版本(列出所有可用的MySQL版本以…

「AI Infra 软件开源不是一个选项,而是必然」丨云边端架构和 AI Infra专场回顾@RTE2024

在人工智能和开源技术蓬勃发展的当下,AI Infra 项目正经历着日新月异的变革。从跨平台运行时到云边端 AI 基础设施,再到多模态知识助手,创新浪潮席卷而来。这些进步不仅显著提升了技术指标,也为实时音视频处理、边缘计算、大模型应…

《Python制作动态爱心粒子特效》

一、实现思路 粒子效果: – 使用Pygame模拟粒子运动,粒子会以爱心的轨迹分布并运动。爱心公式: 爱心的数学公式: x16sin 3 (t),y13cos(t)−5cos(2t)−2cos(3t)−cos(4t) 参数 t t 的范围决定爱心形状。 动态效果: 粒子…

免费实时图片编辑工具:MagicQuill

参看: https://huggingface.co/spaces/AI4Editing/MagicQuill 人工智能交互式图像编辑:可以制定涂改增加删除

web——upload-labs——第九关——特殊字符::$DATA绕过

特殊字符::$DATA绕过 典型绕过场景 在一些系统中,::$DATA 被用于绕过文件路径的限制。比如: 路径过滤绕过:如果系统有某种机制来检查和限制文件路径(例如,禁止访问某些系统目录或敏感文件),通…

本地部署 excalidraw

本地部署 excalidraw 0. 引言1. 本地部署 excalidraw2. 访问 excalidraw 0. 引言 Excalidraw 编辑器是一款开源虚拟手绘白板,支持协作且端到端加密。 1. 本地部署 excalidraw git clone https://github.com/excalidraw/excalidraw.git; cd excalidrawvi docker-c…

《Java核心技术 卷I》用户界面AWT事件继承层次

AWT事件继承层次 EventObject类有一个子类AWTEvent,它是所有AWT事件类的父类。 Swing组件会生成更多其他事件对象,都直接拓展自EventObject而不是AWTEvent。 AWT将事件分为底层(low-level)事件和语义事件。 语义事件:表示用户的动作事件&…

三周精通FastAPI:42 手动运行服务器 - Uvicorn Gunicorn with Uvicorn

官方文档:Server Workers - Gunicorn with Uvicorn - FastAPI 使用 fastapi 运行命令 可以直接使用fastapi run命令来启动FastAPI应用: fastapi run main.py如创建openapi.py文件: from fastapi import FastAPIapp FastAPI(openapi_url&…