将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3.纯python的实惠版)

前情:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2.换掉付费的Event Hubs)-CSDN博客

python脚本实现

厉害的小伙伴最终使用python脚本免费实现了这个功能

文件结构

以下是cursor对代码的评审意见:

这是一个用于下载 Azure Storage 访问日志并将其导入到 Elasticsearch 的项目。让我逐个文件分析:

1. 整体架构

项目采用了良好的模块化设计,主要包含以下组件:

  • main.py: 主程序入口,协调整体流程
  • azure_log_downloader.py: Azure blob 存储日志下载器
  • elasticsearch_sender.py: Elasticsearch 数据发送器
  • models.py: 数据模型定义
  • log_config.py: 日志配置
  • .env: 环境配置文件

2. 具体评审

main.py

优点:

  • 支持日期范围处理
  • 良好的错误处理和日志记录
  • 模块化的设计,职责划分清晰

建议改进:

  • 可以考虑添加命令行参数支持,使日期范围可配置
  • 可以添加并发处理支持,提高处理速度
azure_log_downloader.py

优点:

  • 清晰的类设计
  • 良好的错误处理

建议改进:

  • 可以添加重试机制
  • 可以考虑支持异步操作
elasticsearch_sender.py

优点:

  • 实现了批量处理
  • 包含重试机制
  • 良好的连接测试功能

建议改进:

  • 可以考虑添加索引模板管理
  • 可以添加更多的 ES 配置选项
models.py

优点:

  • 使用 dataclass 简化了类定义
  • 详细的字段解析逻辑
  • 良好的类型注解

建议改进:

  • 可以添加字段验证
  • 可以考虑使用 Pydantic 进行数据验证
log_config.py

优点:

  • 完善的日志配置
  • 支持文件和控制台输出

建议改进:

  • 可以考虑添加日志轮转
  • 可以添加日志级别配置选项

3. 安全性建议

  • 建议使用更安全的方式存储敏感信息(如 API keys)
  • 建议添加 SSL 证书验证
  • 建议添加请求速率限制

4. 性能建议

  • 考虑添加并发处理
  • 可以实现增量更新机制
  • 考虑添加缓存机制

5. 可维护性建议

  • 添加单元测试
  • 添加更详细的文档
  • 考虑添加监控指标
  • 添加 CI/CD 配置

总体来说,这是一个设计良好的项目,代码质量较高,有良好的错误处理和日志记录。主要可以在性能优化、安全性和可维护性方面进行进一步改进。

具体脚本

1. azure_log_downloader.py:

from azure.storage.blob import BlobServiceClient
import json
from models import StorageOperationLogclass AzureLogDownloader:def __init__(self, connection_string, container_name, blob_name, logger=None):self.logger = loggerself.connection_string = connection_stringself.container_name = container_nameself.blob_name = blob_namedef _get_blob_client(self):blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)blob_client = blob_service_client.get_blob_client(container=self.container_name, blob=self.blob_name)return blob_clientdef download_and_transform(self):"""Download and transform log data from Azure storage"""try:blob_client = self._get_blob_client()if not blob_client.exists():self.logger.info(f"Blob does not exist, skipping: {self.blob_name}")return []blob_data = blob_client.download_blob().readall().decode('utf-8')transformed_entries = []for line in blob_data.splitlines():if line.strip():try:log_entry = json.loads(line)log_obj = StorageOperationLog.from_log_entry(log_entry, self.logger)if log_obj:transformed_entries.append(log_obj)except json.JSONDecodeError as e:self.logger.error(f"Error parsing line: {str(e)}")continueself.logger.info(f"Downloaded and transformed {len(transformed_entries)} logs")return transformed_entriesexcept Exception as e:self.logger.error(f"Error downloading blob: {str(e)}")self.logger.error(f"Blob: {self.blob_name}, Container: {self.container_name}")self.logger.error(f"Error type: {type(e).__name__}")return []

2. elasticsearch_sender.py:

from elasticsearch import Elasticsearch, helpers
import time
import uuidclass ElasticsearchSender:def __init__(self, host, api_key=None, index_name="logs", logger=None):self.logger = loggerself.config = {'hosts': host,'timeout': 30,'retry_on_timeout': True,'max_retries': 3,'verify_certs': False,'ssl_show_warn': False,'use_ssl': True}if api_key:self.config['api_key'] = api_keyself.index_name = index_nameself.es = Elasticsearch(**self.config)def test_connection(self):"""Test Elasticsearch connection"""try:info = self.es.info()self.logger.info("\nElasticsearch Server Info:")self.logger.info(f"Version: {info['version']['number']}")self.logger.info(f"Cluster Name: {info['cluster_name']}")return Trueexcept Exception as e:self.logger.error(f"\nElasticsearch connection failed: {str(e)}")return Falsedef send_logs(self, log_entries, batch_size=500, max_retries=3):"""Send logs to Elasticsearch"""def generate_actions():for entry in log_entries:doc_data = entry.__dict__.copy()if 'time' in doc_data:doc_data['@timestamp'] = doc_data.pop('time')action = {'_index': self.index_name,'_id': str(uuid.uuid4()),'_source': doc_data}yield actionsuccess_count = 0failure_count = 0retry_count = 0while retry_count < max_retries:try:success, failed = helpers.bulk(self.es,generate_actions(),chunk_size=batch_size,raise_on_error=False,raise_on_exception=False)success_count += successfailure_count += len(failed) if failed else 0self.logger.info(f"\nBatch processing results:")self.logger.info(f"- Successfully indexed: {success_count} documents")self.logger.info(f"- Failed: {failure_count} documents")if not failed:breakretry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)except Exception as e:self.logger.error(f"\nBulk indexing error: {str(e)}")retry_count += 1if retry_count < max_retries:self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")time.sleep(2 ** retry_count)else:self.logger.info("Maximum retry attempts reached")breakreturn success_count, failure_count

3. log_config.py:

import logging
import os
from datetime import UTC, datetimedef setup_logger(target_date: datetime = None, log_prefix: str = "app"):base_dir = os.path.dirname(os.path.abspath(__file__))log_dir = os.path.join(base_dir, 'logs')if not os.path.exists(log_dir):os.makedirs(log_dir)current_time = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")target_date_str = target_date.strftime("%Y%m%d") if target_date else "None"log_file = os.path.join(log_dir, f'{log_prefix}_target_date_{target_date_str}_export_at_{current_time}.log')logger = logging.getLogger('AccessLog')logger.setLevel(logging.INFO)file_handler = logging.FileHandler(log_file, encoding='utf-8')file_handler.setLevel(logging.INFO)console_handler = logging.StreamHandler()console_handler.setLevel(logging.INFO)formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)console_handler.setFormatter(formatter)logger.addHandler(file_handler)logger.addHandler(console_handler)return logger

4. models.py:

from dataclasses import dataclass
from datetime import datetime
import re
from typing import Optional@dataclass
class StorageOperationLog:time: datetimecategory: Optional[str]operationName: Optional[str]callerIpAddress: Optional[str]location: Optional[str]uri: Optional[str]durationMs: Optional[int]referrerHeader: Optional[str]userAgentHeader: Optional[str]requestBodySize: Optional[int]responseBodySize: Optional[int]serverLatencyMs: Optional[int]objectKey: Optional[str]functionName: Optional[str]file_extension: Optional[str]@staticmethoddef parse_object_key(object_key: str, logger=None) -> tuple[Optional[str], Optional[str]]:"""Parse objectKey to get institution_id and functionName"""try:container_match = re.search(r'container-(\d+)', object_key)parts = object_key.split('/')function_name = Noneif container_match:container_index = next((i for i, part in enumerate(parts) if 'container-' in part), None)if container_index is not None and container_index + 1 < len(parts):function_name = parts[container_index + 1]file_extension = Noneif parts and '.' in parts[-1]:file_extension = parts[-1].split('.')[-1].lower()return function_name, file_extensionexcept Exception as e:if logger:logger.error(f"Error parsing object_key {object_key}: {str(e)}")return None, None@classmethoddef from_log_entry(cls, entry: dict[str, any], logger=None) -> Optional['StorageOperationLog']:"""Create StorageOperationLog instance from raw log entry"""try:properties = entry.get('properties', {})object_key = properties.get('objectKey', '')function_name, file_extension = cls.parse_object_key(object_key)return cls(time=entry.get('time'),category=entry.get('category'),operationName=entry.get('operationName'),callerIpAddress=entry.get('callerIpAddress'),location=entry.get('location'),uri=entry.get('uri'),durationMs=int(entry.get('durationMs')) if entry.get('durationMs') is not None else None,referrerHeader=properties.get('referrerHeader'),userAgentHeader=properties.get('userAgentHeader'),requestBodySize=int(properties.get('requestBodySize')) if properties.get('requestBodySize') is not None else None,responseBodySize=int(properties.get('responseBodySize')) if properties.get('responseBodySize') is not None else None,serverLatencyMs=int(properties.get('serverLatencyMs')) if properties.get('serverLatencyMs') is not None else None,objectKey=object_key,functionName=function_name,file_extension=file_extension)except Exception as e:if logger:logger.error(f"Error creating StorageOperationLog: {str(e)}")return Nonedef __post_init__(self):if isinstance(self.time, str):if 'Z' in self.time:time_parts = self.time.split('.')if len(time_parts) > 1:microseconds = time_parts[1].replace('Z', '')[:6]time_str = f"{time_parts[0]}.{microseconds}Z"self.time = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")else:self.time = datetime.strptime(self.time, "%Y-%m-%dT%H:%M:%SZ")

5. main.py:

from log_config import setup_logger
from azure_log_downloader import AzureLogDownloader
from elasticsearch_sender import ElasticsearchSender
from datetime import datetime, timedelta, UTC
from dotenv import load_dotenv
import osload_dotenv()def _get_index_name(target_date: datetime):"""Get full index name for the specified date"""return os.getenv('ELASTICSEARCH_INDEX_TEMPLATE', 'logs-{year}-{month}').format(year=target_date.year,month=target_date.month)def _get_blob_name_list(target_date: datetime):"""Get blob paths for all hours of the specified date"""blobs = []for hour in range(24):blob_time = target_date.replace(hour=hour, minute=0, second=0, microsecond=0)blob_name = os.getenv('AZURE_STORAGE_BLOB_TEMPLATE', 'logs/y={year}/m={month}/d={day}/h={hour}').format(year=blob_time.year,month=blob_time.month,day=blob_time.day,hour=blob_time.hour)blobs.append(blob_name)return blobsdef main():start_date = datetime(2024, 1, 1, tzinfo=UTC)end_date = datetime(2024, 1, 2, tzinfo=UTC)current_date = start_datewhile current_date <= end_date:target_date = current_datelogger = setup_logger(target_date, os.getenv('LOG_PREFIX', 'app'))try:logger.info(f"\nProcessing data for {current_date.date()}")elasticsearch_index = _get_index_name(target_date)sender = ElasticsearchSender(os.getenv('ELASTICSEARCH_HOST', 'http://localhost:9200'),os.getenv('ELASTICSEARCH_API_KEY'),elasticsearch_index,logger)if not sender.test_connection():logger.error("Elasticsearch connection failed")current_date += timedelta(days=1)continuetotal_logs = total_success = total_failed = 0blobs = _get_blob_name_list(target_date)for container in os.getenv('AZURE_STORAGE_CONTAINERS', 'logs').split(','):logger.info(f"\nProcessing container: {container}")for blob_name in blobs:logger.info(f"\nProcessing blob: {blob_name}")downloader = AzureLogDownloader(os.getenv('AZURE_STORAGE_URI'),container,blob_name,logger)try:log_entries = downloader.download_and_transform()success, failed = sender.send_logs(log_entries)total_logs += len(log_entries)total_success += successtotal_failed += failedexcept Exception as e:logger.error(f"Error processing {blob_name}: {str(e)}")continuelogger.info(f"\n{current_date.date()} Processing completed:")logger.info(f"Total documents processed: {total_logs}")logger.info(f"Successfully indexed: {total_success}")logger.info(f"Failed: {total_failed}")finally:for handler in logger.handlers[:]:handler.close()logger.removeHandler(handler)current_date += timedelta(days=1)if __name__ == "__main__":main()

6. .env :

ELASTICSEARCH_HOST=http://localhost:9200
ELASTICSEARCH_API_KEY=your_api_key
ELASTICSEARCH_INDEX_TEMPLATE=logs-{year}-{month}
AZURE_STORAGE_URI=your_storage_connection_string
AZURE_STORAGE_CONTAINERS=logs
AZURE_STORAGE_BLOB_TEMPLATE=logs/y={year}/m={month}/d={day}/h={hour}
LOG_PREFIX=app


前情后续:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1.标准版)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2.换掉付费的Event Hubs)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3.纯python的实惠版)-CSDN博客




本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/66622.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python学opencv|读取图像(四十)掩模:三通道图像的局部覆盖

【1】引言 前序学习了使用numpy创建单通道的灰色图像&#xff0c;并对灰色图像的局部进行了颜色更改&#xff0c;相关链接为&#xff1a; python学opencv|读取图像&#xff08;九&#xff09;用numpy创建黑白相间灰度图_numpy生成全黑图片-CSDN博客 之后又学习了使用numpy创…

【全面解析】深入解析 TCP/IP 协议:网络通信的基石

深入解析 TCP/IP 协议&#xff1a;网络通信的基石 导语 你是否曾好奇&#xff0c;现代互联网是如何实现全球设备之间的高速、稳定和可靠通信的&#xff1f;无论是浏览网页、发送电子邮件&#xff0c;还是进行视频通话&#xff0c;背后都离不开 TCP/IP 协议 的支撑。作为互联网…

全面解析 Java 流程控制语句

Java学习资料 Java学习资料 Java学习资料 在 Java 编程中&#xff0c;流程控制语句是构建程序逻辑的关键部分&#xff0c;它决定了程序的执行顺序和走向。通过合理运用这些语句&#xff0c;开发者能够实现各种复杂的业务逻辑&#xff0c;让程序更加灵活和智能。 顺序结构 顺…

Linux系统常用指令

查找文件 find / -name "<文件名>" 2>/dev/null //遍历系统查找指定文件名文件ls -l | grep "<文件名>" //列出当前目录下有关文件名的文件find -name sw_sfp_alarm_cfg.xml //查找文件名对应路径 切换目录 编辑文件 vi <文件…

【Unity】ScrollViewContent适配问题(Contentsizefilter不刷新、ContentSizeFilter失效问题)

最近做了一个项目&#xff0c;菜单栏读取数据后自动生成&#xff0c;结果用到了双重布局 父物体 尝试了很多方式&#xff0c;也看过很多大佬的文章&#xff0c;后来自己琢磨了一下&#xff0c;当子物体组件自动生成之后&#xff0c;使用以下以下代码效果会好一些&#xff1a; …

AI辅助医学统计分析APP

AI辅助医学统计分析APP 医学统计分析的困难点在于开始阶段分析的规划和得出分析结果之后分析结果的解释&#xff0c;前者之所以困难是因为分析方法繁多又有不同的使用条件&#xff0c;后者则是因为结果中术语较多&#xff0c;且各种分析方法术语又有不同&#xff0c;非统计专业…

[STM32 HAL库]串口中断编程思路

一、前言 最近在准备蓝桥杯比赛&#xff08;嵌入式赛道&#xff09;&#xff0c;研究了以下串口空闲中断DMA接收不定长的数据&#xff0c;感觉这个方法的接收效率很高&#xff0c;十分好用。方法配置都成功了&#xff0c;但是有一个点需要进行考虑&#xff0c;就是一般我们需要…

浅谈Java之AJAX

一、基本介绍 在Java开发中&#xff0c;AJAX&#xff08;Asynchronous JavaScript and XML&#xff09;是一种用于创建动态网页的技术&#xff0c;它允许网页在不重新加载整个页面的情况下与服务器进行交互。 二、关键点和示例 1. AJAX的基本原理 AJAX通过JavaScript的XMLHtt…

AutoSar架构学习笔记

1.AUTOSAR&#xff08;Automotive Open System Architecture&#xff0c;汽车开放系统架构&#xff09;是一个针对汽车行业的软件架构标准&#xff0c;旨在提升汽车电子系统的模块化、可扩展性、可重用性和互操作性。AUTOSAR的目标是为汽车电子控制单元&#xff08;ECU&#xf…

算法竞赛之差分进阶——等差数列差分 python

目录 前置知识进入正题实战演练 前置知识 给定区间 [ l, r ]&#xff0c;让我们把数组中的[ l, r ] 区间中的每一个数加上c,即 a[ l ] c , a[ l 1 ] c , a[ l 2] c , a[ r ] c; 怎么做&#xff1f;很简单&#xff0c;差分一下即可 还不会的小伙伴点此进入学习 进入正题 …

TDengine 做 Apache SuperSet 数据源

‌Apache Superset‌ 是一个现代的企业级商业智能&#xff08;BI&#xff09;Web 应用程序&#xff0c;主要用于数据探索和可视化。它由 Apache 软件基金会支持&#xff0c;是一个开源项目&#xff0c;它拥有活跃的社区和丰富的生态系统。Apache Superset 提供了直观的用户界面…

金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践

导读&#xff1a;中信银行信用卡中心每日新增日志数据 140 亿条&#xff08;80TB&#xff09;&#xff0c;全量归档日志量超 40PB&#xff0c;早期基于 Elasticsearch 构建的日志云平台&#xff0c;面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此…

虚幻商城 Fab 免费资产自动化入库

文章目录 一、背景二、实现效果展示三、实现自动化入库一、背景 上一次写了个这篇文章 虚幻商城 Quixel 免费资产一键入库,根据这个构想,便决定将范围扩大,使 Fab 商城的所有的免费资产自动化入库,是所有!所有! 上一篇文章是根据下图这部分资产一键入库: 而这篇文章则…

游戏为什么失败?回顾某平庸游戏

1、上周玩了一个老鼠为主角的游戏&#xff0c;某平台喜1送的&#xff0c; 下载了很久而一直没空玩&#xff0c;大约1G&#xff0c;为了清硬盘空间而玩。 也是为了拔掉心中的一根刺&#xff0c;下载了而老是不玩总感觉不舒服。 2、老鼠造型比较写实&#xff0c;看上去就有些讨…

亲测有效!如何快速实现 PostgreSQL 数据迁移到 时序数据库TDengine

小T导读&#xff1a;本篇文章是“2024&#xff0c;我想和 TDengine 谈谈”征文活动的优秀投稿之一&#xff0c;作者从数据库运维的角度出发&#xff0c;分享了利用 TDengine Cloud 提供的迁移工具&#xff0c;从 PostgreSQL 数据库到 TDengine 进行数据迁移的完整实践过程。文章…

C#,入门教程(01)—— Visual Studio 2022 免费安装的详细图文与动画教程

通过本课程的学习&#xff0c;你可以掌握C#编程的重点&#xff0c;享受编程的乐趣。 在本课程之前&#xff0c;你无需具备任何C#的基础知识&#xff0c;只要能操作电脑即可。 不过&#xff0c;希望你的数学不是体育老师教的。好的程序是数理化的实现与模拟。没有较好的数学基础…

Linux探秘坊-------3.开发工具详解(2)

1.动静态库和动静态链接&#xff08;操作&#xff09; 静态库是指编译链接时,把库⽂件的代码全部加⼊到可执⾏⽂件中,因此⽣成的⽂件 ⽐较⼤,但在运⾏时也就不再需要库⽂件了。其后缀名⼀般为“.a” 动态库与之相反,在编译链接时并 没有把库⽂件的代码加⼊到可执⾏⽂件中 ,⽽…

电脑开机出现Bitlock怎么办

目录 1.前言 2.产生原因&#xff1a; 1.系统异常关机 2.系统更新错误 3.硬件更换 4.CMOS电池问题 5.出厂设置 6.意外情况 3.解锁步骤&#xff1a; 3.1&#xff1a;记住密钥ID&#xff08;前6位&#xff09; 3.2&#xff1a;打开aka.ms/myrecoverykey网址 3.3&#…

C# 的 NLog 库高级进阶

一、引言 在 C# 开发的广袤天地中&#xff0c;日志记录宛如开发者的 “千里眼” 与 “顺风耳”&#xff0c;助力我们洞察应用程序的运行状态&#xff0c;快速定位并解决问题。而 NLog 库&#xff0c;无疑是日志记录领域中的璀璨明星&#xff0c;以其强大的功能、灵活的配置和出…

Avalonia系列文章之小试牛刀

最近有朋友反馈&#xff0c;能否分享一下Avalonia相关的文章&#xff0c;于是就抽空学习了一下&#xff0c;发现Avalonia真的是一款非常不错的UI框架&#xff0c;值得花时间认真学习一下&#xff0c;于是边学习边记录&#xff0c;整理成文&#xff0c;分享给大家&#xff0c;希…