Flink+Hologres亿级用户实时UV精确去重最佳实践

简介: Flink+Hologres亿级用户实时UV精确去重最佳实践

UV、PV计算,因为业务需求不同,通常会分为两种场景:

  • 离线计算场景:以T+1为主,计算历史数据
  • 实时计算场景:实时计算日常新增的数据,对用户标签去重

针对离线计算场景,Hologres基于RoaringBitmap,提供超高基数的UV计算,只需进行一次最细粒度的预聚合计算,也只生成一份最细粒度的预聚合结果表,就能达到亚秒级查询。具体详情可以参见往期文章>>Hologres如何支持超高基数UV计算(基于RoaringBitmap实现)

对于实时计算场景,可以使用Flink+Hologres方式,并基于RoaringBitmap,实时对用户标签去重。这样的方式,可以较细粒度的实时得到用户UV、PV数据,同时便于根据需求调整最小统计窗口(如最近5分钟的UV),实现类似实时监控的效果,更好的在大屏等BI展示。相较于以天、周、月等为单位的去重,更适合在活动日期进行更细粒度的统计,并且通过简单的聚合,也可以得到较大时间单位的统计结果。

主体思想

  1. Flink将流式数据转化为表与维表进行JOIN操作,再转化为流式数据。此举可以利用Hologres维表的insertIfNotExists特性结合自增字段实现高效的uid映射。
  2. Flink把关联的结果数据按照时间窗口进行处理,根据查询维度使用RoaringBitmap进行聚合,并将查询维度以及聚合的uid存放在聚合结果表,其中聚合出的uid结果放入Hologres的RoaringBitmap类型的字段中。
  3. 查询时,与离线方式相似,直接按照查询条件查询聚合结果表,并对其中关键的RoaringBitmap字段做or运算后并统计基数,即可得出对应用户数。
  4. 处理流程如下图所示

0.jpeg

方案最佳实践

1.创建相关基础表

1)创建表uid_mapping为uid映射表,用于映射uid到32位int类型。

  • RoaringBitmap类型要求用户ID必须是32位int类型且越稠密越好(即用户ID最好连续)。常见的业务系统或者埋点中的用户ID很多是字符串类型或Long类型,因此需要使用uid_mapping类型构建一张映射表。映射表利用Hologres的SERIAL类型(自增的32位int)来实现用户映射的自动管理和稳定映射。
  • 由于是实时数据, 设置该表为行存表,以提高Flink维表实时JOIN的QPS。
BEGIN;
CREATE TABLE public.uid_mapping (
uid text NOT NULL,
uid_int32 serial,
PRIMARY KEY (uid)
);
--将uid设为clustering_key和distribution_key便于快速查找其对应的int32值
CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
CALL set_table_property('public.uid_mapping', 'orientation', 'row');
COMMIT;

2)创建表dws_app为基础聚合表,用于存放在基础维度上聚合后的结果。

  • 使用RoaringBitmap前需要创建RoaringBitmap extention,同时也需要Hologres实例为0.10版本
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
  • 为了更好性能,建议根据基础聚合表数据量合理的设置Shard数,但建议基础聚合表的Shard数设置不超过计算资源的Core数。推荐使用以下方式通过Table Group来设置Shard数
--新建shard数为16的Table Group,
--因为测试数据量百万级,其中后端计算资源为100core,设置shard数为16
BEGIN;
CREATE TABLE tg16 (a int);                             --Table Group哨兵表
call set_table_property('tg16', 'shard_count', '16'); 
COMMIT;
  • 相比离线结果表,此结果表增加了时间戳字段,用于实现以Flink窗口周期为单位的统计。结果表DDL如下:
BEGIN;
create table dws_app(country text,prov text,city text, ymd text NOT NULL,  --日期字段timetz TIMESTAMPTZ,  --统计时间戳,可以实现以Flink窗口周期为单位的统计uid32_bitmap roaringbitmap, -- 使用roaringbitmap记录uvprimary key(country, prov, city, ymd, timetz)--查询维度和时间作为主键,防止重复插入数据
);
CALL set_table_property('public.dws_app', 'orientation', 'column');
--日期字段设为clustering_key和event_time_column,便于过滤
CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
--等价于将表放在shard数为16的table group
call set_table_property('public.dws_app', 'colocate_with', 'tg16');
--group by字段设为distribution_key
CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
COMMIT;

2.Flink实时读取数据并更新dws_app基础聚合表

完整示例源码请见alibabacloud-hologres-connectors examples

1)Flink 流式读取数据源(DataStream),并转化为源表(Table)

//此处使用csv文件作为数据源,也可以是kafka等
DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
// 与维表join需要添加proctime字段,详见https://help.aliyun.com/document_detail/62506.html
Table odsTable =tableEnv.fromDataStream(odsStream,$("uid"),$("country"),$("prov"),$("city"),$("ymd"),$("proctime").proctime());
// 注册到catalog环境
tableEnv.createTemporaryView("odsTable", odsTable);

2)将源表与Hologres维表(uid_mapping)进行关联

其中维表使用insertIfNotExists参数,即查询不到数据时自行插入,uid_int32字段便可以利用Hologres的serial类型自增创建。

// 创建Hologres维表,其中nsertIfNotExists表示查询不到则自行插入
String createUidMappingTable =String.format("create table uid_mapping_dim("+ "  uid string,"+ "  uid_int32 INT"+ ") with ("+ "  'connector'='hologres',"+ "  'dbname' = '%s'," //Hologres DB名+ "  'tablename' = '%s',"//Hologres 表名+ "  'username' = '%s'," //当前账号access id+ "  'password' = '%s'," //当前账号access key+ "  'endpoint' = '%s'," //Hologres endpoint+ "  'insertifnotexists'='true'"+ ")",database, dimTableName, username, password, endpoint);
tableEnv.executeSql(createUidMappingTable);
// 源表与维表join
String odsJoinDim ="SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"+ "  FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"+ "  ON ods.uid = dim.uid";
Table joinRes = tableEnv.sqlQuery(odsJoinDim);

3)将关联结果转化为DataStream,通过Flink时间窗口处理,结合RoaringBitmap进行聚合

DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =source// 筛选需要统计的维度(country, prov, city, ymd).keyBy(0, 1, 2, 3)// 滚动时间窗口;此处由于使用读取csv模拟输入流,采用ProcessingTime,实际使用中可使用EventTime.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))// 触发器,可以在窗口未结束时获取聚合结果.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1))).aggregate(// 聚合函数,根据key By筛选的维度,进行聚合new AggregateFunction<Tuple5<String, String, String, String, Integer>,RoaringBitmap,RoaringBitmap>() {@Overridepublic RoaringBitmap createAccumulator() {return new RoaringBitmap();}@Overridepublic RoaringBitmap add(Tuple5<String, String, String, String, Integer> in,RoaringBitmap acc) {// 将32位的uid添加到RoaringBitmap进行去重acc.add(in.f4);return acc;}@Overridepublic RoaringBitmap getResult(RoaringBitmap acc) {return acc;}@Overridepublic RoaringBitmap merge(RoaringBitmap acc1, RoaringBitmap acc2) {return RoaringBitmap.or(acc1, acc2);}},//窗口函数,输出聚合结果new WindowFunction<RoaringBitmap,Tuple6<String, String, String, String, Timestamp, byte[]>,Tuple,TimeWindow>() {@Overridepublic void apply(Tuple keys,TimeWindow timeWindow,Iterable<RoaringBitmap> iterable,Collector<Tuple6<String, String, String, String, Timestamp, byte[]>> out)throws Exception {RoaringBitmap result = iterable.iterator().next();// 优化RoaringBitmapresult.runOptimize();// 将RoaringBitmap转化为字节数组以存入Holo中byte[] byteArray = new byte[result.serializedSizeInBytes()];result.serialize(ByteBuffer.wrap(byteArray));// 其中 Tuple6.f4(Timestamp) 字段表示以窗口长度为周期进行统计,以秒为单位out.collect(new Tuple6<>(keys.getField(0),keys.getField(1),keys.getField(2),keys.getField(3),new Timestamp(timeWindow.getEnd() / 1000 * 1000),byteArray));}});

4)写入结果表

需要注意的是,Hologres中RoaringBitmap类型在Flink中对应Byte数组类型

// 计算结果转换为表
Table resTable =tableEnv.fromDataStream(processedSource,$("country"),$("prov"),$("city"),$("ymd"),$("timest"),$("uid32_bitmap"));
// 创建Hologres结果表, 其中Hologres的RoaringBitmap类型通过Byte数组存入
String createHologresTable =String.format("create table sink("+ "  country string,"+ "  prov string,"+ "  city string,"+ "  ymd string,"+ "  timetz timestamp,"+ "  uid32_bitmap BYTES"+ ") with ("+ "  'connector'='hologres',"+ "  'dbname' = '%s',"+ "  'tablename' = '%s',"+ "  'username' = '%s',"+ "  'password' = '%s',"+ "  'endpoint' = '%s',"+ "  'connectionSize' = '%s',"+ "  'mutatetype' = 'insertOrReplace'"+ ")",database, dwsTableName, username, password, endpoint, connectionSize);
tableEnv.executeSql(createHologresTable);
// 写入计算结果到dws表
tableEnv.executeSql("insert into sink select * from " + resTable);

3.数据查询

查询时,从基础聚合表(dws_app)中按照查询维度做聚合计算,查询bitmap基数,得出group by条件下的用户数

  • 查询某天内各个城市的uv
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好
set hg_experimental_enable_force_three_stage_agg=off  SELECT  country,prov,city,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   ymd = '20210329'
GROUP BY country,prov,city
;

  • 查询某段时间内各个省份的uv
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好
set hg_experimental_enable_force_three_stage_agg=off SELECT  country,prov,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
FROM    dws_app
WHERE   time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
GROUP BY country,prov
;

原文链接
本文为阿里云原创内容,未经允许不得转载。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/513109.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何评估Serverless服务能力,这份报告给出了40条标准

简介&#xff1a; 如今&#xff0c;已经有评测机构给出了40条标准来对Serverless的服务能力进行评估&#xff0c;这些评估细则既是技术生态繁荣发展的一种表现&#xff0c;也可以作为新进入者评估Serverless落地成效的一种参考依据。 编者按&#xff1a;两年前&#xff0c;我们…

Redis 分布式锁的正确实现原理演化历程与 Redisson 实战总结

作者 | 码哥来源 | 码哥字节❝可能是最完善的 Redis 分布式锁原理与实战总结&#xff0c;建议收藏。Redis 分布式锁使用 SET 指令就可以实现了么&#xff1f;在分布式领域 CAP 理论一直存在。分布式锁的门道可没那么简单&#xff0c;我们在网上看到的分布式锁方案可能是有问题的…

OceanBase时序数据库CeresDB正式商用 为用户提供安全可靠的数据存储管理服务

简介&#xff1a; OceanBase完成OLAP和OLTP双重能力并行后&#xff0c;向数据管理领域多模方向迈出第一步。 近日&#xff0c;在数据库OceanBase3.0峰会上&#xff0c;OceanBase CEO杨冰宣布首个时序数据库产品CeresDB正式商用。该数据库将为用户提供安全可靠的数据查询和存储…

html伸缩布局,CSS3 伸缩布局(一)

CSS3引入了一种新的布局模式——Flexbox布局&#xff0c;即伸缩布局盒模型(Flexible Box)&#xff0c;用来提供一个更加有效的方式制定、调整和分布一个容器里项目布局&#xff0c;即使它们的大小是未知或者动态的&#xff0c;这里简称为Flex。Flexbox布局常用于设计比较复杂的…

从0开始:500行代码实现 LSM 数据库

简介&#xff1a; LSM-Tree 是很多 NoSQL 数据库引擎的底层实现&#xff0c;例如 LevelDB&#xff0c;Hbase 等。本文基于《数据密集型应用系统设计》中对 LSM-Tree 数据库的设计思路&#xff0c;结合代码实现完整地阐述了一个迷你数据库&#xff0c;核心代码 500 行左右&#…

从 Docker 的信号机制看容器的优雅停止

作者 | Addo Zhang来源 | 云原生指北有太多的文章介绍如何运行容器&#xff0c;然而如何停止容器的文章相对少很多。根据运行的应用类型&#xff0c;应用的停止过程非常重要。如果应用要写文件&#xff0c;停止前要保证正确刷新数据并关闭文件&#xff1b;如果是 HTTP 服务&…

使用 Arthas 排查开源 Excel 组件问题

简介&#xff1a; 有了实际的使用之后&#xff0c;不免会想到&#xff0c;Arthas 是如何做到在程序运行时&#xff0c;动态监测我们的代码的呢&#xff1f;带着这样的问题&#xff0c;我们一起来看下 Java Agent 技术实现原理。 背景介绍 ​ 项目中有使用到 com.github.dream…

如何选择python书籍_关于 Python 的经典入门书籍有哪些?

展开全部 关于Python&#xff0c;是最近最火最的编程语言e68a843231313335323631343130323136353331333365643631&#xff0c;挺多人都在学习的&#xff0c;关于它的入门书籍&#xff0c;我大概推荐以下几本&#xff1a; 首先我介绍的是《Python基础教程(第2版修订版)》&#x…

“融合、智能、绿色”施耐德电气线上工博以全生命周期解决方案助推数字化

原定于12月1-5日在上海举办的第23届中国国际工业博览会因为疫情再次延期。不必翘首等待&#xff0c;施耐德电气将以线上云展厅的形式如期与您见面&#xff0c;为工业用户呈现一场以“绿色智能制造&#xff0c;共塑可持续未来”为主题的云端盛宴。凭借在绿色智能制造领域的丰富实…

运维更简单、更智能,让运维人不再 “拼命”

简介&#xff1a; 云原生智能运维解决方案&#xff0c;利用大数据为企业日常运维服务&#xff0c;通过可观测数据&#xff0c;融合智能告警与响应中枢&#xff0c;结合机器学习的方法进一步解决自动化运维所未解决的问题&#xff0c;让运维更简单、更智能。 在90%的科幻片中 万…

python全栈马哥_马哥Python全栈+爬虫+高端自动化,资源教程下载

资源名称 马哥Python全栈爬虫高端自动化&#xff0c;资源教程下载 资源介绍 这套课程最后是有项目实战的&#xff0c;如项目四-多人博客开发、项目五CMDB资产管理、项目七-运维流程系统。 资源目录 01Python开班仪式及职业指导 02linux基础-1 03linux基础-2 04linux基础-3 05li…

从操作系统层面分析Java IO演进之路

简介&#xff1a; 本文从操作系统实际调用角度&#xff08;以CentOS Linux release 7.5操作系统为示例&#xff09;&#xff0c;力求追根溯源看IO的每一步操作到底发生了什么。 作者 | 道坚 来源 | 阿里技术公众号 前言 本文从操作系统实际调用角度&#xff08;以CentOS Linu…

教程系列——用模板快速上线一个HR 服务中心

简介&#xff1a; 【开箱即用的模板使用系列教程】将会手把手教给大家如何快速启用钉钉宜搭提供各类模板。今天第一讲&#xff0c;介绍《HR 服务中心》的模板启用。 【开箱即用的模板使用系列教程】将会手把手教给大家如何快速启用钉钉宜搭提供各类模板。今天第1讲&#xff0c;…

数字化“团险”黑科技,保险极客技术升级背后心经

作者 | 宋慧 出品 | CSDN 云计算 疫情之后&#xff0c;一切都在“内卷”&#xff0c;HR 也逃不过。初创公司想要招到优秀人才&#xff0c;除了对市场和未来发展的预期和潜力&#xff0c;提供补充医疗险也是对人才重要的保障。另外&#xff0c;现在补充医疗也是知名大企业高福利…

powershell快捷键_借助Windows Terminal搞一个花里胡哨的PowerShell终端

一提起PowerShell&#xff0c;命令提示符等等&#xff0c;想到的就是丑、难用&#xff0c;非常丑&#xff01;各位可以先感受一下。不过&#xff0c;现在我们可以对它做一个美化&#xff0c;美化后的效果如下&#xff0c;各位也可以感受下(本人不提供背景图)下面做简单记录1、必…

【详谈 Delta Lake 】系列技术专题 之 特性(Features)

简介&#xff1a; 本文翻译自大数据技术公司 Databricks 针对数据湖 Delta Lake 的系列技术文章。众所周知&#xff0c;Databricks 主导着开源大数据社区 Apache Spark、Delta Lake 以及 ML Flow 等众多热门技术&#xff0c;而 Delta Lake 作为数据湖核心存储引擎方案给企业带来…

深度解读畅捷通云原生架构转型实战历程

简介&#xff1a; 畅捷通公司是用友集团旗下的成员企业&#xff0c;专注于服务国内小微企业的财务和管理服务。一方面&#xff0c;畅捷通将自己的产品、业务、技术架构互联网化&#xff1b;另一方面&#xff0c;畅捷通推出了畅捷通一站式云服务平台&#xff0c;面向小微企业提供…

Apache Dubbo 3.0.0 正式发布 - 全面拥抱云原生

简介&#xff1a; 一个新的里程碑&#xff01; 一、背景 自从 Apache Dubbo 在 2011 年开源以来&#xff0c;在一众大规模互联网、IT公司的实践中积累了大量经验后&#xff0c;Dubbo 凭借对 Java 用户友好、功能丰富、治理能力强等优点在过去取得了很大的成功&#xff0c;成为…

python关键字中文意思_python 字符串只保留汉字的方法

如下所示&#xff1a; def is_chinese(uchar): """判断一个unicode是否是汉字""" if uchar > u\u4e00 and uchar < u\u9fa5: return True else: return False def is_number(uchar): """判断一个unicode是否是数字"&q…

启明星辰集团DT总部落地杭州 数据绿洲版图驱动未来发展

12月1日&#xff0c;启明星辰集团DT&#xff08;数据时代&#xff09;总部正式落地于杭州高新区&#xff08;滨江&#xff09;&#xff0c;与北京IT总部形成南北两个总部基地新格局&#xff0c;并发布数据安全新版图--数据绿洲&#xff0c;将结合杭州领先的数字应用的场景&…