Contents
Implemented Functionality
Code
Test
Problem
Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html
The ProcessFunction is a low-level stream processing operation, giving access to the basic building blocks of all (acyclic) streaming applications:
- events (stream elements)
- state (fault-tolerant, consistent, only on keyed stream)
- timers (event time and processing time, only on keyed stream)

The ProcessFunction can be thought of as a FlatMapFunction with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s).

For fault-tolerant state, the ProcessFunction gives access to Flink's keyed state, accessible via the RuntimeContext, similar to the way other stateful functions can access keyed state.

The timers allow applications to react to changes in processing time and in event time. Every call to the function processElement(...) gets a Context object which gives access to the element's event time timestamp, and to the TimerService. The TimerService can be used to register callbacks for future event-/processing-time instants. With event-time timers, the onTimer(...) method is called when the current watermark is advanced up to or beyond the timestamp of the timer, while with processing-time timers, onTimer(...) is called when wall clock time reaches the specified time. During that call, all states are again scoped to the key with which the timer was created, allowing timers to manipulate keyed state.
In short, ProcessFunction is a low-level stream processing operation that gives access to the basic building blocks of a streaming application:
1. Events (stream elements).
2. State (fault tolerant, consistent, only on keyed streams).
3. Timers (event time and processing time, only on keyed streams).
Since state and timers are only available on keyed streams, this post demonstrates KeyedProcessFunction; a minimal skeleton of the moving parts follows.
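As a sketch only (class and key types here are illustrative, not from the original post): processElement receives each element together with a Context, registers a timer through the TimerService, and Flink later calls back into onTimer, scoped to the same key.

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Minimal skeleton: key type String, input String, output String.
class TimerSkeleton extends KeyedProcessFunction[String, String, String] {

  override def processElement(value: String,
                              ctx: KeyedProcessFunction[String, String, String]#Context,
                              out: Collector[String]): Unit = {
    // Processing-time timer: onTimer fires when the wall clock reaches the timestamp.
    val ts = ctx.timerService().currentProcessingTime() + 10 * 1000
    ctx.timerService().registerProcessingTimeTimer(ts)
    // Event-time variant: fires when the watermark passes the timestamp.
    // ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000)
    out.collect(value)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, String, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // Keyed state and ctx.getCurrentKey are scoped to the key that registered this timer.
    out.collect("timer fired for key " + ctx.getCurrentKey + " at " + timestamp)
  }
}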
Implemented Functionality
Read data from port 9999 via socketTextStream and aggregate the total sales per product type over a fixed interval. If a type's sales stay at 0 for a whole interval, fire a timer that notifies the boss that the employee selling that product type may be slacking off. (This is only a demo of the mechanism; adapt it to your own business, e.g., counting UVs.)
Code
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFuncationScala {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.socketTextStream("localhost", 9999)
    val typeAndData: DataStream[(String, String)] = stream
      .map(x => (x.split(",")(0), x.split(",")(1)))
      .setParallelism(4)
    typeAndData.keyBy(0).process(new MyprocessFunction()).print("result")
    env.execute()
  }

  /**
   * Keyed by product type, accumulate the incoming amounts per key; a periodic
   * timer reports the count and raises an alert when it is 0 for a whole interval.
   */
  class MyprocessFunction extends KeyedProcessFunction[Tuple, (String, String), String] {

    // interval between timer firings (10 seconds)
    val delayTime: Long = 1000 * 10

    lazy val state: ValueState[(String, Long)] = getRuntimeContext.getState(
      new ValueStateDescriptor[(String, Long)]("cjcount", classOf[Tuple2[String, Long]]))

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      printf("Timer fired, time: %d, state: %s, key: %s\n", timestamp, state.value(), ctx.getCurrentKey)
      if (state.value()._2 == 0) {
        // no data arrived for this key during the interval: raise an alert
        printf("Type %s had no sales, alert\n", state.value()._1)
      }
      // reset the count once the interval has been reported
      state.update((state.value()._1, 0L))
      // register the next firing of the timer
      val currentTime: Long = ctx.timerService().currentProcessingTime()
      ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
    }

    override def processElement(value: (String, String),
                                ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context,
                                out: Collector[String]): Unit = {
      printf("State value: %s, state is null: %s\n", state.value(), state.value() == null)
      if (state.value() == null) {
        // first element for this key: register a timer to fire in ten seconds
        val currentTime: Long = ctx.timerService().currentProcessingTime()
        ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
        printf("Timer registered for: %d\n", currentTime + delayTime)
        state.update((value._1, value._2.toLong))
      } else {
        // accumulate the amount into the running total
        val key: String = state.value()._1
        var count: Long = state.value()._2
        count += value._2.toInt
        state.update((key, count))
      }
      println(getRuntimeContext.getTaskNameWithSubtasks + "->" + value)
      printf("State value: %s\n", state.value())
      // emit the processed element downstream
      out.collect("processed->" + value)
    }
  }
}
The code uses a ValueState to hold the running total per key; every incoming record adds its amount to the total for its product type. When a type arrives for the first time, a processing-time timer is registered, and onTimer re-registers itself so it fires every ten seconds. The timer does the alerting: if a type's sales over the last ten seconds are 0, a warning is printed.
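One caveat: as written, onTimer re-registers itself forever, so an idle key keeps firing timers. A hedged variant (a sketch, not the post's original code) stops the cycle once a zero interval has been reported, relying on the null check in processElement to restart the timer when data returns:

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      if (state.value()._2 == 0) {
        printf("Type %s had no sales, alert\n", state.value()._1)
        // Drop the state instead of re-registering: the next element for this
        // key hits the null branch in processElement and restarts the timer.
        state.clear()
      } else {
        state.update((state.value()._1, 0L))
        val currentTime: Long = ctx.timerService().currentProcessingTime()
        ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
      }
    }

With this variant a quiet key alerts once and then stays silent until new data arrives, instead of alerting every ten seconds indefinitely.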
Test
Send data to the port (for example, open a local socket with nc -lk 9999 and type into it).
Enter four records within ten seconds:
hat,12
hat,12
shoe,10
shoe,10
From the printed output we can see the totals have been computed:
Timer fired, time: 1586005420511, state: (shoe,20), key: (shoe)
Timer fired, time: 1586005421080, state: (hat,24), key: (hat)
If we enter no data for ten seconds, the count stays at 0 and the alert fires:
Timer fired, time: 1586005406244, state: (hat,0), key: (hat)
Type hat had no sales, alert
Timer fired, time: 1586005406244, state: (shoe,0), key: (shoe)
Type shoe had no sales, alert
Problem
We now have the periodic aggregation working, but notice: if hat is assigned to task 1 and shoe to task 2, and shoe receives 100 million records a day while hat receives one, we have a severe data skew problem.
Let's look at the problem concretely.
We'll skip the computed results and look only at how the data is distributed.
The job has three task stages; the Socket source is single-parallelism, so we set the parallelism of the downstream operators to 4.
Input: 1 record of hat,10 and 50 records of shoe,10.
In the Map stage the data is balanced, because no keyBy has happened yet.
Now look at the task after the keyBy:
all 50 records land in the subtask with ID 3, a severe data skew.
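For reference, you can compute yourself which subtask a key is routed to: Flink murmur-hashes the key into one of maxParallelism key groups (128 by default) and maps that group to an operator index. A rough check under that assumption; note that keyBy(0) actually keys by a Tuple wrapper, so hashing the raw strings here is only an approximation and the indexes may differ from the job's:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object KeyRoutingCheck {
  def main(args: Array[String]): Unit = {
    // Which of 4 subtasks each key would be routed to, with maxParallelism 128.
    for (key <- Seq("hat", "shoe")) {
      val subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, 128, 4)
      println(key + " -> subtask " + subtask)
    }
  }
}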
This kind of problem can be solved with a two-phase keyBy, sketched below.
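The usual pattern: salt the key with a random suffix, keyBy the salted key and pre-aggregate, then keyBy the original key and merge the partial sums. A sketch under assumptions (salt range of 4, 10-second tumbling windows; names are illustrative, not from this post):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import scala.util.Random

// typeAndData is the (type, amount) stream from the example above.
def twoPhaseSum(typeAndData: DataStream[(String, String)]): DataStream[(String, Long)] = {
  typeAndData
    // Phase 1: salt the key so one hot type spreads over up to 4 subtasks.
    .map(x => (x._1 + "#" + Random.nextInt(4), x._2.toLong))
    .keyBy(_._1)
    .timeWindow(Time.seconds(10))
    .reduce((a, b) => (a._1, a._2 + b._2))
    // Phase 2: strip the salt and merge the partial sums per original type.
    .map(x => (x._1.split("#")(0), x._2))
    .keyBy(_._1)
    .timeWindow(Time.seconds(10))
    .reduce((a, b) => (a._1, a._2 + b._2))
}

Each hot key now contributes at most 4 pre-aggregated records per window to the second stage, instead of funnelling every raw record into a single subtask.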
For more detail on data skew, see: https://datamining.blog.csdn.net/article/details/105322423