62、Flink自定义 SimpleAccumulator 代码示例

1、概述

需要实现 SimpleAccumulator 接口,并重写 add,getLocalValue,resetLocal,merge,clone 等方法。

2、代码示例

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.SimpleAccumulator;@NoArgsConstructor
@AllArgsConstructor
@Data
public class _05_CustomSimpleAccumulator implements SimpleAccumulator<Integer> {private Integer count;@Overridepublic void add(Integer value) {this.count += value;}@Overridepublic Integer getLocalValue() {return this.count;}@Overridepublic void resetLocal() {this.count = 0;}@Overridepublic void merge(Accumulator<Integer, Integer> other) {this.count += other.getLocalValue();}@Overridepublic Accumulator<Integer, Integer> clone() {return new _05_CustomSimpleAccumulator(this.count);}
}

3、使用自定义的累加器

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;public class _05_MyCustomAccumulator {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.disableOperatorChaining();_05_CustomSimpleAccumulator counterAccumulator = new _05_CustomSimpleAccumulator(0);env.socketTextStream("localhost", 8888).keyBy(e -> e).process(new KeyedProcessFunction<String, String, String>() {@Overridepublic void open(Configuration parameters) throws Exception {getRuntimeContext().addAccumulator("word-count-with-default", counterAccumulator);}@Overridepublic void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx, Collector<String> out) throws Exception {counterAccumulator.add(1);out.collect(value);}}).print();JobExecutionResult jobExecutionResult = env.execute();Object accumulatorResult = jobExecutionResult.getAccumulatorResult("word-count-with-default");System.out.println("累加器结果=>" + accumulatorResult);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/39087.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql-5.6.26-winx64免安装版本

mysql为什么要使用免安装 MySQL 提供免安装版本主要有以下几个原因和优势&#xff1a; 便捷性&#xff1a;用户无需经历安装过程&#xff0c;直接解压即可使用。这对于需要快速部署环境或者在不支持安装权限的系统上使用MySQL非常有用。灵活性&#xff1a;免安装版允许用户将…

actual combat 32 —— RabbitMQ

1. 五种模式 RabbitMQ有五种工作模式&#xff0c;分别是简单模式、工作模式、发布订阅模式、路由模式和主题模式 简单模式&#xff08;Simple Mode&#xff09;&#xff1a;这种模式包含一个生产者和一个消费者。生产者将消息发送到队列&#xff0c;消费者从队列中获取消息。…

Optional类方法

Optional类 简介方法empty()方法of(T value)ofNullable(T value)filter(Predicate<? super T> predicate)get()ifPresent(Consumer<? super T> consumer)isPresent()map(Function<? super T,? extends U> mapper)orElse(T other)orElseGet(Supplier<?…

撸包看广告小游戏app开发现有案例搭建

开发一个撸包看广告小游戏涉及多个关键步骤和考虑因素。以下是一个大致的开发流程&#xff1a; 市场调研与定位&#xff1a; 深入了解目标用户群体的需求和偏好&#xff0c;以及市场上类似游戏的状况。 确定游戏的目标受众和定位&#xff0c;如休闲益智、动作冒险等类型。 游…

基于Chrome扩展的浏览器可信事件与网页离线PDF导出

基于Chrome扩展的浏览器可信事件与网页离线PDF导出 Chrome扩展是一种可以在浏览器中添加新功能和修改浏览器行为的软件程序&#xff0c;我们可以基于Manifest规范的API实现对于浏览器和Web页面在一定程度上的修改&#xff0c;例如广告拦截、代理控制等。Chrome DevTools Proto…

React 高频面试题1(答案和题目都是根据讯飞星火写的)

1. 解释React中的虚拟dom&#xff0c;并说明它是如何工作的。 虚拟dom是介于数据与真实dom之间的抽象层&#xff0c;是一个轻量级的JavaScript对象。react组件的状态或属性发生改变时会重新渲染组件&#xff0c;生成新的虚拟dom。然后react会比较新旧两个dom&#xff08;这个过…

LeetCode 子集

原题链接78. 子集 - 力扣&#xff08;LeetCode&#xff09; 这是一道暴力搜索问题参考大佬们的题解&#xff0c;对这类题目做出一下总结 1.确定递归参数变量 2.递归结束条件 3.做出选择&#xff0c;递归调用进入下一层 4.回溯&#xff0c;返回到递归前的状态 要完成前面这…

.net 百度翻译接口核心类

百度翻译api &#xff1a;http://developer.baidu.com/wiki/index.php?title帮助文档首页/百度翻译/翻译AP 核心翻译类 using System; using System.Collections.Generic; using System.Linq; using System.Text; using Newtonsoft.Json; using System.Net; using System.I…

最新扣子(Coze)实战案例:图像流工具之创建一个精美的LOGO,完全免费教程

&#x1f9d9;‍♂️ 大家好&#xff0c;我是斜杠君&#xff0c;手把手教你搭建扣子AI应用。 &#x1f4dc; 本教程是《AI应用开发系列教程之扣子(Coze)实战教程》&#xff0c;完全免费学习。 &#x1f440; 关注斜杠君&#xff0c;可获取完整版教程。&#x1f44d;&#x1f3f…

03.音频处理流程

一、直播客户端的处理流程 共享端&#xff1a;音视频采集------>音视频编码------ | 传输 观看端&#xff1a;音视频渲染<------音视频解码------ 二、音频数据的流转 1、PCM PCM&#xff08;Pulse Code Modulation&#xff09;是一种未压缩的数字音频格式&#xff0c;…

商家团购app微信小程序模板

手机微信商家团购小程序页面&#xff0c;商家订餐外卖小程序前端模板下载。包含&#xff1a;团购主页、购物车订餐页面、我的订单、个人主页等。 商家团购app微信小程序模板

算法:哈希表

目录 题目一&#xff1a;两数之和 题目二&#xff1a;判定是否互为字符重排 题目三&#xff1a;存在重复元素I 题目四&#xff1a;存在重复元素II 题目五&#xff1a;字母异位词分组 关于哈希表 哈希表就是存储数据的容器 哈希表的优势是&#xff1a;快速查找某个元素O(…

Hive On Spark语法

内层对象定义之特殊数据类型 Array DROP TABLE IF EXISTS test_table_datatype_array; CREATE TABLE test_table_datatype_array (ids array<INT> ) LOCATION test/test_table_datatype_array;SELECTnames,names[1]array(names[2],names[3])names[5],names[-1],array_c…

linux-内存映射MMAP-lseek-dup-fifo-通信-IO多路复用

1、内存映射MMap&#xff1a; DMA&#xff1a; 可以用*/[]取代read和write&#xff1b; 限制&#xff1a; 1、文件大小固定不能改变&#xff1b;&#xff08;ftruncate&#xff09; 2、只能是磁盘文件&#xff1b; 3、建立映射之前先open mmap函数&#xff1a; mmap第一个…

生产环境 CentOS 7 k8s v1.28.0离线部署

背景描述&#xff1a;CentOS 7 Kubernetes 离线部署 随着云计算和微服务架构的普及&#xff0c;Kubernetes&#xff08;K8s&#xff09;已经成为容器编排的标准工具。它能够自动化应用的部署、扩展和管理&#xff0c;使得开发和运维的工作更加高效和可靠。然而&#xff0c;在一…

腾讯开源高质量人类运动视频的框架;通过音频指令修改图像;利用YOLO分析网球视频;Gemma-2中文微调模型

✨ 1: MimicMotion MimicMotion 腾讯开源的通过姿态指导生成高质量任意长度人类运动视频的框架 MimicMotion 是一种可控视频生成框架&#xff0c;旨在生成高质量的任意长度人物动作视频&#xff0c;采用带有置信度的姿态引导&#xff0c;并通过区域损失放大来缓解图像失真。其…

C++视觉开发 三.缺陷检测

一.距离变换 1.概念和功能 距离变换是一种图像处理技术&#xff0c;用于计算图像中每个像素到最近的零像素&#xff08;背景像素&#xff09;的距离。它常用于图像分割、形态学操作和形状分析等领域。它计算图像中每个像素到最近的零像素&#xff08;背景像素&#xff09;的距…

LeetCode 176, 289, 437

目录 176. 第二高的薪水题目链接表要求知识点思路代码 289. 生命游戏题目链接标签简单版思路代码 进阶版思路代码 437. 路径总和 III题目链接标签思路代码 176. 第二高的薪水 题目链接 176. 第二高的薪水 表 表Employee的字段为id和salary。 要求 查询并返回 Employee 表…

苍穹外卖--sky-take-out(五)前端

大部分笔记都是写在语雀的&#xff0c;这是一次性从本人语雀复制过来的&#xff0c;可能结构有些错乱 基础创建 环境要求 node.js npm Vue CLI 创建前端工程 使用vue ui命令创建 项目结构 启动项目 打开命令行窗口 快捷键ctrlj 或者 运行 输入&#xff1a;npm run ser…

010-GeoGebra基础篇-动态验证三角形外接圆的圆心是否可以位于三角形的外部

接下来我们将进行一些稍微高级一点操作&#xff0c;一边学习新东西的同时&#xff0c;也开始对数学、物理等内容的研究。 目录 一、项目截图二、涉及内容三、问题设置1. 问题提出2. 验证方案 三、做图步骤1. 绘制定点A、B&#xff1b;2. 绘制动点C&#xff1b;&#xff08;1&am…