使用spark将MongoDB数据导入hive
一、pyspark
1.1 pymongo+spark
代码
"""Copy one MongoDB collection into a Hive table via pymongo + PySpark.

Every field is stored as a string column. The whole collection is buffered in
the driver's memory before being handed to Spark, so this only suits
collections that fit in driver RAM.

Usage (the last two arguments may be omitted for an unauthenticated MongoDB):
    spark-submit mongo.py <host> <port> <mongo_db> <mongo_collection> <hive_db> <hive_table> [<mongo_user> <mongo_password>]

Example arguments:
    127.0.0.1 27017 mongo_db mongo_collection hive_db hive_table mongo_user mongo_password
"""
import json, sys
import datetime, time
import pymongo
import urllib.parse
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType


def build_mongo_url(host, port, database, username="", password=""):
    """Return the MongoDB connection URI for *database*.

    Credentials are embedded only when both are non-empty. They are escaped
    with ``quote_plus`` so characters such as ``@`` or ``:`` cannot break the URI.
    """
    if username and password:
        return "mongodb://{0}:{1}@{2}:{3}/{4}".format(
            urllib.parse.quote_plus(username),
            urllib.parse.quote_plus(password),
            host, port, database,
        )
    return "mongodb://{0}:{1}/{2}".format(host, port, database)


def stringify_document(document):
    """Map every field of *document* to ``json.dumps(str(value), ensure_ascii=False)``.

    NOTE(review): json.dumps on a str wraps it in double quotes and escapes it,
    so Hive receives e.g. '"abc"' rather than 'abc'. Kept unchanged so the
    content of the produced table stays the same as before.
    """
    return {key: json.dumps(str(value), ensure_ascii=False) for key, value in document.items()}


def read_collection(mongo_connection_url, mongo_database, collection_name):
    """Read every document of the collection and return ``(field_list, rows)``.

    ``field_list`` is the union of all document keys in first-seen order, so a
    field that only appears in later documents is not silently dropped. Each
    row is aligned with ``field_list`` and holds None where a document lacks
    that field.
    """
    field_order = {}  # dict used as an insertion-ordered set of field names
    documents = []
    mongo_client = pymongo.MongoClient(mongo_connection_url)
    try:
        for document in mongo_client[mongo_database][collection_name].find():
            for key in document:
                field_order.setdefault(key, None)
            documents.append(stringify_document(document))
    finally:
        mongo_client.close()
    field_list = list(field_order)
    rows = [[document.get(key) for key in field_list] for document in documents]
    return field_list, rows


def main(argv):
    """Parse *argv*, read the MongoDB collection and overwrite the Hive table."""
    if len(argv) < 7:
        sys.exit("usage: mongo.py host port mongo_db mongo_collection hive_db hive_table [mongo_user mongo_password]")
    mongo_host, mongo_port, mongo_database, collection_name, hive_db, hive_table = argv[1:7]
    mongo_username = argv[7] if len(argv) > 7 else ""
    mongo_password = argv[8] if len(argv) > 8 else ""
    # Echo the arguments for troubleshooting, but never print the password itself.
    print('host:', mongo_host, 'port:', mongo_port, 'mongo_db:', mongo_database,
          'mongo_collection:', collection_name, 'hive_db:', hive_db, 'hive_table:', hive_table,
          'mongo_user:', mongo_username, 'mongo_password:', '******' if mongo_password else '')

    time_start = time.time()
    mongo_connection_url = build_mongo_url(mongo_host, mongo_port, mongo_database,
                                           mongo_username, mongo_password)
    field_list, values_batch = read_collection(mongo_connection_url, mongo_database, collection_name)
    if not field_list:
        # Without at least one document there is no schema to create the table from.
        sys.exit("collection {0}.{1} is empty, nothing to import".format(mongo_database, collection_name))

    # enableHiveSupport() makes saveAsTable() write to the Hive metastore rather than
    # Spark's in-memory catalog.
    # NOTE: master('local[*]') set on the builder takes precedence over spark-submit's --master.
    spark = (SparkSession.builder
             .appName("CreateStringDataFrame")
             .master('local[*]')
             .enableHiveSupport()
             .getOrCreate())
    try:
        # Every column is a nullable string.
        schema = StructType([StructField(field_name, StringType(), True) for field_name in field_list])
        df = spark.createDataFrame(values_batch, schema)
        df.printSchema()
        df.write.mode("overwrite").saveAsTable(
            "{Hive_DB}.{Hive_Name}".format(Hive_DB=hive_db, Hive_Name=hive_table))
    finally:
        spark.stop()
    time_end = time.time()
    print('elapsed seconds:', round(time_end - time_start, 3))


if __name__ == "__main__":
    main(sys.argv)
spark-submit
# Positional arguments after the script: <host> <port> <mongo_db> <mongo_collection> <hive_db> <hive_table> <mongo_user> <mongo_password>
# NOTE(review): the MongoDB password is passed on the command line, so it is visible in the process list and shell history.
spark-submit --num-executors 1 --executor-memory 512M --executor-cores 1 --deploy-mode client --queue root.users.root ./mongo.py 127.0.0.1 27017 test_db test_table tmp_can_delete_database mongo_test test1 test1
1.2 mongo-spark-connector
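生产环境不方便使用：spark.jars.packages 需要在运行时从 Maven 仓库下载 connector 依赖，亲测各种报错。注意：connector 2.x 读取的配置项是 spark.mongodb.input.uri / spark.mongodb.output.uri，而 spark.mongodb.read.connection.uri 等配置项属于 connector 10.x，两者混用会导致读取时找不到连接地址。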
"""Read a MongoDB collection through mongo-spark-connector 2.x and overwrite a Hive table with it.

Usage: <this_script>.py <hive_db> <hive_table>
"""
import sys

from pyspark.sql import SparkSession

hive_db, hive_table = sys.argv[1], sys.argv[2]

# mongo-spark-connector 2.x reads "spark.mongodb.input.uri" / "spark.mongodb.output.uri";
# the "spark.mongodb.read.connection.uri" style keys belong to connector 10.x.
spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/intca2.tweetsIntca2")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/intca2.tweetsIntca2")
    # The _2.11 suffix must match the Scala version of the Spark installation.
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.2')
    # Needed so saveAsTable() writes into the Hive metastore.
    .enableHiveSupport()
    .getOrCreate()
)
try:
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
    df.write.mode("overwrite").saveAsTable(
        "{Hive_DB}.{Hive_Name}".format(Hive_DB=hive_db, Hive_Name=hive_table))
finally:
    spark.stop()
二、Scala
2.1 pom.xml
<dependencies>
    <dependency>
        <groupId>com.thoughtworks.paranamer</groupId>
        <artifactId>paranamer</artifactId>
        <version>2.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- The Scala suffix (_2.11) must match the Spark artifacts above; mixing
         _2.12 and _2.11 artifacts is binary-incompatible at runtime. -->
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
</dependencies>
2.2 代码
package org.spark
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object Main {

  /** Percent-encode a URI user-info component (similar to Python's urllib.parse.quote_plus). */
  private def escape(s: String): String = java.net.URLEncoder.encode(s, "UTF-8")

  /**
   * Copy one MongoDB collection into a Hive table, casting every column to string.
   *
   * @param host       MongoDB host
   * @param port       MongoDB port
   * @param db         MongoDB database
   * @param collection MongoDB collection
   * @param Hive_DB    target Hive database
   * @param Hive_Name  target Hive table (overwritten)
   * @param username   MongoDB user; null or empty means no authentication
   * @param password   MongoDB password; null or empty means no authentication
   */
  private def insert(host: String, port: String, db: String, collection: String,
                     Hive_DB: String, Hive_Name: String,
                     username: String = "", password: String = ""): Unit = {
    // Embed credentials only when both are present. Escape them so characters such as '@' or ':' cannot break the URI.
    val hasCredentials =
      username != null && username.nonEmpty && password != null && password.nonEmpty
    val url =
      if (hasCredentials) s"mongodb://${escape(username)}:${escape(password)}@${host}:${port}/${db}.${collection}"
      else s"mongodb://${host}:${port}/${db}.${collection}"

    // enableHiveSupport() makes saveAsTable() write into the Hive metastore.
    val spark = SparkSession.builder()
      .appName("mongo")
      .config("spark.mongodb.input.uri", url)
      .enableHiveSupport()
      .getOrCreate()
    try {
      val df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
      df.show(5)
      // Cast every column to string so the Hive table gets a uniform all-string schema.
      val string_df = df.select(df.columns.map(c => col(c).cast("string").alias(c)): _*)
      string_df.write.mode(SaveMode.Overwrite).saveAsTable(s"${Hive_DB}.${Hive_Name}")
    } finally {
      spark.stop()
    }
  }

  /**
   * Entry point.
   *
   * Arguments: host port mongo_db mongo_collection hive_db hive_table [mongo_user mongo_password]
   * e.g. 127.0.0.1 27017 mongo_db mongo_collection hive_db hive_table mongo_username mongo_password
   */
  def main(args: Array[String]): Unit = {
    if (args.length < 6) {
      System.err.println("usage: Main <host> <port> <mongo_db> <mongo_collection> <hive_db> <hive_table> [<mongo_user> <mongo_password>]")
      sys.exit(1)
    }
    val start_time = System.currentTimeMillis()
    val username = if (args.length > 6) args(6) else ""
    val password = if (args.length > 7) args(7) else ""
    // Print the arguments for troubleshooting, but never echo the password.
    println((args(0), args(1), args(2), args(3), args(4), args(5), username,
      if (password.isEmpty) "" else "******"))
    insert(args(0), args(1), args(2), args(3), args(4), args(5), username, password)
    val end_time = System.currentTimeMillis()
    val duration = (end_time - start_time) / 1000.0
    println(s"程序运行时间为 $duration 秒")
  }
}