从Flink的Kafka消费者看算子联合列表状态的使用

背景

算子的联合列表状态是平时使用的比较少的一种状态,本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态

算子联合列表状态

首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况
在这里插入图片描述
算子联合列表状态主要由这两个方法处理:
1初始化方法

public final void initializeState(FunctionInitializationContext context) throws Exception {OperatorStateStore stateStore = context.getOperatorStateStore();// 在初始化方法中获取联合列表状态this.unionOffsetStates =stateStore.getUnionListState(new ListStateDescriptor<>(OFFSETS_STATE_NAME,createStateSerializer(getRuntimeContext().getExecutionConfig())));if (context.isRestored()) {restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// 把联合列表状态的数据都恢复成类的本地变量中// populate actual holder for restored statefor (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {restoredState.put(kafkaOffset.f0, kafkaOffset.f1);}LOG.info("Consumer subtask {} restored state: {}.",getRuntimeContext().getIndexOfThisSubtask(),restoredState);} else {LOG.info("Consumer subtask {} has no restore state.",getRuntimeContext().getIndexOfThisSubtask());}}

2.开始通知检查点开始的方法:

public final void snapshotState(FunctionSnapshotContext context) throws Exception {if (!running) {LOG.debug("snapshotState() called on closed source");} else {unionOffsetStates.clear();final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;if (fetcher == null) {// the fetcher has not yet been initialized, which means we need to return the// originally restored offsets or the assigned partitionsfor (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :subscribedPartitionsToStartOffsets.entrySet()) {// 进行checkpoint时,把数据保存到联合列表状态中进行保存unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);}} else {HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// the map cannot be asynchronously updated, because only one checkpoint call// can happen// on this function at a time: either snapshotState() or// notifyCheckpointComplete()pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);}for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :currentOffsets.entrySet()) {unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),kafkaTopicPartitionLongEntry.getValue()));}}if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {// truncate the map of pending offsets to commit, to prevent infinite growthwhile (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {pendingOffsetsToCommit.remove(0);}}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/107886.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用建筑中智能消防应急照明系统的应用

【摘要】&#xff1a;火灾应急照明是火灾安全疏散、保障消防人员生命安全的关键。对电气设计人员来说&#xff0c;火灾紧急照明系统的设计非常必要&#xff0c;消防紧急照明系统启动与其正常工作状态有直接的关系&#xff0c;但由于其存在的问题通常不能被及时发现&#xff0c;…

C#,数值计算——分类与推理Gaumixmod的计算方法与源程序

1 文本格式 using System; using System.Collections.Generic; namespace Legalsoft.Truffer { public class Gaumixmod { private int nn { get; set; } private int kk { get; set; } private int mm { get; set; } private double…

CEC2013(MATLAB):墨西哥蝾螈优化算法(Mexican Axolotl Optimization,MAO)求解CEC2013

一、墨西哥蝾螈优化算法MAO 墨西哥蝾螈优化算法&#xff08;Mexican Axolotl Optimization&#xff0c;MAO&#xff09;由Yenny Villuendas-Rey 1等人于2021年提出&#xff0c;该算法具有较强的平衡全局搜索与局部搜索能力。 参考文献&#xff1a; [1]Villuendas-Rey, Yenny,…

opencv定位图片中的图案?

使用opencv模板匹配方式。 这种方式针对从原始图片中直接扣小图情况比较好。 import cv2 as cv2def find_positions(image_path, small_image_path):# 读取大图和小图large_image cv2.imread(image_path)small_image cv2.imread(small_image_path)# 小图规格small_image_h, …

07-ConfigurationClassPostProces的解析

文章目录 如何解析Component,Service,Configurationd,Bean,Import等注解1. 源码描述2. 类继承结构图3. 解析流程4. 具体的注解解析 如何解析Component,Service,Configurationd,Bean,Import等注解 1. 源码描述 BeanFactoryPostProcessor used for bootstrapping processing of…

Android Fragment 基本概念和基本使用

Android Fragment 基本概念和基本使用 一、基本概念 Fragment&#xff0c;简称碎片&#xff0c;是Android 3.0&#xff08;API 11&#xff09;提出的&#xff0c;为了兼容低版本&#xff0c;support-v4库中也开发了一套Fragment API&#xff0c;最低兼容Android 1.6。 过去s…

学术特稿 | 著名书法家项国就:中国古代书法章草美学展现的形式分析

、 论文入编&#xff1a;大型综合美术类核心期刊《新美域》杂志2023年第七期。 中国古代书法章草美学展现的形式分析 摘要&#xff1a;本文旨在探讨中国古代书法风格章草的美学特点、审美价值以及代表性作品和艺术家。章草的美学特点体现在简洁流畅的笔画、清晰规整的字形结构以…

【JAVA】日志打印java.util.logging.*函数自定义格式,并且显示调用时行号

1、JAVA自带的这样&#xff1a; 代码如下&#xff1a; import java.util.logging.*; Logger logger Logger.getLogger(MyLogger.class.toString()); logger.info("123");显示效果&#xff1a; 这样的格式&#xff0c;看起来不太好看&#xff0c;比如&#xff1a;会…

【Unity3D编辑器开发】Unity3D中实现Transform组件拓展,快速复制、粘贴、复原【非常实用】

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址我的个人博客 大家好&#xff0c;我是佛系工程师☆恬静的小魔龙☆&#xff0c;不定时更新Unity开发技巧&#xff0c;觉得有用记得一键三连哦。 一、前言 在开发中&#xff0c;常常会遇到频繁复制粘贴物体的坐标、旋转…

小程序需带参数跳转

1、需要生成二维码的数据 直接在浏览器中替换成自己的appid&#xff0c;secret及可生成一个access_token https://api.weixin.qq.com/cgi-bin/token?grant_typeclient_credential&appidwxxxxx&secretxxxxx用access_token https://api.weixin.qq.com/wxa/getwxacode…

数据库、数据中台、数据仓库、数据湖区别

数据时代&#xff0c;各行业的企业都已经开始通过数据库来沉淀数据&#xff0c;但是真的论起数据库、数据仓库、数据中台&#xff0c;还是新出现的数据湖&#xff0c;它们的概念和区别&#xff0c;可能知道的人就比较少了&#xff0c;今天我们详细来比较了解一下。 一、数据仓…

Oracle基础学习

文章目录 1. oracle数据库安装2. sqlplus连接数据库方式3. 创建用户信息4. 基本概念5. 基本SQL语句6. Springboot开发 1. oracle数据库安装 安装教程 安装包地址 2. sqlplus连接数据库方式 无用户信息登录 使用用户信息登录 登录最高权限管理员&#xff0c;如果不加上as …

Spring Cloud Alibaba—Sentinel 控制台安装

1、Sentinel 控制台包含如下功能: 查看机器列表以及健康情况&#xff1a;收集 Sentinel 客户端发送的心跳包&#xff0c;用于判断机器是否在线。 监控 (单机和集群聚合)&#xff1a;通过 Sentinel 客户端暴露的监控 API&#xff0c;定期拉取并且聚合应用监控信息&#xff0c;最…

如何通过内网穿透实现远程连接NAS群晖drive并挂载电脑硬盘?

文章目录 前言1.群晖Synology Drive套件的安装1.1 安装Synology Drive套件1.2 设置Synology Drive套件1.3 局域网内电脑测试和使用 2.使用cpolar远程访问内网Synology Drive2.1 Cpolar云端设置2.2 Cpolar本地设置2.3 测试和使用 3. 结语 前言 群晖作为专业的数据存储中心&…

Day1力扣打卡

打卡记录 最长相邻不相等子序列 I&#xff08;脑筋急转弯&#xff09; 链接 思路&#xff1a;形如 11100110001 要达到最大&#xff0c;必须在重复数字选出一个&#xff0c;即在111中取一个1&#xff0c;在00中取一个0&#xff0c;以此类推最终便得到最长相邻不相等子序列。 c…

Elasticsearch 8.11 中的合并更少,摄取更快

作者&#xff1a;ADRIEN GRAND Elasticsearch 8.11 改进了管理索引缓存的方式&#xff0c;从而减少了段合并。 我们对 Elasticsearch 8.11 从索引缓存回收内存的方式进行了重大更改&#xff0c;这有助于减少合并开销&#xff0c;从而加快索引速度。 使用我们的日志跟踪&#x…

maven 常用知识速记

创建项目 maven archetype:generate依赖范围 有如下依赖示例&#xff1a; <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.7</version><scope>test</scope> </dependency>其中…

力扣刷题 day46:10-16

1.最大整除子集 给你一个由 无重复 正整数组成的集合 nums &#xff0c;请你找出并返回其中最大的整除子集 answer &#xff0c;子集中每一元素对 (answer[i], answer[j]) 都应当满足&#xff1a; answer[i] % answer[j] 0 &#xff0c;或 answer[j] % answer[i] 0 如果存在…

百度测试开发工程师面试心得

百度测试开发实习生面试心得&#xff1a; 电话面试&#xff1a; 面试官&#xff1a;首先做一下自我介绍吧 我&#xff1a;我是***&#xff0c;来自什么大学&#xff0c;现在大三&#xff0c;在学校期间担任过部长&#xff0c;副主席等职务&#xff0c; 组织举办了很多比赛&…

DITA-OT 4.0新特性 - PDF themes,定制PDF样式的新方法

随着DITA-OT 4.0的发布&#xff0c;它提供了一种新的定制PDF样式方法&#xff0c;这种方法就是PDF theme。这篇文章来聊一聊这种定制PDF输出的新方法和实验结果。 在进入PDF theme细节之前&#xff0c;为各位读者梳理一下DITA-OT将DITA和Markdown发布成PDF的几种方法。 - 1 …