Spark 增量抽取 Mysql To Hive

题目要求:

  1. 抽取ds_db01库中customer_inf的增量数据进入Hive的ods库中表customer_inf。根据ods.user_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.customer_inf命令;

代码实现: 

package org.exampleimport org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionimport java.time.LocalDateobject Demo {def main(args: Array[String]): Unit = {
//    创建sparkval conf = new SparkConf().setMaster("local[*]").setAppName("one").set("spark.testing.memory", "2147480000").set("dfs.client.use.datanode.hostname", "true")System.setProperty("HADOOP_USER_NAME", "root")// 连接hive
val spark = SparkSession.builder()// 配置 Hive Metastore 的连接地址.config("hive.metastore.uris", "thrift://192.168.23.60:9083")// 配置 Hive 数据仓库的存储位置.config("hive.metastore.warehouse", "hdfs://192.168.23.60://9000/user/hive/warehouse")// 配置 Spark SQL 的存储分配策略为 "LEGACY".config("spark.sql.storeAssignmentPolicy", "LEGACY")// 添加其他自定义的 Spark 配置.config(conf)// 启用对 Hive 的支持,使得可以使用 Hive 的表和查询.enableHiveSupport()// 创建 SparkSession 对象.getOrCreate()//连接mysqlspark.read.format("jdbc").option("url","jdbc:mysql://192.168.23.60:3306/ds_db01??characterEncoding=UTF-8").option("driver","com.mysql.jdbc.Driver").option("user","root").option("password","123456").option("dbtable","customer_inf").load().createOrReplaceTempView("v")  //对该表创建视图spark.sql("select * from v")//    获取当天时间的前一天
val unit = java.time.LocalDate.now().plusYears(-1).plusMonths(-1).plusDays(-1).toString().replace("-", "")val unit1 = unit.toInt
//全量抽取
//    spark.sql(
//      s"""
//         |insert overwrite table gh_test.customer_inf
//         |partition (etl_date="${unit}")
//         |select * from v
//         |
//         |""".stripMargin).show()
//
//spark.sql("select * from gh_test.customer_inf").show//将modified_time类型转换为yyyyMMddspark.sql(s"""|select  customer_inf_id,customer_id,customer_name,identity_card_type,identity_card_no,mobile_phone,|customer_email,gender,customer_point,register_time,birthday,customer_level,customer_money,|from_unixtime(unix_timestamp(modified_time,'yyyy-MM-dd'),'yyyyMMdd') as modified_time|from v|""".stripMargin).createOrReplaceTempView("v1")
//      spark.sql("select  count(*) from gh_test.customer_inf").show//从mysql中增量抽取到hivespark.sql(s"""|insert overwrite table gh_test.customer_inf|partition (etl_date="${unit}")|select * from v where  modified_time>"${unit1}"|""".stripMargin).show()//  spark.sql("select * from gh_test.customer_inf").show//    查询抽取后的条数spark.sql("select  count(*) from gh_test.customer_inf").show
//    spark.sql("desc gh_test.customer_inf")}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/76242.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud(二)

1.Nacos配置管理 Nacos除了可以做注册中心,同样可以做配置管理来使用。 1.1.统一配置管理 当微服务部署的实例越来越多,达到数十、数百时,逐个修改微服务配置就会让人抓狂,而且很容易出错。我们需要一种统一配置管理方案&#…

NIFI实现数据库数据增量同步

说明 nifi版本:1.23.2(docker镜像) 需求背景 将数据库中的数据同步到另一个数据库中,要求对于新增的数据和历史有修改的数据进行增量同步 模拟数据 建表语句 源数据库和目标数据库结构要保持一致,这样可以避免后…

固定资产管理数据怎么算?

在企业的运营中,固定资产的管理是一个至关重要的环节。然而,对于许多企业来说,理解和管理这些资产的数据却常常是一团迷雾。那么,固定资产管理数据究竟应该如何计算呢?这是一个需要我们深入探讨的问题。  我们需要明…

MySQL——命令行客户端的字符集问题

原因:服务器端认为你的客户端的字符集是utf-8,而实际上你的客户端的字符集是GBK。 查看所有字符集:SHOW VARIABLES LIKE character_set_%; 解决方案,设置当前连接的客户端字符集 “SET NAMES GBK;”

Android12之/proc/pid/status参数含义(一百六十五)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生…

python sorted函数详解2023.9.11

sorted函数详解 1. 输入和输出2. key传入函数 1. 输入和输出 help(sorted) Help on built-in function sorted in module builtins: sorted(iterable, /, *, keyNone, reverseFalse)Return a new list containing all items from the iterable in ascending order.A custom k…

Redis监控工具_RedisLive

Redis监控工具_RedisLive Redis安装请看: MacBook安装Redis redis集群搭建_亲自操作 RedisLive安装 RedisLive是由python编写的并且开源的图形化监控工具,非常轻量级,核心服务部分只包含一个web服务和一个基于redis自带的info命令以及monitor命令的…

php://filter协议在任意文件读取漏洞(附例题)

php://filter php://fiter 中文叫 元器封装,咱也不知道为什么这么翻译,目前我的理解是可以通过这个玩意对上面提到的php IO流进行处理,及现在可以对php的 IO流进行一定操作。 过滤器:及通过php://filter 对php 的IO流进行的具体…

微服务之流控、容错组件sentinel

背景 2012年阿里巴巴研发的流量治理组件,核心功能流控、容错 有什么功能 流量控制 流量控制 网关控制 黑白名单 熔断降级 熔断 保护分布式系统防止因为调用下有服务时产生故障或者请求超时等异常影响上游服务,使用熔断方案,类似断路器…

hive中的索引

使用索引前的配置 在使用Hive索引之前,需要进行一些配置,以确保索引能够正常工作。以下是一些常见的配置步骤: Hive配置 在Hive中启用索引功能,需要在Hive配置文件(hive-site.xml)中设置以下属性&#x…

T2I-Adapter:增强文本到图像生成的控制能力

链接:GitHub - TencentARC/T2I-Adapter: T2I-Adapter 文本到图像生成 (T2I) 是人工智能领域的一个重要研究方向。近年来,随着深度学习技术的发展,T2I 技术取得了显著进展,生成的图像在视觉效果上已经与真实图像难以区分。 然而&…

ILS解析漏洞复现

搭建好ILS后,访问127.0.0.1:8000 写一个phpinfo的脚本 可以看到。现在是不能访问的 赋予 IIS 解析 phpinfo 能力 打开服务器管理器,打开 IIS 管理器 点击处理程序映射 再次访问,发现程序可以访问 将index.php改为index.png 此时php脚本自然是…

【pdf密码】如何限制他人对PDF文件编辑?

制作好的PDF文件,先要设置一个密码防止他人对文件进行编辑,那么我们可以对PDF文件设置限制编辑,设置方法很简单,我们在PDF编辑器中点击文件 – 属性 – 安全,在权限下拉框中选中【密码保护】 然后在密码保护界面中&…

LeetCode_贪心算法_困难_630.课程表 III

目录 1.题目2.思路3.代码实现(Java) 1.题目 这里有 n 门不同的在线课程,按从 1 到 n 编号。给你一个数组 courses ,其中 courses[i] [durationi, lastDayi] 表示第 i 门课将会持续上 durationi 天课,并且必须在不晚于…

查看创建好的数据库

MySQL从小白到总裁完整教程目录:https://blog.csdn.net/weixin_67859959/article/details/129334507?spm1001.2014.3001.5502 语法格式: show create database 数据库名称; 案列:查看testing数据库信息 mysql> show create database testing; ------------------------…

SpringMVC相关知识点

1.Spring MVC的理解? 首先,MVC模型是模型,视图,控制器的简写,其思想核心是通过将请求处理控制,业务逻辑,数据封装,数据显示等流程节点分离的思想来组织代码。 所以,MVC是…

华为星闪联盟:引领无线通信技术创新的先锋

星闪(NearLink),是由华为倡导并发起的新一代无线短距通信技术,它从零到一全新设计,是为了满足万物互联时代个性化、多样化的极致、创新体验需求而诞生的。这项技术汇聚了中国300多家头部企业和机构的集体智慧&#xff…

【STM32】FSMC—扩展外部 SRAM 初步使用 1

基于野火指南者《零死角玩转 STM32F103—指南者》的学习 STM32F103系列 FSMC Flexible Static Memory Controller简介 1.详细功能参看《STM32F10x参考手册》,这边是概述 是一个外设,挂载在AHB总线下。 可以用于驱动包括 SRAM、NOR FLASH 以及 NAND FL…

C#自定义控件组件实现Chart图表(多Y轴,选择图例加粗,选择放大,缩放,点击查看信息等功能)

先看看ECharts的效果 C# 工具箱里的Chart控件就不演示了,很多效果没办法做出来,做出来效果也很不理想。所以,需要自己去手动实现工具箱里的Chart没办法实现的效果; 先看看实现后的效果 绑定数据 点击图表 点击右侧图例加粗 选择放大 右键 点击缩小,恢复

RJ45水晶头网线顺序出错排查

线序 网线水晶头RJ45常用的线序标准ANSI / TIA-568定义了T568A与T568B两种线序,一般使用T568B,水晶头8个孔对应的8条线颜色如下图: 那1至8的编号,是从水晶头哪一面为参考呢,如下图,是水晶头金手指一面&am…