Flink 实时数仓(二)【ODS 层开发】

前言

        最近投了不少的实习,也收到不错的反馈,虽然是中小公司偏多,但是毕竟现在这个环境双非进大厂实习可不同当年了。可惜的是学院不放人,无奈啊,遍身罗绮者,不是养蚕人。我累死累活肝了两年了,好不容易找到个不错的实习,可是学院...

        不骂了,没那功夫扯淡。回归主题,实时数仓将来真要是有机会从事的话那真不错,也不枉我当时学Flink的时候花了一整个学期,没暖气没电源的教室跑着三台虚拟机,CPU爆满,每天学俩小时就没电了,但还是花了半个学期学完了。今天开始正式的数仓搭建。

1、ODS 层开发

        在实时数仓这里,当我们把数据采集到 Kafka (topic_log 和 topic_db 主题)的时候,其实 ODS 层的数据存储任务就已经完成了(ODS 层的任务:数据的存储和备份)。接下来我们需要做的就是保持数据的有序:

1.1、Kafka 数据有序

        Kafka 只能保证单分区内有序,并不能保证全局有序。

1.1.1、设置 Kafka 分区默认个数

        这里我们需要设置 Kafka 的分区个数为 4,毕竟实时数仓对数据的吞吐量、并发性能的要求是比较高的,所以我们不能为了数据的有序性而把数据到挤到一个分区中:

vim /opt/module/kafka_2.12-3.4.1/config/server.properties
// 修改配置
num.partition=4

1.1.2、设置 Flink 精准一次

        Flink 程序从 Kafka 消费数据时会启动同属于一个消费者组的四个消费者,Kafka 消费者的默认分区分配策略是 Range + CooperativeSticky,消费者数和分区数相同时,每个消费者消费一个分区的数据。只要单分区数据有序,即可保证 Flink 单个并行度数据有序。

        我们这里的 Kafka 版本是 3.0.0,在 Kafka 1.x 及之后的版本中,保证数据单分区有序,条件如下:

不开启 Kafka 幂等性的情况

max.in.flight.requests.per.connection=1

开启 Kafka 幂等性的情况: 

// 必须小于等于5
max.in.flight.requests.per.connection=5

        在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。

        默认情况下幂等性是开启的,max.in.flight.requests.per.connection 默认值为 5,所以单分区数据默认是有序的,不需要做任何配置。

综上,我们可以保证 Flink 程序单个并行度的数据有序。

1.2、Maxwell 同步历史维度数据

        我们的实时计算不需要考虑历史的事实数据(比如下单、加购),但要考虑历史维度数据,毕竟没有维度只有业务过程是没法进行计算的。

1.2.1、为什么要同步历史维度数据而不同步历史业务数据

        当一个用户进行下单这个业务过程的时候,比如 user_id 为 1001 的用户在 province_id 为 17 的省份下单了 sku_id 为 10 的商品,并使用了 cupon_id 为 1001 的优惠券。

        当这个数据传到我们实时数仓的时候,我们必须知道用户是谁,它买了什么东西,有没有使用优惠券、使用了什么类型的优惠券、这个省份 id 是什么地方、下单方式是什么。这就涉及到了很多维度数据,所以我们必须提前把维度数据准备好,等到数据来的时候直接拿来用而不是才去业务库同步。

        在 ODS 层这里我们只需要原封不动的把维度数据导入到 Kafka ,等到搭建 DIM 层的时候直接从 ODS 层拿,而不是让 DIM 层去业务数据库中同步。

        在我们这个项目中,我们需要通过 Maxwell 同步下面这些维度表到 Kafka 的 

activity_info
activity_rule
activity_sku
base_category1
base_category2
base_category3
base_province
base_region
base_trademark
coupon_info
coupon_range
financial_sku_cost
sku_info
spu_info
user_info

编写同步脚本:

#!/bin/bash# 该脚本的作用是初始化所有的业务数据,只需执行一次MAXWELL_HOME=/opt/module/maxwellimport_data() {for tab in $@do$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table ${tab} --config $MAXWELL_HOME/config.propertiesdone
}case $1 in
activity_info | activity_rule | activity_sku | base_category1 | base_category2 | base_category3 | base_dic | base_province | base_region | base_trademark | coupon_info | coupon_range | financial_sku_cost | sku_info | spu_info | user_info)import_data $1 ;;
"all")import_data activity_info activity_rule activity_sku base_category1 base_category2 base_category3 base_dic base_province base_region base_trademark coupon_info coupon_range financial_sku_cost sku_info spu_info user_info;;
esac

思考

        我们之前在学习离线数仓的时候, 使用 Maxwell 和 DataX 来同步业务数据,其中的 Maxwell 在离线数仓中其实并没有什么作用,至于削峰解耦在离线数仓中是根本不用考虑的。而如果不使用 Kafka ,我们可以直接通过 Flume 直接采集到 HDFS。

        在离线数仓中使用 Maxwell 的作用完全是为了现在学习实时数仓时,方便 Flink 来直接从 Kafka 去读取数据。但是 Flume 的数据中包含的 Event Header ,它对于实时数仓来说是完全没有用的,所以我们当时为了妥协实时数仓,就把 Flume 数据中的 Header 给去掉了,但是也就引入了零点漂移的问题,毕竟 Event Header 中保存着 timestamp 信息,而它在经过 Kafka 之后,会被 Kafka 给它添加一个 Header ,Header 中的 timestamp 时间默认为 Kafka 处理的时间,所以我们当时又设置了 Flume 拦截器来把 Header 中的 timestamp 值设置为 body 中的时间戳(因为拦截器只能设置在 Source 和 Channel 之间,所以还需要一个 Flume 再从 Kafka 读出来)。

Flume 拦截器代码

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;public class TimeStampInterceptor implements Interceptor {private ArrayList<Event> events = new ArrayList<>();@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;}@Overridepublic List<Event> intercept(List<Event> list) {events.clear();for (Event event : list) {events.add(intercept(event));}return events;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimeStampInterceptor();}@Overridepublic void configure(Context context) {}}
}

总结

        实时数仓中 ODS 层的工作很简单,我们只需要用 Maxwell 把业务数据库中的维度表进行实时同步即可。至于服务器里的日志数据我们根本不需要实时处理!一个日志数据没必要实时处理!所以实时数仓中我们也就不需要用 Flume 这个工具。

        所以对于实时数仓的 ODS 层,我们主要用的就是 Maxwell 来同步维度数据,而对于事实数据(比如下单、加购),等到 DWD 层再进行处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/4274.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

终端安全管理软件哪个好?

终端安全管理软件是保障企业信息安全的重要工具。 它们能够有效地防范恶意软件、黑客攻击和其他安全威胁&#xff0c;并提供多方面的终端设备安全保护措施。 终端安全软件的功能和保护机制各不相同&#xff0c;这就需要企业根据自身的需求和情况来进行评估和选择。 下面总结了…

spring的常用注解

目录 1.前言 2.web url映射 2.1RequestMapping 2.2PostMapping 2.3GetMapping 3.参数接受和接口响应 3.1RequestParam 3.2RequstBoby 3.3ResponseBoby 3.4RestController 4.bean的存储 4.1Controller 4.2Service 4.3Repository 4.4Compontent 4.5Configuration …

短视频生成背景文字工具(前端工具)

过年这两天有些无聊就刷刷抖音&#xff0c;刷着刷着自己也蠢蠢欲动&#xff0c;想发上几个&#xff0c;可是却找不到合适自己的模板。由于个人喜欢一些古诗文之类的&#xff0c;所以自己简单的编写了一个小工具&#xff0c;如下图&#xff1a; 当设置好了之后&#xff0c;将浏…

关于Spring Aop的通知类型

一、概述 1.1 通知类型 为了符合各种流程处理&#xff0c;通知类型提供了5种&#xff0c;可以对目标方法进行全方位处理&#xff0c;如下所示&#xff1a; 通知类型说明前置通知&#xff08;Before advice&#xff09;在某连接点之前执行的通知&#xff0c;但这个通知不能阻止…

dart-sdk 安装以及vscode开发工具安装dart

1.安装dart-sdk 官方文档&#xff1a;&#xff1a;Dart 编程语言主页 | Dart 中文文档 | Dart 里面是命令行安装。 也可以使用安装包安装&#xff1a; 地址&#xff1a;https://www.gekorm.com/dart-windows/ 2.测试dart-sdk 安装成功与否 打开cmd &#xff0c;输入 dar…

Python数据分析实验二:Python数据预处理

目录 一、实验目的与要求二、实验任务三、主要程序清单和运行结果&#xff08;一&#xff09;对chipotle.csv文件的销售数据进行分析&#xff08;二&#xff09;对描述泰坦尼克号成员的信息进行可视化和相关分析 四、实验体会 一、实验目的与要求 1、目的&#xff1a;   掌握…

linux kernel内存泄漏检测工具之slub debug

一、背景 slub debug 是一个debug集&#xff0c;聚焦于kmem_cache 分配机制的slub内存&#xff08;比如kmalloc&#xff09;&#xff0c;这部分内存在内核中使用最频繁&#xff0c;slub debug其中有相当部分是用来处理内存踩踏&#xff0c;内存use after free 等异常的&#x…

创建electron,解决包清理的问题,解决镜像源卡住下载时间长

我的电脑用户名是Anyphasy,我的node.js安装在D:\developp\nodejss18.18.0 使用npm config get prefix查看node.js安装路径 npm config get prefix 创建electron 创建package.json文件,它里面记载了你的electron版本,项目描述,以及启动命令等信息 npm init -y 先查看你自己的…

MYSQL之DUPLICATE KEY

在 MySQL 中&#xff0c;DUPLICATE KEY 是用于处理插入数据时遇到唯一键&#xff08;Unique Key&#xff09;冲突的情况的一种机制。当向表中插入数据时&#xff0c;如果插入的数据违反了唯一约束&#xff08;比如唯一索引或主键约束&#xff09;&#xff0c;就会触发 DUPLICAT…

禅道项目管理系统身份认证绕过漏洞

禅道项目管理系统身份认证绕过漏洞 1.漏洞描述 禅道项目管理软件是国产的开源项目管理软件&#xff0c;专注研发项目管理&#xff0c;内置需求管理、任务管理、bug管理、缺陷管理、用例管理、计划发布等功能&#xff0c;完整覆盖了研发项目管理的核心流程。 禅道项目管理系统…

手写一个RNN前向传播以及反向传播

前向传播 根据公式 st tanh (Uxt Wst-1 ba) ot softmax(Vst by ) m 3 词的个数 n 5 import numpy as np import tensorflow as tf # 单个cell 的前向传播过程 # 两个输入&#xff0c;x_t&#xff0c;s_prev,parameters def rnn_cell_forward(x_t,s_prev,parameter…

运算符重载(1)

1.加号运算符重载&#xff0c;这里用编译器统一的名称operator代替函数名 #include<iostream> using namespace std; //1.成员函数的加号重载 //2.全局函数的加号重载 class Person { public:Person() {};//1.成员函数的加号重载//Person operator(Person& p)//{// P…

前端HTML5学习2(新增多媒体标签,H5的兼容性处理)

前端HTML5学习2新增多媒体标签&#xff0c;H5的兼容性处理&#xff09; 分清标签和属性新增多媒体标签新增视频标签新增音频标签新增全局属性 H5的兼容性处理 分清标签和属性 标签&#xff08;HTML元素&#xff09;和属性&#xff0c;标签定义了内容的类型或结构&#xff0c;而…

k8s学习(三十七)centos下离线部署kubernetes1.30(高可用)

文章目录 准备工作1、升级操作系统内核1.1、查看操作系统和内核版本1.2、下载内核离线升级包1.3、升级内核1.4、确认内核版本 2、修改主机名/hosts文件2.1、修改主机名2.2、修改hosts文件 3、关闭防火墙4、关闭SELINUX配置5、时间同步5.1、下载NTP5.2、卸载5.3、安装5.4、配置5…

【QEMU系统分析之启动篇(二十一)】

系列文章目录 第二十一章 QEMU系统仿真的加速器上电后设置分析 文章目录 系列文章目录第二十一章 QEMU系统仿真的加速器上电后设置分析 前言一、QEMU是什么&#xff1f;二、QEMU系统仿真的启动分析1.系统仿真的初始化代码2.主循环3. qemu_default_main()4. qemu_main_loop()ma…

BPE、Wordpiece、Unigram、SpanBERT等Tokenizer细节总结

BPE(Byte Pair Encoding) GPT-2和Roberta用的是这种&#xff0c;不会产生[UNK]这个unknown字符 这部分部分摘录自https://martinlwx.github.io/zh-cn/the-bpe-tokenizer/ 看以下code例子就足够理解了&#xff0c;核心是维护self.merges&#xff08;维护一个pair->str的字…

[蓝桥杯2024]-Reverse:rc4解析(对称密码rc4)

无壳 查看ida 这里应该运行就可以得flag&#xff0c;但是这个程序不能直接点击运行 按照伪代码写exp 完整exp&#xff1a; keylist(gamelab) content[0xB6,0x42,0xB7,0xFC,0xF0,0xA2,0x5E,0xA9,0x3D,0x29,0x36,0x1F,0x54,0x29,0x72,0xA8, 0x63,0x32,0xF2,0x44,0x8B,0x85,0x…

如何在 Visual Studio 中通过 NuGet 添加包

在安装之前要先确定Nuget的包源是否有问题。 Visual Studio中怎样更改Nuget程序包源-CSDN博客 1.图形界面安装 打开您的项目&#xff0c;并在解决方案资源管理器中选择您的项目。单击“项目”菜单&#xff0c;然后选择“管理 NuGet 程序包”选项。在“NuGet 包管理器”窗口中…

详解如何品味品深茶的精髓

在众多的茶品牌中&#xff0c;品深茶以其独特的韵味和深厚的文化底蕴&#xff0c;赢得了众多茶友的喜爱。今天&#xff0c;让我们一同探寻品深茶的精髓&#xff0c;品味其独特的魅力。 品深茶&#xff0c;源自中国传统茶文化的精髓&#xff0c;承载着世代茶人的智慧与匠心。这…

理解JavaScript对象[Bom]:属性、方法

对象是什么 在JavaScript中&#xff0c;对象&#xff08;Object&#xff09;是一种复合的数据类型&#xff0c;它允许你将数据和功能&#xff08;方法&#xff09;组合在一起。 创建对象 var person { firstName: John, lastName: Doe, age: 30, greet: function() …