Flink实战五_直播礼物统计

接上文:Flink实战四_TableAPI&SQL

1、需求背景

现在网络直播平台非常火爆,在斗鱼这样的网络直播间,经常可以看到这样的总榜排名,体现了主播的人气值。

人气值计算规则:用户发送1条弹幕互动,赠送1个荧光棒免费道具、100个免费鱼丸、亲密度礼物等行为,均可为主播贡献1点及以上人气值。

我们就以这个人气值日榜为例,来设计一个Flink的计算程序。

在这里插入图片描述
对于人气值日榜这样的功能,可以理解为是一个典型的流式计算的场景,强调的是数据的实时处理。因为在这个场景下,必须要及时的累计用户的送礼物数据,才能形成你追我赶的实时效果,提升用户的参与体验。这个场景下的实时性,虽然不要求每一条数据都及时响应,但是整体的数据延迟还是要尽量缩短的。

这种场景下,使用Flink进行流批统一的计算,感觉就非常合适。

2、数据流程设计

在确定了使用Flink进行计算后,首先就需要设计出数据的上下游流程,进行简单的方案可行性评估。

对于数据上游,我们这个人气值日榜统计的业务场景,数据来源自然就是粉丝们的打赏行为。一方面整个平台的打赏行为的数据量是非常大的,另一方面这些打赏行为涉及到账户操作,所以他的作用,更大的是体现在人气值榜功能以外的其他业务过程中。基于这两方面考虑,自然就会想到使用kafka来进行削峰以及解耦。而Flink在DataStream/DataSet API和 Table API&SQL 两个部分都对kafka提供了连接器实现,所以用kafka作为数据接入是可行的。

而对于数据下游,其实可以想象,最终计算出来的数据,最为重要的是要强调查询的灵活性以及时效性,这样才能支持页面的快速查询。如果考虑查询的时效性,HBase和ElasticSearch都是比较理想的大数据存储引擎。但是如果考虑到查询的灵活性,就会想到ElasticSearch会相比HBase更适合。因为我们统计出来的这些粉丝人气值度的结果,不光可以作为每个直播间人气值榜的排名,也应该可以作为以后平台主播年度排名等其他业务场景的数据来源。如果想要兼顾这些查询场景,使用HBase就会对Rowkey产生大量的侵入,而Elasticsearch可以根据任意字段快速查询,就比较有优势。 另外,从官方文档中可以查到,对于HBase,Flink只提供了Table API&SQL 模块的connector支持,而DataStream/DataSet API中没有提供支持,而ElasticSearch则支持更为全面。当然,这跟HBase的具体场景是有关联的,但是也可以从另一个角度认为,使用ElasticSearch的可行性更高。

这样,就初步确定了 kafka-> Flink -> ElasticSearch 这样的大致数据流程。这
也是在实际开发中非常典型的一个组合方式。后续就可以着手搭建kafka集群以及ElasticSearch+Kibana的集群了。搭建的过程就略过了。

确定数据的基础结构
这一步主要是确定入口数据和出口数据的结构。只要这两个数据结构确定了,那
么应用程序模块和大数据计算模块就可以分开进行开发了。是双方主要的解耦方
式。

在数据入口处,可以定义这样的简化的数据结构:

public static class GiftRecord{
private String hostId; //主播ID
private String fansId; //粉丝ID
private long giftCount; //礼物数量
private String giftTime; //送礼物时间。时间格式 yyyy-MM-DD HH:mm:SS
.....
}

在kafka中,确定使用gift作为Topic,MQ的消息格式为 #{hostId},#{fansId},#{giftCount},#{giftTime} 这样的字符串。

在数据出口处,可以定义ES中这样简化的索引结构:

-- 贡献日榜索引
PUT daygiftanalyze
{
"mappings":{"properties": {"windowEnd":{"type": "long"},"hostId": {"type": "keyword"},"fansId": {"type": "keyword"},"giftCount":{"type": "long"}}}
}

这样,一个简单的设计方案就形成了。应用程序只需要在粉丝发送礼物时往kafka中同步一条消息记录,然后从ES中查询主播的人气值日榜和人气值周榜数据即可。而我们也可以模拟数据格式进行开发了。

3、应用实现

人气值日榜:
基础数据结构:

public static class GiftRecord{private String hostId; //主播IDprivate String fansId; //粉丝IDprivate long giftCount; //礼物数量private String giftTime; //送礼物时间。时间格式 yyyy-MM-DD HH:mm:SS.....
}

在kafka中,确定使用gift作为Topic,MQ的消息格式为 #{hostId},#{fansId},#{giftCount},#{giftTime} 这样的字符串。

ES索引:

PUT daygiftanalyze
{"mappings": {"properties": {"windowEnd": {"type": "long"},"hostId": {"type": "keyword"},"fansId": {"type": "keyword"},"giftCount": {"type": "long"}}}
}

然后运行Flink程序,com.flink.project.flink.DayGiftAna,从kafka中读取数
据。测试数据见giftrecord.txt。计算程序会及时将十分钟内的粉丝礼物统计都存入到ES当中。

giftrecord.txt如下:

1001,3001,100,2021-09-15 15:15:10
1001,3002,321,2021-09-15 15:17:14
1001,3003,234,2021-09-15 15:16:24
1001,3004,15,2021-09-15 15:17:13
1001,3005,264,2021-09-15 15:18:14
1001,3006,678,2021-09-15 15:17:54
1001,3007,123,2021-09-15 15:19:22
1001,3008,422,2021-09-15 15:18:37
1001,3009,566,2021-09-15 15:22:43
1001,3001,76,2021-09-15 15:21:28
1001,3001,88,2021-09-15 15:26:28
1001,3007,168,2021-09-15 15:32:29
1001,3002,157,2021-09-15 15:28:56
1001,3009,567,2021-09-15 15:27:32
1001,3004,145,2021-09-15 15:30:26
1001,3003,1656,2021-09-15 15:31:19
1001,3005,543,2021-09-15 15:36:49
1001,3001,864,2021-09-15 15:38:26
1001,3001,548,2021-09-15 15:45:10
1001,3007,359,2021-09-15 15:52:39
1001,3008,394,2021-09-15 15:59:48

com.flink.project.flink.DayGiftAna,如下:


import com.roy.flink.project.fansgift.FansGiftResult;
import com.roy.flink.project.fansgift.GiftRecord;
import org.apache.commons.lang.SystemUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichAggregateFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;/*** @desc 贡献日榜计算程序*/
public class DayGiftAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setAutoWatermarkInterval(1000L); //BoundedOutOfOrdernessWatermarks定时提交Watermark的间隔
//        env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop01:8020/dayGiftAna"));// Checkpoint存储到文件if(SystemUtils.IS_OS_WINDOWS){env.setStateBackend(new FsStateBackend("file:///D:/flink_file"));}else{// linuxenv.setStateBackend(new FsStateBackend("file:///home/file_file"));}//使用Socket测试。env.setParallelism(1);final DataStreamSource<String> dataStream = env.socketTextStream("10.86.97.206", 7777);final SingleOutputStreamOperator<FansGiftResult> fansGiftResult = dataStream.map((MapFunction<String, GiftRecord>) value -> {final String[] valueSplit = value.split(",");//SimpleDateFormat 多线程不安全。SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");final long giftTime = sdf.parse(valueSplit[3]).getTime();return new GiftRecord(valueSplit[0], valueSplit[1], Integer.parseInt(valueSplit[2]), giftTime);}).assignTimestampsAndWatermarks(WatermarkStrategy.<GiftRecord>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((SerializableTimestampAssigner<GiftRecord>) (element, recordTimestamp) -> element.getGiftTime()))
//          .keyBy((KeySelector<GiftRecord, String>) value -> value.getHostId() + "_" + value.getFansId()) //按照HostId_FansId分组.keyBy((KeySelector<GiftRecord, String>) value -> value.getHostId()) //按照HostId分组.window(TumblingEventTimeWindows.of(Time.seconds(10)))
//                .allowedLateness(Time.seconds(2)).aggregate(new WinodwGiftRecordAgg(), new AllWindowGiftRecordAgg());//打印结果测试fansGiftResult.print("fansGiftResult");env.execute("DayGiftAna");}//在每个子任务中将窗口期内的礼物进行累计合并//增加状态后端。private static class WinodwGiftRecordAgg implements AggregateFunction<GiftRecord, Long, Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(GiftRecord value, Long accumulator) {Long res = accumulator + value.getGiftCount();return res;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return a + b;}}//对窗口期内的所有子任务进行窗口聚合操作。private static class AllWindowGiftRecordAgg extends RichWindowFunction<Long, FansGiftResult, String, TimeWindow> {ValueState<FansGiftResult> state;@Overridepublic void apply(String s, TimeWindow window, java.lang.Iterable<Long> input, Collector<FansGiftResult> out) throws Exception {final String[] splitKey = s.split("_");String hostId = splitKey[0];String fansId ="";if(splitKey.length>1){fansId=splitKey[1];}final Long giftCount = input.iterator().next();final long windowEnd = window.getEnd();final FansGiftResult fansGiftResult = new FansGiftResult(hostId, fansId, giftCount, windowEnd);out.collect(fansGiftResult);state.update(fansGiftResult);}@Overridepublic void open(Configuration parameters) throws Exception {final ValueStateDescriptor<FansGiftResult> stateDescriptor = new ValueStateDescriptor<>("WinodwGiftRecordAgg", TypeInformation.of(new TypeHint<FansGiftResult>() {}));state = this.getRuntimeContext().getState(stateDescriptor);}}
}

FansGiftResult,代码如下:

public class FansGiftResult {private String hostId;private String fansId;private long giftCount;private long windowEnd;public FansGiftResult() {}public FansGiftResult(String hostId, String fansId, long giftCount, long windowEnd) {this.hostId = hostId;this.fansId = fansId;this.giftCount = giftCount;this.windowEnd = windowEnd;}@Overridepublic String toString() {if(fansId!=null && fansId.length()>0){return "FansGiftResult{" +"hostId='" + hostId + '\'' +", fansId='" + fansId + '\'' +", giftCount=" + giftCount +", windowEnd=" + windowEnd +'}';}else{return "FansGiftResult{" +"hostId='" + hostId + '\'' +", giftCount=" + giftCount +", windowEnd=" + windowEnd +'}';}}public String getHostId() {return hostId;}public void setHostId(String hostId) {this.hostId = hostId;}public String getFansId() {return fansId;}public void setFansId(String fansId) {this.fansId = fansId;}public long getGiftCount() {return giftCount;}public void setGiftCount(long giftCount) {this.giftCount = giftCount;}public long getWindowEnd() {return windowEnd;}public void setWindowEnd(long windowEnd) {this.windowEnd = windowEnd;}
}

GiftRecord,代码如下:


public class GiftRecord {private String hostId; //主播IDprivate String fansId; //粉丝IDprivate int giftCount; //礼物数量private long giftTime; //送礼物时间。原始时间格式 yyyy-MM-DD HH:mm:ss,ssspublic GiftRecord() {}public GiftRecord(String hostId, String fansId, int giftCount, long giftTime) {this.hostId = hostId;this.fansId = fansId;this.giftCount = giftCount;this.giftTime = giftTime;}public String getHostId() {return hostId;}public void setHostId(String hostId) {this.hostId = hostId;}public String getFansId() {return fansId;}public void setFansId(String fansId) {this.fansId = fansId;}public int getGiftCount() {return giftCount;}public void setGiftCount(int giftCount) {this.giftCount = giftCount;}public long getGiftTime() {return giftTime;}public void setGiftTime(long giftTime) {this.giftTime = giftTime;}@Overridepublic String toString() {return "GiftRecord{" +"hostId='" + hostId + '\'' +", fansId='" + fansId + '\'' +", giftCount=" + giftCount +", giftTime='" + giftTime + '\'' +'}';}
}

ES查询语句:

GET daygiftanalyze/_search
{"query": {"bool": {"must": [{"range": {"windowEnd": {"gte": 1631635200000,"lte": 1631721600000}}},{"match": {"hostId": "1001"}}]}},"aggs": {"groupByFans": {"terms": {"field": "fansId","size": 3,"order": {"giftCount": "desc"}},"aggs": {"giftCount": {"sum": {"field": "giftCount"}}}}}
}

ES中的查询结果:
在这里插入图片描述
直播应用就可以根据这个查询结果组织客户端查询代码,最终实现日榜排名的功能。

4、实现效果分析

具体的计算方案参见示例代码,这里就不多做分析了。这里只分析一下在实现过程中需要注意的几个重要的问题:

  • 时间语义分析
    对于网络直播这样的场景,从下午六点到第二天早上六点才是一天的高峰期,所以,在进行统计时,将每一天的统计时间定义为从早上六点到第二天早上六点,这样就能尽量保持高峰期的完整性。很多跟娱乐相关的场景,比如网络游戏,也大都是以这样的范围来定义一天,而不是传统意义上的从0点到24点。

  • 并行度优化
    可以直接使用Flink的开窗机制,待一周的数据收集完整了之后,一次性向ES中输出统计结果,这种场景下要注意累计器的持久化,以及计算程序出错后的重启恢复机制。

  • 后续改进方式
    状态后端、而对于人气值日榜的计算,就不能等一天的数据收集齐了再计算了。这时是有两种解决方案,一种是完全的流处理方式。也就是每来一条数据就往ES中更新结果。另一中方式是采用小批量的流处理方式。以五分钟为单位,将数据拆分成一个一个小窗
    口来进行处理。显然后一种方式对数据处理的压力会比较小一点。虽然数据量会更
    多,但是ES的存储以及快速查询能力可以比较好的弥补数据量的问题。也因此,在
    设计ES数据机构时,将人气值日榜的文档结构设计成了一个一个的小范围。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/667503.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

在线JSON转SQL工具

在线JSON转SQL - BTool在线工具软件&#xff0c;为开发者提供方便。在线JSON转SQL工具可以将JSON文件中的数据或者JSON对象转换为SQL插入语句&#xff0c;方便用户将数据导入到数据库中。用户可以通过简单的界面上传JSON文件&#xff0c;或者文本框输入&#xff0c;点击JSON转S…

Redis——SpringBoot整合Redis实战

1、基本配置 1.1、引入依赖 首先&#xff0c;建立Maven项目&#xff0c;在Maven项目中引入pom.xml文件&#xff1a; <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> &l…

计算机网络_1.6.3 计算机网络体系结构分层思想举例

1.6.3 计算机网络体系结构分层思想举例 1、实例引入&#xff08;用户在主机中使用浏览器访问web服务器&#xff09;2、从五层原理体系结构的角度研究该实例3、练习题 笔记来源&#xff1a; B站 《深入浅出计算机网络》课程 本节通过一个常见的网络应用实例&#xff0c;来介绍计…

灵活应对:策略模式在软件设计中的应用

策略模式是一种行为型设计模式&#xff0c;它允许定义一系列算法&#xff0c;并将每个算法封装起来&#xff0c;使它们可以互换使用。策略模式让算法的变化独立于使用算法的客户端&#xff0c;使得在不修改原有代码的情况下切换或扩展新的算法成为可能。 使用策略模式的场景包…

android inset 管理

目录 简介 Insets管理架构 Insets相关类图 app侧的类 WMS侧的类 inset show的流程 接口 流程 WMS侧确定InsetsSourceControl的流程 两个问题 窗口显示时不改变现有的inset状态 全屏窗口上的dialog 不显示statusbar问题 View 和 DecorView 设置insets信息 输入法显…

幻兽帕鲁客户端存档文件 - 云上备份和恢复教程

本文将详细介绍如何将幻兽帕鲁游戏客户端的存档文件备份至云端&#xff0c;以及如何从云端恢复存档数据至本地。 一、游戏存档备份场景 幻兽帕鲁的游戏进度存储在电脑本地磁盘上&#xff0c;游戏中创建的每个世界都对应一个本地存档文件夹。在玩游戏过程中&#xff0c;客户端…

智能边缘计算网关实现高效数据处理与实时响应-天拓四方

在当今时代&#xff0c;数据已经成为驱动业务决策的关键因素。然而&#xff0c;传统的数据处理方式往往存在延迟&#xff0c;无法满足实时性要求。此时&#xff0c;智能边缘计算网关应运而生&#xff0c;它能够将数据处理和分析的能力从中心服务器转移至设备边缘&#xff0c;大…

基于单片机控制的智能门锁设计

摘要&#xff1a;阐述基于STC15F2K60S2单片机控制的智能门锁设计&#xff0c;包括CPU控制单元模块、液晶显示LCD、 Wi-Fi模块&#xff0c;实现远程控制开门&#xff0c;密码开门的智能化功能。 关键词&#xff1a;控制技术&#xff0c;单片机&#xff0c;智能门锁&#xff0c;…

游戏视频录制软件推荐,打造专业电竞视频(3款)

随着游戏产业的快速发展&#xff0c;越来越多的玩家开始关注游戏视频录制软件。一款好的录制软件不仅可以帮助玩家记录游戏中的精彩瞬间&#xff0c;还可以让其与他人分享自己的游戏体验。接下来&#xff0c;我们将介绍三款热门的游戏视频录制软件&#xff0c;并对其进行详细的…

收放卷转动线速度计算FC(SCL+梯形图代码)

这篇博客是收放控制算法的基础系列,通过这篇文章的学习。大家能更好的理解收放卷控制里的前馈量计算,收放卷前馈PID大家可以参考下面链接文章: https://rxxw-control.blog.csdn.net/article/details/129352629https://rxxw-control.blog.csdn.net/article/details/12935262…

3D Line Mapping Revisited论文阅读

1. 代码地址 GitHub - cvg/limap: A toolbox for mapping and localization with line features. 2. 项目主页 3D Line Mapping Revisited 3. 摘要 提出了一种基于线的重建算法&#xff0c;Limap&#xff0c;可以从多视图图像中构建3D线地图&#xff0c;通过线三角化、精心…

06、全文检索 -- Solr -- Solr 全文检索之在图形界面管理 Core 的 Schema(演示对 普通字段、动态字段、拷贝字段 的添加和删除)

目录 Solr 全文检索之管理 Schema使用Web控制台管理Core的Schema3 种 字段解释&#xff1a;Field&#xff1a;普通字段Dynamic Field&#xff1a;动态字段Copy Field&#xff1a;拷贝字段 演示&#xff1a;添加 普通字段&#xff08; Field &#xff09;演示&#xff1a;添加 动…

C++入门的基础

幸福比傲慢更容易蒙住人的眼睛。 ——大仲马 C入门 1、属于C的关键字1、1、C从何而来1、2、C关键字(C98) 2、命名空间2、1、命名空间的定义2、2、命名空间使用 3、C输入和输出4、缺省参数4、1、缺省参数概念4、2、缺省参数分类 5、函数重载5、1、函数重载概念 6、引用6、1、引用…

电脑/机顶盒/ps3/4/连接老电视(只有AV、S-Video接口)解决方案之HDMI转AV/S-Video转换器HAV

HDMI转AV/S-Video转换器功能 01、将HDMI高清信号经过视频处理转换成AV、S-VIDEO(PAL/NTSC)的视频信号输出 02、将HDMI数字音频&#xff0c;经过DAC数模芯片处理转成模拟立体声输出 03、采用先进的视频处理技术&#xff0c;对图像的亮度&#xff0c;对比度及色彩进行增强处理 04…

使用Docker本地部署Jupyter Notebook并结合内网穿透实现远程访问

文章目录 1. 选择与拉取镜像2. 创建容器3. 访问Jupyter工作台4. 远程访问Jupyter工作台4.1 内网穿透工具安装4.2 创建远程连接公网地址4.3 使用固定二级子域名地址远程访问 本文主要介绍如何在Ubuntu系统中使用Docker本地部署Jupyter Notebook&#xff0c;并结合cpolar内网穿透…

01-操作系统_名词_文件下载_反弹

操作系统_名词_文件下载_反弹 一、渗透测试1.1、POC、EXP、Payload与Shellcode1.2、后门1.3、木马1.4、反弹1.5、回显1.6、跳板1.7、黑白盒测试1.8、暴力破解1.9、社会工程学1.10、撞库1.11、ATT&CK 二、案例演示2.1、基础案例1&#xff1a;操作系统-用途&命令&权限…

Android学习之路(27) ProGuard,混淆,R8优化

前言 使用java编写的源代码编译后生成了对于的class文件&#xff0c;但是class文件是一个非常标准的文件&#xff0c;市面上很多软件都可以对class文件进行反编译&#xff0c;为了我们app的安全性&#xff0c;就需要使用到Android代码混淆这一功能。 针对 Java 的混淆&#x…

【Docker】Docker Registry(镜像仓库)

文章目录 一、什么是 Docker Registry二、镜像仓库分类三、镜像仓库工作机制四、常用的镜像仓库五、常用命令镜像仓库命令镜像命令(部分)容器命令(部分) 六、docker镜像仓库实战综合实战一&#xff1a;搭建一个 nginx 服务综合实战二&#xff1a;Docker hub上创建自己私有仓库综…

B站课程评分

Spring6 https://www.bilibili.com/video/BV1Ft4y1g7Fb/ 评价: 推荐一看 配套文档优秀, 老师口齿清晰, 条理不错. mybatis https://www.bilibili.com/video/BV1JP4y1Z73S/?spm_id_from333.337.search-card.all.click 评价: 推荐一看 配套文档优秀, 老师口齿清晰, 条理不错…

等变和不变 、向量神经元(vector neurons)是什么?

等变和不变 等变&#xff1a;如果输入是一个旋转后的椅子&#xff0c;那么输出也应该是一个旋转后的椅子 不变&#xff1a;如果输入是一个旋转后的椅子&#xff0c;那么输出应该是一个椅子&#xff0c;而不是一只狗。 向量神经元&#xff08;vector neurons&#xff09; 向量…