8、Flink 在 source 处生成水位线 和 在 source 之后生成水位线案例

1、AtSourceGenerateWatermark
注意:从 Flink 1.17开始,FLIP-27 源框架支持拆分级别的水印对齐。

import java.time.Duration;public class _02_AtSourceGenerateWatermark {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("my-broker").setTopics("my-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> source = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafka_source", TypeInformation.of(new TypeHint<String>() {}));source.print();env.execute();}
}

2、在 source 之后生成水位线

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;public class _03_AfterSourceGenerateWatermark {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<_01_MyEvent> eventMap = source.map(new MapFunction<String, _01_MyEvent>() {@Overridepublic _01_MyEvent map(String value) throws Exception {String[] fields = value.split(",");return new _01_MyEvent(Integer.parseInt(fields[0]),fields[1],Long.parseLong(fields[2]));}});SingleOutputStreamOperator<_01_MyEvent> timestampsAndWatermarks = eventMap.assignTimestampsAndWatermarks(WatermarkStrategy.<_01_MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<_01_MyEvent>() {@Overridepublic long extractTimestamp(_01_MyEvent element, long recordTimestamp) {return element.getEventTime();}}));timestampsAndWatermarks.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/830364.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

摇杆控制电机

参考&#xff1a; 摇杆电位器控制步进电机正反转调速-Arduino中文社区 - Powered by Discuz! 一个基于树莓派和Python的无人机视觉跟踪系统_ 北漠苍狼的专栏(QQ:1746430162)-CSDN博客

数字化wms仓库管理软件,实现企业仓储信息共享与智慧运行-亿发

在经济飞速发展的今天&#xff0c;企业面临着客户需求多样化、质量和交期要求提高以及激烈的市场竞争等挑战。在这样的背景下&#xff0c;许多企业开始考虑采用数字化仓储WMS系统来解决这些问题。 数字化仓储WMS系统通过打造高效、规范的仓库管理体系&#xff0c;实现了对产品…

爱普生晶振在物联网LoRa通讯中的应用

LoRa 是LPWAN通信技术中的一种&#xff0c;是美国Semtech公司采用和推广的一种基于扩频技术的超远距离无线传输方案。这一方案改变了以往关于传输距离与功耗的折衷考虑方式&#xff0c;为用户提供一种简单的能实现远距离、长电池寿命、大容量的系统&#xff0c;进而扩展传感网络…

【Spring】2.Spring中Bean的生命周期管理及定义

在Spring框架中&#xff0c;Bean是一个被Spring IoC容器实例化、组装和管理的对象。Bean就是Spring应用中的对象&#xff0c;它们形成了Spring应用的骨干。Spring IoC容器负责创建Bean&#xff0c;配置Bean以及管理Bean的完整生命周期。 Bean的生命周期 实例化Bean&#xff1a…

神经网络高效训练:优化GPU受限环境下的大规模CSV数据处理指南

最近训练模型,需要加载wifi sci data 数据量特别大,直接干爆内存,训练也特别慢,快放弃了!随后冷静下来,然后靠着多年的经验,来进行层层优化,随诞生了这篇博客。 背景介绍 机器学习模型的训练通常需要大量的数据,尤其是对于深度神经网络模型。然而,当数据集非常庞大时…

网络之路29:三层链路聚合

正文共&#xff1a;1666 字 17 图&#xff0c;预估阅读时间&#xff1a;3 分钟 目录 网络之路第一章&#xff1a;Windows系统中的网络 0、序言 1、Windows系统中的网络1.1、桌面中的网卡1.2、命令行中的网卡1.3、路由表1.4、家用路由器 网络之路第二章&#xff1a;认识企业设备…

wow_iot模块说明

wow_iot模块说明 wow_iot模块主要用于系统关联与基础接口封装库的实现&#xff0c;以供其它库文件与可执行文件调用&#xff0c;主要涉及algorith算法模块、config配置模块、database数据库模块、plugin插件模块、encode编码模块、encrypt加密模块、hash哈希模块、protocol协议…

愚安科技安全工程师面经:

1自我介绍 2讲项目经历 3“”符号&#xff08;反单引号&#xff09;在PHP语言以及SQL语言中的作用分别是什么 4Java中的反射有什么作用 5Java反序列化的基本原理 6 SSRF漏洞有什么漏洞利用思路&#xff1f; 7 利用XSS漏洞可以达到什么效果&#xff1f;有哪些防御XSS漏洞的手段/…

新质生产力实践,我用chatgpt开发网站

是的&#xff0c;我用chatgpt开发了一个网站&#xff0c;很轻松。 我之前一点不懂前端&#xff0c;也没有网站开发的代码基础&#xff0c;纯正的0基础。 从0开始到最后成品上线&#xff0c;时间总计起来大致一共花了2-3周的时间。 初始想法我是想给我公司开发一个网站&#…

【弱监督语义分割】AllSpark:从transformer中的未标记特征重生标记特征,用于半监督语义分割

AllSpark: Reborn Labeled Features from Unlabeled in Transformer for Semi-Supervised Semantic Segmentation 摘要&#xff1a; 目前最先进的方法是用真实标签训练标注数据&#xff0c;用伪标签训练未标注数据。然而&#xff0c;这两个训练流程是分开的&#xff0c;这就使…

mybatis - XxxMapper.java接口中方法的参数 和 返回值类型,怎样在 XxxMapper.xml 中配置的问题

这个例子中的mybatis-config.xml文件&#xff0c;引用这个文件即可 实体类src/main/java/com.atguigu.pojo/Employee.java package com.atguigu.pojo;public class Employee {private Integer id;private String name;private String plone;public Integer getId() {return i…

Android数据恢复:如何在手机上恢复丢失的文件和照片

我们都有 我们错误地从手机中删除重要内容的时刻。确实如此 不一定是我们的错。其他人可以对您的手机数据执行此操作 有意或无意。这在某个时间点发生在我们所有人身上。 但是&#xff0c;今天市场上有各种各样的软件可以 帮助恢复已删除的文件。这些类型的软件被归类为数据恢复…

OceanBase 分布式数据库【信创/国产化】- OceanBase 数据库整体架构

本心、输入输出、结果 文章目录 OceanBase 分布式数据库【信创/国产化】- OceanBase 数据库整体架构前言OceanBase 数据更新架构OceanBase 数据库采用 Shared-Nothing 架构OceanBase 分布式数据库【信创/国产化】- OceanBase 数据库整体架构 编辑 | 简简单单 Online zuozuo 地址…

linux 修改hosts文件新增域名映射

1、 切换到root账号 su - root 2、 输入root密码 xxxxxx 3、 进入hosts文件 vi /etc/hosts 4、 输入i进入新增模式 i 5、写入新的 127.0.0.1 rm-8123456789k7x6zr6.mysql.sss 6、保存退出 按下esc &#xff08;确保你在命令模式&#xff1a;按下Esc键确保你处于正…

Pandas数据可视化 - Matplotlib、Seaborn、Pandas Plot、Plotly

可视化工具介绍 让我们一起探讨Matplotlib、Seaborn、Pandas Plot和Plotly这四个数据可视化库的优缺点以及各自的适用场景。这有助于你根据不同的需求选择合适的工具。 1. Matplotlib 优点: 功能强大&#xff1a;几乎可以用于绘制任何静态、动画和交互式图表。高度可定制&a…

机器学习-- 爬虫IntelliScraper 重大更新说明

IntelliScraper &#x1f577;️ 地址&#xff1a;IntelliScraper 介绍 &#x1f31f; IntelliScraper 是一个高级的Python网络抓取项目&#xff0c;专为精确解析HTML内容和特征匹配而设计&#xff0c;用于从特定网页提取关键信息。该项目利用了如BeautifulSoup和scikit-le…

用OpenCV先去除边框线,以提升OCR准确率

在OpenCV的魔力下&#xff0c;我们如魔法师般巧妙地抹去表格的边框线&#xff0c;让文字如诗如画地跃然纸上。 首先&#xff0c;我们挥动魔杖&#xff0c;将五彩斑斓的图像转化为单一的灰度世界&#xff0c;如同将一幅绚丽的油画化为水墨画&#xff0c;通过cv2.cvtColor()函数的…

寝室快修|基于SprinBoot+vue的贵工程寝室快修小程序(源码+数据库+文档)

贵工程寝室快修目录 目录 基于SprinBootvue的贵工程寝室快修小程序 一、前言 二、系统设计 三、系统功能设计 1学生信息管理 2 在线报修管理 3公告信息管理 4论坛信息管理 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&a…

结构方程模型【SEM】:非线性、非正态、交互作用及分类变量分析

张老师&#xff08;研究员&#xff09;&#xff0c;长期从事R语言结构方程模型、群落生态学、保护生物学、景观生态学和生态模型方面的研究和教学工作&#xff0c;已发表了多篇论文&#xff0c;拥有丰富的科研及实践经验。 利用结构方程模型建模往往遇到很多‘特殊’情况&…

CDA一级备考策略分享

但对于很多考生来说&#xff0c;没有备考经验&#xff0c;不知道应该如何备考&#xff1f;今天&#xff0c;我来指导大家应该如何备考&#xff0c;让大家充分准备&#xff0c;拿下CDA考试。在CDA考试大纲中为新考生讲解备考经验一下。 1、数据分析概述与职业操守、数据结构 考…