【大数据】Spark使用大全:下载安装、RDD操作、JAVA编程、SQL

目录

前言

1.下载安装

2.RDD操作

3.JAVA编程示例

4.Spark SQL


前言

本文是作者大数据系列中的一文,专栏地址:

https://blog.csdn.net/joker_zjn/category_12631789.html?spm=1001.2014.3001.5482

该系列会成体系的聊一聊整个大数据的技术栈,绝对干货,欢迎订阅。

1.下载安装

前置环境:

  • Hadoop 3.1.3
  • Java JDK 1.8

下载地址:

Downloads | Apache Spark

往下拉找到Spark release archives.

由于前面我们已经搭建好了hadoop环境,所以这里选择with out hadoop的版本。

配置config目录下有一个配置模板spark-env.sh.template:

将这个模板修改或者复制为spark-env.sh然后在里面:

export SPARK_DIST_CLASSPATH=${Hadoop的安装路径/bin classpath}

因为Spark只是个计算引擎,具体要去操作对应的分部署文件系统的,所以将Spark的类路径指向了hadoop。也就是通过这个配置将Spark要操作的数据源设置为了HDFS。

启动:

bin目录下:

./run-exmaple SparkPi

这是一个Spark自带的demo,如果跑起来不报错,说明就没什么问题了。

2.RDD操作

可以用Spark自带的Spark  shell来进行RDD操作:

./bin/spark-shell

RDD操作分为两类:

  • 转换,就是只是返回中间数据集的操作。
  • 动作,就是有具体单个返回值的操作。

map - 应用于RDD的每个元素,产生一个新的RDD。

val numbersRdd = spark.sparkContext.parallelize(Array(1, 2, 3, 4))
val squaredRdd = numbersRdd.map(x => x * x) 

filter - 根据函数条件过滤RDD中的元素。

val evenNumbersRdd = numbersRdd.filter(_ % 2 == 0)

flatMap - 对RDD中的每个元素应用函数并展平结果。

val wordsRdd = spark.sparkContext.textFile("hdfs://path/to/textfile.txt")
val wordsFlatMapped = wordsRdd.flatMap(line => line.split(" "))

mapPartitions - 对每个分区应用一个函数。

val incrementedRdd = numbersRdd.mapPartitions(iter => iter.map(x => x + 1))

union - 合并两个RDD。

val rdd1 = spark.sparkContext.parallelize(Array(1, 2))
val rdd2 = spark.sparkContext.parallelize(Array(3, 4))
val combinedRdd = rdd1.union(rdd2)

distinct - 返回RDD中不重复的元素。

val uniqueNumbers = numbersRdd.distinct()

join - 对两个键值对RDD进行内连接。

val rddA = spark.sparkContext.parallelize(Array((1, "a"), (2, "b")))
val rddB = spark.sparkContext.parallelize(Array((1, "x"), (3, "y")))
val joinedRdd = rddA.join(rddB)

reduce - 通过函数聚合RDD中的所有元素。

val sum = numbersRdd.reduce(_ + _)

collect - 返回RDD的所有元素到Driver作为数组。

val allElements = numbersRdd.collect()

count - 返回RDD中元素的数量。

val count = numbersRdd.count()

first - 返回RDD的第一个元素。

val firstElement = numbersRdd.first()

take(n) - 返回RDD的前n个元素。

val topThree = numbersRdd.take(3)

saveAsTextFile - 将RDD的内容保存为文本文件。

wordsRdd.saveAsTextFile("hdfs://path/to/output")

foreach - 对RDD的每个元素应用函数,常用于副作用操作。

numbersRdd.foreach(println)

3.JAVA编程示例

依赖:

<dependencies><dependency> <!-- Spark dependency --><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.0</version></dependency></dependencies>

 编码:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;public class WordCountFromHDFS {public static void main(String[] args) {if (args.length != 1) {System.err.println("Usage: WordCountFromHDFS <input path>");System.exit(1);}// 初始化Spark配置SparkConf conf = new SparkConf().setAppName("WordCountFromHDFS").setMaster("local"); // 本地模式运行,根据实际情况可改为yarn等// 创建SparkContext实例JavaSparkContext sc = new JavaSparkContext(conf);// HDFS文件路径,这里直接从命令行参数获取String inputPath = args[0];// 从HDFS读取文件内容JavaRDD<String> lines = sc.textFile(inputPath);// 每行分割成单词,然后扁平化,最后统计每个单词出现的次数JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split("\\s+")).iterator());JavaPairRDD<String, Integer> wordCounts = words.mapToPair(word -> new Tuple2<>(word, 1)).reduceByKey((a, b) -> a + b);// 收集结果并打印List<Tuple2<String, Integer>> results = wordCounts.collect();for (Tuple2<String, Integer> result : results) {System.out.println(result._1() + ": " + result._2());}// 停止SparkContextsc.stop();}
}

4.Spark SQL

park SQL是Spark的一个组件,它从Spark 1.3.0版本开始被引入,并在后续版本中不断得到增强和发展。Spark SQL允许用户使用SQL或者DataFrame API来处理结构化和半结构化的数据。下面做个小小的演示。

假设我们有一个CSV文件位于HDFS上,我们可以用以下命令加载它:

   val df = spark.read
     .option("header", "true")
     .csv("hdfs://localhost:9000/path/to/yourfile.csv")

创建临时视图:

   df.createOrReplaceTempView("my_table")

执行sql:

val result = spark.sql("SELECT column_name FROM my_table WHERE condition")

joinResult.show()

连表查询:

// 假设dfOrders和dfCustomers分别是orders和customers的DataFrame
dfOrders.createOrReplaceTempView("orders")
dfCustomers.createOrReplaceTempView("customers")

val joinResult = spark.sql(
  """
    SELECT orders.order_id, customers.customer_name
    FROM orders
    INNER JOIN customers
    ON orders.customer_id = customers.customer_id
  """
)

joinResult.show()

当然Spark SQL也有对应的JAVA API,支持编程的方式来操作,用到的时候查一下就是,此处就不展开了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/26599.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

stable-diffusion 3 体验部署流程(ComfyUI)

环境准备 下载及简介 git clone https://huggingface.co/stabilityai/stable-diffusion-3-medium SD3 checkpoints&#xff1a; sd3_medium_incl_clips.safetensors (5.5GB)sd3_medium_incl_clips_t5xxlfp8.safetensors (10.1GB)sd3_medium.safetensors (4.3GB) 前两个可以…

SAP OB52 财务账期月结月底月初开关

公告&#xff1a;周一至周五每日一更&#xff0c;周六日存稿&#xff0c;请您点“关注”和“在看”&#xff0c;后续推送的时候不至于看不到每日更新内容&#xff0c;感谢。 这是一条刮刮乐&#xff0c;按住全部选中&#xff1a;点关注的人最帅最美&#xff0c;欢迎&#xff1…

vuInhub靶场实战系列--Kioptrix Level #4

免责声明 本文档仅供学习和研究使用,请勿使用文中的技术源码用于非法用途,任何人造成的任何负面影响,与本人无关。 目录 免责声明前言一、环境配置1.1 靶场信息1.2 靶场配置 二、信息收集2.1 主机发现2.1.1 netdiscover2.1.2 arp-scan主机扫描 2.2 端口扫描2.3 指纹识别2.4 目…

nodejs——原型链污染

一、引用类型皆为对象 原型和原型链都是来源于对象而服务于对象的概念&#xff0c;所以我们要先明确一点&#xff1a; JavaScript中一切引用类型都是对象&#xff0c;对象就是属性的集合。 Array类型、Function类型、Object类型、Date类型、RegExp类型等都是引用类型。 也就…

Vue22-v-model收集表单数据

一、效果图 二、代码 2-1、HTML代码 2-2、vue代码 1、v-model单选框的收集信息 v-model&#xff1a;默认收集的就是元素中的value值。 单选框添加默认值&#xff1a; 2、v-model多选框的收集信息 ①、多个选择的多选 注意&#xff1a; 此处的hobby要是数组&#xff01;&…

【深度学习基础】激活函数:Tanh、Sigmoid 和 Softmax

激活函数是深度学习模型中不可或缺的一部分&#xff0c;它们赋予神经网络强大的非线性变换能力&#xff0c;使其能够拟合复杂的函数关系。在这篇博文中&#xff0c;我们将探讨三种常见的激活函数&#xff1a;Tanh、Sigmoid 和 Softmax&#xff0c;并提供一些记忆它们的技巧。 1…

C++基础知识(八:STL标准库 deque )

deque在C的STL(Standard Template Library)中是一个非常强大的容器&#xff0c;它的全称是“Double-Ended Queue”&#xff0c;即双端队列。deque结合了数组和链表的优点&#xff0c;提供了在两端进行高效插入和删除操作的能力&#xff0c;同时保持了随机访问的特性。 双端队列…

#慧眼识模每日PK[话题]##用五种语言说爸爸我爱你[话题]#

#慧眼识模每日PK #用五种语言说爸爸我爱你 你觉得哪个模型回答得更好&#xff1f;欢迎留言 A.蓝 B.紫 更多问题&#xff0c;扫码体验吧&#xff5e; by 国家&#xff08;杭州&#xff09;新型交换中心

养猫发现猫毛过敏?宠物空气净化器真的能拯救猫毛过敏吗?

广东省 猫咪是许多人梦寐以求的伴侣&#xff0c;但对于轻度猫毛过敏和鼻炎患者来说&#xff0c;养猫似乎是个遥不可及的梦想。我常在社交媒体上羡慕地观看朋友们的吸猫日常&#xff0c;却因过敏无法亲自养猫。这种遗憾驱使我寻找解决方案&#xff0c;从研究低过敏猫种到尝试空气…

2024/06/13--代码随想录算法3/17|01背包问题 二维、01背包问题 一维、416. 分割等和子集

01背包问题 二维 卡码网链接 动态规划5步曲 确定dp数组&#xff08;dp table&#xff09;以及下标的含义&#xff1a;dp[i][j] &#xff1a;从下标为[0,i-1]个物品中任取&#xff0c;放进容量为j的背包&#xff0c;价值总和最大为多少。确定递推公式&#xff0c; 有两个方向可…

算法专题总结链接地址

刷力扣的时候会遇到一些总结类型的题解&#xff0c;在此记录&#xff0c;方便自己以后找 前缀和 前缀和https://leetcode.cn/problems/unique-substrings-in-wraparound-string/solutions/432752/xi-fa-dai-ni-xue-suan-fa-yi-ci-gao-ding-qian-zhui-/ 单调栈 单调栈https:…

Javaweb04-Servlet技术2(HttpServletResponse, HttpServletRequest)

Servlet技术基础 HttpServletResponse对象 HttpServletResponce对象是继承ServletResponse接口&#xff0c;专门用于封装Http请求 HttpServletResponce有关响应行的方法 方法说明功能描述void setStatus(int stauts)用于设置HTTP响应消息的状态码&#xff0c;并生成响应状态…

第17章通信系统架构设计理论与实践

常见的5种常用的网络架构和构建网络的相关技术&#xff0c;以及网络构建的分析和设计方法。 17.1通信系统概述 通信技术和网络技术的发展&#xff0c;通信网络发生很大变化&#xff0c;入网的形式变化&#xff0c;传输的速率的提高、接入网络的方式多样化、网络结构的更为复杂…

~$开头的临时文件是什么?可以删除吗?

&#xff08;2023.12.4&#xff09; 在进行Word文档编辑的时候&#xff0c;都会产生一个以~$开头的临时文件&#xff0c;它会自动备份文档编辑内容&#xff0c;若是正常关闭程序&#xff0c;这个文档就会自动消失&#xff1b;而在非正常情况下关闭word文档&#xff0c;如断电&…

考研计组chap2数据的表示和运算(补充)

一、进位计数制 1.r进制 第i位表示r进制的权为i 2.进制转换 &#xff08;1&#xff09;r->10 对应位置数*权值 &#xff08;2&#xff09;2 -> 16 or 8 每三位2进制数可表示1位16进制 每四位2进制数可表示1位16进制 so 分开之后转为16进制即可 eg&#xff1a;11…

JDK8新特性【接口新特征、lambda语法、Supplier、Consumer、Function、Predicate】

目录 一、关于接口的新特性1.1 jdk1.8之前的接口重要特性1.2 JDK8以后代码演示 1.3 总结通过代码演示发现作用 二、Lambda表达式[重点]2.1 将匿名内部类写法改写为lambda写法2.2 语法特点能够写成lambda形式的的前提语法特征代码演示深入理解lambda 2.3 总结 三、函数式接口3.1…

ISO17025认证是什么?怎么做?

ISO17025认证是一种国际通用的实验室质量管理体系认证&#xff0c;其目标是确保实验室的技术能力、管理水平以及测试结果的可靠性和准确性达到国际认可的标准。该认证由国际标准化组织&#xff08;ISO&#xff09;和国际电工委员会&#xff08;IEC&#xff09;联合发布&#xf…

pytorch神经网络训练(AlexNet)

导包 import osimport torchimport torch.nn as nnimport torch.optim as optimfrom torch.utils.data import Dataset, DataLoaderfrom PIL import Imagefrom torchvision import models, transforms 定义自定义图像数据集 class CustomImageDataset(Dataset): 定义一个自…

美丽的拉萨,神奇的布达拉宫

原文链接&#xff1a;美丽的拉萨&#xff0c;神奇的布达拉宫 2022年11月30日&#xff0c;可能将成为一个改变人类历史的日子——美国人工智能开发机构OpenAI推出了聊天机器人ChatGPT-3.5&#xff0c;将人工智能的发展推向了一个新的高度。2023年11月7日&#xff0c;OpenAI首届…

TcpClient 服务器、客户端连接

TcpClient 服务器 TcpListener 搭建tcp服务器的类&#xff0c;基于socket套接字通信的 1 创建服务器对象 TcpListener server new TcpListener(IPAddress.Parse("127.0.0.1"), 3000); 2 开启服务器 设置最大连接数 server.Start(1000); 3 接收客户端的链接,只能…