Pattern to efficiently update terabytes of data in a data lake
GOAL: This post discusses an equivalent of the SQL "UPDATE" statement for data lake (object) storage, using the Apache Spark execution engine. To further clarify, consider this: when you need to perform conditional updates to a massive table in a relational data warehouse, you would do something like
UPDATE <table name>
SET <column> = <value>
WHERE <primary_key> IN (val1, val2, val3, val4)
How would you do the same when your data is stored as parquet files in an object storage (S3, ADLS Gen2, etc.)?
CONTEXT: Consider a massive table about 15 TB in size that gets 40–50 GB (~1 billion rows) of new data every day. This new data contains fresh records to be inserted as well as updates to older records. These updates to older records can go as far back as 18 months and are the root of all complications. When processing new data every day, the pipeline has to remove duplicates for all records that received updates.
Sample business context: consider an online sports store that discounts prices based on the number of goods purchased… so a pair of shoes and a pair of shorts might individually cost $10 and $5 respectively, but when the purchases are grouped together they cost $13. Now, to further complicate things… imagine the buyer could group her/his purchases at a later time, after making the purchases individually. So, let's say I purchased a pair of shoes on Jan 1st, 2020 for $10 and then on Jul 7th, 2020 I decide to purchase a pair of shorts, which is $5 by itself. At this point I can group my recent purchase of shorts with my older purchase of shoes made on Jan 1st… doing this reduces my total expense on shoes + shorts to $13 instead of $15. On the backend, this transaction doesn't just reduce the price of the shorts; it reduces the price of both the shorts and the shoes proportionally. So, the transaction that holds the original selling price of the shoes needs to be updated from $10 to $8.7 (applying the proportional discount of 2/15 ≈ 0.133). In light of the above business case, let's look at the three major components of this problem:
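To make the proportional adjustment concrete, here is a minimal sketch of the calculation, assuming the discount is spread across line items by their share of the original total (the item prices and the $13 bundle price come from the example above; the variable names are illustrative):

original_prices = {"shoes": 10.0, "shorts": 5.0}
bundle_price = 13.0

total = sum(original_prices.values())            # 15.0
discount_ratio = (total - bundle_price) / total  # 2 / 15 ≈ 0.133

# each item keeps its share of the bundle price
adjusted_prices = {item: round(price * (1 - discount_ratio), 2)
                   for item, price in original_prices.items()}
# adjusted_prices == {"shoes": 8.67, "shorts": 4.33}, i.e. the shoes drop from $10 to ~$8.7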
- The massive table we spoke of earlier is the sales table that holds all transactions,
- The data coming into the system every day consists of all transactions for that day (new records and updates to older records),
- The pipeline code consumes the incoming data, processes it, and updates the sales table.
Complications with this scenario:
1. Volume of data in transit — about 1 billion (~40 GB) transactions flow into the system every day
2. Volume of data at rest — the sales table is massive (~15 TB). This table is partitioned on transaction date, and each partition (i.e. transaction date folder) contains about a billion rows
3. Updates to historical data — every day the incoming transactions can update historical data going back up to 18 months (545 days), which means ~545 billion rows (545 daily partitions of ~1 billion rows each) are potentially affected
4. The data is stored in a data lake (S3, ADLS Gen2, etc.) and not in a relational data warehouse… which means there are no SQL-style indices or UPDATE statements to take advantage of.
TECHNICAL DETAILS: This approach assumes the data is stored in an object storage such as S3 or ADLS Gen2, and the processing is done using an Apache Spark based execution layer.
- Data is stored in an object storage (S3, ADLS Gen2, etc.) as parquet files and is partitioned by transaction date. So, in the above example, the record representing the shoe purchase dated Jan 1st, 2020 will be within a folder titled Jan 1st, 2020.
- Each record flowing into the data lake is appended with a column called "record_timestamp". This holds the timestamp of when a particular record was received, and is crucial for identifying the latest record when there are multiple duplicates (a minimal sketch of stamping this column follows).
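A minimal sketch of how the "record_timestamp" column might be stamped onto incoming records; df_incoming is a hypothetical name for the batch of incoming records:

from pyspark.sql import functions as sf

# tag every incoming record with the time it was received
df_incoming = df_incoming.withColumn("record_timestamp", sf.current_timestamp())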
The object storage (refer to the schematic above) is divided into two sections:
a. Landing zone — where the incoming data is stored in folders. Refer to the "landing zone" in the above schematic: each folder is named with a date, which signifies when the data contained in the folder was received. So, all data received on 01/07/2020 will reside in the folder named "01/07/2020".
b. Processed data zone — where the final view of the sales table resides, i.e. every transaction has its latest adjusted value. Refer to the "Processed Data Zone" in the above schematic: folders in this zone are also named with a date, but this date is the "transaction_date". So, if on 03/07/2020 we receive an update to a transaction that was originally made on 01/01/2020, this new record will be stored in the folder titled "03/07/2020" in the "Landing Zone" and in the folder titled "01/01/2020" in the "Processed Data Zone". A dataset can be stored like this with a simple command such as
dataframe_name.write.partitionBy("transaction_date").parquet(<location>)
Note: As the transaction date is used for partitioning, it will not appear in the data inside the folders titled with the transaction date (a brief illustration follows).
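As a brief, hypothetical illustration of the note above (processed_zone_path is an assumed variable, and the partition folder layout follows Spark's default "column=value" naming produced by the write command above):

# reading the table root: Spark infers transaction_date from the folder names
df_all = spark.read.parquet(processed_zone_path)
"transaction_date" in df_all.columns   # True

# reading one partition folder directly: the column is not stored in the parquet
# files themselves, so it does not appear unless you add it back yourself
df_one_day = spark.read.parquet(processed_zone_path + "/transaction_date=2020-01-01")
"transaction_date" in df_one_day.columns   # False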
- For processing the data, we use PySpark on Databricks (the approach stays the same for other Spark distributions).
FINALLY, THE APPROACH: Assume the pipeline runs every night at 2 am to process data for the previous day. In the current example, let's assume it is 2 am on July 8th (i.e. 07/08/2020) and the pipeline will be processing data for 07/07/2020. The approach to updating the data has two main phases.
The first phase has three sub-steps:
1. read in the new data from the Landing Zone,
2. append it to the existing data in the "Processed Data Zone", in the respective folders as per transaction date,
3. store the names (i.e. dates) of all folders that received updates in a list so that the next phase can use it (a minimal sketch of these sub-steps follows this list).
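A minimal PySpark sketch of the three sub-steps, assuming hypothetical variables landing_zone_path, processed_zone_path, and processing_date (the folder name of the day being processed):

# 1. read the new data that arrived for the day being processed
df_new = spark.read.parquet(landing_zone_path + "/" + processing_date)

# 2. append it to the Processed Data Zone, partitioned by transaction date
df_new.write.mode("append").partitionBy("transaction_date").parquet(processed_zone_path)

# 3. collect the transaction dates (i.e. folder names) that just received new rows,
#    and keep this list in a temporary location for the second phase
updated_folders = [row["transaction_date"]
                   for row in df_new.select("transaction_date").distinct().collect()]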
The first sub-step is self-explanatory. Let me explain the second sub-step in a bit more detail with an example: consider our earlier purchases of a pair of shoes on Jan 1st, 2020 and a pair of shorts on Jul 7th, 2020. The transaction on Jul 7th, 2020 leads to an update of the selling price of the shoes from $10 to $8.7 because of the grouping discount. This will be reflected in the data lake as follows:
On Jan 1st, 2020, the data in the folder corresponding to this date contains only the shoe purchase at its original $10 selling price…
… then on Jul 7th, 2020, the purchase of a pair of shorts is grouped with the earlier transaction. After that day's data is appended, the folder dated Jan 1st, 2020 holds both the original shoe record and its updated version, as illustrated below.
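As a hypothetical illustration (the column names other than "record_timestamp" are assumptions, not the article's actual schema), the Jan 1st, 2020 folder would now hold both versions of the shoe record:

rows_in_jan_1_folder = [
    # original transaction, recorded when the shoes were bought
    {"transaction_id": "T-001", "item": "shoes", "selling_price": 10.0,
     "record_timestamp": "2020-01-01 10:15:00"},
    # update produced by the Jul 7th grouping: same transaction ID and date, newer timestamp
    {"transaction_id": "T-001", "item": "shoes", "selling_price": 8.7,
     "record_timestamp": "2020-07-07 18:42:00"},
]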
Note: This is possible because when an update is made to an existing transaction, the update preserves the original transaction date and ID in addition to recording its own creation date. The transaction for the pair of shorts will appear in the folder dated Jul 7th, 2020 because that is the original transaction for the purchase of the shorts.
The third sub-step of this phase helps us create a list of the folder names that received updates in sub-step two and now contain duplicate records. Make sure you store this list in a temporary location.
The second phase is about removing duplicates from all folders updated by the second sub-step of the first phase. This is accomplished by leveraging the list of folder names created in the third sub-step of the first phase. In the worst case, this list will have 545 values (i.e. one entry per day for the last 18 months). Let's see how we handle this case… each of these 545 folders contains about a billion records, and there are multiple ways to remove duplicates from all of them… I believe the easiest one to visualize is a loop. Granted, this is not the most efficient approach, but it does help get the idea across. So, let's go through the sub-steps of this phase:
1. Read in the list of folder names which contain duplicate transactions,
2. Loop through this list and perform the following:
a. Read the data from the folder specified by the loop counter,
b. Remove duplicates (defined as per the candidate key columns) from this data frame, for example with a window function like the snippet below, and
from pyspark.sql import functions as sf
from pyspark.sql.window import Window

# <primary_key> is a placeholder for the candidate key column(s); order_by_col
# would typically be "record_timestamp" so the newest version of a record wins
df_duplicates_removed = (df_with_duplicates
    .withColumn('rn', sf.row_number()
                        .over(Window.partitionBy(<primary_key>)
                              .orderBy(sf.col(order_by_col).desc())))
    .where(sf.col('rn') == 1)
    .drop('rn')
)
c. Write the refreshed dataset back to its original location (a sketch of the full loop follows).
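Putting the three sub-steps together, here is a minimal sketch of the loop, assuming the deduplication logic from step (b) is wrapped in a hypothetical helper remove_duplicates(df) and that updated_folders holds the transaction dates saved by the first phase. Note that Spark cannot safely overwrite a parquet folder it is still lazily reading from, so this sketch eagerly materialises the deduplicated rows first (a more robust variant writes to a temporary path and swaps it in):

for folder_date in updated_folders:
    folder_path = processed_zone_path + "/transaction_date=" + folder_date

    # a. read the data for one transaction date
    df_with_duplicates = spark.read.parquet(folder_path)

    # b. keep only the latest version of each record
    df_refreshed = remove_duplicates(df_with_duplicates)
    df_refreshed.cache().count()  # force computation before overwriting the source files

    # c. write the refreshed dataset back to its original location
    df_refreshed.write.mode("overwrite").parquet(folder_path)
    df_refreshed.unpersist()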
For parallelizing the "duplicates removal" step, you can use serverless execution such as AWS Lambda functions, together with a queue store for the folder names that need to be refreshed.
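As a simpler alternative to the serverless setup described above, and purely as an illustration (not the author's approach), the per-folder refreshes can also be submitted concurrently from a single Spark driver with a thread pool, reusing the hypothetical remove_duplicates helper and processed_zone_path from the sketches above:

from concurrent.futures import ThreadPoolExecutor

def refresh_folder(folder_date):
    # same read -> deduplicate -> overwrite cycle as the loop above, for one folder
    folder_path = processed_zone_path + "/transaction_date=" + folder_date
    df = remove_duplicates(spark.read.parquet(folder_path))
    df.cache().count()
    df.write.mode("overwrite").parquet(folder_path)
    df.unpersist()

# Spark schedules the concurrently submitted jobs across the cluster
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(refresh_folder, updated_folders))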
CONCLUSION: This approach seems to work very nicely with large datasets, and it scales gracefully as processing needs grow. In other words, the curve of execution time (y-axis) vs data size (x-axis) begins to flatten as the data size grows… this is primarily because the second phase of processing allows for massive parallelization.
Although the fictitious business example used here pertains to sales, this pattern can be leveraged in any scenario that calls for big data processing, such as IoT, log stream analysis, etc. Thanks for reading!
Source: https://medium.com/@ashishverma_93245/pattern-to-efficiently-update-terabytes-of-data-in-a-data-lake-1f4981b1861