Spark经典案例分享

Spark经典案例

  1. 链接操作案例
  2. 二次排序案例

链接操作案例

案例需求

数据介绍

代码如下:

package base.charpter7import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession/*** @projectName sparkGNU2023  * @package base.charpter7  * @className base.charpter7.Join  * @description ${description}  * @author pblh123* @date 2023/11/28 17:25* @version 1.0**/object Join {def main(args: Array[String]): Unit = {//    1. 创建一个sc对象if (args.length != 4) {println("usage is WordCount <rating> <movie> <output>")System.exit(5)}val murl = args(0)val ratingfile = args(1)val movingfile = args(2)val outputfile = args(3)val spark: SparkSession = new SparkSession.Builder().appName(s"${this.getClass.getSimpleName}").master(murl).getOrCreate()val sc: SparkContext = spark.sparkContext//    2. 代码主体//    判断输出路径是否存在,存在则删除val conf: Configuration = new Configuration()val fs: FileSystem = FileSystem.get(conf)if (fs.exists(new Path(outputfile))) {println(s"存在目标文件夹$outputfile")fs.delete(new Path(outputfile))println(s"目标文件夹$outputfile 已删除")}else println(s"目标文件夹$outputfile 不存在")//rating etlval ratingrdd: RDD[String] = sc.textFile(ratingfile, 1)val rating: RDD[(Int, Double)] = ratingrdd.map(line => {val fileds: Array[String] = line.split("::")(fileds(1).toInt, fileds(2).toDouble)})val movieScores: RDD[(Int, Double)] = rating.groupByKey().map(x => {val avg = x._2.sum / x._2.size(x._1, avg)})//    move etlval movierdd: RDD[String] = sc.textFile(movingfile)// movieid,(movieid,title)val movieskey: RDD[(Int, (Int, String))] = movierdd.map(line => {val fileds: Array[String] = line.split("::")(fileds(0).toInt, fileds(1))}).keyBy(tup => tup._1)// movieid,(movieid,avg_rating)val sskey: RDD[(Int, (Int, Double))] = movieScores.keyBy(tup => tup._1)// movieid, (movieid,avg_rating),(movieid,title)val joinres: RDD[(Int, ((Int, Double), (Int, String)))] = sskey.join(movieskey)// movieid,avg_rating,titleval res: RDD[(Int, Double, String)] = joinres.filter(f => f._2._1._2 > 4.0).map(f => (f._1, f._2._1._2, f._2._2._2))
//    val res: RDD[(Int, Double, String)] = sskey.join(movieskey)
//      .filter(f => f._2._1._2 > 4.0)
//      .map(f => (f._1, f._2._1._2, f._2._2._2))res.take(5).foreach(println)res.saveAsTextFile(outputfile)//  3. 关闭sc,spark对象sc.stop()spark.stop()}
}

运行结果

二次排序案例

需求及数据说明:

代码实现

SecondarySortKey.class 方法

package base.charpter7/*** @projectName sparkGNU2023  * @package base.charpter7  * @className base.charpter7.SecondarySortKey  * @description ${description}  * @author pblh123* @date 2023/11/29 17:01* @version 1.0*/class SecondarySortKey(val first:Int, val second:Int) extends Ordered[SecondarySortKey] with Serializable{override def compare(that: SecondarySortKey): Int = {if (this.first - that.first != 0){this.first - that.first} else {this.second - that.second}}
}
SecondarySortApp.scala方法
package base.charpter7import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession/*** @projectName sparkGNU2023  * @package base.charpter7  * @className base.charpter7.SecondarySortApp  * @description ${description}  * @author pblh123* @date 2023/11/29 17:04* @version 1.0**/object SecondarySortApp {def main(args: Array[String]): Unit = {//  1. 创建spark,sc对象if (args.length != 2) {println("您需要输入二个参数")System.exit(5)}val musrl: String = args(0)val spark: SparkSession = new SparkSession.Builder().appName(s"${this.getClass.getSimpleName}").master(musrl).getOrCreate()val sc: SparkContext = spark.sparkContext//  2. 代码主体// 读取一个txt文件val inputfile: String = args(1)val lines: RDD[String] = sc.textFile(inputfile, 1)// 进行二次排序val pairRDDwithSort: RDD[(SecondarySortKey, String)] = lines.map(line => {val strings: Array[String] = line.split(" ")(new SecondarySortKey(strings(0).toInt, strings(1).toInt), line)})val pairRDDwithSort2: RDD[(SecondarySortKey, String)] = pairRDDwithSort.sortByKey(false)val sortedRes: RDD[String] = pairRDDwithSort2.map(sortedline => sortedline._2)sortedRes.collect().foreach(println)//  3. 关闭sc,spark对象sc.stop()spark.stop()}
}

配置参数

运行效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/192422.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

四、Zookeeper节点类型

目录 1、临时节点 2、永久节点 Znode有两种,分别为临时节点和永久节点。 节点的类型在创建时即被确定,并且不能改变。 1、临时节点 临时节点的生命周期依赖于创建它们的会话。一旦会话结束,临时节点将被自动删除,

OpenCV-Python:计算机视觉介绍

目录 1.背景 2.计算机视觉发展历史 3.计算机视觉主要任务 4.计算机视觉应用场景 5.知识笔记 1.背景 OpenCV是计算机视觉的一个框架&#xff0c;想要学习OpenCV&#xff0c;需要对计算机视觉有一个大致的了解。计算机视觉是指通过计算机技术和算法来模拟人类视觉系统的能力…

Redis高效缓存:加速应用性能的利器

目录 引言 1. Redis概述 1.1 什么是Redis&#xff1f; 1.2 Redis的特点 2. Redis在缓存中的应用 2.1 缓存的重要性 2.2 Redis作为缓存的优势 2.3 缓存使用场景 3. Redis在实时应用中的应用 3.1 实时数据处理的挑战 3.2 Redis的实时数据处理优势 3.3 实时应用中的Red…

整数转罗马数字算法(leetcode第12题)

题目描述&#xff1a; 罗马数字包含以下七种字符&#xff1a; I&#xff0c; V&#xff0c; X&#xff0c; L&#xff0c;C&#xff0c;D 和 M。 字符 数值 I 1 V 5 X 10 L 50 C 100 D 50…

mediapipe+opencv实现保存图像中的人脸,抹去其他信息

mediapipeopencv MediaPipe本身不提供图像处理功能&#xff0c;它主要用于检测和跟踪人脸、手势、姿势等。如果您想要从图像中仅提取人脸主要信息并去除其他信息. # codingutf-8 """project: teatAuthor&#xff1a;念卿 刘file&#xff1a; test.pydate&…

Kubernetes学习笔记-Part.09 K8s集群构建

目录 Part.01 Kubernets与docker Part.02 Docker版本 Part.03 Kubernetes原理 Part.04 资源规划 Part.05 基础环境准备 Part.06 Docker安装 Part.07 Harbor搭建 Part.08 K8s环境安装 Part.09 K8s集群构建 Part.10 容器回退 第九章 K8s集群构建 9.1.集群初始化 集群初始化是首…

文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《余电上网/制氢方式下微电网系统全生命周期经济性评估》

该标题涉及到对微电网系统的全生命周期经济性进行评估&#xff0c;其重点关注两种运营方式&#xff1a;余电上网和制氢。以下是对标题的解读&#xff1a; 微电网系统&#xff1a; 微电网是指一种小规模的电力系统&#xff0c;通常包括分布式能源资源&#xff08;如太阳能、风能…

TS数据类型(全)

TS支持和JS几乎相同的数据类型&#xff0c;此外还提供了实用的枚举类型 总结&#xff1a;ts中变量一开始是什么类型&#xff0c;那么后期赋值的时候&#xff0c;只能用这个类型的数据&#xff0c;是不允许用其他类型的数据赋值给当前的这个变 量中。 数字类型&#xff08;numbe…

ES通过抽样agg聚合性能提升3-5倍

一直以来&#xff0c;es的agg聚合分析性能都比较差&#xff08;对应sql的 group by&#xff09;。特别是在超多数据中做聚合&#xff0c;在搜索的条件命中特别多结果的情况下&#xff0c;聚合分析会非常非常的慢。 一个聚合条件&#xff1a;聚合分析请求的时间 search time a…

DIV从不能移动变成随便定位(静态+编程)的方法

编了一个游戏测试小网页&#xff0c;竟然发现DIV不能随便移动&#xff0c;查了半天终于解决了静态和编程定位的问题。特别记录一下。 <div οnmοusedοwn"mDown(this)" οnmοuseup"mUp(this)" style"background-color: #D94A38; position: abso…

部署springboot项目到GKE(Google Kubernetes Engine)

GKE是 Google Cloud Platform 提供的托管 Kubernetes 服务&#xff0c;允许用户在 Google 的基础设施上部署、管理和扩展容器。本文介绍如何部署一个简单的springboot项目到GKE. 本文使用podman. 如果你用的是docker, 只需要把本文中所有命令中的podman替换成docker即可 非H…

LeetCode [中等]二叉树的右视图(层序

199. 二叉树的右视图 - 力扣&#xff08;LeetCode&#xff09; 从二叉树的层序遍历改进&#xff0c;根右左 /*** Definition for a binary tree node.* public class TreeNode {* public int val;* public TreeNode left;* public TreeNode right;* public T…

MiniDumpWriteDump函数生成dmp文件

MiniDumpWriteDump函数生成dmp文件 一&#xff1a;概述二&#xff1a; CreateDump.h三&#xff1a;CreateDump.cpp四&#xff1a;main测试五&#xff1a;winDbg分析 一&#xff1a;概述 v2008及以上版本都可以用。 包含CreateDump.h&#xff0c;CreateDump.cpp文件&#xff0c…

Linux: FS: inotify

这个和网卡的event-notify是一样的逻辑,内核看到有什么事情发生,可以通知到用户,然后用户可以根据自己的需求做一些处理。第一次看到,记录一下算是可以日后可以用到的功能。 man inotify。 inotify - monitoring filesystem events 描述: The inotify API provides a mec…

java+springboot物资连锁仓库经营商业管理系统+jsp

主要任务&#xff1a;通过网络搜集与本课题相关的素材资料&#xff0c;认真分析连锁经营商业管理系统的可行性和要实现的功能&#xff0c;做好需求分析&#xff0c;确定该系统的主要功能模块&#xff0c;依据数据库设计的原则对数据库进行设计。最后通过编码实现本系统功能并测…

Python核心编程之此时起步,为时不晚

目录 一、前言 二、程序输出,print语句及“HelloWorld!” 三、程序输入和 raw_input()内建函数

【KPDK】Log Library

DPDK日志库为其他DPDK库和驱动程序提供日志记录功能。默认情况下&#xff0c;在Linux应用程序中&#xff0c;日志既发送到syslog&#xff0c;也发送到控制台。在FreeBSD和Windows应用程序上&#xff0c;日志只发送到控制台。但是&#xff0c;用户可以覆盖日志功能以使用不同的日…

Linux周期任务

我自己博客网站里的文章 Linux周期任务&#xff1a;at和crontab 每个人或多或少都有一些约会或者是工作&#xff0c;有的工作是长期周期性的&#xff0c; 例如&#xff1a; 每个月一次的工作报告每周一次的午餐会报每天需要的打卡…… 有的工作则是一次性临时的&#xff0…

Prometheus+Grafana搭建日志采集

介绍 一、什么是日志数据采集 日志数据采集是指通过各种手段获取应用程序运行时产生的各类日志信息&#xff0c;并将这些信息存储到特定的地方&#xff0c;以便后续分析和使用。通常情况下&#xff0c;这些日志信息包括系统运行状态、错误信息、用户操作记录等等。通过对这些…