企业版Spark Databricks + 企业版Kafka Confluent 联合高效挖掘数据价值

简介:本文介绍了如何使用阿里云的Confluent Cloud和Databricks构建数据流和LakeHouse,并介绍了如何使用Databricks提供的能力来挖掘数据价值,使用Spark MLlib构建您的机器学习模型。

前提条件

  • 已注册阿里云账号,详情请参见阿里云账号注册流程
  • 已开通 Databricks 数据洞察服务
  • 已开通 OSS 对象存储服务
  • 已开通 Confluent 流数据服务

创建Databricks集群 & Confluent集群

  1. 登录Confluent管理控制台,创建Confluent集群,并开启公网服务
  2. 登录Databricks管理控制台,创建Databricks集群

Databricks Worker节点公网访问

Databricks的worker节点暂时不支持公网访问,为了能访问Confluent的公网地址,请联系Databricks的开发人员添加NAT网关。

案例:出租车数据入湖及分析

出租车和网约车在每天的运行中持续产生行驶轨迹和交易数据,这些数据对于车辆调度,流量预测,安全监控等场景有着极大的价值。

本案例中我们使用纽约市的出租车数据来模拟网约车数据从产生,发布到流数据服务Confluent,通过Databricks Structured Streaming进行实时数据处理,并存储到LakeHouse的整个流程。数据存储到LakeHouse后,我们使用spark和spark sql对数据进行分析,并使用Spark的MLlib进行机器学习训练。

前置准备:

  1. 创建topic:
    登录Confluent的control center,在左侧选中Topics,点击Add a topic按钮,创建一个名为nyc_taxi_data的topic,将partition设置为3,其他配置保持默认。

  2. 创建OSS bucket:
    在和Databricks同一Region的OSS中,创建bucket,bucket命名为:databricks-confluent-integration
    进入到Bucket列表页,点击创建bucket按钮


    创建好bucket之后,在该bucket创建目录:checkpoint_dir和data/nyc_taxi_data两个目录
  3. 收集url,用户名,密码,路径等以便后续使用a。
  4. confluent集群ID:在csp的管控界面,集群详情页获取
  5. Confluent Control Center的用户名和密码
  6. 路径:
  • Databricks Structured Streaming的checkpoint存储目录
  • 采集的数据的存储目录

下面是我们后续会使用到的一些变量:

# 集群管控界面获取
confluent_cluster_id = "your_confluent_cluster_id"    
# 使用confluent集群ID拼接得到
confluent_server = "rb-{confluent_cluster_id}.csp.aliyuncs.com:9092" 
control_center_username = "your_confluent_control_center_username"
control_center_password = "your_confluent_control_center_password"topic = "nyc_taxi_data"checkpoint_location = "oss://databricks-confluent-integration/checkpoint_dir"
taxi_data_delta_lake = "oss://databricks-confluent-integration/data/nyc_taxi_data"

数据的产生

在本案例中,我们使用Kaggle上的NYC出租车数据集来模拟数据产生。

  • 我们先安装confluent的python客户端,其他语言的客户端参考confluent官网
pip install confluent_kafka
  • 构造用于创建Kafka Producer的基础信息,如:bootstrap-server,control center的username,password等
conf = {'bootstrap.servers': confluent_server,'key.serializer': StringSerializer('utf_8'),'value.serializer': StringSerializer('utf_8'),'client.id': socket.gethostname(),'security.protocol': 'SASL_SSL','sasl.mechanism': 'PLAIN','sasl.username': control_center_username,'sasl.password': control_center_password
}
  • 创建Producer:
producer = Producer(conf)
  • 向Kafka中发送消息(模拟数据的产生):
with open("/Path/To/train.csv", "rt") as f:float_field = ['fare_amount', 'pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']for row in reader:i += 1try:for field in float_field:row[field] = float(row[field])row['passenger_count'] = int(row['passenger_count'])producer.produce(topic=topic, value=json.dumps(row))if i % 1000 == 0:producer.flush()if i == 200000:breakexcept ValueError: # discard null/NAN datacontinue 

Kafka中的partition和offset

在使用spark读取Kafka中的数据之前,我们回顾一下Kafka中的概念:partition和offset

  • partition:kafka为了能并行进行数据的写入,将每个topic的数据分为多个partition,每个partition由一个Broker负责,向partition写入数据时,负责该partition的Broker将消息复制给它的follower
  • offset:Kafka会为每条写入partition里的消息进行编号,消息的编号即为offset

我们在读取Kafka中的数据时,需要指定我们想要读取的数据,该指定需要从两个维度:partition的维度 + offset的维度。

  • Earliest:从每个partition的offset 0开始读取和加载
  • Latest:从每个partition最新的数据开始读取
  • 自定义:指定每个partition的开始offset和结束offset
  • 读取topic1 partition 0 offset 23和partition 0 offset -2之后的数据:"""{"topic1":{"0":23,"1":-2}}"""

除了指定start offset,我们还可以通过endingOffsets参数指定读取到什么位置为止。

将数据存储到LakeHouse:Spark集成Confluent

理解上述概念后,Databricks和Confluent的集成非常简单,只需要对spark session的readStream参数进行简单的设置就可以将Kafka中的实时流数据转换为Spark中的Dataframe:

lines = (spark.readStream# 指定数据源: kafka.format("kafka")# 指定kafka bootstrap server的URL.option("kafka.bootstrap.servers", confluent_server)# 指定订阅的topic.option("subscribe", topic)# 指定想要读取的数据的offset,earliest表示从每个partition的起始点开始读取.option("startingOffsets", "earliest")# 指定认证协议.option("kafka.security.protocol", "SASL_SSL").option("kafka.sasl.mechanism", "PLAIN")# 指定confluent的用户名和密码.option("kafka.sasl.jaas.config",f"""org.apache.kafka.common.security.plain.PlainLoginModule required username="{control_center_username}" password="{control_center_password}";""").load())

从kafka中读取的数据格式如下:

root|-- key: binary (nullable = true)|-- value: binary (nullable = true)|-- topic: string (nullable = true)|-- partition: integer (nullable = true)|-- offset: long (nullable = true)|-- timestamp: timestamp (nullable = true)|-- timestampType: integer (nullable = true)

由于key和value都是binary格式的,我们需要将value(json)由binary转换为string格式,并定义schema,提取出Json中的数据,并转换为对应的格式:

schema = (StructType().add('key', TimestampType()).add('fare_amount', FloatType()).add('pickup_datetime', TimestampType()).add('pickup_longitude', FloatType()).add('pickup_latitude', FloatType()).add('dropoff_longitude', FloatType()).add('dropoff_latitude', FloatType()).add('passenger_count', IntegerType()))# 将json中的列提取出来
lines = (lines.withColumn('data', from_json(col('value').cast('string'), # binary 转 stringschema))                     # 解析为schema.select(col('data.*')))                           # select value中的所有列

过滤掉错误,为空,NaN的数据:

lines = (lines.filter(col('pickup_longitude') != 0).filter(col('pickup_latitude') != 0).filter(col('dropoff_longitude') != 0).filter(col('dropoff_latitude') != 0).filter(col('fare_amount') != 0).filter(col('passenger_count') != 0))

最后,我们将解析出来的数据输出到LakeHouse中,以进行后续的分析和机器学习模型训练:

# lakehouse 的存储格式为 delta
query = (lines.writeStream.format('delta').option('checkpointLocation', checkpoint_location).option('path', taxi_data_delta_lake).start())
# 执行job,直到出现异常(如果只想执行该Job一段时间,可以指定timeout参数)
query.awaitTermination()

数据分析

我们先将LakeHouse中的数据使用Spark加载进来:

然后,我们对该Dataframe创建一个Table View,并探索fare_amount的分布:

可以看到fare_amount的最小值是负数,这显然是一条错误的数据,我们将这些错误的数据过滤,并探索fare_amount的分布:

然后我们探索价格和年份,月份,星期,打车时间的关系:

从上面可以看出两点:

  • 出租车的价格和年份有很大关系,从09年到15年呈不断增长的态势
  • 在中午和凌晨打车比上午和下午打车更贵一些。

我们再进一步探索价格和乘客数量的关系:

此外,出租车价格的另一个影响因素就是距离,这里我们借助python的geopy包和Spark的UDF来计算给定两个位置的距离,然后再分析费用和距离的关系。

经纬度的范围为[-90, 90],因此,我们第一步是清除错误的数据:

然后,我们增加一列数据:出租车行驶的距离,并将距离离散化,进行后续的分析:

统计打车距离的分布:

从上图可以看出:打车距离分布在区间[0, 15]miles内,我们继续统计在该区间内,打车价格和打车距离的关系:

如上图所示:打车价格和打车距离呈现出线性增长的趋势。

机器学习建模

在上一小节的数据分析中,我们已经提取了和出租车相关联的一些特征,根据这些特征,我们建立一个简单的线性回归模型:

打车费用 ~ (年份,打车时间,乘客数,距离)

先将特征和目标值提取出来:

对特征做归一化:

分割训练集和测试集:

建立线性回归模型进行训练:

训练结果统计:

使用Evaluator对模型进行评价:

总结

我们在本文中介绍了如何使用阿里云的Confluent Cloud和Databricks来构建您的数据流和LakeHouse,并介绍了如何使用Databricks提供的能力来挖掘数据价值,使用Spark MLlib构建您的机器学习模型。有了Confluent Cloud和Databricks,您可以轻松实现数据入湖,及时在最新的数据上进行探索,挖掘您的数据价值。欢迎您试用阿里云Confluent和Databricks。

原文链接

本文为阿里云原创内容,未经允许不得转载。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/510880.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决微服务架构下流量有损问题的实践和探索

简介:绝⼤多数的软件应⽤⽣产安全事故发⽣在应⽤上下线发布阶段,尽管通过遵守业界约定俗成的可灰度、可观测和可滚回的安全⽣产三板斧,可以最⼤限度的规避发布过程中由于应⽤⾃身代码问题对⽤户造成的影响。但对于⾼并发⼤流量情况下的短时间…

5月25日,阿里云开源 PolarDB-X 将迎来升级发布

简介:2022年5月25日,阿里云开源 PolarDB-X 将升级发布新版本!PolarDB-X 从 2009 年开始服务于阿里巴巴电商核心系统, 2015 年开始对外提供商业化服务,并于 2021 年正式开源。本次发布会将重磅推出在稳定性、生态融合以…

技术分享丨云企业网CEN2.技术揭晓

简介:随着企业数字化转型的加速,越来越多的企业选择了将业务部署在云上,这其中有超过20%的企业有全球组网的需求,这就使得云上网络的规模越来越大,复杂度也越来越高,为了应对这些变化,阿里云推出…

MAE 自监督算法介绍和基于 EasyCV 的复现

简介:自监督学习(Self-Supervised Learning)能利用大量无标注的数据进行表征学习,然后在特定下游任务上对参数进行微调。通过这样的方式,能够在较少有标注数据上取得优于有监督学习方法的精度。近年来,自监…

企业实践|分布式系统可观测性之应用业务指标监控

简介:本文主要讲述如何建立应用业务指标Metrics监控和如何实现精准告警。Metrics 可以翻译为度量或者指标,指的是对于一些关键信息以可聚合的、数值的形式做定期统计,并绘制出各种趋势图表。透过它,我们可以观察系统的状态与趋势。…

1024 程序员节城市嘉年华,共话技术生涯的一万种可能!

更硬核的技术峰会,更多元的主题论坛,更丰富的科技元素……更热血的 1024 程序员节闪亮登场!由湖南湘江新区管委会主办,长沙工业与信息化局、长沙信息产业园管委会与 CSDN 联合承办的第三届 2022 1024 程序员节将于 10 月 22 - 24 …

作业帮在线业务 Kubernetes Serverless 虚拟节点大规模应用实践

简介:目前方案已经成熟,高峰期已有近万核规模的核心链路在线业务运行在基于阿里云 ACKECI 的 Kubernetes Serverless 虚拟节点。随着业务的放量,未来运行在 Serverless 虚拟节点上的服务规模会进一步扩大,将节省大量的资源成本。 …

浅析微服务全链路灰度解决方案

简介:帮助应用发布版本过程中更精细化,提高了发布过程中的稳定性。服务转移⾄请求链路上进行流量控制,有效保证了多个亲密关系的服务顺利安全发布以及服务多版本并⾏开发,进⼀步促进业务的快速发展。 作者: 十眠&…

译:零信任对 Kubernetes 意味着什么

这篇是 Buoyant 的创始人 William Morgan 文章《# What Does Zero Trust Mean for Kubernetes?》[1]的翻译,文章很好的解释了什么是零信任、为什么要实施零信任,以及服务网格如何以最小的代码实现零信任。零信任是营销炒作,还是新的机会&…

Serverless 应用中心:Serverless 应用全生命周期管理平台

简介:Serverless 应用中心,是阿里云 Serverless 应用全生命周期管理平台。通过 Serverless 应用中心,用户在部署应用之前无需进行额外的克隆、构建、打包和发布操作,即可快速部署和管理应用。Serverless 应用中心帮助用户快速联动…

云钉一体:EventBridge 联合钉钉连接器打通云钉生态

简介:今天,EventBridge 联合钉钉连接器,打通了钉钉生态和阿里云生态,钉钉的生态伙伴可以通过通道的能力驱动阿里云上海量的计算力。 作者:尘央 背景 “以事件集成阿里云,从 EventBridge 开始”是 EventB…

开源当道,群英荟萃!1024 程序员节北京峰会火热来袭

1024 程序员节,致敬每一位二进制世界的主角。由开放原子开源基金会主办,北京经开区国家信创园、CSDN 承办的 2022 1024 程序员节北京峰会将于 10 月 24 日精彩来袭。以“软件新时代 开源创未来”为主题,聚焦开源新潮流,诚邀广大程…

超全,一图了解 2022 长沙 · 中国 1024 程序员节!

超全版来啦!2022 长沙 中国 1024 程序员节重磅大咖再聚,共话中国技术新生态你想了解的全在这里收藏!收藏!收藏!

1024 程序员节技术英雄会鸣锣开场,问道中国技术新生态

战鼓鸣,英雄至。10 月 24 日,2022 长沙中国 1024 程序员节重磅环节“技术英雄会”鸣锣开场!中国工程院院士、开源掌门人领衔,各领域专家、精英云集,围绕本届大会主题“算力新时代,开源创未来”,…

无尽创想!CSDN 1024 大赛重磅发布

在构建科技世界的过程中,1024 这个数字被赋予了特殊的意义,它代表着广大的程序员群体,更蕴藏着无穷的想象力与价值。在 1024 程序员节发展为程序员的盛会之后,1024 大赛应运而生,并作为 1024 程序员节全新的板块重磅发…

小镇青年程序员的逆袭人生:从差点回老家到荔枝技术骨干

编者按: 1024 是 2 的十次方,是二进制计数的基本计量单位之一。在计算机的发展史中,在和 0/1 所代表的二进制世界里,有人用代码编织出了形形色色的数字、程序、互联网,创造出一个个神话。 ——他们就是一群可爱、低调…

1024统信举办首届技术开放日,硬核技术引领操作系统“大迁移”

10月24日程序员节之际,统信软件首届技术开放日在国家信创园区圆满落下帷幕。统信软件首届技术开放日囊括UP主直播互动、打卡探园、“大迁移”主题论坛、全系产品体验等精彩环节。来自统信软件研发部门负责人、行业专家、技术大咖以及专业媒体代表百余人莅临活动现场…

FFA 议程上线!实时化浪潮下,Apache Flink 还将在大数据领域掀起怎样的变革?...

Flink Forward Asia 2022 将于 11 月 26-27 日在线上举办,议程内容正式上线!今年是 Flink Forward Asia(下文简称 FFA)落地中国的第五个年头,也是 Flink 成为 Apache 软件基金会顶级项目的第八年。过去这几年&#xff…

全面提升易用性:OpenClusterManagement 0.7 版本发布

简介:千呼万唤始出来,三月末 OpenClusterManagement 社区正式发布了 v0.7 版本。在新的版本有一系列新的功能特性欢迎感兴趣的读者体验探索,同时在这个版本中社区维护者对目前已有的功能也修复了一些问题并对面向最终用户的体验进行了打磨和提…

“晕乎乎的概念”:阿里云函数计算的“应用”又是个啥

简介:为什么阿里云函数计算发布了这么多功能,只有少数的功能会伴随着体验活动一起来做运营?那么这个“应用”到底是何方神圣?他和现在“服务”,“函数”有啥关系? 作者:刘宇 曾经,…