Data integration platform: syncing MySQL data to Hive with DataX in querySql mode
1. Python script
# coding=utf-8
import json
import getopt
import os
import sys
import MySQLdb
import re

# MySQL connection settings; adjust to your environment
mysql_host = "xx"
mysql_port = "3306"
mysql_user = "xx"
mysql_passwd = "xx"

# HDFS NameNode settings; adjust to your environment
hdfs_nn_host = "mycluster"
hdfs_nn_port = "8020"


# Not used below; kept in case table metadata needs to be fetched from MySQL
def get_connection():
    return MySQLdb.connect(host=mysql_host, port=int(mysql_port),
                           user=mysql_user, passwd=mysql_passwd)


# Map every selected column to a Hive column of type string
def get_hive_columns(database, table, source_columns):
    return [{"name": col, "type": "string"} for col in source_columns]


def generate_json(source_database, source_table, source_querySql, columns_list):
    job = {
        "job": {
            "setting": {
                "speed": {"channel": 15},
                "errorLimit": {"record": 0, "percentage": 0.02}
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "batchSize": "8192",
                    "batchByteSize": "33554432",
                    "parameter": {
                        "username": mysql_user,
                        "password": mysql_passwd,
                        "connection": [{
                            "querySql": [source_querySql],
                            # useCompression/useCursorFetch stream large result sets
                            # instead of buffering them all in the driver
                            "jdbcUrl": ["jdbc:mysql://" + mysql_host + ":" + mysql_port
                                        + "/" + source_database
                                        + "?useCompression=true&useCursorFetch=true&useUnicode=true"
                                          "&characterEncoding=utf-8&useSSL=false"]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "batchSize": "8192",
                    "batchByteSize": "33554432",
                    "parameter": {
                        "defaultFS": "hdfs://" + hdfs_nn_host + ":" + hdfs_nn_port,
                        "fileType": "text",
                        # ${targetdir} is substituted at run time via DataX's -p option
                        "path": "${targetdir}",
                        "fileName": source_table,
                        "column": get_hive_columns(source_database, source_table, columns_list),
                        "writeMode": "append",
                        "fieldDelimiter": u"\u0001",
                        "compress": "gzip"
                    }
                },
                # Strip \r and \n from every column so embedded newlines
                # cannot break rows in the Hive text file
                "transformer": [{
                    "name": "dx_groovy",
                    "parameter": {
                        "code": "for(int i=0;i<record.getColumnNumber();i++){if(record.getColumn(i).getByteSize()!=0){Column column = record.getColumn(i); def str = column.asString(); def newStr=null; newStr=str.replaceAll(\"[\\r\\n]\",\"\"); record.setColumn(i, new StringColumn(newStr)); };};return record;",
                        "extraPackage": []
                    }
                }]
            }]
        }
    }

    # Target path for the generated config files; adjust to your environment
    output_path = "/opt/module/datax/job/import/" + source_database
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([source_database, source_table, "json"])), "w") as f:
        json.dump(job, f)


def main(args):
    source_database = ""
    source_table = ""
    source_querySql = ""

    # Parse -d/-t/-s and their long equivalents --sourcedb/--sourcetbl/--querysql
    options, arguments = getopt.getopt(args, 'd:t:s:', ['sourcedb=', 'sourcetbl=', 'querysql='])
    for opt_name, opt_value in options:
        if opt_name in ('-d', '--sourcedb'):
            source_database = opt_value
        if opt_name in ('-t', '--sourcetbl'):
            source_table = opt_value
        if opt_name in ('-s', '--querysql'):
            source_querySql = opt_value

    # Extract the column list between SELECT and FROM
    match = re.search(r"select(.*?)from", source_querySql, re.DOTALL | re.IGNORECASE)
    selected_columns = match.group(1).strip()
    # Remove all whitespace and line breaks
    selected_columns_cleaned = re.sub(r'\s+', '', selected_columns)
    # Split into individual column names
    columns_list = re.split(r',', selected_columns_cleaned)
    print(columns_list)

    generate_json(source_database, source_table, source_querySql, columns_list)


if __name__ == '__main__':
    main(sys.argv[1:])
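For reference, invoking the script with a hypothetical query such as -d sap_gateway -t test2 -s "select id, name from test2" would write /opt/module/datax/job/import/sap_gateway/sap_gateway.test2.json with roughly the content below (pretty-printed here for readability; json.dump emits a single line, and the dx_groovy transformer is abbreviated since it is identical to the string in the script):

{
  "job": {
    "setting": {
      "speed": {"channel": 15},
      "errorLimit": {"record": 0, "percentage": 0.02}
    },
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "batchSize": "8192",
        "batchByteSize": "33554432",
        "parameter": {
          "username": "xx",
          "password": "xx",
          "connection": [{
            "querySql": ["select id, name from test2"],
            "jdbcUrl": ["jdbc:mysql://xx:3306/sap_gateway?useCompression=true&useCursorFetch=true&useUnicode=true&characterEncoding=utf-8&useSSL=false"]
          }]
        }
      },
      "writer": {
        "name": "hdfswriter",
        "batchSize": "8192",
        "batchByteSize": "33554432",
        "parameter": {
          "defaultFS": "hdfs://mycluster:8020",
          "fileType": "text",
          "path": "${targetdir}",
          "fileName": "test2",
          "column": [
            {"name": "id", "type": "string"},
            {"name": "name", "type": "string"}
          ],
          "writeMode": "append",
          "fieldDelimiter": "\u0001",
          "compress": "gzip"
        }
      },
      "transformer": [{
        "name": "dx_groovy",
        "parameter": {"code": "...same Groovy snippet as in the script...", "extraPackage": []}
      }]
    }]
  }
}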
2. Shell script
#!/bin/bash
# The querySql must be wrapped in straight double quotes so the shell
# passes the whole statement as one argument to -s
python ~/bin/sap_gateway_query_gen_import_config.py -d sap_gateway -t test2 -s "querySql"
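To actually run the import, the generated config is submitted to DataX, and the HDFS directory that replaces ${targetdir} in the writer path is supplied through DataX's -p parameter mechanism. A minimal sketch, assuming DataX is installed under /opt/module/datax (matching output_path in the Python script) and a hypothetical warehouse directory for the Hive table:

#!/bin/bash
# Hypothetical HDFS location backing the target Hive table
TARGET_DIR=/warehouse/sap_gateway/ods/test2

# hdfswriter expects the target path to exist, so create it first
hadoop fs -mkdir -p "$TARGET_DIR"

# -p"-Dkey=value" substitutes ${key} placeholders inside the job JSON
python /opt/module/datax/bin/datax.py \
  -p"-Dtargetdir=$TARGET_DIR" \
  /opt/module/datax/job/import/sap_gateway/sap_gateway.test2.json

Because the writer uses "writeMode": "append" with gzip-compressed text and a \u0001 field delimiter, the Hive table should be declared with a matching ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' and its LOCATION pointing at the same directory.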