外部jar包_大数据系列之PySpark读写外部数据库

本文以MySQL和HBASE为例,简要介绍Spark通过PyMySQL和HadoopAPI算子对外部数据库的读写操作


1、PySpark读写MySQL

MySQL环境准备参考“数据库系列之MySQL主从复制集群部署”部分

1.1 PyMySQL和MySQLDB模块

PyMySQL是在Python3.x版本中用于连接MySQL服务器的一个库,Python2中则使用mysqldb,目前在Python 2版本支持PyMySQL。使用以下命令安装PyMysql模块:

pip install PyMySQL

f6c49d25edd70662ec20ea9cc863766a.png

连接到MySQL数据库

import pymysql
# 打开数据库连接
db = pymysql.connect("localhost","testuser","test123","TESTDB" )
# 使用 cursor() 方法创建一个游标对象
cursor cursor = db.cursor()
# 使用 execute() 方法执行 SQL 查询
cursor.execute("SELECT VERSION()")
# 使用 fetchone() 方法获取单条数据.
data = cursor.fetchone() print ("Database version : %s " % data)
# 关闭数据库连接
db.close()
1.2 Spark数据写入MySQL

1)启动MySQL服务并检查

[root@tango-01 bin]# ./mysqld_safe &
[root@tango-01 bin]# 180814 15:50:02 mysqld_safe Logging to '/usr/local/mysql/data/error.log'.
180814 15:50:02 mysqld_safe Starting mysqld daemon with databases from /usr/local/mysql/data
[root@tango-01 bin]# ps -ef|grep mysql

2)创建MySQL表

[root@tango-01 bin]# ./mysql -u root -proot
mysql> use test;
mysql> create table test_spark(id int(4),info char(8),name char(20),sex char(2));
mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| test_spark |
+----------------+
2 rows in set (0.00 sec)

3)向MySQL中写入数据

  • 启动ipython notebook

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook"  HADOOP_CONF_DIR=/usr/local/spark/hadoop-2.9.0/etc/hadoop  pyspark
  • 建立MySQL连接,写入数据

from pyspark import SparkContext
from pyspark import SparkConf
import pymysql

rawData=['1,info1,tango,F','2,info2,zhangsan,M']
conn = pymysql.connect(user="root",passwd="xxxxxx",host="192.168.112.10",db="test",charset="utf8")
cursor=conn.cursor()
for i in range(len(rawData)):
retData=rawData[i].split(',')
id = retData[0]
info = retData[1]
name = retData[2]
sex = retData[3]
sql = "insert into test_spark(id,info,name,sex) values('%s','%s','%s','%s')" %(id,info,name,sex)
cursor.execute(sql)
conn.commit()
conn.close()

31023cfa1ba72611e4ab2d7ed945dcf3.png

  • 查询MySQL表数据

21e7a5e14e520aa855dca8f2f1954b52.png

1.3 Spark读取MySQL数据

1)下载mysql-connect-java驱动,并存放在spark目录的jars下

5f7f83ae9e8435a04d07ab13f1957b55.png

2)运行pyspark,执行以下语句

[root@tango-spark01 jars]# pyspark
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> dataframe_mysql = sqlContext.read.format("jdbc").\
... options(url="jdbc:mysql://192.168.112.10:3306/test", driver="com.mysql.jdbc.Driver",
... dbtable="test_spark", user="root", password="xxxxxx").load()
>>> dataframe_mysql.show()

5f5740303fe03e317f21c8e0a0b1997d.png

2、PySpark读写HBASE

HBASE环境准备参考“大数据系列之HBASE集群环境部署”部分,HBASE版本为1.2.6,Hadoop版本为2.9.0,Spark版本为2.3.0。注:使用高版本的HBASE如2.1.0出现NotFoundMethod接口问题。

2.1 Spark读写HBASE模块

1)saveAsNewAPIHadoopDataset模块

Spark算子saveAsNewAPIHadoopDataset使用新的Hadoop API将RDD输出到任何Hadoop支持的存储系统,为该存储系统使用Hadoop Configuration对象。saveAsNewAPIHadoopDataset参数说明如下:

saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
- conf:HBASE的配置文件
- keyConverter:key值的输出类型
- valueConverter:value值的输出类型

2)newAPIHadoopRDD模块

使用新的Hadoop API读取数据,参数如下:

newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
- inputFormatClass :Hadoop InputFormat class名称- keyClass:key Writable class名称- valueClass:value Writable class名称- keyConverter:key值的输入类型- valueConverter:value值的输入类型- conf:HBASE的配置文件- batchSize:Python对象作为单个Java对象个数,默认为0,自动选择
2.2 Spark数据写入HBASE

1)启动HBASE服务

[root@tango-spark01 hbase-2.1.0]# ./bin/start-hbase.sh

在Master和Slave服务器使用jps查看HMaster和HRegionServer进程:

[root@tango-spark01 logs]# jps
1859 ResourceManager
1493 NameNode
4249 HMaster
5578 Jps
1695 SecondaryNameNode
[root@tango-spark02 conf]# jps
1767 NodeManager
3880 HRegionServer
1627 DataNode
4814 Jps

注:启动HBASE之前需先启动zookeeper集群和Hadoop集群环境

2)创建HBASE表

hbase(main):027:0> create 'spark_hbase','userinfo'
Created table spark_hbase
Took 2.6556 seconds
=> Hbase::Table - spark_hbase
hbase(main):028:0> put 'spark_hbase','2018001','userinfo:name','zhangsan'
Took 0.0426 seconds
hbase(main):029:0> put 'spark_hbase','2018001','userinfo:age','16'
Took 0.0079 seconds
hbase(main):030:0> put 'spark_hbase','2018001','userinfo:sex','M'

3)配置Spark 在Spark 2.0版本上缺少相关把hbase的数据转换python可读取的jar包,需要另行下载https://mvnrepository.com/artifact/org.apache.spark/spark-examples_2.11/1.6.0-typesafe-001

b86645d5413866b5804cea14b4c8be09.png

  • 上传jar包到spark lib库

[root@tango-spark01 jars]# pwd
/usr/local/spark/spark-2.3.0/jars
[root@tango-spark01 jars]# mkdir hbase
[root@tango-spark01 jars]# cd hbase
[root@tango-spark01 hbase]# ls
spark-examples_2.11-1.6.0-typesafe-001.jar
  • 编辑spark-env.sh,添加以下内容:

export SPARK_DIST_CLASSPATH=$(/usr/local/spark/hadoop-2.9.0/bin/hadoop classpath):$(/usr/local/spark/hbase-2.1.0/bin/hbase classpath):/usr/local/spark/spark-2.3.0/jars/hbase/*
  • 拷贝HBASE下的lib库到spark下

[root@tango-spark01 lib]# pwd
/usr/local/spark/hbase-2.1.0/lib
[root@tango-spark01 lib]# cp -f hbase-* /usr/local/spark/spark-2.3.0/jars/hbase/
[root@tango-spark01 lib]# cp -f guava-11.0.2.jar /usr/local/spark/spark-2.3.0/jars/hbase/
[root@tango-spark01 lib]# cp -f htrace-core-3.1.0-incubating.jar /usr/local/spark/spark-2.3.0/jars/hbase/
[root@tango-spark01 lib]# cp -f protobuf-java-2.5.0.jar /usr/local/spark/spark-2.3.0/jars/hbase/
  • 重启HBASE

[root@tango-spark01 hbase-2.1.0]# ./bin/stop-hbase.sh
[root@tango-spark01 hbase-2.1.0]# ./bin/start-hbase.sh

4)向HBASE中写入数据

  • 启动ipython notebook

PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook"  HADOOP_CONF_DIR=/usr/local/spark/hadoop-2.8.3/etc/hadoop  pyspark
  • 配置初始化

zk_host="192.168.112.101"
table = "spark_hbase"
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {"hbase.zookeeper.quorum": zk_host,"hbase.mapred.outputtable": table,
"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
"mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
  • 初始化数据并序列化转换为RDD

rawData = ['2018003,userinfo,name,Lily','2018004,userinfo,name,Tango','2018003,userinfo,age,22','2018004,userinfo,age,28']
print(rawData)
rddRow = sc.parallelize(rawData).map(lambda x: (x[0:7],x.split(',')))
rddRow.take(5)

eafb3ef31cfe74eee402d58bb8e35fe5.png

  • 调用saveAsNewAPIHadoopDataset模块写入HBASE

rddRow.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
  • 查询HBASE中表数据,看到插入数据

b4093e8a7c8c8475407cfc3c8b7006bc.png

2.3 Spark读取HBASE数据

Spark读取HBASE数据使用newAPIHadoopRDD模块

1)配置初始化

host = '192.168.112.101'
table = 'spark_hbase'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

2)调用newAPIHadoopRDD模块读取HBASE数据

hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
print (k, v)

输出结果如下:

4a1e4b48555c9329fa9d6e983a69456c.png


参考资料

  1. http://spark.apache.org/docs/latest/api/python/pyspark.html

  2. 数据库系列之MySQL主从复制集群部署

  3. 大数据系列之HBASE集群环境部署

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/521700.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue + Spring Boot 项目实战(二):使用 CLI 搭建 Vue.js 项目

文章目录一、安装 Vue CLI二、构建前端项目2.1.创建一个基于 webpack 模板的项目2.2. 安装图解2.3. 项目结构总览2.4. 运行项目2.5. 浏览器验证项目GitHub 地址: https://github.com/Antabot/White-Jotter在动手操作之前,我希望大家已经清楚什么是 “前后…

这本销量突破70W+的 Python 插画式书籍,凭什么这么火?

戳蓝字“CSDN云计算”关注我们哦!最近不管我在互联网看新闻、看视频或者看公众号文章,甚至我在淘宝的时候都会看到一个词,那就是“Python”,我也不知道我对它做了什么,为什么老是给我推送Python?甚至我和程…

【人脸识别终结者】多伦多大学反人脸识别,身份欺骗成功率达99.5%

摘要: 多伦多大学研究人员设计新算法,通过动态地干扰人脸识别工具来保护用户的隐私。结果表明,他们的系统可以将原本可检测到的人脸比例从接近100%降低到0.5%。在一些社交媒体平台,每次你上传照片或视频时,…

php中$stu_by,PHP基础案例二:计算学生年龄

一、需求分析为了方便、准确、快捷的展示学的年龄,系统通常根据学生的出生日期进行自动计算。下面请利用PHP变量分别保存学生的年月日,并通过PHP中的data函数获取当前年月日,最后计算出学生的年龄。例如:出生年月日为:…

iPhone11首批用户评价实属翻车现场;据悉,谷歌宣布投资30亿欧元扩大欧洲数据中心……...

关注并标星星CSDN云计算极客头条:速递、最新、绝对有料。这里有企业新动、这里有业界要闻,打起十二分精神,紧跟fashion你可以的!每周三次,打卡即read更快、更全了解泛云圈精彩newsgo go go 微软说:杀手机器…

阿里七层流量入口 Tengine硬件加速探索之路

摘要: Tengine在软件层面已经有了深度的调试和优化经验,但是在硬件层面,通用处理器(CPU)已经进入了摩尔定律,有了瓶颈。而在业务量突飞猛进的当下,如何利用硬件来提升性能,承载双11等…

新增16条设计规约!阿里巴巴Java开发手册(详尽版)开放下载!

摘要:2018年6月,《阿里巴巴Java开发手册》再次刷新代码规范认知,我们新增了16条设计规约!现免费开放下载,不可错过!《阿里巴巴Java开发手册》是阿里内部Java工程师所遵循的开发规范,涵盖编程规约…

受困于敏捷开发的数据与架构?肿么办?

戳蓝字“CSDN云计算”关注我们哦!译|Lorraine Lo文|Isaac Sacolick来源|InfoWorld网站如今企业强调敏捷开发不是一天两天,但在此过程中敏捷团队通常都会面临的一大挑战就是如何定义以及遵循开发中数据架构的模式和标准这一系列问题。人们之所以认为推动数…

Vue + Spring Boot 项目实战(三):使用IntelliJ IDEA快速生成 Vue.js 项目

文章目录一、使用idea Vue插件生成项目结构1.1. Create New Project1.2. 设置项目地址1.3. 确认项目名称1.4. 项目描述1.5. 作者信息1.6. 编译模板1.7. 安装路由二、安装node_modules模块三、启动项目四、Vue 项目结构分析4.1. 概览4.2. index.html4.3. App.vue4.4. main.js4.5…

沙漠种水稻,88岁的袁隆平又创造奇迹!他参与的“袁米”还有个大计划

摘要: 今年88岁的袁隆平说,他退休之前想看到两件事情:一件是杂交稻大面积亩产1200公斤,另一件是海水稻研发成功。 这几天,关于海水稻的话题再次成为热点。 上周,在两院院士大会上,海水稻和杂交水…

八年磨一剑,阿里云ApsaraDB for HBase2.0正式上线

摘要: ApsaraDB for HBase2.0于2018年6月6日即将正式发布上线啦! 它是基于社区HBase2.0稳定版的升级,也是阿里HBase多年的实践经验和技术积累的持续延伸,全面解决了旧版本碰到的核心问题,并做了很多优化改进&#xff0…

oracle 12.2 启用分片,关于Oracle Sharding,你想知道的都在这里

编辑手记:随着Oracle12.2的发布,Sharding技术也逐渐变得越来越强大,关于Sharding,你所关心的问题的答案,可能都在这里。注:本文来自Oracle FAQ文档翻译相关阅读:什么是Oracle ShardingOracle Sharding是为O…

看完这篇还不会kafka,我跪榴莲!

戳蓝字“CSDN云计算”关注我们哦!Kafka 对外使用 Topic 的概念,生产者往 Topic 里写消息,消费者从中读消息。为了做到水平扩展,一个 Topic 实际是由多个 Partition 组成的,遇到瓶颈时,可以通过增加 Partiti…

Vue + Spring Boot 项目实战(四):前后端结合测试(登录页面开发)

前面我们已经完成了前端项目 DEMO 的构建,这一篇文章主要目的如下: ①打通前后端之间的联系,为接下来的开发打下基础 ②登录页面的开发(无数据库情况下) 文章目录一、后端项目创建1.1. 项目/包名称1.2. web依赖1.3. 运…

探秘盒马机器人餐厅:老外目瞪口呆,90岁奶奶狂点赞

摘要: “菜来了。”一台小车停在餐桌旁,“张开”透明盒盖,瓮声瓮气的机器声提示取餐,它的“肚子”里,是一条热腾腾的清蒸鱼。 刚刚点菜的顾客兴奋地宣布,从下单到拿到菜,只花了11分钟。此时不到…

【漫画】AI小猪的一生

摘要: 6月7日,在上海云栖大会上,阿里云发布ET农业大脑。希望将人工智能与农业深入结合,目前已应用于生猪养殖、苹果及甜瓜种植,具备数字档案生成、全生命周期管理、智能农事分析、全链路溯源等功能。AI可以记录猪完整的…

苹果手机数据线充不了电_手机充电器充不了电?其实4招就能解决

无论科技有多么发达,智能手机怎么样变化,至少就目前使用的智能手机上来看,使用手机就离不开"电"。哪怕是即将开售的折叠手机,也还是需要充电才能够正常使用。所以说,对于是手机而言,"有电&q…

Vue + Spring Boot 项目实战(五):数据库的引入

文章目录一、引入数据库1.安装数据库2. 安装mysql3. MySQL客户端4. .使用 Navicat 创建数据库与表二、使用数据库验证登录1.项目相关配置2.登录控制器2.1. User 类2.2. UserDAO2.3. UserService2.4. LoginController2.5. 测试一、引入数据库 1.安装数据库 官网地址&#xff1…

三招提升数据不平衡模型的性能(附python代码)

摘要: 本文的主要目标是处理数据不平衡问题。文中描述了用来克服数据不平衡问题的三种技术,分别是集成交叉验证、类别权重以及过大预测 。对于深度学习而言,数据集非常重要,但在实际项目中,或多或少会碰见数据不平衡问…

为什么说「中台」程序员将来会最值钱?

戳蓝字“CSDN云计算”关注我们哦!今年在国内互联网公司中真的是很流行中台这个概念,不,是非常流行,是相当流行。作为程序员真的非常有必要了解一下。国内中台概念的由来国内中台的这个概念最早是由阿里巴巴提出来的。据说故事是这…