【实践案例】Databricks 数据洞察在美的暖通与楼宇的应用实践

简介: 获取更详细的 Databricks 数据洞察相关信息,可至产品详情页查看:https://www.aliyun.com/product/bigdata/spark

作者

美的暖通与楼宇事业部 先行研究中心智能技术部

 

美的暖通 IoT 数据平台建设背景

美的暖通与楼宇事业部(以下简称美的暖通)是美的集团旗下五大板块之一,产品覆盖多联机组、大型冷水机组、单元机、机房空调、扶梯、直梯、货梯以及楼宇自控软件和建筑弱电集成解决方案,远销海内外200多个国家。当前事业部设备数据上云仅停留在数据存储层面,缺乏挖掘数据价值的平台,造成大量数据荒废,并且不断消耗存储资源,增加存储费用和维护成本。另一方面,现有数据驱动应用缺乏部署平台,难以产生实际价值。因此,急需统一通用的 IoT 数据平台,以支持设备运行数据的快速分析和建模。

 

我们的 IoT 数据平台建设基于阿里云 Databricks 数据洞察全托管 Spark 产品,以下是整体业务架构图。在本文后面的章节,我们将就IoT数据平台建设技术选型上的一些思考,以及 Spark 技术栈尤其是 Delta Lake 场景的应用实践做一下分享。

 

image.png

 

选择 Spark & Delta Lake

在数据平台计算引擎层技术选型上,由于我们数据团队刚刚成立,前期的架构选型我们做了很多的调研,综合各个方面考虑,希望选择一个成熟且统一的平台:既能够支持数据处理、数据分析场景,也能够很好地支撑数据科学场景。加上团队成员对 Python 及 Spark 的经验丰富,所以,从一开始就将目标锁定到了 Spark 技术栈。

 

选择 Databricks 数据洞察 Delta Lake

通过与阿里云计算平台团队进行多方面的技术交流以及实际的概念验证,我们最终选择了阿里云 Databricks 数据洞察产品。作为 Spark 引擎的母公司,其商业版 Spark 引擎,全托管 Spark 技术栈,统一的数据工程和数据科学等,都是我们决定选择 Databricks 数据洞察的重要原因。

 

具体来看,Databricks 数据洞察提供的核心优势如下:

  • Saas 全托管 Spark:免运维,无需关注底层资源情况,降低运维成本,聚焦分析业务
  • 完整 Spark 技术栈集成:一站式集成 Spark 引擎和 Delta Lake 数据湖,100%兼容开源 Spark 社区版;Databricks 做商业支持,最快体验 Spark 最新版本特性
  • 总成本降低:商业版本 Spark 及 Delta Lake 性能优势显著;同时基于计算存储分离架构,存储依托阿里云 OSS 对象存储,借助阿里云 JindoFS 缓存层加速;能够有效降低集群总体使用成本
  • 高品质支持以及SLA保障:阿里云和 Databricks 提供覆盖 Spark 全栈的技术支持;提供商业化 SLA 保障与7*24小时 Databricks 专家支持服务

 

IoT 数据平台整体架构

image.png

整体的架构如上图所示。

 

我们接入的 IoT 数据分为两部分,历史存量数据和实时数据。目前,历史存量数据是通过 Spark SQL 以天为单位从不同客户关系数据库批量导入 Delta Lake 表中;实时数据通过 IoT 平台采集到云 Kafka ,经由 Spark Structured Streaming 消费后实时写入到 Delta Lake 表中。在这个过程中,我们将实时数据和历史数据都 sink 到同一张 Delta 表里,这种批流一体操作可大大简化我们的 ETL 流程(参考后面的案例部分)。数据管道下游,我们对接数据分析及数据科学工作流。

 

IoT 数据采集:从 Little Data 到 Big Data

作为 IoT 场景的典型应用,美的暖通最核心的数据均来自 IoT 终端设备。在整个 IoT 环境下,分布着无数个终端传感器。从小的维度看,传感器产生的数据本身属于 Small Data(或者称为 Little Data)。当把所有传感器连接成一个大的 IoT 网络,产生自不同传感器的数据经由 Gateway 与云端相连接,并最终在云端形成 Big Data 。

 

在我们的场景下,IoT 平台本身会对不同协议的数据进行初步解析,通过定制的硬件网络设备将解析后的半结构化 JSON 数据经由网络发送到云 Kafka。云 Kafka 扮演了整个数据管道的入口。

 

数据入湖:Delta Lake

IoT 场景下的数据有如下几个特点:

  • 时序数据:传感器产生的数据记录中包含时间相关的信息,数据本身具有时间属性,因此不同的数据之间可能存在一定的相关性。利用 as-of-join 将不同时间序列数据 join 到一起是下游数据预测分析的基础
  • 数据的实时性:传感器实时生成数据并以最低延迟的方式传输到数据管道,触发规则引擎,生成告警和事件,通知相关工作人员。
  • 数据体量巨大:IoT 网络环境下遍布各地的成千上万台设备及其传感器再通过接入服务将海量的数据归集到平台
  • 数据协议多样:通常在 IoT 平台接入的不同种类设备中,上传数据协议种类多样,数据编码格式不统一

 

IoT 数据上述特点给数据处理、数据分析及数据科学等带来了诸多挑战,庆幸的是,这些挑战借助 Spark 和 Delta Lake 都可以很好地应对。Delta Lake 提供了 ACID 事务保证,支持增量更新数据表以及流批同时写数据。借助 Spark Structed Streaming 可以实现 IoT 时序数据实时入湖。

 

以下是 Delta Lake 经典的三级数据表架构。具体到美的暖通 IoT 数据场景,我们针对每一层级的数据表分别做了如下定义:

 

image.png

 

  • Bronze 表:存储原生数据(Raw Data),数据经由 Spark Structed Streaming 从 Kafka 消费下来后 upsert 进 Delta Lake 表,该表作为唯一的真实数据表  (Single Source of Truth)
  • Silver表:该表是在对 Bronze 表的数据进行加工处理的基础上生成的中间表,在美的暖通的场景下,数据加工处理的步骤涉及到一些复杂的时序数据计算逻辑,这些逻辑都包装在了 Pandas UDF 里提供给 Spark 计算使用
  • Gold 表:Silver 表的数据施加 Schema 约束并做进一步清洗后的数据汇入 Gold 表,该表提供给下游的 Ad Hoc 查询分析及数据科学使用

 

数据分析:Ad-Hoc 查询

我们内部在开源 Superset 基础上定制了内部版本的 SQL 查询与数据可视化平台,通过 PyHive 连接到 Databricks 数据洞察 Spark Thrift Server 服务,可以将 SQL 提交到集群上。商业版本的 thrift server 在可用性及性能方面都做了增强,Databricks 数据洞察针对 JDBC 连接安全认证提供了基于 LDAP 的用户认证实现。借助 Superset ,数据分析师及数据科学家可以快速高效的对 Delta Lake 表进行数据探索。

 

数据科学:Workspace

楼宇能耗预测与设备故障诊断预测是美的暖通 IoT 大数据平台建设的两个主要业务目标。在 IoT 数据管道下游,需要对接机器学习平台。现阶段为了更快速方便地支撑起数据科学场景,我们将 Databricks 数据洞察集群与阿里云数据开发平台 DDC 打通。DDC 集成了在数据科学场景下更友好的 Jupyter Notebook ,通过在 Jupyter 上使用 PySpark ,可以将作业跑到 Databricks 数据洞察集群上;同时,也可以借助 Apache Airflow 对作业进行调度。同时,考虑到机器学习模型构建、迭代训练、指标检测、部署等基本环节,我们也在探索 MLOps ,目前这部分工作还在筹备中。

 

典型应用场景介绍

Delta Lake 数据入湖(批流一体)

 

使用 UDF 函数定义流数据写入 Delta Lake 的 Merge 规则

%spark
import org.apache.spark.sql._
import io.delta.tables._// Function to upsert `microBatchOutputDF` into Delta table using MERGE
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {// Set the dataframe to view namemicroBatchOutputDF.createOrReplaceTempView("updates")// Use the view name to apply MERGE// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframemicroBatchOutputDF.sparkSession.sql(s"""MERGE INTO delta_{table_name} tUSING updates sON s.uuid = t.uuidWHEN MATCHED THEN UPDATE SET t.device_id = s.device_id,t.indoor_temperature =
s.indoor_temperature,t.ouoor_temperature = s.ouoor_temperature,t.chiller_temperature =
s.chiller_temperature,t.electricity = s.electricity,t.protocal_version = s.protocal_version,t.dt=s.dt,t.update_time=current_timestamp()WHEN NOT MATCHED THEN INSERT (t.uuid,t.device_id,t.indoor_temperature,t.ouoor_temperature ,t.chiller_temperature
,t.electricity,t.protocal_version,t.dt,t.create_time,t.update_time)values
(s.uuid,s.device_id,s.indoor_temperature,s.ouoor_temperature,s.chiller_temperature,s.electricity,s.protocal_version
,s.dt,current_timestamp(),current_timestamp())""")
}

 

使用 Spark Structured Streaming 实时流写入 Delta Lake

 

%sparkimport org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Triggerdef getquery(checkpoint_dir:String,tableName:String,servers:String,topic:String ) {var streamingInputDF =  spark.readStream.format("kafka").option("kafka.bootstrap.servers", servers).option("subscribe", topic)     .option("startingOffsets", "latest")  .option("minPartitions", "10")  .option("failOnDataLoss", "true").load()
val resDF=streamingInputDF.select(col("value").cast("string")).withColumn("newMessage",split(col("value"), " ")).filter(col("newMessage").getItem(7).isNotNull).select(col("newMessage").getItem(0).as("uuid"),col("newMessage").getItem(1).as("device_id"),col("newMessage").getItem(2).as("indoor_temperature"),col("newMessage").getItem(3).as("ouoor_temperature"),col("newMessage").getItem(4).as("chiller_temperature"),col("newMessage").getItem(5).as("electricity"),col("newMessage").getItem(6).as("protocal_version")).withColumn("dt",date_format(current_date(),"yyyyMMdd"))  
val query = resDF.writeStream.format("delta").option("checkpointLocation", checkpoint_dir).trigger(Trigger.ProcessingTime("60 seconds")) // 执行流处理时间间隔.foreachBatch(upsertToDelta _) //引用upsertToDelta函数.outputMode("update")query.start()
}

 

数据灾备:Deep Clone

由于 Delta Lake 的数据仅接入实时数据,对于存量历史数据我们是通过 SparkSQL 一次性 Sink Delta Lake 的表中,这样我们流和批处理时只维护一张 Delta 表,所以我们只在最初对这两部分数据做一次 Merge 操作。同时为了保证数据的高安全,我们使用 Databricks Deep Clone 来做数据灾备,每天会定时更新来维护一张从表以备用。对于每日新增的数据,使用 Deep Clone 同样只会对新数据 Insert 对需要更新的数据 Update 操作,这样可以大大提高执行效率。

 

CREATE OR REPLACE TABLE delta.delta_{table_name}_cloneDEEP CLONE delta.delta_{table_name};

 

性能优化:OPTIMIZE & Z-Ordering

在流处理场景下会产生大量的小文件,大量小文件的存在会严重影响数据系统的读性能。Delta Lake 提供了 OPTIMIZE 命令,可以将小文件进行合并压缩,另外,针对 Ad-Hoc 查询场景,由于涉及对单表多个维度数据的查询,我们借助 Delta Lake 提供的 Z-Ordering 机制,可以有效提升查询的性能。从而极大提升读取表的性能。DeltaLake 本身提供了 Auto Optimize 选项,但是会牺牲少量写性能,增加数据写入 delta 表的延迟。相反,执行 OPTIMIZE 命令并不会影响写的性能,因为 Delta Lake 本身支持 MVCC,支持 OPTIMIZE 的同时并发执行写操作。因此,我们采用定期触发执行 OPTIMIZE 的方案,每小时通过 OPTIMIZE 做一次合并小文件操作,同时执行 VACCUM 来清理过期数据文件:

OPTIMIZE delta.delta_{table_name} ZORDER by device_id, indoor_temperature;
set spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM delta.delta_{table_name} RETAIN 1 HOURS;

 

另外,针对 Ad-Hoc 查询场景,由于涉及对单表多个维度数据的查询,我们借助 Delta Lake 提供的 Z-Ordering 机制,可以有效提升查询的性能。

 

总结与展望

我们基于阿里云 Databricks 数据洞察产品提供的商业版 Spark 及 Delta Lake 技术栈快速构建了 IoT 数据处理平台,Databricks 数据洞察全托管免运维、商业版本引擎性能上的优势以及计算/存储分离的架构,为我们节省了总体成本。同时,Databricks 数据洞察产品自身提供的丰富特性,也极大提升了我们数据团队的生产力,为数据分析业务的快速开展交付奠定了基础。未来,美的暖通希望与阿里云 Databricks 数据洞察团队针对 IoT 场景输出更多行业先进解决方案。

原文链接
本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/513331.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql 默认时间字段 1067,mysql替datetime类型字段设置默认值default

操作服务器环境为Linux centos5.7安装的mysql版本 MariaDB,图简单直接使用命令install上去的。在自己mac上测试数据库修改Alter语句没有问题的,ALTER TABLE XXX.XXX ADD COLUMN createtime datetime NOT NULL DEFAULT NOW() AFTER userinfo;但是在服务器…

tcp ip协议_网络通信-TCP/IP协议族简述

导读:计算机与网络设备要相互通信需要遵守同样的规则。例如,如何找到通信目标、该使用哪种语言通信、怎么结束通信等规则。不同的硬件、操作系统之间的通信都需要遵循同一种规则,这种规则也称为是协议。下面本文主要从以下三个点讨论与互联网…

网站图片全自动加密_11 个值得收藏的在线工具和资源网站

pdf.toPDF 格式免费转换工具,可将 PDF 与其他文件类型相互转换,其他文件类型可自行查看。网站还提供常用的 PDF 工具,包括压缩、修复、拆分和解锁 PDF。ilovepdf完全免费。与 pdf.to 的区别是它支持转换的文件类型少,但 PDF 工具丰…

超详攻略!Databricks 数据洞察 - 企业级全托管 Spark 大数据分析平台及案例分析

简介: 5分钟读懂 Databricks 数据洞察 ~ 更多详细信息可登录 Databricks 数据洞察 产品链接:https://www.aliyun.com/product/bigdata/spark(当前产品提供¥599首购试用活动,欢迎试用!) 开源大数…

Alluxio完成C轮5000万美元融资,新设中国区总部力拓国内市场

编辑 | 宋慧 出品 | CSDN云计算 头图 | 付费下载于 IC photo 11月17日,全球开源的云原生数据编排软件开发商Alluxio宣布完成5000万美元C轮融资,该轮融资由新投资方高瓴创投领投,战略投资方和原股东a16z, Seven Seas Partners,火…

关于架构师:角色、能力和挑战

简介: 成为一名架构师可能是很多开发者的技术追求之一。那么如何理解架构?架构师是一个什么样的角色,需要具备什么样的能力?在架构师的道路上,会面临哪些挑战?本文作者道延分享他对架构以及架构师的思考和相…

php 网站移动端自适应,HTML5 移动端自适应布局

场景:为适应各种大小的屏幕自适应布局我知道的两种方式1.使用媒体查询,下面制定了几种适应方式,例如第一个表示屏幕宽度在320px-360px之间的,html字体大小适配为13.65pxmedia only screen and (max-width: 360px) and (min-width:…

python avg_python闭包

本文分为如下几个部分什么是闭包闭包与装饰器闭包等价——偏函数闭包等价——类闭包等价——其他闭包用于捕获状态值闭包等价——协程三种方法实现动态均值什么是闭包闭包是携带着一些自由变量的函数。我们直接来看一个例子def fun_out(a):def fun_in(b):return a breturn fun…

云上技术 | 混合云管理平台多Region架构

简介: 随着现代化进程加速,企业业务规模和迭代速度也今非昔比,在已具备一定规模的中大型电力系统中,会面临着数字化升级的压力,包括复杂组织架构管理、计算资源弹性扩展、IT运维提效等需求。基于电力行业属性部署一朵专…

超值爆赞丨Java 程序员推荐的学习教程,刷爆了朋友圈...

文内福利,扫码免费领取Hello,各位锋迷们,我是小千。很多学习Java的小伙伴都在找的全套免费java视频教程,这里全都有,资料齐全,拿来吧你!零基础学Java的学习路线图是怎样的?&#xff…

cas server php下载,关于用CAS Server与Php、Jetty配置实现SSO#4

最早由Yale开发的CAS在目前开源SSO市场上占据了80%的份额。简单研究了一下,感觉CAS确实比较成熟,认证流程均可通过证书保证安全,也提供了对多种App服务器和开发环境的支持。比较感兴趣的是两个:Php和Jetty,前者较为常用…

阿里大数据云原生化实践,EMR Spark on ACK 产品介绍

开源大数据社区 & 阿里云 EMR 系列直播 第六期 主题:EMR spark on ACK 产品演示及最佳实践 讲师:石磊,阿里云 EMR 团队技术专家 内容框架: 云原生化挑战及阿里实践Spark 容器化方案产品介绍和演示直播回放:扫描文…

剪映电脑版_七款手机剪辑app,效果堪比电脑软件

鉴于很多小伙伴因为没有电脑或者电脑配置低带不动pr或者AE的这类软件,但是又对剪辑有一定兴趣的人,可以先从手机上的剪辑做起,可以为以后的电脑办公打下很好的基础,亦或者有些人在电脑不方便用的情况下,想先用手机大致…

xposed 修改参数_【Android 原创】2020春节红包第三题Xposed框架Hook的应用

作者论坛账号:CrazyNut准备工具以及思路首先不了解Xposed框架Hook的可以看看大佬的基础教程 - 《教我兄弟学Android逆向12 编写xposed模块》本文不需要会看懂汇编代码,当你看完上面的文章,学会Xposed框架Hook的简单应用后。就算是从未接触过的…

Let‘s Fluent:更顺滑的MyBatis

简介: 只需瞅一眼Google Trends上全球Java界最热门的两款SQL映射框架近一年的对比数字,就不难了解其实力分布:在此领域,MyBatis早已占领东亚地区开发者市场,并以绝对优势稳居中国最抢手Java数据库访问框架之首。 作者 …

元宇宙会成为 IPv6 的拐点吗?

‍‍作者 | 马超,王丽丽,王一凡 责编 | 张红月出品 | CSDN(ID:CSDNnews)“如无必要,勿增实体”的奥卡姆剃刀原则,从IT人士的角度来看就是“只要能运行,就千万不要改”&#xf…

php网站加广告位,HotNews Pro主题文章内容上面添加广告位

使用的HotNew Pro主题后,文章内容上面没有广告位,但是有时需要在那个位置添加广告,就使用了一款叫Smart Ads广告管理插件,这个插件可以在文章内容上面和下面添加广告,直到昨天我删掉了Smart Ads这款插件,因…

电脑编程教学_东莞沙田mastercam编程学习怎么收费

东莞沙田mastercam编程学习怎么收费深圳卓越培训中心UG综合班主要课程:1,软件介绍,界面熟悉 ,快捷键,图层使用。2,草图使用,三维曲线绘制修改,草图线3D线互相转换。3,建模…

arduinowifi.send怎么获取响应_Vue3.0 响应式原理 (一)

前几天,回顾整理下关于vue2.0的响应式原理。温故而知新么,那么今天,整理了一下关于vue3.0的响应式原理,利用 JavaScript 来写的。本着尽可能的清晰易懂的原则,所以,可能会分几篇文章来发布。那现在开始上菜…

OceanBase首次阐述战略:继续坚持自研开放之路 开源300万行核心代码

简介: 在数据库OceanBase3.0峰会上,蚂蚁集团自主研发的分布式数据库OceanBase首次从技术、商业和生态三个维度对未来发展战略进行了系统性阐述。同时,OceanBase宣布正式开源,并成立OceanBase开源社区,社区官网同步上线…