Flink的ProcessFunction API

1 ProcessFunction

   ProcessFunction是一个低阶的流处理操作,可以访问事件(event)(流元素),状态(state)(容错性,一致性,仅在keyed stream中),定时器(timers)(event time和processing time, 仅在keyed stream中)。也就是说可以访问普通的转换算子无法访问事件的时间戳信息和Watermark的。

   ProcessFunction可以看作是一个具有keyed state 键控状态和 timers定时器访问权的FlatMapFunction,通过对输入流中接收的每个事件调用来处理事件。①通过RuntimeContext访问keyed state②计时器允许应用程序对处理时间和事件时间中的更改作出响应。对processElement(…)函数的每次调用都获得一个Context对象,该对象可以访问元素的event time timestamp和TimerService;③TimerService可用于为将来的event/process time瞬间注册回调。当到达计时器的特定时间时,将调用onTimer(…)方法。在该调用期间,所有状态都再次限定在创建计时器时使用的键的范围内,从而允许计时器操作键控状态。总之ProcessFunction可以访问时间戳、watermark以及注册定时事件,输出特定的一些事件等。Flink SQL就是使用Process Function实现的。

   如果要访问键控状态和计时器,则必须应用在keyedStream上

stream.keyBy(...).process(new MyProcessFunction())

   Flink提供了8个Process Function:ProcessFunction,KeyedProcessFunction,CoProcessFunction,ProcessJoinFunction,BroadcastProcessFunction,KeyedBroadcastProcessFunction,ProcessWindowFunction,ProcessAllWindowFunction。

   所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法,还额外提供了两个方法processElement和onTimer

   processElement:每来一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。获得的Context可以访问元素的时间戳,元素的key,以及TimerService时间服务。Context还可以将结果输出到别的流(side outputs)。

   onTimer:是一个回调函数,当之前注册的定时器到达触发时间调用。参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。

2 低阶join

   要实现对两个输入的低级操作,应用程序可以使用CoProcessFunction或KeyedCoProcessFunction。

   CoProcessFunction实现对两个输入的低阶操作,它绑定到两个不同的输入流,分别调用processElement1(…)和processElement2(…)对两个输入流的数据进行处理

   实现低阶join通常遵循以下模式:①为一个(或两个)输入创建一个状态对象②当从输入源收到元素时,更新状态③从另一个输入接收元素后,检索状态并生成连接的结果

3 KeyedProcessFunction

   KeyedProcessFunction作为ProcessFunction的扩展,在其onTimer(…)方法中提供对定时器对应key的访问。

   KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。

override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {var key = ctx.getCurrentKey// ...
}

4 Timers

   processing-time/event-time timer都由TimerService在内部维护并排队等待执行,仅在keyed stream中有效。

   由于Flink对(每个key+timestamp)只维护一个计时器。如果为相同的timestamp注册了多个timer ,则只调用onTimer()方法一次。

   Flink保证同步调用onTimer()和processElement() 。因此用户不必担心状态的并发修改。

   容错:Timer具有容错和checkpoint能力(基于flink app的状态)。从故障恢复或从savepoint启动应用程序时,Timer将被恢复。大量计时器会增加检查点时间,因为计时器是检查点状态的一部分。

   定时器合并:由于Flink对每个键和时间戳只维护一个计时器,因此可以通过降低计时器频率来合并计时器,从而减少计时器的数量。 event-time timer只会在watermarks到来时触发。

//对于1秒的定时器分辨率(事件或处理时间),可以将目标时间舍入整秒。计时器的发射时间最多提前1秒,但不迟于要求的毫秒精度。因此,每键最多有一个定时器和第二个定时器。
val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)//事件时间计时器只在水印进入的情况下触发,您还可以使用当前Watermark调度这些计时器并将其与下一个Watermark合并:
val coalescedTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedTime)//停止处理时间计时器:
val timestampOfTimerToStop = ...
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)//停止事件时间计时器:
val timestampOfTimerToStop = ...
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)

5 官方案例

   KeyedProcessFunction维护每个键的计数,并在没有对该键进行更新的情况下,在一分钟内(在事件发生时)发出一个键/计数对:

  • 计数、键和最后修改时间戳存储在ValueState,这是由Key隐式限定范围的。
  • 对于每个记录,KeyedProcessFunction增加计数器并设置最后修改的时间戳。
  • 该函数还会在以后的一分钟内安排一个回调(在事件发生时)。
  • 在每次回调时,它会检查回调的事件时间戳和存储计数的最后修改时间,如果它们匹配,则发出键/计数(也就是说,在这一分钟内没有发生进一步的更新)。
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector// the source data stream
val stream: DataStream[Tuple2[String, String]] = ...// apply the process function onto a keyed stream
val result: DataStream[Tuple2[String, Long]] = stream.keyBy(0).process(new CountWithTimeoutFunction())/*** The data type stored in the state*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)/*** The implementation of the ProcessFunction that maintains the count and timeouts*/
class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {/** The state that is maintained by this process function */lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = {// initialize or retrieve/update the stateval current: CountWithTimestamp = state.value match {case null =>CountWithTimestamp(value._1, 1, ctx.timestamp)case CountWithTimestamp(key, count, lastModified) =>CountWithTimestamp(key, count + 1, ctx.timestamp)}// write the state backstate.update(current)// schedule the next timer 60 seconds from the current event timectx.timerService.registerEventTimeTimer(current.lastModified + 60000)}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): Unit = {state.value match {case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>out.collect((key, count))case _ =>}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473557.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1647. 字符频次唯一的最小删除次数(贪心)

文章目录1. 题目2. 解题1. 题目 如果字符串 s 中 不存在 两个不同字符 频次 相同的情况,就称 s 是 优质字符串 。 给你一个字符串 s,返回使 s 成为 优质字符串 需要删除的 最小 字符数。 字符串中字符的 频次 是该字符在字符串中的出现次数。 例如&am…

分享Db4o的便捷封装类源码

导言 大家好,话说真是好久好久没写文章了,哈哈。 最近在写网站,个人对传统数据库天然抵触,感觉非常繁冗,即便是Entity Framework也过于庞杂了,Db4o这种轻量级且读写、配置都极其方便的新型数据库非常适合我…

Flink中的状态管理

1 Flink中的状态 当数据流中的许多操作只查看一个每次事件(如事件解析器),一些操作会跨多个事件的信息(如窗口操作)。这些操作称为有状态。状态由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态。可以简单的任务状态就是…

Python之日志处理(logging模块)

主要内容 日志相关概念logging模块简介使用logging提供的模块级别的函数记录日志logging模块日志流处理流程使用logging四大组件记录日志配置logging的几种方式向日志输出中添加上下文信息参考文档 一、日志相关概念 日志是一种可以追踪某些软件运行时所发生事件的方法。软件开…

LeetCode 514. 自由之路(记忆化递归 / DP)

文章目录1. 题目2. 解题1. 题目 电子游戏“辐射4”中,任务“通向自由”要求玩家到达名为“Freedom Trail Ring”的金属表盘,并使用表盘拼写特定关键词才能开门。 给定一个字符串 ring,表示刻在外环上的编码;给定另一个字符串 ke…

thinkpad s3 安装win8 kali双系统笔记

前段时间入手了一台thinkpad s3(i7,8G),预装了win8系统,windows下写代码,环境配置比较麻烦,就想安装kali linux做双系统,官方参考文档:http://docs.kali.org/installation/dual-boot-kali-with-windows但s3预装了64bit win8,采用的是UEFI启动,官方文档并不完全适用,所以折腾了一…

Flink中的容错机制

1 checkpoint Flink 故障恢复机制的核心,就是应用状态的一致性检查点checkpoint。 在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint,处理的是当前时间点所有分区当前数据的状态。在Flink中不能把当前所有分区的数据直接存下…

os、os.path、shutil操作文件和文件路径的常用方法总结

os模块是python标准库中的一个用于访问操作系统功能的模块,下面简要介绍一下常用的命令 1、os.name(). 判断现在正在使用的平台,windows返回’nt’,Linux返回‘posix’ 2、os.getcwd() 得到当前工作的目录 3、os.listdir(). 指定所在目…

LeetCode 698. 划分为k个相等的子集(回溯)

文章目录1. 题目2. 解题1. 题目 给定一个整数数组 nums 和一个正整数 k,找出是否有可能把这个数组分成 k 个非空子集,其总和都相等。 示例 1: 输入: nums [4, 3, 2, 3, 5, 2, 1], k 4 输出: True 说明:…

Linux网络服务器epoll模型的socket通讯的实现(一)

准备写一个网络游戏的服务器的通讯模块&#xff0c;参考网上看到的一些代码&#xff0c;在linux下面实现一个多线程的epoll模型的socket通讯的代码,以下是第一部分多线程的切换代码: 1 #include <stdio.h>2 #include <sys/types.h>3 #include <sys/epoll.h>…

MySQL中的表中增加删除字段

1增加两个字段&#xff1a; mysql> create table id_name(id int,name varchar(20)); Query OK, 0 rows affected (0.13 sec)mysql> alter table id_name add age int,add address varchar(11); Query OK, 0 rows affected (0.13 sec) Records: 0 Duplicates: 0 Warnin…

Ubuntu编写开机自启动脚本(转载)

From:http://blog.csdn.net/marujunyy/article/details/8466255 1、首先编写一个简单的shell脚本test.sh #! /bin/bash echo "Hello world!" filenamedate"%Y%m%d" echo $filename 2、设置脚本开机自启动 方法一&#xff1a; 编辑/etc/init.d/rc.local文件…

Ubuntu下svn 版本管理客户端工具及常用方法

Ubuntu16.04系统下安装RapidSVN版本控制器及配置diff,editor,merge和exploer工具&#xff0c;在Window下我们使用TortoiseSVN(小乌龟)&#xff0c;可以很方便地进行查看、比较、更新、提交、回滚等SVN版本控制操作。 在Linux下我们可以使用RapidSVN。RapidSVN是一款轻量级的免费…

Flink的Table API 与SQL介绍及调用

1 概述 DataSetAPI和DateStreamAPI是基于整个Flink的运行时环境做操作处理的&#xff0c;Table API和SQL是在DateStreamAPI上又包了一层。对于新版本的Blink在DateStream基础上又包了一层实现了批流统一&#xff0c;上层执行环境都是基于流处理&#xff0c;做批流统一的查询。T…

Python编程中一些异常处理的小技巧

编程中经常会需要使用到异常处理的情况&#xff0c;在阅读了一些资料后&#xff0c;整理了关于异常处理的一些小技巧记录如下。 1 如何自定义异常 1.1 定义异常类 在实际编程中&#xff0c;有时会发现Python提供的内建异常的不够用&#xff0c;我们需要在特殊业务场景下的异常…

Flink的Table API 与SQL的流处理

1 流处理与SQL的区别 Table API和SQL&#xff0c;本质上还是基于关系型表的操作方式&#xff1b;而关系型表、SQL本身&#xff0c;一般是有界的&#xff0c;更适合批处理的场景。所以在流处理的过程中&#xff0c;有一些特殊概念。 SQL流处理处理对象字段元组的有界集合字段元…

LeetCode 833. 字符串中的查找与替换(排序,replace)

文章目录1. 题目2. 解题1. 题目 某个字符串 S 需要执行一些替换操作&#xff0c;用新的字母组替换原有的字母组&#xff08;不一定大小相同&#xff09;。 每个替换操作具有 3 个参数&#xff1a;起始索引 i&#xff0c;源字 x 和目标字 y。 规则是&#xff1a;如果 x 从原始…

Json.NET

我前面的一篇博客 Metro应用Json数据处理 介绍了如何使用 DataContractJsonSerializer 类将对象的实例序列化为JSON字符串以及将JSON字符串反序列化为对象的实例的处理方式。而此种处理方式的一个很大的缺点就是要求JSON字符串格式是约定好的&#xff0c;而在很多情况下我们无法…

MySQL如何跨机器迁移数据?

经常会遇到如此需求&#xff0c;需把A主机上的MySQL数据库所有迁移到B主机上&#xff0c;或者部分数据库&#xff0c;所以接下来将介绍迁移所有数据库和迁移单个数据库时的数据迁移步骤。 1 实验环境 A主机&#xff08;源主机&#xff09;&#xff1a; IP地址&#xff1a;19…

ClickHouse的特性及读写

1 ClickHouse特性 OLAP数据库一般有2个要求&#xff1a;①容量要比关系型数据库大&#xff0c;②在线查询的速度要快。ClickHouse这两点都满足并且还支持标准的sql&#xff0c;支持比较复杂的语句&#xff0c;支持分布式。ClickHouse的几个显著特点如下&#xff1a; &#xff0…