SpringBoot操作spark处理hdfs文件

SpringBoot操作spark处理hdfs文件

  • 在这里插入图片描述

1、导入依赖

  • <!--        spark依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.2.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.2.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_2.12</artifactId><version>3.2.2</version></dependency>
    

2、配置spark信息

  • 建立一个配置文件,配置spark信息
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;//将文件交于spring管理
@Configuration
public class SparkConfig {//使用yml中的配置@Value("${spark.master}")private String sparkMaster;@Value("${spark.appName}")private String sparkAppName;@Value("${hdfs.user}")private String hdfsUser;@Value("${hdfs.path}")private String hdfsPath;@Beanpublic SparkConf sparkConf() {SparkConf conf = new SparkConf();conf.setMaster(sparkMaster);conf.setAppName(sparkAppName);// 添加HDFS配置conf.set("fs.defaultFS", hdfsPath);conf.set("spark.hadoop.hdfs.user",hdfsUser);return conf;}@Beanpublic SparkSession sparkSession() {return SparkSession.builder().config(sparkConf()).getOrCreate();}
}

3、controller和service

  • controller类

    • import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      import xyz.zzj.traffic_main_code.service.SparkService;@RestController
      @RequestMapping("/spark")
      public class SparkController {@Autowiredprivate SparkService sparkService;@GetMapping("/run")public String runSparkJob() {//读取Hadoop HDFS文件String filePath = "hdfs://192.168.44.128:9000/subwayData.csv";sparkService.executeHadoopSparkJob(filePath);return "Spark job executed successfully!";}
      }
      
  • 处理地铁数据的service

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import xyz.zzj.traffic_main_code.service.SparkReadHdfs;import java.io.IOException;
import java.net.URI;
import static org.apache.spark.sql.functions.*;@Service
public class SparkReadHdfsImpl implements SparkReadHdfs {private final SparkSession spark;@Value("${hdfs.user}")private String hdfsUser;@Value("${hdfs.path}")private String hdfsPath;@Autowiredpublic SparkReadHdfsImpl(SparkSession spark) {this.spark = spark;}/*** 读取HDFS上的CSV文件并上传到HDFS* @param filePath*/@Overridepublic void sparkSubway(String filePath) {try {// 设置Hadoop配置JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());Configuration hadoopConf = jsc.hadoopConfiguration();hadoopConf.set("fs.defaultFS", hdfsPath);hadoopConf.set("hadoop.user.name", hdfsUser);// 读取HDFS上的文件Dataset<Row> df = spark.read().option("header", "true") // 指定第一行是列名.option("inferSchema", "true") // 自动推断列的数据类型.csv(filePath);// 显示DataFrame的所有数据
//            df.show(Integer.MAX_VALUE, false);// 对DataFrame进行清洗和转换操作// 检查缺失值df.select("number", "people", "dateTime").na().drop().show();// 对数据进行类型转换Dataset<Row> df2 = df.select(col("number").cast(DataTypes.IntegerType),col("people").cast(DataTypes.IntegerType),to_date(col("dateTime"), "yyyy年MM月dd日").alias("dateTime"));// 去重Dataset<Row> df3 = df2.dropDuplicates();// 数据过滤,确保people列没有负数Dataset<Row> df4 = df3.filter(col("people").geq(0));
//            df4.show();// 数据聚合,按dateTime分组,统计每天的总客流量Dataset<Row> df6 = df4.groupBy("dateTime").agg(sum("people").alias("total_people"));
//            df6.show();sparkForSubway(df6,"/time_subwayData.csv");//数据聚合,获取每天人数最多的地铁numberDataset<Row> df7 = df4.groupBy("dateTime").agg(max("people").alias("max_people"));sparkForSubway(df7,"/everyday_max_subwayData.csv");//数据聚合,计算每天的客流强度:每天总people除以632840Dataset<Row> df8 = df4.groupBy("dateTime").agg(sum("people").divide(632.84).alias("strength"));sparkForSubway(df8,"/everyday_strength_subwayData.csv");} catch (Exception e) {e.printStackTrace();}}private static void sparkForSubway(Dataset<Row> df6, String hdfsPath) throws IOException {// 保存处理后的数据到HDFSdf6.coalesce(1).write().mode("overwrite").option("header", "true").csv("hdfs://192.168.44.128:9000/time_subwayData");// 创建Hadoop配置Configuration conf = new Configuration();// 获取FileSystem实例FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.128:9000"), conf);// 定义临时目录和目标文件路径Path tempDir = new Path("/time_subwayData");FileStatus[] files = fs.listStatus(tempDir);// 检查目标文件是否存在,如果存在则删除Path targetFile1 = new Path(hdfsPath);if (fs.exists(targetFile1)) {fs.delete(targetFile1, true); // true 表示递归删除}for (FileStatus file : files) {if (file.isFile() && file.getPath().getName().startsWith("part-")) {Path targetFile = new Path(hdfsPath);fs.rename(file.getPath(), targetFile);}}// 删除临时目录fs.delete(tempDir, true);}}

4、运行

  • 项目运行完后,打开浏览器
    • spark处理地铁数据
      • http://localhost:8686/spark/dispose
  • 观察spark和hdfs
    • http://192.168.44.128:8099/
    • http://192.168.44.128:9870/explorer.html#/
      • image-20250109095551610

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/66922.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Jaeger UI使用、采集应用API排除特定路径

Jaeger使用 注&#xff1a; Jaeger服务端版本为&#xff1a;jaegertracing/all-in-one-1.6.0 OpenTracing版本为&#xff1a;0.33.0&#xff0c;最后一个版本&#xff0c;停留在May 06, 2019。最好升级到OpenTelemetry。 Jaeger客户端版本为&#xff1a;jaeger-client-1.3.2。…

PySide6-UI界面设计

导论&#xff1a; PySide6和PyQt都是Python对Qt框架的绑定&#xff0c;允许开发者使用Qt创建平台的GUI应用程序。如果你正在开发商业项目&#xff0c;或者需要使用最新的QT6特性&#xff0c;PySide6是一个更好的选择。如果你更倾向于一个成熟的社区和丰富的资源&#xff0c;Py…

使用 Multer 上传图片到阿里云 OSS

文件上传到哪里更好&#xff1f; 上传到服务器本地 上传到服务器本地&#xff0c;这种方法在现今商业项目中&#xff0c;几乎已经见不到了。因为服务器带宽&#xff0c;磁盘 IO 都是非常有限的。将文件上传和读取放在自己服务器上&#xff0c;并不是明智的选择。 上传到云储存…

电机控制的数字化升级:基于DSP和FPGA的仿真与实现

数字信号处理器&#xff08;DSP&#xff0c;Digital Signal Processor&#xff09;在工业自动化领域的应用日益广泛。DSP是一种专门用于将模拟信号转换成数字信号并进行处理的技术&#xff0c;能够实现信号的数字滤波、重构、调制和解调等多项功能&#xff0c;确保信号处理的精…

leetcode_2816. 翻倍以链表形式表示的数字

2816. 翻倍以链表形式表示的数字 - 力扣&#xff08;LeetCode&#xff09; 搜先看到这个题目 链表的节点那么多 已经远超longlong能够表示的范围 那么暴力解题 肯定是不可以的了 我们可以想到 乘法运算中 就是从低位到高位进行计算 刚开始 我想先反转链表 然后在计算 然后在进…

Element Plus 之 el-table相同行合并(通用函数),相同列合并(自行判断需合并的字段以及相应的列下标)

展示 代码 <el-table :data"tableData" border style"width: 100%" :span-method"objectSpanMethod"><el-table-column prop"date" label"Date" width"180" align"center" /><el-table…

汽车供应链关键节点:物流采购成本管理全解析

在汽车行业&#xff0c;供应链管理是一项至关重要的任务。汽车制造从零部件的生产到整车的交付&#xff0c;涉及多个环节&#xff0c;其中物流、采购与成本管理是核心节点。本文将深入分析这些关键环节&#xff0c;探讨如何通过供应商管理系统及相关工具优化供应链管理。 一、…

Linux文件系统的安全保障---Overlayroot!

overlayroot 是一种使用 OverlayFS 实现的功能&#xff0c;可将根文件系统挂载为只读&#xff0c;并通过一个临时的写层实现对文件系统的修改。这种方法非常适合嵌入式设备或需要保持系统文件完整性和安全性的场景。下文以 RK3568 平台为例&#xff0c;介绍制作 overlayroot 的…

DEVIN AI==初步使用

注册 官网 Devin (the Developer) 开启一个对话session 体验第一个官方DEMO 工作内容是爬取两个网页&#xff0c;对比他们的注册流程&#xff0c;然后生成一个网页展示对比的结果 左边是对话&#xff0c;可以描述你的需求&#xff0c;右边是DEVIN根据你的要求做的一些事情&…

【Rust自学】11.3. 自定义错误信息

喜欢的话别忘了点赞、收藏加关注哦&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 11.3.1. 添加错误信息 在 11.2. 断言(Assert) 中我们学习了assert!、assert_eq!和assert_ne!这三个宏&#xff0c;而这篇文章讲的就是它…

git - 用SSH方式迁出远端git库

文章目录 git - 用SSH方式迁出远端git库概述笔记以gitee为例产生RSA密钥对 备注END git - 用SSH方式迁出远端git库 概述 最近一段时间&#xff0c;在网络没问题的情况下&#xff0c;用git方式直接迁出git库总是会失败。 失败都是在远端, 显示RPC错误。 但是git服务器端是没问…

Linux第一个系统程序---进度条

进度条---命令行版本 回车换行 其实本质上回车和换行是不同概念&#xff0c;我们用一张图来简单的理解一下&#xff1a; 在计算机语言当中&#xff1a; 换行符&#xff1a;\n 回车符&#xff1a;\r \r\n&#xff1a;回车换行 这时候有人可能会有疑问&#xff1a;我在学习C…

智慧城市应急指挥中心系统平台建设方案

建设背景与目标 智慧城市应急指挥中心系统平台的建设&#xff0c;源于对城市管理精细化、智能化的迫切需求。平台旨在通过整合各方资源&#xff0c;实现应急事件的快速响应与高效处置&#xff0c;提升城市安全管理水平。 前端设计与信息采集 前端设计注重立体化、全方位信息…

011:利用大津算法完成图片分割

本文为合集收录&#xff0c;欢迎查看合集/专栏链接进行全部合集的系统学习。 合集完整版请参考这里。 上一篇文章介绍了大津算法可以完成图片的前景和背景分割。 总的来说&#xff0c;大津算法的核心思想就两个&#xff1a; 数学上&#xff0c;通过确定一个像素阈值&#xf…

一键部署Netdata系统无需公网IP轻松实现本地服务器的可视化监控

文章目录 前言1.关于Netdata2.本地部署Netdata3.使用Netdata4.cpolar内网穿透工具安装5.创建远程连接公网地址6.固定Netdata公网地址 &#x1f4a1; 推荐 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。…

【Artificial Intelligence篇】AI 入侵家庭:解锁智能生活的魔法密码,开启居家梦幻新体验

家庭智能化的时代已经到来&#xff0c;准备好了嘛&#xff01;&#xff01;&#xff01; 在当今数字化浪潮汹涌澎湃的时代&#xff0c;人工智能&#xff08;AI&#xff09;宛如一位神秘而强大的魔法师&#xff0c;悄然 “入侵” 了我…

如何用SQL语句来查询表或索引的行存/列存存储方式|OceanBase 用户问题集锦

一、问题背景 自OceanBase 4.3.0版本起&#xff0c;支持了列存引擎&#xff0c;允许表和索引以行存、纯列存或行列冗余的形式创建&#xff0c;且这些存储方式可以自由组合。除了使用 show create table命令来查看表和索引的存储类型外&#xff0c;也有用户询问如何通过SQL语句…

程序员如何高效学习API

API&#xff08;应用程序编程接口&#xff09;是软件开发中的关键组件&#xff0c;它允许不同的软件应用程序相互通信、交换数据和功能。作为程序员&#xff0c;掌握API的学习和应用是提升开发效率和代码质量的重要途径。本文将详细介绍程序员如何高效学习API&#xff0c;包括基…

如何播放视频文件

文章目录 1. 概念介绍2. 使用方法2.1 实现步骤2.2 具体细节3. 示例代码4. 内容总结我们在上一章回中介绍了"如何获取文件类型"相关的内容,本章回中将介绍如何播放视频.闲话休提,让我们一起Talk Flutter吧。 1. 概念介绍 播放视频是我们常用的功能,不过Flutter官方…

Type-C单口便携显示器-LDR6021

Type-C单口便携显示器是一种新兴的显示设备&#xff0c;它凭借其便携性、高性能和广泛的应用场景等优势&#xff0c;正在成为市场的新宠。以下是Type-C单口便携显示器的具体运用方式&#xff1a; 一、连接与传输 1. **设备连接**&#xff1a;Type-C单口便携显示器通过Type-C接…