将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2 换掉付费的Event Hubs)

前情回顾:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客

前边的方案是挺好的,但 Azure Event Hubs 是付费服务,我这里只是一个获取日志进行必要的分析,并且不要求实时性,关键 没钱,那怎么办呢?

替代方案:

如果对实时性没有严格要求,并且希望避免使用付费的 Azure Event Hubs 服务,可以采用更经济的方式,比如将 Azure Blob 存储的日志直接发送到 Elasticsearch 或通过 Azure Storage QueueAzure Function 来实现。这些方法可以有效减少成本,同时满足日志解析和分析需求。

1. 直接从 Azure Blob 存储读取日志并处理

思路

直接从 Azure Blob 存储 中读取生成的日志文件,解析所需字段后发送到 Elasticsearch。这种方式适合没有实时性需求的场景,定期运行任务即可。

实现步骤
  1. 启用 Azure 存储日志记录

    • 在存储账户中启用 诊断设置
    • 将日志记录输出到同一存储账户的 Blob 容器中,例如 insights-logs
  2. 使用 Azure Function 或定时任务读取日志

    • 编写脚本(可以用 Python、PowerShell、C# 等语言),从指定的 Blob 容器中下载日志文件。
    • 解析日志文件,根据需要提取字段并格式化为 Elasticsearch 可接受的 JSON 数据。
    • 将数据批量发送到 Elasticsearch。
  3. 示例代码

    • 使用 Pythonazure-storage-blob 库:
    from azure.storage.blob import BlobServiceClient
    from elasticsearch import Elasticsearch
    import json# Azure Blob Storage 配置
    storage_account_name = "your_storage_account"
    storage_account_key = "your_storage_account_key"
    container_name = "insights-logs"# Elasticsearch 配置
    es = Elasticsearch("http://your-elasticsearch-server:9200")def process_blob_logs():blob_service_client = BlobServiceClient(account_url=f"https://{storage_account_name}.blob.core.windows.net",credential=storage_account_key)container_client = blob_service_client.get_container_client(container_name)# 列出日志文件blobs = container_client.list_blobs()for blob in blobs:blob_client = container_client.get_blob_client(blob)log_data = blob_client.download_blob().readall().decode('utf-8')# 假设日志是 JSON 格式for line in log_data.splitlines():try:log_entry = json.loads(line)# 提取你需要的字段parsed_entry = {"timestamp": log_entry["time"],"operation": log_entry["operationName"],"blobName": log_entry.get("blobName"),"requestorIp": log_entry.get("requestorIpAddress"),}# 写入 Elasticsearches.index(index="storage-logs", document=parsed_entry)except Exception as e:print(f"Error processing log entry: {e}")if __name__ == "__main__":process_blob_logs()
    
  4. 设置定时运行

    • 如果使用 Azure Function,配置 Timer Trigger 定期运行该脚本。
    • 如果使用本地脚本,使用 cron(Linux)或计划任务(Windows)实现定时任务。

2. 使用 Azure Storage Queue

思路

利用 Azure Storage Queue 作为消息队列替代 Event Hubs,用于暂存日志文件元数据或小型消息,然后由处理程序(如 Azure Function 或脚本)消费队列消息,读取并处理日志。

实现步骤
  1. 启用日志记录

    • 将日志写入 Blob 存储。
  2. 配置 Azure Storage Queue

    • 创建一个 Azure Queue
    • 编写脚本将日志的元数据(例如 Blob 的路径)写入队列。
  3. 编写处理脚本

    • 消费队列消息,根据消息中的 Blob 路径读取并解析日志文件。
  4. 示例代码

    • 使用 Python 和 azure-storage-queue
    from azure.storage.queue import QueueClient
    from azure.storage.blob import BlobServiceClient
    import jsonqueue_name = "your-queue"
    storage_account_name = "your_storage_account"
    storage_account_key = "your_storage_account_key"def process_queue_messages():queue_client = QueueClient(account_url=f"https://{storage_account_name}.queue.core.windows.net",credential=storage_account_key,queue_name=queue_name)blob_service_client = BlobServiceClient(account_url=f"https://{storage_account_name}.blob.core.windows.net",credential=storage_account_key)messages = queue_client.receive_messages()for msg in messages:blob_path = msg.content  # 假设队列消息中存储的是 Blob 路径container_name, blob_name = blob_path.split('/', 1)blob_client = blob_service_client.get_blob_client(container_name, blob_name)log_data = blob_client.download_blob().readall().decode('utf-8')# 解析日志并发送到 Elasticsearch(同上)print(f"Processing blob: {blob_path}")queue_client.delete_message(msg)  # 删除已处理消息if __name__ == "__main__":process_queue_messages()
    

3. 直接读取和解析日志文件后推送到 Elasticsearch

这种方法可以完全避开队列服务,直接通过脚本定期从存储账户下载日志文件并解析后推送到 Elasticsearch。

注意事项
  • 定时任务频率:根据日志生成的频率设定脚本运行的时间间隔。
  • 日志存储策略:Blob 日志文件可能会快速增长,考虑启用存储生命周期管理规则定期删除过期日志文件。
  • 安全性:确保存储帐户密钥或连接字符串的安全性,可使用 Azure 的 Managed Identity 替代密钥。

比较与选择

方案适用场景实现复杂度成本
直接从 Blob 存储读取数据量不大,无实时性需求
使用 Storage Queue有一定的队列需求,异步处理较低
使用 Azure Event Hubs需要高吞吐量和实时性

推荐:

  • 如果数据量不大,选择 直接从 Blob 存储读取
  • 如果需要解耦消息和处理程序,可以选择 Azure Storage Queue


前情后续:

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(1)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(2)-CSDN博客

将 AzureBlob 的日志通过 Azure Event Hubs 发给 Elasticsearch(3)-CSDN博客




本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/68429.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

sunrays-framework 微调

文章目录 1.common-log4j2-starter 动态获取并打印日志存储的根目录的绝对路径以及应用的访问地址1.目录2.log4j2.xml 配置LOG_HOME3.LogHomePrinter.java 配置监听器4.spring.factories 注册监听器5.测试1.common-log4j2-starter-demo 配置2.启动测试 2.common-minio-starter …

ElasticSearch上

安装ElasticSearch Lucene:Java语言的搜索引擎类库,易扩展;高性能(基于倒排索引)Elasticsearch基于Lucene,支持分布式,可水平扩展;提供Restful接口,可被任何语言调用Ela…

element-ui textarea备注 textarea 多行输入框

发现用这个组件,为了给用户更好的体验,要加下属性 1. 通过设置 autosize 属性可以使得文本域的高度能够根据文本内容自动进行调整,并且 autosize 还可以设定为一个对象,指定最小行数和最大行数。:autosize"{ minRows: 3, ma…

.netframwork模拟启动webapi服务并编写对应api接口

在.NET Framework环境中模拟启动Web服务,可以使用几种不同的方法。一个常见的选择是利用HttpListener类来创建一个简单的HTTP服务器,或者使用Owin/Katana库来自托管ASP.NET Web API或MVC应用。下面简要介绍Owin/Katana示例代码。这种方法更加灵活&#x…

路由环路的产生原因与解决方法(1)

路由环路 路由环路就是数据包不断在这个网络传输,始终到达不了目的地,导致掉线或者网络瘫痪。 TTL (生存时间):数据包每经过一个路由器的转发,其数值减1,当一个数据包的TTL值为0是,路…

Android CustomTextField

在 Compose 中开发用户界面时,需要处理输入框和键盘的交互,例如在键盘弹出时调整布局位置,避免遮挡重要内容。本篇博客将通过一个完整的示例展示如何实现这一功能。 功能概述 本例实现了一个简单的输入框。当输入框获得焦点或输入文字时&…

Alluxio数据流转方案在联通智网的应用

分享嘉宾 陈得泳 - 中国联通大数据平台 SRE 工程师,致力于基于开源生态构建稳定、高效、安全、低成本的大数据集群。 观看完整分享回放 业务背景 统一底座和安全基座位于不同IDC;统一底座:承接 O 域全域网络数据,包括移动网信…

搜维尔科技提供完整的人形机器人解决方案以及训练系统

问题:从灵巧手收集的数据是否也会在大脑大模型中训练,或是在专门用于手部控制的单独模型中训练? Q: If the data collected from dexterous hands will be trained as well in the brain large model, or in a separate model dedicated for…

打造餐饮品牌的产品矩阵:美味与策略的完美融合-中小企实战运营和营销工作室博客

打造餐饮品牌的产品矩阵:美味与策略的完美融合-中小企实战运营和营销工作室博客 在竞争激烈的餐饮市场中,打造一个成功的餐饮品牌,关键在于构建一个强大且富有吸引力的产品矩阵。这不仅涉及到研发出令人垂涎欲滴的美味佳肴,更需要…

前端大数据处理 - Web Worker

前言 先了解一个概念:页面假死 浏览器有GUI渲染线程与JS引擎线程,这两个线程是互斥的关系 当js有大量计算时,会造成 UI 阻塞,出现界面卡顿、掉帧等情况,严重时会出现页面卡死的情况,俗称假死 在前端开发…

无缝过渡:将 Ansys 子结构模型转换为 Nastran

了解如何将 Ansys 子结构模型无缝转换为 Nastran,以满足有效载荷动态模型要求 Ansys 子结构模型的优势 Ansys 子结构模型为从事大型装配体结构分析和仿真的工程师和分析师提供了多项优势。 这些模型通过将复杂结构划分为更小、更易于管理的子结构,可以…

【Flink系列】4. Flink运行时架构

4. Flink运行时架构 4.1 系统架构 Flink运行时架构——Standalone会话模式为例 1)作业管理器(JobManager) JobManager是一个Flink集群中任务管理和调度的核心,是控制应用执行的主进程。也就是说,每个应用都应该被…

AI刷题-饭馆菜品选择问题、构造回文字符串问题

目录 一、饭馆菜品选择问题 问题描述 测试样例 解题思路: 问题理解 数据结构选择 算法步骤 最终代码: 运行结果: 二、构造回文字符串问题 问题描述 测试样例 解题思路: 解题思路 具体步骤 最终代码:…

使用redis-cli命令实现redis crud操作

项目场景: 线上环境上redis中的key影响数据展示,需要删除。但环境特殊没办法通过 redis客户端工具直连。只能使用redis-cli命令来实现。 操作步骤: 1、确定redis安装的服务器; 2、找到redis的安装目录下 ##找到redis安装目…

讲一下ZooKeeper的持久化机制?

大家好,我是锋哥。今天分享关于【讲一下ZooKeeper的持久化机制?】面试题。希望对大家有帮助; 讲一下ZooKeeper的持久化机制? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 ZooKeeper 是一个开源的分布式协调服务&…

图数据库 | 18、高可用分布式设计(中)

上文我们聊了在设计高性能、高可用图数据库的时候,从单实例、单节点出发,一般有3种架构演进选项:主备高可用,今天我们具体讲讲分布式共识,以及大规模水平分布式。 主备高可用、分布式共识、大规模水平分布式&#xff…

【Python】第二弹---深入理解编程基础:从常量、变量到注释的全面解析

✨个人主页: 熬夜学编程的小林 💗系列专栏: 【C语言详解】 【数据结构详解】【C详解】【Linux系统编程】【MySQL】【Python】 目录 1、常量和表达式 2、变量和类型 2.1、变量是什么 2.2、变量的语法 2.3、变量的类型 2.4、动态类型特…

生产环境中常用的设计模式

生产环境中常用的设计模式 设计模式目的使用场景示例单例模式保证一个类仅有一个实例,并提供一个访问它的全局访问点- 日志记录器- 配置管理器工厂方法模式定义一个创建对象的接口,让子类决定实例化哪个类- 各种工厂类(如视频游戏工厂模式创…

YOLOv10改进,YOLOv10检测头融合RFAConv卷积,添加小目标检测层(四头检测)+CA注意机制,全网首发

摘要 空间注意力已广泛应用于提升卷积神经网络(CNN)的性能,但它存在一定的局限性。作者提出了一个新的视角,认为空间注意力机制本质上解决了卷积核参数共享的问题。然而,空间注意力生成的注意力图信息对于大尺寸卷积核来说是不足够的。因此,提出了一种新型的注意力机制—…

解锁C#语法的无限可能:从基础到进阶的编程之旅

目录 一、C# 基础语法 1.1 数据类型 1.2 变量与常量 1.3 运算符 1.4 控制流语句 二、C# 面向对象编程语法 2.1 类与对象 2.2 封装 2.3 继承 2.4 多态 虚方法 抽象类 接口 三、C# 高级语法 3.1 特性(Attribute) 预定义特性 自定义特性 3…