Flink之状态TTL机制内容详解

1 状态TTL机制

状态的 TTL机制就是Flink提供的自动化删除状态中的过期数据,配置 TTL的 API可以做到对状态中的数据进行冷热数据分离,将热数据一直保存在状态存储器中,将冷数据进行定期删除.
1.1 API简介

TTL常用API如下:

API注解
setTtl(Time.seconds(…))配置过期时长,当状态中的数据到达这个时长则判定为过期数据,在new StateTtlConfig.Builder(Time.seconds(...))也可以配置,如果同时调用setTtl()方法则进行覆盖
updateTtlOnCreateAndWrite()当该条数据在State中插入或者更新的时候,刷新计时,可用于冷热数据分离
updateTtlOnReadAndWrite()读或写都刷新该数据的TTL计时,可用于冷热数据分离
setStateVisibility(…)用于控制状态中过期数据的可见性,当方法中设置StateTtlConfig.StateVisibility.NeverReturnExpired)时则不可见过期未被清理的数据,如果设置StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp则可见过期未被清理的数据.setStateVisibility(...)由异步线程执行,默认是NeverReturnExpired.
setTtlTimeCharacteristic(…)指定TTL的时间语义,默认是event time,可以配置process time,将StateTtlConfig.TtlTimeCharacteristic.ProcessingTime填入方法的参数即可.
disableCleanupInBackground()禁用后台清理过期数据,使用后则不会清理过期数据
cleanupIncrementally(… , …)针对本地状态后端,即HashMapStateBackend. 增量清理, 每当访问状态数据时都会驱动一次过期检查,清除其中部分数据, 这也是HashMapBackend状态后端唯一能真正清理过期数据的方法,cleanupIncrementally(... , ...)方法中需要传入两个参数int cleanupSizeboolean runCleanupForEveryRecord,cleanupSize是指key的数据量,runCleanupForEveryRecord是指是否清理所有过期数据,如果runCleanupForEveryRecord设置的值为true此时cleanupSize就会失效,但是状态数据较多时会严重影响时效性.
cleanupFullSnapshot()针对快照数据,即checkpoint快照. 全量清理, 在做快照时将所有的过期数据进行清理保证快照中没有过期数据,但是状态后端中的过期数据没有进行清理.
cleanupInRocksdbCompactFilter(xxx)针对于RocksdbStateBackend. 只生效于RocksDB状态后端,通过Flink将CompactFilter传给RocksDB,在RocksDBCompact过程中根据过滤条件将过期数据删除,传入的参数为过期时间.
1.2 代码模板
  • 代码

    class StateMapFunc2 implements MapFunction<String, String>, CheckpointedFunction {private ListState<String> listState;@Overridepublic String map(String s) throws Exception {// 将数据添加到状态存储器中,split[0]为用户IDlistState.add(s);// 获取状态存储器中的数据Iterable<String> iter = listState.get();StringBuffer buffer = new StringBuffer();for (String str : iter) {buffer.append(str);}// 将数据添加到状态存储中return buffer.toString();}@Overridepublic void snapshotState(FunctionSnapshotContext ctx) throws Exception {}@Overridepublic void initializeState(FunctionInitializationContext ctx) throws Exception {OperatorStateStore operatorStateStore = ctx.getOperatorStateStore();// 配置State TTLStateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(10)) // 设置数据存活时长,当该数据在State中存活时间超过10s时删除该数据// 这个方法也是设置数据存活时长,和StateTtlConfig.Builder(Time.seconds(10))的作用一样,可以不用这个方法,如果用了会覆盖上面设置的时长.setTtl(Time.seconds(10))/*** updateTtlOnCreateAndWrite和updateTtlOnReadAndWrite二选一即可, 这两个方法的主要作用就是配合setTtl方法将冷热数据进行分离**/// 当该条数据在State中插入或者更新的时候,刷新计时.updateTtlOnCreateAndWrite()// 读或写都刷新该数据的TTL计时.updateTtlOnReadAndWrite()/*** setStateVisibility就是设置状态的可见性,前面setTtl方法是设置删除过期数据,删除过期数据实际上是由另一个异步线程周期性(定时器)的完成,也就是说超过10s的数据不一定会马上被删除,但是* 获取数据的时候底层会将超过存活时间的数据进行判断过滤,setStateVisibility就是可以设置是否可以查询到这些过期的数据,NeverReturnExpired和ReturnExpiredIfNotCleanedUp二选一.**/// 不返回过期数据,这个也是默认策略.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// 返回还没有被清除的过期数据.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)// 指定TTL计时时间语义(默认处理时间).setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)// 禁用后台清理过期数据.disableCleanupInBackground()/*** 针对本地状态后端,即HashMapStateBackend* 增量清理, 每当获取状态数据时,迭代器都会向前推进。对遍历的状态数据进行检查,并清理过期的数据* 参数1: 设置每次清理的key的数据量(copyOnWriteStateMap中的key的条目数量)* 参数2: 设置是否清理所有条目也就是key对应的数据,如果设置为true则参数1失效,在状态数据较多时不建议设置为true,会严重影响时效性**/.cleanupIncrementally(10, false)/*** 针对快照数据,即checkpoint快照* 全量清理, 在做快照时将所有的过期数据进行清理保证快照中没有过期数据,但是不会清状态后端中的过期数据**/.cleanupFullSnapshot()/*** 针对于RocksdbStateBackend* 只生效于RocksDB状态后端,通过Flink将CompactFilter传给RocksDB,在RocksDB在Compact过程中根据过滤条件将过期数据删除,传入的参数为过期时间(也就是发生Compact时的过滤条件)**/.cleanupInRocksdbCompactFilter(10000).build();// 配置状态描述,在ListStateDescriptor构造器中声明数据类型,简单类型可以使用xxx.class,符合类型需要使用到TypeInformation.of()ListStateDescriptor descriptor = new ListStateDescriptor("MapState", String.class);// 状态描述器加载TTL配置descriptor.enableTimeToLive(ttlConfig);listState = operatorStateStore.getListState(descriptor);}
    }
    

    代码中是以Operator State为例,如果是Keyed State则在open方法中配置TTL.

1.3 TTL机制详解

在代码模板中有API的使用方式,但是TTL机制不同的方法之间存在互斥或者互不影响的关系.

1.3.1 过期时间设置策略
过期时间设置有两种方式:
  • new StateTtlConfig.Builder(Time.seconds(10))
  • setTtl(Time.seconds(10))

这两种方式都是设置过期时间使用的,但是只需要选用其中一种即可,如果在创建StateTtlConfig对象时就设置了过期时间,又在setTtl方法中设置了过期时间,则会对过期时间进行覆盖,本质上二者都是对同一个变量进行赋值.

  • 源码

    new StateTtlConfig.Builder(Time.seconds(10))

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 调用Builder时对ttl变量进行了赋值public Builder(@Nonnull Time ttl) {this.ttl = ttl;}// ...
    }
    

    setTtl(Time.seconds(10))

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 这里同样是对ttl进行了赋值@Nonnullpublic Builder setTtl(@Nonnull Time ttl) {this.ttl = ttl;return this;}// ...
    }
    

    通过源码可以看出,使用此API时在创建StateTtlConfig对象时给一个过期时间即可,不需要再调用setTtl方法

1.3.2 过期时间刷新策略
过期时间刷新策略有两种:
  • updateTtlOnCreateAndWrite()
  • updateTtlOnReadAndWrite()

这两方法就是互斥的,只能生效一个,同样是因为二者都是对同一个变量进行赋值,就是说在二者同时调用的情况下,谁在后面调用谁就生效,如代码模板中线调用的updateTtlOnCreateAndWrite()后调用的updateTtlOnReadAndWrite()那么生效的就是updateTtlOnReadAndWrite()策略.

  • 源码

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 此方法给updateType进行赋值@Nonnullpublic Builder setUpdateType(UpdateType updateType) {this.updateType = updateType;return this;}/** 二者方法体中调用的都是同一个方法setUpdateType*/@Nonnullpublic Builder updateTtlOnCreateAndWrite() {return setUpdateType(UpdateType.OnCreateAndWrite);}@Nonnullpublic Builder updateTtlOnReadAndWrite() {return setUpdateType(UpdateType.OnReadAndWrite);}// ...
    }
    

    源码可以看出二者调用同一个方法setUpdateType,而setUpdateType方法又是给updateType赋值的一个方法,所以再使用时要根据实际的业务场景选择updateTtlOnCreateAndWrite()updateTtlOnReadAndWrite()中的一个.

1.3.3 返回过期数据策略
回返过期数据策略有两种:
  • setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  • setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
  • 源码

    public static class Builder {private UpdateType updateType = OnCreateAndWrite;private StateVisibility stateVisibility = NeverReturnExpired;private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;private Time ttl;private boolean isCleanupInBackground = true;// ...// 此方法给stateVisibility进行赋值@Nonnullpublic Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {this.stateVisibility = stateVisibility;return this;}// 下面两个方法体中都是调用setStateVisibility方法@Nonnullpublic Builder returnExpiredIfNotCleanedUp() {return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);}@Nonnullpublic Builder neverReturnExpired() {return setStateVisibility(StateVisibility.NeverReturnExpired);}// ...
    }
    

    这二者同样是互斥的原则,使用选其一即可,即使都调用也是后被调用者生效.

1.3.4 过期数据清除策略
过期数据清除策略有三种:
  • cleanupIncrementally(10, false)
  • cleanupFullSnapshot()
  • cleanupInRocksdbCompactFilter(10000)

这三种过期数据清除策略针对的是不同的场景(本地状态后端、快照、RocksDB状态后端),所以三者是可以同时使用的,不会存在同时调用后者会对前者进行覆盖的问题,在API简介章节介绍了这种三策略的作用,这里着重介绍cleanupIncrementally策略.

HashMapStateBackend使用的存储结构是Flink团队自己开发的一种数据存储结构copyOnWriteStateMap,说这个存储结构是因为cleanupIncrementally策略删除过期数据的操作和这种结构息息相关.

关于copyOnWriteStateMap的结构可以简单的理解为K,V形式存储的结构,其中的Key就是使用keyBy时指定的key,如果没有使用keyBy那么所有数据key都会给一个相同的默认值,其中的Value是指ListStateMapState等,也就是在构建状态存储器时候选择存储形式,如下图:
ttl02

在本地状态后端(HashMapStateBackend)中默认使用的就是cleanupIncrementally清除策略,默认值为cleanupIncrementally(5, false),也就是说只要设置了TTL的过期时间,HashMapStateBackend就会使用cleanupIncrementally策略来清理过期数据,只不过cleanupIncrementally对用户提供了选择方式,这里将结合图解说明cleanupIncrementally如何清除过期数据的.
ttl02

  1. 只要访问状态数据就会触发cleanupIncrementally执行.
  2. 如果用户没有设置cleanupIncrementally,TTL会根据cleanupIncrementally(5, false)来删除过期数据,如果用户指定了参数则按照用户定义的参数删除数据.
  3. 比如现在是cleanupIncrementally(10, false),迭代器会从k1开始,到k10结束,将这10个条目的key中的ListState中的过期数据进行清理.
  4. CopyOnWriteStateMap中的数据存放是无序的,而且Flink在创建CopyOnWriteStateMap时候给的默认大小是128,也就说处理数据中key的数量超过128,否则就算只有一个key,CopyOnWriteStateMap的大小也是128,迭代器最少也要迭代128次.
  5. 当设置cleanupIncrementally(10, false)时,假如数据中只有一个key,那么这个k -> ListState(...)CopyOnWriteStateMap中的存放位置是任意的,假设在CopyOnWriteStateMap中存放的位置是22,就会出现当第一次访问状态数据时,并不会删除这个key对应的ListState中的数据,访问状态数据时同样还是不会删除过期数据,只有第三次访问时,才会删除过期数据,因为cleanupSize设置的大小为10,迭代器每次只会迭代10个条目的key,每当访问状态数据时,迭代器都会从最后一次迭代的指针位置开始继续推进.
  6. 当迭代器的指针推进位置到128时,又会从0的位置从新开始推进(这里是指CopyOnWriteStateMap的大小是128),以此类推.
  7. 如果cleanupIncrementally(10, true)中的runCleanupForEveryRecordtrue时,那就是说每次访问状态数据迭代器都会把CopyOnWriteStateMap中的所有条目都清理一遍,所以说为true时第一个参数(cleanupSize)会失效.

cleanupIncrementally的执行机制就很好的解释了,为什么在使用本地状态后端(HashMapStateBackend)时经常会出现明明已经来了7,8条数据,数据过期数据还没有清理到,或者距离上一次访问状态数据过了1h甚至更久都没有清理过期数据的情况.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/164048.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Docker可视化管理界面工具Portainer安装

Portainer是Docker容器管理界面工具&#xff0c;可以直观的管理Docker。 部署也很简单&#xff1a; 官方安装文档地址 1、创建数据卷 docker volume create portainer_data2、下载允许容器 docker run -d -p 8000:8000 -p 9443:9443 --name portainer --restartalways -v /v…

放弃无谓的「技术氛围」幻想,准备战斗

大型科技公司每年都招聘大量研发人才&#xff0c;这给了很多人一种错觉&#xff0c;认为是「技术」导致了这些公司的成功&#xff0c;其实他们的成功是技术推动的市场战略的成功&#xff0c;是市场需要某项服务&#xff0c;才需要研发人员夜以继日的埋头苦干。资本绝不会做亏本…

vue2 element el-transfer穿梭框组件支持拖拽及排序 已封装,随取随用

项目场景&#xff1a; 项目中有个功能用到穿梭框组件&#xff0c;新版本需要支持穿梭框组件排序&#xff0c;由于element2版本中的穿梭框组件本身不支持排序功能 在此不仅需要支持随意更换顺序&#xff0c;还支持从一侧拖拽至另一侧&#xff0c;具体功能效果图如下&#xff1…

为什么JSX只能在函数的返回语句中使用

JSX只能在函数的返回语句中使用&#xff0c;因为JSX本质上是一种声明式的语法&#xff0c;用于描述React组件的结构和外观。在函数的返回语句中使用JSX&#xff0c;可以将JSX表达式嵌入到组件的输出中。 当我们编写一个React组件时&#xff0c;我们通常需要定义一个Render函数…

消息中间件——RabbitMQ(五)快速入门生产者与消费者,SpringBoot整合RabbitMQ!

前言 本章我们来一次快速入门RabbitMQ——生产者与消费者。需要构建一个生产端与消费端的模型。什么意思呢&#xff1f;我们的生产者发送一条消息&#xff0c;投递到RabbitMQ集群也就是Broker。 我们的消费端进行监听RabbitMQ&#xff0c;当发现队列中有消息后&#xff0c;就进…

森利威尔SL4010 升压恒压 12V升压24V 12V升压36V 12V升压48V

在当今的电子设备中&#xff0c;电源管理系统的设计是非常重要的。为了保证设备的稳定运行&#xff0c;升压和恒压电源的应用已经成为不可或缺的一部分。在这篇文章中&#xff0c;我们将介绍森利威尔SL4010升压恒压电源&#xff0c;它可以实现12V升压24V、12V升压36V、12V升压4…

c 在文本终端中显示yuv图片

把yuv422 转为rgb32 &#xff0c;利用framebuffer 显示 #include <stdio.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <stdlib.h> #include <unistd.h> #include <sys/ioctl.h> #include <lin…

vue2.6源码分析

vue相关文档 vue-cli官方文档 vuex官方文档 vue-router 官方文档 vue2.6源码地址 如何调试源码 package.json 添加了--sourcemap "scripts": {"dev": "rollup -w -c scripts/config.js --environment TARGET:web-full-dev --sourcemap" }新增…

linux apt update错误提示修复

错误提示&#xff1a; E: Release file for http://security.debian.org/dists/bullseye-security/InRelease is expired (invalid since 15d 14h 45min 26s). Updates for this repository will not be applied. E: Release file for http://ftp.jp.debian.org/debian/dists/b…

【Hello Go】Go语言并发编程

并发编程 概述基本概念go语言的并发优势 goroutinegoroutine是什么创建goroutine如果主goroutine退出runtime包GoschedGoexitGOMAXPROCS channel无缓冲的channel有缓冲的channelrange和close单向channel 定时器TimerTicker Select超时 概述 基本概念 并行和并发概念 并行 &…

CVE-2023-6099:优卡特脸爱云一脸通智慧管理平台SystemMng.ashx接口未授权漏洞复现

文章目录 优卡特脸爱云一脸通智慧管理平台未授权SystemMng.ashx接口漏洞复现&#xff08;CVE-2023-6099&#xff09; [附POC]0x01 前言0x02 漏洞描述0x03 影响版本0x04 漏洞环境0x05 漏洞复现1.访问漏洞环境2.构造POC3.复现 0x06 修复建议 优卡特脸爱云一脸通智慧管理平台未授权…

mysql字符串转为数字的三种方法、字符串转日期

隐式转换 在MySQL中&#xff0c;使用0运算符可以将一个非数字的值隐式地转换为数字。这在进行数学运算或比较操作时非常有用。 需要注意的是&#xff0c;在使用0进行隐式转换时&#xff0c;MySQL会尽可能将字符串转换为数字。如果字符串不能转换为数字&#xff0c;则会返回0。…

【解决】HDFS JournalNode启动慢问题排查

文章目录 一. 问题描述二. 问题分析1. 排查机器性能2. DNS的问题 三. 问题解决 一句话&#xff1a;因为dns的问题导致journalnode启动时很慢&#xff0c;通过修复dns对0.0.0.0域名解析&#xff0c;修复此问题。 一. 问题描述 从journalnode启动到服务可用&#xff0c;完成RPC…

使用Python将图片转换为PDF

将图片转为 PDF 的主要原因之一是为了方便共享和传输。此外&#xff0c;将多张图片合并成一个 PDF 文件还可以简化文件管理。之前文章详细介绍过如何使用第三方库Spire.PDF for Python将PDF文件转为图片&#xff0c;那么本文介绍使用同样工具在Python中实现图片转PDF文件的功能…

【OpenCV+OCR】计算机视觉:识别图像验证码中指定颜色文字

文章目录 1. 写在前面2. 读取验证码图像3. 生成颜色掩码4. 生成黑白结果图5. OCR文字识别6. 测试结果 【作者主页】&#xff1a;吴秋霖 【作者介绍】&#xff1a;Python领域优质创作者、阿里云博客专家、华为云享专家。长期致力于Python与爬虫领域研究与开发工作&#xff01; 【…

Spring Security(安全框架,必须登录成功才能访问指定资源)

一、背景知识 1、Spring Security 是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架。它提供了一组可以在Spring应用上下文中配置的Bean&#xff0c;充分利用了Spring IoC&#xff0c;DI&#xff08;IOC: 控制反转Inversion of Control ,DI:D…

24路电磁锁控板的特点和主要参数

智能快递柜、智能生鲜柜、电子存储柜、超市寄存柜、智能送餐柜、电子更衣柜、档案柜等物联网终端设备&#xff0c;都是采用电磁锁控制&#xff0c;这种电磁锁控制板俗称锁控板。锁控板可以远程控制储物柜的开关以及远程监控并提供锁的反馈信号。沐渥开发的24路电磁锁控板可以控…

AI:87-基于深度学习的街景图像地理位置识别

🚀 本文选自专栏:人工智能领域200例教程专栏 从基础到实践,深入学习。无论你是初学者还是经验丰富的老手,对于本专栏案例和项目实践都有参考学习意义。 ✨✨✨ 每一个案例都附带有在本地跑过的代码,详细讲解供大家学习,希望可以帮到大家。欢迎订阅支持,正在不断更新中,…

OpenAI 曾收到 AI 重大突破警告;半独立的 OpenAI 比与微软合并更好丨 RTE 开发者日报 Vol.91

开发者朋友们大家好&#xff1a; 这里是 「RTE 开发者日报」 &#xff0c;每天和大家一起看新闻、聊八卦。我们的社区编辑团队会整理分享 RTE &#xff08;Real Time Engagement&#xff09; 领域内「有话题的 新闻 」、「有态度的 观点 」、「有意思的 数据 」、「有思考的 文…

ubuntu下docker环境使用GPU配置

本文主要讲述整个命令流程&#xff0c;具体讲解请看官网nvidia-容器工具包和一篇总结得很详细的博文docker使用GPU总结 docker的版本必须安装19.0版本以上的&#xff0c;这里也只讲19.0版本以上的使用方法 首先设置一下网络信息 curl -fsSL https://nvidia.github.io/libnvi…