37、Flink 的 WindowAssigner之会话窗口示例

1、处理时间
无需设置水位线和时间间隔。

input.keyBy(e -> e).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).apply(new WindowFunction<String, String, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow timeWindow, Iterable<String> iterable, Collector<String> collector) throws Exception {for (String string : iterable) {collector.collect(string);}}}).print();

2、事件时间
需设置水位线和时间间隔。

// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> map = input.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = map.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));// 设置了固定间隔的 event-time 会话窗口watermarks.keyBy(e -> e.f0).window(EventTimeSessionWindows.withGap(Time.minutes(10))).apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {for (Tuple2<String, Long> stringLongTuple2 : iterable) {collector.collect(stringLongTuple2.f0);}}}).print();

3、固定间隔和动态间隔

EventTimeSessionWindows.withGap(Time.minutes(10));EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {@Overridepublic long extract(Tuple2<String, Long> element) {return element.f1 + 2000L;}});

4、完整代码示例

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.time.Duration;public class _04_WindowAssignerSession {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> input = env.socketTextStream("localhost", 8888);// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> map = input.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = map.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));// 设置了固定间隔的 event-time 会话窗口watermarks.keyBy(e -> e.f0).window(EventTimeSessionWindows.withGap(Time.minutes(10))).apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {for (Tuple2<String, Long> stringLongTuple2 : iterable) {collector.collect(stringLongTuple2.f0);}}}).print();// 设置了动态间隔的 event-time 会话窗口watermarks.keyBy(e -> e.f0).window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {@Overridepublic long extract(Tuple2<String, Long> element) {return element.f1 + 2000L;}})).apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow timeWindow, Iterable<Tuple2<String, Long>> iterable, Collector<String> collector) throws Exception {for (Tuple2<String, Long> stringLongTuple2 : iterable) {collector.collect(stringLongTuple2.f0);}}}).print();// 设置了固定间隔的 processing-time session 窗口input.keyBy(e -> e).window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))).apply(new WindowFunction<String, String, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow timeWindow, Iterable<String> iterable, Collector<String> collector) throws Exception {for (String string : iterable) {collector.collect(string);}}}).print();// 设置了动态间隔的 processing-time 会话窗口input.keyBy(e -> e).window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<String>() {@Overridepublic long extract(String s) {return System.currentTimeMillis() / 1000;}})).apply(new WindowFunction<String, String, String, TimeWindow>() {@Overridepublic void apply(String s, TimeWindow timeWindow, Iterable<String> iterable, Collector<String> collector) throws Exception {for (String string : iterable) {collector.collect(string);}}}).print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/21884.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探索气象数据的多维度三维可视化:PM2.5、风速与高度分析

探索气象数据的多维度可视化&#xff1a;PM2.5、风速与高度分析 摘要 在现代气象学中&#xff0c;数据可视化是理解复杂气象模式和趋势的关键工具。本文将介绍一种先进的数据可视化技术&#xff0c;它能够将PM2.5浓度、风速和高度等多维度数据以直观和动态的方式展现出来。 …

ChatTTS 如何安装可视化操作

可视化一键安装下载地址&#xff1a; 百度网盘 Download from GitHub 从 GitHub 下载代码。 git clone https://github.com/2noise/ChatTTS 下载地址 Install Dependencies 在开始之前&#xff0c;请确保已安装必要的软件包。如果您尚未安装它们&#xff0c;可以使用 pip …

Android百度人脸识别3.0配置

JDK 必须是16的版本 如果报错的错误是"opens java.io" org.gradle.jvmargs -Xmx2048M -Dkotlin.daemon.jvm.options\"-Xmx2048M" --add-exportsjava.base/sun.nio.chALL-UNNAMED --add-opensjava.base/java.langALL-UNNAMED --add-opensjava.base/java.…

智能售货机投资指南:从成本预算到市场策略的全方位解析

现代化智能设施的典范&#xff0c;智能售货机以其丰富的商品选项与无缝购物体验著称。然而&#xff0c;涉足此领域前&#xff0c;一番周密的投资考量不可或缺。 首要因素聚焦于售货机本身的购置费用&#xff0c;该费用弹性颇大&#xff0c;依据型号与功能差异而定。基础的小型…

Linux 36.3 + JetPack v6.0@jetson-inference之语义分割

Linux 36.3 JetPack v6.0jetson-inference之语义分割 1. 源由2. segNet2.1 命令选项2.2 下载模型2.2.1 Cityscapes2.2.2 DeepScene2.2.3 MHP2.2.4 VOC2.2.5 SUN 2.3 操作示例2.3.1 单张照片2.3.2 多张照片2.3.3 视频 3. 代码3.1 Python3.2 C 4. 参考资料 1. 源由 分类和目标识…

Docker面试整理-Docker的核心组件是什么?

Docker 的核心组件构成了它的基本架构,使其能够高效地进行容器化部署和管理。这些组件协同工作,为开发者和系统管理员提供了一个强大的工具集,用于构建、分发和运行容器。主要的核心组件包括: Docker 客户端和服务器(Docker Engine):Docker 客户端(Client)允许用户通过…

【Unity】使用Jenkins实现远程Unity打包

前言 很多时候&#xff0c;我们需要自动打包&#xff0c;比如下班了&#xff0c;我要出一个包明天早上用。比如每天夜里12点&#xff0c;我需要定时出一个稳定包。 这个时候就需要Jenkins了。 1.安装环境 安装 jenkins 之前&#xff0c;需要安装Java 。Java下载网站 ①下载…

揭秘大数据时代的数据库存储引擎:关系型、NoSQL与NewSQL如何选择?

文章目录 01 关系型数据库&NoSQL数据库&NewSQL数据库1. 关系型数据库2. NoSQL数据库3. NewSQL数据库 02 OLTP&OLAP&HTAP对比1. OLTP数据库2. OLAP数据库3. HTAP数据库 03 总结 在大数据和AI时代&#xff0c;数据库成为各类应用不可或缺的重要组成部分。而数据库…

数据湖构建基础:高效数据提取与存储策略

随着大数据技术的快速发展&#xff0c;企业对于数据处理和分析的需求日益增加。数据湖作为一种集中式存储和处理大规模数据的架构&#xff0c;已经成为企业处理非结构化和半结构化数据的重要工具。然而&#xff0c;构建一个高效、稳定的数据湖并非易事&#xff0c;其中高效的数…

JDK参数设置中文版

java最新JDK参数设置 行为选项Garbage First&#xff08;G1&#xff09;垃圾收集选项性能选项调试选项 行为选项 选项默认值描述-XX:-AllowUserSignalHandlers未设置如果应用程序安装了信号处理程序&#xff0c;不要抱怨。&#xff08;仅适用于 Solaris 和 Linux。&#xff09…

2024年生物、农业与工程技术国际会议(BAET 2024)

2024年生物、农业与工程技术国际会议&#xff08;BAET 2024&#xff09; 2024 International Conference on Biology, Agriculture, and Engineering Technology 目录 【会议简介】2024年生物、农业与工程技术国际会议将于昆明盛大召开。此次会议汇聚了全球生物、农业与工程技…

网络编程(一)

网络编程&#xff08;一&#xff09; 网络基础网络体系结构**OSI的7层模型**&#xff1a;&#xff08;理想化&#xff09;**每层的功能** **TCP/IP的4层模型**&#xff1a;&#xff08;在使用&#xff09;常见的协议IP地址IPV4分类A类&#xff08;第1位固定为0&#xff09;B类&…

「前端+鸿蒙」核心技术HTML5+CSS3(十二)

1、CSS3 伸缩盒模型简介 CSS3 的 Flexbox(伸缩盒)模型是一种布局模式,用于在容器内对齐和分配空间,即使它们的大小未知或动态变化。Flexbox 使得布局能够适应不同屏幕尺寸和设备。 示例代码: <div class="flex-container"><div>项目1</div>…

大文件续传,文件分享

1. 最近各种文件分享平台&#xff0c;很多都要注册&#xff0c; 对于很多需要临时分享文件下的场景&#xff0c;不想被这种东西烦恼&#xff0c;于是借鉴网上代码&#xff0c;进行了一些修改&#xff0c; 写了一个文件分享项目&#xff0c; 该项目只是自用&#xff0c;数据库都…

为何使用代理池:

匿名性&#xff1a; 代理池允许爬虫在请求目标网站时使用不同的IP地址&#xff0c;从而保护真实身份。 防封锁&#xff1a; 通过动态切换IP&#xff0c;可以规避网站对特定IP的封锁&#xff0c;提高爬虫的稳定性。 分布式请求&#xff1a; 代理池使爬虫能够通过多个IP地址发起…

go语言接口之接口类型

接口类型具体描述了一系列方法的集合&#xff0c;一个实现了这些方法的具体类型是这个接口类型的 实例。 io.Writer类型是用的最广泛的接口之一&#xff0c;因为它提供了所有的类型写入bytes的抽象&#xff0c;包括文 件类型&#xff0c;内存缓冲区&#xff0c;网络链接&#x…

Science Robotics 可实现中心聚焦与多光谱成像的鸟类视觉启发钙钛矿人工视觉系统

一、前沿速览 来自韩国基础科学研究所&#xff08;IBS&#xff09;纳米粒子研究中心的研究人员及其合作者提出了一个利用鸟类视觉注视点和多光谱成像的人工视觉系统。近日在Science Robotics 上发表的文章引入了人工中央凹和垂直堆叠的钙钛矿光电探测器阵列&#xff0c;其设计…

webserver timer

定时器用来处理非活动链接。 webserver项目中&#xff0c;通过信号函数来实现定时。 调用alarm()系统调用&#xff0c;设置好时间&#xff0c;这段时间结束后&#xff0c;alarm会发出sig_alarm信号。而信号处理函数做的事情仅仅只是将代表该信号的值写入管道(pipefd)。 在event…

NLP基础——序列模型(动手学深度学习)

序列模型 定义 序列模型是自然语言处理&#xff08;NLP&#xff09;和机器学习领域中一类重要的模型&#xff0c;它们特别适合处理具有时间顺序或序列结构的数据&#xff0c;例如文本、语音信号或时间序列数据。 举个例子&#xff1a;一部电影的评分在不同时间段的评分可能是…

#!/usr/bin/env bash

#!/usr/bin/env bash 是一个在 Unix 和 Unix-like 系统&#xff08;如 Linux 和 macOS&#xff09;中常见的 shebang&#xff08;或称为 shebang 行、hashbang、pound bang 或 hash-bang&#xff09;指令。 这个指令有以下几个部分&#xff1a; #!&#xff1a;这是一个特殊的…