spark不同结构Dataset合并

1.先将hdfs(或本地)存储的csv文件加载为Dataset
先在本地C盘准备两个csv文件
test.csv

client_id,behives,del,normal_status,cust_type,no_trd_days
7056,zl,1,hy,个人,2
7057,cf,1,hy,个人,12
7058,hs,2,hy,个人,1200
212121,0,sj,hy,个人,1100
212122,1,yx,hy,个人,100
212123,1,ls,hy,个人,100

test1.csv

cust_no,code,hight
sw7056,66661,125
es7057,66661,156
wq7058,66661,165

使用spark 加载,并为其创建视图

Dataset<Row> a = createRealView(session, "file:///C:\\\\test.csv");
Dataset<Row> b = createRealView(session, "file:///C:\\\\test1.csv");
a.createOrReplaceTempView("a");
b.createOrReplaceTempView("b");

由于这两个Dataset的结构和数据量均不一样,若要拼接为一个大的Dataset,可以把每个Dataset多生成一个自动增长编号的列,这里最快的方式是使用每行数据的索引号,则需要转换为RDD操作。

JavaPairRDD<Long, Row> aNewRDD = a.toJavaRDD().zipWithIndex().mapToPair(tuple -> {Long key = tuple._2;Row val = tuple._1;return new Tuple2<>(key, val);});JavaPairRDD<Long, Row> bNewRDD = b.toJavaRDD().zipWithIndex().mapToPair(tuple -> {Long key = tuple._2;Row val = tuple._1;return new Tuple2<>(key, val);});

原本zipWithIndex()之后的格式为JavaPairRDD<Row, Long> 但是我们需要转换一下结构为JavaPairRDD<Long, Row> 为后续的join操作做准备,因为join关联数据使用的是JavaPairRDD<T,M>中的T字段

JavaPairRDD<Long, Tuple2<Row, Row>> joinRDD = aNewRDD.join(bNewRDD);
joinRDD.collect().forEach(x -> System.out.println(x));

打印结果为

(0,([7056,zl,1,hy,个人,2],[sw7056,66661,125]))
(1,([7057,cf,1,hy,个人,12],[es7057,66661,156]))
(2,([7058,hs,2,hy,个人,1200],[wq7058,66661,165]))

那么接下来 需要将Tuple2<Row, Row>中的两个Row合并为一个Row

JavaRDD<Row> rtRDD = joinRDD.map(tuple -> {List<Object> rowContent = new ArrayList<>();List<Object> tp1 = JavaConverters.seqAsJavaList(tuple._2._1.toSeq());List<Object> tp2 = JavaConverters.seqAsJavaList(tuple._2._2.toSeq());rowContent.addAll(tp1);rowContent.addAll(tp2);Seq<Object> rtSeq = JavaConverters.asScalaIteratorConverter(rowContent.iterator()).asScala().toSeq();return Row.fromSeq(rtSeq);});

这样就把JavaPairRDD<Long, Tuple2<Row, Row>>转换为JavaRDD<Row>,然后就需要转为Dataset<Row>
由于程序并不知道Dataset的数据结构,则需要建立结果的结构所需的schema

StructType schema = new StructType(new StructField[]{new StructField("client_id", DataTypes.StringType, false, Metadata.empty()),new StructField("behives", DataTypes.StringType, false, Metadata.empty()),new StructField("del", DataTypes.StringType, false, Metadata.empty()),new StructField("normal_status", DataTypes.StringType, false, Metadata.empty()),new StructField("cust_type", DataTypes.StringType, false, Metadata.empty()),new StructField("no_trd_days", DataTypes.StringType, false, Metadata.empty()),new StructField("cust_no", DataTypes.StringType, false, Metadata.empty()),new StructField("code", DataTypes.StringType, false, Metadata.empty()),new StructField("hight", DataTypes.StringType, false, Metadata.empty())});

然后创建出新的Dataset<Row>

Dataset<Row> dataView = session.createDataFrame(rtRDD, schema);
dataView.show();

最终结果是

+---------+-------+---+-------------+---------+-----------+-------+-----+-----+
|client_id|behives|del|normal_status|cust_type|no_trd_days|cust_no| code|hight|
+---------+-------+---+-------------+---------+-----------+-------+-----+-----+
|     7056|     zl|  1|           hy|     个人|          2| sw7056|66661|  125|
|     7057|     cf|  1|           hy|     个人|         12| es7057|66661|  156|
|     7058|     hs|  2|           hy|     个人|       1200| wq7058|66661|  165|
+---------+-------+---+-------------+---------+-----------+-------+-----+-----+

完整代码:

public void createView(SparkSession session, String portraitPath, String prodPath) {Dataset<Row> a = createRealView(session, "file:///C:\\\\test.csv");Dataset<Row> rowDataset = a.unionAll(a).unionAll(a).orderBy(functions.rand());a.createOrReplaceTempView("a");Dataset<Row> b = createRealView(session, "file:///C:\\\\test1.csv");b.createOrReplaceTempView("b");JavaPairRDD<Long, Row> aNewRDD = a.toJavaRDD().zipWithIndex().mapToPair(tuple -> {Long key = tuple._2;Row val = tuple._1;return new Tuple2<>(key, val);});JavaPairRDD<Long, Row> bNewRDD = b.toJavaRDD().zipWithIndex().mapToPair(tuple -> {Long key = tuple._2;Row val = tuple._1;return new Tuple2<>(key, val);});JavaPairRDD<Long, Tuple2<Row, Row>> joinRDD = aNewRDD.join(bNewRDD);joinRDD.collect().forEach(x -> System.out.println(x));StructType schema = new StructType(new StructField[]{new StructField("client_id", DataTypes.StringType, false, Metadata.empty()),new StructField("behives", DataTypes.StringType, false, Metadata.empty()),new StructField("del", DataTypes.StringType, false, Metadata.empty()),new StructField("normal_status", DataTypes.StringType, false, Metadata.empty()),new StructField("cust_type", DataTypes.StringType, false, Metadata.empty()),new StructField("no_trd_days", DataTypes.StringType, false, Metadata.empty()),new StructField("cust_no", DataTypes.StringType, false, Metadata.empty()),new StructField("code", DataTypes.StringType, false, Metadata.empty()),new StructField("hight", DataTypes.StringType, false, Metadata.empty())});JavaRDD<Row> rtRDD = joinRDD.map(tuple -> {List<Object> rowContent = new ArrayList<>();List<Object> tp1 = JavaConverters.seqAsJavaList(tuple._2._1.toSeq());List<Object> tp2 = JavaConverters.seqAsJavaList(tuple._2._2.toSeq());rowContent.addAll(tp1);rowContent.addAll(tp2);Seq<Object> rtSeq = JavaConverters.asScalaIteratorConverter(rowContent.iterator()).asScala().toSeq();return Row.fromSeq(rtSeq);});Dataset<Row> dataView = session.createDataFrame(rtRDD, schema);dataView.show();}private Dataset<Row> createRealView(SparkSession session, String hdfsPath) {Dataset<Row> wideTableDF = null;try {wideTableDF = session.read().format("csv").option("header", "true").load(hdfsPath);} catch (Exception e) {System.err.println("未找到有效的宽表数据,查找路径为:" + hdfsPath);}return wideTableDF;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/198765.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何入驻抖音本地生活服务商,门槛太高怎么办?

随着抖音本地生活服务市场的逐渐成熟&#xff0c;越来越多平台开始涉及本地生活服务领域&#xff0c;而本地生活服务商成了一个香窝窝&#xff0c;为了保护用户权益和平台生态&#xff0c;对入驻入驻抖音本地生活服务商的条件及审核也越来越严格&#xff0c;这让很多想成为抖音…

ES常用操作语句

ES常用操作语句 注&#xff1a;本文中的操作语句基于ES5.5和7.7的版本&#xff0c;版本不同操作语句上可能有细微差别&#xff0c;如5.5版本有索引类型&#xff0c;7.7版本已废弃&#xff0c;查询不应该带索引类型 新增 # 添加字段&#xff0c;并设置字段类型 PUT /索引/_map…

LeetCode56. 合并区间

&#x1f517;:【贪心算法&#xff0c;合并区间有细节&#xff01;LeetCode&#xff1a;56.合并区间-哔哩哔哩】 class Solution { public:vector<vector<int>> merge(vector<vector<int>>& intervals) {if(intervals.size()0){return intervals;…

【页面】表格展示

展示 Dom <template><div class"srch-result-container"><!--左侧--><div class"left"><div v-for"(item,index) in muneList" :key"index" :class"(muneIndexitem.mm)?active:"click"pa…

GORM gorm.DB 对象剖析

文章目录 1.GORM 简介2.gorm.DB 简介2.1 定义2.2 初始化2.3 查询方法2.4 事务支持2.5 模型关联2.6 钩子&#xff08;Hooks&#xff09;2.7 自定义数据类型 3.为什么不同请求可以共用一个 gorm.DB 对象&#xff1f;4.链式调用与方法5.小结参考文献 1.GORM 简介 GORM 是一个流行…

基于c 实现 FIFO

功能&#xff1a; 1、读和写长度不限制 2、数据操作 和 指针操作分开&#xff08;如先操作数据&#xff0c;再操作指针&#xff09; 适用场景&#xff1a; 单向通信模式&#xff0c;一方写、一方读&#xff0c;可用于任务间单向通信&#xff08;无需锁&#xff09; 如&…

7-HDFS的文件管理

单选题 题目1&#xff1a;下列哪个属性是hdfs-site.xml中的配置&#xff1f; 选项: A fs.defaultFS B dfs.replication C mapreduce.framework.name D yarn.resourcemanager.address 答案&#xff1a;B ------------------------------ 题目2&#xff1a;HDFS默认备份数量&…

fastboot常用命令

fastboot常用命令 显示fastboot设备&#xff1a;fastboot devices 获取手机相关信息&#xff1a;fastboot getvar all 重启手机&#xff1a;fastboot reboot 重启到bootloader&#xff1a;fastboot reboot-bootloader 擦除分区&#xff1a;fastboot erase (分区名) 例&…

代码随想录算法训练营第四十三天 _ 动态规划_1049.最后一块石头的重量II、494.目标和、474.一和零。

学习目标&#xff1a; 动态规划五部曲&#xff1a; ① 确定dp[i]的含义 ② 求递推公式 ③ dp数组如何初始化 ④ 确定遍历顺序 ⑤ 打印递归数组 ---- 调试 引用自代码随想录&#xff01; 60天训练营打卡计划&#xff01; 学习内容&#xff1a; 1049.最后一块石头的重量II 该题…

360公司-2019校招笔试-Windows开发工程师客观题合集解析

360公司-2019校招笔试-Windows开发工程师客观题合集 API无法实现进程间数据的相互传递是PostMessage2.以下代码执行后,it的数据为(异常) std::list<int> temp; std::list<int>::iterator it = temp.begin(); it = --it; 3.API在失败时的返回值跟其他不一样是 …

微信小程序自定义tabBar简易实现

文章目录 1.app.json设置custom为true开启自定义2.根目录创建自定义的tab文件3.app.js全局封装一个设置tabbar选中的方法4.在onshow中使用选中方法最终效果预览 1.app.json设置custom为true开启自定义 2.根目录创建自定义的tab文件 index.wxml <view class"tab-bar&quo…

自动升降压稳压电源模块输入3v~24V输出3.3/4.2/5/9/12V芯片

自动升降压稳压电源模块是一种高效、高稳定性的电源解决方案&#xff0c;广泛应用于各种需要稳定电压输出的场合。该模块采用宽电压低功耗方案&#xff0c;能够自动升降压&#xff0c;适应不同的输入电压范围&#xff0c;同时具有关断功能&#xff0c;确保设备的安全运行。 该电…

Vue 报错error:0308010C:digital envelope routines::unsupported

因为 node.js V17版本中最近发布的OpenSSL3.0, 而OpenSSL3.0对允许算法和密钥大小增加了严格的限制 方法一 windows终端输入 set NODE_OPTIONS--openssl-legacy-provider 方法二 降低node版本&#xff0c;比如16. 方法三 package.json增加如下配置 "scripts":…

想要更高效的文件传输?这些aspera替代方案可以助你一臂之力

随着数字化时代的不断推进&#xff0c;数据传输已成为各行各业、各类企业所必需的核心能力。而在文件传输方面&#xff0c;传统的方式往往面临着诸多问题&#xff0c;例如文件大小限制、传输速度过慢、不稳定、不安全等问题。为此&#xff0c;许多企业开始寻找更可靠、更高效的…

Java大数据开发入门教程:使用Hadoop处理海量数据

引言&#xff1a; 随着互联网的发展和智能设备的普及&#xff0c;数据量的爆炸式增长已成为现实。如何高效地处理和分析这些海量数据成为了当今技术领域的一个重要课题。在大数据领域&#xff0c;Hadoop作为一个开源的分布式计算框架&#xff0c;被广泛应用于海量数据的存储和处…

网工学习10-IP地址

一、IP地址概念 IP地址是一个32位的二进制数&#xff0c;它由网络ID和主机ID两部份组成&#xff0c;用来在网络中唯一的标识的一台计算机。网络ID用来标识计算机所处的网段&#xff1b;主机ID用来标识计算机在网段中的位置。IP地址通常用4组3位十进制数表示&#xff0c;中间用…

XHR 和 Fetch 的区别

网站开发普遍采用前后端分离的模式&#xff0c;数据交互成为了不可或缺的关键环节。在这个过程中&#xff0c;XHR 和 Fetch API 是两种最常见的方法&#xff0c;用于从 Web 服务器获取数据。XHR 是一种传统的数据请求方式&#xff0c;而 Fetch API 则代表了现代 Web 开发的新兴…

scipy笔记:scipy.interpolate.interp1d

1 主要使用方法 class scipy.interpolate.interp1d(x, y, kindlinear, axis-1, copyTrue, bounds_errorNone, fill_valuenan, assume_sortedFalse) 2 主要函数 x一维实数值数组&#xff0c;代表插值的自变量y N维实数值数组&#xff0c;其中沿着插值轴的 y 长度必须等于 x 的…

Linux:使用pv实现执行进度监控

pv全称&#xff1a;Pipe Viewer&#xff0c;通过管道显示数据处理进度的信息 安装 yum install pv -y示例 复制文件 # 显示进度 pv data.sql > ./data-new.sql330MiB 0:00:00 [1.32GiB/s] [>] 100%限制mysql数据导出速率 mysqldump | pv -L10m > data.sql# -L, -…

gitlab注册无中国区电话验证问题

众所周知gitlab对中国区不友好&#xff0c;无法直接注册&#xff0c;页面无法选择86的手机号进行验证码发送。 Google上众多的方案是修改dom&#xff0c;而且时间大约是21年以前。 修改dom&#xff0c;对于现在的VUE、React框架来说是没有用的&#xff0c;所以不用尝试。 直接看…