Flink 1.8.0中的状态生存时间特性:如何自动清理应用程序的状态

对于许多状态流式计算程序来说,一个常见的需求是自动清理应用程序的状态(state),以便有效地控制状态大小,或者控制程序访问状态的有效时间(例如受限于诸如GDPR等法律条规)。Apache Flink自1.6.0版本引入了状态的生存时间(time-to-live,TTL)功能,使得应用程序的状态清理和有效的状态大小管理成为可能。

在本文中,我们将讨论引入状态生存时间特性的动机并讨论其相关用例。此外,我们还将演示如何使用和配置该特性。同时,我们将会解释Flink如何借用状态生存时间特性在内部管理状态,并对Flink 1.8.0中该功能引入的相关新特性进行一些展示。本文章最后对未来的改进和扩展作了展望。

状态的暂时性

有两个主要原因可以解释为什么状态只应该维持有限的时间。让我们先设想一个Flink应用程序,它接收用户登录事件流,并为每个用户存储上一次登录时的相关事件信息和时间戳,以改善高频访问用户的体验。

  • 控制状态的大小。 状态生存时间特性的主要使用场景,就是能够有效地管理不断增长的状态大小。通常情况下,数据只需要暂时保存,例如用户处在一次网络连接会话中。当用户访问事件结束时,我们实际上就没有必要保存该用户的状态,来减少无谓的状态存储空间占用。Flink 1.8.0引入的基于生存时间的后台状态清理机制,使得我们能够自动地对无用数据进行清理。此前,应用程序开发人员必须采取额外的操作并显式地删除无用状态以释放存储空间。这种手动清理过程不仅容易出错,而且效率低下。以我们上述用户登录的案例为例,因为这些不活跃用户的相关信息会被自动过期清理掉,我们就不再需要额外存储上次登录的时间戳。
  •  
  • 符合(敏感)数据保护的要求。 随着数据隐私法规的发展(例如欧盟颁布的通用数据保护法规GDPR),遵守此类法规的相关要求,或将数据进行敏感处理已经成为许多应用程序的首要任务。此类使用场景的的一个典型案例,就需要仅在特定时间段内保存数据并防止其后可以再次访问该数据。这对于为客户提供短期服务的公司来说是一个常见的挑战。状态生存时间这一特性,就保证了应用程序仅可以在有限时间内进行访问,有助于遵守数据保护法规。

这两个需求都可以通过状态生存时间来解决,这个功能可以周期性地、持续地删除状态中的键值,一旦它变得不必要或不重要,并且不再需要保存在存储中时。

对应用状态的持续清理

Apache Flink的1.6.0版本引入了状态生存时间特性。它使流处理应用程序的开发人员能够配置算子的状态,使其在定义的超时(生存时间)后过期并被清除。在Flink 1.8.0中,该功能得到了进一步扩展,对RocksDB和堆内存状态后端(FsStateBackendMemoryStateBackend)的旧数据进行连续性的清理。
在Flink的DataStream API中,应用程序状态是由状态描述符(state descriptor)来定义的。状态生存时间是通过将StateTtlConfiguration对象传递给状态描述符来配置的。下面的Java示例演示了如何创建状态生存时间的配置,并将其提供给状态描述符,该状态描述符将用户的上次登录时间保存为Long值:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.state.ValueStateDescriptor;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(7)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<Long> lastUserLogin =new ValueStateDescriptor<>("lastUserLogin", Long.class);lastUserLogin.enableTimeToLive(ttlConfig);

Flink提供了多个选项来配置状态生存时间的行为:

  • 什么时候重置生存时间? 默认情况下,当状态被修改时,生存时间就会被更新。我们也可以在读操作访问状态时更新相关项的生存时间,但这样要花费额外的写操作来更新时间戳。
  • 已经过期的数据是否可以访问? 状态生存时间机制使用的是惰性策略来清除过期状态。这可能导致应用程序会尝试读取过期但尚未删除的状态。用户可以配置对这样的读取请求是否返回过期状态。无论哪种情况,过期状态都会在之后立即被删除。虽然返回已经过期的状态有利于数据可用性,但不返回过期状态更符合相关数据保护法规的要求。
  • 哪种时间语义被用于定义生存时间? 在Apache Flink 1.8.0中,用户只能根据处理时间(Processing Time)定义状态生存时间。未来的Flink版本中计划支持事件时间(Event Time)。

关于状态生存时间的更多信息,可以参考Flink官方文档。

在实现上,状态生存时间特性会额外存储上一次相关状态访问的时间戳。虽然这种方法增加了一些存储开销,但它允许Flink在访问状态、创建检查点、恢复或存储清理过程时可以检查过期状态。

“取走垃圾数据”

在访问状态对象时,Flink将检查其时间戳,并在状态过期时清除状态(是否返回过期状态,则取决于配置的过期数据可见性)。由于这种访问时才删除的特性,除非被垃圾回收,否则那些永远不被访问过期数据将仍然占用存储空间。
那么,在没有显示处理过期状态的情况下,如何删除这些数据呢?通常,我们可以配置不同的策略进行后台删除。

保证完整快照中不包含过期数据

Flink 1.6.0已经支持在创建检查点(checkpoint)或保存点(savepoint)的完整快照时不包含过期状态。需要注意的是,创建增量快照时并不支持剔除过期状态。完整快照时的过期状态剔除必须如下例所示进行显示启用:

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(7)).cleanupFullSnapshot().build();

上述配置并不会影响本地状态存储的大小,但是整个作业的完整快照的大小将会减小。只有当用户从快照重新加载其状态到本地时,才会清除用户的本地状态。

由于上述这些限制,在Flink 1.6.0中程序仍需要过期后主动删除状态。为了改善用户体验,Flink1.8.0引入了两种自主清理策略,分别针对两种状态后端类型:

堆内存状态后端的增量清理

此方法只适用于堆内存状态后端(FsStateBackendMemoryStateBackend)。其基本思路是在存储后端的所有状态条目上维护一个全局的惰性迭代器。某些事件(例如状态访问)会触发增量清理,而每次触发增量清理时,迭代器都会向前遍历删除已遍历的过期数据。以下代码示例展示了如何启用增量清理:

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(7))// check 10 keys for every state access.cleanupIncrementally(10, false).build();

如果启用该功能,则每次状态访问都会触发清除。而每次清理时,都会检查一定数量的状态条目是否过期。其中有两个调整参数。第一个定义了每次清理时要检查的状态条目数。第二个参数是一个标志位,用于表示是否在每条记录处理(record processed)之后(而不仅仅是访问状态,state accessed),都还额外触发清除逻辑。
关于这种方法有两个重要的注意事项:首先是增量清理所花费的时间会增加记录处理的延迟。其次,如果没有状态被访问(state accessed)或者没有记录被处理(record processed),过期的状态也将不会被删除。

RocksDB状态后端利用后台压缩来清理过期状态

如果使用RocksDB状态后端,则可以启用另一种清理策略,该策略基于Flink定制的RocksDB压缩过滤器(compaction filter)。RocksDB会定期运行异步的压缩流程以合并数据并减少相关存储的数据量,该定制的压缩过滤器使用生存时间检查状态条目的过期时间戳,并丢弃所有过期值。
使用此功能的第一步,需要设置以下配置选项:state.backend.rocksdb.ttl.compaction.filter.enabled。一旦配置使用RocksDB状态后端后,如以下代码示例将会启用压缩清理策略:

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(7)).cleanupInRocksdbCompactFilter().build();

需要注意的是启用Flink的生存时间压缩过滤机制后,会放缓RocksDB的压缩速度。

使用定时器进行状态清理

另一种手动清除状态的方法是基于Flink的计时器,这也是社区评估的一个想法。使用这种方法,将为每个状态访问注册一个清除计时器。这种方法的清理更加精准,因为状态一旦过期就会被立刻删除。但是由于计时器会与原始状态一起存储会消耗空间,开销也更大一些。

未来展望

除了上面提到的基于计时器的清理策略之外,Flink社区还计划进一步改进状态生存时间特性。可能的改进包括为事件时间(event time)添加生存时间的支持(目前只支持处理时间)和为可查询状态(queryable state)启用状态生存时间机制。

总结

状态可访问时间的限制和应用程序状态大小的控制,是状态流处理领域的常见挑战,Flink的1.8.0版本通过添加对过期状态对象连续性后台清理的支持,显著改进了状态生存时间特性。新的清理机制可以不再需要手动实现状态清理的工作,而且由于惰性清理的机制,执行效率也更高。总得来说,状态生存时间方便用户控制应用程序状态的大小,使得用户可以将精力集中在应用程序的核心逻辑开发上。

原文链接
本文为云栖社区原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/518309.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript-Iterable迭代

Iterable ES6新特性 遍历数组 // for of 打印值 &#xff0c; for in 打印下标 var arr [4,5,6] for (const number of arr) {console.log(number) }遍历Map var map new Map([[whl,100],[ht,110],[other,0]]) for (let x of map) {console.log(x)console.log(x[0])consol…

阿里小程序亮相2019上海云峰会:大生态促成许多“小而美”

7月25日下午&#xff0c;在上海世博中心的阿里云峰会上海站上&#xff0c;阿里巴巴小程序繁星计划以展区加开放式论坛形式&#xff0c;与各领域开发者、企业和生态合作伙伴充分交流了小程序一云多端的规划和进展&#xff0c;以及阿里系各端APP向小程序开放的资源和能力。 与会者…

REDIS 关键配置简述

配置关键词配置说明daemonize开启守护式进程后台启动&#xff0c; 将no改为yes 默认前台启动bind指定red只接收来自该ip的请求isport监听端口&#xff0c;默认6379databases设置数据库的个数&#xff0c;默认16个&#xff0c;默认使用的数据库是0save设置redis进行数据库镜像的…

快速验证业务决策,“玩转”用户增长

背景 闲鱼目前已经是国内最大的闲置物品交易平台&#xff0c;每天都有数以千万计的用户过来闲鱼&#xff0c;以C2C交易为主。在闲鱼里面&#xff0c;用户的C2C购物频率其实是很低的&#xff0c;而纯粹地逛商品feed流是一件挺无聊的事情。在业务上做加法&#xff0c;突破闲鱼用…

JavaScript-函数

函数 定义函数 定义方式一 绝对值函数 function abs(x) {if (x>0){return x;}else{return -x;} }一旦执行到return 代表函数结束&#xff0c;返回结果&#xff01; 如果没有执行return&#xff0c;函数执行完也会返回结果&#xff0c;结果就是NaN / undefined 定义方式二…

领航智变时代 2020 NAVIGATE领航者峰会云上起航

4月20日&#xff0c;由紫光集团和旗下新华三集团主办的2020 NAVIGATE领航者峰会首次全面移师线上&#xff0c;盛大启航。本次线上峰会从4月20日到25日持续6天&#xff0c;以“智变”为主题&#xff0c;通过33个专题&#xff0c;超过120场演讲&#xff0c;聚焦探索智能时代的智与…

在阿里,我如何做好技术项目管理?

阿里妹导读&#xff1a;在技术公司、尤其是互联网公司&#xff0c;技术人员作为PM(项目经理)是非常常见的。有些同学得心应手&#xff0c;有条不紊&#xff0c;能得到清晰稳定的预期结果&#xff1b;有些同学则在过程中遇到各种闹心的事&#xff0c;最后不是项目上不了线&#…

云原生化的迁云实战

云原生的时代已经到来&#xff0c;云原生技术正在重塑整个软件生命周期&#xff0c;阿里巴巴是国内最早布局云原生技术的公司之一。 容器服务团队在过去的几年时间内帮助很多用户成功把业务云原生化并迁移上云&#xff0c;其中有现在已经是我们TOP10的大客户&#xff0c;也有需…

超详细!一文告诉你 SparkStreaming 如何整合 Kafka !附代码可实践

来源 | Alice菌责编 | Carol封图 | CSDN 下载于视觉中国相信很多小伙伴已经接触过 SparkStreaming 了&#xff0c;理论就不讲太多了&#xff0c;今天的内容主要是为大家带来的是 SparkStreaming 整合 Kafka 的教程。文中含代码&#xff0c;感兴趣的朋友可以复制动手试试&#…

PerfDog-移动端性能测试-基本使用

常见的腾讯性能测试工具&#xff1a;腾讯gt、腾讯wetest、腾讯perfdog 腾讯perfdog&#xff1a; https://perfdog.qq.com/ 一、介绍&#xff1a; 移动全平台iOS/Android性能测试、分析工具平台。快速定位分析性能问题&#xff0c;提升APP应用及游戏性能和品质。手机无需ROOT/越…

通过windows客户端访问

文章目录1. 原因2. 解决方案1. 原因 1.防火墙已开启 2.redis服务没启动 3.redis默认只能本地访问 2. 解决方案 1.安装Redis客户端,推荐redis-desktop-manager 2.修改配置文件redis.conf 为bind 0.0.0.0 设置密码 3.修改后kill -9 xxx停止redis的进程号&#xff0c;重启启动redi…

网易云音乐的消息队列改造之路

十年文案老司机&#xff0c;不如网易评论区。 网易云音乐自2013年上线后&#xff0c;业务保持了高速增长。云音乐除了提供好听的音乐外&#xff0c;还留下了我们在乐和人上的美好回忆。本文整理自网易云音乐消息队列负责人林德智在近期 Apache Flink&RocketMQ Meetup 上海…

OSS在线迁移服务剖析

在前迁移说明 目前由于 OSS 数据迁移服务涉及到对目标的 OSS 要有很多 action 的 API 授权&#xff0c;为避免用户产生过多的学习成本&#xff0c;我们直接强制使用主账号进行迁移&#xff1b;该服务正在公测中&#xff0c;目前仍在免费使用阶段&#xff1b;服务使用需要提前工…

JavaScript-变量的作用域 、const、let

作用域 局部函数 在javascript中&#xff0c;var定义变量实际是有作用域的。 假设在函数体中声明&#xff0c;则在函数体外不可以使用~&#xff08;如果非要使用的话&#xff0c;可以用闭包&#xff09; function qj() {var x 1;x x 1; } x x 2; // Uncaught Referenc…

idea 开启Run DashBoard

文章目录1.项目的.idea文件夹下&#xff0c;打开workspace.xml文件2. 添加 RunDashboard 节点&#xff1a;IDEA中&#xff0c;run dashboard是一个直观、方便好用的面板 1.项目的.idea文件夹下&#xff0c;打开workspace.xml文件 2. 添加 RunDashboard 节点&#xff1a; <co…

那些你不知道的 LVS 秘密!

作者 | 故事凌责编 | 郭芮近来在群里,看到大家说对lvskeepalived不太了解&#xff0c;我想我应该是有发言权的。自己本身就是运维出身&#xff0c;原来在京东物流的时候&#xff0c;lvskeepalived就是仓库物流在用的&#xff0c;踩了很多坑&#xff0c;只不过后来都上云了&…

从零到破万节点!支撑618大促背后的蚂蚁金服Kubernetes集群

2019年天猫618大促&#xff0c;蚂蚁金服首次在大促中对调度系统和技术栈全面应用Kubernetes&#xff0c;突破了Kubernetes单集群万节点的规模&#xff0c;总节点数达到数十万个&#xff0c;这是世界最大规模的 Kubernetes 集群之一&#xff0c;而这距离开发团队下载Kubernetes代…

Vue3和Vue2的区别

前言 Vue 3的文章也跟新了不少&#xff0c;相比vue2还是有很多区别的&#xff0c;有许多重要的变化和改进。以下是 Vue 3 相对于 Vue 2 的一些主要区别&#xff1a; 生命周期函数 生命周期函数基本和vue2差不多&#xff0c;名字前加了on&#xff1b; 具体如下&#xff1a; …

使用npm失败解决方案

npm config rm proxy npm config rm https-proxy

MongoDB 定位 oplog 必须全表扫描吗?

MongoDB oplog &#xff08;类似于 MySQL binlog&#xff09; 记录数据库的所有修改操作&#xff0c;除了用于主备同步&#xff1b;oplog 还能玩出很多花样&#xff0c;比如 全量备份 增量备份所有的 oplog&#xff0c;就能实现 MongoDB 恢复到任意时间点的功能通过 oplog&am…