GitLab的用户创建和推送
- 在root用户-密码界面重新设置密码
- 添加Leader用户和自己使用的用户
- 使用root用户创建相应的群组
- 使用Leader用户创建对应的项目
- 设置分支配置为“初始推送后完全保护”
- 设置.gitignore文件,项目配置文件等其他非通用代码无需提交
- 安装gitlab project 2020插件
- 点击share project on gitlab 即可将项目上传到gitlab中
Flink集群的搭建
- 只需要运行Yarn模式
- 配置Hadoop的环境变量
- 将Flink1.17解压安装到对应为止即可
Hbase的配置
- 依赖zookeeper和hadoop这两个框架
- 检查Hadoop是否退出安全模式,如果丢失文件,先退出安全模式,
hdfs dfsadmin -safemode leave
- 解压Hbase2.4.11的安装包
- 添加Hbase的环境变量
- 修改配置文件
- hbase-env.xml
- export hbase_manages_zk=false 不使用自带的zookeeper
- hbase-site.xml
- hbase.cluster.distributed = true 使用集群模式
- hbase.zookeeper.quorum = hadoop102… zookeeper连接地址
- hbase.rootdir = hdfs://hadoop102:8020, hbase在hdfs的存放根路径
- hbase.wal.provider = filesystem 预写日志
- regionservers: 添加hbase小弟的主机名称
Redise的配置
- 进入redise目录,执行make指令进行编译
- make instanll安装
- 将myredis.conf文件复制到~/目录下
- 将bind 127.0.0.1 注释掉,并且关闭保护模式
- 设置daemon 后台启动模式为yes
- redis-server ./my_redis.conf后台启动
实时数仓ODS层
- 保证数据模拟器产生的数据是有序的
- 设置mock.if-realtime:1,重复执行数据模拟器产生数据时,会从当前时间继续产生数据。
- Kafka数据有序:Flink并发度和Kafka的分区数一致
- 设置三个kafka节点的分区个数都为4,num.partitions=4
- Flink的并发度=4
- 历史维度数据
- 使用maxwell的bootstrap功能初始化维度信息(json格式),写入到kafka
- 编写mysql_to_kafka_init.sh脚本
- maxwell需要检查是否连接mysql的binlog成功,查看日志;如果出错,需要在mysql的maxwell库中删除所有表即可
实时数仓dim层
- dim层的设计依据是维度建模理论,并且遵循三范式,使用雪花模型
- dim层的数据存储在Hbase中
- 开发时需要切换到dev开发分支
- 为Flink的开发创建一个基类,名为BaseApp
- 抽象方法handle(): 每个主程序的业务逻辑
- 具体方法start():里面实现Flink代码的通用逻辑
- 不同分组的数据只能消费一次,如果数据需要给多个程序使用,就需要分为不同的group
Flink-cdc获取维度信息
- 数据清洗
- 动态拆分维度表功能
- 方式1:直接将维度表做成List< String > (维度表名称)保存
- 方式2:将维度表名称设计为单独的一个配置文件,而不是在代码里面写死;后续想要修改,直接改配置文件,重启任务即可生效
- 方式3:热修改hotfix, 热加载配置文件,不需要重启;热加载文件一般是以时间周期作为加载逻辑。时间长时会出现时效性问题,时间短的话过于耗费资源。
- 方式4:zookeeper的watch的监控,能够存储基础的表名,但是不适合存储完整的表格信息,除了要判断哪些是维度表,还需要记录哪些数据需要写出到Hbase。
- 方式5:cdc,变更数据抓取,类似与maxwell。
- 注意:运行下面的代码需要再虚拟机的/etc/my.cnf文件中开启对应数据库的binlog日志。注意对照库名是否填写正确。
public class Test02 {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4);System.setProperty("HADOOP_USER_NAME", "atguigu");env.setStateBackend(new HashMapStateBackend());MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(Constant.MYSQL_HOST).port(Constant.MYSQL_PORT).username(Constant.MYSQL_USER_NAME).password(Constant.MYSQL_PASSWORD).databaseList("gmall2023_config").tableList("gmall2023_config.table_process_dim").deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSource<String> ds = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),"kafkasource").setParallelism(1);ds.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}