flink分别使用FilterMap和ProcessFunction实现去重逻辑

背景

在日常的工作中,对数据去重是一件很常见的操作,比如我们只需要保留重复记录的第一条,而忽略掉后续重复的记录,达到去重的效果,本文就使用flink的FilterMap和ProcessFunction来实现去重逻辑

FilterMap和ProcessFunction去重实现

filterMap实现去重

public class DuplicateRichFlatMap extends RichFlatMapFunction<WikipediaEditEvent, WikipediaEditEvent> {ValueState<Boolean> duplicateInput;@Overridepublic void open(Configuration parameters) throws Exception {duplicateInput = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("duplicate", Types.BOOLEAN));}@Overridepublic void flatMap(WikipediaEditEvent in, Collector<WikipediaEditEvent> collector) throws Exception {if (duplicateInput.value() == null) {collector.collect(in);duplicateInput.update(true);}}}

这里实现的关键就是有一个key-value的flink状态

ProcessFunction去重

public class DupliacateProcessFunction extends KeyedProcessFunction<String, WikipediaEditEvent, WikipediaEditEvent> {ValueState<Boolean> duplicateInput;@Overridepublic void open(Configuration parameters) throws Exception {ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<Boolean>("previousInput", Types.BOOLEAN);// 状态ttl超时时间设置StateTtlConfig ttlConfig =StateTtlConfig.newBuilder(Time.days(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// check 10 keys for every state access.cleanupIncrementally(100, false).build();stateDescriptor.enableTimeToLive(ttlConfig);duplicateInput = getRuntimeContext().getState(stateDescriptor);}@Overridepublic void processElement(WikipediaEditEvent in, Context context, Collector<WikipediaEditEvent> collector)throws Exception {if (duplicateInput.value() == null) {collector.collect(in);duplicateInput.update(true);}}
}

这里的关键代码也是拥有一个key-value的状态

触发计算的job代码如下

public class DuplicateJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();see.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);DataStream<WikipediaEditEvent> edits = see.addSource(new RandomStringSource());KeyedStream<WikipediaEditEvent, String> keyedEdits = edits.keyBy(new KeySelector<WikipediaEditEvent, String>() {@Overridepublic String getKey(WikipediaEditEvent event) {return event.getUser();}});// 通过RichFlatMap实现去重DataStream<WikipediaEditEvent> result = keyedEdits.flatMap(new DuplicateRichFlatMap());// 通过ProcessFunction实现去重
//        DataStream<WikipediaEditEvent> result = keyedEdits.process(new DupliacateProcessFunction());result.print();see.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/656743.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Compose | UI组件(十) | Box,Surface - 帧布局

文章目录 前言Box 组件的参数说明Box 组件的使用Surface 的参数说明Surface 的使用 总结 前言 Box组件是 按子组件依次叠加 的布局组件&#xff0c;相当传统View中的 FrameLayout Box 组件的参数说明 Composable inline fun Box(modifier: Modifier Modifier, …

Web服务器之Tomcat

文章目录 Web 服务器软件简介资源分类访问流程常见的Web服务器软件 Tomcat简介使用步骤使用Tomcat注意事项部署项目的方式方式一方式二方式三 问题中文乱码黑窗口一闪而过启动报错 Web 服务器软件 简介 服务器&#xff1a;安装了服务器软件的计算机服务器软件&#xff1a;接收…

漏洞原理远程命令执行

漏洞原理远程命令/代码执行 远程命令执行函数&#xff08;Remote Command Execution Function&#xff09;是指在一个网络环境中&#xff0c;通过远程执行命令来控制另一个计算机系统或设备的功能。 远程命令执行函数可以通过网络协议&#xff08;如SSH、Telnet、RPC等&#x…

伊恩·斯图尔特《改变世界的17个方程》毕达哥拉斯定理笔记

它告诉我们什么&#xff1f; 直角三角形的三个边之间有什么关系。 为什么重要&#xff1f; 它提供了几何和代数之间的重要联系&#xff0c;使我们能够根据坐标计算距离。它也催生出了三角学。 它带来了什么&#xff1f; 测绘、导航&#xff0c;以及较近代出现的狭义和广义相对论…

第一节课,用户管理--后端初始化,项目调通。二次翻工2

一、网址来源&#xff1a; 快速开始 | MyBatis-Plus (baomidou.com) 进程&#xff1a; ​ 二、[此处不看]添加测试类&#xff0c;看下效果 2.1 参考 一、第一节课&#xff0c;用户管理--后端初始化&#xff0c;项目调通-CSDN博客 ​ 2.2 新建 SampleTest ​ 2.3 复…

SouthLeetCode-打卡24年01月第3周

SouthLeetCode-打卡24年01月第3周 // Date : 2024/01/15 ~ 2024/01/21 013.二分查找 (1) 题目描述 013#LeetCode.27.#北岸计划2024/01/15 // 略 (2) 题解代码 // 重做 014.移除元素 (1) 题目描述 014#LeetCode.160.#北岸计划2024/01/16 (2) 题解代码 Version1.0 c…

SpringMVC RESTful风格

Restful是一种软件架构风格、设计风格&#xff0c;而不是标准&#xff0c;只是提供了一组设计原则和约束条件。主要用于客户端和服务器交互类的软件&#xff0c;基于这个风格设计的软件可以更简洁&#xff0c;更有层次&#xff0c;更易于实现缓存机制等。 Restful风格的请求是…

python-自动化篇-运维-语音识别

文章目录 理论文本转换为语音使用 pyttsx使用 SAPI使用 SpeechLib 语音转换为文本 代码和效果01使用pyttsx实现文本_语音02使用SAPI实现文本_语音03使用SpeechLib实现文本_语音04使用PocketSphinx实现语音转换文本 理论 语音识别技术&#xff0c;也被称为自动语音识别&#xf…

C++ STL库之Vector简介及例题(二)

C STL库之Vector简介及例题&#xff08;二&#xff09; 继“C STL库之Vector简介及例题&#xff08;一&#xff09;【点击查看】”之后&#xff0c;这篇文章我们继续上一次的介绍&#xff0c;继续对vector的一些算法的函数进行简析及例题分析。 元素操作 删除第2个元素&…

Hive之set参数大全-22(完)

指定是否启用矢量化处理复杂数据类型 在 Hive 中&#xff0c;hive.vectorized.complex.types.enabled 是一个配置参数&#xff0c;用于指定是否启用矢量化处理复杂数据类型。以下是有关该参数的一些解释&#xff1a; 用途&#xff1a; 该参数用于控制是否启用 Hive 的矢量化执…

安卓逆向学习之ADB的配置和使用及刷机root

ADB的配置和使用 ADB即Android Debug Bridge&#xff0c;安卓调试桥&#xff0c;是谷歌为安卓开发者提供的开发工具之一&#xff0c;可以让你的电脑以指令窗口的方式控制手机。可以在安卓开发者网页中的 SDK 平台工具页面下直接下载对应系统的 adb 配置文件&#xff0c;大小只…

矩阵键盘的使用

在定义局部变量时&#xff0c;一定要给该变量赋初值。在这个程序中&#xff0c;给按键按下的返回值变量 KeyNum 赋值为 20 。 矩阵键盘线行扫描法的学习链接&#xff1a;https://www.bilibili.com/video/BV1dv411z7Gd/?spm_id_from333.999.0.0&vd_sourceb91967c499b23106…

SpringBoot项目监听reids中过期的key

使用redission客户端操作redis maven依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>2.2.8.RELEASE</version> </dependency> <depe…

Vue3的v-model说明和使用方法

Vue 3 的 v-model 是一个语法糖&#xff0c;它为表单输入和应用状态之间创建了双向绑定。这样&#xff0c;当用户在表单中输入时&#xff0c;数据会自动更新&#xff0c;反之亦然。 说明 在 Vue 3 中&#xff0c;v-model 实际上是基于 value 属性和 input 事件实现的。这意味…

Python 因果推断(上)

引言 原文&#xff1a;causal-methods.github.io/Book/Introduction.html 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 作者&#xff1a;Vitor Kamada 电子邮件&#xff1a;econometrics.methodsgmail.com 最后更新日期&#xff1a;2020 年 8 月 15 日 这本书是使…

jupyter出现问题ModuleNotFoundError: No module named ‘exceptiongroup‘

今天使用pyg的jupyter环境发现这个环境没法用, 所以只能把这个kernel给重删了然后再装&#xff0c;操作记录如下 查看kernel jupyter kernelspec list注意不是jupyter kernel --list 需要加关键字spec, 删除kernel jupyter kernelspec remove pyg当重新安装这个kernel时可能…

阿里云云上微服务 EDAS接入使用以及注意事项

阿里云分布式应用服务&#xff08;EDAS, Elastic Distributed Application Service&#xff09;是阿里云提供的一款支持微服务架构的PaaS平台&#xff0c;它帮助用户在云端构建、部署和管理分布式应用。接入EDAS并使用其进行云上微服务管理主要包括以下步骤及注意事项&#xff…

Qt容器类

一、概述 用来存储其他的类或者数据。存储基础的int&#xff0c;float类型&#xff0c;也可以是QString&#xff0c;QDate类型。Qt的容器类比STL(标准模板库)中的容器类更轻巧、安全和易于使用。 Qt的容器类分为顺序容器&#xff08;sequential containers&#xff09;和关联容…

Redis -- 常用数据结构,认识数据类型和编码方式

"人生就像骑自行车&#xff0c;要保持平衡&#xff0c;就必须保持前进。" — 爱因斯坦 说到数据结构&#xff0c;或许就能想到哈希表&#xff0c;列表集合等数据结构。对于redis来说对应的key的value的形式也可以是这些数据结构&#xff0c;如下&#xff1a; 针对上面…

深度学习:机器学习的革命性突破

深度学习&#xff08;Deep Learning&#xff09;是机器学习领域中的一个新的研究方向&#xff0c;主要是通过建立类似于人脑的神经网络来模仿人类的感知、记忆、理解和生成等能力。深度学习的核心是神经网络&#xff0c;它能够从大量的数据中自动提取有用的特征&#xff0c;并基…