Flink ProcessFunction 介绍使用

目录

实现功能

代码

测试

问题


官网描述:https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html

The ProcessFunction is a low-level stream processing operation, giving access to the basic building blocks of all (acyclic) streaming applications:

  • events (stream elements)
  • state (fault-tolerant, consistent, only on keyed stream)
  • timers (event time and processing time, only on keyed stream)

The ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s).

For fault-tolerant state, the ProcessFunction gives access to Flink’s keyed state, accessible via the RuntimeContext, similar to the way other stateful functions can access keyed state.

The timers allow applications to react to changes in processing time and in event time. Every call to the function processElement(...) gets a Context object which gives access to the element’s event time timestamp, and to the TimerService. The TimerService can be used to register callbacks for future event-/processing-time instants. With event-time timers, the onTimer(...) method is called when the current watermark is advanced up to or beyond the timestamp of the timer, while with processing-time timers, onTimer(...) is called when wall clock time reaches the specified time. During that call, all states are again scoped to the key with which the timer was created, allowing timers to manipulate keyed state.

 

ProcessFunction是一个低阶的流处理操作,它可以访问流处理程序的基础构建模块:

1.事件(event)(流元素)。

2.状态(state)(容错性,一致性,仅在keyed stream中)。

3.定时器(timers)(event time和processing time, 仅在keyed stream中)。

 

state和timers 仅在keyed stream中使用,这里我们先介绍KeyedProcessFunction方法使用

实现功能

通过socketTextStream读取9999端口数据,统计在一定时间内不同类型商品的销售总额度,如果持续销售额度为0,则执行定时器通知老板,是不是卖某种类型商品的员工偷懒了(只做功能演示,根据个人业务来使用,比如统计UV等操作)

代码

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject ProcessFuncationScala {def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentval stream: DataStream[String] = env.socketTextStream("localhost", 9999)val typeAndData: DataStream[(String, String)] = stream.map(x => (x.split(",")(0), x.split(",")(1))).setParallelism(4)typeAndData.keyBy(0).process(new MyprocessFunction()).print("结果")env.execute()}/*** 实现:*    根据key分类,统计每个key进来的数据量,定期统计数量,如果数量为0则预警*/class MyprocessFunction extends  KeyedProcessFunction[Tuple,(String,String),String]{//统计间隔时间val delayTime : Long = 1000 * 10lazy val state : ValueState[(String,Long)] = getRuntimeContext.getState[(String,Long)](new ValueStateDescriptor[(String, Long)]("cjcount",classOf[Tuple2[String,Long]]))override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = {printf("定时器触发,时间为:%d,状态为:%s,key为:%s\n",timestamp,state.value(),ctx.getCurrentKey)if(state.value()._2==0){//该时间段数据为0,进行预警printf("类型为:%s,数据为0,预警\n",state.value()._1)}//定期数据统计完成后,清零state.update(state.value()._1,0)//再次注册定时器执行val currentTime: Long = ctx.timerService().currentProcessingTime()ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)}override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context, out: Collector[String]): Unit = {printf("状态值:%s,state是否为空:%s\n",state.value(),(state.value()==null))if(state.value() == null){//获取时间val currentTime: Long = ctx.timerService().currentProcessingTime()//注册定时器十秒后触发ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)printf("定时器注册时间:%d\n",currentTime+10000L)state.update(value._1,value._2.toInt)} else{//统计数据val key: String = state.value()._1var count: Long = state.value()._2count += value._2.toInt//更新state值state.update((key,count))}println(getRuntimeContext.getTaskNameWithSubtasks+"->"+value)printf("状态值:%s\n",state.value())//返回处理后结果out.collect("处理后返回数据->"+value)}}}

 

代码中使用ValueState记录了状态信息,每次来商品都会进行总额度累加;商品第一次进入的时候会注册一个定时器,每隔十秒执行一次,定时器做预警功能,如果十秒内商品销售等于0,我们则进行预警。

测试

往端口输入数据

十秒内输入四条数据

帽子,12
帽子,12
鞋,10
鞋,10

 通过我们打印我们会发现统计完成,

定时器触发,时间为:1586005420511,状态为:(鞋,20),key为:(鞋)
定时器触发,时间为:1586005421080,状态为:(帽子,24),key为:(帽子)

如果我们十秒内不输入数据,则会提示数据为0,进行预警

定时器触发,时间为:1586005406244,状态为:(帽子,0),key为:(帽子)
类型为:帽子,数据为0,预警
定时器触发,时间为:1586005406244,状态为:(鞋,0),key为:(鞋)
类型为:鞋,数据为0,预警

问题

到这里我们已经实现了定期统计功能,但有没有发现,如果帽子分配在task1执行,鞋在task2执行,鞋一天进来1亿条数据,帽子进来1条数据,我们会出现严重的数据倾斜问题。

我们实际看一下具体问题

计算结果我们就先不看了,直接看数据分配问题

三个task阶段 , Socket是单并行的source,我们将并行度改为4

 

输入数据:1条 帽子,10 ;50条 鞋,10

我们看Map阶段,数据是均衡的,因为这里还没有进行keyby

 

我们再看keyby后的task

我们发现50条数据都在ID为3的subtask中,出现了严重数据倾斜问题 

这种问题我们可以进行两阶段keyby解决该问题

具体数据倾斜问题参考:https://datamining.blog.csdn.net/article/details/105322423

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/509759.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flink keyby 数据倾斜问题处理

上一篇我们使用keyby后发现数据严重倾斜 https://datamining.blog.csdn.net/article/details/105316728 大概看下问题所在,大量数据在一个subtask中运行 这里我们使用两阶段keyby 解决该问题 之前的问题如下图所示 我们期望的是 但我们的需要根据key进行聚合统计&a…

linux中iptables对防火墙的操作

Iptables教程 1. iptables防火墙简介 Iptables也叫netfilter是Linux下自带的一款免费且优秀的基于包过滤的防火墙工具,它的功能十分强大,使用非常灵活,可以对流入、流出、流经服务器的数据包进行精细的控制。iptables是Linux2.4及2.6内核中…

Web Components入门不完全指北

目前流行的各类前端框架,不管是react, angular还是vue,都有一个共同点,那就是支持组件化开发,但事实上随着浏览器的发展,现在浏览器也原生支持组件式开发,本文将通过介绍Web Components 的三个主要概念&…

Flink 1.9 CDH 6.3 集成

目录 1.下载准备文件 2.felink csa jar包准备 3.将 Flink Parcel放入httpd目录下 4.配置CDH Flink Parcel 5.安装Flink 1.下载准备文件 https://archive.cloudera.com/csa/1.0.0.0/csd/FLINK-1.9.0-csa1.0.0.0-cdh6.3.0.jarhttps://archive.cloudera.com/csa/1.0.0.0/parc…

ssh免密登陆机制示意图

ssh免密登陆机制示意图

CDH 6.x 安装 Phoenix 服务

最近有个新项目启动,版本升级到6.3,发现CDH6.2 版本已经支持Phoenix parcel安装 一、准备文件 下载 https://archive.cloudera.com/phoenix/6.2.0/csd/PHOENIX-1.0.jar 下载parcel #目录 https://archive.cloudera.com/phoenix/6.2.0/parcels/ #根据…

域名服务的工作流程

域名服务的工作流程

Kafka 消费者组 Rebalance 详解

Rebalance作用 Rebalance 本质上是一种协议,主要作用是为了保证消费者组(Consumer Group)下的所有消费者(Consumer)消费的主体分区达成均衡。 比如:我们有10个分区,当我们有一个消费者时&…

Kafka JMX 监控 之 jmxtrans + influxdb + grafana

目录 效果图 环境准备 安装 influxdb 安装我们刚刚下载 influxdb rpm文件 查看默认配置 修改参数 启动 influxdb 查看启动状态 设置基本配置 influxdb 其他命令扩展 安装 jmxtrans 可能遇到的异常 验证jmxtrans是否成功运行 安装 Grafana 安装 influxDB 与 Grafa…

实例浅析epoll的水平触发和边缘触发,以及边缘触发为什么要使用非阻塞IO

一.基本概念 我们通俗一点讲: Level_triggered(水平触发):当被监控的文件描述符上有可读写事件发生时,epoll_wait()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如…

UML序列图

UML学习(三)-----序列图 UML的模型中可分为两种,动态模型和静态模型。用例图、类图和对象图都是UML中的静态结构模型。而在UML系统动态模型的其中一种就是交互视图,它描述了执行系统功能的各个角色之间相互传递消息的顺序关系。序…

OpenTSDB 开发指南之 查询数据

前面博主写了一篇文章去介绍opentsdb的http接口的使用方法,但是某一些接口的使用还是比较复杂,这篇文章会通过example来详细讲述opentsdb的一些特性。 本文的举的例子有这些: 基本的写入和查询数据的注释和说明子查询查询中的filters使用查询数据的rat…

libcurl使用方法

原文地址:http://curl.haxx.se/libcurl/c/libcurl-tutorial.html 译者:JGood(http://blog.csdn.net/JGood ) 译者注:这是一篇介绍如何使用libcurl的入门教程。文档不是逐字逐句按原文翻译,而是根据笔者对libcurl的理解&#xff0c…

OpenTSDB 开发指南之 Grafana 展示OpenTSDB监控数据

目录 准备数据 在Grafana创建OpenTSDB连接 创建一个仪表盘 统计 准备数据 将数据插入OpenTSDB {"metric":"jast.data","value":1023,"timestamp":1588742563,"tags":{"type":"jast-graph-data"}}…

CDH 版本 Kafka 外网设置

登陆CDH页面,进入Kafka配置页面 搜索 advertised 修改advertised.host.name,这里我们有三台Broker,我们把每台的外网ip填写到对应的机器上 advertised.port不填写 我们kafka的端口设置的是9099 将外网端口9099开放,允许外网访问 (这里不做介绍

OpenTSDB 安装

下载目录 https://github.com/OpenTSDB/opentsdb/releases https://github.com/OpenTSDB/opentsdb/releases/download/v2.4.0/opentsdb-2.4.0.noarch.rpm 安装 GnuPlot yum install gnuplot -y 直接安装OpenTSDB会报错 [rootecs-t-001-0001 openTSDB]# rpm -ivh opentsdb-2.…

HBase原理 – snapshot 快照

目录 snapshot(快照)基础原理 snapshot能实现什么功能? hbase snapshot用法大全 hbase snapshot分布式架构-两阶段提交 snapshot核心实现 clone_snapshot如何实现呢? 其他需要注意的 参考文献 更多信息可参考《…

linux如何自动化部署脚本实现免密登录并访问资源

任务把weijie主机jdk文件安装到weijie1中。 首先再各台主机中安装必要的命令: expect、wget、httpd、ssh 执行命令 如:expect提示命令不存在,则分别安装命令 yum install expect yum install wget yum install httpd yum install ssh 开…

时序数据库技术体系 – InfluxDB TSM存储引擎之数据读取

任何一个数据库系统内核关注的重点无非:数据在内存中如何存储、在文件中如何存储、索引结构如何存储、数据写入流程以及数据读取流程。关于InfluxDB存储内核,笔者在之前的文章中已经比较全面的介绍了数据的文件存储格式、倒排索引存储实现以及数据写入流…