大数据-玩转数据-Flink窗口函数

一、Flink窗口函数

前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
除了一些简单聚合,比如 sum,max,min,maxBay,minBay ,有以下窗口聚合函数。

二、ReduceFunction(增量聚合函数)

输入和输出必须一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Window_s_function {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor value1,WaterSensor value2) throws Exception {System.out.println("Window_s_function.reduce");value1.setVc ( value1.getVc() + value2.getVc());return (value1);}}).print();env.execute();}
}

运行结果
在这里插入图片描述
在这里插入图片描述

三、AggregateFunction(增量聚合函数)

输入和输出可以不一致

package com.lyh.flink07;import com.lyh.bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.kafka.common.metrics.stats.Avg;import java.util.List;public class Window_s_function_2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream("hadoop100",9999).map(line -> {String[] data = line.split(",");return new WaterSensor(data[0],Long.valueOf(data[1]),Integer.valueOf(data[2]));}).keyBy(WaterSensor::getId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(new AggregateFunction<WaterSensor, Avg, Double>() {@Overridepublic Avg createAccumulator() {return new Avg();}@Overridepublic Avg add(WaterSensor value, Avg acc) {acc.sum += value.getVc();acc.couunt++;return acc;}@Overridepublic Double getResult(Avg acc) {return acc.sum * 1.0 / acc.couunt;}@Overridepublic Avg merge(Avg avg, Avg acc1) {return null;}},new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}).print();env.execute();}public static class Avg {public Integer sum = 0;public Long couunt = 0L;};
}

运行结果
在这里插入图片描述
在这里插入图片描述

四、ProcessWindowFunction(全窗口函数)

上面例子里已经用到

new ProcessWindowFunction<Double, String, String, TimeWindow>() {@Overridepublic void process(String key,Context ctx,Iterable<Double> elements,Collector<String> out) throws Exception {Double result = elements.iterator().next();long starttime = ctx.window().getStart();long endtime = ctx.window().getEnd();out.collect("窗口:" + starttime + "  " + endtime +  " key: " + key + " result: " + result);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/65104.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

打包个七夕exe玩玩

前段时间七夕 当别的哥们都在酒店不要不要的时候 身为程序员的我 还在单位群收到收到 正好后来看到大佬些的这个 https://www.52pojie.cn/thread-1823963-1-1.html 这个贱 我必须要犯&#xff0c;可是我也不能直接给他装个python吧 多麻烦 就这几个弹窗 好low 加上bgm 再打包成…

Nexus仓库介绍以及maven deploy配置

一 、Nexus仓库介绍 首先介绍一下Nexus的四个仓库的结构&#xff1a; maven-central 代理仓库&#xff0c;代理了maven的中央仓库&#xff1a;https://repo1.maven.org/maven2/&#xff1b; maven-public 仓库组&#xff0c;另外三个仓库都归属于这个组&#xff0c;所以我们的…

贝叶斯神经网络 - 捕捉现实世界的不确定性

贝叶斯神经网络 - 捕捉现实世界的不确定性 Bayesian Neural Networks 生活本质上是不确定性和概率性的&#xff0c;贝叶斯神经网络 (BNN) 旨在捕获和量化这种不确定性 在许多现实世界的应用中&#xff0c;仅仅做出预测是不够的&#xff1b;您还想知道您对该预测的信心有多大。例…

第2章 Linux多进程开发 2.18 内存映射

内存映射&#xff1a;可以进行进程间的通信 1.如果对mmap的返回值(ptr)做操作(ptr), munmap是否能够成功? void * ptr mmap(…); ptr; 可以对其进行操作 munmap(ptr, len); // 错误,要保存地址 2.如果open时O_RDONLY, mmap时prot参数指定PROT_READ | PROT_WRITE会怎样? 错…

二进制安全虚拟机Protostar靶场 安装,基础知识讲解,破解STACK ZERO

简介 pwn是ctf比赛的方向之一&#xff0c;也是门槛最高的&#xff0c;学pwn前需要很多知识&#xff0c;这里建议先去在某宝上买一本汇编语言第四版&#xff0c;看完之后学一下python和c语言&#xff0c;python推荐看油管FreeCodeCamp的教程&#xff0c;c语言也是 pwn题目大部…

Rest和Http什么关系?

分析&回答 REST 定义了一组体系架构原则&#xff0c;您可以根据这些&#xff0c;包括使用不同语言编写的客户端如何通过 HTTP 处理和传输资源状态。 REST只是一种风格&#xff0c;不是一种标准REST是以资源为中心的 用不同的 HTTP 请求方法来处理对资源的 CRUD&#xff0…

Lambda表达式第四版

1、冗余的Runnbale代码 package com.lambda;public class Demo01Runnable {public static void main(String[] args) {RunnableImpl runnable new RunnableImpl();Thread thread new Thread(runnable);thread.start();//Lambda表达式} }class RunnableImpl implements Runnab…

SpringBoot 使用MyBatis分页插件实现分页功能

SpringBoot 使用MyBatis分页插件实现分页功能 1、集成pagehelper2、配置pagehelper3、编写代码4、分页效果 案例地址&#xff1a; https://gitee.com/vinci99/paging-pagehelper-demo/tree/master 1、集成pagehelper <!-- 集成pagehelper --> <dependency><gr…

“亚马逊云科技创业加速器”首期聚焦AI,促进入营企业业务发展

生成式AI技术飞速发展&#xff0c;颠覆着人们的生活&#xff0c;正在掀起新一轮的科技革命。在生成式AI的浪潮中&#xff0c;亚马逊云科技旨在为中国的优秀初创企业提供全方位支持&#xff0c;助其抢占先机。 在6月底举办的亚马逊云科技中国峰会上&#xff0c;亚马逊云科技联合…

6. series对象及DataFrame对象知识总结

【目录】 文章目录 6. series对象及DataFrame对象知识总结1. 导入pandas库2. pd.Series创建Series对象2.1 data 列表2.2 data 字典 3. s1.index获取索引4. s1.value获取值5. pd.DataFrame()-创建DataFrame 对象5.1 data 列表5.2 data 嵌套列表5.3 data 字典 6. df[列索引]…

java对象创建的过程

1、检查指令的参数是否能在常量池中定位到一个类的符号引用 2、检查此符号引用代表的类是否已被加载、解析和初始化过。如果没有&#xff0c;就先执行相应的类加载过程 3、类加载检查通过后&#xff0c;接下来虚拟机将为新生对象分配内存。 4、内存分配完成之后&#xff0c;…

一句话画出动漫效果

链接&#xff1a; AI Comic Factory - a Hugging Face Space by jbilcke-hfDiscover amazing ML apps made by the communityhttps://huggingface.co/spaces/jbilcke-hf/ai-comic-factory 选择类型&#xff1a; Japanese 输入提示词&#xff1a; beauty and school love st…

webSocket前后端交互pc端版

前端代码 <!--* Author: 第一好帅宝* Date: 2023-08-29 16:12:26* LastEditTime: 2023-08-29 16:54:50* FilePath: \websocket\ceshi.html --> <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name&…

Tomcat快速入门

概念&#xff1a;Tomcat是Apache软件基金会一个核心项目&#xff0c;是一个开源免费的轻量级Web服务器&#xff0c;支持Servlet/JSP少量JavaEE规范 Tomcat也被称为Web容器、Servlet容器。Servlet程序需要依赖于Tomcat才能运行 官网&#xff1a;Apache Tomcat - Welcome! 基本…

12、监测数据采集物联网应用开发步骤(9.1)

监测数据采集物联网应用开发步骤(8.2) TCP/IP Server开发 在com.zxy.common.Com_Para.py中添加如下内容 #锁机制 lock threading.Lock() #本机服务端端口已被连接客户端socket list dServThreadList {} #作为服务端接收数据拦截器 ServerREFLECT_IN_CLASS "com.plug…

设计模式-装饰模式

文章目录 一、简介二、基本概念三、装饰模式的结构和实现类图解析&#xff1a;装饰器的实现方式继承实现&#xff1a;组合实现&#xff1a;继承和组合对比 四、装饰模式的应用场景五、与其他模式的关系六、总结 一、简介 装饰模式是一种结构型设计模式&#xff0c;它允许动态地…

msvcp120.dll丢失的解决方法?全面解决方法推荐

msvcp120.dll是Windows操作系统中的一个关键组件&#xff0c;如果丢失或损坏&#xff0c;可能会导致系统崩溃或无法正常运行。本文将介绍三种解决msvcp120.dll丢失问题的方法。 随着计算机应用的广泛普及&#xff0c;越来越多的人开始遇到各种电脑问题。其中&#xff0c;msvcp…

FPGA原理与结构——FIFO IP核的使用与测试

一、前言 本文介绍FIFO Generator v13.2 IP核的具体使用与例化&#xff0c;在学习一个IP核的使用之前&#xff0c;首先需要对于IP核的具体参数和原理有一个基本的了解&#xff0c;具体可以参考&#xff1a; FPGA原理与结构——FIFO IP核原理学习https://blog.csdn.net/apple_5…

算法通关村第十二关——字符串反转问题解析

前言 字符串反转是关于字符串算法里的重要问题&#xff0c;虽然不是太难&#xff0c;但需要考虑到一些边界问题。本篇文章就对几道字符串反转题目进行分析。 1.反转字符串 力扣344题&#xff0c;编写一个函数&#xff0c;其作用是将输入的字符串反转过来。输入字符串以字符数…

亚马逊云科技生成式AI技术辅助教学领域,近实时智能应答2D数字人搭建

早在大语言模型如GPT-3.5等的兴起和被日渐广泛的采用之前&#xff0c;教育行业已经在AI辅助教学领域有过各种各样的尝试。在教育行业&#xff0c;人工智能技术的采用帮助教育行业更好地实现教学目标&#xff0c;提高教学质量、学习效率、学习体验、学习成果。例如&#xff0c;人…