离线数仓数据导出-hive数据同步到mysql

离线数仓数据导出-hive数据同步到mysql

  • MySQL建库建表
  • 数据导出

为方便报表应用使用数据,需将ads各指标的统计结果导出到MySQL数据库中。
datax支持hive同步MySQL:仅仅支持hive存储的hdfs文件导出。所以reader选hdfs-reader,writer选mysql-writer。

在这里插入图片描述
null值 在hive和mysql里的存储格式不一样,需要告诉DataX应该如何转换。
在这里插入图片描述

MySQL建库建表

12.1.1 创建数据库

CREATE DATABASE IF NOT EXISTS gmall_report DEFAULT CHARSET utf8 COLLATE utf8_general_ci;

建mysql表的,
1字段个数要和hive中的ads层数据保持一致,
2字段类型要和hive对等替换,
3字段顺序也要一致
每张表要有主键

1)各活动补贴率
dt activity_id activity_name 三个主键联合而成

DROP TABLE IF EXISTS `ads_activity_stats`;
CREATE TABLE `ads_activity_stats`  (`dt` date NOT NULL COMMENT '统计日期',`activity_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '活动ID',`activity_name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动名称',`start_date` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动开始日期',`reduce_rate` decimal(16, 2) NULL DEFAULT NULL COMMENT '补贴率',PRIMARY KEY (`dt`, `activity_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '活动统计' ROW_FORMAT = Dynamic;

数据导出

DataX配置文件生成脚本
方便起见,此处提供了DataX配置文件批量生成脚本,脚本内容及使用方式如下。
1)在~/bin目录下创建gen_export_config.py脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.py
脚本内容如下

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb#MySQL相关配置,需根据实际情况作出修改
mysql_host = "hadoop102"
mysql_port = "3306"
mysql_user = "root"
mysql_passwd = "000000"#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "hadoop102"
hdfs_nn_port = "8020"#生成配置文件的目标路径,可根据实际情况作出修改
output_path = "/opt/module/datax/job/export"def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)def get_mysql_meta(database, table):connection = get_connection()cursor = connection.cursor()sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"cursor.execute(sql, [database, table])fetchall = cursor.fetchall()cursor.close()connection.close()return fetchalldef get_mysql_columns(database, table):return map(lambda x: x[0], get_mysql_meta(database, table))def generate_json(target_database, target_table):job = {"job": {"setting": {"speed": {"channel": 3},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "hdfsreader","parameter": {"path": "${exportdir}","defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"column": ["*"],"fileType": "text","encoding": "UTF-8","fieldDelimiter": "\t","nullFormat": "\\N"}},"writer": {"name": "mysqlwriter","parameter": {"writeMode": "replace","username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(target_database, target_table),"connection": [{"jdbcUrl":"jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + target_database + "?useUnicode=true&characterEncoding=utf-8","table": [target_table]}]}}}]}}if not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([target_database, target_table, "json"])), "w") as f:json.dump(job, f)def main(args):target_database = ""target_table = ""options, arguments = getopt.getopt(args, '-d:-t:', ['targetdb=', 'targettbl='])for opt_name, opt_value in options:if opt_name in ('-d', '--targetdb'):target_database = opt_valueif opt_name in ('-t', '--targettbl'):target_table = opt_valuegenerate_json(target_database, target_table)if __name__ == '__main__':main(sys.argv[1:])

在~/bin目录下创建gen_export_config.sh脚本
[atguigu@hadoop102 bin]$ vim ~/bin/gen_export_config.sh
脚本内容如下。

#!/bin/bashpython ~/bin/gen_export_config.py -d gmall_report -t ads_activity_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_coupon_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_new_buyer_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_order_by_province
python ~/bin/gen_export_config.py -d gmall_report -t ads_page_path
python ~/bin/gen_export_config.py -d gmall_report -t ads_repeat_purchase_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_sku_cart_num_top3_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_cate
python ~/bin/gen_export_config.py -d gmall_report -t ads_trade_stats_by_tm
python ~/bin/gen_export_config.py -d gmall_report -t ads_traffic_stats_by_channel
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_action
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_change
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_retention
python ~/bin/gen_export_config.py -d gmall_report -t ads_user_stats

3)为gen_export_config.sh脚本增加执行权限
[atguigu@hadoop102 bin]$ chmod +x ~/bin/gen_export_config.sh
4)执行gen_export_config.sh脚本,生成配置文件
[atguigu@hadoop102 bin]$ gen_export_config.sh
5)观察生成的配置文件

[atguigu@hadoop102 bin]$ ls /opt/module/datax/job/export/

编写每日导出脚本
(1)在hadoop102的/home/atguigu/bin目录下创建hdfs_to_mysql.sh
[atguigu@hadoop102 bin]$ vim hdfs_to_mysql.sh
(2)编写如下内容


#! /bin/bashDATAX_HOME=/opt/module/datax#DataX导出路径不允许存在空文件,该函数作用为清理空文件
handle_export_path(){target_file=$1for i in `hadoop fs -ls -R $target_file | awk '{print $8}'`; dohadoop fs -test -z $iif [[ $? -eq 0 ]]; thenecho "$i文件大小为0,正在删除"hadoop fs -rm -r -f $ifidone}#数据导出
export_data() {datax_config=$1export_dir=$2hadoop fs -test -e $export_dirif [[ $? -eq 0 ]]thenhandle_export_path $export_dirfile_count=$(hadoop fs -ls $export_dir | wc -l)if [ $file_count -gt 0 ]thenset -e;$DATAX_HOME/bin/datax.py -p"-Dexportdir=$export_dir" $datax_configset +e;else echo "$export_dir 目录为空,跳过~"fielseecho "路径 $export_dir 不存在,跳过~"fi
}case $1 in"ads_new_buyer_stats")export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_stats;;"ads_order_by_province")export_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_province;;"ads_page_path")export_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_path;;"ads_repeat_purchase_by_tm")export_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tm;;"ads_trade_stats")export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_stats;;"ads_trade_stats_by_cate")export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cate;;"ads_trade_stats_by_tm")export_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tm;;"ads_traffic_stats_by_channel")export_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channel;;"ads_user_action")export_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_action;;"ads_user_change")export_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_change;;"ads_user_retention")export_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retention;;"ads_user_stats")export_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_stats;;"ads_activity_stats")export_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_stats;;"ads_coupon_stats")export_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_stats;;"ads_sku_cart_num_top3_by_cate")export_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate;;"all")export_data /opt/module/datax/job/export/gmall_report.ads_new_buyer_stats.json /warehouse/gmall/ads/ads_new_buyer_statsexport_data /opt/module/datax/job/export/gmall_report.ads_order_by_province.json /warehouse/gmall/ads/ads_order_by_provinceexport_data /opt/module/datax/job/export/gmall_report.ads_page_path.json /warehouse/gmall/ads/ads_page_pathexport_data /opt/module/datax/job/export/gmall_report.ads_repeat_purchase_by_tm.json /warehouse/gmall/ads/ads_repeat_purchase_by_tmexport_data /opt/module/datax/job/export/gmall_report.ads_trade_stats.json /warehouse/gmall/ads/ads_trade_statsexport_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_cate.json /warehouse/gmall/ads/ads_trade_stats_by_cateexport_data /opt/module/datax/job/export/gmall_report.ads_trade_stats_by_tm.json /warehouse/gmall/ads/ads_trade_stats_by_tmexport_data /opt/module/datax/job/export/gmall_report.ads_traffic_stats_by_channel.json /warehouse/gmall/ads/ads_traffic_stats_by_channelexport_data /opt/module/datax/job/export/gmall_report.ads_user_action.json /warehouse/gmall/ads/ads_user_actionexport_data /opt/module/datax/job/export/gmall_report.ads_user_change.json /warehouse/gmall/ads/ads_user_changeexport_data /opt/module/datax/job/export/gmall_report.ads_user_retention.json /warehouse/gmall/ads/ads_user_retentionexport_data /opt/module/datax/job/export/gmall_report.ads_user_stats.json /warehouse/gmall/ads/ads_user_statsexport_data /opt/module/datax/job/export/gmall_report.ads_activity_stats.json /warehouse/gmall/ads/ads_activity_statsexport_data /opt/module/datax/job/export/gmall_report.ads_coupon_stats.json /warehouse/gmall/ads/ads_coupon_statsexport_data /opt/module/datax/job/export/gmall_report.ads_sku_cart_num_top3_by_cate.json /warehouse/gmall/ads/ads_sku_cart_num_top3_by_cate;;
esac

(3)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod +x hdfs_to_mysql.sh
(4)脚本用法
[atguigu@hadoop102 bin]$ hdfs_to_mysql.sh all

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/748.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python输入输出特殊处理

输出 需要满足输出一行后,再输出一行,行中每个元素用空格隔开 length len(tri) tmp [] for i in range(len(tri)):tmp tri[i]for j in range(len(tri[i])):print(tmp[j],end )print()输入p 一次性输入6个数字到列表中,并且输入的每个数…

怎样在外网登录访问CRM管理系统?

一、什么是CRM管理系统? Customer Relationship Management,简称CRM,指客户关系管理,是企业利用信息互联网技术,协调企业、顾客和服务上的交互,提升管理服务。为了企业信息安全以及使用方便,企业…

SSM小程序作品集展示微信小程序

采用技术 SSM小程序作品集展示微信小程序的设计与实现~ 开发语言:Java 数据库:MySQL 技术:SpringMVCMyBatis 工具:IDEA/Ecilpse、Navicat、Maven 页面展示效果 用户功能 用户注册 用户首页 作品集 优秀作者 我的分享 管…

Linux嵌入式驱动开发-内核定时器

文章目录 内核时间管理处理jiffies绕回的APIExample:使用 jiffies 判断超时jiffies与ms、us、ns转换的API 内核定时器内核定时器APIinit_timer: 初始化 timer_list 类型变量add_timer: 向 Linux 内核注册定时器del_timer: 删除一个定时器del_timer_sync: del_timer …

powershell@命令行提示符样式配置自定义@pwsh重写prompt显示电量内存时间等信息

文章目录 abstract流行的powershell prompt模块示例 powershell原生修改Prompt函数配置文档Prompt命令来自哪里 简单修改带上电量和时间的Prompt 复杂修改预览FAQ:没有必要修改相关仓库地址样式选择平衡样式花哨样式响应性能 小结 abstract 在 PowerShell 中,可以通…

CSDN积分和等级和 能创建专栏数量的关系。还差1000多分!

积分查询:CSDN 博客积分规则 博客积分是CSDN对用户努力的认可和奖励,也是衡量博客水平的重要标准。博客等级也将由博客积分唯一决定。积分规则具体如下: 1、每发布一篇原创或者翻译文章:可获得10分; 2、每发布一篇转载…

做一个答题pk小程序多少钱

在探讨“做一个答题pk小程序多少钱”这一问题时,我们首先需要明确的是,小程序的价格并非固定不变,而是受到多种因素的影响。这些因素包括但不限于小程序的复杂度、功能需求、开发周期、技术难度以及开发团队的规模和经验等。因此,…

Docker-volume创建数据卷

创建一个名为myvol的数据卷: [rootlocalhost ~]# docker volume create myvol myvol[rootlocalhost ~]# docker volume ls DRIVER VOLUME NAME local myvol查看数据卷: [rootlocalhost ~]# docker volume inspect myvol [{&…

Web前端 JavaScript笔记7

js的执行机制 js是单线程 同步:前面一个任务执行结束之后,执行后一个 异步:异步任务,引擎放在一边,不进入主线程,而进入任务队列的任务 js通过浏览器解析,浏览器靠引擎解析 回调函数同步任务执行…

微服务与Web服务:定义、优势、挑战与实践指南20240416

引言 在当前的软件开发领域,微服务和Web服务是两个频繁被讨论的术语。随着企业应用的复杂性和规模的增加,深入理解这两种服务架构的优势和挑战变得极为关键。本文将探讨微服务和Web服务的核心概念、优缺点,并通过Go语言示例展示它们的实现。…

LabVIEW卡尔曼滤波技术

LabVIEW卡尔曼滤波技术 在现代航空导航中,高精度和快速响应的方位解算对于航空安全至关重要。通过LabVIEW平台实现一种卡尔曼滤波方位解算修正技术,以改善传统导航设备在方位解算中的噪声干扰问题,从而提高其解算精度和效率。通过LabVIEW的强…

新媒体短视频运营之抖音的19种流量变现模式

本文中谈的有带货类、广告推广类、团购类、任务推广类等多种抖音变现方式,包括图文带货、视频带货、抖店带货、直播带货等,适合个人博主、商家和企业等不同用户群体。 同时,还介绍了中视频计划、剪映创作人、看见音乐计划等变现路径。 1、图文带货 开通要求:0粉即可开通…

Java基础之JVM基础调优与常见问题

常见命令 以下命令的介绍,全部在jdk8环境下运行的; jps ☆☆☆☆☆ 查看当前运行的进程号; jmap ☆☆☆ jmap命令可以查看jvm的内存信息,class对应的实例个数以及占用的内存大小 jmap -histo 查看当前java进程 [rdVM-8-12-c…

Ugee手写板Ex08 S设置流程

手写笔的结构 笔尖 鼠标左键 上面第一个键:鼠标右键(效果有时候也不完全等同) 上面第二个键:鼠标中键 WPS ①打开pdf ②批注->随意画->画曲线 效果如下:

定时执行专家 - 高级功能详解 - 关联任务设置

◆ 需求场景 AB两个任务,A任务每隔 10分钟执行一次;A任务执行完1分钟(60秒)之后,再执行B任务。这种情况就需要用到“关联任务”功能。 此处以“日程提醒”、“执行Nircmd命令”举例。A任务设置:日程提醒&…

小程序中使用HTTPS调用自带文本安全内容检测接口(msg_sec_check)的实现方法

在小程序中调用自带的文本安全内容检测接口,你需要使用小程序提供的wx.request方法。以下是一个示例代码: javascript代码: // 假设你已经获取了access_token,如果不知道如何获取,可以参考我上一篇文章 const access_token 你的access_tok…

Matlab 最小二乘法拟合直线(过指定点)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 基于之前博客中的思路:点云最小二乘法拟合空间直线,我们只需要将其中的去均值化的操作,转换为我们要指定点的坐标,并计算出相应的协方差矩阵,就可以很容易的求解出经过我们指定点的直线(当然它仍然保证了所有…

Python 基于docker部署的Mysql备份查询脚本

前言 此环境是基于docker部署的mysql,docker部署mysql可以参考如下链接: docker 部署服务案例-CSDN博客 颜色块文件 rootbogon:~ 2024-04-18 16:34:23# cat DefaultColor.py ######################################################################…

java音乐播放器系统设计与实现springboot-vue

后端技术 SpinrgBoot的主要优点有: 1、为所有spring开发提供了一个更快、更广泛的入门体验; 2、零配置; 3、集成了大量常用的第三方库的配置; Maven: 项目管理和构建自动化工具,用于java项目。 java: 广泛使用的编程语…

阿里云服务器连接数详细说明

阿里云服务器“连接数”是什么意思?连接又称网络会话,是客户端与服务器建立连接并传输数据的过程。网络五元组(包括源IP、目的IP、源端口、目的端口、协议)唯一确定一个连接,ECS实例的连接数包括通过TCP、UDP、ICMP协议…