Flink电商实时数仓(三)

DIM层代码流程图

维度层的重点和难点在于实时电商数仓需要的维度信息一般是动态的变化的,并且由于实时数仓一般需要一直运行,无法使用常规的配置文件重启加载方式来修改需要读取的ODS层数据,因此需要通过Flink-cdc实时监控MySql中的维度数据配置信息表,实时动态的发布广播信息。主流数据根据广播数据及时调整处理逻辑,并自动在HBase中创建相应的维度表和写入相应的维度数据。

  1. 消费Kafka ods业务主题数据
  2. 数据清洗:是否为JSON格式
  3. 使用flink-cdc读取监控配置表数据
  4. 在HBase中创建维度表
  5. 做成广播流
  6. 连接主流和广播流
  7. 筛选出需要写出的字段
  8. 写出到Hbase

在这里插入图片描述

整体架构

  • realtime-common模块
    • base: 所有Flink程序的基类,负责搭建Flink运行环境和设置并行度和检查点等相关参数。其中我们的数据来源也确定为Kafka,故数据源代码也写在这里。每个Flink程序的具体处理逻辑由handle()函数来负责处理。
    • bean:负责存放项目运行过程中需要用到的bean对象,比如当前flink-cdc程序中需要用到的TableProcessDim类,配置信息表对象。
    • constant:负责存放程序中需要使用到常量参数
    • function:负责存放一些通用的函数方法
    • util:一般存放和数据连接相关的工具类
    • test目录: 用来在写正式代码前测试连接是否通畅,数据是否可以正常发送。
  • realtime-dim模块
    • app:DimApp里面写的是dim层的具体实现,具体步骤如上述流程图所示。
    • function:负责存放数据处理的实现类,一般会继承相应的父类,在dim层可以直接调用这里的子类来实现父类接口,让dim层的代码逻辑更加清晰。
  • realtime-dwd模块:如上
  • realtime-dws模块:如上

在这里插入图片描述

数据清洗ETL

数据清洗,简单来说就是对数据进行简单的转换筛选。首先如果在转换过程中出现异常,直接过滤掉。注意这里无需抛出异常,因为如果throw a exception会导致整个程序异常终止,而在数据处理过程中出现部分数据格式错误而无法正常进行格式转换是很常见的,只需将异常信息打印到控制台即可。如果转换正常,再判断是否满足以下三个条件:

  1. 数据库名为gmall
  2. 数据类型不是bootstrap-start或者bootstrap-complete
  3. data字段不是null且长度不为0

Flink-cdc读取配置表的数据

Flink中获取数据主要有两个步骤:

  1. 获取相应的数据源Source
    • 注意:在构建Flink-cdc对应的MySQLSource时,tableList参数必须是库表.表名结构
  2. 调用env.fromSource()方法将数据源的发送过来的数据转换Ds数据流,在该方法中可以设置数据的水位线。
  3. 获取到数据后,建议先打印到控制台查看数据的具体结构。
public static MySqlSource<String> getMySqlSource(String databaseName, String tableName){MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(Constant.MYSQL_HOST).port(Constant.MYSQL_PORT).username(Constant.MYSQL_USER_NAME).password(Constant.MYSQL_PASSWORD).databaseList(databaseName) // set captured database.tableList(databaseName+"."+tableName) // set captured table.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.startupOptions(StartupOptions.initial()).build();return mySqlSource;}

在HBase中创建维度表

数据库中的配置表数据经过Flink-cdc处理后发送到这里是json格式的字符串,这里根据数据的四种类型op在HBase中进行不同的建表删表操作,同时对数json字符数据进行转换映射处理,转换为对应的bean对象数据流。这里一个数据产生一个处理后的对象,故使用Map算子或FlatMap算子都可以。

  • op类型
    • d 代表delete,需要删除before字段中对应的表
    • c 代表create,r 代表 read,需要创建after字段中对应的表
    • u 代表update,需要先删除掉旧表,然后根据新表的字段创建一个新表
  • 创建HBase连接,创建连接是很耗费资源的行为,因此新建连接和关闭连接需要写在open和close方法中
  • HBase中想要对表进行创建和删除等DDL操作,都由Admin对象管理;如果需要对数据进行插入删除等DML操作,需要创建Table对象。详细操作细节请看相应代码即可。
public static SingleOutputStreamOperator<TableProcessDim> createHbaseTable(DataStreamSource<String> mysqlSource) {SingleOutputStreamOperator<TableProcessDim> createHBaseTable = mysqlSource.flatMap(new RichFlatMapFunction<String, TableProcessDim>() {public Connection connection ;@Overridepublic void open(Configuration parameters) throws Exception {//获取连接connection = HBaseUtil.getHBaseConnection();}@Overridepublic void close() throws Exception {//关闭连接HBaseUtil.closeHBaseConn(connection);}@Overridepublic void flatMap(String s, Collector<TableProcessDim> out){//使用读取的配置表数据,到HBase中创建与之对应的表格try {JSONObject jsonObject = JSONObject.parseObject(s);String op = jsonObject.getString("op");TableProcessDim dim;//维度表if ("d".equals(op)) {dim = jsonObject.getObject("before", TableProcessDim.class);dim.setOp(op);//当配置表发送一个D类型的数据,对应的HBase需要删除一张维度表deleteTable(dim);} else if ("c".equals(op) || "r".equals(op)) {dim = jsonObject.getObject("after", TableProcessDim.class);createTable(dim);dim.setOp(op);} else {//op = 'u', 即修改dim = jsonObject.getObject("after", TableProcessDim.class);deleteTable(dim);createTable(dim);}dim.setOp(op);out.collect(dim);} catch (Exception e) {e.printStackTrace();}}private void createTable(TableProcessDim dim) {String sinkFamily = dim.getSinkFamily();String[] split = sinkFamily.split(",");try {HBaseUtil.createHBaseTable(connection,Constant.HBASE_NAMESPACE,dim.getSinkTable(),split);} catch (IOException e) {e.printStackTrace();}}private void deleteTable(TableProcessDim dim) {try {HBaseUtil.dropHBaseTable(connection, Constant.HBASE_NAMESPACE, dim.getSinkTable());} catch (IOException e) {e.printStackTrace();}}});return createHBaseTable;}

主流连接广播流

从Flink-cdc获取的数据(gmall2023_config)是作为一个参数来控制我们对于主流即ODS层数据(gmall数据库的业务数据)的处理逻辑。gmall2023)_config库中的Table_process_dim表决定了后续程序筛选哪个表作为维度信息,并且定义了表中有哪些字段。

  1. 转换为广播流只需要调用上述得到的TableProcessDimStream的broadcast方法
  2. 使用的主流(gmall业务数据)的connect方法,得到一个连接流,然后对连接流进行process处理。
  3. 创建BroadcastProcessFunction,在里面分别有两个函数
    • processBroadcastElement():处理广播流数据
    • processElement():处理主流数据
  4. 广播流处理逻辑:
    • 读取广播状态
    • 将配置表信息写到广播状态中
    • 根据广播状态数据的op对状态做相应的修改
  5. 主流处理逻辑:
    • 查询广播状态,判断当前数据对应的表是否存在于状态中
    • 如果数据比状态来的更早,造成状态为空,需要对状态做预处理(提前从mysql中读取维表配置表信息)
    • 如果根据当前表的表名查询的状态不为空,说明该表为维度数据,使用收集器收集起来。

筛选出需要的字段

在这里插入图片描述
在维度配置信息表中的sink_column字段里定义了维度表需要的字段,使用filter算子对JsonObj里面的data字段进行过滤即可获取到想要的字段数据。

写出到Hbase

过滤后的数据流调用它的addSink方法,方法中需要传入一个SinkFunction接口类。该接口需要实现三个方法分别是:

  • open方法:获取HBase连接
  • close方法:关闭HBase连接
  • invoke方法:写入数据时调用的方法,根据jsonObj中的type做不同处理,如果是delete,需要删除对应的维度表数据;否则都是直接覆盖写入。

代码的Gitee仓库地址:https://gitee.com/langpaian/gmall2023-realtime.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/236327.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

相互独立的Gamma分布变量之和的分布

两个变量之和的情况 设随机变量相互独立&#xff0c;并且服从参数为的分布&#xff0c;记作&#xff1b;服从参数为的分布&#xff0c;记作&#xff0c;和的概率密度分别为&#xff1a; 其中 其中 那么随机变量服从参数为的分布&#xff0c;记作。 推广到n个变量之和的情况 …

云原生消息流系统 Apache Pulsar 在腾讯云的大规模生产实践

导语 由 InfoQ 主办的 Qcon 全球软件开发者大会北京站上周已精彩落幕&#xff0c;腾讯云中间件团队的冉小龙参与了《云原生机构设计与音视频技术应用》专题&#xff0c;带来了以《云原生消息流系统 Apache Pulsar 在腾讯云的大规模生产实践》为主题的精彩演讲&#xff0c;在本…

进阶之路:高级Spring整合技术解析

Spring整合 1.1 Spring整合Mybatis思路分析1.1.1 环境准备步骤1:准备数据库表步骤2:创建项目导入jar包步骤3:根据表创建模型类步骤4:创建Dao接口步骤5:创建Service接口和实现类步骤6:添加jdbc.properties文件步骤7:添加Mybatis核心配置文件步骤8:编写应用程序步骤9:运行程序 1.…

挑选知识付费平台不再迷茫:掌握这些技巧,轻松找到适合自己的平台!

明理信息科技知识付费平台 在当今的知识付费市场中&#xff0c;用户面临的选择越来越多&#xff0c;如何从众多知识付费平台中正确选择属于自己的平台呢&#xff1f;下面&#xff0c;我们将为您介绍明理信息科技知识付费平台相比同行的优势&#xff0c;帮助您做出明智的选择。…

Dubbo面试题及答案,持续更新

在准备Dubbo相关的面试题时&#xff0c;我发现网络上的资源往往缺乏深度和全面性。为了帮助广大Java程序员更好地准备面试&#xff0c;我花费了大量时间进行研究和整理&#xff0c;形成了这套Dubbo面试题大全。 这套题库不仅包含了一系列经典的Dubbo面试题及其详尽答案&#x…

雅思考试笔试还是机试,哪个更好,为什么?

想要参加或者即将参加雅思考试的同学们&#xff0c;你们知道雅思笔试和机试的有哪些相似点和不同点吗&#xff1f;下面由E趣少儿英语&#xff08;LUEnglish&#xff09;为您具体分析。 雅思纸笔考试与雅思机考&#xff1a;两者之间的相似之处 两个版本的雅思考试&#xff08;…

concat_ws()和college_list()配合=>实现多行转一行

concat_ws()&#xff1a; concat_ws()是“with separator”的缩写&#xff0c;它是一个用于连接字符串的函数。ws代表“with separator”&#xff0c;即带有分隔符。这个函数的作用是将多个字符串连接起来&#xff0c;并在它们之间插入指定的分隔符。语法&#xff1a;concat_ws…

2023 英特尔On技术创新大会直播 | 边云协同加速 AI 解决方案商业化落地

目录 前言边云协同时代背景边缘人工智能边缘挑战英特尔边云协同的创新成果最后 前言 最近观看了英特尔On技术创新大会直播&#xff0c;学到了挺多知识&#xff0c;其中对英特尔高级首席 AI 工程张宇博士讲解的边云协同加速 AI 解决方案商业化落地特别感兴趣。张宇博士讲解了英…

开源堡垒机JumpServer结合内网穿透实现远程访问

开源堡垒机JumpServer结合内网穿透实现远程访问 前言1. 安装Jump server2. 本地访问jump server3. 安装 cpolar内网穿透软件4. 配置Jump server公网访问地址5. 公网远程访问Jump server6. 固定Jump server公网地址 前言 JumpServer 是广受欢迎的开源堡垒机&#xff0c;是符合 …

网络安全:网络安全的技术趋势与发展

1.背景介绍 网络安全是现代信息化时代的重要问题之一&#xff0c;它涉及到计算机网络的安全性、数据的完整性、隐私保护等方面。随着互联网的不断发展&#xff0c;网络安全问题也日益复杂化。本文将从多个方面进行探讨&#xff0c;以帮助读者更好地理解网络安全的技术趋势与发…

spring依赖注入对象类型属性----外部bean的引入(bean和bean之间的引入)

文章目录 注入普通属性的方式1、set方法注入2、构造器&#xff08;构造方法&#xff09;注入 总结&#xff1a;注入对象类型属性 注入普通属性的方式 1、set方法注入 2、构造器&#xff08;构造方法&#xff09;注入 总结&#xff1a; set方法注入和构造器方法的注入&#…

Ansible自动化工具之Playbook剧本编写

目录 Playbook的组成部分 实例模版 切换用户 指定声明用户 声明和引用变量&#xff0c;以及外部传参变量 playbook的条件判断 ​编辑 习题 ​编辑 ansible-playbook的循环 item的循环 ​编辑 list循环 ​编辑 together的循环&#xff08;列表对应的列&#xff0…

【蓝桥杯一对一保奖辅导】国奖学姐蓝桥杯经验分享

目录 写在前面有关报名费如何准备&#xff1f;看书 /练习 /分类 /总结比赛技巧与指导 写在前面 蓝桥杯对于计算机专业相关的同学来说是非常值得参加的。 蓝桥杯相对于ACM比赛而言获奖难度较小&#xff0c;只要掌握技巧&#xff0c;拿到 省一甚至国奖是比较容易的&#xff0c;但…

Qt前端技术:3.QSS字体样式

small-caps就是让这个文本中的小写字母用大写的形式写出来并且在用大写的形式表达出来后他本身的大小会变小 有绝对尺寸和相对尺寸的区别 绝对尺寸一般是cm&#xff0c;英寸之类的 相对尺寸如px之类的是由显示器的屏幕分辨率来决定的 如windows用户分辨率一般是96像素点每英…

网络安全事件频发现状

近日&#xff0c;腾讯视频、菜鸟、滴滴等App崩溃的消息登上热搜&#xff0c;引发不少网友热议。今年以来&#xff0c;已有多起App崩溃事件发生&#xff0c;甚至有企业因此业绩损失超亿元。互联网应用的系统安全和稳定性建设越来越被社会广泛关注。 12月3日晚&#xff0c;有网友…

【力扣100】543.二叉树的直径

添加链接描述 # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # self.left left # self.right right class Solution:def __init__(self):self.max 0def diamete…

uniapp整合echarts(目前性能最优、渲染最快方案)

本文echarts示例如上图,可扫码体验渲染速度及loading效果,下文附带本小程序uniapp相关代码 实现代码 <template><view class="source

【python】在线代码混淆方案及注意事项

▒ 目录 ▒ &#x1f6eb; 导读开发环境 1️⃣ 在线网站pyob混淆操作步骤编写测试代码混淆转pyc缺点中文路径问题&#xff1a;python: Cant reopen .pyc file 2️⃣ 反编译python文件格式对比uncompyle6 3️⃣ 其它方案cpythonpython-obfuscatorPyInstaller【不推荐】pyminifie…

数字人解决方案——ER-NeRF实时对话数字人模型推理部署带UI交互界面

简介 这个是一个使用ER-NeRF来实现实时对话数字人、口播数字人的整体架构&#xff0c;其中包括了大语言回答模型、语音合成、成生视频流、背景替换等功能&#xff0c;项目对显存的要求很高&#xff0c;想要达到实时推理的效果&#xff0c;建议显存在24G以上。 实时对话数字人 …

众和策略:大盘涨手中的股票却大跌,到底怎么回事?

大盘涨手中的股票却大跌&#xff0c;究竟怎么回事&#xff1a; 1、大盘上涨是权重股所造成的 大盘上涨可能是受一些权重比较大的工作所影响&#xff0c;比如证券工作、钢铁工作、银行工作等等&#xff0c;这些工作的大涨&#xff0c;可以拉升大盘的上涨&#xff0c;可是其它工…