oracle转sparksql工具化,不使用Sqoop流程,利用CacheManager直接完成SparkSQL数据流直接回写Oracle...

以前都是使用Sqoop来完成数据从生成的hdfs数据存储上来抽取至oracle的数据库:sqoop抽取语句:

sqoop export --connect "jdbc:oracle:thin:@ip:port:sid" --username 用户名 --password 密码 --table sid.表名 --export-dir hdfs://nameservice1/user/XXX(hdfs地址) --fields-terminated-by "\001" --null-non-string '' --null-string '' -m 10;

由于项目需求我们现在要完成在代码中省城所需字段之后,直接回写到oracle中,因为数据量每天都很大,用实例或者List存有很大的局限性,可能会出现内存异常等不可预料的东西,所以我通过缓存器机制来存储数据,然后进行生成结果的临时表直接回写(后面做的hbase接口封装批量提交也比较类似)

废话不多说直接上代码:

1、建立缓存实体

package usi.java.oracle;

/**

@author HK

@date 2011-2-15 下午06:45:57

*/

public class Cache {

private String key;

private Object value;

private long timeOut;

private boolean expired;

public Cache() {

super();

}

public Cache(String key, String value, long timeOut, boolean expired) {

this.key = key;

this.value = value;

this.timeOut = timeOut;

this.expired = expired;

}

public String getKey() {

return key;

}

public long getTimeOut() {

return timeOut;

}

public Object getValue() {

return value;

}

public void setKey(String string) {

key = string;

}

public void setTimeOut(long l) {

timeOut = l;

}

public void setValue(Object object) {

value = object;

}

public boolean isExpired() {

return expired;

}

public void setExpired(boolean b) {

expired = b;

}

}

2、建立缓存控制器

package usi.java.oracle;

import java.util.Date;

import java.util.HashMap;

/**

@author HK

@date 2011-2-15 下午09:40:00

*/

public class CacheManager {

private static HashMap cacheMap = new HashMap();

/**

This class is singleton so private constructor is used.

*/

private CacheManager() {

super();

}

/**

returns cache item from hashmap

@param key

@return Cache

*/

private synchronized static Cache getCache(String key) {

return (Cache)cacheMap.get(key);

}

/**

Looks at the hashmap if a cache item exists or not

@param key

@return Cache

*/

private synchronized static boolean hasCache(String key) {

return cacheMap.containsKey(key);

}

/**

Invalidates all cache

*/

public synchronized static void invalidateAll() {

cacheMap.clear();

}

/**

Invalidates a single cache item

@param key

*/

public synchronized static void invalidate(String key) {

cacheMap.remove(key);

}

/**

Adds new item to cache hashmap

@param key

@return Cache

*/

private synchronized static void putCache(String key, Cache object) {

cacheMap.put(key, object);

}

/**

Reads a cache item's content

@param key

@return

*/

public static Cache getContent(String key) {

if (hasCache(key)) {

Cache cache = getCache(key);

if (cacheExpired(cache)) {

cache.setExpired(true);

}

return cache;

} else {

return null;

}

}

/**

@param key

@param content

@param ttl

*/

public static void putContent(String key, Object content, long ttl) {

Cache cache = new Cache();

cache.setKey(key);

cache.setValue(content);

cache.setTimeOut(ttl + new Date().getTime());

cache.setExpired(false);

putCache(key, cache);

}

/*@modelguid {172828D6-3AB2-46C4-96E2-E72B34264031}/

private static boolean cacheExpired(Cache cache) {

if (cache == null) {

return false;

}

long milisNow = new Date().getTime();

long milisExpire = cache.getTimeOut();

if (milisExpire < 0) { // Cache never expires

return false;

} else if (milisNow >= milisExpire) {

return true;

} else {

return false;

}

}

}

3、建立需要导出数据对象

package usi.java.oracle;

public class TaskAll {

private String mme_eid;

private String mme_editor;

private String entitytype_eid;

private String project_eid;

private String resource_eid;

public String getMme_eid() {

return mme_eid;

}

public void setMme_eid(String mme_eid) {

this.mme_eid = mme_eid;

}

public String getMme_editor() {

return mme_editor;

}

public void setMme_editor(String mme_editor) {

this.mme_editor = mme_editor;

}

public String getEntitytype_eid() {

return entitytype_eid;

}

public void setEntitytype_eid(String entitytype_eid) {

this.entitytype_eid = entitytype_eid;

}

public String getProject_eid() {

return project_eid;

}

public void setProject_eid(String project_eid) {

this.project_eid = project_eid;

}

public String getResource_eid() {

return resource_eid;

}

public void setResource_eid(String resource_eid) {

this.resource_eid = resource_eid;

}

}

5、执行逻辑主体,回写数据,批量提交

package usi.java.oracle;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

//import java.sql.ResultSet;

import java.util.List;

import org.apache.spark.SparkConf;

import org.apache.spark.SparkContext;

import org.apache.spark.sql.DataFrame;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.hive.HiveContext;

public class redict_to_171ora {

public static void main(String[] args) {

SparkConf sc = new SparkConf().setAppName("redict_to_171ora");

SparkContext jsc = new SparkContext(sc);

HiveContext hc = new HiveContext(jsc);

String hivesql1="select t.mme_eid,t.mme_editor,t.entitytype_eid,t.project_eid,t.resource_eid from usi_odso.c_taskall t limit 150000";

DataFrame redict_to_171ora= hc.sql(hivesql1);

//redict_to_171ora.registerTempTable("hivesql1");

List collect=redict_to_171ora.javaRDD().collect();

int o=0;

for (Row lists: collect){

TaskAll task=new TaskAll();

task.setMme_eid(lists.getString(0));

task.setMme_editor(lists.getString(1));

task.setEntitytype_eid(lists.getString(2));

task.setProject_eid(lists.getString(3));

task.setResource_eid(lists.getString(4));

CacheManager.putContent(o+"", task, 30000000);

o++;

/* System.out.println(lists.size());

System.out.println(lists.getString(0));

System.out.println(lists.getString(1));

System.out.println(lists.getString(2));

System.out.println(lists.getString(3));

System.out.println(lists.getString(4));*/

}

System.out.println(o);

Connection con = null;// 创建一个数据库连接

PreparedStatement pre = null;// 创建预编译语句对象,一般都是用这个而不用Statement

//ResultSet result = null;// 创建一个结果集对象

try

{

Class.forName("oracle.jdbc.driver.OracleDriver");// 加载Oracle驱动程序

System.out.println("开始尝试连接数据库!");

String url = "jdbc:oracle:" + "thin:@ip:1521:sid";// 127.0.0.1是本机地址,XE是精简版Oracle的默认数据库名

String user = "user";// 用户名,系统默认的账户名

String password = "password";// 你安装时选设置的密码

con = DriverManager.getConnection(url, user, password);// 获取连接

System.out.println("连接成功!");

String sql = "insert into c_taskall_test(mme_eid,mme_editor,entitytype_eid,project_eid,resource_eid) values(?,?,?,?,?)";// 预编译语句,“?”代表参数

pre = con.prepareStatement(sql);// 实例化预编译语句

for(int i=0;i

// for (Row lists: collect){

// String sql = "insert into c_taskall_test(mme_eid,mme_editor,entitytype_eid,project_eid,resource_eid) values('"+task.getMme_eid()+"','"+task.getMme_editor()+"','"+task.getEntitytype_eid()+"','"+task.getProject_eid()+"','"+task.getResource_eid()+"')";// 预编译语句,“?”代表参数

// pre.setString(1, "三星");// 设置参数,前面的1表示参数的索引,而不是表中列名的索引

TaskAll task=(TaskAll) CacheManager.getContent(""+i).getValue();

pre.setString(1, task.getMme_eid());

pre.setString(2, task.getMme_editor());

pre.setString(3, task.getEntitytype_eid());

pre.setString(4, task.getProject_eid());

pre.setString(5, task.getResource_eid());

pre.addBatch();

if(i%20000==0){//可以设置不同的大小;如50,100,500,1000等等

pre.executeBatch();

con.commit();

pre.clearBatch();

// System.out.println("i的值"+i);

}

// result = pre.executeQuery();// 执行查询,注意括号中不需要再加参数

}

pre.executeBatch();

con.commit();

pre.clearBatch();

// System.out.println("i的值"+i);

/* if (result != null)

result.close();*/

if (pre != null)

pre.close();

/* while (result.next())

// 当结果集不为空时

System.out.println("usernum:" + result.getString("usernum") + "flow:"

+ result.getString("flow"));*/

}

catch (Exception e)

{

e.printStackTrace();

}

finally

{

try

{

// 逐一将上面的几个对象关闭,因为不关闭的话会影响性能、并且占用资源

// 注意关闭的顺序,最后使用的最先关闭

/* if (result != null)

result.close();*/

if (pre != null)

pre.close();

if (con != null)

con.close();

//System.out.println("数据库连接已关闭!");

}

catch (Exception e)

{

e.printStackTrace();

}

}

}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/551131.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

oracle+行换列,Oracle的数据表中行转列与列转行的操作实例讲解

行转列一张表查询结果为--行转列select years,(select amount from Tb_Amount as A where month1 and A.yearsTb_Amount.years)as m1,(select amount from Tb_Amount as A where month2 and A.yearsTb_Amount.years)as m2,(select amount from Tb_Amount as A where month3 and…

Oracle和sql语言,SQL语言的四种类型和ORACLE运算符

数据定义语言(DDL)数据操作语言(DML)数据控制语言(DCL)事务控制语言(TCL)Data Definition Language(DDL)DDL使我们有能力创建或删除表格。也可以定义索引(键)&#xff0c;规定表之间的链接&#xff0c;以及施加表间的约束。CREATE DATABASE - 创建新数据库ALTER DATABASE - 修改…

oracle的insert语句clob,.Net 操作 Oracle CLOB类型字段 INSERT 超长数据

如果仅仅在数据库中设置了类型为 CLOB 字段类型&#xff0c;使用普通的 INSERT 语句直接写入数据到数据库的话&#xff0c;它依然会将其视为 VARCHAR 类型数据&#xff0c;并最大长度为 4000 字符。超过该长度会报出字符串超长&#xff0c;写入数据失败的错误。使用 Dapper 处理…

php阅读器开发,微信小程序阅读器的简单实例开发

这篇文章主要介绍微信小程序阅读器的简单实例开发的相关资料,需要的朋友可以参考下今天和朋友聊天说到小程序&#xff0c;然后看在看书&#xff0c;然后我们就弄了个小读书的demo&#xff0c;然后现在分享一下。一、先来上图&#xff1a;首先先说下边的tabBar&#xff0c;项目采…

oracle存储返回sql查询,如何做才能使record类型和table类型存储查询语句返回的多条记录?...

CREATE OR REPLACE PROCEDURE Zxt_type_Study Is/*type 的使用*/Testrow1 Test%ROWTYPE;TYPE Aa IS TABLE OF Test%Rowtype;Testrow Aa : Aa();TYPE Bb IS TABLE OF Test.ac%Type;Cc Bb;Testrow2 Test%ROWTYPE;t Test.Ac%TYPE;BEGINTestrow2.Ac : a3;Testrow…

oracle10g检测未通过,win64bit安装oracle 10g版本检查未通过解决 提示要求的结果: 5.0,5.1,5.2,6.0 之一 实际结果: 6.1...

在WIN7上安装oracle 10g时&#xff0c;提示如下信息&#xff1a;正在检查操作系统要求...要求的结果: 5.0,5.1,5.2,6.0 之一实际结果: 6.1检查完成。此次检查的总体结果为: 失败 <<<<问题: Oracle Database 10g 未在当前操作系统中经过认证。建议案: 确保在正确的平…

php对扑克牌进行排序,C#代码实现扑克牌排序的几种方式

扑克牌游戏&#xff0c;总是能用到很多的手牌排序&#xff0c;总结了几种方式供参考&#xff0c;顺便记录一下方便以后使用。我做的这个是由(1-13:黑桃A-K || 14 - 26:红桃 || 27 - 39&#xff1a;梅花 || 39 - 52 : 方片 || 53.54&#xff1a;小王.大王)表示的一副扑克牌&…

oracle10g导入dmp文件恢复,oracle 10g 恢复dmp文件。

1. 在winxp下&#xff0c;安装10g&#xff0c;默认选择&#xff0c;一路ok。(安装前自检出现dhcp警告&#xff0c;可直接忽略)2.命令行&#xff0c;在xp下&#xff0c;输入sqlplus&#xff0c;即可启动&#xff0c;登陆用 sqlplus / as sysdba 用管理员登陆3.在恢复oracle时&am…

js十秒没有点击怎么判断_MAC口红怎么判断真假?没批号就没有生产日期,网友:品控太差...

一般来说&#xff0c;MAC的口红产品&#xff0c;底部都会有三位编码&#xff0c;这是该款口红的批号。因像MAC这类国外美妆护肤品牌&#xff0c;是不会将生产日期明确的标注在产品上&#xff0c;都是将其编码成批号&#xff0c;在标注在外包装或者产品的侧面、底部等边缘位置。…

剪辑内核linux,Linux01-Linux编辑内核定制属于自己的内核49

一、编译内核相关命令1、重装initrd文件命令&#xff1a;mkinitrd&#xff1a;creates initial ramdisk p_w_picpaths forpreloading modules格式:mkinitrd initrd文件路径 内核版本号,如:mkinitrd/boot/initrd-uname -r.img uname -r2、I/O处理命令a、命令格式说明2${parame…

go run main.go 参数_介绍一款Go项目热编译工具gowatch

使用场景在golang项目编写过程中&#xff0c;需要对项目不断的进行构建go build并调试以快速发现问题&#xff0c;而我们每次进行手动的进行go build又太重复&#xff0c;所以写了这么一个工具gowatch来实时的监听文件的改动并编译运行&#xff0c;大大提升开发效率。快速入门安…

linux 服务端口查询,linux 怎么查看服务和端口

(1)查看本机关于IPTABLES的设置情况[roottp ~]# iptables -L -nChain INPUT (policy ACCEPT)target prot opt source destinationChain FORWARD (policy ACCEPT)target prot opt source destinationChain OUTPUT (policy ACCEPT)target prot opt source destinationChain RH-Fi…

python 捕获鼠标点击事件,在Python中的wx.Frame外部捕获鼠标事件

In Python using wxPython, how can I set the transparency and size of a window based on the proximity of the mouse relative to the applications window, or frame?Eg. similar to a hyperbolic zoom, or The Dock in MAC OS X? I am trying to achieve this effect …

linux cordova安装教程,cordova搭建环境

cordova搭建环境一、配置java环境变量1.安装JDK 选择安装目录 安装过程中会出现两次 安装提示 。第一次是安装 jdk &#xff0c;第二次是安装 jre 。建议两个都安装在同一个java文件夹中的不同文件夹中。(不能都安装在java文件夹的根目录下&#xff0c;jdk和jre安装在同一文件夹…

安卓rpg绅士游戏资源_海贼无双3(动作游戏)——电脑安卓单机游戏下载资源分享...

点击上方蓝字关注我们01游戏简介&#xff1a;游戏名称&#xff1a;海贼无双3其它名称&#xff1a;One Piece - Pirate Warriors 3游戏类型&#xff1a;动作游戏开发发行&#xff1a;Omega Force游戏平台&#xff1a;PC整理时间&#xff1a;2020-04-25官方网址&#xff1a;http:…

linux分区始柱号,找到了linux分区顺序错乱修复方法

该楼层疑似违规已被系统折叠 隐藏此楼查看此楼我之前安装ARCH时 因为自带的CFDISK工具太烂使用了主流的FDISK和PARTED 都远远不如DISKPART满意 功能弱不说最重要的就是莫名奇妙的错误超级多 错误提示还少甚至没有 让人郁闷到极限尤其是parted 因为没光驱 硬盘安装 之前又是WIND…

11210怎么等于24_想要消耗100大卡热量,怎么做才最简单?

减肥&#xff0c;最根本的原理就是制造热量差&#xff0c;从饮食上限制热量的摄入&#xff0c;调理体质让基础代谢更高&#xff0c;运动来进一步增加消耗&#xff0c;当热量差达到7000大卡左右的时候&#xff0c;就能瘦下来1kg的纯脂肪&#xff01;美食当前&#xff0c;自然要先…

连接linux工具Mtr,Linux常用网络工具:路由扫描之mtr

除了上一篇《Linux常用网络工具&#xff1a;路由扫描之traceroute》介绍的traceroute之外&#xff0c;一般Linux还内置了另一个常用的路由扫描工具mtr。mtr在某些方面比traceroute更好用&#xff0c;它可以实时显示经过的每一跳路由的信息&#xff0c;并不断进行探测。tracerou…

redis rua解决库存问题_【150期】面试官:Redis的各项功能解决了哪些问题?

点击上方“Java面试题精选”&#xff0c;关注公众号面试刷图&#xff0c;查缺补漏>>号外&#xff1a;往期面试题&#xff0c;10篇为一个单位归置到本公众号菜单栏->面试题&#xff0c;有需要的欢迎翻阅阶段汇总集合&#xff1a;一百期面试题汇总先看一下Redis是一个什…

启动盘Linux windows,Linux 中创建 USB 启动盘来拯救 Windows 用户

人们经常要求我帮助他们恢复被锁死或损坏的 Windows 电脑。有时&#xff0c;我可以使用 Linux USB 启动盘来挂载 Windows 分区&#xff0c;然后从损坏的系统中传输和备份文件。有的时候&#xff0c;客户丢失了他们的密码或以其他方式锁死了他们的登录账户凭证。解锁账户的一种方…