使用spark将MongoDB数据导入hive

使用spark将MongoDB数据导入hive

一、pyspark
1.1 pymongo+spark
代码
import json,sys
import datetime, time
import pymongo
import urllib.parse
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType'127.0.0.1 27017 mongo_db mongo_collection hive_db hive_table mongo_user mongo_password'
print('host:',sys.argv[1], 'port:',sys.argv[2], 'mongo_db:',sys.argv[3], 'mongo_collection:',sys.argv[4], 'hive_db:',sys.argv[5], 'hive_table:',sys.argv[6], 'mongo_user:',sys.argv[7], 'mongo_password:',sys.argv[8])
# MongoDB连接信息
mongo_username =sys.argv[7]
mongo_password = sys.argv[8]
mongo_host = sys.argv[1]
mongo_database = sys.argv[3]
mongo_port=sys.argv[2]# 转义用户名和密码
if mongo_username and mongo_password:escaped_username = urllib.parse.quote_plus(mongo_username)escaped_password = urllib.parse.quote_plus(mongo_password)# 构建MongoDB连接URLmongo_connection_url = "mongodb://{0}:{1}@{2}:{3}/{4}".format(escaped_username, escaped_password, mongo_host, mongo_port, mongo_database)
else:mongo_connection_url = "mongodb://{0}:{1}/{2}".format( mongo_host, mongo_port, mongo_database)
# 连接 MongoDB
mongo_client = pymongo.MongoClient(mongo_connection_url)
mongo_db = mongo_client[sys.argv[3]]
mongo_collection = mongo_db[sys.argv[4]]
mongo_data = mongo_collection.find()
mongo_collection.list_indexes()
# 从 MongoDB 读取数据
values = []
values_batch = []
flag=0
time_start = time.time()
for data in mongo_data:if flag==0:# 定义字段列表field_list = list(data.keys())flag=1res={}for key in field_list:try:res[key]=json.dumps( str(data[key]), ensure_ascii=False)except:res[key]=Nonecolumns = list(res.values())values_batch.append(columns)# 创建 SparkSession
spark = SparkSession.builder.appName("CreateStringDataFrame").master('local[*]').getOrCreate()
# 构建 StructType 对象
schema = StructType([StructField(field_name, StringType(), True) for field_name in field_list])
# 创建空的 DataFrame
df = spark.createDataFrame(values_batch, schema)
# 显示 DataFrame 结构
df.printSchema()
df.write.mode("overwrite").saveAsTable("{Hive_DB}.{Hive_Name}".format(Hive_DB=sys.argv[5], Hive_Name=sys.argv[6]))
spark.stop()
time_end = time.time()
spark-submit
spark-submit --num-executors 1 --executor-memory 512M --executor-cores 1 --deploy-mode client --queue root.users.root  ./mongo.py 127.0.0.1 27017 test_db test_table tmp_can_delete_database mongo_test test1 test1
1.2 mongo-spark-connector

生产环境不方便使用,亲测各种报错

from pyspark.sql import SparkSession
my_spark = SparkSession \.builder \.appName("myApp") \.config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/intca2.tweetsIntca2") \.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/intca2.tweetsIntca2") \.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.2') \
.getOrCreate()
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.write.mode("overwrite").saveAsTable("{Hive_DB}.{Hive_Name}")
spark.stop()
二、Scala
2.1 pom.xml
<dependencies><dependency><groupId>com.thoughtworks.paranamer</groupId><artifactId>paranamer</artifactId><version>2.8</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.4.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.0</version></dependency><dependency><groupId>org.mongodb.spark</groupId><artifactId>mongo-spark-connector_2.12</artifactId><version>2.4.0</version></dependency></dependencies>
2.2 代码
package org.spark
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
object Main {private def insert(host:String,port:String,db:String,collection:String,Hive_DB:String,Hive_Name:String,username:Object=None,password:Object=None): Unit = {var url=""//判断username 和密码是否为空if(username==None || password==None){url=s"mongodb://${host}:${port}/${db}.${collection}"}else{url=s"mongodb://${username}:${password}@${host}:${port}/${db}.${collection}"}val spark = SparkSession.builder().appName("mongo").config("spark.mongodb.input.uri", url).getOrCreate()val df=spark.read.format("com.mongodb.spark.sql.DefaultSource").load()df.show(5)val string_df=df.select(df.columns.map(c => col(c).cast("string").alias(c)): _*)string_df.write.mode(SaveMode.Overwrite).saveAsTable(s"${Hive_DB}.${Hive_Name}")spark.stop()}def main(args: Array[String]): Unit = {val start_time = System.currentTimeMillis()println(args(0),args(1),args(2),args(3),args(4),args(5),args(6),args(7))insert(args(0),args(1),args(2),args(3),args(4),args(5),args(6),args(7))
//    insert("127.0.0.1","27017","mongo_db","mongo_collection","hive_db","hive_table","mongo_username","mongo_password")val end_time = System.currentTimeMillis()val duration = (end_time - start_time) / 1000.0println(s"程序运行时间为 $duration 秒")}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/615932.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

功效产品如何做好营销?媒介盒子解答

功能性产品目前的营销痛点就在于宣传夸张导致用户信任度降低&#xff0c;尤其是健康类产品&#xff0c;作为消费者&#xff0c;对此类产品大多持观望态度&#xff0c;但媒介盒子作为提供品牌宣传服务的团队&#xff0c;想和大家聊聊&#xff1a;功能性产品除了在功能上进行宣传…

【Java】解决Servlet编程中出现的中文乱码问题

1、引言 前面两篇文章我们讲述了编写Servlet程序的基本步骤和修改一个Servlet程序 【Java】编写一个简单的Servlet程序​​​​​​ 【Java】SmartTomcat的配置及使用 上面两篇文章的示例代码都是使用的全英文&#xff0c;当我们编写中文&#xff0c;发现似乎出了一点点问题…

【100个 Unity实用技能】☀️ | UGUI中 判断屏幕中某个坐标点的位置是否在指定UI区域内

&#x1f3ac; 博客主页&#xff1a;https://xiaoy.blog.csdn.net &#x1f3a5; 本文由 呆呆敲代码的小Y 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f384; 学习专栏推荐&#xff1a;Unity系统学习专栏 &#x1f332; 游戏制作专栏推荐&#xff1a;游戏制作 &…

PCA主成分分析算法

在数据分析中&#xff0c;如果特征太多&#xff0c;或者特征之间的相关性太高&#xff0c;通常可以用PCA来进行降维。比如通过对原有10个特征的线性组合, 我们找出3个主成分&#xff0c;就足以解释绝大多数的方差&#xff0c;该算法在高维数据集中被广泛应用。 算法&#xff08…

【一文搞懂JVM的内存屏障】

要命的问题&#xff1a; 什么是线程的安全性&#xff1f;怎么保证&#xff1f;jvm什么是的内存屏障&#xff1f;他有什么作用&#xff1f; **线程的安全性是指&#xff1a;**指在多线程环境下&#xff0c;多个线程同时访问同一资源时不会产生意外结果或导致数据出错的状态。其…

在线ai扩图是什么?有什么工具?分享3个好用的工具。

在线ai扩图是什么&#xff1f;有什么工具&#xff1f;分享3个好用的工具。 在当今数字化的时代&#xff0c;图像处理成为了我们日常生活和工作中不可或缺的一部分。有时候&#xff0c;我们需要将图像放大以获取更多的细节&#xff0c;但传统的方法往往会导致图像质量的损失。幸…

Invalid bound statement(只有调用IService接口这一层会报错的)

问题描述:controller直接调用实现类可以,但是一旦调用IService这个接口这一层就报错. 找遍了大家都说是xml没对应好,但是我确实都可以一路往下跳,真的对应好了.结果发现是 MapperScan写错了,如下才是对的. MapperScan的作用是不需要在mapper上一直写注解了,只要启动类上写好就放…

无货源电商哪个平台比较适合新手?

我是电商珠珠 近年来电商平台层出不穷&#xff0c;无论是传统平台像是拼多多、淘宝、京东&#xff0c;还是短视频电商平台&#xff1a;快手、抖音小店、视频号小店。 都成为了兼职乃至全职人群心中的香饽饽&#xff0c;有人选择去做拼多多、有人选择去做抖音小店&#xff0c;…

算法训练营Day38

#Java #完全背包 #动态规划 Feeling and experiences&#xff1a; 动态规划&#xff1a;完全背包理论基础 之前学习的是01背包&#xff0c;其特点在于&#xff1a;每个物品都只能取一个 而完全背包则是可以一个物品取多个。 有N件物品和一个最…

高级分布式系统-第6讲 分布式系统的容错性--故障/错误/失效/异常

分布式系统容错性的概念 分布式系统的容错性&#xff1a; 当发生故障时&#xff0c; 分布式系统应当在进行恢复的同时继续以可接受的方式进行操作&#xff0c; 并且可以从部分失效中自动恢复&#xff0c; 且不会严重影响整体性能。 具体包括以下4个方面的内容&#xff1a; 可…

如何将后端带过来的字符串通过‘,’号作为判断依据,分割字符串然后生成数组

在实际开发工程中我们会遇到我们调用后端接口获取图片、文件、视频甚至选择的对象时&#xff0c;如果是这样的&#xff1a; 这种数据类型如果想渲染在html中的话就会很麻烦&#xff0c;我们可以通过","号为切割点将它放入数组中&#xff0c;通过列表进行渲染 由于实…

vue使用elementui select下拉库组件鼠标hover出现下拉框

mounted 生命周期里去监听 鼠标进入和鼠标移出事件 this.$refs.mySelect.addEventListener(mouseenter, function () {this.querySelector(.selectel .el-select-dropdown).style.display block})this.$refs.mySelect.addEventListener(mouseleave, function () {this.querySe…

STM32入门教程-2023版【3-4】总结GPIO使用方法

三、总结GPIO使用方法 总体上来说是比较简单的 首先初始化时钟&#xff0c;然后定义结构体&#xff0c;赋值结构体 GPIO_Mode可以选择那8种输入输出模式&#xff0c;GPIO_Pin选择引脚&#xff0c;可以用按位或的方式同时选中多个引脚,GPIO_Speed选择输出速度&#xff0c;最后使…

全网最全持续集成接口自动化-jmeter+ant+jenkins

ant 批量执行Jmeter 一、环境准备 1、JDK环境&#xff1a;Java Downloads | Oracle 2、ANT环境&#xff1a;Apache Ant - Binary Distributions 3、Jmeter&#xff1a;Apache JMeter - Download Apache JMeter 4、将 jmeter的extras目录中ant-jmeter-1.1.1.jar包拷贝至ant…

c语言中scanf的用法

通过键盘&#xff0c;将数据输入到变量中 #include<stdio.h>int main(void){int a;scanf("%d",&a);printf("%d",a);return 0; }scanf("%d",&a);: 从键盘输入数据的时候&#xff0c;输入的是一个个字符而不是数字等与已经定义好的…

【开发篇】一、内存泄漏的分析工具

文章目录 1、内存泄漏2、解决内存泄漏3、工具一&#xff1a;Top4、工具二&#xff1a;VisualVM5、工具三&#xff1a;阿尔萨斯Arthas6、工具四&#xff1a;Promethus Grafana7、图像分析 1、内存泄漏 一个对象不再使用后&#xff0c;&#xff08;因其从GC Root仍有引用链可达…

2023下半年软考证书什么时候发放?怎么领取?

已经确定领取时间的地区&#xff1a; 广东&#xff1a; 电子版&#xff1a;2024年1月8日上线 纸质版&#xff1a;预计24年2月开始 重庆&#xff1a; 邮寄申领&#xff1a;2024年1月15日0:00-3月1日23:00 现场领取&#xff1a;2024年1月15日-2月7日 贵州&#xff1a; 邮…

vue Element Plus Cascader级联选择器点击标签选中复选框

element-plus原功能 element-plus的Cascader级联选择器点击标签时是不会选中复选框的&#xff0c;我们想要实现点击标签时也能选中复选框这个效果&#xff0c;那么就要用到一些原生的方法 实现效果 mounted() {// Cascader 级联选择器: 点击文本就让它自动点击前面的input就可…

PPT自动化处理

python-pptx模块 可以创建、修改PPT(.pptx)文件非Python标准模块&#xff0c;需要单独安装 在线安装方式 pip install python-pptx 读取slide幻灯片 .slides 获取shape形状 slide.shapes 判断一个shape中是否存在文字 shape.has_text_frame 获取文字框 shape.text_f…