Flink 计数器Accumulator

简述

在 Apache Flink 中,Accumulator 是一个用于收集作业执行期间聚合信息的工具。它允许在用户定义的函数(如 MapFunction, FlatMapFunction, ProcessFunction 等)中累积值,并在作业完成后检索这些值。这对于跟踪诸如事件数量、处理延迟等统计信息非常有用。

要使用 Accumulator,需要首先定义一个 Accumulator 接口的实现,然后在用户定义函数中注册和使用它。

1. 定义 Accumulator:

通常,不需要直接定义 Accumulator 接口的实现,因为 Flink 已经为提供了一些内置的 Accumulator 类型,如 IntCounter, LongCounter, DoubleCounter 等。但如果需要自定义的聚合逻辑,可以实现 Accumulator 接口。

2. 在函数中使用 Accumulator:

在用户定义函数中,可以通过 getRuntimeContext().getAccumulator(“name”) 来获取或注册一个 Accumulator。然后,可以在逻辑中更新它的值。

3. 检索 Accumulator 的值:

在作业执行完成后,可以通过 JobExecutionResult 的 getAccumulatorResult() 方法来检索 Accumulator 的值。

但请注意,由于 Accumulator 已经被 Metric 系统所取代,以下是一个使用 Metric 的示例,它提供了类似的功能:

import org.apache.flink.api.common.functions.RuntimeContext;  
import org.apache.flink.metrics.Counter;  
import org.apache.flink.streaming.api.functions.source.SourceFunction;  public class MySourceFunction implements SourceFunction<String> {  private transient Counter counter;  @Override  public void open(Configuration parameters) throws Exception {  super.open(parameters);  // 获取或注册一个 Counter  this.counter = getRuntimeContext().getMetricGroup().counter("my-counter");  }  @Override  public void run(SourceContext<String> ctx) throws Exception {  // ... 数据源逻辑 ...  // 更新 Counter 的值  counter.inc();  // 发送数据到下游  ctx.collect("some data");  }  // ... 其他必要的方法 ...  
}

在这个示例中,我们使用了 Flink 的 Metric 系统来创建一个计数器,并在数据源函数中更新它的值。这样,就可以在作业执行期间跟踪和检索这个计数器的值了。

代码

package com.wfg.flink.connector.mongodb;import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;import static com.wfg.flink.connector.constants.Constants.MONGO_TEST_PV_COLLECTION;/*** @author wfg* 根据名字统计访问次数*/
public class MongoAccumulatorCounts {public static void main(String[] args) throws Exception {String startTime = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss");System.out.println("StartTime:" + startTime);final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 开启Checkpointing,设置Checkpoint间隔env.enableCheckpointing(30000);// 设置Checkpoint模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置最小Checkpoint间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 设置最大并发Checkpoint数目env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 使用RocksDB作为状态后端env.setStateBackend(new HashMapStateBackend());env.setParallelism(10);// 配置MongoDB源MongoSource<String> mongoSource = MongoSource.<String>builder().setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin").setDatabase("sjzz").setCollection(MONGO_TEST_PV_COLLECTION).setFetchSize(2048)
//                .setLimit(10000).setNoCursorTimeout(true).setPartitionStrategy(PartitionStrategy.SINGLE).setPartitionSize(MemorySize.ofMebiBytes(64)).setDeserializationSchema(new MongoDeserializationSchema<>() {@Overridepublic String deserialize(BsonDocument document) {return document.toJson();}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}).build();// 创建MongoDB数据流DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
// 转换数据,提取人名作为KeyDataStream<Tuple2<String, Integer>> nameCountStream = sourceStream.map(new RichMapFunction<String, Tuple2<String, Integer>>() {private LongCounter elementCounter = new LongCounter();Long count = 0L;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);//-2注册累加器getRuntimeContext().addAccumulator("elementCounter", elementCounter);}@Overridepublic Tuple2<String, Integer> map(String value) {KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);//-3.使用累加器this.elementCounter.add(1);count += 1;System.out.println("不使用累加器统计的结果:" + count);return Tuple2.of(data.getUserName(), 1);}}).setParallelism(10);
//                .keyBy(value->value.f0)
//                 .sum("f1");sourceStream.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE);
//-4.获取加强结果JobExecutionResult jobResult = env.execute();long nums = jobResult.getAccumulatorResult("elementCounter");System.out.println("使用累加器统计的结果:" + nums);System.out.println("-----------------------------------");System.out.println("startTime: " + startTime);System.out.println("EndTime: " + DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss"));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/855056.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用 Hugging Face 推理终端搭建强大的“语音识别 + 说话人分割 + 投机解码”工作流

Whisper 是当前最先进的开源语音识别模型之一&#xff0c;毫无疑问&#xff0c;也是应用最广泛的模型。如果你想部署 Whisper 模型&#xff0c;Hugging Face推理终端能够让你开箱即用地轻松部署任何 Whisper 模型。但是&#xff0c;如果你还想叠加其它功能&#xff0c;如用于分…

项目实战中学透Spring-业务场景驱动-Spring01(IOCDI)

软件环境 JDK1.8 Maven3.6 IDEA2022.3(Ultimate Edition) Spring5.3.29 主要知识点大纲 1.Spring简介 2.Spring整体架构 3.业务场景中理解Spring IOC(控制反转)和DI(依赖注入) 4.业务场景中理解IOC容器&#xff0c;实例化容器&#xff0c;实例化Bean的几种方式 5.业务…

java基础·小白入门(一)

目录 Java语言概述Java的性质三种平台跨平台原理 Java语言开发环境相关概念Java开发工具的安装Java程序的编译与运行基本注意事项 Java语言基础数据类型基本数据类型引用数据类型 关键字与标识符常量与变量常量变量 数据类型转换常见运算符 Java语言概述 这一部分主要讲讲Java的…

Chrome扩展程序开发新手指南:事件监听器的应用技巧

问题背景 最近我在开发Chrome浏览器插件时&#xff0c;遇到了一个需要脚本初始化的问题。在插件被安装后或浏览器标签页被刷新时&#xff0c;我需要重新初始化插件。为了实现这一点&#xff0c;我研究了Chrome提供的几个API接口&#xff0c;它们分别是chrome.runtime.onInstal…

[面试题]Redis

[面试题]Java【基础】[面试题]Java【虚拟机】[面试题]Java【并发】[面试题]Java【集合】[面试题]MySQL[面试题]Maven[面试题]Spring Boot[面试题]Spring Cloud[面试题]Spring MVC[面试题]Spring[面试题]MyBatis[面试题]Nginx[面试题]缓存[面试题]Redis 什么是 Redis &#xff…

随机产生一些江河上的坐标数据

不久前收到一个需求&#xff0c;说要随机创建约一百个某段江河上的坐标点&#xff0c;用于做一些数据呈现。 我首先是想到用AI直接给我一点数据&#xff0c;没想到给出来的坐标&#xff0c;有许多都落在陆地上&#xff0c;根本不符合我的要求。后来结合AI给出的建议&#xff0…

生成对抗网络——GAN深度卷积实现(代码+理解)

本篇博客为 上篇博客的 另一个实现版本&#xff0c;训练流程相同&#xff0c;所以只实现代码&#xff0c;感兴趣可以跳转看一下。 生成对抗网络—GAN&#xff08;代码理解&#xff09; http://t.csdnimg.cn/HDfLOhttp://t.csdnimg.cn/HDfLO 目录 一、GAN深度卷积实现 1. 模型…

面试题分享--Spring02

Spring 框架中都用到了哪些设计模式?(必会) 1. 工厂模式&#xff1a;BeanFactory 就是简单工厂模式的体现&#xff0c;用来创建对象的实例 2. 单例模式&#xff1a;Bean 默认为单例模式 3. 代理模式&#xff1a;Spring 的 AOP 功能用到了 JDK 的动态代理和 CGLIB 字节码生成…

The First项目报告:深度解读Layer 2生态zkSync

zkSync发币了&#xff0c;这个无数撸毛党心心念念数年之久的项目终于要来了&#xff0c;zkSync 是由Matter Labs 于2019 年推出的以太坊Layer 2 扩容解决方案&#xff0c;作为L2龙头项目之一&#xff0c;与其同属一个层次的L2四大天王之三Optimism、Arbitrum、zkSync、StarkNet…

Profibus协议转Modbus协议网关模块帮助PLC实现智能激光设备通讯

一、前言 Profibus转Modbus网关&#xff08;XD-MDPB100&#xff09;是一种工业通信协议转换设备&#xff0c;用于实现Profibus协议与Modbus协议之间的转换。Profibus转Modbus网关在工业自动化系统中具有广泛的应用&#xff0c;它解决了不同协议设备之间的通信问题。本文将深入…

怎么样判断真假单北斗

国产化替代正在中国各行各业逐步提升中&#xff0c;特别涉及重点产业——国家安全&#xff01; 只有仅支持B1I和B3信号的芯片才是真正的单北斗芯片。但凡你支持了B1C、B2a、B2b中的一个就是假的单北斗。 B1C/L1/E1、B2a/ L5/E5a、B2b/G3/E5b这些频点与其他GNSS系统是完全重合的…

湖北科技学院2024年成人高等继续教育招生简章

湖北科技学院&#xff0c;这所坐落在荆楚大地的高等学府&#xff0c;一直以来都是培养各类专业人才的重要基地。随着社会的快速发展&#xff0c;终身学习的理念深入人心&#xff0c;成人高等继续教育作为满足广大成年人提升学历、增强职业技能的重要途径&#xff0c;受到了越来…

Java输入输出语句 和 保留字

目录 键盘输入语句 保留字 键盘输入语句 Input.java , 需要一个 扫描器(对象), 就是Scanner 步骤 &#xff1a; 导入该类的所在包, java.util.*创建该类对象&#xff08;声明变量&#xff09;调用里面的功能 案例要求&#xff1a;可以从控制台接收用户信息&#xff0c;【姓…

润滑不良:滚珠花键磨损的隐形杀手!

滚珠花键作为一种精密机械传动元件&#xff0c;被广泛应用于各种机器和设备中&#xff0c;起着传递动力和运动的重要作用。滚珠花键经过长时间的运行&#xff0c;难免会多少些磨损&#xff0c;严重的话还会导致设备不能正常运转。那么&#xff0c;如何保证它的正常运行呢&#…

88. 合并两个有序数组(简单)

88. 合并两个有序数组 1. 题目描述2.详细题解3.代码实现3.1 Python3.2 Java 1. 题目描述 题目中转&#xff1a;88. 合并两个有序数组 2.详细题解 两个数组均有序&#xff08;非递减&#xff09;&#xff0c;要求合并两个数组&#xff0c;直观的思路&#xff0c;借助第三个数…

【Linux环境下Hadoop部署】—报错“Unit ntpd.service could not be found.“

项目场景&#xff1a; 执行 “systemctl status ntpd” 命令。 问题描述 报错&#xff1a;Unit ntpd.service could not be found. 原因分析&#xff1a; 没有安装ntp 解决方案&#xff1a; 执行 “yum install ntp” 命令&#xff0c;再次执行 “systemctl status ntpd” 命令…

Docker部署私有仓库Harbor

Harbor构建Docker私有仓库 文章目录 Harbor构建Docker私有仓库资源列表一、部署Docker-Compose服务1.1、下载最新Docker-Compose1.2、查看Docker-Compose版本 二、部署Harbor服务2.1、下载Harbor安装程序2.2、配置Harbor参数文件2.3、所需参数和可选参数2.3.1、所需参数2.3.2、…

平庸的学术工作者

自己进入学术这条路&#xff0c;差不多十年了&#xff0c;回想自己目前的成果&#xff0c;自我评价为平庸。如果将同领域清华的年轻学者打分为 100 分的话&#xff0c;我将自己打分 65。 到目前为止&#xff0c;并不觉得智力因素在管理科学与工程领域的科研中有太大决定作用&a…

Demeditec Diagnostics DmbH兽医诊断类科研试剂

此类产品包括用于多种动物物种包括狗(犬)、猫、牛、马、大鼠和小鼠等分析物的检测。 皮质酮酶联免疫检测试剂盒(Corticosterone rat/mouse Elisa Kit) 产品编号&#xff1a;DEV9922 皮质酮是肾上腺皮质在应答促肾上腺皮质激素时所产生的一种糖皮质激素&#xff0c;是醛甾酮前…

CP AUTOSAR标准之MemoryDriver(AUTOSAR_CP_SWS_MemoryDriver)

1 简介和功能概述 该规范描述了AUTOSAR基础软件模块内存驱动程序(Mem)的功能、API和配置。   内存驱动程序提供访问不同类型内存设备的基本服务,如读取、写入、擦除和空白检查。   尽管闪存仍然是最常见的非易失性存储器技术,但内存驱动程序规范考虑了所有相关的内存设备…