Flink之DataStream API的转换算子

简单转换算子

函数的实现方式

  1. 自定义类,实现函数接口:编码麻烦,使用灵活
  2. 匿名内部类:编码简单
  3. Lambda:编码简洁
public class Flink02_FunctionImplement {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);env.socketTextStream("hadoop102",8888).flatMap((String line, Collector<Tuple2<String, Integer>> out)->{String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}).returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(0).sum(1).print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String,Integer>> {private String Operator;@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}}
}

Reduce规约聚合

  1. reduce:规约聚合
    • 聚合的原理:两两聚合,上一次的聚合值与本次新到的值进行聚合
    • 泛型 T : 流中的数据类型, 从方法声明中可以看到,输入输出类型一直
    • 方法: T reduce(T value1, T value2) throws Exception
      • value1:上一次的聚合值
      • value2:本次新到的值
public class Flink04_ReduceAggOpterator {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);ds.print("input================");//reduce:每个用户的点击次数ds.map(event->new WordCount(event.getUser(),1)).keyBy(WordCount::getWord).reduce(new ReduceFunction<WordCount>() {/**** @param value1 上次聚合的结果,第一个数据不参与聚合,直接输出* @param value2 新来的值* @return* @throws Exception*/@Overridepublic WordCount reduce(WordCount value1, WordCount value2) throws Exception {System.out.println("测试");return new WordCount(value1.getWord(),value1.getCount()+value2.getCount());}}).print("reduce");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

富函数

调用算子的时候,需要传入对应的用户自定义函数来完成具体的功能

  • 函数:
    • 普通函数
      Map
      filter
      flatMap
      reduce
      富函数:基本上每个普通函数都有对应的富函数
      统一接口interface RichFunction extends Function
      具体使用的富函数类:
      - RichMapFunction
      - RichFilterFunction
      - RichFlatMapFunction
      - RichReduceFunction
      - …
      富函数功能:
      • 生命周期方法:
        • open(): 当前算子的每个并行子任务的实例创建时会调用一次
        • close():当前算子的每个并行子任务的实例销毁时(有界流),调用一次
      • 获取运行时上下文对象 getRuntimeContext
        • 可以获取当前作业,当前task的相关信息
        • 获取不同类型的状态,进行状态编程*
          getState | getListState | getReducingState | getMapState
public class Flink05_RichFunction {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);FileSource<String> fileSource = FileSource.<String>forRecordStreamFormat(new TextLineInputFormat(),new Path("input/enents.txt")).build();DataStreamSource<String> fileDs = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");SingleOutputStreamOperator<Event> ds = fileDs.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {String[] valueArr = value.split("");return new Event(valueArr[0], valueArr[1], Long.valueOf(valueArr[2]));}});ds.print("input================");ds.map(new RichMapFunction<Event, WordCount>() {/*** 生命周期open方法,当前算子实例创建时执行一次,只执行一次* @param parameters The configuration containing the parameters attached to the contract.* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("创建Redis的连接对象");}@Overridepublic WordCount map(Event value) throws Exception {System.out.println("每条数据执行一次");return new WordCount(value.getUser(),1);}/*** 生命周期的close方法* 当前算子实例销毁时执行一次* @throws Exception*/@Overridepublic void close() throws Exception {System.out.println("关闭连接对象");}}).print("map");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/202856.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

二值图像分割统一项目

1. 项目文件介绍 本章为二值图像的分割任务做统一实现&#xff0c;下面是项目的实现目录 项目和文章绑定了&#xff0c;之前没用过&#xff0c;不知道行不行 data 文件夹下负责摆放数据的训练集测试集inference 负责放待推理的图片(支持多张图片预测分割)run_results 是网络训…

centos7 yum安装nginx

1.安装源 rpm -ivh http://nginx.org/packages/centos/5/noarch/RPMS/nginx-release-centos-5-0.el5.ngx.noarch.rpm 2.安装 (-y 的意思是自动yes) yum install nginx -y 3.查找安装到哪里了 whereis nginx 一般都是在 /etc/nginx下面 4.常用命令 检查配置文件是否正确 …

【华为OD题库-079】周末爬山-Java

题目 周末小明准备去爬山锻炼&#xff0c;0代表平地&#xff0c;山的高度使用1到9来表示&#xff0c;小明每次爬山或下山高度只能相差k及k以内&#xff0c;每次只能上下左右一个方向上移动—格&#xff0c;小明从左上角(0,0)位置出发 输入描述 第一行输入m n k(空格分隔)。代表…

InnoDB的数据存储结构

一 数据库的存储结构&#xff1a;页 索引结构提供了高效的检索方式&#xff0c;不过索引信息和数据记录都是保存在文件上的&#xff0c;确切的说是存储在页结构中。另一方面&#xff0c;索引是在引擎中实现的&#xff0c;MySQL服务器上的存储引擎负责对表中数据的读取和写入。…

R语言学习

Part1阶段1&#xff1a;入门基础 1安装R和RStudio&#xff1a; 下载并安装R&#xff1a;https://cran.r-project.org/ 下载并安装RStudio&#xff1a;https://www.rstudio.com/products/rstudio/download/ 2Hello World&#xff1a; 学习如何在R中输出"Hello, World!"…

软考高项第四版五组十域表+ITTO背诵笔记及助记

基于第四版做的笔记&#xff0c;助记是自己编的 还是得靠理解记忆&#xff0c;下面是文档&#xff0c;也用anki制作了记忆卡片&#xff0c;需要的可以自行导入卡包

高德地图加载三维模型vue(.obj转.gltf)

官方glTF模型案例 obj2gltf 的开发文档 第一步&#xff1a;这里首先要将我们的.obj文件转换为.gltf文件 全局安装 npm install -g obj2gltf终端打开.obj文件所在的文件夹执行 obj2gltf -i model.obj -o model.gltf -t &#xff08;-i model.obj对应你的obj文件的名字&#x…

企业部署Windows活动目录有什么好处?

在一个现代化的企业中&#xff0c;高效、安全地管理公司的IT资源是至关重要的。Windows Active Directory&#xff08;活动目录&#xff09;是一个强大的功能&#xff0c;可以帮助企业实现集中管理用户、计算机、组策略和其他资源的目的。本文将探讨部署Windows AD域即活动目录…

【往届见刊检索速度hin OK】 第五届计算机工程与应用国际学术会议 (ICCEA 2024)

第五届计算机工程与应用国际学术会议 (ICCEA 2024) 2024 5th International Conference on Computer Engineering and Application 2024年4月12-14日 中国-杭州 计算机工程与应用在人工智能、大数据、云计算、物联网、网络安全等领域发挥着重要作用&#xff0c;随着科技日…

[NAND Flash 2.3] 闪存芯片国产进程

依公知及经验整理&#xff0c;原创保护&#xff0c;禁止转载。 专栏 《深入理解NAND Flash》 <<<< 返回总目录 <<<< 目录 前言1 闪存介质1.1 NOR 闪存国产技术发展1.2 NAND 闪存国产技术 2 闪存国产厂商与产品2.1 NOR FLASH 国产厂商与产品2.2 NAND FA…

开发重要网站

dockerhub hub.docker.comhutool工具包 https://hutool.cn/docs/#/rgb颜色 https://m.fontke.com/tool/rgb/7badb1/json查看 https://www.bejson.com/jsonviewernew/大小写等转换 https://www.iamwawa.cn/daxiaoxie.htmlmaven库查询 https://mvnrepository.com/

java开发中Dao层和Mapper层的关系

Mapper 层和 DAO&#xff08;Data Access Object&#xff09;层是在持久层中用于处理数据访问的两个概念。虽然这两者的目的都是用于访问数据库&#xff0c;但它们之间有一些区别。在Java开发中&#xff0c;这两个概念通常与MyBatis&#xff08;或其他ORM框架&#xff09;结合使…

Vue学习计划-Vue2--Vue核心(四)watch、class、style、set

Vue 监听(watch): 监听一个属性的变化 监事属性watch: 当监视的属性变化时&#xff0c;回调函数自动调用&#xff0c;进行相关操作监视的属性必须存在&#xff0c;才能进入监视监视的两种写法&#xff1a; new Vue 时传入watch配置通过 vm.$watch()监视 immediate初始化时让han…

运行在多个端系统上的程序是如何互相通信的?

一、进程通信 1.首先搞清楚一点&#xff0c;对于操作系统而言&#xff0c;进行通信的实际上是进程&#xff0c;而不是程序。 2.一个进程可以被认为是运行在端系统上的一个程序&#xff0c;当多个进程运行在相同的端系统上的时候&#xff0c;它们使用进程间通信机制相互通信。…

深入Os--动态链接

1.动态链接库的使用 动态库支持以两种模式使用&#xff0c;一种模式下&#xff0c;在程序加载运行时&#xff0c;完成动态链接。一种模式下&#xff0c;在程序运行中&#xff0c;完成动态链接。 1.1.程序加载运行时完成动态链接 我们通过一个实例介绍程序加载运行时&#xff0c…

【Pandas思考记录】力扣181. 超过经理收入的员工

原题链接 Pandas 代码&#xff1a; import pandas as pddef find_employees(employee: pd.DataFrame) -> pd.DataFrame:merged_df pd.merge(employee, employee, left_onmanagerId, right_onid, howinner, suffixes(, _manager))print("merged_df", merged_df)#…

Socket.D 网络应用协议,首版发布!

有用户说&#xff0c;“Socket.D 之于 Socket&#xff0c;尤如 Vue 之于 Js、Mvc 之于 Http” 主要特性 基于事件&#xff0c;每个消息都可事件路由所谓语义&#xff0c;通过元信息进行语义描述流关联性&#xff0c;有相关的消息会串成一个流语言无关&#xff0c;使用二进制输…

【debug】Image 库 字体问题

可能的报错信息&#xff1a; from PIL import ImageFont, ImageDrawdraw ImageDraw.Draw(image)# use a bitmap font font ImageFont.load("arial.pil")draw.text((10, 10), "hello", fontfont)# use a truetype font font ImageFont.truetype("a…

4G基站BBU、RRU、核心网设备

目录 前言 基站 核心网 信号传输 前言 移动运营商在建设4G基站的时候&#xff0c;除了建设一座铁塔之外&#xff0c;更重要的是建设搭载铁塔之上的移动通信设备&#xff0c;这篇博客主要介绍BBU&#xff0c;RRU以及机房的核心网等设备。 基站 一个基站有BBU&#xff0c;…

代数学笔记7: 交换群结构定理,群在集合上的作用

交换群结构定理 G ≅ Z / d 1 Z Z / d 2 Z ⋯ Z / d n Z , d 1 ∣ d 2 , ⋯ , d n − 1 ∣ d n G\cong \mathbb{Z}/d_1\mathbb{Z}\times \mathbb{Z}/d_2\mathbb{Z}\times\cdots\times \mathbb{Z}/d_n\mathbb{Z}, \quad d_1|d_2,\cdots,d_{n-1}|d_n G≅Z/d1​ZZ/d2​Z⋯Z/dn​…