數據集成平台:datax將MySQL數據同步到hive(全部列和指定列)

1.數據集成平台:將MySQL數據同步到hive(全部和指定列)

  1. python環境:2.7版本
  2. py腳本
    傳參:

source_database:數據庫
source_table:表
source_columns:列
source_splitPk:split key,要求必須是int類型

# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb#MySQL相关配置,需根据实际情况作出修改
mysql_host = "47.57.227.5"
mysql_port = "3306"
mysql_user = "vinson_readonly"
mysql_passwd = "8AGY5Eqq8Ac8VR7b"#HDFS NameNode相关配置,需根据实际情况作出修改
hdfs_nn_host = "mycluster"
hdfs_nn_port = "8020"#生成配置文件的目标路径,可根据实际情况作出修改
def get_connection():return MySQLdb.connect(host=mysql_host, port=int(mysql_port), user=mysql_user, passwd=mysql_passwd)def get_mysql_meta(database, table, columns):connection = get_connection()cursor = connection.cursor()if columns == 'all':# 如果传入 '*' 表示要所有列sql = "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' ORDER BY ORDINAL_POSITION" % (database, table)else:# 传入指定列# 将每个列名加上单引号columns = ', '.join("'%s'" % col.strip() for col in columns.split(','))sql = "SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s' AND COLUMN_NAME IN (%s) ORDER BY ORDINAL_POSITION" % (database, table, columns)cursor.execute(sql)fetchall = cursor.fetchall()# print(fetchall)cursor.close()connection.close()return fetchalldef get_mysql_columns(database, table,source_columns):return map(lambda x: x[0], get_mysql_meta(database,table,source_columns))def get_hive_columns(database, table,source_columns):def type_mapping(mysql_type):mappings = {"bigint": "bigint","int": "bigint","smallint": "bigint","tinyint": "bigint","mediumint": "bigint","decimal": "string","double": "double","float": "float","binary": "string","char": "string","varchar": "string","datetime": "string","time": "string","timestamp": "string","date": "string","text": "string","bit": "string",}return mappings[mysql_type]meta = get_mysql_meta(database, table,source_columns)return map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta)def generate_json(source_database, source_table,source_columns,source_splitPk):job = {"job": {"setting": {"speed": {"channel": 15},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","batchSize":"8192","batchByteSize":"33554432","parameter": {"username": mysql_user,"password": mysql_passwd,"column": get_mysql_columns(source_database, source_table,source_columns),"splitPk": source_splitPk,"connection": [{"table": [source_table],"jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port + "/" + source_database + "?userCompress=true&useCursorFetch=true&useUnicode=true&characterEncoding=utf-8&useSSL=false"]}]}},"writer": {"name": "hdfswriter","batchSize":"8192","batchByteSize":"33554432","parameter": {"defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,"fileType": "text","path": "${targetdir}","fileName": source_table,"column": get_hive_columns(source_database, source_table,source_columns),"writeMode": "append","fieldDelimiter": u"\u0001","compress": "gzip"}},"transformer": [{"name": "dx_groovy","parameter": {"code": "for(int i=0;i<record.getColumnNumber();i++){if(record.getColumn(i).getByteSize()!=0){Column column = record.getColumn(i); def str = column.asString(); def newStr=null; newStr=str.replaceAll(\"[\\r\\n]\",\"\"); record.setColumn(i, new StringColumn(newStr)); };};return record;","extraPackage":[]}}]}]}}output_path = "/opt/module/datax/job/import/" + source_databaseif not os.path.exists(output_path):os.makedirs(output_path)with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:json.dump(job, f)def main(args):source_database = ""source_table = ""source_columns = ""source_splitPk = ""options, arguments = getopt.getopt(args, 'd:t:c:k:', ['sourcedb=', 'sourcetbl=', 'columns=', 'splitPk='])for opt_name, opt_value in options:if opt_name in ('-d', '--sourcedb'):source_database = opt_valueif opt_name in ('-t', '--sourcetbl'):source_table = opt_valueif opt_name in ('-c', '--columns'):source_columns = opt_valueif opt_name in ('-k', '--splitPk'):source_splitPk = opt_valuegenerate_json(source_database, source_table,source_columns,source_splitPk)if __name__ == '__main__':main(sys.argv[1:])
  1. sh腳本
#!/bin/bash
python ~/bin/sap_gateway_gen_import_config.py -d db -t table -c Id,created_date -k selfincrementid
python ~/bin/sap_gateway_gen_import_config.py  -d db -t table  -c all -k selfincrementid

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/702128.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

YOLOv8改进 | Conv篇 | 利用YOLOv9的GELAN模块替换C2f结构(附轻量化版本 + 高效涨点版本 + 结构图)

一、本文介绍 本文给大家带来的改进机制是利用2024/02/21号最新发布的YOLOv9其中提出的GELAN模块来改进YOLOv8中的C2f,GELAN融合了CSPNet和ELAN机制同时其中利用到了RepConv在获取更多有效特征的同时在推理时专用单分支结构从而不影响推理速度,同时本文的内容提供了两种版本…

[数据集][目标检测]游泳者溺水数据集VOC+YOLO格式2类别895张

数据集制作单位&#xff1a;未来自主研究中心(FIRC) 数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;895 标注数量(xml文件个数)&#xff1a…

遗传算法优化LSTM回归预测,ga-lstm时间序列的预测

​目录 背影 摘要 LSTM的基本定义 LSTM实现的步骤 遗传算法原理 遗传算法优化LSTM回归预测,ga-lstm时间序列的预测 MATALB代码:遗传算法优化LSTM回归预测,ga-lstm时间序列的预测(代码完整,数据齐全)资源-CSDN文库 https://download.csdn.net/download/abc991835105/8887…

【文生视频】Diffusion Transformer:OpenAI Sora 原理、Stable Diffusion 3 同源技术

文生视频 Diffusion Transformer&#xff1a;Sora 核心架构、Stable Diffusion 3 同源技术 提出背景变换器的引入Diffusion Transformer (DiT)架构Diffusion Transformer (DiT)总结 OpenAI Sora 设计思路阶段1: 数据准备和预处理阶段2: 架构设计阶段3: 输入数据的结构化阶段4: …

云安全威胁及日常防护方案建议

随着互联网技术发展&#xff0c;企业越来越多地采用云服务&#xff0c;云安全成为了企业IT安全的重要组成部分。然而&#xff0c;伴随技术的发展普及&#xff0c;云安全也面临着许多安全风险。下面我们就来简单了解下目前常见的一些云安全风险以及有什么防护方案。 1、云平台配…

基于qt的图书管理系统----04sql功能开发

参考b站&#xff1a;视频连接 源码github&#xff1a;github 目录 1 封装一个全局的对象2 设计所有接口2.1 初始化数据库接口2.2 登陆接口2.3 条件查询用户接口 1 封装一个全局的对象 新建一个cclass&#xff0c;sqlmange&#xff0c;并且在.pro文件中添加上sql 使用c单例模…

Linux进程【补充】

文章目录 进程概念task_struct 进程创建forkvfork写时拷贝 进程状态僵尸进程孤儿进程守护进程 进程地址空间是什么为什么怎么做 进程概念 进程是一个程序的执行实例或者是担当系统资源分配的实体。当一个程序运行时&#xff0c;被从硬盘加载到内存中&#xff0c;操作系统为每个…

Python实战:爬取小红书——采集笔记详情

上一篇文章发出后&#xff0c;有读者问能不能爬到小红书笔记详情数据&#xff0c;今天他来了。 Python实战&#xff1a;爬取小红书 一、先看效果 程序输入&#xff1a;在一个txt文件内粘贴要爬取的笔记链接&#xff0c;每行放1个链接。 程序输出&#xff1a;输出是一个所有笔记…

Mybatis执行过程

1、加载配置文件 InputStream is Resources.getResourceAsStream("mybatis-config.xml"); 2、获得session对象&#xff0c;接受结果 SqlSessionfactoryBuilder builder new SqlSessionfactoryBuilder(); SqlSessionFactory factory builder .build(is); SqlSessio…

docker-compose 搭建laravel环境

laravel环境包含nginx,mysql,php7.4,redis 一、安装好docker后pull镜像 1.nginx镜像 docker pull nginx:latest单独启动容器 docker run --name nginx -p 80:80 -d nginx 2.php镜像 docker pull php:7.4-fpm3.mysql镜像 docker pull mysql:5.74.redis镜像 docker pull r…

zabbix3.4.6 源码安装

Step1&#xff1a; 下载 https://www.zabbix.com/download 选中一下。download Zabbix Sources PackageReleaseDateRelease NotesZabbix ManualDownloadZabbix 3.4Server, Proxy, Agent, GUI3.4.615 January, 2018 Download step2 &#xff1a;拷贝在redhat 6.3_X86_86(192…

UE蓝图 序列(Sequence)节点和源码

系列文章目录 UE蓝图 Get节点和源码 UE蓝图 Set节点和源码 UE蓝图 Cast节点和源码 UE蓝图 分支(Branch)节点和源码 UE蓝图 入口(FunctionEntry)节点和源码 UE蓝图 返回结果(FunctionResult)节点和源码 UE蓝图 函数调用(CallFunction)节点和源码 UE蓝图 函数调用(CallFunction)…

Vue3中的事件监听与处理机制深度解析

随着Vue3的发布&#xff0c;其在性能、灵活性和易用性上都实现了显著提升。其中&#xff0c;事件监听和处理机制作为Vue框架中的重要组成部分&#xff0c;也进行了相应的优化与升级。本文将深入探讨Vue3中如何进行事件监听与处理。 一、Vue3事件绑定 在Vue3中&#xff0c;我们…

springboot215基于springboot技术的美食烹饪互动平台的设计与实现

美食烹饪互动平台的设计与实现 摘 要 如今社会上各行各业&#xff0c;都喜欢用自己行业的专属软件工作&#xff0c;互联网发展到这个时候&#xff0c;人们已经发现离不开了互联网。新技术的产生&#xff0c;往往能解决一些老技术的弊端问题。因为传统美食信息管理难度大&…

MAC地址学习和老化

MAC地址学习过程 一般情况下&#xff0c;MAC地址表是设备根据收到的数据帧里的源MAC地址自动学习而建立的。 图1 MAC地址学习示意图 如图1&#xff0c;HostA向SwitchA发送数据时&#xff0c;SwitchA从数据帧中解析出源MAC地址&#xff08;即HostA的MAC地址&#xff09;和VLAN…

做接口测试的流程一般是怎么样的?UI功能6大流程、接口测试8大流程这些你真的全会了吗?

在讲接口流程测试之前&#xff0c;首先需要给大家申明下&#xff1a;接口测试对于测试人员而言&#xff0c;非常非常重要&#xff0c;懂功能测试接口测试&#xff0c;就能在企业中拿到一份非常不错的薪资。 这么重要的接口测试&#xff0c;一般也是面试笔试必问。为方便大家更…

C++ //练习 8.13 重写本节的电话号码程序,从一个命名文件而非cin读取数据。

C Primer&#xff08;第5版&#xff09; 练习 8.13 练习 8.13 重写本节的电话号码程序&#xff0c;从一个命名文件而非cin读取数据。 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09; 工具&#xff1a;vim 代码块 /***************************************…

Unity(第四部)新手组件

暴力解释就是官方给你的功能&#xff1b;作用的对象上面如&#xff1a; 创建一个球体&#xff0c;给这个球体加上重力 所有物体都是一个空物体&#xff0c;加上一些组件才形成了所需要的GameObject。 这是一个空物体&#xff0c;在Scene场景中没有任何外在表现&#xff0c;因为…

tinyxml2开源库使用

源码下载&#xff1a;GitHub - leethomason/tinyxml2: TinyXML2 is a simple, small, efficient, C XML parser that can be easily integrated into other programs. 1.加载tinyxml2库 解压上面现在的压缩包&#xff0c;将tinyxml2.h/tinyxml2.cpp添加到项目工程当中&#x…

javascript中的垃圾回收机制

一、什么是JavaScript垃圾回收机制 JavaScript中的垃圾回收机制是自动管理内存的一种机制。它负责在程序运行时识别和清除不再使用的内存&#xff0c;以便释放资源并提高性能。 JavaScript中的垃圾回收器会定期扫描内存中的对象&#xff0c;标记那些可达对象和不可达对象。 可达…