从Flink的Kafka消费者看算子联合列表状态的使用

背景

算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态

算子联合列表状态

首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况
在这里插入图片描述
算子联合列表状态主要由这两个方法处理:
1初始化方法

public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Consumer subtask {} restored state: {}.",getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info("Consumer subtask {} has no restore state.",getRuntimeContext().getIndexOfThisSubtask());}}

2.开始通知检查点开始的方法:

public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时,把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/107886.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用建筑中智能消防应急照明系统的应用

【摘要】&#xff1a;火灾应急照明是火灾安全疏散、保障消防人员生命安全的关键。对电气设计人员来说&#xff0c;火灾紧急照明系统的设计非常必要&#xff0c;消防紧急照明系统启动与其正常工作状态有直接的关系&#xff0c;但由于其存在的问题通常不能被及时发现&#xff0c;…

ubuntu 22.04版本修改服务器名、ip,dns信息的操作方法

总结 1、ubuntu修改服务器名重启后生效的方法是直接修改/etc/hostname文件 2、ubuntu 22.04操作系统配置ip和dns信息&#xff0c;一般只需要使用netplan命令行工具来配置就行&#xff0c;在/etc/netplan/在目录下创建一个yaml文件就可以实现ip和dns的配置&#xff0c;当然如果…

C#,数值计算——分类与推理Gaumixmod的计算方法与源程序

1 文本格式 using System; using System.Collections.Generic; namespace Legalsoft.Truffer { public class Gaumixmod { private int nn { get; set; } private int kk { get; set; } private int mm { get; set; } private double…

【SA8295P 源码分析 (一)】62 - Android GVM Kernel 内核 make bootimage 过程分析

【SA8295P 源码分析】62 - Android GVM Kernel 内核 make bootimage 过程分析 一、make bootimage 命令执行过程分析1.1 source buid/envsetup.sh 分析1.2 lunch msmnile_gvmq-userdebug 分析1.3 make bootimage:step 1 之 加载配置文件过程分析1.4 make bootimage:step 2 之…

CEC2013(MATLAB):墨西哥蝾螈优化算法(Mexican Axolotl Optimization,MAO)求解CEC2013

一、墨西哥蝾螈优化算法MAO 墨西哥蝾螈优化算法&#xff08;Mexican Axolotl Optimization&#xff0c;MAO&#xff09;由Yenny Villuendas-Rey 1等人于2021年提出&#xff0c;该算法具有较强的平衡全局搜索与局部搜索能力。 参考文献&#xff1a; [1]Villuendas-Rey, Yenny,…

opencv定位图片中的图案?

使用opencv模板匹配方式。 这种方式针对从原始图片中直接扣小图情况比较好。 import cv2 as cv2def find_positions(image_path, small_image_path):# 读取大图和小图large_image cv2.imread(image_path)small_image cv2.imread(small_image_path)# 小图规格small_image_h, …

07-ConfigurationClassPostProces的解析

文章目录 如何解析Component,Service,Configurationd,Bean,Import等注解1. 源码描述2. 类继承结构图3. 解析流程4. 具体的注解解析 如何解析Component,Service,Configurationd,Bean,Import等注解 1. 源码描述 BeanFactoryPostProcessor used for bootstrapping processing of…

Android Fragment 基本概念和基本使用

Android Fragment 基本概念和基本使用 一、基本概念 Fragment&#xff0c;简称碎片&#xff0c;是Android 3.0&#xff08;API 11&#xff09;提出的&#xff0c;为了兼容低版本&#xff0c;support-v4库中也开发了一套Fragment API&#xff0c;最低兼容Android 1.6。 过去s…

学术特稿 | 著名书法家项国就:中国古代书法章草美学展现的形式分析

、 论文入编&#xff1a;大型综合美术类核心期刊《新美域》杂志2023年第七期。 中国古代书法章草美学展现的形式分析 摘要&#xff1a;本文旨在探讨中国古代书法风格章草的美学特点、审美价值以及代表性作品和艺术家。章草的美学特点体现在简洁流畅的笔画、清晰规整的字形结构以…

【JAVA】日志打印java.util.logging.*函数自定义格式,并且显示调用时行号

1、JAVA自带的这样&#xff1a; 代码如下&#xff1a; import java.util.logging.*; Logger logger Logger.getLogger(MyLogger.class.toString()); logger.info("123");显示效果&#xff1a; 这样的格式&#xff0c;看起来不太好看&#xff0c;比如&#xff1a;会…

【Unity3D编辑器开发】Unity3D中实现Transform组件拓展,快速复制、粘贴、复原【非常实用】

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址我的个人博客 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 在开发中&#xff0c;常常会遇到频繁复制粘贴物体的坐标、旋转…

小程序需带参数跳转

1、需要生成二维码的数据 直接在浏览器中替换成自己的appid&#xff0c;secret及可生成一个access_token https://api.weixin.qq.com/cgi-bin/token?grant_typeclient_credential&appidwxxxxx&secretxxxxx用access_token https://api.weixin.qq.com/wxa/getwxacode…

数据库、数据中台、数据仓库、数据湖区别

数据时代&#xff0c;各行业的企业都已经开始通过数据库来沉淀数据&#xff0c;但是真的论起数据库、数据仓库、数据中台&#xff0c;还是新出现的数据湖&#xff0c;它们的概念和区别&#xff0c;可能知道的人就比较少了&#xff0c;今天我们详细来比较了解一下。 一、数据仓…

Oracle基础学习

文章目录 1. oracle数据库安装2. sqlplus连接数据库方式3. 创建用户信息4. 基本概念5. 基本SQL语句6. Springboot开发 1. oracle数据库安装 安装教程 安装包地址 2. sqlplus连接数据库方式 无用户信息登录 使用用户信息登录 登录最高权限管理员&#xff0c;如果不加上as …

Spring Cloud Alibaba—Sentinel 控制台安装

1、Sentinel 控制台包含如下功能: 查看机器列表以及健康情况&#xff1a;收集 Sentinel 客户端发送的心跳包&#xff0c;用于判断机器是否在线。 监控 (单机和集群聚合)&#xff1a;通过 Sentinel 客户端暴露的监控 API&#xff0c;定期拉取并且聚合应用监控信息&#xff0c;最…

如何通过内网穿透实现远程连接NAS群晖drive并挂载电脑硬盘?

文章目录 前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 前言 群晖作为专业的数据存储中心&…

Access denied for user ‘itstar‘@‘%localhost‘ to database ‘sc‘

今天发现数据库报这个错&#xff0c;经过排查&#xff0c;原因如下 原因一 本地装了两个版本的mysql&#xff0c;连错数据库了。 原因二 连上正确的数据库后&#xff0c;还是报这个错&#xff0c;发现是当前用户没有这个数据库的权限&#xff0c;开通权限。问题解决 Dupli…

为什么要把 String 设计为不可变?

将字符串设计为不可变具有多个重要的原因&#xff1a; 线程安全性&#xff1a; 不可变字符串可以在多线程环境中共享而无需额外的同步措施。因为字符串不会改变&#xff0c;多个线程可以同时访问它而不会导致竞态条件或数据不一致性。 缓存和性能优化&#xff1a; 字符串不可变…

Day1力扣打卡

打卡记录 最长相邻不相等子序列 I&#xff08;脑筋急转弯&#xff09; 链接 思路&#xff1a;形如 11100110001 要达到最大&#xff0c;必须在重复数字选出一个&#xff0c;即在111中取一个1&#xff0c;在00中取一个0&#xff0c;以此类推最终便得到最长相邻不相等子序列。 c…

Elasticsearch 8.11 中的合并更少,摄取更快

作者&#xff1a;ADRIEN GRAND Elasticsearch 8.11 改进了管理索引缓存的方式&#xff0c;从而减少了段合并。 我们对 Elasticsearch 8.11 从索引缓存回收内存的方式进行了重大更改&#xff0c;这有助于减少合并开销&#xff0c;从而加快索引速度。 使用我们的日志跟踪&#x…