Flink -- window(窗口)

1、窗口主要分成三大种:
        1、Time Window (时间窗口):固定时间触发一次窗口

                a、SlidingEventTimeWindows: 滑动的事件时间窗口

public class Demo1TImeWindow {public static void main(String[] args) throws Exception {/*** 时间窗口:由时间触发的窗口* SlidingEventTimeWindows: 滑动的事件时间窗口* SlidingProcessingTimeWindows:滑动的处理时间窗口* TumblingEventTimeWindows:滚动的事件时间窗口* TumblingProcessingTimeWindows:滚动的处理时间窗口*/StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> linesDS = env.socketTextStream("master", 8888);//解析数据,取出时间字段DataStream<Tuple2<String, Long>> wordAndTsDS = linesDS.map(line -> {String[] split = line.split(",");String word = split[0];//将时间戳转换成long类型long ts = Long.parseLong(split[1]);return Tuple2.of(word, ts);}, Types.TUPLE(Types.STRING, Types.LONG));//设置时间字段和水位线DataStream<Tuple2<String, Long>> assDS = wordAndTsDS.assignTimestampsAndWatermarks(WatermarkStrategy//水位线前移1秒.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((kv, ts) -> kv.f1));/*** 每隔5秒统计最近10秒单词的数量*/SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = assDS.map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));kvDS.keyBy(kv -> kv.f0)//滑动的事件时间窗口.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1).print();env.execute();}
}

                b、SlidingProcessingTimeWindows:滑动的处理时间窗口

public class Demo03ProcessingTime {public static void main(String[] args)  throws Exception{/*** 数据处理时间:一般会结合窗口使用,一般值的是接受数据后对数据操作的时间* 需求:每过5秒中统计15秒内的单词的数量*///构建Flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用socket模拟实时的操作DataStreamSource<String> wordDS = env.socketTextStream("master", 8888);//将接受的数据的转换成kv的格式SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = wordDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING,Types.INT));//按照单词进行分组KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(key -> key.f0);//划分窗口,窗口的大小是10秒钟,滑动的时间是5秒钟WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));//对统计的单词进行求和SingleOutputStreamOperator<Tuple2<String, Integer>> countDS = windowDS.sum(1);countDS.print();//启动Flinkenv.execute();}
}

                c、TumblingEventTimeWindows:滚动的事件时间窗口

kvDS.keyBy(kv -> kv.f0) 
//滚动的事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();

                d、TumblingProcessingTimeWindows:滚动的处理时间窗口

kvDS.keyBy(kv -> kv.f0)//滚动的处理时间窗口: 
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1).print();env.execute();
        2、Session Window (会话窗口):如果一段时间没有数据就会生成一个窗口,将前面的数据拉去过来一起计算。

        1、 ProcessingTimeSessionWindows: 处理时间的会话窗口,是针对每一个key都会统计他的数量。

        2、EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)(使用的比较少)

public class Demo3SessionWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** ProcessingTimeSessionWindows: 处理时间的会话窗口* EventTimeSessionWindows: 事件时间的会话窗口(需要由时间字段和水位线)**/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//某一个key如果5秒没有数据产生,将前面的数据放一起进行计算.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1).print();env.execute();}
}
        3、Count Window (统计窗口):固定的数据量计算一次

                1、countWindow(10): 滚动的统计窗口
                 2、countWindow(10,2):滑动的统计窗口

public class Demo2CountWindow {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStream<String> linesDS = env.socketTextStream("master", 8888);/*** 实时统计单词的数量,每10条数据统计一次*  .countWindow(10): 滚动的统计窗口*  .countWindow(10,2):滑动的统计窗口*/linesDS.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT)).keyBy(kv -> kv.f0)//每隔10条数据计算一次,同一个key每隔10条计算一次.countWindow(10).sum(1).print();env.execute();}
}

注意:对于事件时间,需要指定时间字段和水位线,处理时间不需要指定。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/134617.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3+ts 项目遇到的问题和bug

1.router中使用pinia报错 pinia.mjs:1709 Uncaught Error: [&#x1f34d;]: "getActivePinia()" was called but there was no active Pinia. Are you trying to use a store before calling "app.use(pinia)"? See https://pinia.vuejs.org/core-concep…

python爬虫怎么翻页 ?

首先&#xff0c;你需要安装相关的库。在你的命令行窗口中&#xff0c;输入以下命令来安装所需的库&#xff1a; pip install requests beautifulsoup4然后&#xff0c;你可以使用以下代码来爬取网页内容并翻页&#xff1a; package mainimport ("fmt""net/htt…

ffmpeg 从内存中读取数据(或将数据输出到内存)

1.为了使本文更通俗易懂&#xff0c;更新了部分内容&#xff0c;将例子改为从内存中打开。 2.增加了将数据输出到内存的方法。 从内存中读取数据 ffmpeg一般情况下支持打开一个本地文件&#xff0c;例如“C:\test.avi” 或者是一个流媒体协议的URL&#xff0c;例如“rtmp:/…

【沐风老师】3dMax快速平铺纹理插件QuickTiles教程

QuickTiles是3ds max的一个插件&#xff0c;允许您将常规瓷砖纹理转换为交互式纹理&#xff0c;就在mat.editor中。 换言之&#xff0c;您可以根据需要对任何纹理进行修改和重新创建&#xff1a;更改布局、瓷砖大小、格式、颜色、接缝、体积、随机化形状或纹理等等。 这种方法大…

打造高效的客服体系,就在于这个“专属链接”

想要追踪特定数据&#xff0c;但是得在数据库中大海捞针&#xff1f; 想要知道某个推广的效果&#xff0c;但是无法追踪&#xff1f; 想要获得个性化的报告&#xff0c;但是数据不够精准&#xff1f; 面对这些情况&#xff0c;只需要靠一条“专属链接”就能一一击破&#xff…

qt多线程例子,不断输出数字

dialog.h #include "dialog.h" #include "ui_dialog.h"Dialog::Dialog(QWidget *parent) :QDialog(parent),ui(new Ui::Dialog) {ui->setupUi(this); }Dialog::~Dialog() {delete ui; }// 启动线程按钮 void Dialog::on_startButton_clicked() {//conn…

C#解析XML并反序列化为Model的方法

虽然现在json大行其道&#xff0c;但是xml格式依旧占据着广阔的编程世界&#xff0c;不管光伏锂电激光卫星汽车等等工业领域&#xff0c;基本上都是以xml为主&#xff0c;广大的.NET开发人员有很多被xml折磨的都要转java了&#xff0c;这篇小作文就来玩一种迅速完成xml到model的…

Hello Vue!

目录 前言 hello vue 为什么要new Vue(),而不能直接调用Vue()? Vue构造函数中的形参options template配置项 $mount()方法 前言 从此篇博客开始&#xff0c;将开启vue的学习&#xff0c;查缺补漏。 只要学计算机语言&#xff0c;那么hello xxx那一定是入门第一行代码了…

【深度学习】pytorch——Autograd

笔记为自我总结整理的学习笔记&#xff0c;若有错误欢迎指出哟~ 深度学习专栏链接&#xff1a; http://t.csdnimg.cn/dscW7 pytorch——Autograd Autograd简介requires_grad计算图没有梯度追踪的张量ensor.data 、tensor.detach()非叶子节点的梯度计算图特点总结 利用Autograd实…

chatGPT对英语论文怎么润色呢?

chatGPT对英语论文怎么润色呢&#xff1f; 回答1&#xff1a; 润色英语论文是一项重要的任务&#xff0c;它有助于提高论文的质量、语法准确性和清晰度。以下是一些关于如何润色英语论文的建议&#xff1a; 语法和拼写检查&#xff1a; 使用拼写和语法检查工具&#xff0c;如…

vmware虚拟机设置静态ip之后无法联网

今天在vmware虚拟机设置静态ip&#xff0c;设置静态ip之后无法联网&#xff08;ping&#xff09;&#xff0c;并且SecureCRT无法连接上虚拟机。 网卡参数配置没有问题&#xff0c;可是却发联网&#xff0c;ping网站也不通 显示未知的名称和服务&#xff0c;开始以为网管和DNS是…

注册虾皮买家号需要哪些资料?

注册虾皮买家号其实是很简单的&#xff0c;使用相应国家的手机号及对应的环境就可以注册了的&#xff0c;如果想要账号更方便使用&#xff0c;也可以绑定邮箱进行认证。 而如果想要使用shopee买家通系统进行自动化的注册&#xff0c;那么对于资料就有一定的要求了。 1、手机号…

flask参数校验自定义返回

parser RequestParser() parser.add_argument(name, typestr, requiredTrue, locationjson) args parser.parse_args()上面是默认情况&#xff0c;如果参数校验出错&#xff0c;会返回&#xff1a; {"message": {"name": "Missing required parame…

C之(10)CMocka-单元测试框架使用

CMocka基础使用 Author&#xff1a;Once Day Date&#xff1a;2023年6月15日 参考文档&#xff1a; GoogleTest User’s Guide | GoogleTest嵌入式自动化单元测试(2)-Cmocka - 知乎 (zhihu.com)使用 cmocka 进行单元测试 | 前尘逐梦 (qianchenzhumeng.github.io)cmocka - un…

C# Winform自定义点阵控件

1、创建点阵控件 在控件库添加用户控件&#xff08;Windows窗体&#xff09;&#xff0c;命名为MatrixArray&#xff1b; 在属性/布局栏将Size设置为680&#xff0c;700。 2、创建数据模型 using System; using System.Collections.Generic; using System.Linq; using System.…

Go语言条件语句

文章目录 1. if 语句:2. if-else 语句:3. if-else if-else 语句:4. switch 语句:5.select语句 Go语言提供了一些条件语句来实现不同的条件分支和决策逻辑。以下是Go语言中常用的条件语句&#xff1a; 1. if 语句: if 语句用于执行一个代码块&#xff0c;如果给定的条件为真&am…

【算法 | 模拟No.3】leetcode 38. 外观数列

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【手撕算法系列专栏】【Leetcode】 &#x1f354;本专栏旨在提高自己算法能力的同时&#xff0c;记录一下自己的学习过程&#xff0c;希望…

微服务-grpc

微服务 一、微服务&#xff08;microservices&#xff09; 近几年,微服这个词闯入了我们的视线范围。在百度与谷歌中随便搜一搜也有几千万条的结果。那么&#xff0c;什么是微服务 呢&#xff1f;微服务的概念是怎么产生的呢&#xff1f; 我们就来了解一下Go语言与微服务的千丝…

RDS for Mysql 到云数据库GaussDB

前言 该实验旨在指导用户使用DRS将RDS MySQL上的数据迁移到 GaussDB中。 本实验涉及数据复制服务DRS&#xff08;Data Replication Service&#xff09;、关系型数据库服务RDS&#xff08;Relational Database Service&#xff09;、GaussDB、数据管理服务DAS&#xff08;Data…

金融学习资料维护库

诸神缄默不语-个人CSDN博文目录 金融那块也会用到很多计算机知识&#xff0c;所以学习金融的博文也可以放到技术博客里&#xff0c;这很河狸。 &#xff08;好吧其实主要是我写博客的主阵地在CSDN懒得挪窝了&#xff09; 文章目录 1. 术语金融产品基金 1. 术语 金融产品 基金…