Chapter 5: Practical Cases
5.1. Case 1
5.1.1. Case Introduction
There are two tables in the MySQL database: a user table (users) and an order table (orders). The users table stores the information of all users, and the orders table stores the information of all orders. The table structures are as follows:

User table users:
- id: user ID
- username: username
- password: user password
- email: user email address
- phone: user phone number
- real_name: user's real name
- registration_time: user's registration time
- last_login_time: user's last login time
- status: user's status (active, inactive, or frozen)

Order table orders:
- id: order ID
- user_id: user ID
- seller_id: seller ID
- product_id: product ID
- product_name: product name
- product_price: product unit price
- quantity: purchase quantity
- total_price: order total price
- order_time: order time

In the business system, new users register every day and large numbers of orders are generated every day. The company has just set up a data warehouse, so the existing data must first be loaded into the Hive tables with a one-time full import. After that, each day's newly registered users and new orders are imported incrementally into the corresponding Hive tables.
5.1.2. Data Preparation
Initial data in MySQL
# Create the database
CREATE DATABASE datax_shop;
USE datax_shop;

# Create the users table
DROP TABLE IF EXISTS users;
CREATE TABLE users (
    id INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    password VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL,
    phone VARCHAR(20) NOT NULL,
    real_name VARCHAR(50) NOT NULL,
    registration_time DATE NOT NULL,
    last_login_time DATE NULL DEFAULT NULL,
    status ENUM('active', 'inactive', 'frozen') NOT NULL DEFAULT 'active',
    PRIMARY KEY (id),
    UNIQUE KEY (username),
    UNIQUE KEY (email),
    UNIQUE KEY (phone)
);

# Insert some data
INSERT INTO users (username, password, email, phone, real_name, registration_time, last_login_time) VALUES
('johndoe','123456','johndoe@163.com','17767827612','John Doe','2020-12-12','2022-09-12'),
('janedoe','123123','janedoe@qq.com','18922783392','Jane Doe','2021-02-12','2022-12-10'),
('bobsmith','121212','bobsmith@126.com','17122811292','Bob Smith','2020-10-11','2022-01-15'),
('sarahlee','11111','sarahlee@qq.com','17122810911','Sarah Lee','2019-03-15','2022-02-15'),
('jimmychang','123121','jimmychang@qq.com','155514442134','Jimmy Chang','2022-12-11', NULL),
('alexjohnson','121212','alexjohnson@126.com','15522427212','Alex Johnson','2021-09-01', NULL);

# Create the orders table
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (
    id INT PRIMARY KEY AUTO_INCREMENT,
    user_id INT NOT NULL,
    seller_id INT NOT NULL,
    product_id INT NOT NULL,
    product_name VARCHAR(255) NOT NULL,
    product_price DECIMAL(10, 2) NOT NULL,
    quantity INT NOT NULL,
    total_price DECIMAL(10, 2) NOT NULL,
    order_time DATE NOT NULL
);

# Insert some data
INSERT INTO orders (user_id, seller_id, product_id, product_name, product_price, quantity, total_price, order_time) VALUES
(1, 1, 12, '电动牙刷', 90, 1, 90, '2020-12-20'),
(1, 2, 15, '洗面奶', 45, 1, 45, '2020-12-20'),
(1, 3, 17, '面膜', 110, 2, 220, '2020-12-20'),
(2, 1, 11, 'iPad', 5990, 1, 5990, '2021-12-20'),
(2, 2, 19, 'iPhone数据线', 18, 1, 18, '2021-11-20'),
(3, 1, 20, 'iPhone手机壳', 80, 1, 80, '2020-12-20'),
(3, 2, 22, '榴莲', 45, 4, 180, '2021-09-12'),
(3, 3, 23, '西瓜', 12, 5, 60, '2021-11-11'),
(4, 1, 4, '洗地机', 2990, 1, 2990, '2020-06-18'),
(4, 2, 7, '油污清洁剂', 78, 2, 156, '2020-07-11'),
(4, 3, 11, '镜子', 10, 1, 10, '2020-06-20'),
(5, 1, 9, '健力宝', 48, 2, 96, '2022-12-20');
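With the seed data in place, an optional sanity check confirms both tables are populated. This is just a quick sketch, assuming the mysql client is on the PATH and the same root/123456 credentials used in the DataX jobs below:

mysql -uroot -p123456 -e "SELECT COUNT(*) FROM datax_shop.users; SELECT COUNT(*) FROM datax_shop.orders;"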
Creating the Hive tables
-- Create the database
create database datax_shop;

-- Create the users table
drop table if exists datax_shop.users;
create table datax_shop.users (
    id int,
    username string,
    password string,
    email string,
    phone string,
    real_name string,
    registration_time string,
    last_login_time string,
    status string
)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as orcfile;

-- Create the orders table
drop table if exists datax_shop.orders;
create table datax_shop.orders (
    id int,
    user_id int,
    seller_id int,
    product_id int,
    product_name string,
    product_price double,
    quantity int,
    total_price double,
    order_time string
)
partitioned by (year string, month string)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as orcfile;
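Optionally, you can verify that both tables exist and that orders is partitioned as intended. A quick check from the shell, assuming the hive CLI is available on this node:

hive -e "show tables in datax_shop; show create table datax_shop.orders;"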
5.1.3. Full Data Import
Full import of the users table
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","column": ["id","username","password","email","phone","real_name","registration_time","last_login_time","status"],"connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["users"]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/users","fileName": "original","writeMode": "append","fieldDelimiter": "\t","fileType": "orc","column": [{"name": "id", "type": "int"},{"name": "username", "type": "string"},{"name": "password", "type": "string"},{"name": "email", "type": "string"},{"name": "phone", "type": "string"},{"name": "real_name", "type": "string"},{"name": "registration_time", "type": "string"},{"name": "last_login_time", "type": "string"},{"name": "status", "type": "string"}]}}}]}
}
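Save this configuration to a file and submit it with DataX. The file name full-users.json below is only an assumption for illustration (the /root/datax-example directory matches the JOBS_HOME used in the scheduling scripts later):

python3 /usr/local/datax/bin/datax.py /root/datax-example/full-users.json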
Full import of the orders table
For the full import of the orders table, the order's creation date must serve as the partitioning field. So we first create a temporary table, do a full import of the MySQL order data into that temporary table, and then load from the temporary table into the appropriate partitions of the orders table.
-- Create a temporary table to hold the fully imported order data
drop table if exists datax_shop.orders_origin;
create table datax_shop.orders_origin (
    id int,
    user_id int,
    seller_id int,
    product_id int,
    product_name string,
    product_price double,
    quantity int,
    total_price double,
    order_time string,
    year string,
    month string
)
row format delimited
fields terminated by '\t'
lines terminated by '\n';
Create a data synchronization job that copies the order data from MySQL into this temporary table:
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["orders"]}],"column": ["id","user_id","seller_id","product_id","product_name","product_price","quantity","total_price","order_time","year(order_time)","lpad(month(order_time), 2, 0)"]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/orders_origin/","fileName": "orders_origin","writeMode": "append","fieldDelimiter": "\t","fileType": "text","column": [{"name": "id", "type": "int"},{"name": "user_id", "type": "int"},{"name": "seller_id", "type": "int"},{"name": "product_id", "type": "int"},{"name": "product_name", "type": "string"},{"name": "product_price", "type": "double"},{"name": "quantity", "type": "double"},{"name": "total_price", "type": "double"},{"name": "order_time", "type": "string"},{"name": "year", "type": "string"},{"name": "month", "type": "string"}]}}}]}
}
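As before, save the job to a file and run it; the file name full-orders.json is again just an assumption:

python3 /usr/local/datax/bin/datax.py /root/datax-example/full-orders.json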
Load the data into the corresponding partitions of the orders table:
-- Disable dynamic partition strict mode
set hive.exec.dynamic.partition.mode=nonstrict;

-- Load the data into the orders table
insert into datax_shop.orders partition(year, month) select * from datax_shop.orders_origin;
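An optional quick check shows which partitions the dynamic-partition insert created:

hive -e "show partitions datax_shop.orders"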
5.1.4. Incremental Data Import
Incremental import of the users table
Once the existing data has been fully imported into the Hive table, each day's new data only needs to be imported incrementally. We can use the user registration time to determine which rows should be imported into the Hive users table.
First, after the full import of the existing data into Hive, simulate some new user registrations:
INSERT INTO users (username, password, email, phone, real_name, registration_time, last_login_time) VALUES
('natalielin','121212','natalielin@qq.com','17788889999','Natalie Lin','2023-01-01', NULL),
('harrytran','123123','harrytran@126.com','17666228192','Harry Tran','2023-01-01', NULL),
('gracewang','313131','gracewang@163.com','18872631272','Grace Wang','2023-01-01', NULL),
('peterlee','123123','peterlee@qq.com','19822781829','Peter Lee','2023-01-01',NULL);
Now we need to incrementally import the users who registered on 2023-01-01 into the Hive users table.
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","column": ["id","username","password","email","phone","real_name","registration_time","last_login_time","status"],"connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["users"]}],"where": "registration_time = '$date'"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/users","fileName": "append","writeMode": "append","fieldDelimiter": "\t","fileType": "orc","column": [{"name": "id", "type": "int"},{"name": "username", "type": "string"},{"name": "password", "type": "string"},{"name": "email", "type": "string"},{"name": "phone", "type": "string"},{"name": "real_name", "type": "string"},{"name": "registration_time", "type": "string"},{"name": "last_login_time", "type": "string"},{"name": "status", "type": "string"}]}}}]}
}
This synchronization job uses a variable named date to represent the registration date of the users to be imported incrementally. When running the job, the value of date must be supplied:
python3 /usr/local/datax/bin/datax.py -p "-Ddate=2023-01-01" incr-users.json
Incremental import of the orders table
After all the existing order data has been fully imported into the Hive orders table, a large volume of new orders is still generated every day. We only need to import each day's orders incrementally into the Hive orders table, placing them in the appropriate partition.
First, after the full import of the existing orders into the Hive orders table, simulate some new orders:
INSERT INTO orders (user_id, seller_id, product_id, product_name, product_price, quantity, total_price, order_time) VALUES
(6, 1, 110, '大米', 90, 1, 90, '2023-01-01'),
(6, 2, 120, '护手霜', 20, 2, 40, '2023-01-01'),
(6, 3, 130, '地板', 120, 5, 600, '2023-01-01'),
(7, 1, 140, '筒灯', 100, 10, 1000, '2023-01-01'),
(7, 2, 150, '假发', 2000, 1, 2000, '2023-01-01'),
(7, 3, 160, '牛奶', 100, 2, 200, '2023-01-01'),
(8, 1, 170, '百褶裙', 1000, 2, 2000, '2023-01-01'),
(8, 2, 180, '真丝丝巾', 300, 2, 600, '2023-01-01'),
(8, 3, 190, '太阳镜', 250, 1, 250, '2023-01-01'),
(9, 1, 200, '遮阳伞', 120, 1, 120, '2023-01-01'),
(9, 2, 210, '盆栽', 220, 5, 1100, '2023-01-01'),
(10, 1, 220, '口琴', 50, 1, 50, '2023-01-01'),
(10, 2, 230, 'RIO', 12, 10, 120, '2023-01-01');
Now we need to incrementally import one day's data into the corresponding Hive partition. This amounts to using hdfswriter to write the incremental data directly into the partition's directory on HDFS, but we must first make sure that the partition exists.
-- Check whether the target partition exists
show partitions datax_shop.orders partition(year='2023', month='01');

-- If the partition does not exist, create it
alter table datax_shop.orders add partition(year='2023', month='01');
Once the partition has been created, the day's new data can be synchronized into the corresponding partition directory:
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","column": ["id","user_id","seller_id","product_id","product_name","product_price","quantity","total_price","order_time"],"connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["orders"]}],"where": "order_time = '$date'"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/orders/year=$year/month=$month","fileName": "append","writeMode": "append","fieldDelimiter": "\t","fileType": "orc","column": [{"name": "id", "type": "int"},{"name": "user_id", "type": "int"},{"name": "seller_id", "type": "int"},{"name": "product_id", "type": "int"},{"name": "product_name", "type": "string"},{"name": "product_price", "type": "double"},{"name": "quantity", "type": "double"},{"name": "total_price", "type": "double"},{"name": "order_time", "type": "string"}]}}}]}
}
This synchronization job defines three variables, date, year, and month, which control both which rows are read from MySQL and which Hive partition they are written to. All three values must be supplied when running this configuration:
python3 /usr/local/datax/bin/datax.py -p "-Ddate=2023-01-01 -Dyear=2023 -Dmonth=01" incr-orders.json
5.1.5. Script Scheduling
We have now imported the incremental data for a specific date (2023-01-01) into the corresponding Hive tables, but doing it this way is not flexible: we cannot run each of the commands above by hand every time an incremental import is needed. To make synchronization more convenient, and to allow it to be scheduled, we can wrap the steps in a script and simply call the script whenever it is needed.
Shell script
#!/bin/bash

# The date whose data needs to be synchronized: yesterday
# dt=`date -d yesterday +"%Y-%m-%d"`
dt='2023-01-01'

# Extract the year and month
year=${dt:0:4}
month=${dt:5:2}

# DataX installation path
DATAX_HOME=/usr/local/datax

# Path to the job configuration files
JOBS_HOME=/root/datax-example

# Incrementally import the user data
python $DATAX_HOME/bin/datax.py -p "-Ddate=$dt" $JOBS_HOME/incr-users.json

# Incrementally import the order data
# 1. Check whether the target partition exists in the Hive table; create it if it does not
partition=$(hive -e "show partitions datax_shop.orders partition(year='$year', month='$month')")
if [ -z "$partition" ]; then
    hive -e "alter table datax_shop.orders add partition(year='$year', month='$month')"
fi

# 2. Run the incremental order import
python $DATAX_HOME/bin/datax.py -p "-Ddate=$dt -Dyear=$year -Dmonth=$month" $JOBS_HOME/incr-orders.json
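To run the synchronization automatically every day, the script can be registered with cron. A minimal sketch, assuming the script is saved as /root/datax-example/incr-import.sh (the script name and log path are assumptions):

# crontab -e: run at 00:30 every day, after the previous day's data is complete
30 0 * * * /bin/bash /root/datax-example/incr-import.sh >> /var/log/datax-incr.log 2>&1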
Python script
# @Author  :
# @Company :

import datetime
import os

from pyhive import hive

# This script needs to interact with Hive (check whether a table partition exists,
# create a partition, etc.), so it uses PyHive.
# If PyHive is not installed yet, install it manually:
# yum install cyrus-sasl-plain cyrus-sasl-devel cyrus-sasl-gssapi
# pip3 install thrift
# pip3 install sasl
# pip3 install thrift-sasl
# pip3 install pyhive

# PyHive connects through Hive's ThriftServer, so make sure HiveServer2 is running:
# nohup hive --service hiveserver2 > /var/log/hiveserver2 2>&1 &

# DataX and job configuration home paths
DATAX_HOME = "/usr/local/datax"
JOBS_HOME = "/root/datax-example"

# JSON configuration file names of the synchronization jobs
INCR_USERS = "incr-users.json"
INCR_ORDERS = "incr-orders.json"

# Get the current time
# now = datetime.datetime(2023, 1, 1)
now = datetime.datetime.now()
dt = str(now.date())
year = f"{now.year:0>4}"
month = f"{now.month:0>2}"

# Incrementally import the user data
os.system(f'python {DATAX_HOME}/bin/datax.py -p "-Ddate={dt}" {JOBS_HOME}/{INCR_USERS}')

# Check whether the target Hive partition exists; create it if it does not
with hive.Connection(host="192.168.10.111", port=10000, username="root", database="datax_shop") as conn:
    with conn.cursor() as cursor:
        cursor.execute(f"show partitions orders partition(year='{year}', month='{month}')")
        partitions = cursor.fetchone()
        if partitions is None:
            # The partition does not exist, so create it
            cursor.execute(f"alter table orders add partition(year='{year}', month='{month}')")

# Incrementally import the order data
os.system(f'python {DATAX_HOME}/bin/datax.py -p "-Ddate={dt} -Dyear={year} -Dmonth={month}" {JOBS_HOME}/{INCR_ORDERS}')