Flink实时电商数仓(二)

GitLab的用户创建和推送

  1. 在root用户-密码界面重新设置密码
  2. 添加Leader用户和自己使用的用户
  3. 使用root用户创建相应的群组
  4. 使用Leader用户创建对应的项目
  5. 设置分支配置为“初始推送后完全保护”
  6. 设置.gitignore文件,项目配置文件等其他非通用代码无需提交
  7. 安装gitlab project 2020插件
  8. 点击share project on gitlab 即可将项目上传到gitlab中

Flink集群的搭建

  • 只需要运行Yarn模式
  • 配置Hadoop的环境变量
    在这里插入图片描述
  • 将Flink1.17解压安装到对应为止即可

Hbase的配置

  1. 依赖zookeeper和hadoop这两个框架
  2. 检查Hadoop是否退出安全模式,如果丢失文件,先退出安全模式,hdfs dfsadmin -safemode leave
  3. 解压Hbase2.4.11的安装包
  4. 添加Hbase的环境变量
    在这里插入图片描述
  5. 修改配置文件
    • hbase-env.xml
      • export hbase_manages_zk=false 不使用自带的zookeeper
    • hbase-site.xml
      • hbase.cluster.distributed = true 使用集群模式
      • hbase.zookeeper.quorum = hadoop102… zookeeper连接地址
      • hbase.rootdir = hdfs://hadoop102:8020, hbase在hdfs的存放根路径
      • hbase.wal.provider = filesystem 预写日志
    • regionservers: 添加hbase小弟的主机名称

Redise的配置

  1. 进入redise目录,执行make指令进行编译
  2. make instanll安装
  3. 将myredis.conf文件复制到~/目录下
  4. 将bind 127.0.0.1 注释掉,并且关闭保护模式
  5. 设置daemon 后台启动模式为yes
  6. redis-server ./my_redis.conf后台启动

实时数仓ODS层

  • 保证数据模拟器产生的数据是有序的
    • 设置mock.if-realtime:1,重复执行数据模拟器产生数据时,会从当前时间继续产生数据。
    • Kafka数据有序:Flink并发度和Kafka的分区数一致
      • 设置三个kafka节点的分区个数都为4,num.partitions=4
      • Flink的并发度=4
  • 历史维度数据
    • 使用maxwell的bootstrap功能初始化维度信息(json格式),写入到kafka
    • 编写mysql_to_kafka_init.sh脚本
    • maxwell需要检查是否连接mysql的binlog成功,查看日志;如果出错,需要在mysql的maxwell库中删除所有表即可

实时数仓dim层

  • dim层的设计依据是维度建模理论,并且遵循三范式,使用雪花模型
  • dim层的数据存储在Hbase中
  • 开发时需要切换到dev开发分支
  • 为Flink的开发创建一个基类,名为BaseApp
    • 抽象方法handle(): 每个主程序的业务逻辑
    • 具体方法start():里面实现Flink代码的通用逻辑
  • 不同分组的数据只能消费一次,如果数据需要给多个程序使用,就需要分为不同的group

Flink-cdc获取维度信息

  1. 数据清洗
  2. 动态拆分维度表功能
    • 方式1:直接将维度表做成List< String > (维度表名称)保存
      • 如果将代码写死,后续想要修改,需要重新编译修改
    • 方式2:将维度表名称设计为单独的一个配置文件,而不是在代码里面写死;后续想要修改,直接改配置文件,重启任务即可生效
    • 方式3:热修改hotfix, 热加载配置文件,不需要重启;热加载文件一般是以时间周期作为加载逻辑。时间长时会出现时效性问题,时间短的话过于耗费资源。
    • 方式4:zookeeper的watch的监控,能够存储基础的表名,但是不适合存储完整的表格信息,除了要判断哪些是维度表,还需要记录哪些数据需要写出到Hbase。
    • 方式5:cdc,变更数据抓取,类似与maxwell。
  3. 注意:运行下面的代码需要再虚拟机的/etc/my.cnf文件中开启对应数据库的binlog日志。注意对照库名是否填写正确。
public class Test02 {public static void main(String[] args) {//创建env//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(4);System.setProperty("HADOOP_USER_NAME", "atguigu");//设置检查点和状态后端// 1.4 状态后端及检查点相关配置// 1.4.1 设置状态后端env.setStateBackend(new HashMapStateBackend()); 1.4.2 开启 checkpoint//env.enableCheckpointing(5000); 1.4.3 设置 checkpoint 模式: 精准一次//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 1.4.4 checkpoint 存储//env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/gmall2023/stream/" + "test01"); 1.4.5 checkpoint 并发数//env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 1.4.6 checkpoint 之间的最小间隔//env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); 1.4.7 checkpoint  的超时时间//env.getCheckpointConfig().setCheckpointTimeout(10000); 1.4.8 job 取消时 checkpoint 保留策略//env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);//读取数据//mysql sourceMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(Constant.MYSQL_HOST).port(Constant.MYSQL_PORT).username(Constant.MYSQL_USER_NAME).password(Constant.MYSQL_PASSWORD).databaseList("gmall2023_config").tableList("gmall2023_config.table_process_dim").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSource<String> ds = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),"kafkasource").setParallelism(1);ds.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/233661.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

(JAVA)-创建多线程的方式

1.继承Thread类 1.创建一个继承字Thread类的子类 2.重写Thread类的run方法 public class MyThread extends Thread{Overridepublic void run() {for (int i 0; i < 100; i) {System.out.println(getName()"hello");}} }3.创建Thread类的子类对象 4.通过子类对象调…

Ubuntu:那些年踩过的坑?注意事项分享

Ubuntu系统在使用过程中可能会遇到一些需要注意的"坑"&#xff0c;以下是一些常见的问题和示例&#xff1a; 1.权限问题&#xff1a; 在Ubuntu中&#xff0c;默认情况下&#xff0c;许多系统文件和目录只有root用户才能修改。如果你试图以普通用户身份修改这些文件&a…

HarmonyOS应用开发实战—开箱即用的应用首页页面【ArkTS】【鸿蒙专栏-34】

一.HarmonyOS应用开发实战—开箱即用的应用首页页面【ArkTS】【鸿蒙专栏-34】 1.1 项目背景 HarmonyOS(鸿蒙操作系统)是华为公司推出的一种分布式操作系统。它被设计为一种全场景、全连接的操作系统,旨在实现在各种设备之间的无缝协同和共享,包括智能手机、平板电脑、智能…

轻量级购物小程序H5产品设计经典样例

主要是看到这个产品设计的不错值得借鉴特记录如下&#xff1a; 不过大多数购物app都大致相同&#xff0c;这个算是经典样例&#xff0c;几乎都可以复制&#xff0c;我第一次使用&#xff0c;感觉和顺畅。看上去产品是经过打磨的&#xff0c;布局非常好。内容也很丰富。支持异业…

Leetcode—128.最长连续序列【中等】

2023每日刷题&#xff08;六十四&#xff09; Leetcode—128.最长连续序列 实现代码 class Solution { public:int longestConsecutive(vector<int>& nums) {unordered_set<int> s;for(auto num: nums) {s.insert(num);}int longestNum 0;for(auto num: s) …

LeetCode day27

LeetCode day27 —今天做到树&#xff0c;&#xff0c;&#xff0c;对不起我的数据结构老师啊~~~ 7. 整数反转 给你一个 32 位的有符号整数 x &#xff0c;返回将 x 中的数字部分反转后的结果。 如果反转后整数超过 32 位的有符号整数的范围 [−231, 231 − 1] &#xff0c…

Maven scope属性解读和使用注意事项

目录 compile runtime test system provided import dependencyManagement标签介绍 maven的scope有哪些&#xff1a; maven的scope一共包括&#xff1a;compile、runtime、test、system、provided、import。 compile <dependency><groupId>org.apache.htt…

【PostgreSQL】从零开始:(十六)数据类型-数值类型

数值类型定义 数值类型是一种用于存储数字的数据类型。在编程语言中&#xff0c;数值类型通常包括整数类型和浮点数类型。 整数类型用于存储整数值&#xff0c;包括正整数、负整数和零。在不同的编程语言中&#xff0c;整数类型可能有不同的大小限制&#xff0c;例如8位、16位…

【AI图集】猫狗的自动化合成图集

猫是一种哺乳动物&#xff0c;通常被人们作为宠物饲养。它们有柔软的毛发&#xff0c;灵活的身体和尖锐的爪子。猫是肉食性动物&#xff0c;主要以肉类为食&#xff0c;但也可以吃一些蔬菜和水果。猫通常在夜间活动&#xff0c;因此它们需要足够的玩具和活动空间来保持健康和快…

k8s pod常用资源清单

K8S 的资源清单 参数名类型字段说明apiVersionStringK8S APl 的版本&#xff0c;可以用 kubectl api versions 命令查询kindStringyam 文件定义的资源类型和角色metadataObject元数据对象&#xff0c;下面是它的属性metadata.nameString元数据对象的名字&#xff0c;比如 pod …

Android Uri scheme协议file转content

一、Uri的介绍 在Android开发中&#xff0c;Uri&#xff08;Uniform Resource Identifier&#xff09;是用于标识和访问各种资源的核心概念。这些资源可能包括文件、网络URL、数据库记录等。在处理这些资源时&#xff0c;我们可能会遇到不同的Uri协议&#xff0c;如file和conte…

[PTA]矩阵列平移

给定一个 nn 的整数矩阵。对任一给定的正整数 k<n&#xff0c;我们将矩阵的偶数列的元素整体向下依次平移 1、……、k、1、……、k、…… 个位置&#xff0c;平移空出的位置用整数 x 补。你需要计算出结果矩阵的每一行元素的和。 输入格式&#xff1a; 输入第一行给出 3 个…

Another git process seems to be running in this repository, e.g. an editor o

操作任何git命令&#xff0c;都提示该内容。 Another git process semms to be running in this repository, e.g. an editor opened by ‘git commit’. Please make sure all processes are terminated then try again. If it still fails, a git process remove the file m…

Flink系列之:Print SQL连接器

Flink系列之&#xff1a;Print SQL连接器 一、Print SQL连接器二、创建一张基于Print的表三、连接器参数 一、Print SQL连接器 Print 连接器允许将每一行写入标准输出流或者标准错误流。 设计目的&#xff1a; 简单的流作业测试。对生产调试带来极大便利。 四种 format 选项…

如何保证架构的质量

1. 如何保证架构的质量: ①. 稳定性、健壮性(1). 系统稳定性: ①. 定义:a. 当一个实际的系统处于一个平衡的状态时,如果受到外来作用的影响时,系统经过一个过渡过程仍然能够回到原来的平衡状态.b. 可以说这个系统是稳定的,否则系统不稳定c. 如一根绳子绑着小球,处于垂直状态,…

Golang(壹)

爱情不需要华丽的言语&#xff0c;只需要默默的行动。 简介 应用领域&#xff1a; 下载vscode 使用vscode Go下载 - Go语言中文网 - Golang中文社区 下载sdk 解压到文件中&#xff0c;打开sdk解压文件 穿插dos操作系统知识点&#xff1a; 测试go语言环境 看到vscode 的目录结…

SpringIOC之AnnotatedElementKey

博主介绍&#xff1a;✌全网粉丝5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经验…

Unity学习笔记(零基础到就业)|Chapter01:C#入门

Unity学习笔记&#xff08;零基础到就业&#xff09;&#xff5c;Chapter01:C#入门 前言一、控制台输入输出语句二、初识变量1.一些好用的tips2.变量声明的固定写法3.变量类型 三、变量的本质1.变量的存储空间2.变量的本质&#xff1a;2进制 四、变量的命名规范1.必须遵守的规则…

centos开机自启动实战小案例

1.编写一个我们需要做事的脚本 #!/bin/bash # 打印 "Hello" echo "Hello,Mr.Phor" # 为了更好的能看到效果 我们把这段文本放置到一个文件中 如果重启能够看到 /a.txt文件 我们实验成功 echo "hahahahahahahaha" > /a.txt #每次开机 执行…

AUTOSAR从入门到精通-面向服务的中间件SOME/IP(七)

目录 前言 原理 SOME/IP-TP SOME/IP 传输层协议 SOME/IP-SD服务发现(Service Discovery)