Kafka 详解:全面解析分布式流处理平台

Kafka 详解:全面解析分布式流处理平台

Apache Kafka 是一个分布式流处理平台,主要用于构建实时数据管道和流式应用。它具有高吞吐量、低延迟、高可用性和高可靠性的特点,广泛应用于日志收集、数据流处理、消息系统、实时分析等场景。

📢 Kafka 概述

Apache Kafka 是由 LinkedIn 开发并于 2011 年开源的一个分布式流处理平台,后来捐赠给 Apache 软件基金会。它设计用于高吞吐量、分布式系统,能够处理大规模的实时数据流。

核心概念

  • Producer(生产者):负责发布消息到 Kafka 集群的客户端。
  • Consumer(消费者):订阅和处理 Kafka 中消息的客户端。
  • Broker(代理):Kafka 集群中的一个服务器节点。
  • Topic(主题):消息的分类和管理单位,类似于消息队列的队列。
  • Partition(分区):Topic 的子单位,用于并行处理和数据分布。
  • Replica(副本):分区的副本,用于数据冗余和高可用性。
  • Zookeeper:用于管理和协调 Kafka 集群的元数据和状态信息。

更多zookeeper相关知识,请点击:Zookeeper 详解:分布式协调服务的核心概念与实践

📢 Kafka 架构

Kafka 的架构主要包括以下几个部分:

  • 生产者:向 Kafka 主题发布消息。
  • 消费者:从 Kafka 主题订阅和消费消息。
  • 主题和分区:消息被发布到主题中,并分布在多个分区上。
  • 代理(Broker):Kafka 集群中的服务器,负责存储消息和处理请求。
  • Zookeeper:用于存储集群的元数据、配置和状态信息。

📢 Kafka 数据模型

消息

消息是 Kafka 中最小的数据单位,每条消息包含一个键值对和一些元数据,如时间戳。

主题(Topic)

主题是消息的分类单位。生产者将消息发送到主题,消费者从主题订阅消息。

分区(Partition)

每个主题被划分为多个分区,分区是 Kafka 并行处理和数据分布的基本单位。

副本(Replica)

每个分区有多个副本,以确保高可用性和数据冗余。

Kafka 集群

Kafka 集群由多个 Broker 组成,Broker 之间通过 Zookeeper 进行协调和管理。Zookeeper 负责存储集群的元数据,包括 Broker 信息、主题和分区的元数据等。

Broker

Broker 是 Kafka 集群中的一个节点,负责接收、存储和转发消息。Broker 通过 Zookeeper 协调和管理集群中的分区和副本。

Zookeeper

Zookeeper 是一个分布式协调服务,用于管理和协调 Kafka 集群的元数据和状态信息。Kafka 依赖 Zookeeper 来实现分布式协调、负载均衡和故障恢复。

📢 Kafka 安装与配置

环境准备

  • 安装 Java(Kafka 依赖于 Java 运行环境)。
  • 下载并安装 Kafka 和 Zookeeper。

配置文件

Kafka 的主要配置文件包括:

  • server.properties:Broker 的配置文件。
  • zookeeper.properties:Zookeeper 的配置文件。

启动 Kafka 和 Zookeeper

#  启动 Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动 Kafka
bin/kafka-server-start.sh config/server.properties

📢 Kafka 生产者

生产者是向 Kafka 主题发布消息的客户端。生产者通过 Producer API 向 Kafka 发送消息。

生产者配置

主要配置选项包括:

  • bootstrap.servers:Kafka 集群的地址。
  • key.serializer 和 value.serializer:用于序列化键和值的类。
  • acks:消息确认模式。

生产者示例

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class SimpleProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("acks", "all");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));}producer.close();}
}

📢 Kafka 消费者

消费者是从 Kafka 主题订阅和消费消息的客户端。消费者通过 Consumer API 读取消息。

消费者配置

主要配置选项包括:

  • bootstrap.servers:Kafka 集群的地址。
  • group.id:消费者组 ID。
  • key.deserializer 和 value.deserializer:用于反序列化键和值的类。
  • auto.offset.reset:消费位移的重置策略。

消费者示例

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class SimpleConsumer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "my-group");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("my-topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}
}

📢 Kafka Topic

创建 Topic

可以使用 Kafka 提供的命令行工具创建 Topic。

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

查看 Topic 列表

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

删除 Topic

bin/kafka-topics.sh --delete --topic my-topic --bootstrap-server localhost:9092

📢 Kafka 分区和副本

分区

分区是 Kafka 实现并行处理和数据分布的基本单位。每个分区在物理上是一个日志文件,分区内的消息是有序的,但分区之间是无序的。

副本

副本用于数据冗余和高可用性。每个分区有一个 leader 副本和多个 follower 副本。生产者和消费者只能与 leader 副本交互,follower 副本从 leader 副本同步数据。

副本分配策略

Kafka 使用一致性哈希算法将分区分配到不同的 Broker 上,以实现负载均衡和高可用性。

Kafka 数据持久化

Kafka 提供两种主要的数据持久化机制:日志段和索引文件。

日志段

每个分区的消息被分成多个日志段,日志段是顺序写入的。Kafka 通过滚动机制创建新的日志段,并删除旧的日志段。

索引文件

Kafka 为每个日志段创建索引文件,用于快速查找特定的消息偏移量。索引文件包括偏移量索引和时间戳索引。

📢 Kafka 高级功能

事务

Kafka 支持跨分区、跨主题的事务,保证消息的原子性和一致性。

压缩

Kafka 支持消息压缩,以减少网络带宽和存储空间。常见的压缩算法包括 Gzip、Snappy 和 LZ4。

ACL

Kafka 提供访问控制列表(ACL),用于控制用户和客户端对 Kafka 集群的访问权限。

📢 Kafka 调优

Broker 调优

  • 调整文件描述符限制:增加 Broker 可用的文件描述符数量。
  • 调整 JVM 参数:优化 JVM 的内存分配和垃圾回收策略。
  • 调整网络参数:优化 Broker 的网络传输性能。

生产者调优

  • 批量发送:启用消息批量发送,以提高吞吐量。
  • 压缩:启用消息压缩,以减少网络带宽和存储空间。

消费者调优

  • 并行消费:使用多个消费者实例并行消费消息,以提高消费速度。
  • 自动提交位移:根据需求配置位移提交策略,平衡性能和数据一致性。

🔥 Kafka 常见问题

消息丢失

  • 原因:可能由于网络故障、Broker 宕机或生产者/消费者配置不当。
  • 解决:配置合适的 ack 策略、增加副本数量、优化网络和硬件环境。

消息重复

  • 原因:可能由于生产者重试、消费者位移提交失败等。
  • 解决:使用 Kafka 事务、配置幂等生产者、合理处理消费逻辑。

消息延迟

  • 原因:可能由于网络延迟、Broker 负载过高、磁盘 I/O 性能不足等。
  • 解决:优化网络和硬件配置、调整 Broker 和客户端参数、使用更高性能的存储设备。

通过这篇详解指南,你可以全面了解 Kafka 的基本原理、架构设计、安装配置、生产者和消费者的使用,以及高级功能和调优技巧。希望这能帮助你更好地使用和掌握 Kafka,构建高效、可靠的流处理系统。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/849172.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode刷题 | Day 4 分割等和子集(Partition Equal Subset Sum)自底向上动态规划

LeetCode刷题 | Day 4 分割等和子集(Partition Equal Subset Sum)自底向上动态规划 文章目录 LeetCode刷题 | Day 4 分割等和子集(Partition Equal Subset Sum)自底向上动态规划前言一、题目概述二、解题方法2.1 一维表格的自底向上动态规划2.1.1 思路讲解2.1.2 伪代码 + 逐…

002.数据分析_Pandas初识

我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448; 入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448; 虚 拟 环 境 搭 建 &#xff1a;&#x1f449;&…

MySQL-权限管理(二)

一 host中的含义 /usr/local/mysql/bin/mysql -pLXYlxy2:024.#8u} -S /data/mysql/tmp/mysqld.sock select user,host,authentication_string from mysql.user; %:主要允许从任何主机连接到MySQL服务器&#xff0c;即外部连接localhost: 代表只允许本地主机连接到MySQL服务器&…

[AIGC] 详解Mockito - 简单易学的Java单元测试框架

在Java的世界中, 单元测试是一项非常重要的任务. Mockito作为一个强大灵活的mock框架&#xff0c;可以帮助我们有效的编写和管理我们的单元测试. 了解并掌握Mockito的使用对于提高我们的开发效率和保证我们的软件质量有着巨大的帮助. 文章目录 什么是Mockito?Mockito的核心API…

Spring Boot中整合Jasypt 使用自定义注解+AOP实现敏感字段的加解密

&#x1f604; 19年之后由于某些原因断更了三年&#xff0c;23年重新扬帆起航&#xff0c;推出更多优质博文&#xff0c;希望大家多多支持&#xff5e; &#x1f337; 古之立大事者&#xff0c;不惟有超世之才&#xff0c;亦必有坚忍不拔之志 &#x1f390; 个人CSND主页——Mi…

奥威BI零售数据分析方案的优缺点一览

奥威BI零售数据分析方案是一套基于BI大数据智能可视化分析系统&#xff0c;根据零售企业数据分析共性需求、业务特殊性量身打造&#xff0c;点击下载应用&#xff0c;立即将零售数据情况分析清楚&#xff0c;直观呈现。很多企业都是直接在该零售数据分析方案的基础上实现了智能…

mybatis动态SQL的if判断是否等于某个值查询条件无效

我们在使用mybaits时&#xff0c;经常使用if条件来做动态查询&#xff0c; 当查询条件是字符串类型时&#xff0c;要写不等于某个空的情况要考虑不等于null和不等于空字符串 当查询条件是日期类型或者long&#xff0c;Integer 等类型时&#xff0c;if条件里不能写不等于空字符串…

Java 基础面试300题 (291-313)

Java 基础面试300题 &#xff08;291-313&#xff09; 291 . Externalizable接口和Serializable 接口有什么区别&#xff1f; Serializable 接口是一个标记接口&#xff0c;没有定义任何方法&#xff0c;不必实现。Externalizable接口定义了readExternal()和writeExternal()方…

QT6.3学习技巧,快速入门

学习和掌握QT6.3的技巧和入门方法&#xff0c;可以让你更快速地掌握和使用这个开发工具。下面是一些建议和步骤&#xff1a; 学习基础知识&#xff1a;在开始学习QT6.3之前&#xff0c;建议先了解一些基础知识&#xff0c;例如C编程语言和图形用户界面的基本概念等。这些知识可…

如果entity中的age字段为integer类型,mybatisplus更新entity时,当age字段为null,数据不会更新,怎么办?

在使用 MyBatis-Plus 更新实体对象时&#xff0c;如果 age 字段为 null 并且希望将该字段更新为数据库中的 null&#xff0c;可以使用以下几种方法来实现&#xff1a; 方法1&#xff1a;使用 TableField(updateStrategy FieldStrategy.IGNORED) 通过在实体类中使用 TableFie…

跟着GPT学设计模式之观察者模式

你好&#xff0c;这里是codetrend专栏“跟着GPT学设计模式”。 引言 观察者模式&#xff08;Observer Pattern&#xff09;是一种行为型设计模式&#xff0c;它定义了对象之间的一对多依赖关系&#xff0c;使得当一个对象的状态发生改变时&#xff0c;其依赖对象都能够收到通…

PostgreSql常用的时间类型以及对应Java中的类型以及在Mybatis XML中转换

PGSQL DATE&#xff1a;用于存储日期信息&#xff0c;不包含任何时间信息&#xff0c;例如&#xff1a;‘2024-06-06’。 TIME&#xff1a;用于存储一天内的时间&#xff0c;精确到毫秒&#xff0c;例如&#xff1a;‘10:30:00.123’。 TIMESTAMP&#xff1a;用于存储日期和…

Junit(Java单元测试)

配置文件 要想使用 Junit 进行单元测试需要引入以下第三方库&#xff1a; 引入后可以使用 Test&#xff0c;BeforeEach等注解 <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api --><dependency><groupId>org.junit.jupiter<…

eclipse添加maven插件

打开eclipse菜单 Help/Install New SoftwareWork with下拉菜单选择 2022-03 - https://download.eclipse.org/releases/2022-03‘type filter text’搜索框中输入 maven选择 M2E - Maven Integration for Eclipse一路next安装&#xff0c;重启eclipseImport项目时&#xff0c;就…

ESP8266在阿里云上线(arduino)

电脑连接上ESP8266的板子 在arduino编写代码&#xff08;arduino按照之前的配置&#xff0c;已安装好esp的开发板和几个库ArduinoJson我选了5.的版本&#xff0c;PubSubclient,Crypto,AliyunIoTSDK并且修改pubsubclient的参数&#xff09; 在项目&#xff0c;加载库&#xff0c…

Elixir学习笔记——二进制、字符串和字符列表

在“基本类型”中&#xff0c;我们学习了一些关于字符串的知识&#xff0c;并使用 is_binary/1 函数进行检查&#xff1a; iex>string "hello" "hello" iex>is_binary(string) true 在本章中&#xff0c;我们将明确二进制到底是什么、它们与字符串…

【RuoYi】如何解决Postman无法访问RuoYi中的接口数据

一、前言 最近&#xff0c;写项目要求需要将数据返回&#xff0c;指定的接口&#xff0c;并且需要使用Postman来测试接口数据&#xff0c;看是否能够请求到数据。然后项目用的是RuoYi的框架&#xff0c;RuoYi使用了SpringSecurity来做的安全框架&#xff0c;所以在访问的时候&a…

【Linux】ip命令详解

Linux网络排查 目录 一、ip命令介绍 1.1 ip命令简介 1.2 ip命令的由来 二、ip命令使用帮助 2.1 ip命令的help帮助信息 2.2 ip命令对象介绍 2.3 ip命令选项介绍 三、查看网络信息 3.1 显示当前网络接口信息 3.2 显示网络设备运行状态 3.3 显示详细设备信息 3.4 查看…

基于.NetCore和ABP.VNext的项目实战八:使用Redis缓存数据

这里将集成Redis,使用Redis来缓存数据,在appsettings.json配置Redis的连接字符串 //appsettings.json ..."Caching": {"IsOpen": "true","RedisConnectionString": "127.0.0.1:6379,ConnectTimeout=15000,SyncTimeout=5000&qu…

面向大模型的存储加速方案

参考&#xff1a;面向大模型的存储加速方案设计和实践-百度开发者中心 (baidu.com) 对于一个典型的训练来说&#xff0c;可能迭代多轮 epoch。在每个 epoch 内&#xff0c;首先需要对数据集进行随机打散&#xff0c;然后将打散后的数据划分为若干 batch&#xff0c;每读取一个 …