【Spark-HDFS小文件合并】使用 Spark 实现 HDFS 小文件合并

【Spark-HDFS小文件合并】使用 Spark 实现 HDFS 小文件合并

  • 1)导入依赖
  • 2)代码实现
    • 2.1.HDFSUtils
    • 2.2.MergeFilesApplication

需求描述:

1、使用 Spark 做小文件合并压缩处理。

2、实际生产中相关配置、日志、明细可以记录在 Mysql 中。

3、core-site.xml、hdfs-site.xml、hive-site.xml、yarn-site.xmlx 等文件放在项目的 resources 目录下进行认证。

4、下面的案例抽取出了主体部分的代码,具体实现时需要结合 HDFS 工具类,利用好 Mysql 做好配置、日志、以及相关明细,结合各自业务进行文件合并。

1)导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>test.cn.suitcase</groupId><artifactId>mergefiles</artifactId><version>4.0.0</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><encoding>UTF-8</encoding>
<!--        <spark.version>3.0.2</spark.version>--><spark.version>2.4.8</spark.version><scala.version>2.11.12</scala.version></properties><dependencies><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.20.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.3.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.2</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.3.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.20.0</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-launcher_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.14.2</version></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${java.version}</source><target>${java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version><configuration><groups>IntegrationTest</groups></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${java.version}</source><target>${java.version}</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.22.1</version></plugin></plugins></build></project>

2)代码实现

2.1.HDFSUtils

public class HDFSUtils {private static Logger logger = LoggerFactory.getLogger(HDFSUtils.class);private static final Configuration hdfsConfig = new Configuration();private static FileSystem fs;public static void init() {System.out.println(Thread.currentThread().getContextClassLoader());try {hdfsConfig.addResource(Thread.currentThread().getContextClassLoader().getResource("./core-site.xml"));hdfsConfig.addResource(Thread.currentThread().getContextClassLoader().getResource("./hdfs-site.xml"));fs = FileSystem.get(hdfsConfig);} catch (FileNotFoundException fnfe) {fnfe.printStackTrace();logger.error("Load properties failed.");} catch (IOException ioe) {ioe.printStackTrace();logger.error(String.format("IOException: " + ioe.getMessage()));}}public static long getDirectorySize(String directoryPath) {final Path path = new Path(directoryPath);long size = 0;try {size = fs.getContentSummary(path).getLength();} catch (IOException ex) {}return size;}public static long getFileCount(String directoryPath) {final Path path = new Path(directoryPath);long count = 0;try {count = fs.getContentSummary(path).getFileCount();} catch (IOException ex) {}return count;}public static long getBlockSize() {return fs.getDefaultBlockSize(fs.getHomeDirectory());}public static String getFile(String filePath) {final Path path = new Path(filePath);FSDataInputStream dis = null;String fileName = null;try {if (fs.exists(path) && fs.isFile(path)) {dis = fs.open(path);StringWriter stringWriter = new StringWriter();IOUtils.copy(dis, stringWriter, "UTF-8");fileName = stringWriter.toString();return fileName;} else {throw new FileNotFoundException();}} catch (IOException ioException) {logger.error("Get file from hdfs failed: " + ioException.getMessage());} finally {if (dis != null) {try {dis.close();} catch (IOException ex) {logger.error("close FSDataInputStream failed: " + ex.getMessage());}}}return fileName;}public static Boolean exists(String filePath) {Path path = new Path(filePath);Boolean ifExists = false;try {ifExists = fs.exists(path);return ifExists;} catch (IOException ex) {logger.error(String.format("hdfs file %s not exists", filePath));}return ifExists;}public static boolean renameDir(String existingName, String newName) {final Path existingPath = new Path(existingName);final Path finalName = new Path(newName);try {if (exists(newName)) {logger.error(String.format("Path %s already exists when try to rename %s to %s.", newName, existingName, newName));return false;}return fs.rename(existingPath, finalName);} catch (IOException ex) {logger.error("Rename hdfs directory failed: " + ex.getMessage());}return false;}public static boolean removeDirSkipTrash(String dir) {Path path = new Path(dir);boolean rv = false;try {if (exists(dir)) {if (fs.delete(path, true)) {logger.info(String.format("文件夹 %s 删除成功.", path));rv = true;}} else {logger.error(String.format("要删除的文件夹 %s 不存在", dir));return false;}} catch (IOException ex) {logger.error("文件夹 %s 存在但是删除失败");}return rv;}public static List<String> listDirs(String baseDir) {Path path = new Path(baseDir);List<String> dirs = new ArrayList<>();try {FileStatus[] fileStatuses = fs.globStatus(path);for (int i = 0; i < fileStatuses.length; i++) {dirs.add(fileStatuses[i].getPath().toUri().getRawPath());}}} catch (Exception ex) {logger.error(String.format("List directories under %s failed.", baseDir));}return dirs;}public static void close() {try {fs.close();} catch (IOException ex) {logger.error("hdfs file system close failed: " + ex.getMessage());}}}

2.2.MergeFilesApplication

下面的案例抽取出了主体部分的代码,具体实现时需要结合 HDFS 工具类,利用好 Mysql 做好配置、日志、以及相关明细,结合各自业务进行文件合并。

public class MergeFilesApplication {public static void main(String[] args) {System.out.println(Arrays.asList(args));//指定hadoop用户System.setProperty("HADOOP_USER_NAME", "hdfs");System.setProperty("user.name", "hdfs");//获取 SparkSession 对象SparkSession sparkSession = SparkSession.builder().config("spark.scheduler.mode", "FAIR")//配置调度模式.config("spark.sql.warehouse.dir", "/warehouse/tablespace/external/hive")//配置warehouse目录.appName("MergeFilesApplication").getOrCreate();//合并文件sparkSession.read()//spark读取.parquet(sourceDir)//读取数据源目录.coalesce(partitions)//配置spark分区数.sortWithinPartitions("col1", "col2")//每个分区内按照指定需要的列进行排序.write()//spark写入.mode(SaveMode.Append)//写入模式为追加.option("compression", "gzip")//压缩方式以为gzip.parquet(targetMergedDir);//写入目标目录}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/579161.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JAVA日志

日志 Slf4j slf4j 的全称是 Simple Loging Facade For Java&#xff0c;即它仅仅是一个为 Java 程序提供日志输出的统一接口&#xff0c;并不是一个具体的日志实现方案&#xff0c;就比如 JDBC 一样&#xff0c;只是一种规则而已。所以单独的 slf4j 是不能工作的&#xff0c;…

如何使用 Matplotlib 绘制 3D 圣诞树

系列文章目录 前言 转自&#xff1a;How to draw a 3D Christmas Tree with Matplotlib | by Timur Bakibayev, Ph.D. | Analytics Vidhya | Mediumhttps://medium.com/analytics-vidhya/how-to-draw-a-3d-christmas-tree-with-matplotlib-aabb9bc27864 因为我们把圣诞树安装…

Sql 动态行转列

SELECT ID, Name, [Month],auth FROM dbo.Test3 数据列表&#xff1a; 1.静态行专列 Select auth, MAX( CASE WHEN [Month] 一月 then Name else null end) 一月, MAX( CASE WHEN [Month] 二月 then Name else null end) 二月, MAX( CASE WHEN…

零基础学Java第一天

1.什么是Java Java是一门编程语言 思考问题&#xff1a; 人和人沟通? 中文 英文 人和计算机沟通&#xff1f; 计算机语言&#xff1a; C C C# php python 2. Java诞生 前身叫Oak&#xff08;橡树&#xff09; 目前最流行的版本还是JDK8 3.Java三大平台体系 JavaSE&#xff08…

2312llvm,用匹配器构建clang工具

原文 用LibTooling和LibASTMatchers构建工具 这里展示如何基于Clang的LibTooling构建有用的源到源翻译工具.基础 步骤0:取Clang 因为Clang是LLVM项目的一部分,因此你需要先下载LLVM的源码.Clang和LLVM都在同一个git仓库中,在不同的目录下.更多见入门指南. cd ~/clang-llvm…

(企业 / 公司项目)微服务OpenFeign怎么实现服务间调用?(含面试题)

Feign: 远程调用组件使用步骤&#xff0c;理解上面的图  后台系统中, 微服务和微服务之间的调用可以通过Feign组件来完成.  Feign组件集成了Ribbon负载均衡策略(默认开启的, 使用轮询机制),Hystrix熔断器 (默认关闭的, 需要通过配置文件进行设置开启)  被调用的微服务…

【AI服饰】孔雀背景服装_AIGC服饰订制设计咨询产业

服饰系列 AIGC&#xff08;Artificial Intelligence Generated Content&#xff09;服饰图是指通过人工智能生成的服装设计图案。随着人工智能技术的不断进步&#xff0c;AIGC服饰图在未来有着广阔的发展空间。 首先&#xff0c;AIGC服饰图可以提供更多的设计可能性。传统的服…

TypeScript学习(面试篇)

在当今的 Web 开发世界中&#xff0c;TypeScript 作为一种强大的工具为自己赢得了一席之地&#xff0c;它弥补了 JavaScript 的灵活性和静态类型语言的鲁棒性之间的差距&#xff08;至少在 JavaScript 实现自己的类型之前&#xff09;。 随着技术格局的不断发展&#xff0c;对…

【负载均衡】Keepalived 高可用详解

1、Keepalived介绍 ​ Keepalived是一个基于VRRP协议来实现LVS服务高可用方案,可以利用其来避免单点故障。一个LVS服务会使用2台服务器运行Keepalived,一台为主服务器MASTER,另一台为备份服务器BACKUP,但是对外表现为一个虚拟IP,主服务器会发送特定的消息给备份服务器,当…

NI VeriStand中的硬件I / O延迟时间

NI VeriStand中的硬件I / O延迟时间 - NI 适用于 软件 VeriStand 问题详述 在我的VeriStand项目中&#xff0c;我要从DAQ或FPGA硬件中获取数据&#xff0c;在模型中处理输出&#xff0c;然后输出数据。在硬件输入和输出之间&#xff0c;我应该期望什么样的延迟&#xff1f;如…

工作实践篇 Flink(一:flink提交jar)

一&#xff1a;参数 flink 模式 – standalone 二&#xff1a;步骤 1. 将本地测试好的代码进行本地运行。确保没问题&#xff0c;进行打包。 2. 找到打好的jar包&#xff0c;将jar包上传到对应的服务器。 3. 执行flink命令&#xff0c;跑代码。 /opt/flink/flink-1.13.6/bi…

连续语义分割(CSS)24种最新经典方法汇总,包含数据回放、自监督、正则化等5个细分方向

连续语义分割&#xff08;CSS&#xff09;是计算机视觉中的一个新兴领域&#xff0c;其基本任务是在某一时刻学习预测特定类别的图像分割&#xff0c;并在随后需要的时候连续增加学习类别的数量&#xff0c;同时保持对已有类别的分割能力。这个过程中需要解决的主要挑战包括灾难…

2023年全国职业院校技能大赛网络系统管理网络模块 运维服务器配置

(五)网络运维配置 1.完成整网连通后,进入网络监控运维阶段,运维软件已安装在PC的虚拟机中,通过运维平台监控拓扑中所有网络设备(AP除外)。考试现场提供运维平台登陆的用户名密码信息。 其他的如上使用设备管理地址 2.通过运维平台将被监控设备纳入监控范围;通过拓扑配…

前端实现H265编码的m3u8视频流播放

前言 视频监控是智慧城市、智慧园区等WebGIS类系统中最为常见的硬件对接设备&#xff0c;最常用的监控视频流格式为m3u8格式&#xff0c;但是m3u8格式通常都是h.265编码格式的&#xff0c;我搜遍了几乎所有前端视频播放插件&#xff0c;几乎普通的播放器插件都不支持h.265格式…

Spring Boot整合MyBatis-Plus框架快速上手

最开始&#xff0c;我们要在Java中使用数据库时&#xff0c;需要使用JDBC&#xff0c;创建Connection、ResultSet等&#xff0c;然后我们又对JDBC的操作进行了封装&#xff0c;创建了许多类似于DBUtil等工具类。再慢慢的&#xff0c;出现了一系列持久层的框架&#xff1a;Hiber…

记一次接口交互is开头的属性序列化后“is”丢失问题

问题背景&#xff1a; 今天在做项目联调时调用别人的第三方接口时&#xff0c;发现字段传递不对导致参数传递异常的问题&#xff0c;当时还很奇怪&#xff0c;明白传好着呢&#xff0c;怎么就好端端的出现字段不对的情况呢&#xff1f; 查看发现该字段为boolean类型的isIsRef…

第39节: Vue3 表单输入绑定及修饰符

在UniApp中使用Vue3框架时&#xff0c;你可以使用表单输入绑定来将用户输入与组件的数据进行绑定。以下是一个示例&#xff0c;演示了如何在UniApp中使用Vue3框架使用表单输入绑定&#xff1a; <template> <view> <input v-model"message" type&qu…

内网离线搭建之----kafka集群

1.系统版本 虚拟机192.168.9.184 虚拟机192.168.9.185 虚拟机192.168.9.186系统 centos7 7.6.1810 2.依赖下载 ps&#xff1a;置顶资源里已经下载好了&#xff0c;直接用&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;…

Linux下安装MySQL

Linux下安装MySQL 下载地址&#xff1a;https://dev.mysql.com/downloads/mysql/5.7.html#downloads 解压 tar -xvf mysql-5.7.26-linux-glibc2.12-x86_64.tar 再移动并重命名一下 mv mysql-5.7.26-linux-glibc2.12-x86_64 /usr/local/mysql创建mysql用户组和用户并修改权限…

【回溯】符号三角形问题Python实现

文章目录 [toc]问题描述回溯法时间复杂性Python实现 个人主页&#xff1a;丷从心 系列专栏&#xff1a;回溯法 问题描述 下图是由 14 14 14个“ ”和 14 14 14个“ − - −”组成的符号三角形&#xff0c; 2 2 2个同号下面都是” “&#xff0c; 2 2 2个异号下面都是“ −…