Flink nc -l -p 监听端口测试

1、9999端口未占用

netstat -apn|grep 9999

2、消息发送端

nc -l -k -p 9999
{"user":"ming","url":"www.baidu1.com", "timestamp":1200L, "score":1}
{"user":"xiaohu","url":"www.baidu5.com","timestamp":1267L, "score":10}
{"user":"ming","url":"www.baidu7.com","timestamp":4200L, "score":9}
{"user":"xiaohu","url":"www.baidu8.com","timestamp":5500L, "score":90}
{"user":"Biu","url":"www.baidu8.com","timestamp":5500L, "score":1000}{"user":"ming","url":"www.baidu1.com", "timestamp":1717171200000, "score":1}
{"user":"xiaohu","url":"www.baidu5.com","timestamp":1717171202000, "score":10}
{"user":"ming","url":"www.baidu7.com","timestamp":1717171260000, "score":9}
{"user":"xiaohu","url":"www.baidu8.com","timestamp":1717264860000, "score":90}
{"user":"Biu","url":"www.baidu8.com","timestamp":1718780790000, "score":1000}

3、运行

周期性水位线

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Timestamp;
import java.util.ArrayList;/*** Description: * forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\* forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线*/
public class FlinkPeriodicWatermarkGeneratorTestJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        ArrayList<Event> list = new ArrayList<>();
//        list.add(new Event("ming","www.baidu1.com",1200L));
//        list.add(new Event("xiaohu","www.baidu5.com",1267L));
//        list.add(new Event("ming","www.baidu7.com",4200L));
//        list.add(new Event("xiaohu","www.baidu8.com",5500L));
//
//        DataStreamSource<Event> ds = env.fromCollection(list, BasicTypeInfo.of(Event.class));DataStreamSource<String> dss = env.socketTextStream("test002", 9999);SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {Event event = new Event();event.toEvent(value);return event;}});
//        ds.print();SingleOutputStreamOperator<Event> watermarks = ds// AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
//                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()// BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Event>() {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L;@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp, maxTs);// 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次 可以使用 env.getConfig().setAutoWatermarkInterval(60 * 1000L); 调整周期时间 flink时间窗口(左开,右闭]output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}};}});ds.print();env.setParallelism(1);env.execute();}public static class Event{String user;String url;Long timestamp;public Event(){}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}public String getUser() {return user;}public String getUrl() {return url;}public Long getTimestamp() {return timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}public void toEvent(String val){JSONObject js = JSONObject.parseObject(val);this.user = js.getString("user");this.url = js.getString("url");this.timestamp = js.getLong("timestamp");}}
}

断点式水位线

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.sql.Timestamp;
import java.util.ArrayList;/*** Description: * forMonotonousTimestamps->AscendingTimestampsWatermarks 有序流 -> 自定义断点式水位线(周期延迟时间=0ms)\* forBoundedOutOfOrderness->BoundedOutOfOrdernessWatermarks 无序流 -> 自定义周期性水位线*/
public class FlinkPunctuatedWatermarkGeneratorTestJob {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dss = env.socketTextStream("test002", 9999);SingleOutputStreamOperator<Event> ds = dss.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {Event event = new Event();event.toEvent(value);return event;}});
//        ds.print();SingleOutputStreamOperator<Event> watermarks = ds// AscendingTimestampsWatermarks 有序流 查看源码,实际上是延迟时间=0ms的乱序流
//                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()// BoundedOutOfOrdernessWatermarks 无序流 5ms固定延迟时间/表示最大乱序程度 处理乱序流数据.assignTimestampsAndWatermarks(new WatermarkStrategy<Event>() {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.getTimestamp();}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new WatermarkGenerator<Event>() {@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 只有在遇到特定的 itemId 时,才发出水位线if (event.getUser().equals("Biu")) {output.emitWatermark(new Watermark(event.getTimestamp() - 1));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线}};}});ds.print();env.setParallelism(1);env.execute();}public static class Event{String user;String url;Long timestamp;public Event(){}public Event(String user, String url, Long timestamp) {this.user = user;this.url = url;this.timestamp = timestamp;}public String getUser() {return user;}public String getUrl() {return url;}public Long getTimestamp() {return timestamp;}@Overridepublic String toString() {return "Event{" +"user='" + user + '\'' +", url='" + url + '\'' +", timestamp=" + new Timestamp(timestamp) +'}';}public void toEvent(String val){JSONObject js = JSONObject.parseObject(val);this.user = js.getString("user");this.url = js.getString("url");this.timestamp = js.getLong("timestamp");}}
}

4、打印

3> Event{user='ming', url='www.baidu1.com', timestamp=1970-01-01 08:00:01.2}
4> Event{user='xiaohu', url='www.baidu5.com', timestamp=1970-01-01 08:00:01.267}
5> Event{user='ming', url='www.baidu7.com', timestamp=1970-01-01 08:00:04.2}
6> Event{user='xiaohu', url='www.baidu8.com', timestamp=1970-01-01 08:00:05.5}

参考:

【Flink】Flink 中的时间和窗口之水位线(Watermark)-CSDN博客

Flink watermark_nc -lp 9999-CSDN博客

NoteWarehouse/05_BigData/09_Flink(1).md at main · FGL12321/NoteWarehouse · GitHub

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/857491.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GIT 合拼

合拼有多种方式&#xff1a; 1&#xff09;合拼分支&#xff1a; git merge [source-branch] 2&#xff09;合拼提交 &#xff1a; git cherry-pick [commit-hash] 3&#xff09;合拼单个文件&#xff1a; git checkout [source-branch] – [file] 以上合拼&#xff0c;比如将分…

qml:一个基础的界面设计

文章目录 1、文章说明2、效果图3、重要代码说明3.1 组件切换开关下拉框矩形卡片 3.2 窗口最大化后组件全部居中3.3 菜单栏3.4 Repeater实现重复8行3.5 图片加载直接加载图片文本转图片FluentUI中可供选择的图标 1、文章说明 qt6.5.3 qml写的一个界面配置设计软件&#xff0c;目…

docker in docker 连私有仓库时报错 https

背景 jenkins 是使用 docker 方式部署的, 在 jenkins中又配置了 docker 的命令, 使用的宿主机的 docker 环境, 在jenkins 中执行 docker 相关命令的时候报错 jenkinse0e7b943b6e4:/$ docker login -u admin -p Harbor12345 172.16.100.15:80 WARNING! Using --password via t…

小白科普篇:详解Java对象的强引用、软引用、弱引用和虚引用

在Java中&#xff0c;有四种类型的引用&#xff0c;它们定义了对象被垃圾收集器(GC)处理的不同时机。这四种引用分别是&#xff1a; 强引用&#xff08;Strong Reference&#xff09;软引用&#xff08;Soft Reference&#xff09;弱引用&#xff08;Weak Reference&#xff0…

算法设计与分析:动态规划法求扔鸡蛋问题 C++

目录 一、实验目的 二、问题描述 三、实验要求 四、算法思想和实验结果 1、动态规划法原理&#xff1a; 2、解决方法&#xff1a; 2.1 方法一&#xff1a;常规动态规划 2.1.1 算法思想&#xff1a; 2.1.2 时间复杂度分析 2.1.3 时间效率分析 2.2 方法二&#xff1a;动态规划加…

Java面试题:解释Java的类加载过程,包括加载、链接和初始化阶段

Java的类加载过程是将类从其二进制表示&#xff08;通常是一个 .class 文件&#xff09;加载到JVM中并准备使用的过程。这个过程分为三个主要阶段&#xff1a;加载&#xff08;Loading&#xff09;、链接&#xff08;Linking&#xff09;和初始化&#xff08;Initialization&am…

python数据分析-糖尿病数据集数据分析预测

一、研究背景和意义 糖尿病是美国最普遍的慢性病之一&#xff0c;每年影响数百万美国人&#xff0c;并对经济造成重大的经济负担。糖尿病是一种严重的慢性疾病&#xff0c;其中个体失去有效调节血液中葡萄糖水平的能力&#xff0c;并可能导致生活质量和预期寿命下降。。。。 …

若依框架集成微信支付

1. 添加微信支付相关依赖 <!-- 微信支付 --> <dependency><groupId>com.github.wxpay</groupId><artifactId>wxpay-sdk</artifactId><version>0.0.3</version> </dependency> <dependency><groupId>com.gi…

微信小程序开发---自定义底部tabBar

自定义tabBar注意事项&#xff1a; 在自定义 tabBar 模式下 &#xff0c;为了保证低版本兼容以及区分哪些页面是 tab 页&#xff0c;app.json文件中 tabBar 的相关配置项需完整声明&#xff0c;但这些字段不会作用于自定义 tabBar 的渲染。所有 tabBar 的样式都由该自定义组件…

x86 汇编中的 “lock“ 指令详解

在深入理解 “lock” 指令之前&#xff0c;我们先来看一下 Qt 源代码中的一段 x86 汇编代码&#xff1a; q_atomic_increment:movl 4(%esp), %ecxlock incl (%ecx)mov $0,%eaxsetne %alret.align 4,0x90.type q_atomic_increment,function.size q_atomic_increment,.-q_atom…

网工常见面试题

1-10题 1.介绍TCP/IP四层、五层模型作用及每层包含的协议 TCP/IP四层模型 应用层&#xff1a; 作用&#xff1a;直接与应用程序交互&#xff0c;定义了应用程序如何通过网络发送数据。包含协议&#xff1a;HTTP&#xff08;网页浏览&#xff09;、FTP&#xff08;文件传输&…

Linux学习笔记:前言与操作系统的初识【1】

前言 为什么学习Linux 作为当下最流行的操作系统之一&#xff0c;学会如何使用和操作Linux操作系统也就是每位计算机学者的看家必备技能了。其次呢&#xff0c;本人受Linux的创始人林纳斯的影响太深了&#xff0c;觉得这个人太了不起了&#xff0c;而且人家大学里就自研开发出…

javascript的toFixed()以及使用

toFixed() 是 JavaScript 中数字类型&#xff08;Number&#xff09;的一个方法&#xff0c;用来将数字转换为指定小数位数的字符串表示形式。 使用方式和示例&#xff1a; let num 123.45678; let fixedNum num.toFixed(2); console.log(fixedNum); // 输出 "123.46&qu…

【Gradio】构建自定义多模态聊天机器人

这是我们构建自定义多模态聊天机器人组件两部分系列的第一部分。在第一部分中&#xff0c;我们将修改 Gradio 聊天机器人组件&#xff0c;使其能够在同一消息中显示文本和媒体文件&#xff08;视频、音频、图片&#xff09;。在第二部分中&#xff0c;我们将构建一个自定义的文…

深度解析RocketMq源码-持久化组件(一) MappedFile

1. 绪论 rocketmq之所以能够有如此大的吞吐量&#xff0c;离不开两个组件&#xff0c;一个是利用netty实现的高性能网络通信组件&#xff1b;另一个就是利用mmap技术实现的存储组件。而在rocketmq的存储组件中主要有三个组件&#xff0c;分别是持久化文件commitLog&#xff0c…

音樂大模型的崛起:技術革新與行業變革

音樂大模型的崛起&#xff1a;技術革新與行業變革 在過去的一個月中&#xff0c;隨著多個音樂大模型的輪番上線&#xff0c;音樂創作的門檻驟然降低&#xff0c;使得普通人也能輕鬆創作出高質量的音樂作品。這一技術進步引發了廣泛的討論&#xff0c;尤其是在音樂圈內&#xf…

多模态MLLM都是怎么实现的(10)-Chameleon和Florence-2如果你想玩多模态就不能不了解

这个也是一个补充文&#xff0c;前9章基本把该讲的讲了&#xff0c;今天这个内容主要是因为Meta出了一个Chameleon&#xff0c;这个以后可能会成为LLaMA的一个很好的补充&#xff0c;或者说都有可能统一起来&#xff0c;叫LLaMA或者Chamleon或者什么别的&#xff0c;另外我司把…

【图解IO与Netty系列】Netty源码解析——事件循环

Netty源码解析——事件循环 Netty事件循环源码解析select()processSelectedKeys()NioMessageUnsafe#read()NioByteUnsafe#read() runAllTasks() Netty事件循环 当Netty服务端启动起来以后&#xff0c;就可以接受客户端发送的请求&#xff0c;接收到客户端发来的请求后就会有事…

计算机网络 交换机的VLAN配置

一、理论知识 1.VLAN的定义 ①VLAN虚拟局域网&#xff0c;是一种通过将局域网内的设备逻辑地而不是物理地划分成一个个网段从而实现虚拟工作组的技术。 ②IEEE于1999年颁布了用以标准化VLAN实现方案的802.1Q协议标准草案。 ③VLAN技术允许网络管理者将一个物理的LAN逻辑地划…

【Ruby简单脚本02】双色球系统

# frozen_string_literal: true require date # 生成中奖号码的工具 # 红球 1-32 篮球 1-15 def create_num nums [] 6.times do while true num rand(1..32) unless nums.include?(num) nums << num break end end end blue rand(1..15) nums…