Spark中使用scala完成数据抽取任务 -- 总结

如题

在这里插入图片描述

任务二:离线数据处理,校赛题目需要使用spark框架将mysql数据库中ds_db01数据库的user_info表的内容抽取到Hive库的user_info表中,并且添加一个字段设置字段的格式 第二个任务和第一个的内容几乎一样。

在该任务中主要需要完成以下几个阶段:

  • 构建maven工程
  • 编写程序
    连接mysql数据库
    读取MySQL数据库中的数据
    在hive中新建数据库
    编写程序将读取到的数据处理之后导入到hive
  • 将程序打成jar包 通过scp命令传到集群中
  • 在集群中使用spark --submit命令执行jar包

构建maven项目

使用idea新建一个空项目,在pom.xml文件中引入相对应的依赖

在这里插入图片描述

踩坑点(1)
maven中的依赖主要是关于spark的依赖,这些依赖在引入的时候需要注意引入的版本需要与集群中的scala版本相对应

maven仓库链接 : 点击这里

在这里插入图片描述
这里就说明了可以使用的Scala版本,注意对应自己集群中的版本选择依赖

踩坑点(2):
该任务需要程序连接本地的mysql服务,所以需要引入java连接mysql数据库的第三方依赖(idea设置-项目结构-添加依赖选择下载好的mysql-connector-java.jar就可以):
在这里插入图片描述

但是在idea中使用maven构建工具打包jar包的时候会出现第三方依赖打不进jar包的情况,这是因为maven工程中需要引入一个插件

<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.xxg.Main</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>

使用该插件再次打包jar包,就不会出现打不进的情况

maven工程完整代码:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>untitled</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.1.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>3.1.2</version><scope>provided</scope></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.1.0</version><scope>provided</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.16</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.xxg.Main</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>

编写程序

编写的程序中需要用到上文引入的那些依赖
程序的逻辑主要是
(1)与spark建立连接、启动相关配置
(2)启动hive的动态分区
(3)连接mysql
(4)取出mysql中所需字段并处理
(5)将处理结果存入Hive中

与spark建立连接

    val conf = new SparkConf().setAppName("子任务1:数据抽取").setMaster("local")val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()spark.sparkContext.setLogLevel("OFF")

这段代码的目的是创建一个Spark配置对象,设置应用程序的名称为"子任务1:数据抽取",并设置Spark的主节点为"local"。然后,它使用这个配置来创建一个SparkSession对象,并启用Hive支持。最后,它设置SparkContext的日志级别为"OFF"。

启动动态分区

    //启用动态分区spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

这段代码是用于配置Apache Spark中的Hive动态分区模式。

在Hive中,动态分区可以在执行查询时动态地创建多个分区。这对于处理大量数据非常有用,因为它可以减少手动分区的需要。

连接mysql

    val url = "jdbc:mysql://192.168.96.33/ds_db01"//连接mysqlval jdbcDF = spark.read.format("jdbc").options(Map("driver" -> "com.mysql.cj.jdbc.Driver","url" -> url,"password" -> "123456","dbtable" -> "sku_info")).load()

注意:url需要以"jdbc:mysql://"开头,因为程序是使用jdbc这个第三方依赖访问mysql的。
在spark.read连接的时候也需要使用format()指定数据库驱动是jdbc,option的参数是map类型,
他需要指定依赖包的类名、密码和使用的数据表。

取出mysql中所需字段并处理

    jdbcDF.createTempView("ods") // 创建临时表val dataframe = spark.sql("""select * from ods;""".stripMargin)dataframe.show()val data = dataframe.withColumn("etl_date", date_format(date_sub(current_date(), 1), "yyyyMMdd"))data.show()

jdbcDF.createTempView(“ods”):这行代码创建了一个名为"ods"的临时视图。jdbcDF是一个DataFrame,它包含了从JDBC数据源中读取的数据(这里是mysql中的ds_db01库中的sku_info表)。createTempView方法将该DataFrame注册为一个临时表,以便在后续的SQL查询中使用。

val data = dataframe.withColumn(…):这行代码向dataframe中添加一个新列。新列的名称为"etl_date",值是通过对当前日期减去一天,并按照"yyyyMMdd"的格式进行格式化得到的。date_format和date_sub是Spark中用于日期处理的函数。新生成的DataFrame存储在变量data中。

将处理结果存入Hive中

data.write.format("hive").mode("append").partitionBy("etl_date").saveAsTable("ods.sku_info")
spark.sql("show partitions ods.sku_info").show()

这行代码的功能是:

  • data.write.format(“hive”):指定输出格式为Hive。
  • mode(“append”):如果目标表已经存在,则以追加模式写入数据,而不是覆盖现有数据。
  • partitionBy(“etl_date”):按“etl_date”列分区。
  • saveAsTable(“ods.sku_info”):将数据保存为名为“ods.sku_info”的Hive表。
  • spark.sql(“show partitions ods.sku_info”).show() :执行一个SQL查询来查看“ods.sku_info”表的分区信息

将程序打成jar包

程序编写完成之后,可以使用idea自带的maven构建工具把项目打包成jar包:
在这里插入图片描述
等待编译,由于引入了插件,所以会打出两个jar包,其中没有original的是我们需要的包。
在这里插入图片描述

文件传输

使用scp命令或者文件传输工具将该jar包发送到集群中的一台机器上

scp -r unitiled-1.0-SNAPSHOT.jar root@master:/opt/

在集群中使用spark --submit命令执行jar包

spark-submit --class org.example.Task.Task01  --master yarn --deploy-mode client /opt/untitled-1.0-SNAPSHOT.jar

运行这个命令的时候有两个踩坑点

踩坑点(3)
如果不把打包第三方依赖的maven引入。或者程序中没有成功指定jdbc类都会报这样的错,如果出现这样的问题可以检查这两个地方。
在这里插入图片描述
踩坑点(4)

在这里插入图片描述
出现这个问题,网上的结局办法为:
在这里插入图片描述
但是我尝试过并没有解决问题,我的解决方案为:
(1)在mysql中创建一个用户名为“”(空)的用户
(2)给这个用户所有的权限

grant all privileges on *.* to ""@"%";
flush privileges;

指令运行成功的结果为:
在这里插入图片描述
第二个任务和第一个任务完全一样,该个名字就可以。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/236730.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Java][File]使用mkdir以及CreateNewFile来制作游戏存档的分级目录+异常抛出机制

文件夹分类&#xff1a; Resource\\Image Resource\\Voice Resource\\VideoInformation\\Characters Information\\mobs Information\\BackpacksPlugin\\Serves Plugin\\Consumers文件的分类&#xff1a; Information\\Characters\\Male.txt Information\\Characters\\Female.t…

刷题记录第五十一天-去除重复字母

题目要求的是字典序最小的结果。只需要理解一点就是按大小顺序排列的字符串的字典序就是最小的&#xff0c;如“abcd”这种。 解题思路如下&#xff1a; 首先明确要使用栈结构&#xff0c;并且是从栈底到栈顶递增&#xff0c;要尽可能保证递增&#xff0c;这样就能保证字典序最…

前端项目常用函数封装(二)

文章目录 前端项目常用函数封装(一)判断两个数组是否有相同元素 返回相同元素&#xff08;数组&#xff09;判断hex颜色值是深色还是浅色随机生成深浅样色 js判断是手机端还是移动端使用UA判断使用媒体查询判断 fetch直接读文件内容&#xff0c;解决乱码问题下载文件将字符串下…

ansibe的脚本---playbook剧本(1)

playbook剧本组成部分&#xff1a; 1、task 任务&#xff1a; 主要是包含要在目标主机上的操作&#xff0c;使用模块定义操作。每个任务都是模块的调用。 2、variables变量&#xff1a;存储和传递数据。变量可自定义&#xff0c;可以在playbook中定义为全局变量&#xff0c;可…

深入理解 Spring Boot:核心知识与约定大于配置原则

深入理解 Spring Boot&#xff1a;核心知识与约定大于配置原则 简单说一下为什么要有 Spring Boot&#xff1f; 因为 Spring 的缺点。 虽然 Spring 的组件代码是轻量级的&#xff0c;但它的配置却是重量级的(需要大量 XML 配置) 为了减少配置文件&#xff0c;简化开发 Spri…

HarmonyOS应用事件打点开发指导

简介 传统的日志系统里汇聚了整个设备上所有程序运行的过程流水日志&#xff0c;难以识别其中的关键信息。因此&#xff0c;应用开发者需要一种数据打点机制&#xff0c;用来评估如访问数、日活、用户操作习惯以及影响用户使用的关键因素等关键信息。 HiAppEvent 是在系统层面…

手机数码品牌网站建设的作用是什么

手机数码产品几乎已经成为成年人必备的&#xff0c;包括手机、电脑、摄像机、键盘配件等&#xff0c;同时市场中相关企业也非常多&#xff0c;消费者可供选择的商品类型也很多样&#xff0c;而对企业来讲&#xff0c;只有不断提升品牌形象、获客拉新等才能不断提升企业地位&…

istio工作负载

目录 文章目录 目录本节实战前言1、WorkloadEntry多实例不同端口权重位置 2、WorkloadGroup关于我最后 本节实战 实战名称&#x1f6a9; 实战&#xff1a;WorkloadEntry测试-2023.12.21(测试成功) 前言 在之前的章节中我们已经多次提到了工作负载&#xff0c;在 Istio 中工作…

《Effective C++》条款42

了解typename的双重意义 作为类模板而言class和typename是一样的。 template <class T> class A {}; template <typename T> class A {}; template <typename C> class A {... private:C::const_iterator* x; }; 这段代码看起来没啥问题。因为我们已经知道它…

CSS中设置盒子大小-box-sizing,设置盒子阴影- box-shadow ,设置盒子圆角-border-radius,属性及其属性值

一&#xff1a;设置盒子大小-box-sizing 默认情况下&#xff1a;盒子可见宽的大小由内容区&#xff0c;内边距&#xff0c;边框共同决定 box-sizing 属性用来设置盒子尺寸的计算方式&#xff0c; 也就是 width/height 指的是谁 可选值&#xff1a; content-box 内容区 默认值…

机器学习之神经结构搜索(Neural Architecture Search,NAS)附代码

概念 神经结构搜索(Neural Architecture Search,NAS)是一种自动化机器学习技术,它旨在通过搜索神经网络的结构空间来找到最优的网络架构,以解决特定的任务。通常,这个搜索过程可以通过强化学习、进化算法、遗传算法或其他优化方法来完成。神经结构搜索的目标是提高神经网…

持续集成交付CICD:HELM 手动完成前端项目应用发布与回滚

目录 一、实验 1.环境 2.K8S master节点部署HELM3 3.K8S master节点安装git 4. Harbor镜像确认 5. HELM 手动完成前端项目应用发布与回滚 6.代码上传到GitLab 二、问题 1.Ingress中 path 的类型有何区别 2. HELM创建项目报错 一、实验 1.环境 &#xff08;1&#x…

智能优化算法应用:基于野狗算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于野狗算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于野狗算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.野狗算法4.实验参数设定5.算法结果6.参考文献7.MA…

APP测试要点有哪些?本文已经给你梳理好了!

我们日常购物、旅游、支付等活动都离不开手机&#xff0c;由此衍生了很多APP。 比如每天使用频率非常高的微信、支付宝、微博、抖音、王者荣耀等等。 APP测试主要进行功能测试、性能测试、自动化测试、安全性测试、兼容性测试、专项测试。 01 APP测试流程 APP测试流程与web…

NLP论文阅读记录 - AAAI-23 | 01 Cogito Ergo Summ:通过语义解析图和一致性奖励对生物医学论文进行抽象总结

文章目录 前言0、论文摘要一、Introduction1.1目标问题1.2相关的尝试1.3本文贡献 二.相关工作2.1抽象概括2.2图增强摘要2.3 抽象概括的强化学习 三.本文方法COGITOERGOSUMM 框架3.1 问题陈述3.2 图表构建**事件图****AMR 图****图合并和重新连接**Model文本编码器图编码器解码器…

机器学习--线性回归

目录 监督学习算法 线性回归 损失函数 梯度下降 目标函数 更新参数 批量梯度下降 随机梯度下降 小批量梯度下降法 数据预处理 特征标准化 正弦函数特征 多项式特征的函数 数据预处理步骤 线性回归代码实现 初始化步骤 实现梯度下降优化模块 损失与预测模块 …

在Linux Docker中部署RStudio Server,实现高效远程访问

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;网络奇遇记、Cpolar杂谈 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. 安装RStudio Server二. 本地访问三. Linux 安装cpolar四. 配置RStudio serv…

图像卷积操作

目录 一、互相关运算 二、卷积层 三、图像中目标的边缘检测 四、学习卷积核 五、特征映射和感受野 一、互相关运算 严格来说&#xff0c;卷积层是个错误的叫法&#xff0c;因为它所表达的运算其实是互相关运算&#xff08;cross-correlation&#xff09;&#xff0c;而不是…

软考学习五部曲

视频学知识 学习知识环节看视频看书都可以&#xff0c;书很厚一本。如果要看完的话要很多时间&#xff0c;所以我觉得还是看视频更快一点&#xff0c;而且视频还可以倍速。我看的那个视频我觉得非常不错&#xff0c;但是我看的视频b站已经下架了看不到了。其他的视频没仔细去看…

原生Android项目中引入Flutter并实现android 与 flutter 之间的通信

前提条件&#xff1a; 完成Flutter安装与环境搭建 一、原生Android项目中引入Flutter 1、在Android项目中&#xff0c;添加Flutter支持的体系结构过滤器 项目 - > app -> build.gradle ...... defaultConfig {......ndk {// Flutter支持的体系结构过滤器abiFilters a…