Flink电商实时数仓(六)

交易域支付成功事务事实表

  1. 从topic_db业务数据中筛选支付成功的数据
  2. 从dwd_trade_order_detail主题中读取订单事实数据、LookUp字典表
  3. 关联三张表形成支付成功宽表
  4. 写入 Kafka 支付成功主题
执行步骤
  1. 设置ttl,通过Interval join实现左右流的状态管理
  2. 获取下单明细数据:用户必然要先下单才有可能支付成功,因此支付成功明细数据集必然是订单明细数据集的子集。要注意:Interval Join要求表中均为Append数据,即“只能新增,不能修改”,订单明细表数据生成过程中用到了left join,生成了回撤流,看似不满足Interval Join的条件。但是,回撤数据进入Kafka会以null值形式存在,如果用Kafka Connector将订单明细封装为动态表,null值会被过滤,最终得到的是相同主键存在重复数据的Append流(动态表本质上就是流),满足Interval Join的条件。
    • Interval join只支持事件时间,因此数据必须携带水位线;建表时水位线的相关语法为 water for order_time as order_time - interval '5' second,这里要求数据是timestamp(3)
    • 原有的时间数据类型是bigint类型的ts,使用row_time as TO_TIMESTAMP_LTZ(ts,3)这个函数即可将原有的时间数据转换为水位线所需的数据类型
  3. 筛选支付数据:
    • 支付状态为支付成功
    • 操作类型为update
  4. 构建 LookUp 字典表
  5. 联上述三张表形成支付成功宽表,写入 Kafka 支付成功主题

核心代码如下

 public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {//核心业务逻辑//1. 读取TopicDB主题数据createTopicDb(groupId,tableEnv);//2. 筛选支付成功的数据,从业务数据topic_db中filterPaymentTable(tableEnv);//3. 读取下单详情表数据, 从kafka读取数据createOrderDetailTable(tableEnv, groupId);//4. 创建base.dic字典表,从HBase维度数据中读取createBaseDic(tableEnv);//tableEnv.executeSql("select * from order_detail").print();//tableEnv.executeSql("select * from base_dic").print();//tableEnv.executeSql("select to_timestamp_ltz(ts,3) from order_detail");//5. 使用interval join 完成支付成功流和订单详情数据关联intervalJoin(tableEnv);//6. 使用lookup join完成维度退化Table resultTable = lookupJoin(tableEnv);//7. 创建upsert kafka连接器写出createKafkaSink(tableEnv);resultTable.insertInto(Constant.TOPIC_DWD_TRADE_ORDER_PAYMENT_SUCCESS).execute();}

事实表动态分流

在这里插入图片描述

dwd层其他的事实表都是从topic_db中去业务数据库一张表的变更数据,按照某些过滤后写入kafka的对应主题,它们处理逻辑相似且较为简单,可以结合配置表动态分流在同一个程序中处理。有点类似我们前面实现DIM层的动态配置。

  1. 清洗过滤和转换:判断是否满足json格式,如果满足转换为jsonObj对象
  2. 读取配置表数据,使用flink-cdc读取
  3. 转换数据格式,转换到对应bean对象中
  4. 配置信息广播话,然后跟主流数据进行连接
  5. 筛选出需要的字段
  6. 根据表中的sink table字段来动态写出到对应的kafka主题中

核心代码如下

public static void main(String[] args) {new DwdBaseDb().start(10019, 4, "dwd_base_db", Constant.TOPIC_DB);}@Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {//核心业务逻辑//1. 读取topic_db数据//stream.print();//2. 清洗过滤和转换, jsonObjStream是主流数据SingleOutputStreamOperator<JSONObject> jsonObjStream = filterJson(stream);//jsonObjStream.print();//3. 读取配置表数据,使用flink-cdc读取,读取配置文件时并发度最好为1DataStreamSource<String> tableProcessDwd = getTableProcessDwd(env);//tableProcessDwd.print();4. 转换数据格式 string -> TableProcessDwd -> broadcastStream,广播流数据SingleOutputStreamOperator<TableProcessDwd> processDwdStream = getProcessDwdStream(tableProcessDwd);MapStateDescriptor<String, TableProcessDwd> mapStateDescriptor = new MapStateDescriptor<>("process_state", String.class, TableProcessDwd.class);BroadcastStream<TableProcessDwd> broadcastStream = processDwdStream.broadcast(mapStateDescriptor);//5. 连接主流和广播流,对主流数据进行判断是否需要保留SingleOutputStreamOperator<Tuple2<JSONObject, TableProcessDwd>> processStream = processBaseDb(jsonObjStream, broadcastStream, mapStateDescriptor);//processStream.print();//6. 筛选最后需要写出的字段SingleOutputStreamOperator<JSONObject> dataStream = filterColumns(processStream);//7. 通过sink_table的表名来动态写出到对应kafka主题//在setRecordSerializer()设置dataStream.sinkTo(FlinkSinkUtil.getKafkaSinkWithTopicName());}

gitee地址 :https://gitee.com/langpaian/gmall2023-realtime

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/578007.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

zookeeper基本使用

目录 环境搭建 单机版搭建 集群版搭建 基本语法使用 可视化客户端 数据结构 节点分类 1. 持久节点 2. 临时节点 3. 有序节点 4. 容器节点 5. TTL节点 节点状态 监听机制 watch监听 永久性watch 应用场景 1. 实现分布式锁 2. 乐观锁更新数据 应用场景总结 选…

nodejs如何使用clusster配置多cpu

Nodejs的主进程是单线程的&#xff0c;但它有多线程处理⽅案&#xff08;更准备来说是多进程⽅案&#xff09;&#xff0c;即主进程开启不同的⼦进程&#xff0c;主进程接收所有请求&#xff0c;然后将分发给其它不同的nodejs⼦进程处理。 它⼀般有两种实现&#xff1a; 1. 主进…

C++中的存储类及其实例

文章目录 0. 语法1. 自动存储类自动存储类对象的属性自动存储类的例子 2. 外部存储类extern存储类对象的属性extern存储类的例子 3. 静态存储类静态存储类的属性静态存储类的例子 4. 寄存器存储类寄存器存储类对象的属性寄存器存储类例子 5. 可变&#xff08;mutable&#xff0…

【机器学习】Boosting算法-梯度提升算法(Gradient Boosting)

一、原理 梯度提升算法是一种集成学习方法&#xff0c;它可以将多个弱分类器或回归器组合成一个强分类器或回归器&#xff0c;提高预测性能。梯度提升算法的核心思想是利用损失函数的负梯度作为残差的近似值&#xff0c;然后用一个基学习器拟合这个残差&#xff0c;再将其加到之…

MATLAB绘图

1.二维图 1.线图 要创建二维线图&#xff0c;请使用plot函数。例如&#xff0c;绘制正弦函数&#xff1a; x linspace(0,2*pi); y sin(x); plot(x,y)可以添加轴并添加标题&#xff1a; xlabel("x") ylabel("y") title("plot of the sine Functio…

二维码智慧门牌管理系统:提升社区管理智能化水平

文章目录 前言一、全方位信息录入与查询二、公安权限账户访问的公安大数据后台三、社区工作人员申请权限安装录入软件四、业主通过移动终端扫描标准地址二维码门牌自主申报录入五、系统的价值 前言 在数字化时代&#xff0c;社区管理面临着更新流动人口信息、准确录入六实相关…

AI 绘画 | Stable Diffusion 视频生成重绘

前言 本篇文章教会你如何使用Stable Diffusion WEB UI,实现视频的人物,或是动物重绘,可以更换人物或者动物,也可以有真实变为二次元。 视频展示 左边是原视频,右边是重绘视频原视频和Ai视频画面合并 教程 这里需要用到Stable Diffusion WEB UI的扩展插件ebsynth_utility…

docker安装入门及redis,minio,rabbitmq应用安装

部分笔记来自黑马课堂&#xff1a;【黑马程序员Docker快速入门到项目部署&#xff0c;MySQL部署Nginx部署docker自定义镜像DockerCompose项目实战一套搞定-哔哩哔哩】 https://b23.tv/niWEhEF 一、什么是docker&#xff1a; 快速构建、运行、管理应用的工具。--帮助我们快速部…

【中小型企业网络实战案例 二】配置网络互连互通

​【中小型企业网络实战案例 一】规划、需求和基本配置-CSDN博客 热门IT技术视频教程&#xff1a;https://xmws-it.blog.csdn.net/article/details/134398330?spm1001.2014.3001.5502 配置接入层交换机 1.以接入交换机ACC1为例&#xff0c;创建ACC1的业务VLAN 10和20。 <…

nginx反向代理服务器及负载均衡服务配置

一、正向代理与反向代理 正向代理&#xff1a;是一个位于客户端和原始服务器(oricin server)之间的服务器&#xff0c;为了从原始服务器取得内容&#xff0c;客户端向代理发送一个请求并指定目标(原始服务器)&#xff0c;然后代理向原始服务器转交请求并将获得的内容返回给客户…

南邮最优化期末复习

黄金分割法 单纯形法&#xff08;大M法&#xff09; 求min, σ找最小&#xff0c;终止条件全部大于0 θ找最小&#xff0c;且不能为负数求max, σ找最大&#xff0c;终止条件全部小于0 θ找最小&#xff0c;且不能为负数 例题 二阶段单纯形法想 分支定界法&#xff08;第二章&…

【单调队列】LeetCode1499:满足不等式的最大值

涉及知识点 单调队列 题目 给你一个数组 points 和一个整数 k 。数组中每个元素都表示二维平面上的点的坐标&#xff0c;并按照横坐标 x 的值从小到大排序。也就是说 points[i] [xi, yi] &#xff0c;并且在 1 < i < j < points.length 的前提下&#xff0c; xi &…

前端性能优化三十七:花裤衩模板路由懒加载

(1). 目的: 1. 让包不需要一次把所有的页面的加载进来,只加载当前页面的路由组件即可.(2). 3种引入方式: export default new Router({routes: [{path: /,name: "index",// 1. 普通引用component: index// 2. 路由懒加载component: resolve > require([/views/i…

iMazing2024免费版iOS移动设备管理软件

以自己的方式管理iPhone&#xff0c;让备受信赖的软件为您传输和保存音乐、消息、文件和数据。安全备份任何 iPhone、iPad 或 iPod touch。iMazing 功能强大、易于使用&#xff0c;称得上是 Mac 和 PC 上最好的 iOS 设备管理器。 正在为iTunes繁琐的操作发愁&#xff1f;设备数…

leetcode——打家劫舍问题汇总

本章汇总一下leetcode中的打家劫舍问题&#xff0c;使用经典动态规划算法求解。 1、梦开始的地方——打家劫舍&#xff08;★&#xff09; 本题关键点就是不能在相邻房屋偷东西。 采用常规动态规划做法&#xff1a; 根据题意设定dp数组&#xff0c;dp[i]的含义为&#xff1a…

Typora Mac激活

首先去官网选择mac版本下载安装 typora下载 然后打开typora包内容找到 /Applications/Typora.app/Contents/Resources/TypeMark/page-dist 找到/static/js/Licen..如下图 编辑器打开上面文件夹 输入 hasActivated"true"e.hasActivated 进行搜索 将它改为 hasA…

微机原理4练习题答案

一、单项选择题(本大题共 15小题,每小题 3 分,共45 分。在每小题给出的四个备选项中,选出一个正确的答案。 8086/8088系统中,堆栈以(C)为单位进行操作。A. 半字节 B. 字节 C. 字 D. 双字 2、下列指令中不影响进位标志CF的指令是©。 A. SUB AX, BX B. SHL AL, 1…

Word使用技巧【开题报告】

1、修改目录&#xff1a;选中目录&#xff0c;点击更新域。 2、更改或删除单个页面上的页眉或页脚 3、借助其他软件在Word导入参考文献 利用zetero导入文献&#xff1a;安装zetero 解决参考文献插入问题 在Word中插入文献操作步骤 英文文献出现“等”&#xff0c;如何解决 Zote…

人工智能:网络犯罪分子的驱动力

随着 2024 年的临近&#xff0c;是时候展望今年的网络安全状况了。由于网络犯罪日益复杂&#xff0c;预计到 2025 年&#xff0c;全球网络安全成本将增至 10.5 万亿美元。 人工智能的使用不断发展&#xff0c;网络犯罪分子变得越来越有创造力 我们注意到&#xff0c;联邦调查…

Dash中的callback的使用 多input 6

代码说明 import plotly.express as pxmport plotly.express as px用于导入plotly.express模块并给它起一个别名px。这样在后续的代码中&#xff0c;你可以使用px来代替plotly.express&#xff0c;使代码更加简洁。 plotly.express是Plotly的一个子模块&#xff0c;用于快速创…