spark sql hbase java_Spark 读写 HBase 的两种方式(RDD、DataFrame)

使用 saveAsHadoopDataset 写入数据

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapred.JobConf

//import org.apache.hadoop.mapreduce.Job

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.SparkSession

/**

* Created by blockchain on 18-9-9 下午3:45 in Beijing.

*/

object SparkHBaseRDD {

def main(args: Array[String]) {

// 屏蔽不必要的日志显示在终端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()

val sc = spark.sparkContext

val tablename = "SparkHBase"

val hbaseConf = HBaseConfiguration.create()

hbaseConf.set("hbase.zookeeper.quorum","localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置

hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181

hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

//初始化job,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的

val jobConf = new JobConf(hbaseConf)

jobConf.setOutputFormat(classOf[TableOutputFormat])

val indataRDD = sc.makeRDD(Array("2,jack,16", "1,Lucy,15", "5,mike,17", "3,Lily,14"))

val rdd = indataRDD.map(_.split(',')).map{ arr=>

/*一个Put对象就是一行记录,在构造方法中指定主键

* 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换

* Put.addColumn 方法接收三个参数:列族,列名,数据*/

val put = new Put(Bytes.toBytes(arr(0)))

put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))

put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("age"),Bytes.toBytes(arr(2)))

(new ImmutableBytesWritable, put)

}

rdd.saveAsHadoopDataset(jobConf)

spark.stop()

}

}

使用 newAPIHadoopRDD 读取数据

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}

import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result}

import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.hadoop.hbase.mapreduce.TableInputFormat

//import org.apache.hadoop.hbase.mapreduce.TableOutputFormat

import org.apache.hadoop.hbase.mapred.TableOutputFormat

import org.apache.hadoop.hbase.util.Bytes

import org.apache.hadoop.mapred.JobConf

//import org.apache.hadoop.mapreduce.Job

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.SparkSession

/**

* Created by blockchain on 18-9-9 下午3:45 in Beijing.

*/

object SparkHBaseRDD {

def main(args: Array[String]) {

// 屏蔽不必要的日志显示在终端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBase").getOrCreate()

val sc = spark.sparkContext

val tablename = "SparkHBase"

val hbaseConf = HBaseConfiguration.create()

hbaseConf.set("hbase.zookeeper.quorum","localhost") //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置

hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") //设置zookeeper连接端口,默认2181

hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)

// 如果表不存在则创建表

val admin = new HBaseAdmin(hbaseConf)

if (!admin.isTableAvailable(tablename)) {

val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))

admin.createTable(tableDesc)

}

//读取数据并转化成rdd TableInputFormat 是 org.apache.hadoop.hbase.mapreduce 包下的

val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],

classOf[ImmutableBytesWritable],

classOf[Result])

hBaseRDD.foreach{ case (_ ,result) =>

//获取行键

val key = Bytes.toString(result.getRow)

//通过列族和列名获取列

val name = Bytes.toString(result.getValue("cf1".getBytes,"name".getBytes))

val age = Bytes.toString(result.getValue("cf1".getBytes,"age".getBytes))

println("Row key:"+key+" Name:"+name+" Age:"+age)

}

admin.close()

spark.stop()

}

}

Spark DataFrame 通过 Phoenix 读写 HBase

需要添加的依赖如下:

org.apache.phoenix

phoenix-core

${phoenix.version}

org.apache.phoenix

phoenix-spark

${phoenix.version}

下面老规矩,直接上代码。

package com.ai.spark

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.{SaveMode, SparkSession}

/**

* Created by blockchain on 18-9-9 下午8:33 in Beijing.

*/

object SparkHBaseDataFrame {

def main(args: Array[String]) {

// 屏蔽不必要的日志显示在终端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

val spark = SparkSession.builder().appName("SparkHBaseDataFrame").getOrCreate()

val url = s"jdbc:phoenix:localhost:2181"

val dbtable = "PHOENIXTEST"

//spark 读取 phoenix 返回 DataFrame 的 第一种方式

val rdf = spark.read

.format("jdbc")

.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")

.option("url", url)

.option("dbtable", dbtable)

.load()

rdf.printSchema()

//spark 读取 phoenix 返回 DataFrame 的 第二种方式

val df = spark.read

.format("org.apache.phoenix.spark")

.options(Map("table" -> dbtable, "zkUrl" -> url))

.load()

df.printSchema()

//spark DataFrame 写入 phoenix,需要先建好表

df.write

.format("org.apache.phoenix.spark")

.mode(SaveMode.Overwrite)

.options(Map("table" -> "PHOENIXTESTCOPY", "zkUrl" -> url))

.save()

spark.stop()

}

}

参考链接:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/499206.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 双重检查锁 有序_Java中的双重检查锁(double checked locking)

1 public classSingleton {2 private staticSingleton uniqueSingleton;34 privateSingleton() {5 }67 publicSingleton getInstance() {8 if (null uniqueSingleton) {9 uniqueSingleton newSingleton();10 }11 returnuniqueSingleton;12 }13 }在多线程的情况下,这…

centos 7 nginx hhvm mysql_CentOS 7 编译安装 HHVM 3.4.1

博客彻底迁移到了DigitalOcean上,无奈囊中羞涩,只能开个512MB内存的Droplet。。。只不过,小内存也可以玩出花样。之前就一直想尝试用一下HHVM,无奈编译实在是太麻烦,一直偷懒没使用。这篇文章就用来记录期间编译安装的…

java 耦合度_Java第三十八天,Spring框架系列,使用工厂模式降低程序耦合度

一、什么是耦合程序之间的依赖关系:①类之间的依赖②方法之间的依赖注意:不可能完全的消除依赖关系,而只能是降低程序之间的依赖关系二、解耦降低程序之间的依赖关系:1.解决类之间的依赖关系:①通过读取配置文件来获取…

Java 实现sha_Java实现SHA算法的方法详解

本文实例讲述了Java实现SHA算法的方法。分享给大家供大家参考,具体如下:一 简介安全散列算法固定长度摘要信息二 SHA算法SHA-1、SHA-2(SHA-224、SHA-256、SHA384、SHA-512)三 SHA算法实现package com.imooc.security.sha;import java.security.MessageDi…

jsp和java使用值_如何将表单的值从jsp发送到Java

我有一个包含30个不同字段的表格。将它们全部传递给控制器​​时,需要为每个属性都具有一个属性,以及一对getter,setter。我将使表单字段作为一个对象并将该对象发送到控制器。我正在使用以下代码 , 但有人建议从jsp调用java方法并…

linux mysql 脚本带参数_Linux下用SHELL脚本执行带输入输出参数的ORACLE存储过程并得到结果...

存储过程 myproc(in_num in number,out_num out number)sql脚本模板mysql.sql{var nret number;execute :nret : 0;--初始化call存储过程 myproc(in_num in number,out_num out number)sql脚本模板mysql.sql{var nret number;execute :nret : 0;--初始化call myproc(in_code,:n…

mysql 半同步 配置_Mysql 半同步复制配置

以下是配置和监控半同步复制:1. 半同步复制功能以plugin的方式接入MySQL,需要在主库与从库两端同时开启半同步的支持,具体配置如下:On the mastermysql> INSTALL PLUGIN rpl_semi_sync_master SONAME ‘semisync_master.so’;m…

java 窗口线程_Java 窗体与线程问题

展开全部主窗口线程不能阻塞,也就是说应该处于空闲状态。如果阻62616964757a686964616fe4b893e5b19e31333335333638塞,窗口将不能响应用户事件。程序应该类似于这样public class ServerFrame extends JFrame {public ServerFrame() {//设置其他窗口参数/…

MySQL自动建立集合自动分片_1.mongodb初步使用总结

mongoDB2.6使用总结一、准备工作下载java驱动包驱动包下载地址:http://www.doczj.com/doc/3305bc20960590c69ec376c0.html/artifact/org.mongodb/mongo-java-driver mongoDB下载:http://www.doczj.com/doc/3305bc20960590c69ec376c0.html/在线api&#x…

java response.write_@ResponseBody与response.getWriter .write()区别

responseBody注解的作用是将controller的方法返回的对象通过适当的转换器转换为指定的格式之后,写入到response对象的body区,通常用来返回JSON数据或者是XML数据,需要注意的呢,在使用此注解之后不会再走视图处理器,而是…

springcloud 创建子父项目_idea搭建springCloud----搭建父子项目(二)

今天介绍一下 : idea 搭建父子项目父项目:springCloud_ht子项目:eureka_server(注册中心)1-1.新建父项目:1-2 起名称 springCloud_ht1-3 什么都不选,next1-4 :起项目名称:springCloud_ht1-5: 该目录为下图,但是项目为红…

db2 mysql sql server_连接数据库的方法(Oracle DB2 SQL Server MySQL...)

[java]代码库import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;/*** 连接各类数据库的方法*/public class DBConnector {/*** 获得数据库连接** param driverClassName* 连接数据库用到的驱动类的类名* param dbURL* 数据库的URL* para…

格子里输出 java_蓝桥杯-格子中输出-java

/* (程序头部注释开始)* 程序的版权和版本声明部分* Copyright (c) 2016, 广州科技贸易职业学院信息工程系学生* All rights reserved.* 文件名称: 蓝桥杯赛题* 作 者: 彭俊豪* 完成日期: 2016 年 04月 01日* 版 本 号: …

基于java的rsa加密程序_RSA rsa加密程序,rsa java源码和 rsa的jsp Crypt_De algrithms 解密 238万源代码下载- www.pudn.com...

文件名称: RSA下载收藏√ [5 4 3 2 1 ]开发工具: Java文件大小: 169 KB上传时间: 2014-04-23下载次数: 0提 供 者: 姚双奇详细说明:rsa加密程序,rsa java源码和 rsa的jsp-rsa encryption program, rsa java source code and rsa in jsp文件列表(点击…

java jdbc is一个会话_java_JdbcUtilis_单实例

//eg1,没有使用单实例,eg2有package cn.itcast;import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;public final class JdbcUtils {private static String url "…

蝗虫算法java代码_蝗虫搜索算法 蝗虫算法:蝗虫优化算法是模拟自然界蝗虫种群捕食行为而提出的一 联合开发网 - pudn.com...

蝗虫搜索算法所属分类:其他开发工具:matlab文件大小:347KB下载次数:5上传日期:2020-07-26 16:31:25上 传 者:西柚不加冰说明: 蝗虫算法:蝗虫优化算法是模拟自然界蝗虫种群捕食行为而…

mysql 唯一约束 多字段_mysql多字段唯一约束

MySQL唯一约束(Unique Key)要求该列唯一,允许为空,但只能出现一个空值。唯一约束可以确保一列或者几列不出现重复值。在创建表时设置唯一约束在定义完列之后直接使用 UNIQUE 关键字指定唯一约束,语法规则如下: UNIQUE创建数据表 t…

java中final是修饰符么_Java final修饰符详解

final 在 Java 中的意思是最终,也可以称为完结器,表示对象是最终形态的,不可改变的意思。final 应用于类、方法和变量时意义是不同的,但本质是一样的,都表示不可改变。使用 final 关键字声明类、变量和方法需要注意以下…

java程序输出88的0 1矩阵_《剑指Offer》Java实现--顺时针打印矩阵

题目描述输入一个矩阵,按照从外向里顺时针的顺序打印出每一个数字。测试用例如下图:测试数组思路分析这道题目并不难,关键在于边界控制!每一层遍历均从左上角开始,逐层往里完成遍历。Java代码实现/*** 由外向内顺时针打…

java程序表头出不来_JAVA SWING 表头不显示问题

Cats萌萌说明: JScrollPane 也是组件, 你需要把滚动面板也添加到你的布局里.由于你的布局是绝对布局setLayout(null); 那么你还需要为JScrollPane 设置大小和位置效果图参考代码1234567891011121314151617181920212223import javax.swing.*; public class FrameDemo extends JF…