Flink 06 聚合操作入门学习,真不难

抛砖引玉

  1. 让你统计1小时内每种商品的销售额,用Flink 该怎么实现。

  2. 还是让你统计1小时内每种商品的销售额,但是要过滤掉退款的订单,用Flink 该怎么实现。

学了本文两个操作,不信你还不会。

AggregateFunction

通常用于对数据流中的数据进行分组聚合。它可以将一组数据逐步合并、计算,最终得到一个聚合结果。

AggregateFunction 接口包含几个关键的方法,这些方法定义了如何进行状态初始化、累加、合并和获取结果:

createAccumulator():该方法在聚合前被调用,用于初始化聚合状态。

add(value, accumulator)该方法将新的输入值加到累加器上。在每个事件到达时调用会调用该方法。

getResult(accumulator):该方法用于返回最终聚合结果。这在聚合操作结束时被调用。

merge(acc1, acc2)(可选):该方法作用是,在并行流处理情况下,需要合并不同实例的聚合结果。

以下示例模拟统计每小时各商品的销售额

public class AggregateFunctionDemo {public static class Order{String goods;int amount;public Order(String goods, int amount) {this.goods = goods;this.amount = amount;}}public static class OrderACC{String goods;int amount;public OrderACC(String goods, int amount) {this.goods = goods;this.amount = amount;}@Overridepublic String toString() {return "OrderACC{" +"goods='" + goods + '\'' +", amount=" + amount +'}';}}public  static class OrderACCFunction implements AggregateFunction<Order, OrderACC, OrderACC> {@Overridepublic OrderACC createAccumulator() {return new OrderACC(null,0);}@Overridepublic OrderACC add(Order value, OrderACC accumulator) {if (accumulator.goods == null) {accumulator.goods = value.goods;}accumulator.amount += value.amount;return accumulator;}@Overridepublic OrderACC getResult(OrderACC accumulator) {return accumulator;}@Overridepublic OrderACC merge(OrderACC a, OrderACC b) {a.amount += b.amount;return a;}}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Order> dataStream = env.addSource(new SourceFunction<Order>() {boolean running = true;List<String> goods = Arrays.asList("书包","本子","笔");@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (running){int goodsIndex =   random.nextInt(goods.size());int amount = random.nextInt(1000);Order order = new Order(goods.get(goodsIndex), amount);ctx.collect(order);Thread.sleep(200);}}@Overridepublic void cancel() {running = false;}});DataStream<OrderACC> resultStream =dataStream.keyBy(order -> order.goods).window(TumblingProcessingTimeWindows.of(Time.hours(5))).aggregate(new OrderACCFunction());resultStream.print();env.execute();}
}

AggregateFunction 小结

  • AggregateFunction 常用于对窗口内的数据进行聚合计算。

例如,你可能需要计算某个时间窗口内某个指标的平均值、总和、最大值或最小值等。

  • 在分布式计算环境中,通过实现 merge 方法,Flink 可以在不同的节点上并行地执行聚合计算,并在最后将结果合并。

ProcessWindowFunction

ProcessWindowFunction 是 Flink 提供的一个强大的窗口函数接口,允许开发者对窗口中的元素进行自定义处理,包括访问窗口的元数据和状态。

来看看ProcessWindowFunction中 process方法的定义

 void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;

图片

从上面方法定义我们基本可以推断ProcessWindowFunction 的特点

  • Iterable<IN> elements 窗口中所有元素 ,这与 ReduceFunction 或 AggregateFunction 不同,后者主要关注于元素之间的聚合操作。我们可以遍历elements,实现自己的聚合逻辑。

  • Context context:你可以通过Context获取到窗口的元数据,如窗口的开始和结束时间戳。甚至进行状态管理

ProcessWindowFunction 的使用

public class AggregateFunctionDemo2 {public static class Order{String goods;int amount;boolean refund;public Order(String goods, int amount, boolean refund) {this.goods = goods;this.amount = amount;this.refund = refund;}}public static class OrderACC{String goods;int amount;public OrderACC(String goods, int amount) {this.goods = goods;this.amount = amount;}@Overridepublic String toString() {return "OrderACC{" +"goods='" + goods + '\'' +", amount=" + amount +'}';}}public  static class OrderProcessWindowFunction extends ProcessWindowFunction<Order,OrderACC,String, TimeWindow> {@Overridepublic void process(String key, ProcessWindowFunction<Order, OrderACC, String, TimeWindow>.Context context, Iterable<Order> elements, Collector<OrderACC> out) throws Exception {int sum = 0;for(Order order : elements){if(!order.refund){sum += order.amount;}}out.collect(new OrderACC(key,sum));}}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<Order> dataStream = env.addSource(new SourceFunction<Order>() {boolean running = true;List<String> goods = Arrays.asList("书包","本子","笔");@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (running) {int goodsIndex = random.nextInt(goods.size());int amount = random.nextInt(1000);boolean refund = random.nextBoolean();Order order = new Order(goods.get(goodsIndex), amount, refund);ctx.collect(order);Thread.sleep(100);}}@Overridepublic void cancel() {running = false;}});DataStream<OrderACC> resultStream = dataStream.keyBy(order -> order.goods).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).process(new OrderProcessWindowFunction());resultStream.print();env.execute();}
}

图片

ProcessWindowFunction小结

  • 可以实现复杂的聚合逻辑,比如对窗口内元素进行过滤、排序之后 再进行聚合。

  • 可以获取窗口的状态信息,(如窗口的开始和结束时间)来满足一些特定的需求

总结

本文介绍了如何使用ProcessWindowFunction/AggregateFunction 完成一些聚合操作。通过对比两端代码,相信聪明的你已经体会到两者差异。再回到开头的问题,相信已经不是问题,信手拈来了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/55694.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android从上帝视角来看PackageManagerService

戳蓝字“牛晓伟”关注我哦&#xff01; 用心坚持输出易读、有趣、有深度、高质量、体系化的技术文章&#xff0c;技术文章也可以有温度。 前言 阅读该篇之前&#xff0c;建议先阅读下面的系列文章&#xff1a; Android深入理解包管理–PackageManagerService和它的“小伙伴…

HTB:Bashed[WriteUP]

目录 连接至HTB服务器并启动靶机 1.How many open TCP ports are listening on Bashed? 2.What is the relative path on the webserver to a folder that contains phpbash.php? 3.What user is the webserver running as on Bashed? 执行命令&#xff1a;whoami 4.S…

GraphRAG 与 RAG 的比较分析,收藏这一篇就够了!!!

检索增强生成&#xff08;RAG&#xff09;技术概述 检索增强生成&#xff08;Retrieval-Augmented Generation&#xff0c;简称 RAG&#xff09;是一种旨在提升大型语言模型&#xff08;Large Language Models&#xff0c;LLMs&#xff09;性能的技术方法。其核心思想是通过整…

5 -《本地部署开源大模型》在Ubuntu 22.04系统下ChatGLM3-6B高效微调实战

在Ubuntu 22.04系统下ChatGLM3-6B高效微调实战 无论是在单机单卡&#xff08;一台机器上只有一块GPU&#xff09;还是单机多卡&#xff08;一台机器上有多块GPU&#xff09;的硬件配置上启动ChatGLM3-6B模型&#xff0c;其前置环境配置和项目文件是相同的。如果大家对配置过程还…

如何给手机换ip地址

在当今数字化时代&#xff0c;IP地址作为设备在网络中的唯一标识&#xff0c;扮演着举足轻重的角色。然而&#xff0c;有时出于隐私保护、网络访问需求或其他特定原因&#xff0c;我们可能需要更改手机的IP地址。本文将详细介绍几种实用的方法&#xff0c;帮助您轻松实现手机IP…

一元n次多项式乘法【数据结构-链表】

一元n次多项式定义如下&#xff1a; 其中Ai​为实数&#xff0c;i为不小于0的整数。在完成“一元n次多项式输入输出”题目的基础上实现一元n次多项式的乘法。要求使用链表实现上述运算。 输入格式: 有两个一元n次多项式&#xff0c;格式分别为&#xff1a; f(X)3X2 X1 g(X)−…

MySQL 知识点_01

1、DISTINCT select DISTINCT EMPLOYEE_ID ,FIRST_NAME from employees 按照ID去重&#xff0c;DISTINCT的字段要放在前面&#xff0c;不会再继续在FIRST_NAME上去重判断&#xff1b; 如果需要多字段去重&#xff0c;需要用到group by&#xff0c;这个后面讲&#xff1b; …

一次恶意程序分析

首先F12shift查看字符表 字符表发现可疑字符串 双击进入 再tab 进入这里 推测为main函数 可见一些可疑的api FindResourceW推测该木马使用了资源加载 VirtualAlloc申请内存 然后sub_1400796E0 有 dwSize 参数 推测为 拷贝内存 memcpy类似函数 、 然后sub_140078CB0函数 跟进函…

HarmonyOS NEXT 应用开发实战(五、页面的生命周期及使用介绍)

HarmonyOS NEXT是华为推出的最新操作系统&#xff0c;arkUI是其提供的用户界面框架。arkUI的页面生命周期管理对于开发者来说非常重要&#xff0c;因为它涉及到页面的创建、显示、隐藏、销毁等各个阶段。以下是arkUI页面生命周期的介绍及使用举例。 页面的生命周期的作用 页面…

【正点原子K210连载】第四十六章 车牌识别实验 摘自【正点原子】DNK210使用指南-CanMV版指南

第四十六章 车牌识别实验 在上一章节中&#xff0c;介绍了利用maix.KPU模块实现了通过提取图像中人脸的特征进行人脸识别&#xff0c;本章将继续介绍利用maix.KPU模块实现的车牌识别。通过本章的学习&#xff0c;读者将学习到车牌识别应用在CanMV上的实现。 本章分为如下几个小…

视觉识别技术:开启智能视觉新时代

引言 在数字化时代&#xff0c;信息的获取和处理变得前所未有的重要。视觉识别技术&#xff0c;作为人工智能领域的一个重要分支&#xff0c;正在逐渐改变我们与数字世界的互动方式。它通过模拟人类视觉系统&#xff0c;使计算机能够识别和理解图像和视频中的内容&#xff0c;…

Shell案例之一键部署mysql

1.问题 我认为啊学习就是一个思考的过程&#xff0c;思考问题的一个流程应该是&#xff1a;提出问题&#xff0c;分析问题&#xff0c;解决问题 在shell里部署mysql服务时&#xff0c;我出现一些问题&#xff1a; 1.安装mysql-server时&#xff0c;没有密钥&#xff0c;安装…

普通java web项目集成spring-session

之前的老项目&#xff0c;希望使用spring-session管理会话&#xff0c;存储到redis。 项目环境&#xff1a;eclipse、jdk8、jetty嵌入式启动、非spring项目。 实现思路&#xff1a; 1.添加相关依赖jar。 2.配置redis连接。 3.配置启动spring。 4.配置过滤器&#xff0c;拦…

L1练习-鸢尾花数据集处理(分类/聚类)

背景 前文&#xff08;《AI 自学 Lesson1 - Sklearn&#xff08;开源Python机器学习包&#xff09;》&#xff09;以鸢尾花数据集的处理为例&#xff0c;本文将完善其代码&#xff0c;在使用 sklearn 的部分工具包基础上&#xff0c;增加部分数据预处理、数据分析和数据可视化…

QUIC 协议的优势

QUIC 协议的优势包括&#xff1a; 快速建立连接&#xff1a;将传输层和加密层的握手合并&#xff0c;减少了连接建立的延迟。QUIC 建连时间大约为 0~1RTT&#xff0c;相比 HTTPS 的 3RTT 建连&#xff0c;具有极大的优势。客户端第一次建连的握手协商需 1RTT&#xff0c;而已建…

Linux 和Windows创建共享文件夹实现文件共享

直接开整 1.Windows下创建共享文件夹share右击-》属性—》共享-》选择所有人-》点击共享 2.共享创建完成后可以使他的共享网络地址或者Windows ip地址-推荐使用Windows ip地址有时候 不知道什么原因他Linux解析不了网络地址 共享网络地址 —共享文件夹share 右击-》属性—》共…

扫普通链接二维码打开小程序

1. 2.新增规则&#xff08;注意下载文件到跟目录下&#xff0c;需要建个文件夹放下载的校验文件&#xff09; 3.发布 ps&#xff1a;发布后&#xff0c;只能访问正式版本。体验版本如果加了 测试链接http://xxx/xsc/10 那么http://xxx/xsc/aa.....应该都能访问 例如aa101 aa…

CMOS晶体管的串联与并联

CMOS晶体管的串联与并联 前言 对于mos管的串联和并联&#xff0c;一直没有整明白&#xff0c;特别是设计到EDA软件中&#xff0c;关于MOS的M和F参数&#xff0c;就更困惑了&#xff0c;今天看了许多资料以及在EDA软件上验证了电路结构与版图的对应关系&#xff0c;总算有点收…

VScode中CMake无高亮(就是没有补全的提示)

在我学的过程中我发现我的CMake是这样的&#xff0c;如下图 但在教学视频里是这样的&#xff08;如下图&#xff09; 这非常的难受&#xff0c;所以疯狂的找&#xff0c;最后是CMake报错有 原因就是&#xff1a;本地没有配置环境变量&#xff0c;解决方法是下一个cmake然后直接…

STM32-CubeIDE用串口通讯

USART串口通讯 一、轮询模式 1.设置所接引脚为UART异步模式 选择完成CTRLS保存。 2.编写测试代码&#xff08;自动发送hello world&#xff09; 在mian函数里面编写代码 原函数 调用函数&#xff0c;需要数据类型一致&#xff0c;使用函数通过串口发送数组里面的数据 打开串…