Flink电商实时数仓(六)

交易域支付成功事务事实表

  1. 从topic_db业务数据中筛选支付成功的数据
  2. 从dwd_trade_order_detail主题中读取订单事实数据、LookUp字典表
  3. 关联三张表形成支付成功宽表
  4. 写入 Kafka 支付成功主题
执行步骤
  1. 设置ttl,通过Interval join实现左右流的状态管理
  2. 获取下单明细数据:用户必然要先下单才有可能支付成功,因此支付成功明细数据集必然是订单明细数据集的子集。要注意:Interval Join要求表中均为Append数据,即“只能新增,不能修改”,订单明细表数据生成过程中用到了left join,生成了回撤流,看似不满足Interval Join的条件。但是,回撤数据进入Kafka会以null值形式存在,如果用Kafka Connector将订单明细封装为动态表,null值会被过滤,最终得到的是相同主键存在重复数据的Append流(动态表本质上就是流),满足Interval Join的条件。
    • Interval join只支持事件时间,因此数据必须携带水位线;建表时水位线的相关语法为 water for order_time as order_time - interval '5' second,这里要求数据是timestamp(3)
    • 原有的时间数据类型是bigint类型的ts,使用row_time as TO_TIMESTAMP_LTZ(ts,3)这个函数即可将原有的时间数据转换为水位线所需的数据类型
  3. 筛选支付数据:
    • 支付状态为支付成功
    • 操作类型为update
  4. 构建 LookUp 字典表
  5. 联上述三张表形成支付成功宽表,写入 Kafka 支付成功主题

核心代码如下

 public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {//核心业务逻辑//1. 读取TopicDB主题数据createTopicDb(groupId,tableEnv);//2. 筛选支付成功的数据,从业务数据topic_db中filterPaymentTable(tableEnv);//3. 读取下单详情表数据, 从kafka读取数据createOrderDetailTable(tableEnv, groupId);//4. 创建base.dic字典表,从HBase维度数据中读取createBaseDic(tableEnv);//tableEnv.executeSql("select * from order_detail").print();//tableEnv.executeSql("select * from base_dic").print();//tableEnv.executeSql("select to_timestamp_ltz(ts,3) from order_detail");//5. 使用interval join 完成支付成功流和订单详情数据关联intervalJoin(tableEnv);//6. 使用lookup join完成维度退化Table resultTable = lookupJoin(tableEnv);//7. 创建upsert kafka连接器写出createKafkaSink(tableEnv);resultTable.insertInto(Constant.TOPIC_DWD_TRADE_ORDER_PAYMENT_SUCCESS).execute();}

事实表动态分流

在这里插入图片描述

dwd层其他的事实表都是从topic_db中去业务数据库一张表的变更数据,按照某些过滤后写入kafka的对应主题,它们处理逻辑相似且较为简单,可以结合配置表动态分流在同一个程序中处理。有点类似我们前面实现DIM层的动态配置。

  1. 清洗过滤和转换:判断是否满足json格式,如果满足转换为jsonObj对象
  2. 读取配置表数据,使用flink-cdc读取
  3. 转换数据格式,转换到对应bean对象中
  4. 配置信息广播话,然后跟主流数据进行连接
  5. 筛选出需要的字段
  6. 根据表中的sink table字段来动态写出到对应的kafka主题中

核心代码如下

public static void main(String[] args) {new DwdBaseDb().start(10019, 4, "dwd_base_db", Constant.TOPIC_DB);}@Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {//核心业务逻辑//1. 读取topic_db数据//stream.print();//2. 清洗过滤和转换, jsonObjStream是主流数据SingleOutputStreamOperator<JSONObject> jsonObjStream = filterJson(stream);//jsonObjStream.print();//3. 读取配置表数据,使用flink-cdc读取,读取配置文件时并发度最好为1DataStreamSource<String> tableProcessDwd = getTableProcessDwd(env);//tableProcessDwd.print();4. 转换数据格式 string -> TableProcessDwd -> broadcastStream,广播流数据SingleOutputStreamOperator<TableProcessDwd> processDwdStream = getProcessDwdStream(tableProcessDwd);MapStateDescriptor<String, TableProcessDwd> mapStateDescriptor = new MapStateDescriptor<>("process_state", String.class, TableProcessDwd.class);BroadcastStream<TableProcessDwd> broadcastStream = processDwdStream.broadcast(mapStateDescriptor);//5. 连接主流和广播流,对主流数据进行判断是否需要保留SingleOutputStreamOperator<Tuple2<JSONObject, TableProcessDwd>> processStream = processBaseDb(jsonObjStream, broadcastStream, mapStateDescriptor);//processStream.print();//6. 筛选最后需要写出的字段SingleOutputStreamOperator<JSONObject> dataStream = filterColumns(processStream);//7. 通过sink_table的表名来动态写出到对应kafka主题//在setRecordSerializer()设置dataStream.sinkTo(FlinkSinkUtil.getKafkaSinkWithTopicName());}

gitee地址 :https://gitee.com/langpaian/gmall2023-realtime

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/578007.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zookeeper基本使用

目录 环境搭建 单机版搭建 集群版搭建 基本语法使用 可视化客户端 数据结构 节点分类 1. 持久节点 2. 临时节点 3. 有序节点 4. 容器节点 5. TTL节点 节点状态 监听机制 watch监听 永久性watch 应用场景 1. 实现分布式锁 2. 乐观锁更新数据 应用场景总结 选…

C++中的存储类及其实例

文章目录 0. 语法1. 自动存储类自动存储类对象的属性自动存储类的例子 2. 外部存储类extern存储类对象的属性extern存储类的例子 3. 静态存储类静态存储类的属性静态存储类的例子 4. 寄存器存储类寄存器存储类对象的属性寄存器存储类例子 5. 可变&#xff08;mutable&#xff0…

【机器学习】Boosting算法-梯度提升算法(Gradient Boosting)

一、原理 梯度提升算法是一种集成学习方法&#xff0c;它可以将多个弱分类器或回归器组合成一个强分类器或回归器&#xff0c;提高预测性能。梯度提升算法的核心思想是利用损失函数的负梯度作为残差的近似值&#xff0c;然后用一个基学习器拟合这个残差&#xff0c;再将其加到之…

二维码智慧门牌管理系统:提升社区管理智能化水平

文章目录 前言一、全方位信息录入与查询二、公安权限账户访问的公安大数据后台三、社区工作人员申请权限安装录入软件四、业主通过移动终端扫描标准地址二维码门牌自主申报录入五、系统的价值 前言 在数字化时代&#xff0c;社区管理面临着更新流动人口信息、准确录入六实相关…

docker安装入门及redis,minio,rabbitmq应用安装

部分笔记来自黑马课堂&#xff1a;【黑马程序员Docker快速入门到项目部署&#xff0c;MySQL部署Nginx部署docker自定义镜像DockerCompose项目实战一套搞定-哔哩哔哩】 https://b23.tv/niWEhEF 一、什么是docker&#xff1a; 快速构建、运行、管理应用的工具。--帮助我们快速部…

【中小型企业网络实战案例 二】配置网络互连互通

​【中小型企业网络实战案例 一】规划、需求和基本配置-CSDN博客 热门IT技术视频教程&#xff1a;https://xmws-it.blog.csdn.net/article/details/134398330?spm1001.2014.3001.5502 配置接入层交换机 1.以接入交换机ACC1为例&#xff0c;创建ACC1的业务VLAN 10和20。 <…

nginx反向代理服务器及负载均衡服务配置

一、正向代理与反向代理 正向代理&#xff1a;是一个位于客户端和原始服务器(oricin server)之间的服务器&#xff0c;为了从原始服务器取得内容&#xff0c;客户端向代理发送一个请求并指定目标(原始服务器)&#xff0c;然后代理向原始服务器转交请求并将获得的内容返回给客户…

南邮最优化期末复习

黄金分割法 单纯形法&#xff08;大M法&#xff09; 求min, σ找最小&#xff0c;终止条件全部大于0 θ找最小&#xff0c;且不能为负数求max, σ找最大&#xff0c;终止条件全部小于0 θ找最小&#xff0c;且不能为负数 例题 二阶段单纯形法想 分支定界法&#xff08;第二章&…

【单调队列】LeetCode1499:满足不等式的最大值

涉及知识点 单调队列 题目 给你一个数组 points 和一个整数 k 。数组中每个元素都表示二维平面上的点的坐标&#xff0c;并按照横坐标 x 的值从小到大排序。也就是说 points[i] [xi, yi] &#xff0c;并且在 1 < i < j < points.length 的前提下&#xff0c; xi &…

iMazing2024免费版iOS移动设备管理软件

以自己的方式管理iPhone&#xff0c;让备受信赖的软件为您传输和保存音乐、消息、文件和数据。安全备份任何 iPhone、iPad 或 iPod touch。iMazing 功能强大、易于使用&#xff0c;称得上是 Mac 和 PC 上最好的 iOS 设备管理器。 正在为iTunes繁琐的操作发愁&#xff1f;设备数…

leetcode——打家劫舍问题汇总

本章汇总一下leetcode中的打家劫舍问题&#xff0c;使用经典动态规划算法求解。 1、梦开始的地方——打家劫舍&#xff08;★&#xff09; 本题关键点就是不能在相邻房屋偷东西。 采用常规动态规划做法&#xff1a; 根据题意设定dp数组&#xff0c;dp[i]的含义为&#xff1a…

Typora Mac激活

首先去官网选择mac版本下载安装 typora下载 然后打开typora包内容找到 /Applications/Typora.app/Contents/Resources/TypeMark/page-dist 找到/static/js/Licen..如下图 编辑器打开上面文件夹 输入 hasActivated"true"e.hasActivated 进行搜索 将它改为 hasA…

人工智能:网络犯罪分子的驱动力

随着 2024 年的临近&#xff0c;是时候展望今年的网络安全状况了。由于网络犯罪日益复杂&#xff0c;预计到 2025 年&#xff0c;全球网络安全成本将增至 10.5 万亿美元。 人工智能的使用不断发展&#xff0c;网络犯罪分子变得越来越有创造力 我们注意到&#xff0c;联邦调查…

Dash中的callback的使用 多input 6

代码说明 import plotly.express as pxmport plotly.express as px用于导入plotly.express模块并给它起一个别名px。这样在后续的代码中&#xff0c;你可以使用px来代替plotly.express&#xff0c;使代码更加简洁。 plotly.express是Plotly的一个子模块&#xff0c;用于快速创…

路由器常见故障分析及处理方法!

对当前的大多数网络来说&#xff0c;无论是实现网络互连还是访问Internet&#xff0c;路由器是不可或缺的。 由于路由器的重要性&#xff0c;对它的管理就成了维护人员的日常工作中重要的一部分&#xff0c;而路由器的故障分析和排除也是令许多维护人员极为困扰的问题之一。 路…

jenkins Job华为云EIP变更带宽

引言: 在数字化时代&#xff0c;云服务资源的弹性管理是企业降低运营成本、提高效率的关键手段。通过弹性公网IP&#xff08;EIP&#xff09;服务&#xff0c;企业可以实现按需计费&#xff0c;优化网络支出。然而&#xff0c;根据业务流量的不同阶段调整计费模式&#xff0c;…

批量归一化

目录 一、BN层介绍 1、深层神经网络存在的问题 2、批量归一化的解决方案 3、BN层作用位置 4、BN层在做什么 5、总结 二、批量归一化从零实现 1、实现批量归一化操作 2、创建BN层 3、对LeNet加入批量归一化 4、开始训练 三、简明实现 1、对LeNet加入批量归一化 2…

【SD】IP-Adapter 进阶 - 垫图 【画风重绘-必看】

目录 关于SD1.5的画风迁移 修改动作-方法一&#xff1a;提示词 修改动作-方法二&#xff1a;openpose 关于SD1.5的画风迁移 1.5测试模型&#xff1a;flat2DAnimerge_v30_2.safetensors [b2c93e7a89] 测试图&#xff1a; 文生图&#xff1a;best quality,masterpiece, co…

20231225在WIN10下使用SSH连接Ubuntu20.04.6

20231225在WIN10下使用SSH连接Ubuntu20.04.6 2023/12/25 23:03 https://jingyan.baidu.com/article/5552ef479e1856108ffbc9e3.html Win10怎么开启SSH功能 Win10怎么开启SSH功能,下面就一起来看看吧! 工具/原料 华硕天选4 Windows10 方法/步骤 点击左下角的开始菜单,打开Wind…

Redis过期删除策略和内存淘汰策略

1、设置Redis键过期时间 Redis提供了四个命令来设置过期时间&#xff08;生存时间&#xff09;。 EXPIRE <key> <ttl> &#xff1a;表示将键 key 的生存时间设置为 ttl 秒。 PEXPIRE <key> <ttl> &#xff1a;表示将键 key 的生存时间设置为 ttl 毫秒。…