大数据基础设施搭建 - 业务数据同步策略

文章目录

  • 一、全量同步
    • 1.1 梳理需要全量同步的业务表
    • 1.2 Sqoop: MySQL To HDFS
      • 1.2.1 开发脚本
      • 1.2.2 授予脚本执行权限
      • 1.2.3 执行脚本
      • 1.2.4 定时调度
  • 二、增量同步
    • 2.1 梳理需要增量同步的业务表
    • 2.2 Maxwell: MySQL To Kafka
      • 2.2.1 首次全量同步
      • 2.2.2 每日增量同步
        • 2.2.2.1 编写Maxwell配置文件
        • 2.2.2.2 启动Maxwell
    • 2.3 Flume: Kafka To HDFS
      • 2.3.1 自定义拦截器
      • 2.3.2 编写配置文件
      • 2.3.3 创建Kafka Topic
      • 2.3.4 启动/停止Flume
  • 三、正式上线
    • 3.1 删除HDFS上业务测试数据
    • 3.2 删除Hive外部表对应HDFS的数据
    • 3.3 Sqoop全量同步
      • 3.3.1 首次执行
      • 3.3.2 定时调度
    • 3.4 Maxwell全量同步

一、全量同步

数据流转:MySQL ---------Sqoop脚本---------》 HDFS

1.1 梳理需要全量同步的业务表

序号表名表中文名
1sku_info库存单元表
2base_category1一级分类表
3base_category2二级分类表
4base_category3三级分类表
5base_province省份表
6base_trademark品牌表
7spu_info商品表
8favor_info商品收藏表
9cart_info购物车表
10coupon_info优惠券表
11activity_info活动表
12activity_rule优惠规则
13base_dic字典表
14sku_attr_valuesku平台属性值关联表
15sku_sale_attr_valuesku销售属性值

1.2 Sqoop: MySQL To HDFS

1.2.1 开发脚本

脚本放在/home/hadoop/bin目录下的原因:该脚本是给普通用户hadoop使用的,在普通用户hadoop登陆的情况下执行echo $PATH命令,发现/home/hadoop/bin在其中,表示普通用户hadoop可以任意地方使用该目录下的可执行文件。

[hadoop@hadoop102 ~]$ cd /home/hadoop/
[hadoop@hadoop102 ~]$ mkdir bin
[hadoop@hadoop102 ~]$ vim /home/hadoop/bin/mysql_to_hdfs_full.sh

脚本内容:

–delete-target-dir:如果目标文件夹存在,先删除后插入。

#! /bin/bashif [ -n "$2" ] ;thendo_date=$2
elsedo_date=`date -d '-1 day' +%F`
fiimport_data(){
/opt/module/sqoop-1.4.6.bin__hadoop-2.0.4-alpha/bin/sqoop import \
--connect jdbc:mysql://mall:3306/gmall \
--username root \
--password 123456 \
--target-dir /warehouse/db/gmall/$1_full/$do_date \
--delete-target-dir \
--query "$2 and  \$CONDITIONS" \
--num-mappers 1 \
--fields-terminated-by '\t' \
--compress \
--compression-codec gzip \
--null-string '\\N' \
--null-non-string '\\N'
}import_sku_info(){import_data "sku_info" "select id,spu_id,price,sku_name,sku_desc,weight,tm_id,category3_id,is_sale,create_timefrom sku_info where 1=1"
}import_base_category1(){import_data "base_category1" "select id,name from base_category1 where 1=1"
}import_base_category2(){import_data "base_category2" "selectid,name,category1_id from base_category2 where 1=1"
}import_base_category3(){import_data "base_category3" "selectid,name,category2_idfrom base_category3 where 1=1"
}import_base_province(){import_data "base_province" "selectid,name,region_id,area_code,iso_code,iso_3166_2from base_provincewhere 1=1"
}import_base_region(){import_data "base_region" "selectid,region_namefrom base_regionwhere 1=1"
}import_base_trademark(){import_data "base_trademark" "selectid,tm_namefrom base_trademarkwhere 1=1"
}import_spu_info(){import_data "spu_info" "selectid,spu_name,category3_id,tm_idfrom spu_infowhere 1=1"
}import_favor_info(){import_data "favor_info" "selectid,user_id,sku_id,spu_id,is_cancel,create_time,cancel_timefrom favor_infowhere 1=1"
}import_cart_info(){import_data "cart_info" "selectid,user_id,sku_id,cart_price,sku_num,sku_name,create_time,operate_time,is_ordered,order_time,source_type,source_idfrom cart_infowhere 1=1"
}import_coupon_info(){import_data "coupon_info" "selectid,coupon_name,coupon_type,condition_amount,condition_num,activity_id,benefit_amount,benefit_discount,create_time,range_type,limit_num,taken_count,start_time,end_time,operate_time,expire_timefrom coupon_infowhere 1=1"
}import_activity_info(){import_data "activity_info" "selectid,activity_name,activity_type,start_time,end_time,create_timefrom activity_infowhere 1=1"
}import_activity_rule(){import_data "activity_rule" "selectid,activity_id,activity_type,condition_amount,condition_num,benefit_amount,benefit_discount,benefit_levelfrom activity_rulewhere 1=1"
}import_base_dic(){import_data "base_dic" "selectdic_code,dic_name,parent_code,create_time,operate_timefrom base_dicwhere 1=1"
}import_sku_attr_value(){import_data "sku_attr_value" "selectid,attr_id,value_id,sku_id,attr_name,value_namefrom sku_attr_valuewhere 1=1"
}import_sku_sale_attr_value(){import_data sku_sale_attr_value "selectid,sku_id,spu_id,sale_attr_value_id,sale_attr_id,sale_attr_name,sale_attr_value_namefrom sku_sale_attr_valuewhere 1=1"
}case $1 in"sku_info")import_sku_info
;;"base_category1")import_base_category1
;;"base_category2")import_base_category2
;;"base_category3")import_base_category3
;;"base_province")import_base_province
;;"base_region")import_base_region
;;"base_trademark")import_base_trademark
;;"spu_info")import_spu_info
;;"favor_info")import_favor_info
;;"cart_info")import_cart_info
;;"coupon_info")import_coupon_info
;;"activity_info")import_activity_info
;;"activity_rule")import_activity_rule
;;"base_dic")import_base_dic
;;"sku_attr_value")import_sku_attr_value
;;"sku_sale_attr_value")import_sku_sale_attr_value
;;
"all")import_sku_infoimport_base_category1import_base_category2import_base_category3import_base_provinceimport_base_regionimport_base_trademarkimport_spu_infoimport_favor_infoimport_cart_infoimport_coupon_infoimport_activity_infoimport_activity_ruleimport_base_dicimport_sku_attr_valueimport_sku_sale_attr_value
;;
esac

1.2.2 授予脚本执行权限

[hadoop@hadoop102 bin]$ chmod +x mysql_to_hdfs_full.sh

1.2.3 执行脚本

每天凌晨执行不应该带上日期参数。

[hadoop@hadoop102 ~]$ mysql_to_hdfs_full.sh all 2023-12-02

1.2.4 定时调度

凌晨执行,拉取MySQL全量数据放入HDFS昨天分区。

二、增量同步

数据流转:MySQL ---------Maxwell监控程序--------》 Kafka ----------Flume监控程序---------》 HDFS
怎么处理删除和更新??????

2.1 梳理需要增量同步的业务表

序号表名表中文名
1order_info订单表
2coupon_use优惠券领用表
3order_status_log订单状态日志表
4user_info用户信息表
5order_detail订单明细表
6payment_info支付信息表
7comment_info商品评论表
8order_refund_info退单表
9order_detail_activity订单明细活动表
10order_detail_coupon订单明细购物券表
11refund_payment退款信息表

2.2 Maxwell: MySQL To Kafka

2.2.1 首次全量同步

首次全量同步:使用maxwell-bootstrap功能将MySQL历史全量数据导入Kafka。
注意:只需在业务表首次入仓时执行一次。

[mall@mall ~]$ mkdir bin
[mall@mall bin]$ vim /home/mall/bin/mysql_to_kafka_inc_init.sh

配置文件:

见下面2.2.2.1,与增量同步使用同一个配置文件。

脚本内容:

#!/bin/bash# 该脚本的作用是初始化增量表,只需执行一次import_data() {/opt/module/maxwell-1.29.2/bin/maxwell-bootstrap --database gmall --table $1 --config /opt/module/maxwell-1.29.2/config.properties
}case $1 in
"order_info")import_data order_info;;
"coupon_use")import_data coupon_use;;
"order_status_log")import_data order_status_log;;
"user_info")import_data user_info;;
"order_detail")import_data order_detail;;
"payment_info")import_data payment_info;;
"comment_info")import_data comment_info;;
"order_refund_info")import_data order_refund_info;;
"order_detail_activity")import_data order_detail_activity;;
"order_detail_coupon")import_data order_detail_coupon;;
"refund_payment")import_data refund_payment;;
"all")import_data order_infoimport_data coupon_useimport_data order_status_logimport_data user_infoimport_data order_detailimport_data payment_infoimport_data comment_infoimport_data order_refund_infoimport_data order_detail_activityimport_data order_detail_couponimport_data refund_payment;;
esac

授予脚本执行权限:

[mall@mall bin]$ chmod +x mysql_to_kafka_inc_init.sh

执行脚本:

[mall@mall ~]$ mysql_to_kafka_inc_init.sh all

2.2.2 每日增量同步

2.2.2.1 编写Maxwell配置文件
[mall@mall maxwell-1.29.2]$ vim config.properties

内容:

#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=topic_mall_db_binlog#MySQL相关配置
host=mall
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
2.2.2.2 启动Maxwell
# 启动
[mall@mall ~]$ /opt/module/maxwell-1.29.2/bin/maxwell --config /opt/module/maxwell-1.29.2/config.properties --daemon
# 停止
[mall@mall ~]$ ps -ef | grep maxwell | grep -v grep | awk '{print $2}'
[mall@mall ~]$ kill 

2.3 Flume: Kafka To HDFS

2.3.1 自定义拦截器

套路:从body中拿出采集到的数据,解析出有用字段放入header中,配置文件中可以获取header中的东西。
作用1:把从Kafka中获取的json串的业务表名放到header中。
作用2:把从Kafka中获取的json串的ts时间戳转换成毫秒,放入header中。

代码:

package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;/*** @date 2023/12/1 15:24*/
public class TimestampAndTableNameInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {// 1、从body中解析json拿出想要字段byte[] body = event.getBody();String db_binlog = new String(body, StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(db_binlog);Long ts = jsonObject.getLong("ts");//Maxwell输出的数据中的ts字段时间戳单位为秒,Flume HDFSSink要求单位为毫秒String timeMills = String.valueOf(ts * 1000);String tableName = jsonObject.getString("table");// 2、放入header中Map<String, String> headers = event.getHeaders();headers.put("timestamp", timeMills);headers.put("tableName", tableName);return event;}@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampAndTableNameInterceptor ();}@Overridepublic void configure(Context context) {}}
}

2.3.2 编写配置文件

[hadoop@hadoop104 ~]$ cd /opt/module/apache-flume-1.9.0-bin/job/
[hadoop@hadoop104 job]$ vim kafka_to_hdfs_db.conf

内容:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 配置source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_mall_db_binlog
a1.sources.r1.kafka.consumer.group.id = consumer_group_flume_mall_db_binlog
# 指定consumer从哪个offset开始消费,默认latest
# a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.songshuang.flume.interceptor.TimestampAndTableNameInterceptor$Builder# 配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /warehouse/db/gmall/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip# 配置channel
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /opt/module/apache-flume-1.9.0-bin/data/kafka_to_hdfs_mall_db_binlog
a1.channels.c1.checkpointDir = /opt/module/apache-flume-1.9.0-bin/checkpoint/kafka_to_hdfs_mall_db_binlog# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.3.3 创建Kafka Topic

手动创建topic好处:可以自定义分区数、分区副本数。

[hadoop@hadoop102 ~]$ cd /opt/module/kafka_2.11-2.4.1/
[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic topic_mall_db_binlog

2.3.4 启动/停止Flume

# 启动
[hadoop@hadoop104 job]$ cd /opt/module/apache-flume-1.9.0-bin/
[hadoop@hadoop104 apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/kafka_to_hdfs_db.conf &>/dev/null 2>/dev/null &
# 停止
[hadoop@hadoop104 apache-flume-1.9.0-bin]$ ps -ef | grep kafka_to_hdfs_db.conf | grep -v grep | awk '{print $2}'
[hadoop@hadoop104 apache-flume-1.9.0-bin]$ kill

三、正式上线

3.1 删除HDFS上业务测试数据

删除HDFS目录 /warehouse/db/gmall

3.2 删除Hive外部表对应HDFS的数据

删除HDFS目录 /warehouse/dw_ods.db/ods_业务表名

3.3 Sqoop全量同步

3.3.1 首次执行

上线后每日凌晨执行脚本不能带着日期参数,默认昨天日期。
注意:首次同步的数据不是很准确,分区数据可能错乱!

[hadoop@hadoop102 ~]$ mysql_to_hdfs_full.sh all 2023-12-02

3.3.2 定时调度

默认给当前用户添加定时调度任务。

[hadoop@hadoop102 ~]$ crontab -e

内容:

每天的零点零分执行Sqoop全量同步脚本!

0 0 * * * mysql_to_hdfs_full.sh all

查看当前用户具有的定时任务:

[hadoop@hadoop102 ~]$ crontab -l

3.4 Maxwell全量同步

[mall@mall ~]$ mysql_to_kafka_inc_init.sh all

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/192193.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

周一到周五进行osg和渲染,周末进行ue

之所以重新规划&#xff0c;是因为我在某家小公司做Ue开发顾问。其实&#xff0c;我这烂水平&#xff0c;也有人看得上&#xff0c;哈哈。 正好督促我学习ue了&#xff0c;也不荒废掉ue&#xff0c;也不用盲目跳槽&#xff0c;时间也分得很开&#xff0c;very good。

rdf-file:SM2加解密

一&#xff1a;SM2简介 SM2是中国密码学算法标准中的一种非对称加密算法&#xff08;包括公钥和私钥&#xff09;。SM2主要用于数字签名、密钥交换和加密解密等密码学。 生成秘钥&#xff1a;用于生成一对公钥和私钥。公钥&#xff1a;用于加密数据和验证数字签名。私钥&…

javaSE学习-1-数据类型与运算符

目录 字面常量 数据类型 int Long short Byte float double char boolean 类型转换 强转 自动类型转换(隐式) 字符串类型 字符串和整形数字之间进行转换 字面常量 比如 System.Out.println("Hello World") &#xff1b; 语句&#xff0c;不论程序何时…

代码随想录第二十二天(一刷C语言)|组合总数电话号码的字母组合

创作目的&#xff1a;为了方便自己后续复习重点&#xff0c;以及养成写博客的习惯。 一、组合总数 思路&#xff1a;参考carl文档和视频 1、需要一维数组path来存放符合条件的结果&#xff0c;二维数组result来存放结果集。 2、targetSum 目标和&#xff0c;也就是题目中的…

企业数字化转型应该做什么

以下是通过学习华为数字化转型并结合自身情况引发的学习思考,供大家参考。企业做数字化转型目的是为了赋能业务,因为原有老业务发展了这么多年,好做的都已经做完了,目前已进入了深水区,需要引入新的创新源泉,需要跨界创新,特别是这几年IT技术的发展,需要通过IT进行借力…

Rust多线程任务,发现有些线程一直获取不到锁【已解决】

问题描述 项目中用到rust&#xff0c;其中在多线程中用到了同一个对象的锁&#xff0c;然而发现其中一个线程一直拿不到这个锁。 解决过程 我先是在线程A中加入了sleep方法&#xff0c;这样做的效果就是&#xff0c;比最初好一些&#xff0c;但是拿到锁还是要较长时间&#xf…

动态页面技术的发展与应用

jsp 静态页面&#xff1a;web诞生后的html文档&#xff0c;不论多少次访问都是同一份html文档或者是其他的什么文档&#xff0c;所以说是”静态“的。 虽然js能让页面产生互动&#xff0c;但是不论什么人访问&#xff0c;看到的都是放在服务器的那一份写死的文件/文档activexa…

【Python】Python给工作减负-读Excel文件生成xml文件

目录 ​前言 正文 1.Python基础学习 2.Python读取Excel表格 2.1安装xlrd模块 2.2使用介绍 2.2.1常用单元格中的数据类型 2.2.2 导入模块 2.2.3打开Excel文件读取数据 2.2.4常用函数 2.2.5代码测试 2.2.6 Python操作Excel官方网址 3.Python创建xml文件 3.1 xml语法…

自定义类型:结构体(自引用、内存对齐、位段(位域))

目录 一. 结构体类型的声明和定义 1.1结构体相关概念 1.11结构的声明 1.12成员列表 1.2定义结构体类型变量的方法 1.21先声明结构体类型再定义变量名 ​​​​1.22在声明类型的同时定义变量 1.23直接定义结构类型变量 二、结构体变量的创建、初始化​和访问 2.1结构体…

初探webpack之单应用多端构建

初探webpack之单应用多端构建 在现代化前端开发中&#xff0c;我们可以借助构建工具来简化很多工作&#xff0c;单应用多端构建就是其中应用比较广泛的方案&#xff0c;webpack中提供了loader与plugin来给予开发者非常大的操作空间来操作构建过程&#xff0c;通过操作中间产物…

[二分查找]LeetCode2009 :使数组连续的最少操作数

本文涉及的基础知识点 二分查找算法合集 作者推荐 动态规划LeetCode2552&#xff1a;优化了6版的1324模式 题目 给你一个整数数组 nums 。每一次操作中&#xff0c;你可以将 nums 中 任意 一个元素替换成 任意 整数。 如果 nums 满足以下条件&#xff0c;那么它是 连续的 …

记录 | ssh config免密连接

[适用于 linux 和 mac] 比如 ~/.ssh/config: Host targetHostName 192.168.2.12User rootPort 9990这样每次 ssh target 的时候每次都需要输入密码&#xff0c;比较麻烦 解决办法如下&#xff0c;可让下次不用重新输入密码 # 先要生成 ssh 密钥 ssh-keygen# 配置永久性密码 s…

Java Web——动态Web开发核心-Servlet

1. 官方文档 官方文档地址&#xff1a;Overview (Servlet 4.0 API Documentation - Apache Tomcat 9.0.83) servlet 与 Tomcat 的关系&#xff1a;Tomcat 支持 Servlet Tomcat 是一个开源的 Java 服务器&#xff0c;它主要用来提供 Web 服务&#xff0c;包括 HTTP 请求和响应…

Qt基础之四十:Qt Installer Framework(QtIFW)的编译、使用和实现原理

一.编译 编译环境: 系统:Windows 10 专业版 64位 编译器:Visual Studio 2017 本文编译的是当前最新版本qt-installer-framework 4.6.1 源码下载地址:清华大学开源软件镜像站 解压后可以看到里面自带编译指导文件—INSTALL,INSTALL里明确了以下几点: 1.静态编译Qt Qt版…

Git 合并冲突解决步骤

Git 合并冲突解决步骤 1. 找到并打开冲突文件 定位到发生冲突的文件。可以通过 Git 的命令行输出找到这些文件。例如&#xff1a; pom.xmlsrc/main/java/com/zzm/config/SecurityConfig.javasrc/main/java/com/zzm/service/chat/UserConversationsServiceImpl.javasrc/main/…

EasyExcel写入多个sheet

直接上代码&#xff1a; public static void main(String[] args) {// 设置excel工作簿ExcelWriter excelWriter EasyExcel.write("F:\\excel\\a.xls").build();List<User> userList new ArrayList<>();userList.add(new User("lisi", "…

初始数据结构(加深对旋转的理解)

力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台备战技术面试&#xff1f;力扣提供海量技术面试资源&#xff0c;帮助你高效提升编程技能&#xff0c;轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/rotate-array/submissions/ 与字…

Android Wifi的扫描机制

Android Wifi 的扫描场景分为下面四种情况&#xff1a; 1.亮屏情况下&#xff0c;在Wifi settings界面&#xff0c;固定扫描&#xff0c;扫描时间为10s。 2.亮屏情况下&#xff0c;在非Wifi settings界面&#xff0c;二进制指数退避扫描&#xff0c;退避&#xff1a;interval…

11.1每日一题(关于函数定义域)

f(x1)&#xff1a;自变量为x&#xff0c;x1为中间变量&#xff0c;所以f(x1)的定义域的取值范围是x的取值范围 f(x)&#xff1a;自变量为x&#xff0c;f(x)的定义域等价于f(x1)中 x1整体的定义域