基于AWS Serverless的Glue服务进行ETL(提取、转换和加载)数据分析(二)——数据清洗、转换

2 数据清洗、转换

此实验使用S3作为数据源

ETL:

E    extract         输入
T    transform     转换
L    load             输出

大纲

  • 2 数据清洗、转换
    • 2.1 架构图
    • 2.2 数据清洗
    • 2.3 编辑脚本
      • 2.3.1 连接数据源(s3)
      • 2.3.2. 数据结构转换
      • 2.3.2 数据结构拆分、定义
      • 2.3.3 清洗后的数据写入新s3
      • 2.3.4 运行作业
    • 2.4 数据分区
      • 2.4.1 编辑脚本
      • 2.4.2 运行脚本
    • 2.5 总结

2.1 架构图

在这里插入图片描述

2.2 数据清洗

此步会将S3中的原始数据清洗成我们想要的自定义结构的数据。之后,我们可通过APIGateway+Lambda+Athena来实现一个无服务器的数据分析服务。

步骤图例
1、入口在这里插入图片描述
2、创建Job(s3作为数据源,则Type选择Spark,若为Kinesis等,选择Stream Spark)在这里插入图片描述
3、IAM角色需要有s3与Glue的权限在这里插入图片描述
4、选择s3脚本位置,若已经完成脚本的编写工作,则可以选择第二项或第三项,若无则Glue会提供默认脚本在这里插入图片描述
5、安全配置参数在这里插入图片描述建议:添加参数–enable-auto-scaling为true。每次在我们执行Job任务时,会根据运行 ETL 任务的数据处理单元(DPU)的个数来分配动态IP,在我们子网的动态IP数低于DPU数时,Job将会执行失败。此参数将会动态分配IP。
6、数据源()在这里插入图片描述
7、数据目标(我们会将清洗后的数据存储到新的s3桶)在这里插入图片描述
8、设计架构(在本案例中,我们会自定义脚本。所以不再在此处设计架构)(此处设计后,脚本会自动生成相关代码)在这里插入图片描述
9、保存在这里插入图片描述

2.3 编辑脚本

脚本中的args参数的键值需要从Job的安全配置参数中定义

2.3.1 连接数据源(s3)

#数据源
datasource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "datasource")

2.3.2. 数据结构转换

mapped_readings = ApplyMapping.apply(frame = datasource, mappings = [("lclid", "string", "meter_id", "string"), \("datetime", "string", "reading_time", "string"), \("KWH/hh (per half hour)", "double", "reading_value", "double")], \transformation_ctx = "mapped_readings")

2.3.2 数据结构拆分、定义

mapped_readings_df = DynamicFrame.toDF(mapped_readings)mapped_readings_df = mapped_readings_df.withColumn("obis_code", lit(""))
mapped_readings_df = mapped_readings_df.withColumn("reading_type", lit("INT"))reading_time = to_timestamp(col("reading_time"), "yyyy-MM-dd HH:mm:ss")
mapped_readings_df = mapped_readings_df \.withColumn("week_of_year", weekofyear(reading_time)) \.withColumn("date_str", regexp_replace(col("reading_time").substr(1,10), "-", "")) \.withColumn("day_of_month", dayofmonth(reading_time)) \.withColumn("month", month(reading_time)) \.withColumn("year", year(reading_time)) \.withColumn("hour", hour(reading_time)) \.withColumn("minute", minute(reading_time)) \.withColumn("reading_date_time", reading_time) \.drop("reading_time")

2.3.3 清洗后的数据写入新s3

# write data to S3
filteredMeterReads = DynamicFrame.fromDF(mapped_readings_df, glueContext, "filteredMeterReads")s3_clean_path = "s3://" + args['clean_data_bucket']glueContext.write_dynamic_frame.from_options(frame = filteredMeterReads,connection_type = "s3",connection_options = {"path": s3_clean_path},format = "parquet",transformation_ctx = "s3CleanDatasink")

2.3.4 运行作业

    执行成功后,状态将变为"SUCCESS",失败将会给出失败信息,可在CloudWatch 中查看详情

在这里插入图片描述

在这里插入图片描述


清洗后的数据保存到了s3


在这里插入图片描述
数据清洗完毕后,可通过上一篇中的爬网程序步骤,将清洗后的数据的结构创建表到数据目录中,
此时我们可以使用Athena对清洗后的数据进行分析。

2.4 数据分区

接下来我们对数据进行分区处理(此处只提供了按天分区
重新进行数据清洗中的创建Job操作后,重写脚本

2.4.1 编辑脚本

连接数据源。表为上一步最后重新爬取生成的新表。

cleanedMeterDataSource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "cleanedMeterDataSource")

根据type与data_str分区

business_zone_bucket_path_daily = "s3://{}/daily".format(args['business_zone_bucket'])businessZone = glueContext.write_dynamic_frame.from_options(frame = cleanedMeterDataSource, \connection_type = "s3", \connection_options = {"path": business_zone_bucket_path_daily, "partitionKeys": ["reading_type", "date_str"]},\format = "parquet", \transformation_ctx = "businessZone")

2.4.2 运行脚本

分区后的数据结果:
在这里插入图片描述
再次创建、运行爬网程序,将会在数据目录中生成新的分区表。

2.5 总结

到这一步,我们已经使用Glue ETL对s3桶中的数据进行了清洗、分区操作。在进行上篇中的Athena操作后,我们已经可以通过Athena直接查询到清洗、分区后的数据集了。
接下来,我们会通过使用APIGateway+Lambda+Athena来构建一个无服务器的数据查询分析服务。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/208093.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FFmpeg开发笔记(六)如何访问Github下载FFmpeg源码

学习FFmpeg的时候,经常要到GitHub下载各种开源代码,比如FFmpeg的源码页面位于https://github.com/FFmpeg/FFmpeg。然而国内访问GitHub很不稳定,经常打不开该网站,比如在命令行执行下面的ping命令。 ping github.com 上面的ping结…

初识Linux:权限(1)

目录 提示:以下指令均在Xshell 7 中进行 Linux 的权限 内核: 查看操作系统版本 查看cpu信息 查看内存信息 外部程序: 用户: 普通用户变为超级用户: su 和 su-的区别: root用户变成普通用户&#…

KALI LINUX信息收集

预计更新 第一章 入门 1.1 什么是Kali Linux? 1.2 安装Kali Linux 1.3 Kali Linux桌面环境介绍 1.4 基本命令和工具 第二章 信息收集 1.1 网络扫描 1.2 端口扫描 1.3 漏洞扫描 1.4 社交工程学 第三章 攻击和渗透测试 1.1 密码破解 1.2 暴力破解 1.3 漏洞利用 1.4 …

什么是SSL证书?

当我们网上购物或银行业务时,为了安全起见,我们希望看到网站的地址栏上有“HTTPS”和安全锁图标。但是这个“HTTPS”和锁定图标实际上意味着什么?要回答这些问题,我们需要了解 HTTPS、SSL 协议和 SSL 证书。 关于HTTPS、SSL和SSL…

风控反欺诈安全学习路标

1. 金融和支付领域知识 - 了解金融和支付领域的基本概念、业务流程和风险特点。 - 学习金融机构的监管要求和合规措施,如KYC(了解你的客户)和AML(反洗钱)。 2. 数据分析和挖掘技术 - 学习数据分析和数据挖掘的基本原理…

fastadmin获取关联表数据select渲染

php public function piliangadd(){if (false === $this->request->isPost()) {$fenlei_list = Db::name(fenlei)->order(weigh desc)->select();$this</

每天五分钟计算机视觉:稠密连接网络(DenseNet)

本文重点 在前面的课程中我们学习了残差网络ResNet,而DenseNet可以看成是ResNet的后续,我们看一下图就可以看出二者的主要区别了。 特点 DenseNet是一种卷积神经网络,它的特点是每一层都直接连接到所有后续层。这意味着,每一层都接收来自前一层的输出,并将其作为输入传递…

Flyway——Oracle创建前缀索引

文章目录 前言创建一般索引的语法前缀索引 前言 索引有助于提升数据库表的查询速率&#xff0c;极大的缩减查询的时间。但索引的创建需要考虑的因素很多&#xff0c;并非索引越多越好&#xff01; 创建一般索引的语法 oracle创建一般的常见索引&#xff0c;语法如下所示&…

n个人排成一圈,数数123离队

#include<stdio.h> int main() { int i, n100,k0,j0,a[1000]{0};//k&#xff1a;数数123的变量&#xff0c;j记录离开队列人数的变量scanf("%d",&n);for(int ii0; ii<n; ii){ for( i0; i<n; i){// printf("wei%d ",i);if((a[i]0)&&…

掌握Line多开技术,打造私人专属空间

掌握Line多开技术&#xff0c;打造私人专属空间 在现代社交网络的时代&#xff0c;人们经常需要同时处理多个社交账号&#xff0c;例如工作、家庭、朋友等不同领域的社交关系。而对于Line这样的主流社交应用来说&#xff0c;多开技术可以让用户更便捷地管理多个账号&#xff0…

数据结构线性表-栈和队列的实现

1. 栈(Stack) 1.1 概念 栈&#xff1a;一种特殊的线性表&#xff0c;其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈 顶&#xff0c;另一端称为栈底。栈中的数据元素遵守后进先出LIFO&#xff08;Last In First Out&#xff09;的原则。 …

Vue学习计划-Vue2--Vue核心(三)methods和computed

Vue 1. 事件 v-on 基础 使用 v-on:xxx或者xxx绑定事件&#xff0c;其中xxx是事件名 事件的回调需要配置在methods对象中&#xff0c;最终会在vm上 methods中配置函数&#xff0c;不要用箭头函数&#xff0c;否则this就不是vm了 methods中配置函数&#xff0c;都是被Vue管…

Seata使用

本文以seata-server-1.5.2&#xff0c;以配置中心、注册中心使用Nacos&#xff0c;store.modedb&#xff08;mysql&#xff09;为例进行操作。 一、Seata Server端 1、下载seata server 链接: http://seata.io/zh-cn/blog/download.html下载压缩包&#xff0c;解压至非中文目录…

Java技术栈 —— 微服务框架Spring Cloud —— Ruoyi-Cloud 学习(一)

Ruoyi-cloud 项目学习 一、项目环境搭建与启动1.1 nacos安装部署1.1.1 nacos安装、启动1.1.2 nacos部署 1.2 seata安装部署1.3 后端部署与运行1.3.1 ruoyi-modules-file模块运行报错 1.4 nginx安装、部署、配置与启动1.5 redis安装与部署1.6 前段框架知识1.7 项目启动1.8 参考 …

实用方法 | 搭建真正满足用户需求的在线帮助中心

随着互联网的普及和信息技术的快速发展&#xff0c;客户服务和支持变得越来越重要。为了提高客户满意度和维持良好的品牌形象&#xff0c;越来越多企业都开始搭建自己的在线帮助中心。 不知从何下手&#xff1f;细想一下&#xff0c;搭建在线帮助中心主要就是为了解决用户的问…

根据java类名找出当前是哪个Excel中的sheet

pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 …

shell_81.Linux在命令行中创建使用函数

在命令行中使用函数 在命令行中创建函数 两种方法 单行方式来定义函数&#xff1a; $ function divem { echo $[ $1 / $2 ]; } $ divem 100 5 20 $ 当你在命令行中定义函数时&#xff0c;必须在每个命令后面加个分号&#xff0c;这样 shell 就能知道哪里是命令的起止了&am…

反射实现tomcat

获取类信息的方法 1.通过类对象 x.getClass() 2.通过class.forname方法 Class.forname(className);这里className是存储类名的字符串 3.通过类名.class 类名.class 通过类名创建对象 类名.newInstance&#xff08;&#xff09;&#xff1b; 反射可以看到类的一切信息&#xff1…

C语言联合和枚举讲解

目录 联合体的大小 联合体如何省空间 巧用联合体 联合判断大小端&#xff08;惊为天人&#xff0c;大佬写的&#xff0c;我借鉴&#xff09; 枚举 枚举类型的使用 首先我们先看一下菜鸟教程中的对C语言联合体的说明 联合体的大小 #include <stdio.h> union u {char…

Proteus仿真--基于ADC0808设计的调温报警器

本文介绍基于ADC0808实现的调温报警器设计&#xff08;完整仿真源文件及代码见文末链接&#xff09; 温度调节使用滑动变阻器模拟实现&#xff0c;ADC0808采集信号并输出在LCD上面显示&#xff0c;报警系统是LED灯和蜂鸣器实现声光电报警 仿真图如下 仿真运行视频 Proteus仿真…