Spark经典案例分享

Spark经典案例

  1. 链接操作案例
  2. 二次排序案例

链接操作案例

案例需求

数据介绍

代码如下:

package base.charpter7import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession/*** @projectName sparkGNU2023  * @package base.charpter7  * @className base.charpter7.Join  * @description ${description}  * @author pblh123* @date 2023/11/28 17:25* @version 1.0**/object Join {def main(args: Array[String]): Unit = {//    1. 创建一个sc对象if (args.length != 4) {println("usage is WordCount <rating> <movie> <output>")System.exit(5)}val murl = args(0)val ratingfile = args(1)val movingfile = args(2)val outputfile = args(3)val spark: SparkSession = new SparkSession.Builder().appName(s"${this.getClass.getSimpleName}").master(murl).getOrCreate()val sc: SparkContext = spark.sparkContext//    2. 代码主体//    判断输出路径是否存在,存在则删除val conf: Configuration = new Configuration()val fs: FileSystem = FileSystem.get(conf)if (fs.exists(new Path(outputfile))) {println(s"存在目标文件夹$outputfile")fs.delete(new Path(outputfile))println(s"目标文件夹$outputfile 已删除")}else println(s"目标文件夹$outputfile 不存在")//rating etlval ratingrdd: RDD[String] = sc.textFile(ratingfile, 1)val rating: RDD[(Int, Double)] = ratingrdd.map(line => {val fileds: Array[String] = line.split("::")(fileds(1).toInt, fileds(2).toDouble)})val movieScores: RDD[(Int, Double)] = rating.groupByKey().map(x => {val avg = x._2.sum / x._2.size(x._1, avg)})//    move etlval movierdd: RDD[String] = sc.textFile(movingfile)// movieid,(movieid,title)val movieskey: RDD[(Int, (Int, String))] = movierdd.map(line => {val fileds: Array[String] = line.split("::")(fileds(0).toInt, fileds(1))}).keyBy(tup => tup._1)// movieid,(movieid,avg_rating)val sskey: RDD[(Int, (Int, Double))] = movieScores.keyBy(tup => tup._1)// movieid, (movieid,avg_rating),(movieid,title)val joinres: RDD[(Int, ((Int, Double), (Int, String)))] = sskey.join(movieskey)// movieid,avg_rating,titleval res: RDD[(Int, Double, String)] = joinres.filter(f => f._2._1._2 > 4.0).map(f => (f._1, f._2._1._2, f._2._2._2))
//    val res: RDD[(Int, Double, String)] = sskey.join(movieskey)
//      .filter(f => f._2._1._2 > 4.0)
//      .map(f => (f._1, f._2._1._2, f._2._2._2))res.take(5).foreach(println)res.saveAsTextFile(outputfile)//  3. 关闭sc,spark对象sc.stop()spark.stop()}
}

运行结果

二次排序案例

需求及数据说明:

代码实现

SecondarySortKey.class 方法

package base.charpter7/*** @projectName sparkGNU2023  * @package base.charpter7  * @className base.charpter7.SecondarySortKey  * @description ${description}  * @author pblh123* @date 2023/11/29 17:01* @version 1.0*/class SecondarySortKey(val first:Int, val second:Int) extends Ordered[SecondarySortKey] with Serializable{override def compare(that: SecondarySortKey): Int = {if (this.first - that.first != 0){this.first - that.first} else {this.second - that.second}}
}
SecondarySortApp.scala方法
package base.charpter7import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession/*** @projectName sparkGNU2023  * @package base.charpter7  * @className base.charpter7.SecondarySortApp  * @description ${description}  * @author pblh123* @date 2023/11/29 17:04* @version 1.0**/object SecondarySortApp {def main(args: Array[String]): Unit = {//  1. 创建spark,sc对象if (args.length != 2) {println("您需要输入二个参数")System.exit(5)}val musrl: String = args(0)val spark: SparkSession = new SparkSession.Builder().appName(s"${this.getClass.getSimpleName}").master(musrl).getOrCreate()val sc: SparkContext = spark.sparkContext//  2. 代码主体// 读取一个txt文件val inputfile: String = args(1)val lines: RDD[String] = sc.textFile(inputfile, 1)// 进行二次排序val pairRDDwithSort: RDD[(SecondarySortKey, String)] = lines.map(line => {val strings: Array[String] = line.split(" ")(new SecondarySortKey(strings(0).toInt, strings(1).toInt), line)})val pairRDDwithSort2: RDD[(SecondarySortKey, String)] = pairRDDwithSort.sortByKey(false)val sortedRes: RDD[String] = pairRDDwithSort2.map(sortedline => sortedline._2)sortedRes.collect().foreach(println)//  3. 关闭sc,spark对象sc.stop()spark.stop()}
}

配置参数

运行效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/192422.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

四、Zookeeper节点类型

目录 1、临时节点 2、永久节点 Znode有两种,分别为临时节点和永久节点。 节点的类型在创建时即被确定,并且不能改变。 1、临时节点 临时节点的生命周期依赖于创建它们的会话。一旦会话结束,临时节点将被自动删除,

OpenCV-Python:计算机视觉介绍

目录 1.背景 2.计算机视觉发展历史 3.计算机视觉主要任务 4.计算机视觉应用场景 5.知识笔记 1.背景 OpenCV是计算机视觉的一个框架&#xff0c;想要学习OpenCV&#xff0c;需要对计算机视觉有一个大致的了解。计算机视觉是指通过计算机技术和算法来模拟人类视觉系统的能力…

Redis高效缓存:加速应用性能的利器

目录 引言 1. Redis概述 1.1 什么是Redis&#xff1f; 1.2 Redis的特点 2. Redis在缓存中的应用 2.1 缓存的重要性 2.2 Redis作为缓存的优势 2.3 缓存使用场景 3. Redis在实时应用中的应用 3.1 实时数据处理的挑战 3.2 Redis的实时数据处理优势 3.3 实时应用中的Red…

mediapipe+opencv实现保存图像中的人脸,抹去其他信息

mediapipeopencv MediaPipe本身不提供图像处理功能&#xff0c;它主要用于检测和跟踪人脸、手势、姿势等。如果您想要从图像中仅提取人脸主要信息并去除其他信息. # codingutf-8 """project: teatAuthor&#xff1a;念卿 刘file&#xff1a; test.pydate&…

Kubernetes学习笔记-Part.09 K8s集群构建

目录 Part.01 Kubernets与docker Part.02 Docker版本 Part.03 Kubernetes原理 Part.04 资源规划 Part.05 基础环境准备 Part.06 Docker安装 Part.07 Harbor搭建 Part.08 K8s环境安装 Part.09 K8s集群构建 Part.10 容器回退 第九章 K8s集群构建 9.1.集群初始化 集群初始化是首…

文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《余电上网/制氢方式下微电网系统全生命周期经济性评估》

该标题涉及到对微电网系统的全生命周期经济性进行评估&#xff0c;其重点关注两种运营方式&#xff1a;余电上网和制氢。以下是对标题的解读&#xff1a; 微电网系统&#xff1a; 微电网是指一种小规模的电力系统&#xff0c;通常包括分布式能源资源&#xff08;如太阳能、风能…

ES通过抽样agg聚合性能提升3-5倍

一直以来&#xff0c;es的agg聚合分析性能都比较差&#xff08;对应sql的 group by&#xff09;。特别是在超多数据中做聚合&#xff0c;在搜索的条件命中特别多结果的情况下&#xff0c;聚合分析会非常非常的慢。 一个聚合条件&#xff1a;聚合分析请求的时间 search time a…

部署springboot项目到GKE(Google Kubernetes Engine)

GKE是 Google Cloud Platform 提供的托管 Kubernetes 服务&#xff0c;允许用户在 Google 的基础设施上部署、管理和扩展容器。本文介绍如何部署一个简单的springboot项目到GKE. 本文使用podman. 如果你用的是docker, 只需要把本文中所有命令中的podman替换成docker即可 非H…

java+springboot物资连锁仓库经营商业管理系统+jsp

主要任务&#xff1a;通过网络搜集与本课题相关的素材资料&#xff0c;认真分析连锁经营商业管理系统的可行性和要实现的功能&#xff0c;做好需求分析&#xff0c;确定该系统的主要功能模块&#xff0c;依据数据库设计的原则对数据库进行设计。最后通过编码实现本系统功能并测…

Linux周期任务

我自己博客网站里的文章 Linux周期任务&#xff1a;at和crontab 每个人或多或少都有一些约会或者是工作&#xff0c;有的工作是长期周期性的&#xff0c; 例如&#xff1a; 每个月一次的工作报告每周一次的午餐会报每天需要的打卡…… 有的工作则是一次性临时的&#xff0…

Prometheus+Grafana搭建日志采集

介绍 一、什么是日志数据采集 日志数据采集是指通过各种手段获取应用程序运行时产生的各类日志信息&#xff0c;并将这些信息存储到特定的地方&#xff0c;以便后续分析和使用。通常情况下&#xff0c;这些日志信息包括系统运行状态、错误信息、用户操作记录等等。通过对这些…

牛客算法题 【HJ97 记负均正】 golang实现

题目 HJ97 记负均正 描述 首先输入要输入的整数个数n&#xff0c;然后输入n个整数。输出为n个整数中负数的个数&#xff0c;和所有正整数的平均值&#xff0c;结果保留一位小数。 0即不是正整数&#xff0c;也不是负数&#xff0c;不计入计算。如果没有正数&#xff0c;则平均…

大文件分片上传、分片进度以及整体进度、断点续传(一)

大文件分片上传 效果展示 前端 思路 前端的思路&#xff1a;将大文件切分成多个小文件&#xff0c;然后并发给后端。 页面构建 先在页面上写几个组件用来获取文件。 <body><input type"file" id"file" /><button id"uploadButton…

动态规划学习——回文串

目录 一&#xff0c;回文子串 1.题目 2.题目接口 3&#xff0c;解题代码及其思路 解题代码&#xff1a; 二&#xff0c; 分割回文串II 1&#xff0c;题目 2&#xff0c;题目接口 3&#xff0c;解题思路及其代码 一&#xff0c;回文子串 1.题目 给你一个字符串 s &…

模板初阶(2):函数模板的匹配原则,类模板的实例化

一、函数模板的匹配原则 int Add(const int& x, const int& y) {return x y; }template <class T> T Add(const T& x, const T& y) {return x y; }int main() {int a1 1, a2 2;Add(a1, a2);double d1 1.1, d2 2.2;Add(d1, d2);return 0; }一个非模…

【搭建网站】搭建一个自己的网站

【搭建网站】搭建一个自己的网站 传送门&#xff1a;搭建一个自己的网站&#xff1f;看这个就够了&#xff01; P1&#xff0c;建站准备 P2&#xff0c;创建站点

ZooKeeper 如何保证数据一致性?

在分布式场景中&#xff0c;ZooKeeper 的应用非常广泛&#xff0c;比如数据发布和订阅、命名服务、配置中心、注册中心、分布式锁等。 ZooKeeper 提供了一个类似于 Linux 文件系统的数据模型&#xff0c;和基于 Watcher 机制的分布式事件通知&#xff0c;这些特性都依赖 ZooKee…

【开源】基于JAVA语言的桃花峪滑雪场租赁系统

项目编号&#xff1a; S 036 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S036&#xff0c;文末获取源码。} 项目编号&#xff1a;S036&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 游客服务2.2 雪场管理 三、数据库设…

Redis数据存储:高效、灵活、实时

目录 引言 1. Redis概述 1.1 什么是Redis&#xff1f; 1.2 Redis的数据结构 1.3 Redis的持久化机制 2. Redis的使用场景 2.1 缓存 2.2 会话存储 2.3 发布/订阅系统 2.4 计数器和排行榜 3. Redis最佳实践 3.1 数据模型设计 3.2 键的命名规范 3.3 事务和原子操作 3…