调度系统:使用 Apache Airflow 管理和调度 Couchbase SQL 脚本的实际例子

假设场景如下:

每天定时执行一组 Couchbase SQL 脚本,用于数据同步、聚合和清洗。

脚本包括:

同步数据到 Couchbase 集群。

执行数据聚合查询。

清理过期数据。

要求:

支持任务依赖管理。

提供任务失败后的重试机制。

支持日志和运行状态的监控。

使用 Airflow 实现

Airflow 提供了强大的调度和任务依赖管理能力,可以将上述流程定义为一个 DAG(有向无环图)。

  1. 创建 Couchbase SQL 脚本

创建三个 SQL 脚本:

sync_data.sql:

INSERT INTO bucket-name (KEY, VALUE)
SELECT META().id, new_data.*
FROM source-bucket new_data
WHERE META().id NOT IN (SELECT RAW META().id FROM bucket-name);

aggregate_data.sql:

SELECT category, COUNT(*) AS count
FROM bucket-name
WHERE type = “product”
GROUP BY category;

cleanup_expired_data.sql:

DELETE FROM bucket-name
WHERE expiration_date < NOW_STR();

  1. 安装 Couchbase 的 Python 客户端

通过 pip 安装所需的 Couchbase 依赖:

pip install couchbase

  1. 定义 Airflow DAG 和任务

couchbase_workflow.py:

from airflow import DAG

from airflow.operators.python import PythonOperator

from datetime import datetime, timedelta

from couchbase.cluster import Cluster, ClusterOptions

from couchbase_core.cluster import PasswordAuthenticator

\

Couchbase 连接函数

def execute_couchbase_query(sql_file_path):
# 连接 Couchbase 集群
cluster = Cluster(
‘couchbase://localhost’,
ClusterOptions(PasswordAuthenticator(‘username’, ‘password’))
)
bucket = cluster.bucket(‘bucket-name’)
query_service = cluster.query_indexes()

# 读取并执行 SQL 脚本
with open(sql_file_path, 'r') as file:query = file.read()
result = query_service.query(query)
print(f"Executed query from {sql_file_path}: {result}")

定义默认参数

default_args = {
‘owner’: ‘admin’,
‘depends_on_past’: False,
‘email_on_failure’: True,
‘email’: [‘admin@example.com’],
‘retries’: 2,
‘retry_delay’: timedelta(minutes=5),
}

定义 DAG

with DAG(
dag_id=‘couchbase_sql_workflow’,
default_args=default_args,
description=‘A workflow to execute Couchbase SQL scripts’,
schedule_interval=‘0 3 * * *’, # 每天凌晨 3 点运行
start_date=datetime(2024, 1, 1),
catchup=False,
tags=[‘couchbase’, ‘sql’],
) as dag:

# 任务 1: 同步数据
sync_data_task = PythonOperator(task_id='sync_data',python_callable=execute_couchbase_query,op_args=['/path/to/sql_scripts/sync_data.sql']
)# 任务 2: 数据聚合
aggregate_data_task = PythonOperator(task_id='aggregate_data',python_callable=execute_couchbase_query,op_args=['/path/to/sql_scripts/aggregate_data.sql']
)# 任务 3: 清理过期数据
cleanup_data_task = PythonOperator(task_id='cleanup_data',python_callable=execute_couchbase_query,op_args=['/path/to/sql_scripts/cleanup_expired_data.sql']
)# 定义任务依赖
sync_data_task >> aggregate_data_task >> cleanup_data_task
  1. 部署 DAG 到 Airflow

将脚本保存为 couchbase_workflow.py 并放置到 Airflow 的 DAG 文件夹中(通常是 /airflow/dags)。

确保 Airflow 服务正常运行:

airflow webserver
airflow scheduler

登录到 Airflow Web 界面,启用并监控 couchbase_sql_workflow。

  1. 优势分析

任务调度:通过 schedule_interval 定时调度任务,支持灵活的 Cron 表达式。

任务依赖管理:通过 >> 定义任务依赖,确保顺序执行。

重试机制:默认支持失败后的自动重试。

可观察性:Airflow 提供任务状态跟踪和日志记录,方便调试和监控。

  1. 扩展优化

参数化 SQL:可在 SQL 中加入参数,通过 PythonOperator 动态替换。

自定义连接器:使用 Airflow 的 Hook 构建更灵活的 Couchbase 连接器。

错误处理:在 Python 函数中捕获异常并记录到外部系统(如日志系统或监控平台)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/63489.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

国城杯2024——Curve

相关知识链接&#xff1a;https://tangcuxiaojikuai.xyz/post/187210a7.html #sagemath from Crypto.Util.number import *def add(P, Q):(x1, y1) P(x2, y2) Qx3 (x1*y2 y1*x2) * inverse(1 d*x1*x2*y1*y2, p) % py3 (y1*y2 - a*x1*x2) * inverse(1 - d*x1*x2*y1*y2, p…

第三部分:进阶概念 8.事件处理 --[JavaScript 新手村:开启编程之旅的第一步]

JavaScript 事件处理是 Web 开发中不可或缺的一部分&#xff0c;它允许开发者响应用户的交互行为&#xff08;如点击、键盘输入等&#xff09;或浏览器的行为&#xff08;如页面加载完成&#xff09;。通过事件处理&#xff0c;我们可以使网页更加动态和互动。以下是关于 JavaS…

STM32WB55 FUS烧录

FUS固件下载 Firmware Update Service&#xff08;FUS&#xff09;是一种用于在STM32WB微控制器上更新固件的功能。FUS下载是指通过FUS服务进行固件更新的过程。通过FUS下载&#xff0c;您可以通过无线方式将新的固件加载到STM32WB设备中&#xff0c;而无需使用传统的有线编程方…

BERT模型的实现

本文用 pytorch 实现一个BERT模型。 食用方法&#xff1a; 直接下载完整实现&#xff0c; 在自己本地跑一遍&#xff0c;保证不报错。先完成数据预处理阶段&#xff08;1-4&#xff09;的代码阅读&#xff0c;然后按照如下关键点的描述完成代码的实现。自己看着代码手写后续部…

Qt之第三方库‌QXlsx使用(三)

Qt开发 系列文章 - QXlsx&#xff08;三&#xff09; 目录 前言 一、Qt开源库 二、QXlsx 1.QXlsx介绍 2.QXlsx下载 3.QXlsx移植 4.修改项目文件.pro 三、使用技巧 1.添加头文件 2.写入数据 3.读出数据 总结 前言 Qt第三方控件库是指非Qt官方提供的、用于扩展Qt应用…

框架篇面试

一、Spring框架中的单例bean的安全性 Spring框架中有一个Scope注解&#xff0c;默认的值就是singleton&#xff0c;单例的&#xff1b;因为一般在spring的bean中注入的都是无状态的对象&#xff0c;所以没有线程安全问题。但是如果在bean中定义了可修改的成员变量&#xff0c;…

OpenAI 发布 o1 LLM,推出 ChatGPT Pro

OpenAI正式发布了专为复杂推理而构建的 OpenAI o1大型语言模型(LLM)。 该公司还推出了 ChatGPT Pro&#xff0c;这是一项每月 200 美元的套餐&#xff0c;包括无限制访问 OpenAI o1、o1-mini、GPT-4o 和高级语音对话。 OpenAI o1 从 9 月 12 日起在 ChatGPT 中推出预览版&…

【Linux】文件描述符fd

1.前置预备 文件 内容 属性访问文件之前&#xff0c;都必须先打开他 #include<stdio.h> int main() { FILE* fpfopen("log.txt","w"); if(fpNULL) { perror("fopen"); return 1; } fclose(fp); return 0…

字节高频算法面试题:小于 n 的最大数

问题描述&#xff08;感觉n的位数需要大于等于2&#xff0c;因为n的位数1的话会有点问题&#xff0c;“且无重复”是指nums中存在重复&#xff0c;但是最后返回的小于n最大数是可以重复使用nums中的元素的&#xff09;&#xff1a; 思路&#xff1a; 先对nums倒序排序 暴力回…

nodejs 06.npm的使用以及package.json详解

一.npm(npm | Home)的介绍 npm(Node Package Manager)是一个node.js的包管理工具,允许用户下载安装更新分享node.js包 二.npm相关命令以及作用 1.npm init -y 这条命令主要是当项目中没有package.json这个文件的时候生成package.json这个文件 2.npm i / npm install (包名) 这条…

李飞飞首个“空间智能”模型发布:一张图,生成一个3D世界 | LeetTalk Daily

“LeetTalk Daily”&#xff0c;每日科技前沿&#xff0c;由LeetTools AI精心筛选&#xff0c;为您带来最新鲜、最具洞察力的科技新闻。 在人工智能技术迅速发展的背景下&#xff0c;李飞飞创立的世界实验室于近期发布了首个“空间智能”模型&#xff0c;这一创新成果引发了3D生…

Cursor+Devbox AI开发快速入门

1. 前言 今天无意间了解到 Cursor 和 Devbox 两大开发神器,初步尝试以后发现确实能够大幅度提升开发效率,特此想要整理成博客以供大家快速入门. 简单理解 Cursor 就是一款结合AI大模型的代码编辑器,你可以将自己的思路告诉AI,剩下的目录结构的搭建以及项目代码的实现均由AI帮…

机器学习--绪论

开启这一系列文章的初衷&#xff0c;是希望搭建一座通向机器学习世界的桥梁&#xff0c;为有志于探索这一领域的读者提供系统性指引和实践经验分享。随着人工智能和大数据技术的迅猛发展&#xff0c;机器学习已成为推动技术创新和社会变革的重要驱动力。从智能推荐系统到自然语…

计算机毕设-基于springboot的实践性教学系统设计与实现(附源码+lw+ppt+开题报告)

博主介绍&#xff1a;✌多个项目实战经验、多个大型网购商城开发经验、在某机构指导学员上千名、专注于本行业领域✌ 技术范围&#xff1a;Java实战项目、Python实战项目、微信小程序/安卓实战项目、爬虫大数据实战项目、Nodejs实战项目、PHP实战项目、.NET实战项目、Golang实战…

SpringMvc完整知识点二(完结)

SpringMVC获取请求参数 环境准备工作等均省略&#xff0c;可详见快速入门&#xff0c;此处只写非共有部分代码 该部分示例项目SpringMvcThree已上传至Gitee&#xff0c;可自行下载 客户端请求参数的格式为&#xff1a;namevalue&passwordvalue... ... 服务端想要获取请求…

【python 批量将PPT中各种东西保存为图片 没有水印】

# 安装 pip install Aspose.Slides24.12.0 # 代码(没有水印&#xff0c;亲测可用&#xff01;&#xff01;) 使用导出md为中介&#xff0c;巧妙&#xff01;但是不能导出整张无水印的幻灯片&#xff01; import aspose.slides as slidesppt_path r"xxx.pptx" out_…

PDF拆分之怎么对批量的PDF文件进行分割-免费PDF编辑工具分享

>>更多PDF文件处理应用技巧请前往 96缔盟PDF处理器 主页 查阅&#xff01; ——————————————————————————————————————— 当然了&#xff0c;单个文件或者其他任意的文件个数的拆分也是支持的&#xff01; 序言 我之前的文章也有…

EmoAva:首个大规模、高质量的文本到3D表情映射数据集。

2024-12-03&#xff0c;由哈尔滨工业大学&#xff08;深圳&#xff09;的计算机科学系联合澳门大学、新加坡南洋理工大学等机构创建了EmoAva数据集&#xff0c;这是首个大规模、高质量的文本到3D表情映射数据集&#xff0c;对于推动情感丰富的3D头像生成技术的发展具有重要意义…

【开源免费】基于Vue和SpringBoot的课程答疑系统(附论文)

博主说明&#xff1a;本文项目编号 T 070 &#xff0c;文末自助获取源码 \color{red}{T070&#xff0c;文末自助获取源码} T070&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析…

Spring Boot 整合 Druid 并开启监控

文章目录 1. 引言2. 添加依赖3. 配置数据源4. 开启监控功能5. 自定义 Druid 配置&#xff08;可选&#xff09;6. 访问监控页面7. 注意事项8. 总结 Druid 是一个由阿里巴巴开源的高性能数据库连接池&#xff0c;它不仅提供了高效的连接管理功能&#xff0c;还自带了强大的监控和…