Flink Window DEMO 学习

该文档演示了fink windows的操作DEMO

环境准备:

  • kafka本地运行:kafka部署
  • 自动生成名字代码:随机名
  • 自动生成随机IP代码:随机IP
  • Flink 1.18

测试数据

自动向kafka推送数据

import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.example.dto.KafkaPvDto;
import com.wfg.flink.example.utils.RandomGeneratorUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.time.LocalDateTime;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;import static com.wfg.flink.example.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.example.constants.Constants.TOPIC_NAME;public class KafkaTestProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", KAFKA_BROKERS);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");try (Producer<String, String> producer = new KafkaProducer<>(props)) {int times = 100000;for (int i = 0; i < times; i++) {System.out.println("Send No. :" + i);CompletableFuture.allOf(CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer)),CompletableFuture.runAsync(() -> sendKafkaMsg(producer))).join();producer.flush();Random random = new Random();int randomNumber = random.nextInt(7); // 生成一个0到6的随机数Thread.sleep(1000 * randomNumber);}} catch (InterruptedException e) {throw new RuntimeException(e);}}private static void sendKafkaMsg(Producer<String, String> producer) {String msg = createMsg();System.out.println(msg);producer.send(new ProducerRecord<>(TOPIC_NAME, UUID.randomUUID().toString().replaceAll("-", ""), msg));}private static String createMsg() {KafkaPvDto dto = new KafkaPvDto();dto.setUuid(UUID.randomUUID().toString().replaceAll("-", ""));dto.setUserName(RandomGeneratorUtils.generateRandomFullName());dto.setVisitIp(RandomGeneratorUtils.generateRandomIp());
//        DateTime begin = DateUtil.beginOfDay(new Date());
//        String timeStr = DateUtil.format(RandomGeneratorUtils.generateRandomDateTime(LocalDateTimeUtil.of(begin).toLocalDate(), LocalDate.now()), "yyyy-MM-dd HH:mm:ss");String timeStr = DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss");dto.setVisitTime(timeStr);dto.setVisitServiceIp(RandomGeneratorUtils.generateRandomIp());return JSONObject.toJSONString(dto);}
}

注意:

  • kafka本地运行:kafka部署
  • 自动生成名字代码:随机名
  • 自动生成随机IP代码:随机IP

FLINK 数据


/**** @author wfg*/
@Slf4j
public class DataSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> collector) {KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);if (data != null) {collector.collect(new Tuple2<>(data.getUserName(), 1));}}
}

基于时间窗口

*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(30)).sum(0).print();*/env.execute("flink window example");}
}

基于滑动时间窗口

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于滑动时间窗口data.flatMap(new DataSplitter()).keyBy(1).timeWindow(Time.seconds(60), Time.seconds(30)).sum(0).print();env.execute("flink window example");}
}

基于事件数量窗口

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于事件数量窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(3).sum(0).print();env.execute("flink window example");}
}

基于事件数量滑动窗口

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于事件数量滑动窗口data.flatMap(new DataSplitter()).keyBy(1).countWindow(4, 3).sum(0).print();*env.execute("flink window example");}
}

基于会话时间窗口

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//基于会话时间窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))//表示如果 5s 内没出现数据则认为超出会话时长,然后计算这个窗口的和.sum(1).print();env.execute("flink window example");}
}

滚动窗口(Tumbling Window)

滚动窗口(Tumbling Window)

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");//滚动窗口(Tumbling Window) 基于处理时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(30))).sum(1).print();;env.execute("flink window example");}
}

基于事件时间

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于事件时间的 30 秒滚动窗口data.flatMap(new DataSplitter()).keyBy(v->v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(TumblingEventTimeWindows.of(Time.seconds(30))).sum(1).print();env.execute("flink window example");}
}

滑动窗口(Sliding Window)

基于处理时间

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于处理时间的 30 秒滑动窗口,滑动间隔为 10 秒data.flatMap(new DataSplitter()).keyBy(v->v.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute("flink window example");}
}

基于事件时间

/*** Desc: Flink Window 学习*/
@Slf4j
public class WindowsDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> data = env.fromSource(source, WatermarkStrategy.noWatermarks(), "wfgxxx");// 基于事件时间的 30 秒滑动窗口,滑动间隔为 10 秒  data.flatMap(new DataSplitter()).keyBy(v->v.f0).assignTimestampsAndWatermarks(/* 分配时间戳和水印 */).window(SlidingEventTimeWindows.of(Time.seconds(30), Time.seconds(10))).sum(1).print();env.execute("flink window example");}
}

注意:

  • kafka本地运行:kafka部署
  • 自动生成名字代码:随机名
  • 自动生成随机IP代码:随机IP

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/37692.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

技术赋能教育:校园3D电子地图与AR导航解决方案

随着高考的落幕&#xff0c;又一批新鲜血液即将注入大学校园。面对陌生的环境&#xff0c;如何快速适应、准确找到目标地点&#xff0c;成为新生们的一大难题。同时&#xff0c;对于学校而言&#xff0c;如何向报考人员直观展示校园环境&#xff0c;提供沉浸式参观体验&#xf…

Mybatis-Plus学习|快速入门CRUD、主键生成策略(雪花算法、主键自增等)、自动填充、乐观锁、分页插件、逻辑删除

MyBatisPlus概述 为什么要学习它呢?MyBatisPlus可以节省我们大量工作时间&#xff0c;所有的CRUD代码它都可以自动化完成! JPA、tk-mapper、MyBatisPlus 偷懒的! MyBatis-Plus(简称 MP)是一个 MyBatis 的增强工具&#xff0c;在 MyBatis 的基础上只做增强不做改变&#xff…

Pytorch学习之torch.nn.functional.pad()函数

PyTorch学习之torch.nn.functional.pad函数 一、简介 torch.nn.functional.pad 是 PyTorch 中用于对张量进行填充操作的函数。填充操作在处理图像、序列数据等任务时非常常见&#xff0c;它可以在张量的指定维度两端添加一定数量的元素&#xff0c;填充方式多样&#xff0c;包…

Git的基本使用方法

Git的基本使用方法 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将深入探讨Git的基本使用方法&#xff0c;Git作为目前最流行的版本控制系统之一&…

Day 48 消息队列集群RabbitMQ

消息队列集群-RabbitMQ 一、消息中间件 中间件 tomcat java web中间件 web容器 mysql php php mysql uwsgi python mysql mycat 数据库中间件 rabbitMQ 消息中间件 1、简介 MQ 全称为&#xff08;Message Queue消息队列&#xff09;。是一种应用程序对应用程序的通信方…

【全球首个开源AI数字人】DUIX数字人-打造你的AI伴侣!

目录 1. 引言1.1 数字人技术的发展背景1.2 DUIX数字人项目的开源意义1.3 DUIX数字人技术的独特价值1.4 本文目的与结构 2. DUIX数字人概述2.1 定义与核心概念2.2 硅基智能与DUIX的关系2.3 技术架构2.4 开源优势2.5 应用场景2.6 安全与合规性 3. DUIX数字人技术特点3.1 开源性与…

【Java Gui精美界面】IDEA安装及配置SwingX

SwingX 是一个基于 Swing 的 Java GUI 库&#xff0c;旨在为 Swing 提供额外的功能和丰富的组件 特点描述基于 Swing继承了 Swing 的所有特性和功能。丰富组件SwingX 提供了一组高级 UI 组件&#xff0c;例如 TreeTable仍在发展中不活跃的发展ing。。。支持搜索高亮如 TreeTab…

【分布式系列】分布式锁的设计与实现

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

steam社区加载异常、加载失败、无法加载、黑屏的解决方法

随着steam夏季特卖的临近&#xff0c;最近几天开启史低折扣的大作已经越来越少了&#xff0c;不过也并不是没有。最经典的知名大作文明6之前已经打到1折的骨折价了&#xff0c;没想到也能背刺&#xff0c;现在是新史低价0.5折11元&#xff0c;很多玩家入手后纷纷前往社区看新手…

ZABBIX-7.0LTS在线部署部署教程

ZABBIX-7.0LTS在线部署部署教程 环境&#xff1a; 操作系统&#xff1a; ubuntu 22.04zabbix-server版本&#xff1a; 7.0LTS系统配置[需结合监控的业务量提供配置]&#xff1a; 建议2C(CPU)8G(运行) 100GB(存储)架构&#xff1a;LNMP 第一步&#xff1a; 系统初始化 1.配置…

计算机网络知识整理笔记

目录 1.对网络协议的分层&#xff1f; 2.TCP/IP和UDP之间的区别&#xff1f; 3.建立TCP连接的三次握手&#xff1f; 4.断开TCP连接的四次挥手&#xff1f; 5.TCP协议如何保证可靠性传输&#xff1f; 6.什么是TCP的拥塞控制&#xff1f; 7.什么是HTTP协议&#xff1f; 8…

MySQL InnoDB支持几种行格式

数据库表的行格式决定了一行数据是如何进行物理存储的&#xff0c;进而影响查询和DML操作的性能。 在InnoDB中&#xff0c;常见的行格式有4种&#xff1a; 1、COMPACT&#xff1a;是MySQL 5.0之前的默认格式&#xff0c;除了保存字段值外&#xff0c;还会利用空值列表保存null…

快速傅里叶变换(Fast Fourier Transform,FFT)

快速傅里叶变换&#xff08;Fast Fourier Transform&#xff0c;FFT&#xff09;是一种算法&#xff0c;用于快速计算离散傅里叶变换&#xff08;DFT&#xff09;及其逆变换。傅里叶变换将时间或空间域的信号转换为频率域的信号&#xff0c;便于分析信号的频率特性。FFT显著提高…

动手学深度学习(Pytorch版)代码实践 -卷积神经网络-20填充与步幅

20填充与步幅 import torch from torch import nn# 此函数初始化卷积层权重&#xff0c;并对输入和输出提高和缩减相应的维数 def comp_conv2d(conv2d, X):# 这里的&#xff08;1&#xff0c;1&#xff09;表示批量大小和通道数都是1#将输入张量 X 的形状调整为 (1, 1, height,…

Grafana-11.0.0 在线部署教程

Grafana-11.0.0 在线部署教程 环境&#xff1a; 操作系统&#xff1a; ubuntugrafana版本&#xff1a; 11.0.0 &#xff08;建议不要按照最新版&#xff09;grafana要求的系统配置不高&#xff0c;建议直接部署在监控服务器上&#xff0c;比如zabbix服务器、prometheus服务器…

从菌群代谢到健康影响——认识肠道丙酸和丁酸

谷禾健康 短链脂肪酸这一词经常出现在谷禾的文章和报告中&#xff0c;那你真的了解短链脂肪酸吗&#xff1f;短链脂肪酸(SCFA)主要是肠道微生物群在结肠内通过发酵碳水化合物(包括膳食和内源性碳水化合物&#xff0c;主要是抗性淀粉和膳食纤维)和一些微生物可利用的蛋白质而产生…

光线追踪:原理与实现

版权声明 本文为“优梦创客”原创文章&#xff0c;您可以自由转载&#xff0c;但必须加入完整的版权声明文章内容不得删减、修改、演绎本文视频版本&#xff1a;见文末 各位同学大家好&#xff0c;今天我要给大家分享的是光线追踪的原理和实现大家知道在过往很多年里面&#x…

超简单的nodejs使用log4js保存日志到本地(可直接复制使用)

引入依赖 npm install log4js 新建配置文件logUtil.js const log4js require(log4js);// 日志配置 log4js.configure({appenders: {// 控制台输出consoleAppender: { type: console },// 文件输出fileAppender: {type: dateFile,filename: ./logs/default, //日志文件的存…

如何从0构建一款类似pytest的工具

Pytest主要模块 Pytest 是一个强大且灵活的测试框架&#xff0c;它通过一系列步骤来发现和运行测试。其核心工作原理包括以下几个方面&#xff1a;测试发现&#xff1a;Pytest 会遍历指定目录下的所有文件&#xff0c;找到以 test_ 开头或 _test.py 结尾的文件&#xff0c;并且…

python 实例002 - 数据转换

题目&#xff1a; 有一组用例数据如下&#xff1a; cases [[case_id, case_title, url, data, excepted],[1, 用例1, www.baudi.com, 001, ok],[4, 用例4, www.baudi.com, 002, ok],[2, 用例2, www.baudi.com, 002, ok],[3, 用例3, www.baudi.com, 002, ok],[5, 用例5, www.ba…