DataX 3.0 实战案例

第五章 实战案例

5.1. 案例一

5.1.1. 案例介绍

MySQL数据库中有两张表:用户表(users),订单表(orders)。其中用户表中存储的是所有的用户的信息,订单表中存储的是所有的订单的信息。表结构如下:

  • 用户表 users:

    • id:用户id
    • username:用户名
    • password:用户密码
    • email:用户邮箱
    • phone:用户手机号码
    • real_name:用户的真实姓名
    • registration_time:用户的注册时间
    • last_login_time:用户的上次登录时间
    • status:用户的状态(活跃、不活跃、冻结)
  • 订单表 orders:

    • id:订单ID
    • user_id:用户ID
    • seller_id:卖家ID
    • product_id:商品ID
    • product_name:商品名称
    • product_price:商品单价
    • quantity:购买数量
    • total_price:订单总价
    • order_time:订单时间

业务系统中,每天都有新的用户注册,每天也都在产生大量的订单。今天公司刚刚搭建了数据仓库,需要将已有的数据导入到Hive表中,此时需要将已有的数据全量的导入到Hive的表中。后续每天产生的新用户注册和新的订单,增量的导入到对应的Hive表中。

5.1.2. 数据准备
MySQL中初始数据
# 创建数据库
CREATE DATABASE datax_shop;
USE datax_shop;# 创建用户表
DROP TABLE IF EXISTS users;
CREATE TABLE users (id INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,username VARCHAR(50) NOT NULL,password VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL,phone VARCHAR(20) NOT NULL,real_name VARCHAR(50) NOT NULL,registration_time DATE NOT NULL,last_login_time DATE NULL DEFAULT NULL,status ENUM('active', 'inactive', 'frozen') NOT NULL DEFAULT 'active',PRIMARY KEY (id),UNIQUE KEY (username),UNIQUE KEY (email),UNIQUE KEY (phone)
);# 插入一些数据
INSERT INTO users (username, password, email, phone, real_name, registration_time, last_login_time) VALUES
('johndoe','123456','johndoe@163.com','17767827612','John Doe','2020-12-12','2022-09-12'),
('janedoe','123123','janedoe@qq.com','18922783392','Jane Doe','2021-02-12','2022-12-10'),
('bobsmith','121212','bobsmith@126.com','17122811292','Bob Smith','2020-10-11','2022-01-15'),
('sarahlee','11111','sarahlee@qq.com','17122810911','Sarah Lee','2019-03-15','2022-02-15'),
('jimmychang','123121','jimmychang@qq.com','155514442134','Jimmy Chang','2022-12-11', NULL),
('alexjohnson','121212','alexjohnson@126.com','15522427212','Alex Johnson','2021-09-01', NULL);# 创建订单表
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (id INT PRIMARY KEY AUTO_INCREMENT,user_id INT NOT NULL,seller_id INT NOT NULL,product_id INT NOT NULL,product_name VARCHAR(255) NOT NULL,product_price DECIMAL(10, 2) NOT NULL,quantity INT NOT NULL,total_price DECIMAL(10, 2) NOT NULL,order_time DATE NOT NULL
);# 插入一些数据
INSERT INTO orders (user_id, seller_id, product_id, product_name, product_price, quantity, total_price, order_time) VALUES
(1, 1, 12, '电动牙刷', 90, 1, 90, '2020-12-20'),
(1, 2, 15, '洗面奶', 45, 1, 45, '2020-12-20'),
(1, 3, 17, '面膜', 110, 2, 220, '2020-12-20'),
(2, 1, 11, 'iPad', 5990, 1, 5990, '2021-12-20'),
(2, 2, 19, 'iPhone数据线', 18, 1, 18, '2021-11-20'),
(3, 1, 20, 'iPhone手机壳', 80, 1, 80, '2020-12-20'),
(3, 2, 22, '榴莲', 45, 4, 180, '2021-09-12'),
(3, 3, 23, '西瓜', 12, 5, 60, '2021-11-11'),
(4, 1, 4, '洗地机', 2990, 1, 2990, '2020-06-18'),
(4, 2, 7, '油污清洁剂', 78, 2, 156, '2020-07-11'),
(4, 3, 11, '镜子', 10, 1, 10, '2020-06-20'),
(5, 1, 9, '健力宝', 48, 2, 96, '2022-12-20');
Hive表的创建
-- 创建数据库
create database datax_shop;-- 创建用户表
drop table if exists datax_shop.users;
create table datax_shop.users (id int,username string,password string,email string,phone string,real_name string,registration_time string,last_login_time string,status string
)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as orcfile;-- 创建订单表
drop table if exists datax_shop.orders;
create table datax_shop.orders (id int,user_id int,seller_id int,product_id int,product_name string,product_price double,quantity int,total_price double,order_time string
)
partitioned by (year string, month string)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as orcfile;
5.1.3. 数据全量导入
用户表全量导入
{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","column": ["id","username","password","email","phone","real_name","registration_time","last_login_time","status"],"connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["users"]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/users","fileName": "original","writeMode": "append","fieldDelimiter": "\t","fileType": "orc","column": [{"name": "id", "type": "int"},{"name": "username", "type": "string"},{"name": "password", "type": "string"},{"name": "email", "type": "string"},{"name": "phone", "type": "string"},{"name": "real_name", "type": "string"},{"name": "registration_time", "type": "string"},{"name": "last_login_time", "type": "string"},{"name": "status", "type": "string"}]}}}]}
}
订单表全量导入

订单表在全量导入的时候,因为要按照订单创建时候的日期作为分区的字段。所以需要创建一张临时表,先将MySQL中的订单数据全量的导入到这个临时表中,然后再从这个临时表加载到订单表的指定分区中。

-- 创建临时表,用来承接全量导入的订单信息
drop table if exists datax_shop.orders_origin;
create table datax_shop.orders_origin (id int,user_id int,seller_id int,product_id int,product_name string,product_price double,quantity int,total_price double,order_time string,year string,month string
)
row format delimited
fields terminated by '\t'
lines terminated by '\n';

创建数据同步方案,同步MySQL的订单数据到这个临时表中

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["orders"]}],"column": ["id","user_id","seller_id","product_id","product_name","product_price","quantity","total_price","order_time","year(order_time)","lpad(month(order_time), 2, 0)"]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/orders_origin/","fileName": "orders_origin","writeMode": "append","fieldDelimiter": "\t","fileType": "text","column": [{"name": "id", "type": "int"},{"name": "user_id", "type": "int"},{"name": "seller_id", "type": "int"},{"name": "product_id", "type": "int"},{"name": "product_name", "type": "string"},{"name": "product_price", "type": "double"},{"name": "quantity", "type": "double"},{"name": "total_price", "type": "double"},{"name": "order_time", "type": "string"},{"name": "year", "type": "string"},{"name": "month", "type": "string"}]}}}]}
}

加载数据,到orders表的对应分区中

-- 关闭严格模式
set hive.exec.dynamic.partition.mode=nonstrict;-- 导入数据到订单表中
insert into datax_shop.orders partition(year, month) select * from datax_shop.orders_origin;
5.1.4. 增量数据导入
用户表增量导入

在现有数据全量导入到Hive表中之后,每日新增的数据只需要增量导入到Hive即可。此时我们就可以按照用户注册的时间来确定需要将什么数据导入到Hive的用户表中。

首先,我们在将现有的数据全量的导入到Hive之后,模拟新用户的注册。

INSERT INTO users (username, password, email, phone, real_name, registration_time, last_login_time) VALUES
('natalielin','121212','natalielin@qq.com','17788889999','Natalie Lin','2023-01-01', NULL),
('harrytran','123123','harrytran@126.com','17666228192','Harry Tran','2023-01-01', NULL),
('gracewang','313131','gracewang@163.com','18872631272','Grace Wang','2023-01-01', NULL),
('peterlee','123123','peterlee@qq.com','19822781829','Peter Lee','2023-01-01',NULL);

现在我们需要将在 2023-01-01 注册的用户,增量导入到Hive的用户表中。

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","column":  ["id","username","password","email","phone","real_name","registration_time","last_login_time","status"],"connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["users"]}],"where": "registration_time = '$date'"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/users","fileName": "append","writeMode": "append","fieldDelimiter": "\t","fileType": "orc","column": [{"name": "id", "type": "int"},{"name": "username", "type": "string"},{"name": "password", "type": "string"},{"name": "email", "type": "string"},{"name": "phone", "type": "string"},{"name": "real_name", "type": "string"},{"name": "registration_time", "type": "string"},{"name": "last_login_time", "type": "string"},{"name": "status", "type": "string"}]}}}]}
}

在上述的数据同步的方案中,我们使用到了变量 date 用来表示需要增量导入用户的注册时间。在使用这个数据同步方案的时候,需要指定变量 date 的值:

python3 /usr/local/datax/bin/datax.py -p "-Ddate=2023-01-01" incr-users.json
订单表增量导入

在现有的所有订单数据全量导入到Hive的订单表后,每天仍然会有大量的订单数据生成。此时我们只需要按照天为单位,将某一天产生的所有的数据增量导入到Hive的订单表中,并放置在指定的分区位置即可。

首先,在现有的订单数据全量导入到Hive的订单表之后,我们来模拟一些新增的订单信息

INSERT INTO orders (user_id, seller_id, product_id, product_name, product_price, quantity, total_price, order_time) VALUES
(6, 1, 110, '大米', 90, 1, 90, '2023-01-01'),
(6, 2, 120, '护手霜', 20, 2, 40, '2023-01-01'),
(6, 3, 130, '地板', 120, 5, 600, '2023-01-01'),
(7, 1, 140, '筒灯', 100, 10, 1000, '2023-01-01'),
(7, 2, 150, '假发', 2000, 1, 2000, '2023-01-01'),
(7, 3, 160, '牛奶', 100, 2, 200, '2023-01-01'),
(8, 1, 170, '百褶裙', 1000, 2, 2000, '2023-01-01'),
(8, 2, 180, '真丝丝巾', 300, 2, 600, '2023-01-01'),
(8, 3, 190, '太阳镜', 250, 1, 250, '2023-01-01'),
(9, 1, 200, '遮阳伞', 120, 1, 120, '2023-01-01'),
(9, 2, 210, '盆栽', 220, 5, 1100, '2023-01-01'),
(10, 1, 220, '口琴', 50, 1, 50, '2023-01-01'),
(10, 2, 230, 'RIO', 12, 10, 120, '2023-01-01');

现在我们需要将某一天的数据增量的导入到Hive对应的分区中,其实这个过程就是使用hdfswriter将增量的数据直接写入到HDFS的指定分区目录下即可。但是需要保证这个分区已经被创建出来了。

-- 检查指定分区是否存在
show partitions datax_shop.orders partition(year='2023', month='01');-- 如果这个分区不存在,就创建这个分区
alter table datax_shop.orders add partition(year='2023', month='01');

分区创建完成之后,就可以将某天新增的数据同步到对应的分区目录了

{"job": {"setting": {"speed": {"channel": 1}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "123456","column": ["id","user_id","seller_id","product_id","product_name","product_price","quantity","total_price","order_time"],"connection": [{"jdbcUrl": ["jdbc:mysql://qianfeng01:3306/datax_shop"],"table": ["orders"]}],"where": "order_time = '$date'"}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://qianfeng01:9820","path": "/user/hive/warehouse/datax_shop.db/orders/year=$year/month=$month","fileName": "append","writeMode": "append","fieldDelimiter": "\t","fileType": "orc","column": [{"name": "id", "type": "int"},{"name": "user_id", "type": "int"},{"name": "seller_id", "type": "int"},{"name": "product_id", "type": "int"},{"name": "product_name", "type": "string"},{"name": "product_price", "type": "double"},{"name": "quantity", "type": "double"},{"name": "total_price", "type": "double"},{"name": "order_time", "type": "string"}]}}}]}
}

在上述的数据同步方案中,我们定义了三个变量:dateyearmonth,用来控制从MySQL数据库导入的数据和存放到Hive的对应的分区。因此我们在使用这个配置同步数据的时候,需要指定这三个变量值:

python3 /usr/local/datax/bin/datax.py -p "-Ddate=2023-01-01 -Dyear=2023 -Dmonth=01" incr-orders.json
5.1.5. 脚本调度

我们已经实现了将指定日期(2023-01-01)的增量的数据导入到Hive对应的数据表中,但是这样做不够灵活,我们不能每一次在需要增量导入的时候都去执行上述的一个个命令。为了能够更加方便的同步数据,以及可以定时的进行调度,我们可以将其做成一个脚本,在需要的时候直接调用即可。

shell脚本
#!/bin/bash# 获取需要同步的数据的日期为昨天
# dt=`date -d yesterday +"%Y-%m-%d"`
dt='2023-01-01'# 提取年
year=${dt:0:4}
month=${dt:5:2}# 设置DataX路径
DATAX_HOME=/usr/local/datax# 设置jobs路径
JOBS_HOME=/root/datax-example# 增量导入用户数据
python $DATAX_HOME/bin/datax.py -p "-Ddate=$dt" $JOBS_HOME/incr-users.json# 增量导入订单数据
# 1. 检查Hive表目标分区是否存在,如果目标分区不存在,创建分区
if [ ! hive -e "show partitions datax_shop.orders partition(year='$year', month='$month')" ]; thenhive -e "alter table datax_shop.orders add partition(year='$year', month='$month')"
fi# 3. 执行增量导入订单
python $DATAX_HOME/bin/datax.py -p "-Ddate=$dt -Dyear=$year -Dmonth=$month" $JOBS_HOME/incr-orders.json
python脚本
# @Author   : 
# @Company  : import datetime
import os
from pyhive import hive# 在脚本中需要和Hive进行交互,查询Hive表的分区是否存在、创建分区等操作,因此需要使用到PyHive
# 如果没有安装的话,需要手动安装一下PyHive
# yum install cyrus-sasl-plain  cyrus-sasl-devel  cyrus-sasl-gssapi
# pip3 install thrift
# pip3 install sasl
# pip3 install thrift-sasl
# pip3 install pyhive# PyHive需要使用Hive的ThriftServer服务,因此需要保证你的Hive对应的服务是打开的
# nohup hive --service hiveserver2 > /var/log/hiveserver2 2>&1 &# 设置 DataX 和 Jobs 的Home路径
DATAX_HOME = "/usr/local/datax"
JOBS_HOME = "/root/datax-example"# 设置同步任务的JSON配置文件名
INCR_USERS = "incr-users.json"
INCR_ORDERS = "incr-orders.json"# 获取当前时间
# now = datetime.datetime(2023, 1, 1)
now = datetime.datetime.now()
dt = str(now.date())
year = f"{now.year:0>4}"
month = f"{now.month:0>2}"# 增量导入用户数据
os.system(f'python {DATAX_HOME}/bin/datax.py -p "-Ddate={dt}" {JOBS_HOME}/{INCR_USERS}')# 查看Hive的指定分区是否存在,如果不存在,创建分区
with hive.Connection(host="192.168.10.111", port=10000, username="root", database="datax_shop") as conn:with conn.cursor() as cursor:cursor.execute(f"show partitions orders partition(year='{year}', month='{month}')")partitions = cursor.fetchone()if partitions is None:# 说明这个分区不存在,创建cursor.execute(f"alter table orders add partition(year='{year}', month='{month}')")# 增量导入订单数据
os.system(f'python {DATAX_HOME}/bin/datax.py -p "-Ddate={dt} -Dyear={year} -Dmonth={month}" {JOBS_HOME}/{INCR_ORDERS}')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/899905.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式学习(1)

面向对象设计原则 单一职责 每个类只有一个职责,并被完整的封装在类中,该原则用来控制类的粒度。 例如Mapper,controller都只负责一个业务。 开闭原则 应该对扩展开放,而对修改封闭,例如定义接口或是抽象类作为抽…

在 Rocky Linux 9.2 上编译安装 Redis 6.2.6

文章目录 在 Rocky Linux 9.2 上编译安装 Redis 6.2.6Redis 介绍官网Redis 的核心特性高性能支持多种数据结构多种持久化机制复制与高可用2.5 事务与 Lua 脚本消息队列功能 Redis 适用场景Redis 与其他数据库对比Redis 的优势与劣势Redis 优势Redis 劣势 部署过程系统环境信息环…

量子计算与经典计算的融合与未来

最近研学过程中发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击下方超链接跳转到网站人工智能及编程语言学习教程。读者们可以通过里面的文章详细了解一下人工智能及其编程等教程和学习方法。下面进入文章正…

数据结构(4)——带哨兵位循环双向链表

目录 前言 一、带哨兵的循环双向链表是什么 二、链表的实现 2.1规定结构体 2.2创建节点 2.3初始化 2.4打印 2.5检验是否为空 2.6销毁链表 2.7尾插 2.8尾删 2.9头插 2.10头删 2.11寻找特定节点 2.12任意位置插入(pos前) 2.13删除任意节点 …

Github 2025-03-30 php开源项目日报 Top10

根据Github Trendings的统计,今日(2025-03-30统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量PHP项目10TypeScript项目1Coolify: 开源自助云平台 创建周期:1112 天开发语言:PHP, Blade协议类型:Apache License 2.0Star数量:10527 个Fo…

3. 线程间共享数据

1. 线程共享数据会造成什么问题? 1.1 读写不一致 多线程读不会造成数据变动,所以没有问题。只要有一个线程设计修改数据,就会导致数据共享出现问题,简单的是数据不一致,严重的是程序访问已经释放的内存,造…

JAVA垃圾回收算法和判断垃圾的算法

一、判断垃圾的算法 判断对象是否为垃圾的核心是确定对象是否不再被使用。Java主要采用以下两种算法: 1. 引用计数法(Reference Counting) 原理:每个对象维护一个引用计数器,记录被引用的次数。当引用被添加时计数器…

界面架构 - MVVM (Qt)

MVVM MVVM 的主要特点示例示例功能示例代码ViewModel 类(C)主函数入口(main.cpp) QML 文件(main.qml)总结 MVVM(Model-View-ViewModel)架构是一种旨在进一步分离界面和业务逻辑的设计…

第十四届MathorCup高校数学建模挑战赛-C题:基于 LSTM-ARIMA 和整数规划的货量预测与人员排班模型

目录 摘要 一、 问题重述 1.1 背景知识 1.2 问题描述 二、 问题分析 2.1 对问题一的分析 2.2 对问题二的分析 2.3 对问题三的分析 2.4 对问题四的分析 三、 模型假设 四、 符号说明 五、 问题一模型的建立与求解 5.1 数据预处理 5.2 基于 LSTM 的日货量预测模型 5.3 日货量预测…

银河麒麟V10 aarch64架构安装mysql教程

国产操作系统 ky10.aarch64 因为是arm架构,故选择mysql8,推荐安装8.0.28版本 尝试8.0.30和8.0.41版本均未成功,原因不明☹️ 1. 准备工作 ⏬ 下载地址:https://downloads.mysql.com/archives/community/ 2. 清理历史环境 不用管…

C++多继承

可以用多个基类来派生一个类。 格式为: class 类名:类名1,…, 类名n { private: … ; //私有成员说明; public: … ; //公有成员说明; protected: … ; //保护的成员说明; }; class D: public A, protected B, private C { …//派…

某地老旧房屋自动化监测项目

1. 项目简介 自从上个世纪90年代以来,我国经济发展迅猛,在此期间大量建筑平地而起,并且多为砖混结构的住房,使用寿命通常约为30-50年,钢筋混凝土结构,钢结构等高层建筑,这些建筑在一般情况下的…

产品经理的大语言模型课 04 -模型应用的云、边、端模式对比

目录 算力部署方式的影响因素数据量计算难度前期投入数据隐私应用规模与泛化能力 云、边、端部署的特点和对比典型场景举例社区人脸门禁后厨老鼠识别 未来展望 算力部署方式的影响因素 最近和人工智能从业者进行了非常广泛的沟通,尝试对模型应用的云、边、端模式进…

基于Python设计的TEQC数据质量可视化分析软件

标题:基于Python设计的TEQC数据质量可视化分析软件 内容:1.摘要 本文旨在设计一款基于Python的TEQC数据质量可视化分析软件。随着全球导航卫星系统(GNSS)的广泛应用,数据质量的评估变得至关重要。TEQC(TransEditQualityCheck&…

Flinksql--订单宽表

参考: https://chbxw.blog.csdn.net/article/details/115078261 (datastream 实现) 一、ODS 模拟订单表及订单明细表 CREATE TABLE orders (order_id STRING,user_id STRING,order_time TIMESTAMP(3),-- 定义事件时间及 Watermark(允许5秒乱序&#x…

粒子滤波介绍

目录 粒子滤波的主要流程可以分为以下 5 个步骤: 粒子滤波(PF) vs. ESKF(误差状态卡尔曼滤波) 粒子滤波的主要流程可以分为以下 5 个步骤: 初始化(Initialization) 生成 N 个粒子&…

一场国际安全厂商的交流会议简记

今天参与了一场国际安全厂商A公司组织的交流会议 与会有国际TOP企业跨境企业 还有国内一些头部商业公司。 A公司很有意思介绍了自己是怎么做安全运营中心SOC的。 介绍了很多内容,包括他们自己的员工量/设备量/事件量/SOC中心人员量,其中人员量只有个位数…

Java面试黄金宝典30

1. 请详细列举 30 条常用 SQL 优化方法 定义 SQL 优化是指通过对 SQL 语句、数据库表结构、索引等进行调整和改进,以提高 SQL 查询的执行效率,减少系统资源消耗,提升数据库整体性能的一系列操作。 要点 从索引运用、查询语句结构优化、数据…

花洒洗澡完毕并关闭后过段时间会突然滴水的原因探究

洗澡完毕后的残留水 在洗澡的过程中,我们通常会使用到大量的水。这些水会通过花洒管子到达花洒顶喷流出。由于大顶喷花洒的喷头较大,关闭后里面的存水会更多。 气压失衡后的滴水 当花洒关闭后,内部的水管和花洒头中仍存有一定量的水。由于…

QSettings用法实战(相机配置文件的写入和读取)

很多情况,在做项目开发的时候,将参数独立出来是比较好的方法 例如:相机的曝光次数、曝光时长等参数,独立成ini文件,用户可以在外面修改即可生效,无需在动代码重新编译等工作 QSettings便可以实现该功能 内…