【大数据学习 | Spark-SQL】SparkSQL读写数据

我们使用sparksql进行编程,编程的过程我们需要创建dataframe对象,这个对象的创建方式我们是先创建RDD然后再转换rdd变成为DataFrame对象。

但是sparksql给大家提供了多种便捷读取数据的方式。

//原始读取数据方式
sc.textFile().toRDD
sqlSc.createDataFrame(rdd,schema)
//更便捷的使用方式
sqlSc.read.text|orc|parquet|jdbc|csv|json
df.write.text|orc|parquet|jdbc|csv|json

write写出存储数据的时候也是文件夹的,而且文件夹不能存在。

  • csv是一个介于文本和excel之间的一种格式,如果是文本打开用逗号分隔的。
  • text文本普通文本,但是这个文本必须只能保存一列内容。

以上两个文本都是只有内容的,没有列的。

  • json是一种字符串结构,本质就是字符串,但是存在kv,例子 {"name":"zhangsan","age":20}

多平台解析方便,带有格式信息。

  • orc格式一个列式存储格式,hive专有的。
  • parquet列式存储,顶级项目

以上都是列式存储问题,优点(1.列式存储,检索效率高,防止冗余查询 2.带有汇总信息,查询特别快 3.带有轻量级索引,可以跳过大部分数据进行检索),他们都是二进制文件,带有格式信息。

jdbc 方式,它是一种协议,只要符合jdbc规范的服务都可以连接,mysql,oracle,hive,sparksql

整体代码:

package com.hainiu.sparkimport org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{SparkConf, SparkContext}import java.util.Propertiesobject TestMovieWithSql {def main(args: Array[String]): Unit = {//??movie???//1.id  middle=name  last=typeval conf = new SparkConf()conf.setAppName("movie")conf.setMaster("local[*]")conf.set("spark.shuffle.partitions","20")val sc = new SparkContext(conf)val sqlSc = new SQLContext(sc)import sqlSc.implicits._//deal dataval df = sc.textFile("data/movies.txt").flatMap(t => {val strs = t.split(",")val mid = strs(0)val types = strs.reverse.headval name = strs.tail.reverse.tail.reverse.mkString(" ")types.split("\\|").map((mid, name, _))}).toDF("mid", "mname", "type")df.limit(1).show()val df1 = sc.textFile("data/ratings.txt").map(t=>{val strs = t.split(",")(strs(0),strs(1),strs(2).toDouble)}).toDF("userid","mid","score")df1.limit(1).show()import org.apache.spark.sql.functions._val df11 = df.join(df1, "mid").groupBy("userid", "type").agg(count("userid").as("cnt")).withColumn("rn", row_number().over(Window.partitionBy("userid").orderBy($"cnt".desc))).where("rn = 1").select("userid", "type")val df22 = df.join(df1, "mid").groupBy("type", "mname").agg(avg("score").as("avg")).withColumn("rn", row_number().over(Window.partitionBy("type").orderBy($"avg".desc))).where("rn<4").select("type", "mname")val df33 = df11.join(df22, "type")//spark3.1.2?? spark2.x//    df33.write.csv()df33.write.format("csv").save("data/csv")//    df33.write.
//      csv("data/csv")
//    df33.write.json("data/json")//    df33.write.parquet("data/parquet")
//    df33.write.orc("data/orc")
//    val pro = new Properties()
//    pro.put("user","root")
//    pro.put("password","hainiu")
//    df33.write.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro)}
}

为了简化存储的计算方式:

package com.hainiu.sparkimport org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}object TestSink {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("test sink")conf.setMaster("local[*]")val sc = new SparkContext(conf)val sqlSc = new SQLContext(sc)import sqlSc.implicits._import org.apache.spark.sql.functions._val df = sc.textFile("data/a.txt").map(t=>{val strs = t.split(" ")(strs(0),strs(1),strs(2),strs(3))}).toDF("id","name","age","gender").withColumn("all",concat_ws(" ",$"id",$"name",$"age",$"gender")).select("all")
//    df.write.csv("data/csv")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
//      .save("data/csv")
//    df.write.parquet("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2")
//      .save("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2")
//      .save("data/json")df.write.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2").save("data/text")}
}

读取数据代码:

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContextimport java.util.Propertiesobject TestReadData {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("movie")conf.setMaster("local[*]")conf.set("spark.shuffle.partitions", "20")val sc = new SparkContext(conf)val sqlSc = new SQLContext(sc)
//    sqlSc.read.text("data/text").show()
//    sqlSc.read.csv("data/csv").show()
//  
//    sqlSc.read.parquet("data/parquet").show()
//    sqlSc.read.json("data/json").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2").load("data/text").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2").load("data/csv").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2").load("data/json").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2").load("data/parquet").show()sqlSc.read.orc("data/orc").show()val pro = new Properties()pro.put("user","root")pro.put("password","hainiu")sqlSc.read.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro).show()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/62695.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React Native学习笔记(三)

一 组件简介 1.1 简介 RN中的核心组件&#xff0c;是对原生组件的封装 原生组件&#xff1a;Android或ios内的组件核心组件&#xff1a;RN中常用的&#xff0c;来自react-native的组件 原生组件 在 Android 开发中是使用 Kotlin 或 Java 来编写视图&#xff1b;在 iOS 开发…

视觉语言动作模型VLA的持续升级:从π0之参考基线Octo到OpenVLA、TinyVLA、DeeR-VLA、3D-VLA

第一部分 VLA模型π0之参考基线Octo 1.1 Octo的提出背景与其整体架构 1.1.1 Octo的提出背景与相关工作 许多研究使用从机器人收集的大量轨迹数据集来训练策略 从早期使用自主数据收集来扩展策略训练的工作[71,48,41,19-Robonet,27,30]到最近探索将现代基于transformer的策略…

C与指针。

目录 1_指针理解 1.1变量的值 1.2变量的地址 1.3指针 1.4取变量的地址 2_分析指针 2.1分析指针变量的要素 2.2根据需求定义指针变量 3_指针的使用 3.1指针对变量的读操作 3.2指针对变量的写操作 4_指针占用空间的大小与位移 4.1指针占用空间的大小 4.2指针的位移…

单片机学习笔记 15. 串口通信(理论)

更多单片机学习笔记&#xff1a;单片机学习笔记 1. 点亮一个LED灯单片机学习笔记 2. LED灯闪烁单片机学习笔记 3. LED灯流水灯单片机学习笔记 4. 蜂鸣器滴~滴~滴~单片机学习笔记 5. 数码管静态显示单片机学习笔记 6. 数码管动态显示单片机学习笔记 7. 独立键盘单片机学习笔记 8…

树莓派5+文心一言 -> 智能音箱

一、简介 效果&#xff1a;运行起来后&#xff0c;可以连续对话 硬件&#xff1a;树莓派5、麦克风、音箱&#xff0c;成本500-1000 软件&#xff1a;snowboy作为唤醒词、百度语音作为语音识别、brain作为指令匹配、百度文心一言作为对话模块、微软的edge-tts语音合成... 二…

SAP SD学习笔记17 - 投诉处理3 - Credit/Debit Memo依赖,Credit/Debit Memo

上一章讲了 请求书&#xff08;发票&#xff09;的取消。 SAP SD学习笔记16 - 请求书的取消 - VF11-CSDN博客 再往上几章&#xff0c;讲了下图里面的返品传票&#xff1a; SAP SD学习笔记14 - 投诉处理1 - 返品处理&#xff08;退货处理&#xff09;的流程以及系统实操&#…

Linux服务器使用JupyterLab

一、JupyterLab的配置 1. conda配置 自行搜索conda安装与配置。 2. 环境创建 &#xff08;1&#xff09;创建环境 conda create -n jupyter python3.10&#xff08;2&#xff09;激活环境 conda activate jupyter&#xff08;3&#xff09;安装jupyter包 pip install -i…

Flutter:页面滚动

1、单一页面&#xff0c;没有列表没分页的&#xff0c;推荐使用&#xff1a;SingleChildScrollView() return Scaffold(backgroundColor: Color(0xffF6F6F6),body: SingleChildScrollView(child: _buildView()) );2、列表没分页&#xff0c;如购物车页&#xff0c;每个item之间…

使用GitZip for github插件下载git仓库中的单个文件

背景&#xff1a;git仓库不知道抽什么疯&#xff0c;下载不了单个文件&#xff0c;点击下载没有反应&#xff0c;遂找寻其他方法&#xff0c;在这里简单记录下。 使用GitZip for github插件下载仓库中的单个文件 1、首先在浏览器安装插件&#xff0c;并确保为打开状态。 2、然…

Unet改进57:在不同位置添加SFHF

本文内容:在不同位置添加CBAM注意力机制 论文简介 由于恶劣的大气条件或独特的降解机制,自然图像会遭受各种退化现象。这种多样性使得为各种恢复任务设计一个通用框架具有挑战性。现有的图像恢复方法没有探索不同退化现象之间的共性,而是侧重于在有限的恢复先验下对网络结构…

数据结构(初阶7)---七大排序法(堆排序,快速排序,归并排序,希尔排序,冒泡排序,选择排序,插入排序)(详解)

排序 1.插入排序2.希尔排序3.冒泡排序4.选择排序(双头排序优化版)5.堆排序6.快速排序1). 双指针法2).前后指针法3).非递归法 7.归并排序1).递归版本(递归的回退就是归并)2).非递归版本(迭代版本) 计算机执行的最多的操作之一就有排序&#xff0c;排序是一项极其重要的技能 接下…

DataWhale—PumpkinBook(TASK07支持向量机)

课程开源地址及相关视频链接&#xff1a;&#xff08;当然这里也希望大家支持一下正版西瓜书和南瓜书图书&#xff0c;支持文睿、秦州等等致力于开源生态建设的大佬✿✿ヽ(▽)ノ✿&#xff09; Datawhale-学用 AI,从此开始 【吃瓜教程】《机器学习公式详解》&#xff08;南瓜…

【Python数据分析五十个小案例】使用自然语言处理(NLP)技术分析 Twitter 情感

博客主页&#xff1a;小馒头学python 本文专栏: Python爬虫五十个小案例 专栏简介&#xff1a;分享五十个Python爬虫小案例 项目简介 什么是情感分析 情感分析&#xff08;Sentiment Analysis&#xff09;是文本分析的一部分&#xff0c;旨在识别文本中传递的情感信息&…

【数据结构与算法】排序算法(上)——插入排序与选择排序

文章目录 一、常见的排序算法二、插入排序2.1、直接插入排序2.2、希尔排序( 缩小增量排序 ) 三、选择排序3.1、直接选择排序3.2、堆排序3.2.1、堆排序的代码实现 一、常见的排序算法 常见排序算法中有四大排序算法&#xff0c;第一是插入排序&#xff0c;二是选择排序&#xff…

Educator头歌:离散数学 - 图论

第1关&#xff1a;图的概念 任务描述 本关任务&#xff1a;学习图的基本概念&#xff0c;完成相关练习。 相关知识 为了完成本关任务&#xff0c;你需要掌握&#xff1a;图的概念。 图的概念 1.一个图G是一个有序三元组G<V,R,ϕ>&#xff0c;其中V是非空顶点集合&am…

oracle RAC各版本集群总结和常用命令汇总

oracle RAC学习 RAC介绍 RAC&#xff1a;高可用集群&#xff0c;负载均衡集群&#xff0c;高性能计算集群 RAC是⼀种⾼可⽤&#xff0c;⾼性能&#xff0c;负载均衡的share-everything的集群 8i:内存融合雏形 内存融合雏形&#xff08;Oracle Parallel Server&#xff09;…

数据资产管理是什么?为什么重要?核心组成部分(分类分级、登记追踪、质量管理、安全合规)、实施方法、未来趋势、战略意义

文章目录 一、引言&#xff1a;数据的新时代二、什么是数据资产管理&#xff1f;2.1 定义2.2 核心功能 三、为什么数据资产管理至关重要&#xff1f;3.1 面对的数据管理挑战 四、数据资产管理的核心组成部分4.1 数据分类与分级4.2 数据资产登记与追踪4.3 数据质量管理4.4 数据安…

C++高阶算法[汇总]

&#xff08;一&#xff09;高精度算法概述 高精度算法是指能够处理超出常规数据类型表示范围的数值的算法。在 C 中&#xff0c;标准数据类型通常有固定的位数和精度限制&#xff0c;而高精度算法可以解决大数运算、金融计算和科学计算等领域的问题。 &#xff08;二&#x…

springboot365高校疫情防控web系统(论文+源码)_kaic

毕 业 设 计&#xff08;论 文&#xff09; 题目&#xff1a;高校疫情防控的设计与实现 摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为…

Electron实现打开子窗口加载vue路由指定的组件页面白屏

白屏有两种情况&#xff1a; Vue项目使用的history路由的话就会显示空白&#xff0c;加载不出来路由&#xff0c;也不能跳转路由 这种情况看我上一篇文章Electron vue3 打包之后不能跳转路由-CSDN博客 Electron中已经能正常加载页面跳转路由&#xff0c;但是创建子窗口加载子页…