Pattern to Efficiently Update Terabytes of Data in a Data Lake


GOAL: This post discusses an equivalent of the SQL “UPDATE” statement for data lake (object) storage, using the Apache Spark execution engine. To clarify further, consider this: when you need to perform conditional updates on a massive table in a relational data warehouse, you would do something like

UPDATE <table name>
SET <column> = <value>
WHERE <primary_key> IN (val1, val2, val3, val4)

How would you do the same when your data is stored as Parquet files in object storage (S3, ADLS Gen2, etc.)?

CONTEXT: Consider a massive table about 15 TB in size that gets 40–50 GB (~1B rows) of new data every day. This new data contains fresh records to be inserted as well as updates to older records. These updates to older records can go as far back as 18 months and are the root of all complications. When processing new data every day, the pipeline has to remove duplicates for all records that received updates.

Sample business context: consider an online sports store that discounts prices based on the number of goods purchased… so a pair of shoes and a pair of shorts might individually cost $10 and $5 respectively, but when the purchases are grouped together they cost $13. Now, to further complicate things… imagine the buyer could group her/his purchases at a later time, after making the purchases individually. So, let’s say I purchased a pair of shoes on Jan 1st, 2020 for $10, and then on Jul 7th, 2020 I decide to purchase a pair of shorts, which is $5 by itself. But at this point I can group my recent purchase of shorts with my older purchase of shoes made on Jan 1st… doing this reduces my total expense on shoes + shorts to $13 instead of $15. On the backend, this transaction doesn’t just reduce the price of the shorts; it reduces the price of both the shorts and the shoes proportionally. So, the transaction that holds the original selling price of the shoes needs to be updated from $10 to $8.67 (taking out the grouped-discount fraction 2/15 ≈ 0.133). In light of the above business case, let’s look at the three major components of this problem, right after a tiny worked check of that price adjustment below.
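A purely illustrative check of that proportional adjustment (the numbers come from the example above; this is my sketch, not the author’s code):

individual_prices = {"shoes": 10.0, "shorts": 5.0}
grouped_total = 13.0

individual_total = sum(individual_prices.values())                           # 15.0
discount_fraction = (individual_total - grouped_total) / individual_total    # 2/15 ≈ 0.133

# Each item's stored selling price is reduced by the same fraction
adjusted = {item: round(price * (1 - discount_fraction), 2)
            for item, price in individual_prices.items()}

print(adjusted)   # {'shoes': 8.67, 'shorts': 4.33}; these add back up to the grouped total of $13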

  1. The massive table we spoke of earlier is the sales table that holds all transactions,
  2. The data coming into the system every day is all transactions for that day (new records and updates to older records),
  3. The pipeline code that consumes the incoming data, processes it, and updates the sales table.

Complications with this scenario:

1. Volume of data in transit — about 1 billion (40–50 GB) transactions flowing into the system every day

2. Volume of data at rest — the sales table is massive (~15 TB). This table is partitioned by transaction date, and each partition (i.e. transaction-date folder) contains about a billion rows

3. Updates to historical data — every day the incoming transactions can update historical data up to 18 months (545 days) back, which means ~545 billion rows are potentially in scope

4. The data is stored in a data lake (S3, ADLS Gen2, etc.) and not in a relational data warehouse… which means there are no SQL-like indices or UPDATE statements to take advantage of.

TECHNICAL DETAILS: This approach assumes data is stored in object storage (S3, ADLS Gen2, etc.) and the processing is done using an Apache Spark based execution layer.

[Figure: high-level schematic of data storage and flow]
  1. Data is stored in object storage (S3, ADLS Gen2, etc.) as Parquet files and is partitioned by transaction date. So, in the above example, the record representing the shoe purchase dated Jan 1st, 2020 will live in a folder titled Jan 1st, 2020.
  2. Each record flowing into the data lake is appended with a column called “record_timestamp”. This holds the timestamp of when a particular record was received, and it is crucial for identifying the latest record when there are multiple duplicates.
  3. The object storage (refer to the schematic above) is divided into two sections:

    a. Landing zone — where the incoming data is stored in folders. Refer to the “Landing Zone” in the schematic above: each folder is named with a date, and this date signifies when the data contained in the folder was received. So, all of the data received on 01/07/2020 will reside in the folder named “01/07/2020”.

    b. Processed data zone — where the final view of the sales table resides, i.e. every transaction carries its latest adjusted value. Refer to the “Processed Data Zone” in the schematic above: folders in this zone are also named with a date… this date is the “transaction_date”. So, if on 03/07/2020 we receive an update to a transaction which was initially made on 01/01/2020, this new record will be stored in the folder titled “03/07/2020” in the Landing Zone and in the folder titled “01/01/2020” in the Processed Data Zone. A dataset can be stored like this with a simple command such as

dataframe_name.write.partitionBy("transaction_date").parquet(<location>)

Note: As the transaction date is used for partitioning, it will not appear in the data files inside the folders titled with the transaction date.

4. For processing the data, we use PySpark on Databricks (the approach stays the same for other Spark distributions). A small sketch of points 1–3 is shown below.
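To make points 1 and 2 (and the partitioning note in 3b) concrete, here is a minimal illustrative sketch; the paths are hypothetical and spark is the SparkSession that Databricks provides by default, so this is not the author’s exact code:

import pyspark.sql.functions as sf

# Stamp each incoming record with the time it was received (the "record_timestamp" column)
df_incoming = (spark.read.parquet("s3://my-bucket/landing_zone/07-07-2020/")   # hypothetical path
                    .withColumn("record_timestamp", sf.current_timestamp()))

# Land it in the processed data zone, one folder per transaction date
(df_incoming.write
    .mode("append")
    .partitionBy("transaction_date")
    .parquet("s3://my-bucket/processed_zone/"))

# Because transaction_date is the partition key, it lives in the folder name rather than in the files:
df_all = spark.read.parquet("s3://my-bucket/processed_zone/")   # partition discovery restores transaction_date
df_one = spark.read.parquet("s3://my-bucket/processed_zone/transaction_date=2020-01-01/")   # column absent here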

FINALLY, THE APPROACH: Assume the pipeline runs every night at 2 am to process data for the previous day. In the current example, let’s assume it’s 2 am on July 8th (i.e. 07/08/2020) and the pipeline will be processing data for 07/07/2020. The approach to updating data has two main phases:

  • First phase has three sub-steps:

    1. read in the new data from the Landing Zone,

    2. append it to the existing data in the “Processed Data Zone”, into the respective folders as per transaction date,

    3. store the names (i.e. dates) of all folders that received updates in a list so that the next phase can use it.

    The first sub-step is self-explanatory. Let me explain the second sub-step in a bit more detail with an example: consider our old purchase of a pair of shoes on Jan 1st, 2020 and then a pair of shorts on Jul 7th, 2020. The transaction on Jul 7th, 2020 will lead to an update to the selling price of the shoes from $10 to $8.67 because of the grouping discount. This will be reflected in the data lake as below:

    On Jan 1st, 2020, the data in the folder corresponding to this date will look like this… only the shoes purchased:

[Figure: contents of the Jan 1st, 2020 folder, showing the original shoe purchase record]

… on Jul 7th, 2020, with the purchase of a pair of shorts being grouped with that earlier transaction, the data in the folder dated Jan 1st, 2020 will look like this:

[Figure: contents of the Jan 1st, 2020 folder, with the new selling price appended]

Note: This is possible because when an update is made to an existing transaction, the update preserves the original transaction date and ID in addition to recording its own creation date. The transaction for the pair of shorts will show up in the folder dated Jul 7th, 2020 because that is the original transaction for the purchase of the shorts.

The third sub-step of this phase helps us create a list of the folder names that received updates in sub-step two and now contain duplicate records. Make sure you store this list in a temporary location. A minimal sketch of this whole first phase is shown below.
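The following is a minimal, illustrative sketch of the first phase; the paths, the temporary-list location, and the assumption that the incoming files already carry transaction_date and record_timestamp columns are mine, not the author’s:

from pyspark.sql import SparkSession
import pyspark.sql.functions as sf

spark = SparkSession.builder.getOrCreate()

landing_zone_path = "s3://my-bucket/landing_zone/07-07-2020/"    # hypothetical layout
processed_zone_path = "s3://my-bucket/processed_zone/"
folder_list_path = "s3://my-bucket/tmp/updated_folders/"         # temporary location for sub-step 3

# Sub-step 1: read the new data from the Landing Zone
df_new = spark.read.parquet(landing_zone_path)

# Sub-step 2: append it to the Processed Data Zone, routed into folders by transaction date
(df_new.write
    .mode("append")
    .partitionBy("transaction_date")
    .parquet(processed_zone_path))

# Sub-step 3: remember which transaction_date folders were touched and stash the list
updated_folders = [row["transaction_date"]
                   for row in df_new.select("transaction_date").distinct().collect()]
(spark.createDataFrame([(str(d),) for d in updated_folders], ["transaction_date"])
      .coalesce(1).write.mode("overwrite").csv(folder_list_path))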

  • Second phase is about removing duplicates from all folders touched by the second sub-step of the last phase. This is accomplished by leveraging the list of folder names created in the third sub-step of the last phase. In the worst-case scenario, this list will have 545 values (i.e. one entry per day for the last 18 months). Let’s see how we handle this case… each of these 545 folders contains about a billion records, and there are multiple ways to remove duplicates from all of them… I believe the easiest one to visualize is using a loop. Granted, this is not the most efficient, but it does help get the idea across. So, let’s go through the sub-steps of this phase:

    1. Read in the list of folder names which contain duplicate transactions,

    2. Loop through this list and perform the following:

    a. Read the data from the folder specified by the loop counter,

    b. Remove duplicates (defined per the candidate key columns) from this data frame, for example

import pyspark.sql.functions as sf
from pyspark.sql.window import Window

# order_by_col would typically be the "record_timestamp" column described earlier
df_duplicates_removed = (df_with_duplicates
    .withColumn('rn', sf.row_number()
        .over(Window.partitionBy(<primary_key>)
              .orderBy(sf.col(order_by_col).desc())))
    .where(sf.col("rn") == 1)
)

    c. Write the refreshed dataset back to its original location. A sketch of the full loop, putting these sub-steps together, follows below.
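Here is a sketch of the complete phase-two loop. It continues from the phase-one sketch above (same hypothetical paths and spark session) and assumes transaction_id as the candidate key and record_timestamp as the tie-breaker; staging to a temporary path before replacing a folder is one way to avoid overwriting a location Spark is still lazily reading from:

import pyspark.sql.functions as sf
from pyspark.sql.window import Window

# Sub-step 1: read back the list of transaction_date folders that now hold duplicates
updated_folders = [row[0] for row in spark.read.csv("s3://my-bucket/tmp/updated_folders/").collect()]

primary_key_cols = ["transaction_id"]   # assumption: candidate key of the sales table
w = Window.partitionBy(*primary_key_cols).orderBy(sf.col("record_timestamp").desc())

# Sub-step 2: loop over the affected folders
for folder in updated_folders:
    path = f"s3://my-bucket/processed_zone/transaction_date={folder}/"   # hypothetical layout

    # a. read the data for this transaction date
    df = spark.read.parquet(path)

    # b. keep only the latest version of each record
    df_deduped = (df.withColumn("rn", sf.row_number().over(w))
                    .where(sf.col("rn") == 1)
                    .drop("rn"))

    # c. write the refreshed dataset back: stage to a temp path, then overwrite the original folder
    tmp_path = path.rstrip("/") + "_tmp"
    df_deduped.write.mode("overwrite").parquet(tmp_path)
    spark.read.parquet(tmp_path).write.mode("overwrite").parquet(path)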

For parallelizing the “duplicates removal” step, you can use serverless execution such as AWS Lambda functions, along with a queue store holding the names of the folders that need to be refreshed; one possible way to feed such a queue is sketched below.
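One possible flavor of that fan-out (my assumption, not something the post prescribes) is to push each affected folder name onto an SQS queue with boto3 and let each worker deduplicate a single partition; the queue URL and message shape below are hypothetical:

import json
import boto3

sqs = boto3.client("sqs")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/folders-to-refresh"   # hypothetical

# One message per transaction_date folder that needs its duplicates removed
for folder in updated_folders:
    sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps({"transaction_date": str(folder)}),
    )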

CONCLUSION: This approach works very nicely with large datasets, and it scales gracefully as processing needs grow. In other words, the curve of execution time (y-axis) vs. data size (x-axis) begins to flatten as the data size grows… primarily because the second phase of processing allows for massive parallelization.

[Figure: execution time vs. data size]

Although the fictitious business example used here pertains to sales, this pattern can be leveraged in any scenario that needs big-data processing, such as IoT, log-stream analysis, etc. Thanks for reading!

Source: https://medium.com/@ashishverma_93245/pattern-to-efficiently-update-terabytes-of-data-in-a-data-lake-1f4981b1861
