Flink实时电商数仓(十)

common模块回顾

  1. app
    • BaseApp: 作为其他子模块中使用Flink - StreamAPI的父类,实现了StreamAPI中的通用逻辑,在其他子模块中只需编写关于数据处理的核心逻辑。
    • BaseSQLApp: 作为其他子模块中使用Flink- SQLAPI的父类。在里面设置了使用SQL API的环境、并行度、检查点等固定逻辑。
  2. bean:存放其他子模块中使用到的javaBean对象,因为如果一直使用jsonObject对象调用数据的话,需要使用类似getString("字段名")的方式,没有直接使用javaBean对象那么方便。
  3. constant
    • 存储字符串常量
    • 为了保证一致性,如果某个常量修改时,只需在这里修改即可对整个项目进行修改
  4. function
    • DorisMapFunction:将javaBean对象转换为对应的json字符串对象,并且将驼峰式命名方式修改为蛇形命名方式。便于写入doris。
  5. util
    • DateFormateUtil
    • FlinkSinkUtil
    • FlinkSourceUtil
    • HBaseUtil
    • IkUtil
    • JdbcUtil
    • SQLUtil
      • getUpsertKafakaSQL: 一定要声明主键,支持撤回流
      • getDorisSinkSQL: 用于写入Doris

dim层回顾

  • Flink-cdc监控mysql中的维度配置表
  • 将监控的数据流做成广播流
  • 将广播流和读取数据的主流进行connect
  • 主流数据根据广播流的配置信息进行分流,注意需要先提前缓存一次配置表信息
  • 达到动态拆分数据表的效果

dwd层FlinkSQL回顾

  • 注意join时会将所有数据都存储到内存中,需要考虑设置TTL
  • 大表join小表时,可以考虑使用lookup join
  • 如果数据流有明确的先后关系时,考虑使用Interval join

在支付成功模块,由于订单详情表处理时已经存在撤回流,但支付成功模块也是使用left join方式调用订单详情数据,会导致产生两次撤回流。在后续dws层处理时,要注意对数据进行去重过滤。

dws层回顾

  • 如何判断使用FlinkSQL还是StreamAPI
    • 如果比较标准化, 比如简单的开窗聚合,一般使用FlinkSQL
    • 如果需要使用状态处理数据,比如判断是否为独立用户,使用StreamAPI

交易域sku粒度订单下单各窗口汇总

  • 需求分析:从Kafka订单明细主题读取数据,过滤null数据并按照唯一键对数据去重,按照SKU维度分组,统计原始金额、活动减免金额、优惠券减免金额和订单金额,并关联维度信息,将数据写入Doris交易域SKU粒度下单各窗口汇总表

  • 思路分析:

    • 方案一:按照订单ID进行分组,根据业务逻辑设置定时器取最后一个数据进行发送
    • 方案二:将度量值存放到状态中,每次新数据到达时,将新的度量值减去状态中的度量值
  • 具体实现

    • 因为需要使用状态,故使用BaseApp; 设置端口号10029,并发度4,消费者组为类名,消费者主题名称为dwd订单详情
    • 读取dwd下单主题数据, stream.print()
    • 过滤清洗:
      • 去掉null数据, stream.flatMap(new FlatMapFunction<>())
      • ts: 水位线,不能为空;进行位数的修正,如果是10位的,使用 jsonObj.put("ts", ts*1000)
      • id: keyby的关键字,不能为空
      • sku_id: group by的粒度关键字,也不能为空
    • 添加水位线
      • 网络延迟5L
      • 添加数据的泛型,提取数据中的ts,作为水位线(注意观察ts的位数,需要为13位,毫秒级)
    • 修正度量值,转换数据结构
      • 使用id关键字进行分组
      • 使用process算子中的状态来进行处理stream.process(new KeyedProcessFunction<>),返回值为对应的javabean对象
      • 在状态中存储上一次的度量值大小,只保存30秒
      • processElement()方法中获取状态中的度量值,使用前需要判空,如果为空设置为0,之后才能进行数值计算。
      • 创建对应的bean对象,度量值都减去状态中的度量值和更新状态中当前的度量值
    • 分组开窗聚合
      • 使用skuId进行keyby
      • 分组后使用window算子进行开窗,设置窗口时间,注意Time属于org.apache.flink.streaming.api.windowing.time.Time.seconds()
      • 使用reduce算子进行聚合计算, 聚合时需要累积所有度量值
      • new ProcessWindowFunction()获取窗口信息, startTime, EndTime, curTime, 获取到后写入javaBean对象中
    • 关联维度信息
      • 先分组聚合再关联维度信息的原因:关联维度信息需要join操作,是很耗费性能的大操作。先聚合数据能大幅度减少数据量。
      • 启动HBase,查看对的sku_info表中是否存储着对应的维度信息
      • 获取外部连接,需要使用生命周期方法(open,close在整个算子执行过程中只运行一次);对应的关联维度信息,即RichMapFunction()
      • map方法中使用HBase的API读取表格数据,使用读取到的字段补全原本的信息
    • 创建HBase的API:读取表格数据 get
      • 获取table
      • 创建get对象
      • 调用get方法
      • 获取数据写入jsonObj
    • 写出到Doris

维度关联优化

  1. 旁路缓存:独立缓存服务有(redis, memcache).
    在这里插入图片描述
  • 使用旁路缓存时要注意保持数据的一致性,如果数据发生修改和删除,直接删除redis中的数据。

同步+旁路缓存模式

  1. 引入Jedis相关依赖
<dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId>
</dependency>
  1. 创建Redis工具类RedisUtil
  2. 在RichMapFunction中的open和close方法中获取和关闭HBase和Redisd的连接。
  3. 拼接对应的redisRowKey
  4. 读取Redis缓存的数据,jsonObj的字符串
  5. 判断redis读取到的数据是否为空
    • 没有数据:需要读取HBase;jsonObj = HBaseUtil.getCells(), 读取到数据后,使用jedis.setex()存储到redis
    • redis有缓存,直接返回
  6. 进行维度关联

Dim层写入HBase修正

  • 在dim层将数据写入HBase时,需要同时获取Redis的连接。
  • 判断redis中的缓存是否发生变化
    • 判断数据类型是修改或删除时,删除Redis中对应的数据
    • 拼写数据的rowkey
    • 使用jedis.del(rediskey)来删除

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/591416.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数据库攻防学习之Redis

Redis 0x01 redis学习 在渗透测试面试或者网络安全面试中可能会常问redis未授权等一些知识&#xff0c;那么什么是redis&#xff1f;redis就是个数据库&#xff0c;常见端口为6379&#xff0c;常见漏洞为未授权访问。 0x02 环境搭建 这里可以自己搭建一个redis环境&#xf…

文件监控软件丨文件权限管理工具

文件已经成为企业最重要的资产之一。然而&#xff0c;文件的安全性和完整性经常受到威胁&#xff0c;如恶意软件感染、人为误操作、内部泄密等。 为了确保文件的安全&#xff0c;文件监控软件应运而生。本文将深入探讨文件监控软件的概念、功能、应用场景和未来发展等方面。 文…

7、InternVL

简介 github demo 使用网络获取的油画图片&#xff0c;InternVL识别还算可以。 使用stable diffusion生成的图片&#xff0c;InternVL能很好的识别。 权重 huggingface地址 模型搭建 github地址 下载源码 git clone https://github.com/OpenGVLab/InternVL.git创建环…

Windows 使用 nmap软件测试 UDP 端口

下载windows版nmap &#xff0c;下载后双机默认安装。 Download the Free Nmap Security Scanner for Linux/Mac/Windows 打开CMD &#xff0c; 输入 cd C:\Program Files (x86)\Nmap C:\Program Files (x86)\Nmap>ncat -z -v -u ntp.aliyun.com 123 Ncat: Version 7.80 ( …

【HarmonyOS开发】共享包HAR和HSP的创建和使用以及三方库的发布

OpenHarmony提供了两种共享包&#xff0c;HAR&#xff08;Harmony Archive&#xff09;静态共享包&#xff0c;和HSP&#xff08;Harmony Shared Package&#xff09;动态共享包。 HAR与HSP都是为了实现代码和资源的共享&#xff0c;都可以包含代码、C库、资源和配置文件&…

redis的搭建及应用(七)-redis的限流插件redis-cell

Redis限流插件-redis-cell redis-cell 是一个用rust语言编写的基于令牌桶算法的的限流模块&#xff0c;提供原子性的限流功能&#xff0c;并允许突发流量&#xff0c;可以很方便的应用于分布式环境中。 下载redis-cell插件 访问Releases brandur/redis-cell (github.com) 上传…

计算机网络——应用层与网络安全(六)

前言&#xff1a; 前几章我们已经对TCP/IP协议的下四层已经有了一个简单的认识与了解&#xff0c;下面让我们对它的最顶层&#xff0c;应用层进行一个简单的学习与认识&#xff0c;由于计算机网络多样的连接形式、不均匀的终端分布&#xff0c;以及网络的开放性和互联性等特征&…

Python流星雨完整代码

文章目录 环境需求完整代码详细分析环境需求 python3.11.4PyCharm Community Edition 2023.2.5pyinstaller6.2.0(可选,这个库用于打包,使程序没有python环境也可以运行,如果想发给好朋友的话需要这个库哦~)【注】 python环境搭建请见:https://want595.blog.csdn.net/arti…

找第三方数据公司获取电商平台商品数据订单数据店铺信息等

API文档 如何获取&#xff1f; 应用业务场景&#xff08;不限&#xff09;

京东tp3手势验证

2024祝我们越来越好。 新年第二天&#xff0c;来看下这最新的tp3手势验证码&#xff0c;很在之前就发过一篇&#xff0c;最近看了看更新了一个东西&#xff0c;但是难点还是在轨迹上面&#xff0c;感兴趣的朋友可以去看看。 risk_jd[jstub] 改了下这&#xff0c;之前我都没带…

基于ThinkPHP的云盘系统Cloudreve本地搭建并实现远程访问

文章目录 1、前言2、本地网站搭建2.1 环境使用2.2 支持组件选择2.3 网页安装2.4 测试和使用2.5 问题解决 3、本地网页发布3.1 cpolar云端设置3.2 cpolar本地设置 4、公网访问测试5、结语 1、前言 自云存储概念兴起已经有段时间了&#xff0c;各互联网大厂也纷纷加入战局&#…

Flume基础知识(一):Flume组成原理与架构

1. Flume定义 Flume是Cloudera提供的一个高可用的&#xff0c;高可靠的&#xff0c;分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构&#xff0c;灵活简单。 Flume最主要的作用就是&#xff0c;实时读取服务器本地磁盘的数据&#xff0c;将数据写入到HDFS。 2. Fl…

关于标准那些事——第六篇 四象之“白虎”(要素的编写)

两仪生四象——东方青龙&#xff08;木&#xff09;、西方白虎&#xff08;金&#xff09;、南方朱雀&#xff08;火&#xff09;、北方玄武&#xff08;水&#xff09; 分别对应标准编写之四象——层次的编写、要素的编写、要素的表述、格式的编排。 今天来分享一下 要素的编…

【Bug解决】Failed to configure a DataSource

1、问题描述 SpringBoot项目在启动时报出下面的错误&#xff1a; Description: Failed to configure a DataSource: url attribute is not specified and no embedded datasource could be configured. Reason: Failed to determine a suitable driver class Action: Consider…

大一C语言程序细节复盘2

7-4 学生成绩排序 分数 27 全屏浏览题目 切换布局 作者 张泳 单位 浙大城市学院 假设学生的基本信息包括学号、姓名、三门课程成绩以及个人平均成绩&#xff0c;定义一个能够表示学生信息的结构类型。输入n&#xff08;n<50&#xff09;个学生的成绩信息&#xff0c;按照学生…

NFS(文件存储服务)

题目 创建NFS共享文件夹,允许InsideCli可以远程挂载,映射挂载到D卷。共享文件夹路径为D:\shares\NFSshare。共享名称为NFSshare。允许未映射的用户访问。共享权限为读/写。服务配置步骤( 服务端 ) 步骤一 - 安装服务 步骤二 - 配置NFS服务 配置共享文件夹 选择共享路径

WSUS更新服务

题目 安装WSUS更新服务,更新补丁目录设置为“c:\wsusbackup”。创建更新组名称为“CHINASKILLS-WSUS”。每天凌晨03:00下发自动更新。更新服务器地址为“http://wsus.chinaskills.com:8530”。服务配置步骤 步骤一 - 安装Windows Server 更新服务 安装Windows Server 更新服…

外汇天眼:注意!年末大量无监管平台上榜,有的仍在诈骗!

纵观整个10月的天眼客诉排行榜&#xff0c;可以发现此次名单基本上都是无监管的外汇平台&#xff0c;无法出金依旧仍是客诉的关键来源。在本月的客诉榜单中&#xff0c;超过半数的平台仍然在活跃中&#xff0c;交易者们一定要远离&#xff0c;注意警惕。 接下来&#xff0c;就…

一键批量处理,轻松管理TXT文本,自动识别编码,快速转换为ANSI!

在数字化时代&#xff0c;文本文件的管理显得尤为重要。你是否曾经遇到过TXT文本编码格式混乱&#xff0c;导致文件无法正常打开或显示乱码的情况&#xff1f;现在&#xff0c;我们为您带来了一款强大的文本批量处理工具&#xff0c;具备自动识别TXT文本编码并转换为ANSI的功能…

如何使用Docker部署Swagger Editor结合内网穿透实现远程编辑API文档

文章目录 Swagger Editor本地接口文档公网远程访问1. 部署Swagger Editor2. Linux安装Cpolar3. 配置Swagger Editor公网地址4. 远程访问Swagger Editor5. 固定Swagger Editor公网地址 Swagger Editor本地接口文档公网远程访问 Swagger Editor是一个用于编写OpenAPI规范的开源编…