如何通过 Kafka 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz

将 Apache Kafka 与 Elasticsearch 集成的分步指南,以便使用 Python、Docker Compose 和 Kafka Connect 实现高效的数据提取、索引和可视化。

在本文中,我们将展示如何将 Apache Kafka 与 Elasticsearch 集成以进行数据提取和索引。我们将概述 Kafka、其生产者(producers)和消费者(consumers)的概念,并创建一个日志索引,其中将通过 Apache Kafka 接收和索引消息。该项目以 Python 实现,代码可在 GitHub 上找到。

先决条件

  • Docker 和 Docker Compose:确保你的机器上安装了 Docker 和 Docker Compose。
  • Python 3.x:运行生产者和消费者脚本。

Apache Kafka 简介

Apache Kafka 是一个分布式流媒体平台,具有高可扩展性和可用性以及容错能力。在 Kafka 中,数据管理通过主要组件进行:

  • Broker/代理:负责在生产者和消费者之间存储和分发消息。
  • Zookeeper:管理和协调 Kafka 代理,控制集群的状态、分区领导者和消费者信息。
  • Topics/主题:发布和存储数据以供使用的渠道。
  • Consumers 及 Producers/消费者和生产者:生产者向主题发送数据,而消费者则检索该数据。

这些组件共同构成了 Kafka 生态系统,为数据流提供了强大的框架。

项目结构

为了理解数据提取过程,我们将其分为几个阶段:

  • 基础设施配置/Infrastructure Provisioning:设置 Docker 环境以支持 Kafka、Elasticsearch 和 Kibana。
  • 创建生产者/Producer Creation:实现 Kafka 生产者,将数据发送到日志主题。
  • 创建消费者/Consumer Creation:开发 Kafka 消费者以读取和索引 Elasticsearch 中的消息。
  • 提取验证/Ingestion Validation:验证和确认已发送和已使用的数据。

使用 Docker Compose 进行基础设施配置

我们利用 Docker Compose 来配置和管理必要的服务。下面,你将找到 Docker Compose 代码,它设置了 Apache Kafka、Elasticsearch 和 Kibana 集成所需的每项服务,确保数据提取过程。

docker-compose.yml

version: "3"services:zookeeper:image: confluentinc/cp-zookeeper:latestcontainer_name: zookeeperenvironment:ZOOKEEPER_CLIENT_PORT: 2181kafka:image: confluentinc/cp-kafka:latestcontainer_name: kafkadepends_on:- zookeeperports:- "9092:9092"- "9094:9094"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST:${HOST_IP}:9092KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXTKAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXTKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1elasticsearch:image: docker.elastic.co/elasticsearch/elasticsearch:8.15.1container_name: elasticsearch-8.15.1environment:- node.name=elasticsearch- xpack.security.enabled=false- discovery.type=single-node- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- ./elasticsearch:/usr/share/elasticsearch/dataports:- 9200:9200kibana:image: docker.elastic.co/kibana/kibana:8.15.1container_name: kibana-8.15.1ports:- 5601:5601environment:ELASTICSEARCH_URL: http://elasticsearch:9200ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'

你可以直接从 Elasticsearch Labs GitHub repo 访问该文件。

使用 Kafka 生产器发送数据

生产器负责将消息发送到日志主题。通过批量发送消息,可以提高网络使用效率,允许使用 batch_size 和 linger_ms 设置进行优化,这两个设置分别控制批次的数量和延迟。配置 acks='all' 可确保消息持久存储,这对于重要的日志数据至关重要。

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],  # Specifies the Kafka server to connectvalue_serializer=lambda x: json.dumps(x).encode('utf-8'),  # Serializes data as JSON and encodes it to UTF-8 before sendingbatch_size=16384,     # Sets the maximum batch size in bytes (here, 16 KB) for buffered messages before sendinglinger_ms=10,         # Sets the maximum delay (in milliseconds) before sending the batchacks='all'            # Specifies acknowledgment level; 'all' ensures message durability by waiting for all replicas to acknowledge
)def generate_log_message():levels = ["INFO", "WARNING", "ERROR", "DEBUG"]messages = ["User login successful","User login failed","Database connection established","Database connection failed","Service started","Service stopped","Payment processed","Payment failed"]log_entry = {"level": random.choice(levels),"message": random.choice(messages),"timestamp": time.time()}return log_entrydef send_log_batches(topic, num_batches=5, batch_size=10):for i in range(num_batches):logger.info(f"Sending batch {i + 1}/{num_batches}")for _ in range(batch_size):log_message = generate_log_message()producer.send(topic, value=log_message)producer.flush()if __name__ == "__main__":topic = "logs"send_log_batches(topic)producer.close()

当启动 producer 的时候,会批量的向 topic 发送消息,如下图:

INFO:kafka.conn:Set configuration …
INFO:log_producer:Sending batch 1/5 
INFO:log_producer:Sending batch 2/5
INFO:log_producer:Sending batch 3/5
INFO:log_producer:Sending batch 4/5

使用 Kafka Consumer 消费和索引数据

Consumer 旨在高效处理消息,消费来自日志主题的批次并将其索引到 Elasticsearch 中。使用 auto_offset_reset='latest',可确保 Consumer 开始处理最新消息,忽略较旧的消息,max_poll_records=10 将批次限制为 10 条消息。使用 fetch_max_wait_ms=2000,Consumer 最多等待 2 秒以积累足够的消息,然后再处理批次。

在其主循环中,Consumer 消费日志消息、处理并将每个批次索引到 Elasticsearch 中,确保持续的数据摄取。

consumer = KafkaConsumer('logs',                               bootstrap_servers=['localhost:9092'],auto_offset_reset='latest',            # Ensures reading from the latest offset if the group has no offset storedenable_auto_commit=True,               # Automatically commits the offset after processinggroup_id='log_consumer_group',         # Specifies the consumer group to manage offset trackingmax_poll_records=10,                   # Maximum number of messages per batchfetch_max_wait_ms=2000                 # Maximum wait time to form a batch (in ms)
)def create_bulk_actions(logs):for log in logs:yield {"_index": "logs","_source": {'level': log['level'],'message': log['message'],'timestamp': log['timestamp']}}if __name__ == "__main__":try:print("Starting message processing…")while True:messages = consumer.poll(timeout_ms=1000)  # Poll receive messages# process each batch messagesfor _, records in messages.items():logs = [json.loads(record.value) for record in records]bulk_actions = create_bulk_actions(logs)response = helpers.bulk(es, bulk_actions)print(f"Indexed {response[0]} logs.")except Exception as e:print(f"Erro: {e}")finally:consumer.close()print(f"Finish")

在 Kibana 中可视化数据

借助 Kibana,我们可以探索和验证从 Kafka 提取并在 Elasticsearch 中编入索引的数据。通过访问 Kibana 中的开发工具,你可以查看已编入索引的消息并确认数据符合预期。例如,如果我们的 Kafka 生产者发送了 5 个批次,每个批次 10 条消息,我们应该在索引中看到总共 50 条记录。

要验证数据,你可以在 Dev Tools 部分使用以下查询:

GET /logs/_search
{"query": {"match_all": {}}
}

相应:

此外,Kibana 还提供了创建可视化和仪表板的功能,可帮助使分析更加直观和具有交互性。下面,你可以看到我们创建的一些仪表板和可视化示例,它们以各种格式展示了数据,增强了我们对所处理信息的理解。

使用 Kafka Connect 进行数据提取

Kafka Connect 是一种旨在促进数据源和目标(接收器)之间的集成的服务,例如数据库或文件系统。它使用预定义的连接器来自动处理数据移动。在我们的例子中,Elasticsearch 充当数据接收器。

使用 Kafka Connect,我们可以简化数据提取过程,无需手动将数据提取工作流实施到 Elasticsearch 中。借助适当的连接器,Kafka Connect 允许将发送到 Kafka 主题的数据直接在 Elasticsearch 中编入索引,只需进行最少的设置,无需额外编码。

使用 Kafka Connect

要实现 Kafka Connect,我们将 kafka-connect 服务添加到我们的 Docker Compose 设置中。此配置的一个关键部分是安装 Elasticsearch 连接器,它将处理数据索引。

配置服务并创建 Kafka Connect 容器后,将需要一个 Elasticsearch 连接器的配置文件。此文件定义基本参数,例如:

  • connection.url:Elasticsearch 的连接 URL。
  • topics:连接器将监视的 Kafka 主题(在本例中为 “logs”)。
  • type.name:Elasticsearch 中的文档类型(通常为 _doc)。
  • value.converter:将 Kafka 消息转换为 JSON 格式。
  • value.converter.schemas.enable:指定是否应包含架构。
  • schema.ignorekey.ignore:在索引期间忽略 Kafka 架构和键的设置。

以下是在 Kafka Connect 中创建 Elasticsearch 连接器的 curl 命令:

curl --location '{{url}}/connectors' \
--header 'Content-Type: application/json' \
--data '{"name": "elasticsearch-sink-connector","config": {"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","topics": "logs","connection.url": "http://elasticsearch:9200","type.name": "_doc","value.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable": "false","schema.ignore": "true","key.ignore": "true"}
}'

通过此配置,Kafka Connect 将自动开始提取发送到 “logs” 主题的数据并在 Elasticsearch 中对其进行索引。这种方法允许完全自动化的数据提取和索引,而无需额外的编码,从而简化整个集成过程。

结论

集成 Kafka 和 Elasticsearch 为实时数据提取和分析创建了一个强大的管道。本指南提供了一种构建强大数据提取架构的基础方法,并在 Kibana 中实现无缝可视化和分析,以适应未来更复杂的要求。

此外,使用 Kafka Connect 使 Kafka 和 Elasticsearch 之间的集成更加简化,无需额外的代码来处理和索引数据。Kafka Connect 使发送到特定主题的数据能够以最少的配置自动在 Elasticsearch 中编入索引。

想要获得 Elastic 认证?了解下一次 Elasticsearch 工程师培训的时间!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在本地机器上试用 Elastic。

原文:How to ingest data to Elasticsearch through Kafka - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/65077.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LLaMA-Factory GLM4-9B-CHAT LoRA 微调实战

🤩LLaMA-Factory GLM LoRA 微调 安装llama-factory包 git clone --depth 1 https://github.com/hiyouga/LLaMA-Factory.git进入下载好的llama-factory,安装依赖包 cd LLaMA-Factory pip install -e ".[torch,metrics]" #上面这步操作会完成…

view draw aosp15

基础/背景知识 如何理解Drawable? 在 Android 中,Drawable 是一个抽象的概念,表示可以绘制到屏幕上的内容。 它可以是位图图像、矢量图形、形状、颜色等。 Drawable 本身并不是一个 View,它不能直接添加到布局中,而是…

gridcontrol表格某一列设置成复选框,选择多行(repositoryItemCheckEdit1)

1. 往表格中添加repositoryItemCheckEdit1 2. 事件: repositoryItemCheckEdit1.QueryCheckStateByValue repositoryItemCheckEdit1_QueryCheckStateByValue; private void repositoryItemCheckEdit1_QueryCheckStateByValue(object sender, DevExpress.XtraEditor…

重温设计模式--适配器模式

文章目录 适配器模式(Adapter Pattern)概述适配器模式UML图适配器模式的结构目标接口(Target):适配器(Adapter):被适配者(Adaptee): 作用&#xf…

C语言项目 天天酷跑(上篇)

前言 这里讲述这个天天酷跑是怎么实现的,我会在天天酷跑的下篇添加源代码,这里会讲述天天酷跑这个项目是如何实现的每一个思路,都是作者自己学习于别人的代码而创作的项目和思路,这个代码和网上有些许不一样,因为掺杂了…

公交车信息管理系统:构建智能城市交通的基石

程序设计 本系统主要使用Java语言编码设计功能,MySQL数据库管控数据信息,SSM框架创建系统架构,通过这些关键技术对系统进行详细设计,设计和实现系统相关的功能模块。最后对系统进行测试,这一环节的结果,基本…

MDS-NPV/NPIV

在存储区域网络(SAN)中,域ID(Domain ID)是一个用于区分不同存储区域的关键参数。域ID允许SAN环境中的不同部分独立操作,从而提高效率和安全性。以下是关于域ID的一些关键信息: 域ID的作用&…

【网络安全产品大调研系列】1. 漏洞扫描

1. 为什么会出现漏扫技术? 每次黑客攻击事件进行追溯的时候,根据日志分析后,我们往往发现基本都是系统、Web、 弱口令、配置这四个方面中的其中一个出现的安全问题导致黑客可以轻松入侵的。 操作系统的版本滞后,没有更新补丁&am…

验证 Dijkstra 算法程序输出的奥秘

一、引言 Dijkstra 算法作为解决图中单源最短路径问题的经典算法,在网络路由、交通规划、资源分配等众多领域有着广泛应用。其通过不断选择距离源节点最近的未访问节点,逐步更新邻居节点的最短路径信息,以求得从源节点到其他所有节点的最短路径。在实际应用中,确保 Dijkst…

【论文阅读笔记】Learning to sample

Learning to sample 前沿引言方法问题声明S-NET匹配ProgressiveNet: sampling as ordering 实验分类检索重建 结论附录 前沿 这是一篇比较经典的基于深度学习的点云下采样方法 核心创新点: 首次提出了一种学习驱动的、任务特定的点云采样方法引入了两种采样网络&…

Mysql大数据量表分页查询性能优化

一、模拟场景 1、产品表t_product,数据量500万+ 2、未做任何优化前,cout查询时间大约4秒;LIMIT offset, count 时,offset 值较大时查询时间越久。 count查询 SELECT COUNT(*) AS total FROM t_product WHERE deleted = 0 AND tenant_id = 1 分页查询 SELECT * FROM t_…

pythonWeb~伍~初识Django

初识Django 1.技术栈 Python知识点:函数、面向对象。前端知识点:HTML、CSS、JavaScript、jQuery、BootStrap。MySQL数据库。Python的Web框架: Flask,自身短小精悍 第三方组件。Django,内部已集成了很多组件 第三方…

kimi搜索AI多线程批量生成txt原创文章软件-不需要账号及key

kimi搜索AI多线程批量生成txt原创文章软件介绍: 软件可以设置三种模型写文章:kimi:默认AI模型,kimi-search:联网检索模型 ,kimi-research:探索版搜索聚合模型 1、可以设置写联网搜索文章&#…

DevNow x Notion

前言 Notion 应该是目前用户量比较大的一个在线笔记软件,它的文档系统也非常完善,支持多种文档格式,如 Markdown、富文本、表格、公式等。 早期我也用过一段时间,后来有点不习惯,就换到了 Obsidian ,但是…

CSPM认证最推荐学习哪个级别?

一、什么是CSPM? CSPM的全称是Certified Strategic Project Manager,中文名称为“项目管理专业人员能力评价等级证书”。这是由中国标准化协会依据国家标准《项目管理专业人员能力评价要求》(GB/T 41831-2022)推出的一项认证&…

oracle: create new database

用database configuration Assistant 引导创建数据库。记得给system,sys 设置自己的口令,便于添加新操作用户。 创建操作用户: -- 别加双引号,否则,无法用 create user geovindu identified by 888888; create user geovin identi…

快速建站(网站如何在自己的电脑里跑起来) 详细步骤 一

1.选择开源网站 开源网站的建设平台有多种类型,每种类型都针对不同的需求和用途。我们根据自己的需求来选择 一、内容管理系统(CMS) 内容管理系统是开源网站建设中最常见的类型之一,它们提供了一个易于使用的界面,使用…

ECharts散点图-气泡图,附视频讲解与代码下载

引言: ECharts散点图是一种常见的数据可视化图表类型,它通过在二维坐标系或其它坐标系中绘制散乱的点来展示数据之间的关系。本文将详细介绍如何使用ECharts库实现一个散点图,包括图表效果预览、视频讲解及代码下载,让你轻松掌握…

【蓝桥杯——物联网设计与开发】拓展模块5 - 光敏/热释电模块

目录 一、光敏/热释电模块 (1)资源介绍 🔅原理图 🔅AS312 🌙简介 🌙特性 🔅LDR (2)STM32CubeMX 软件配置 (3)代码编写 (4&#x…

Escalate_Linux靶机

Escalate_Linux靶机 前言:集合了多种liunx提权方法的靶场,通过该靶场可以简单的了解liunx提权方法 1,扫描一下端口 80/tcp open http 111/tcp open rpc 2049/udp nfs要知道对方的共享才能挂载 139/445 Samba SMB是一个协议名&#xff0c…