使用 Elastic Observability 监控 dbt 管道

作者:来自 Elastic Almudena Sanz Olivé•Tamara Dancheva

了解如何使用 Elastic 设置 dbt 监控系统,该系统可主动发出数据处理成本峰值、每张表的行数异常以及数据质量测试失败的警报。

在 Elastic 可观察性组织内的数据分析团队中,我们使用 dbt(dbt™,data build tool - 数据构建工具)来执行我们的 SQL 数据转换管道。dbt 是一个 SQL 优先的转换工作流,可让团队快速协作地部署分析代码。具体来说,我们使用开源项目 dbt core,你可以在其中从命令行进行开发并运行你的 dbt 项目。

我们的数据转换管道每天运行并处理数据,这些数据将提供给我们的内部仪表板、报告、分析和机器学习 (ML) 模型。

过去曾发生过管道故障、源表包含错误数据或我们对 SQL 代码进行了更改导致数据质量问题的事件,而我们只有在每周报告中看到显示异常数量的记录时才意识到这一点。这就是为什么我们建立了一个监控系统,该系统会在这些类型的事件发生时主动向我们发出警报,并帮助我们进行可视化和分析以了解其根本原因,从而为我们节省了几个小时或几天的手动调查时间。

我们利用自己的可观察性解决方案来帮助解决这一挑战,监控 dbt 实施的整个生命周期。此设置使我们能够跟踪模型的行为并对最终表进行数据质量测试。我们将运行作业和测试中的 dbt 流程日志导出到 Elasticsearch,并利用 Kibana 创建仪表板、设置警报和配置机器学习作业来监控和评估问题。

下图显示了我们的完整架构。在后续文章中,我们还将介绍如何使用 OTEL 和 Elastic 观察我们的 python 数据处理和 ML 模型流程 - 敬请期待。

为什么要使用 Elastic 监控 dbt 管道?

每次调用时,dbt 都会生成并保存一个或多个 JSON 文件(称为 artifacts),其中包含有关调用结果的日志数据。根据 dbt 文档,dbt run 和 dbt test 调用日志存储在文件 run_results.json 中:

This file contains information about a completed invocation of dbt, including timing and status info for each node (model, test, etc) that was executed. In aggregate, many run_results.json can be combined to calculate average model runtime, test failure rates, the number of record changes captured by snapshots, etc.

监控 dbt 运行调用日志有助于解决多个问题,包括跟踪和警告表容量、检测资源密集型模型的过多时隙时间、识别由于时隙时间或容量导致的成本峰值以及查明可能表明存在调度问题的缓慢执行时间。当我们将 PR 与存在问题的代码更改合并时,此系统至关重要,导致上游表 A 中的每日行数突然下降。通过将 dbt 运行日志导入 Elastic,我们的异常检测作业快速识别出表 A 及其下游表 B、C 和 D 的每日行数异常。数据分析团队收到了有关该问题的警报通知,使我们能够及时排除故障、修复和回填表,以免影响每周仪表板和下游 ML 模型。

监控 dbt 测试调用日志还可以解决多个问题,例如识别表中的重复项、通过验证所有枚举字段来检测特定字段允许值的未注意到的更改,以及解决各种其他数据处理和质量问题。借助仪表板和数据质量测试警报,我们可以主动识别重复键、意外类别值和空值增加等问题,从而确保数据完整性。在我们的团队中,我们遇到了一个问题,即我们的原始查找表之一的更改导致用户表中出现重复行,从而使报告的用户数量翻倍。通过将 dbt 测试日志导入 Elastic,我们的规则检测到一些重复测试失败。团队收到了有关该问题的警报通知,使我们能够通过找到根本原因的上游表立即对其进行故障排除。这些重复意味着下游表必须处理 2 倍的数据量,从而导致处理的字节数和时隙时间激增。dbt 运行日志上的异常检测和警报还帮助我们发现各个表的这些峰值,并使我们能够量化对账单的影响。

使用 Elastic 和 Kibana 处理 dbt 日志使我们能够获得实时洞察,帮助我们快速排除潜在问题,并确保我们的数据转换过程顺利运行。我们在 Kibana 中设置了异常检测作业和警报,以监控 dbt 处理的行数、时隙时间和测试结果。这让我们能够捕捉实时事件,通过及时识别和修复这些问题,Elastic 使我们的数据管道更具弹性,我们的模型更具成本效益,帮助我们掌握成本峰值或数据质量问题。

我们还可以将这些信息与 Elastic 中采集的其他事件关联起来,例如使用 Elastic Github 连接器,我们可以将数据质量测试失败或其他异常与代码更改关联起来,以找到导致问题的提交或 PR 的根本原因。通过将应用程序日志采集到 Elastic,我们还可以使用 APM 分析管道中的这些问题是否影响了下游应用程序,从而增加了延迟、吞吐量或错误率。采集账单、收入数据或网络流量,我们还可以看到业务指标的影响。

如何将 dbt 调用日志导出到 Elasticsearch

我们在生产中每天运行 dbt run 和 dbt test 流程后,使用 Python Elasticsearch 客户端将 dbt 调用日志发送到 Elastic。设置只需要你安装 Elasticsearch Python 客户端并获取你的 Elastic Cloud ID(转到 https://cloud.elastic.co/deployments/,选择你的部署并找到 Cloud ID)和 Elastic Cloud API 密钥(按照本指南操作)

此 python 辅助函数将把 run_results.json 文件中的结果索引到指定的索引。你只需将变量导出到环境:

  • RESULTS_FILE:run_results.json 文件的路径
  • DBT_RUN_LOGS_INDEX:你想要在 Elastic 中为 dbt 运行日志索引指定的名称,例如 dbt_run_logs
  • DBT_TEST_LOGS_INDEX:你想要在 Elastic 中为 dbt 测试日志索引指定的名称,例如dbt_test_logs
  • ES_CLUSTER_CLOUD_ID
  • ES_CLUSTER_API_KEY

然后从你的 Python 代码中调用函数 log_dbt_es,或者将此代码保存为 Python 脚本,并在执行你的 dbt run 或 dbt test 命令后运行它:

from elasticsearch import Elasticsearch, helpers
import os
import sys
import jsondef log_dbt_es():RESULTS_FILE = os.environ["RESULTS_FILE"]DBT_RUN_LOGS_INDEX = os.environ["DBT_RUN_LOGS_INDEX"]DBT_TEST_LOGS_INDEX = os.environ["DBT_TEST_LOGS_INDEX"]es_cluster_cloud_id = os.environ["ES_CLUSTER_CLOUD_ID"]es_cluster_api_key = os.environ["ES_CLUSTER_API_KEY"]es_client = Elasticsearch(cloud_id=es_cluster_cloud_id,api_key=es_cluster_api_key,request_timeout=120,)if not os.path.exists(RESULTS_FILE):print(f"ERROR: {RESULTS_FILE} No dbt run results found.")sys.exit(1)with open(RESULTS_FILE, "r") as json_file:results = json.load(json_file)timestamp = results["metadata"]["generated_at"]metadata = results["metadata"]elapsed_time = results["elapsed_time"]args = results["args"]docs = []for result in results["results"]:if result["unique_id"].split(".")[0] == "test":result["_index"] = DBT_TEST_LOGS_INDEXelse:result["_index"] = DBT_RUN_LOGS_INDEXresult["@timestamp"] = timestampresult["metadata"] = metadataresult["elapsed_time"] = elapsed_timeresult["args"] = argsdocs.append(result)_ = helpers.bulk(es_client, docs)return "Done"# Call the function
log_dbt_es()

如果你想从 run_results.json 中添加/删除任何其他字段,你可以修改上述函数来执行此操作。

将结果编入索引后,你可以使用 Kibana 为两个索引创建数据视图,并开始在 Discover 中探索它们。

转到 Discover,单击左上角的数据视图选择器,然后单击 “Create a data view”。

现在,你可以用你喜欢的名称创建数据视图。对 dbt run(代码中的 DBT_RUN_LOGS_INDEX)和 dbt test(代码中的 DBT_TEST_LOGS_INDEX)索引执行此操作:

返回 Discover,你将能够选择 Data Views 并探索数据。

dbt run 警报、仪表板和 ML 作业

dbt run 的调用针对当前数据库执行已编译的 SQL 模型文件。dbt run 调用日志包含以下字段:

  • unique_id:唯一模型标识符
  • execution_time:执行此模型运行所花费的总时间

日志还包含有关适配器作业执行的以下指标:

  • adapter_response.bytes_processed
  • adapter_response.bytes_billed
  • adapter_response.slot_ms
  • adapter_response.rows_affected

我们已使用 Kibana 根据上述指标设置异常检测作业。你可以配置按 unique_id 拆分的 multi-metric job,以便在每个表受影响的行数、消耗的时隙时间或计费的字节数的总和异常时发出警报。你可以跟踪每个指标的一个作业。如果你已为每个表构建了指标的仪表板,则可以使用此快捷方式直接从可视化中创建异常检测作业。创建作业并运行传入数据后,你可以 view the jobs 并使用异常时间线中的三个点按钮将其添加到仪表板:

我们使用 ML 作业设置了警报,当检测到异常时,会向我们发送电子邮件/Slack 消息。可以直接从 “Machine Learning > Anomaly Detection Jobs)” 页面创建警报,方法是单击 ML 作业行末尾的三个点:

我们还使用 Kibana 仪表板可视化每个表的异常检测作业结果和相关指标,以确定哪些表消耗了我们的大部分资源,了解它们的时间演变,并测量可以帮助我们了解月度变化的汇总指标。

dbt test 警报和仪表板

你可能已经熟悉 dbt 中的测试,但如果你不熟悉,dbt 数据测试就是你对模型做出的断言(assertions)。使用命令 dbt test,dbt 将告诉你项目中的每个测试是通过还是失败。这里是如何设置它们的示例。在我们的团队中,我们使用开箱即用的 dbt 测试(unique、not_null、accepted_values 和 relationships)以及包 dbt_utils 和 dbt_expectations 进行一些额外的测试。运行命令 dbt test 时,它会生成存储在 run_results.json 中的日志。

dbt 测试日志包含以下字段:

  • unique_id:唯一测试标识符,测试在其唯一标识符中包含 “test” 前缀
  • status:测试结果,pass 或 fail
  • execution_time:执行此测试所花费的总时间
  • failures:如果测试通过,则为 0,如果测试失败,则为 1
  • message:如果测试失败,则为失败的原因

日志还包含来自适配器的有关作业执行的指标。

我们已经设置了文档计数警报(参见指南),当有任何失败的测试时,它将向我们发送电子邮件/Slack 消息。警报规则是在我们之前创建的 dbt 测试数据视图上设置的,查询过滤 status:fail 以获取失败测试的日志,规则条件是文档计数大于 0。每当生产中的任何测试失败时,我们都会收到一条警报,其中包含指向警报详细信息和仪表板的链接,以便能够对其进行故障排除:

我们还构建了一个仪表板来可视化运行的测试、失败的测试及其执行时间和时间段,以查看测试运行的历史视图:

使用 AI 助手查找根本原因

对于我们来说,分析这些多种信息源的最有效方法是使用 AI 助手帮助我们排除故障。在我们的案例中,我们收到了有关测试失败的警报,并使用 AI 助手为我们提供发生的事情的背景信息。然后我们询问是否有任何下游后果,AI 助手解释了异常检测作业的结果,这表明我们的下游表之一的时隙时间激增,并且时隙时间与基线相比有所增加。然后,我们询问根本原因,AI 助手能够从我们的 Github 变更日志中找到并向我们提供 PR 链接,该 PR 与事件的开始相匹配,并且是最可能的原因。

结论

作为数据分析团队,我们有责任保证我们向利益相关者提供的表格、图表、模型、报告和仪表板准确无误,并包含正确的信息来源。随着团队的发展,我们拥有的模型数量变得越来越大,相互联系也越来越紧密,要保证一切顺利运行并提供准确的结果并不容易。拥有一个可以主动提醒我们成本飙升、行数异常或数据质量测试失败的监控系统,就像拥有一个值得信赖的伙伴,如果出现问题,它会提前提醒你,并帮助你找到问题的根本原因。

dbt 调用日志是有关我们数据管道状态的重要信息来源,而 Elastic 是从中挖掘最大潜力的完美工具。使用这篇博文作为利用 dbt 日志的起点,帮助你的团队实现更高的可靠性和安心,让他们专注于更具战略性的任务,而不必担心潜在的数据问题。

原文:Monitor dbt pipelines with Elastic Observability — Elastic Observability Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/58598.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网站攻击,XSS攻击的类型

XSS(跨站脚本)攻击是一种网络安全攻击方式,攻击者通过在网站页面中注入恶意脚本,使脚本在其他用户的浏览器中执行,从而窃取用户信息、篡改页面内容或操控用户账户。这类攻击通常利用网站对输入数据的过滤不严格&#x…

数据库 示例解析

描述: 找出顾客订单中所花运费比他所下订单平均运费的两倍都还贵的订单号,列出cOrderNo。运费属性名为mShippingCost,顾客号属性为cCustomerID。 代码示例: SELECT o.cOrderNo FROM orders o WHERE o.mShippingCost > (SELE…

2023年信息安全工程师摸底测试卷

目录 1.密码算法 2.等级保护 3.密码学 4.安全评估 5.网络安全控制技术 6.恶意代码 7.身份认证 8.资产管理 9.密码分类 10.被动攻击 11.商用密码服务​编辑 12.超文本传输协议 13.数字水印技术 14.信息系统安全设计 15.重放攻击 16.信息资产保护 17.身份认证 …

大数据治理:确保数据价值与合规性的战略框架

大数据治理:确保数据价值与合规性的战略框架 引言 在信息技术迅猛发展的今天,数据已成为推动企业增长和创新的关键资源。根据统计,全球数据的生成量在每两年内翻一番,预计到2025年,全球数据总量将达到175ZB&#xff…

linux驱动—在自己的总线目录下创建属性文件

在总线目录下创建属性文件以扩展其功能。 通过创建属性文件, 我们可以为总线添加额外的信息和控制选项, 以便与设备和驱动进行交互。 简单就是,属性文件,可以完成用户空间和内核空间的数据交互, 比如在应用层快速修改g…

R向量运算数组矩阵

向量的运算 向量的加减乘除可以直接进运行,不用循环 向量之间的运算:分别对应计算,不用循环 两个运算的向量可以不是长度相等,但是一定长度要成整数倍。 每种运算都可以返回逻辑值T或F 取整函数 保留小数位用round: …

2024熵密杯初始题2

问题简要: 已知 counter 0x7501E6EA token 0xF4CE927C79B616E8E8F7223828794EEDF9B16591AE572172572D51E135E0D21A 伪造出另一个可以通过验证的counter和token。 给出token生成及验证代码如下: import binascii from gmssl import sm3# 读取HMAC ke…

Python入门之基础语法

第1关:行与缩进 任务描述 本关任务:改正代码中不正确的缩进,使其能够正常编译,并输出正确的结果。 相关知识 缩进 Python 与 C/C、Java 这些 C 类语言不同,Python 使用缩进来表示代码块,缩进的空格数量可…

WebSocket与Socket

一、定义与用途 Socket Socket(套接字)是一个抽象层,用于在网络上执行进程间的通信。它为应用程序提供了发送和接收数据的机制,通过IP和端口号来标识网络中唯一的位置。Socket可以使用TCP进行面向连接的可靠通信,也可以…

[Python学习日记-54] Python 中的日志模块 —— logging

[Python学习日记-54] Python 中的日志模块 —— logging 简介 基础用法 日志写入到文件 自定义日志格式 日志同时输出到屏幕和写入到文件 简介 在程序的运行过程中会执行很多操作或者进行很多的交互,也有的时候可能你开发出来的网站会遭到黑客的攻击&#xff0…

10.30Python随堂考试

1.(12分)使用Python的NumPy库,创建一个形状为(4,4)的二维数组,并且初始化所有元素为其行索引与列索引之和。 import numpy as np arr np.array([[i j for j in range(4)] for i in range(4)]) print(arr)2.(8分&…

C++关键字noexcept应用及案例

文章目录 使用场景:注意事项: noexcept在C中的应用和重要性:与标准库的交互与异常安全相关的编程模式与C标准的关系与性能的关系示例代码 综合案例扩展后的代码新增功能解释异常安全性能优化 在C中, noexcept是一个关键字&#x…

STM32F103HAL库实现低功耗(睡眠模式、停止模式和待机模式)

STM32F103HAL库实现低功耗(睡眠模式、停止模式和待机模式) 1. STM32电源结构2. 电源管理器2.1 上电复位和掉电复位2.2 可编辑电压监测器(PVD) 3. 低功耗模式介绍3.1 睡眠模式3.2 停止模式3.3 待机模式 4. 低功耗相关寄存器5. 低功…

Windows: 如何实现CLIPTokenizer.from_pretrained`本地加载`stable-diffusion-2-1-base`

参考:https://blog.csdn.net/qq_38423499/article/details/137158458 https://github.com/VinAIResearch/Anti-DreamBooth?tabreadme-ov-file 联网下载没有问题: import osos.environ["HF_ENDPOINT"] "https://hf-mirror.com" i…

从0学习React(9)

代码解析 const changeOrg (orgId) > {queryData.orgId orgId;delete queryData.deviceClassifyId;setQueryData(queryData);actionRef.current?.reset();loadTreeData(orgId); };1. const changeOrg (orgId) > { ... }; 这是一个箭头函数(arrow functi…

【vue】14.插槽:构建可复用组件的关键

今天看代码的时候碰到了插槽&#xff0c;有些看不懂&#xff0c;所以写下这篇文章&#xff0c;系统地梳理一下关于插槽的内容&#xff0c;也希望给大家带来一些帮助。 // 我碰到的插槽长这样 <template #default"scope">... </template> 一.什么是插槽…

阿里巴巴店铺商品API返回值中的商品分类与筛选条件

阿里巴巴店铺商品API返回值中的商品分类与筛选条件对于电商平台的运营和用户购物体验至关重要。以下是对这两个方面的详细解析&#xff1a; 一、商品分类 商品分类是指将商品按照其属性、用途、材质等因素进行归类&#xff0c;以便商家和用户更好地管理和查找商品。在阿里巴巴…

Electron 是一个用于构建跨平台桌面应用程序的开源框架

Electron 是一个用于构建跨平台桌面应用程序的开源框架。它结合了 Chromium&#xff08;用于网页渲染的浏览器引擎&#xff09;和 Node.js&#xff08;用于后端开发的 JavaScript 运行时&#xff09;&#xff0c;允许开发者使用熟悉的 HTML、CSS 和 JavaScript 技术来开发桌面应…

影刀RPA与Python作为爬虫的对比

1.概要 RPA&#xff08;Robotic Process Automation&#xff0c;机器人流程自动化&#xff09;是一种业务流程自动化技术&#xff0c;它通过软件机器人或“虚拟劳动力”来模拟和集成人类用户与数字系统之间的交互。RPA工具可以自动执行重复性的、基于规则的任务&#xff0c;这…

camera和lidar外参标定

雷达和相机的外参标定&#xff08;外部参数标定&#xff09;指的是确定两者之间的旋转和平移关系&#xff0c;使得它们的坐标系可以对齐。 文章目录 无目标标定livox_camera_calibdirect_visual_lidar_calibration 有目标标定velo2cam_calibration 无目标标定 livox_camera_ca…