从提取层、处理层、基础结构入手,带你了解Spark和Kafka!

作者 | Jaskaran S. Puri

译者 | 火火酱 责编 | 徐威龙

封图| CSDN 下载于视觉中国

电子商务市场中典型的一天是这样的:每分钟发生1万个事件流,并且要选择合适的工具对其进行处理。

本文将帮助你重新创建一个场景,其中有大量的数据流入,你不仅需要存储这些数据,还需要对其进行一些实时分析。

这只是系统设计(System Design)的例子之一,你必须在其中开发高度可用且可扩展的数据管道。虽然在电子商务这样的用例中,可能有n个需要考虑的其他因素,但在本文中,我们会将其分成3个主要部分:

1.  提取(Ingestion)

2.  处理(Processing)

3.  输出(Output)

简而言之,几乎所有系统设计都从这个角度进行分析的,同样,这也是最容易出问题的地方。

提取层(INGESTION LAYER)

在介绍我们的工具之前,先退一步,看看这里要解决的是什么样的用例或问题。要想了解输入或提取层(Input or Ingestion layer),首先要先了解一下输出层(Output layer)。一般来讲,输出层可以通过两种方式实现:

1.  批处理(Batch Processing):如果你只进行一次分析,或者只是要更新一下每日报告,又或者只是在团队中进行随机演示,那么你可以选择成批地提取数据。这意味着可能要从你的DB数据库中取出小部分数据转储,并对其进行分析。

2.  实时处理(Real-Time Processing):也被称为流数据(Streaming Data),这种方式在进行十分重要的数据分析的情况下经常使用。这在B2C场景中最为常见。

批处理的好处是,它减少了构建实时管道的开销,而且你永远不用处理完整的数据集。尽管这在B2C环境中并不适用,尤其是在电子商务环境中,你必须推荐新产品、跟踪用户行为或设计实时仪表板。

现在,在了解了输出层的实时特性之后,我们就要选择相应的提取工具(Ingestion Tools)了。当然,要想从各种用例中获取数据,有很多工具可供选择。但是根据流行程度、社区实力和在各种用例中的实现情况来看,Kafka和Spark Streaming是很好的选择。

(Kafka:https://kafka.apache.org/

(Spark Streaming:https://spark.apache.org/streaming/

同样,要重视了解业务需求,以便确定执行同一工作的几个不同工具。在电子商务这样的场景中,我们知道需要实时输出数据,但是怎样才算实时呢?

1-2秒就算相当实时了!这点没错,但对于电子商务网站来说却并非如此,因为用户并不会等待一秒之后再执行下次点击。这就引出了延迟(latency)的概念。这是我们用来选择提取工具的标准。这两个工具之间有很多不同之处,但Kafka能够提供毫秒级的延迟!

处理层(PROCESSING LAYER)

在此用例中,我们将分别讨论Spark和Kafka的处理机制。我们将会看到spark如何处理底层硬件本不应该持有的数据。另一方面,我们也将看到使用Kafka来消费数据是多么的容易,以及其是如何处理百万级规模的数据。

我将使用以下来自Kaggle的数据集,该数据集行数超过1亿。

https://www.kaggle.com/mkechinov/ecommerce-behavior-data-from-multi-category-store

(Kaggle:https://kaggle.com/

除非你有一台非常高端的计算机,否则不可能将整个数据集加载到本地机器的内存中,甚至不可能将其拆分为多批进行处理,当然,除非你对每批传入都执行处理,这就是为什么我们要使用Spark。

(Spark:https://spark.apache.org/)

基础结构

设置spark有其复杂性,因此,为了加快速度,我们将在Databricks上启动一个spark集群,使你能够在AWS S3(数据驻留的地方)的数据支持下快速启动集群。

(Databricks:http://databricks.com/try-databricks

(AWS S3:https://aws.amazon.com/s3/)

Spark遵循典型的Master-Slave架构,该体系概括而言即主服务器(Master Server)负责所有的作业调度及一些其他工作,从服务器(Slave Server)负责执行实际操作或将数据保存在内存中。    

 Spark 架构, 1 主节点(Master Node) + 2 工作/从节点(Worker/Slave Nodes)

当我们在数据上实现Spark时,会再对其进行详细讨论。就目前而言,我在数据块上构建了一个1个工作节点(Worker Node)+ 1个主节点(Master Node)集群,总共有2个核和8 GB内存,尽管完整的配置应该是4个核和16 GB内存。

之所以是2核8GB内存是因为我们所有的spark操作都只在工作节点上进行,而我们只有一个工作节点。内核数量(也就是2)将在这里扮演一个十分关键的角色,因为所有的并行化工作都将在这里发生。

在8GB内存上读取14GB数据

内存中只能存储小部分数据,因此spark所做的是:它只在你要对数据执行某些操作时,才将数据加载到内存中。例如,下面这行代码将并行读取我们的数据集,即利用2个内核来读取我们的数据。

ecomm_df = sparkSession.read.csv("/mnt/%s/ecomm.csv" % MOUNT_NAME, header=True
我们将以大约112个小块的形式提取14 GB的文件,由于我们有2个内核,所以每次取2个小块,每个小块128MB。
尽管如此,spark不会在你提交该命令时立即开始读取文件,因为还有另一个延迟计算(lazy evaluation)的概念,这使得它不能按照传统的python方式进行读取!但是我们仍然可以通过快速转换为RDD来检查该文件的分区/块数量。
ecomm_df.rdd.getNumPartitions()
OUTPUT: 110 #Number of partitions(延迟计算:https://bit.ly/2xxt2Br)

这与我们计算的十分接近。查看以下链接,了解我是如何从14 GB的文件大小中计算出112个分区的。

(链接地址:https://bit.ly/2IR77Yh

现在,在不涉及太多技术信息的情况下,我们来快速浏览一下数据:

# A SAMPLE RECORD OF OUR DATARow(event_time='2019-11-01 00:00:00 UTC', event_type='view', product_id='1003461', category_id='2053013555631882655', category_code='electronics.smartphone', brand='xiaomi', price='489.07', user_id='520088904', user_session='4d3b30da-a5e4-49df-b1a8-ba5943f1dd33')

筛选出那些仅购买了小米智能手机的人,然后执行LEFT JOIN。看看每个命令是如何被分成110个任务和2个始终并行运行的任务的。

按照品牌,分析有百分之多少的用户仅查看、添加到购物车还是购买了特定的商品,

现在你已经了解了spark的功能,这是一种可以在有限的资源集上训练/分析几乎任何大小的数据的可扩展的方法。

模拟数据的实时提取

我们之前已经讨论了不少关于Kafka的问题,所以就不在这里进行深入探讨了,让我们看看在真实场景中摄取这类数据的Kafka管道是什么样子的!

无论何时,当我们要讨论的是1亿数量级事件时,可扩展性就是要优先考虑的重中之重,对分区(partitions)和消费者组(consumer groups)的理解也是如此。在如此大的提取量下,这两个要素可以破坏我们的系统。看看下面这个架构,大致了解一下Kafka系统。       

  你的邮箱/信箱就是此模型在现实生活中的副本。

1.  邮递员:这个人是生产者,他的工作是挑选数据并将其放入你的邮箱中。

2.  邮箱/信箱:这是你的代理商,如果没有人来收信的话,信就会在这里不断堆积。

3.  你的地址:这是你的主题(topic),邮递员是如何知道要把数据发送到哪里的呢?

4.  你:你就是消费者,你有责任收集这些数据并对其进行进一步处理。

这是对Kafka数据流机制的非常简单的解释,能够帮助我们进一步理解本文,而分区和消费者组的概念也能够帮助你更好地理解代码片段。       

  主题容纳着数据,可以被分成n个分区以供并行使用。

在此种规模下,你需要并行处理数据。为此,可以将传入的数据分成不同的分区,此时,我们可以设置消费者组,这意味着有多个消费者希望从同一个源读取数据。

参照上面的体系结构,两个消费者从同一个源读取数据,因此,会在同一时间读取更多的数据,但读取的数据却不相同。如果消费者1(Consumer 1)已经读取了第1行和第2行,那么消费者2(Consumer 2)将永远不会看到这些数据,因为这种分离在分区级上就已经发生了。

下面是我使用分区和消费者组大规模提取此类数据的一个实现:

# 4 Partitions Made# Topic Name : ecomm_test./kafka_2.11-2.3.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic ecomm_test# Send data to ecomm_test topicproducer.send(topic='ecomm_test', value=line)# Start 2 consumers and assign it to the group "ecommGroup"consumer = KafkaConsumer('ecomm_test', group_id='ecommGroup')consumer = KafkaConsumer('ecomm_test', group_id='ecommGroup')# Output of how consumer 1 reads data, only reading from 2 partitions i.e. 0 & 1ConsumerRecord(topic=u'ecomm_test', partition=1, value='2019-11-01 00:00:01 UTC,view,17302664,2053013553853497655,,creed,28.31,561587266,755422e7-9040-477b-9bd2-6a6e8fd97387\n')ConsumerRecord(topic=u'ecomm_test', partition=0, value='2019-11-01 00:00:01 UTC,view,3601530,2053013563810775923,appliances.kitchen.washer,lg,712.87,518085591,3bfb58cd-7892-48cc-8020-2f17e6de6e7f\n')# Output of how consumer 2 reads data, only reading from 2 partitions i.e. 2 & 3ConsumerRecord(topic=u'ecomm_test', partition=3, value='2019-11-01 00:00:05 UTC,view,4600658,2053013563944993659,appliances.kitchen.dishwasher,samsung,411.83,526595547,aab33a9a-29c3-4d50-84c1-8a2bc9256104\n')ConsumerRecord(topic=u'ecomm_test', partition=2, value='2019-11-01 00:00:01 UTC,view,1306421,2053013558920217191,computers.notebook,hp,514.56,514028527,df8184cc-3694-4549-8c8c-6b5171877376\n')KISS生产架构:输出

我们只需要确保自己符合以下概念即可:

1.  KISS:保持简单愚蠢:保持架构尽可能的简单。

2.  微服务:解耦组件以避免一系列的故障。

3.  CAP定理:一致性、可用性、分区容错性。选择两个对你来说最重要的。

最后,我们将介绍可以在生产系统中实现的最终架构,尽管它们还涉及许多其他组件(如可用区、存储系统和故障转移计划)但本文只是对生产中最终处理层的概述。

          具有数据流的完整架构图

如你所见,图中描述得非常明白,没有针对所有用例的正确的架构/系统设计。你只需要根据给定资源构建出可行的方案。

原文:https://towardsdatascience.com/knowing-pyspark-and-kafka-a-100-million-events-use-case-5910159d08d7

推荐阅读:还不知道 AWS 是什么?这 11 个重点带你认识 AWS !
数据库连接池的原理没你想得这么复杂
为什么程序员如此“嫌弃”主干开发模式?
智能合约编写之 Solidity 的设计模式
2020年,5种将死的编程语言
我去,同事居然用明文存储密码!!!
真香,朕在看了!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/518733.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java-switch选择结构

基本类型 public class SwitchDemo01 {public static void main(String[] args) {// case 穿透char grade B;switch (grade) {case A:System.out.println("优秀");break;case B:System.out.println("良好");break;case C:System.out.println("及格&…

高性能服务器架构思路【不仅是思路】

在服务器端程序开发领域,性能问题一直是备受关注的重点。业界有大量的框架、组件、类库都是以性能为卖点而广为人知。然而,服务器端程序在性能问题上应该有何种基本思路,这个却很少被这些项目的文档提及。本文正式希望介绍服务器端解决性能问…

v-if v-show区别(面试题)

v-if、v-show顾名思义就是用来判断视图层展示效果的,在Vue中文社区说明文档的介绍有: 相同点: v-if与v-show都可以动态控制dom元素显示隐藏 不同点: 1.v-if :可以根据表达式的值在DOM中生成或移除一个元素。 v-show:可…

专访世界顶级游戏背后的男人

‍作者 | 华为云和TA的朋友们封图| CSDN 下载于视觉中国如今,对于网易不鸣工作室CEO王希来说,团队的代表作《战意》已经获得阶段性成功。但不为人知的是,从下定决心要做世界顶级游戏,到这款作品萌生想法,再到正式公测&…

Java-While循环

public class WhileDemo01 {public static void main(String[] args) {// 输出 1 ~ 100int i 0;while (i<100) {i;System.out.println(i);}// 计算 1234...100&#xff1f;int j 0;int sum 0;while (j<100){sum sum j;j;}System.out.println(sum); // 5050} }https…

如何将深度学习训练速度提升一百倍?PAISoar 来了

阿里妹导读&#xff1a;得力于数据规模增长、神经网络结构的演进和计算能力的增强&#xff0c;深度学习的图像处理、语音识别等领域取得了飞速发展。随着训练数据规模和模型复杂度的不断增大&#xff0c;如何充分利用分布式集群的计算资源加快训练速度&#xff0c;提升业务支持…

[Err] 1055 - Expression #1 of ORDER BY clause is not in GROUP BY clause 的问题 MySQL

show variables like "sql_mode"; set sql_mode; set sql_modeNO_ENGINE_SUBSTITUTION,STRICT_TRANS_TABLES;

破解面试难题8个角度带你解读SQL面试技巧!

作者 | Xinran Waibel译者 | 天道酬勤 责编 | 徐威龙封图| CSDN 下载于视觉中国SQL是用于数据分析和数据处理的最重要的编程语言之一&#xff0c;因此SQL问题始终是与数据科学相关工作&#xff08;例如数据分析师、数据科学家和数据工程师&#xff09;面试过程中的一部分。 SQ…

一文读懂架构整洁之道(附知识脉络图)

程序的世界飞速发展&#xff0c;今天所掌握的技能可能明年就过时了&#xff0c;但有一些东西是历久弥新&#xff0c;永远不变的&#xff0c;掌握了这些&#xff0c;在程序的海洋里就不会迷路&#xff0c;架构思想就是这样一种东西。 本文是《架构整洁之道》的读书笔记&#xf…

python获取视频时长方法

1.使用subprocess和re import re import subprocess video r"work/train/video/a8b96f016a28d8f3836f7cbb7734ecde.mp4" import subprocessdef get_length(filename):result subprocess.run(["ffprobe", "-v", "error", "-sho…

阿里敏捷教练全面解析淘宝直播敏捷实践之路

背景介绍 阿里很少提敏捷转型或DevOps&#xff0c;阿里是强业务驱动的&#xff0c;不管用什么办法&#xff0c;一定要达到业务目标。 我来自敏捷教练团队&#xff0c;我们的职责是帮助团队拿结果。这里的团队不限于研发团队&#xff0c;我现在支持的团队包括销售团队和产品运…

int默认值为0,Integer默认值为null

前提概要 Java为每个原始类型提供了封装类&#xff0c;Integer是java为int提供的封装类。 int的默认值为0&#xff0c;而Integer的默认值为null&#xff0c;即Integer可以区分出未赋值和值为0的区别&#xff0c;int则无法表达出未赋值的情况。 代码示例 public class…

GitHub 接连封杀开源项目惹众怒,CEO 亲自道歉!

作者 | 唐小引头图 | CSDN 下载自东方 IC出品 | CSDN&#xff08;ID&#xff1a;CSDNnews&#xff09;王坚博士曾经做过这样一个非常形象的比喻&#xff0c;他将做 App 比作是在别人的花园里弄盆栽&#xff0c;「种点花草是没有问题的」&#xff0c;不过「别人叫你的产品下架你…

一键托管,阿里云全链路追踪服务正式商用:成本仅自建1/5或更少

随着互联网架构的扩张&#xff0c;分布式系统变得日趋复杂&#xff0c;越来越多的组件开始走向分布式化&#xff0c;如微服务、消息收发、分布式数据库、分布式缓存、分布式对象存储、跨域调用&#xff0c;这些组件共同构成了繁杂的分布式网络。 在一次800多人的开发者调研中&…

基于Docker的Mysql主从复制搭建_mysql5.7.x

文章目录为什么基于Docker搭建&#xff1f;一、拉取镜像创建容器1. 拉取mysql:5.7镜像2. 创建master容器3. 创建slave容器4. 查看正在运行的容器5. 此时可以使用Navicat等工具测试连接mysql二、搭建Master(主)服务器2.1. 进入到Master容器内部2.2. my.cnf编辑2.2. 重启mysql服务…

医疗保健、零售、金融、制造业……一文带你看懂大数据对工业领域的影响!...

作者 | Zubair Hassan译者 | 风车云马 责编 | 徐威龙封图| CSDN 下载于视觉中国随着大数据技术的兴起&#xff0c;工业领域在很大程度上发生了变化。智能手机和其他通讯方式的使用迅速增加&#xff0c;使得每天都能收集大量数据。以下是大数据对工业领域的影响。如今&#xff0…

mysql主从复制排错

使用start slave开启主从复制过程后&#xff0c;如果SlaveIORunning一直是Connecting&#xff0c;则说明主从复制一直处于连接状态&#xff0c;这种情况一般是下面几种原因造成的&#xff0c;我们可以根据 Last_IO_Error提示予以排除。 可能的原因说明网络不通查看master和sla…

揭秘!一个高准确率的Flutter埋点框架如何设计

背景 用户行为埋点是用来记录用户在操作时的一系列行为&#xff0c;也是业务做判断的核心数据依据&#xff0c;如果缺失或者不准确将会给业务带来不可恢复的损失。闲鱼将业务代码从Native迁移到Flutter上过程中&#xff0c;发现原先Native体系上的埋点方案无法应用在Flutter体…

如何运行没有Root权限的Docker?干货来了!

作者 | Vaibhav Raizada译者 | 天道酬勤责编 | 徐威龙封图| CSDN 下载于视觉中国在本文中&#xff0c;我们讨论了如何在没有root权限的情况下运行Docker&#xff0c;以便更好地管理容器中的安全性。Docker作为Root用户Docker以root用户身份运行其容器。但是你的工作负载真的需要…

搭建主从数据库出现的错误 error connecting to master ‘slave@172.17.0.2:3306‘ - retry-time: 30 retries: 1

在搭建主从数据库的时候出现了报错 出现错误的截图&#xff1a; 解决办法&#xff1a; 重新授权 CREATE USER slave% IDENTIFIED BY 123456; GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO slave%;参考链接: 搭建主从数据库出现的错误error connecting to master …