合肥专业做网站的公司有哪些/济南网站制作公司

合肥专业做网站的公司有哪些,济南网站制作公司,东莞专业网,no.7主题wordpress&&大数据学习&& 🔥系列专栏: 👑哲学语录: 承认自己的无知,乃是开启智慧的大门 💖如果觉得博主的文章还不错的话,请点赞👍收藏⭐️留言📝支持一下博主哦&#x1f91…

&&大数据学习&&

🔥系列专栏: 👑哲学语录: 承认自己的无知,乃是开启智慧的大门
💖如果觉得博主的文章还不错的话,请点赞👍+收藏⭐️+留言📝支持一下博主哦🤞


之前提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

1 定时器(Timer)和定时服务(TimerService

在.onTimer()方法中可以实现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”来实现的。

定时服务与当前运行的环境有关。前面已经介绍过,ProcessFunction的上下文(Context)中提供了.timerService()方法,可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口,包含以下六个方法:

// 获取当前的处理时间long currentProcessingTime();// 获取当前的水位线(事件时间)long currentWatermark();// 注册处理时间定时器,当处理时间超过time时触发void registerProcessingTimeTimer(long time);// 注册事件时间定时器,当水位线超过time时触发void registerEventTimeTimer(long time);// 删除触发时间为time的处理时间定时器void deleteProcessingTimeTimer(long time);// 删除触发时间为time的处理时间定时器void deleteEventTimeTimer(long time);

六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。需要注意,尽管处理函数中都可以直接访问TimerService,不过只有基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的DataStream不支持定时器操作,只能获取当前时间。

TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次

2 KeyedProcessFunction案例

基于keyBy之后的KeyedStream,直接调用.process()方法,这时需要传入的参数就是KeyedProcessFunction的实现类。

stream.keyBy( t -> t.f0 ).process(new MyKeyedProcessFunction())

类似地,KeyedProcessFunction也是继承自AbstractRichFunction的一个抽象类,与ProcessFunction的定义几乎完全一样,区别只是在于类型参数多了一个K,这是当前按键分区的key的类型。同样地,我们必须实现一个.processElement()抽象方法,用来处理流中的每一个数据;另外还有一个非抽象方法.onTimer(),用来定义定时器触发时的回调操作。

代码如下:

public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("hadoop102", 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) -> element.getTs() * 1000L));KeyedStream<WaterSensor, String> sensorKS = sensorDS.keyBy(sensor -> sensor.getId());// TODO Process:keyedSingleOutputStreamOperator<String> process = sensorKS.process(new KeyedProcessFunction<String, WaterSensor, String>() {/*** 来一条数据调用一次* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {//获取当前数据的keyString currentKey = ctx.getCurrentKey();// TODO 1.定时器注册TimerService timerService = ctx.timerService();// 1、事件时间的案例Long currentEventTime = ctx.timestamp(); // 数据中提取出来的事件时间timerService.registerEventTimeTimer(5000L);System.out.println("当前key=" + currentKey + ",当前时间=" + currentEventTime + ",注册了一个5s的定时器");// 2、处理时间的案例//                        long currentTs = timerService.currentProcessingTime();//                        timerService.registerProcessingTimeTimer(currentTs + 5000L);//                        System.out.println("当前key=" + currentKey + ",当前时间=" + currentTs + ",注册了一个5s后的定时器");// 3、获取 process的 当前watermark//                        long currentWatermark = timerService.currentWatermark();//                        System.out.println("当前数据=" + value + ",当前watermark=" + currentWatermark);// 注册定时器: 处理时间、事件时间//                        timerService.registerProcessingTimeTimer();//                        timerService.registerEventTimeTimer();// 删除定时器: 处理时间、事件时间//                        timerService.deleteEventTimeTimer();//                        timerService.deleteProcessingTimeTimer();// 获取当前时间进展: 处理时间-当前系统时间,  事件时间-当前watermark//                        long currentTs = timerService.currentProcessingTime();//                        long wm = timerService.currentWatermark();}/*** TODO 2.时间进展到定时器注册的时间,调用该方法* @param timestamp 当前时间进展,就是定时器被触发时的时间* @param ctx       上下文* @param out       采集器* @throws Exception*/@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey = ctx.getCurrentKey();System.out.println("key=" + currentKey + "现在时间是" + timestamp + "定时器触发");}});process.print();env.execute();}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/70303.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

10分钟上手DeepSeek开发:SpringBoot + Vue2快速构建AI对话系统

作者&#xff1a;后端小肥肠 目录 1. 前言 为什么选择DeepSeek&#xff1f; 本文技术栈 2. 环境准备 2.1. 后端项目初始化 2.2. 前端项目初始化 3. 后端服务开发 3.1. 配置文件 3.2. 核心服务实现 4. 前端服务开发 4.1. 聊天组件ChatWindow.vue开发 5. 效果展示及源…

通过监督微调提升多语言大语言模型性能

引言 澳鹏助力一家全球科技公司提升其大语言模型&#xff08;LLM&#xff09;的性能。通过提供结构化的人工反馈形式的大语言模型训练数据&#xff0c;让该模型在30多种语言、70多种方言中的表现得到优化。众包人员们进行多轮对话&#xff0c;并依据回复的相关性、连贯性、准确…

大数据开发治理平台~DataWorks(核心功能汇总)

目录 数据集成 功能概述 使用限制 功能相关补充说明 数据开发 功能概述 数据建模 功能概述 核心技术与架构 数据分析 功能概述 数据治理 数据地图 功能概述 数据质量 功能概述 数据治理资产 功能概述 使用限制 数据服务 功能概述 数据集成 DataWorks的数据…

STM32 看门狗

目录 背景 独立看门狗&#xff08;IWDG&#xff09; 寄存器访问保护 窗口看门狗&#xff08;WWDG&#xff09; 程序 独立看门狗 设置独立看门狗程序 第一步、使能对独立看门狗寄存器的写操作 第二步、设置预分频和重装载值 第三步、喂狗 第四步、使能独立看门狗 喂狗…

vue3-03初学vue3中的配置项setup(Composition API (组合API组件中所用到的:数据、方法等,均要配置在setup中)

1.关于setup Vue3.0中一个新的配置项&#xff0c;值为一个函数.setup是所有Composition API (组合API)“表演的舞台”m组件中所用到的:数据、方法等等&#xff0c;均要配置在setup中。 2..setup函数使用 setup函数的两种返回值 1.若返回一个对象&#xff0c;则对象中的属性、…

leetcode 2585. 获得分数的方法数

题目如下 数据范围 莫要被困难的外衣骗了&#xff0c;本题就是有数量限制的完全背包问题。显然我们可以令 f(x,y)为当有x种题目时分数为y时的方法数 令某种题目的数量为k 那么方法数应该是 f(x,y) f(x - 1,y - k * (分值))其中(0 < k < 题目数量)通过代码 class So…

VS Code 如何搭建C/C++开发环境

目录 1.VS Code是什么 2. VS Code的下载和安装 2.1 下载和安装 2.2.1 下载 2.2.2 安装 2.2 环境的介绍 2.3 安装中文插件 3. VS Code配置C/C开发环境 3.1 下载和配置MinGW-w64编译器套件 3.1.1 下载 3.1.2 配置 3.2 安装C/C插件 3.3 重启VSCode 4. 在VSCode上编写…

数仓搭建:DWS层(服务数据层)

DWS层示例: 搭建日主题宽表 需求 维度 步骤 在hive中建数据库dws >>建表 CREATE DATABASE if NOT EXISTS DWS; 建表sql CREATE TABLE yp_dws.dws_sale_daycount( --维度 city_id string COMMENT 城市id, city_name string COMMENT 城市name, trade_area_id string COMME…

伪类选择器

作用&#xff1a;选中特殊状态的元素 一、动态伪类 1. :link 超链接 未被访问 的状态。 2. :visited 超链接 访问过 的状态。 3. :hover 鼠标 悬停 在元素上的状态。 4. :active 元素 激活 的状态。 什么是激活&#xff1f; —— 按下鼠标不松开。 注意点&#xf…

Kubernetes:EKS 中 Istio Ingress Gateway 负载均衡器配置及常见问题解析

引言 在云原生时代&#xff0c;Kubernetes 已经成为容器编排的事实标准。AWS EKS (Elastic Kubernetes Service) 作为一项完全托管的 Kubernetes 服务&#xff0c;简化了在 AWS 上运行 Kubernetes 的复杂性。Istio 作为服务网格领域的佼佼者&#xff0c;为微服务提供了流量管理…

挪车小程序挪车二维码php+uniapp

一款基于FastAdminThinkPHP开发的匿名通知车主挪车微信小程序&#xff0c;采用匿名通话的方式&#xff0c;用户只能在有效期内拨打车主电话&#xff0c;过期失效&#xff0c;从而保护车主和用户隐私。提供微信小程序端和服务端源码&#xff0c;支持私有化部署。 更新日志 V1.0…

unity 设置可配置文件asset

使用可序列化类保存配置&#xff0c;并且将可序列化类保存成Unity的自定义文件&#xff08;.asset&#xff09;,然后配置自定义文件&#xff08;.asset&#xff09;。 [Serializable][CreateAssetMenu(menuName "ScriptableOject/BuildConfig")]public class BuildC…

一周学会Flask3 Python Web开发-http响应状态码

锋哥原创的Flask3 Python Web开发 Flask3视频教程&#xff1a; 2025版 Flask3 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili 在Flask程序中&#xff0c;客户端发出的请求触发相应的视图函数&#xff0c;获取返回值会作为响应的主体&#xff0c;最后生成…

scratch猜年龄互动小游戏 2024年12月scratch四级真题 中国电子学会 图形化编程 scratch四级真题和答案解析

scratch猜年龄互动小游戏 2024年12月电子学会图形化编程Scratch等级考试四级真题 一、题目要求 老爷爷的年龄是1-100的随机数,老爷爷询问“请猜猜我的年龄是多少?”,输入年龄,老爷爷会回答"大了"或者"小了,直到最后成功猜出年龄。 1、准备工作 (1)删…

unity学习47:寻路和导航,unity2022后版本如何使用 Navmesh 和 bake

目录 1 寻路和导航对移动的不同 1.1 基础的移动功能 1.1.1 基础移动 1.1.2 智能导航寻路 1.1.3 智能导航寻路还可以 2 如何实现这个效果&#xff1f; 2.1 通过地图网格的形式 2.1.1 警告信息 the static value has been deprecated的对应搜索 2.1.2 新的navigation ba…

达梦存储过程执行后 sql日志信息粗读

如何调试达梦存储过程&#xff1f;快速定位问题 dmgdb 或 manager图形工具 我觉得还可以靠sql日志和DBMS_OUTPUT包&#xff0c;不过最省事的办法放到了最后面&#xff0c;一个sql就能搞清楚了 来段演示代码 set serveroutput on drop table t1; create TABLE t1 (id int, gc…

fpga助教面试题

第一题 module sfp_pwm( input wire clk, //clk is 200M input wire rst_n, input wire clk_10M_i, input wire PPS_i, output reg pwm ) reg [6:0] cunt ;always (posedge clk ) beginif(!rst_n)cunt<0;else if(cunt19) //200M是10M的20倍cunt<0;elsecunt<cunt1;…

【分布式】Hadoop完全分布式的搭建(零基础)

Hadoop完全分布式的搭建 环境准备&#xff1a; &#xff08;1&#xff09;VMware Workstation Pro17&#xff08;其他也可&#xff09; &#xff08;2&#xff09;Centos7 &#xff08;3&#xff09;FinalShell &#xff08;一&#xff09;模型机配置 0****&#xff09;安…

GPT-Sovits:语音克隆训练-遇坑解决

前言 本来以为3050完全无法执行GPT-Sovits训练的&#xff0c;但经过实践发现其实是可以&#xff0c;并且仅花费了十数分钟便成功训练和推理验证了自己的语音模型。 官方笔记&#xff1a;GPT-SoVITS指南 语雀 项目地址&#xff1a;https://github.com/RVC-Boss/GPT-SoVITS 本人…

React之旅-03 路由

做为前端开发框架&#xff0c;React 的组件化设计思想&#xff0c;使前端开发变得更加灵活高效。对于大型复杂的项目来说&#xff0c;页面之间的导航变得尤为重要。因此如何管理路由&#xff0c;是所有开发者必须考虑的问题。 React 官方推荐的路由库-React Router&#xff0c…