SparkSQL数据的加载与保存

1 读取和保存文件

SparkSQL读取和保存的文件一般为三种,JSON文件、CSV文件和列式存储的文件,同时可以通过添加参数,来识别不同的存储和压缩格式。

1.1 CSV文件

1)代码实现

package com.atguigu.sparksql;

import com.atguigu.sparksql.Bean.User;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.function.MapFunction;

import org.apache.spark.sql.*;

public class Test06_CSV {

    public static void main(String[] args) throws ClassNotFoundException {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        DataFrameReader reader = spark.read();

        // 添加参数  读取csv

        Dataset<Row> userDS = reader

                .option("header", "true")//默认为false 不读取列名

                .option("sep",",") // 默认为, 列的分割

                // 不需要写压缩格式  自适应

                .csv("input/user.csv");

        userDS.show();

        // 转换为user的ds

        // 直接转换类型会报错  csv读取的数据都是string

//        Dataset<User> userDS1 = userDS.as(Encoders.bean(User.class));

        userDS.printSchema();

        Dataset<User> userDS1 = userDS.map(new MapFunction<Row, User>() {

            @Override

            public User call(Row value) throws Exception {

                return new User(Long.valueOf(value.getString(0)), value.getString(1));

            }

        }, Encoders.bean(User.class));

        userDS1.show();

        // 写出为csv文件

        DataFrameWriter<User> writer = userDS1.write();

        writer.option("header",";")

                .option("header","true")

//                .option("compression","gzip")// 压缩格式

                // 写出模式

                // append 追加

                // Ignore 忽略本次写出

                // Overwrite 覆盖写

                // ErrorIfExists 如果存在报错

                .mode(SaveMode.Append)

                .csv("output");

        //4. 关闭sparkSession

        spark.close();

    }

}

1.2 JSON文件

package com.atguigu.sparksql;

import com.atguigu.sparksql.Bean.User;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.*;

public class Test07_JSON {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        Dataset<Row> json = spark.read().json("input/user.json");

        // json数据可以读取数据的数据类型

        Dataset<User> userDS = json.as(Encoders.bean(User.class));

        userDS.show();

        // 读取别的类型的数据也能写出为json

        DataFrameWriter<User> writer = userDS.write();

        writer.json("output1");

        //4. 关闭sparkSession

        spark.close();

    }

}

1.3 Parquet文件

列式存储的数据自带列分割。

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

public class Test08_Parquet {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        Dataset<Row> json = spark.read().json("input/user.json");

        // 写出默认使用snappy压缩

//        json.write().parquet("output");

        // 读取parquet 自带解析  能够识别列名

        Dataset<Row> parquet = spark.read().parquet("output");

        parquet.printSchema();

        //4. 关闭sparkSession

        spark.close();

    }

}

2 与MySQL交互

1)导入依赖

<dependency>

    <groupId>mysql</groupId>

    <artifactId>mysql-connector-java</artifactId>

    <version>5.1.27</version>

</dependency>

2)从MySQL读数据

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class Test09_Table {

    public static void main(String[] args) {

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

        //3. 编写代码

        Dataset<Row> json = spark.read().json("input/user.json");

        // 添加参数

        Properties properties = new Properties();

        properties.setProperty("user","root");

        properties.setProperty("password","000000");

//        json.write()

//                // 写出模式针对于表格追加覆盖

//                .mode(SaveMode.Append)

//                .jdbc("jdbc:mysql://hadoop102:3306","gmall.testInfo",properties);

        Dataset<Row> jdbc = spark.read().jdbc("jdbc:mysql://hadoop102:3306", "gmall.testInfo", properties);

        jdbc.show();

        //4. 关闭sparkSession

        spark.close();

    }

}

3 与Hive交互

SparkSQL可以采用内嵌Hive(spark开箱即用的hive),也可以采用外部Hive。企业开发中,通常采用外部Hive。

3.1 Linux中的交互

1)添加MySQL连接驱动到spark-yarn的jars目录

[atguigu@hadoop102 spark-yarn]$ cp /opt/software/mysql-connector-java-5.1.27-bin.jar /opt/module/spark-yarn/jars

2)添加hive-site.xml文件到spark-yarn的conf目录

[atguigu@hadoop102 spark-yarn]$ cp /opt/module/hive/conf/hive-site.xml /opt/module/spark-yarn/conf

3)启动spark-sql的客户端即可

[atguigu@hadoop102 spark-yarn]$  bin/spark-sql --master yarn

spark-sql (default)> show tables;

3.2 IDEA中的交互

1)添加依赖

<dependencies>

    <dependency>

       <groupId>org.apache.spark</groupId>

       <artifactId>spark-sql_2.12</artifactId>

       <version>3.1</version>

    </dependency>

    <dependency>

       <groupId>mysql</groupId>

       <artifactId>mysql-connector-java</artifactId>

       <version>5.1.27</version>

    </dependency>

    <dependency>

       <groupId>org.apache.spark</groupId>

       <artifactId>spark-hive_2.12</artifactId>

       <version>3.1</version>

    </dependency>

    <dependency>

       <groupId>org.projectlombok</groupId>

       <artifactId>lombok</artifactId>

       <version>1.18.22</version>

    </dependency>

</dependencies>

2)拷贝hive-site.xml到resources目录(如果需要操作Hadoop,需要拷贝hdfs-site.xml、core-site.xml、yarn-site.xml)

3)代码实现

package com.atguigu.sparksql;

import org.apache.spark.SparkConf;

import org.apache.spark.sql.SparkSession;

public class Test10_Hive {

    public static void main(String[] args) {

        System.setProperty("HADOOP_USER_NAME","atguigu");

        //1. 创建配置对象

        SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

        //2. 获取sparkSession

        SparkSession spark = SparkSession.builder()

                .enableHiveSupport()// 添加hive支持

                .config(conf).getOrCreate();

        //3. 编写代码

        spark.sql("show tables").show();

        spark.sql("create table user_info(name String,age bigint)");

        spark.sql("insert into table user_info values('zhangsan',10)");

        spark.sql("select * from user_info").show();

        //4. 关闭sparkSession

        spark.close();

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/142576.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vscode+python开发之虚拟环境和解释器切换

需求情景&#xff1a; 现在我们要开发多个项目比如&#xff1a;项目A&#xff0c;项目B、项目C&#xff0c;他们每个项目需要依赖不同的库。每个项目依赖的解释器也不一样怎么办&#xff1f; 项目A&#xff1a;需要在python3.7环境运行 依赖aadd3.2库 项目B、需要在python3.11…

C/C++轻量级并发TCP服务器框架Zinx-框架开发001: 读取标准输入,回显到标准输出

文章目录 完整代码实现参考-非项目使用项目使用的代码 - 乱-但是思路与上面的相同创建Kernel类添加删除修改epoll&#xff0c;才能写run方法创建stdin_Channel类在Kernel类中实现run方法 完整代码实现参考-非项目使用 #include <errno.h> #include <signal.h> #in…

2024上海国际智能驾驶技术展览会(自动驾驶展)

2024上海国际智能驾驶技术展览会 2024 Shanghai International Autonomous driving Expo 时间&#xff1a;2024年3月26-28日 地点&#xff1a;上海跨国采购会展中心 随着科技的飞速发展&#xff0c;智能驾驶已经成为了汽车行业的重要趋势。在这个时代背景下&#xff0c;汽车不…

Ansys Lumerical | 用于增强现实系统的表面浮雕光栅

在本示例中&#xff0c;我们使用 RCWA 求解器设计了一个斜面浮雕光栅 (SRG)&#xff0c;它将用于将光线耦合到单色增强现实 (AR) 系统的波导中。光栅的几何形状经过优化&#xff0c;可将正常入射光导入-1 光栅阶次。 然后我们将光栅特性导出为 Lumerical Sub-Wavelength Model …

数字媒体技术基础之:常见图片文件格式

在数字图像处理和图形设计领域&#xff0c;了解不同的图片文件格式及其特点是至关重要的。每种格式都有其独特的用途和优势。以下介绍一些最常见的图片文件格式。 JPEG Joint Photographic Experts Group 扩展名&#xff1a;.jpg 或 .jpeg 特点&#xff1a; 1、有损压缩&#x…

【Proteus仿真】【51单片机】拔河游戏设计

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真51单片机控制器&#xff0c;使用按键、LED、动态数码管模块等。 主要功能&#xff1a; 系统运行后&#xff0c;指示灯处于中间位置&#xff0c;数码管显示得分0&#xff0c;当按下…

20231114在HP笔记本的ubuntu20.04系统下向RealmeQ手机发送PDF文件

20231114在HP笔记本的ubuntu20.04系统下向RealmeQ手机发送PDF文件 2023/11/14 14:11 手机&#xff1a;Realme Q 笔记本电脑&#xff1a;HP https://item.jd.com/100012583174.html 惠普&#xff08;HP&#xff09;战66 三代AMD版 14英寸轻薄笔记本电脑&#xff08;锐龙7nm 六核…

多维时序 | MATLAB实现PSO-LSTM-Attention粒子群优化长短期记忆神经网络融合注意力机制的多变量时间序列预测

多维时序 | MATLAB实现PSO-LSTM-Attention粒子群优化长短期记忆神经网络融合注意力机制的多变量时间序列预测 目录 多维时序 | MATLAB实现PSO-LSTM-Attention粒子群优化长短期记忆神经网络融合注意力机制的多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果…

数据拟合、参数估计、插值等数据处理算法

介绍 数据拟合&#xff1a; 数据拟合是通过选择或构建合适的函数模型&#xff0c;将给定的数据点与该函数模型进行匹配和拟合的过程。常见的数据拟合方法包括最小二乘法和非线性最小二乘法。最小二乘法通过最小化实际数据与拟合函数的残差平方和来求解最优拟合参数。非线性最小…

HTTP/2.0协议详解

前言 HTTP/2.0&#xff1a;互联网通信的革新标准 随着互联网技术的飞速发展&#xff0c;HTTP协议作为互联网应用最广泛的通信协议&#xff0c;也在不断演进和优化。HTTP/2.0是HTTP协议的最新版本&#xff0c;它旨在提供更高效、更安全、更快速的互联网连接。 一、HTTP/2.0的…

第一篇 《随机点名答题系统》简介及设计流程图(类抽奖系统、在线答题系统、线上答题系统、在线点名系统、线上点名系统、在线考试系统、线上考试系统)

专栏目录 第一篇 《随机点名答题系统》简介及设计流程图&#xff08;类抽奖系统、在线答题系统、线上答题系统、在线点名系统、线上点名系统、在线考试系统、线上考试系统&#xff09;-CSDN博客 第二篇 《随机点名答题系统》——题库管理详解&#xff08;类抽奖系统、在线答题…

Redis04-分布式锁

目录 Redis实现分布式锁 分布式锁的工作流程 Redis实现分布式锁 Redission的watch dog Redis分布式锁的合理应用 Redis实现分布式锁 在单节点的服务器中&#xff0c;java中的synchronized机制是处于JVM层面的&#xff0c;只能保证线程之间的同步。而实际的服务部署中&…

Spark SQL编程

1. Spark SQL概述 1.1 什么是Spark SQL Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同&#xff0c;Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部&#xff0c;Spark SQL使用这些额外的信息来执行额外的优化。与Spa…

解析SQL 获取表、字段及SQL查询参数

解析SQL 获取表、字段及SQL查询参数 1. 执行效果2. 使用2.1 引入依赖2.2 相关实体2.3 工具类 1. 执行效果 2. 使用 2.1 引入依赖 <!-- sql 解析处理--><dependency><groupId>com.github.jsqlparser</groupId><artifactId>jsqlparser</artifa…

「实用场景教程」如何用日程控件DHTMLX Scheduler制作酒店预订日历?(一)

dhtmlxScheduler是一个类似于Google日历的JavaScript日程安排控件&#xff0c;日历事件通过Ajax动态加载&#xff0c;支持通过拖放功能调整事件日期和时间&#xff0c;事件可以按天&#xff0c;周&#xff0c;月三个种视图显示。 DHTMLX Scheduler正式版下载 在本教程中&…

K8S的基础知识

K8S的意义与入门 专有名词 容器:包含了运行一个应用程序所需要的所有东西,包括:代码、运行时、各种依赖和配置。pod:K8s调度的最小单元,包含一个或多个容器。一个容器组中的容器具有紧密耦合性,共享资源,存储空间和IP。即同一个容器组中的容器可以通过localhost:xxx访问…

windows系统pycharm程序通过urllib下载权重https报错解决

报错内容&#xff1a; raise URLError(unknown url type: %s % type) urllib.error.URLError: <urlopen error unknown url type: https> 解决办法记录&#xff1a; 1. 下载 pyopenssl : pip install pyopenssl 此时&#xff0c; import ssl 可以通过提示指导你安…

简单地聊一聊Spring Boot的构架

前言 本文小编将详细解析Spring Boot框架&#xff0c;并通过代码举例说明每个层的作用。我们将深入探讨Spring Boot的整体架构&#xff0c;包括展示层、业务逻辑层和数据访问层。通过这些例子&#xff0c;读者将更加清晰地了解每个层在应用程序中的具体作用。通过代码实例&…

Uniapp开发 购物商城源码 在线电商商城源码 适配移动终端项目及各小程序

lilishop电商商城系统 商城移动端&#xff0c;使用Uniapp开发&#xff0c;可编译为所有移动终端项目及各小程序 源码下载&#xff1a;https://download.csdn.net/download/m0_66047725/88487579 源码下载2&#xff1a;关注我留言