kafka服务介绍

kafka

    • 安装使用
    • 管理 Kafka

Apache Kafka 是一个开源的分布式事件流平台,主要用于实时数据传输和流处理。它最初由 LinkedIn 开发,并在 2011 年成为 Apache 基金会的顶级项目。Kafka 设计的目标是处理大规模的数据流,同时提供高吞吐量、低延迟和高容错性

Kafka 的工作原理可以从几个关键方面来理解:

    1. 消息的生产
    • Producer:生产者是发送消息的客户端,向 Kafka 发送消息。生产者将消息发布到特定的主题(Topic)中,主题内部可以有多个分区(Partition)。
    • Partitioning:消息会被分配到不同的分区。分区是主题的逻辑分片,有助于提高并发处理能力。消息的分配通常基于某种策略,比如消息键(key)或者轮询。
    1. 消息的存储
    • Broker:Kafka 的服务器组件,负责存储和管理消息。每个 Kafka 实例都是一个 Broker。多个 Brokers 组成 Kafka 集群。
    • Log Segments:在每个分区中,消息被追加到日志(log)中。日志是一个有序的、不可变的消息序列。为了管理日志文件的大小,Kafka 会定期将日志分段成多个文件。
    1. 消息的消费
    • Consumer:消费者从 Kafka 中读取消息。消费者订阅一个或多个主题,然后拉取(fetch)数据。
    • Consumer Group:消费者可以组成一个消费组(Consumer Group)。同一消费组中的消费者会分担读取同一个主题的不同分区的任务,从而实现负载均衡。每个消息只会被消费组中的一个消费者读取。
    1. 消息的存储与复制
    • Replication:为了提高数据的可靠性,Kafka 使用副本(replica)。每个分区有一个主副本(leader)和若干个从副本(follower)。所有的读写请求都由主副本处理,从副本则从主副本同步数据。
    • Leader-Follower:在分区中,领导者负责所有的读写请求,从副本负责从领导者同步数据。领导者失败时,从副本会选举新的领导者,确保高可用性。
    1. 协调与管理
    • Zookeeper:早期 Kafka 使用 Zookeeper 来管理集群的元数据和协调集群中的 Broker 和分区状态。Zookeeper 负责领导者的选举、配置管理和状态监控。
    • Kafka 2.8.0 及以后:Kafka 开始逐渐减少对 Zookeeper 的依赖,尝试用 Kafka 自身的协议来管理集群的元数据,这种模式称为 KRaft 模式(Kafka Raft Metadata Mode)。

数据流示例

  • 生产:生产者将消息发送到一个主题,主题有多个分区。每条消息附带一个时间戳。
  • 存储:消息被追加到分区的日志中。日志分为多个段,Kafka 按顺序存储消息。
  • 消费:消费者从主题的分区中读取消息。消费者可以跟踪已处理的消息位置,以实现断点续传。
  • 复制:数据在主副本和从副本之间同步,保证数据在 Broker 失败时不会丢失。

通过这些机制,Kafka 能够实现高吞吐量、低延迟和高可靠性的消息传递和数据流处理。

安装使用

Kafka 依赖于 Java 运行环境,因此首先需要安装 Java 11 或更高版本

apt install -y openjdk-11-jdk
root@huhy:~# java --version
openjdk 11.0.23 2024-04-16
OpenJDK Runtime Environment (build 11.0.23+9-post-Ubuntu-1ubuntu1)
OpenJDK 64-Bit Server VM (build 11.0.23+9-post-Ubuntu-1ubuntu1, mixed mode, sharing)

官网下载;https://downloads.apache.org/kafka/3.7.1/kafka_2.13-3.7.1.tgz

tar -xf kafka_2.13-3.7.1.tgz
cd kafka_2.13-3.7.1/

Kafka 的配置文件位于 config 目录中。主要的配置文件包括:

  • server.properties:Kafka 的服务器配置文件

  • zookeeper.properties:Zookeeper 的配置文件(如果使用 Zookeeper)

    • broker.id:描述:Kafka Broker 的唯一标识符。每个 Broker 必须有一个唯一的 broker.id。
      默认值:无
      示例:broker.id=0

    • listeners:描述:Kafka Broker 监听的地址和端口。指定了 Kafka 接收客户端请求的地址。
      默认值:PLAINTEXT://:9092
      示例:listeners=PLAINTEXT://localhost:9092

    • advertised.listeners:描述:Kafka 向客户端公开的地址。客户端会通过此地址与 Broker 进行通信。
      默认值:无
      示例:advertised.listeners=PLAINTEXT://your-hostname:9092

    • log.dirs:
      描述:Kafka 存储日志文件的目录。可以设置多个目录,Kafka 会将数据分散存储。
      默认值:/tmp/kafka-logs
      示例:log.dirs=/var/lib/kafka/logs

    • log.retention.hours:
      描述:日志文件的保留时间,单位是小时。超过这个时间的数据会被删除。
      默认值:168(7 天)
      示例:log.retention.hours=168

    • log.segment.bytes:
      描述:每个日志段的大小,单位是字节。日志文件会被分段存储。
      默认值:1073741824(1 GB)
      示例:log.segment.bytes=536870912(512 MB)

    • num.partitions:
      描述:默认创建的主题的分区数量。
      默认值:1
      示例:num.partitions=3

    • replication.factor:
      描述:主题的副本因子,表示每个分区有多少副本。提高副本因子可以增加数据的可靠性。
      默认值:无(主题创建时指定)
      示例:replication.factor=2

    • message.max.bytes:
      描述:Kafka 允许的最大消息大小,单位是字节。
      默认值:1000012(1 MB)
      示例:message.max.bytes=2097152(2 MB)

    • log.retention.bytes:
      描述:每个分区的日志文件最大保留大小,超过这个大小的日志会被删除。
      默认值:-1(不限制)
      示例:log.retention.bytes=1073741824(1 GB)

    • log.cleaner.enable:
      描述:启用日志清理器,用于压缩日志中的重复数据。
      默认值:false
      示例:log.cleaner.enable=true

    • security.inter.broker.protocol:
      描述:Kafka Broker 之间的通信协议,支持 PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。
      默认值:PLAINTEXT
      示例:security.inter.broker.protocol=SSL

    • ssl.keystore.location:
      描述:SSL 密钥库的位置,用于 SSL/TLS 加密。
      默认值:无
      示例:ssl.keystore.location=/path/to/keystore.jks

    • zookeeper.connect:
      描述:Zookeeper 的连接字符串,包括 Zookeeper 的主机名和端口号。
      默认值:localhost:2181
      示例:zookeeper.connect=localhost:2181

    • zookeeper.connection.timeout.ms:
      描述:Zookeeper 连接超时设置,单位是毫秒。
      默认值:6000
      示例:zookeeper.connection.timeout.ms=10000

    • auto.create.topics.enable:
      描述:是否自动创建主题。如果设置为 true,当客户端向不存在的主题发送消息时,Kafka 会自动创建该主题。
      默认值:true
      示例:auto.create.topics.enable=false

    • delete.topic.enable:
      描述:是否允许删除主题。如果设置为 true,可以通过 Kafka 提供的脚本删除主题。
      默认值:false
      示例:delete.topic.enable=true

通常情况下,默认配置就可以开始使用。如果需要自定义配置,可以编辑这些文件

启动 Zookeeper;Kafka 需要 Zookeeper 来管理集群的元数据。Kafka 附带了一个简单的 Zookeeper 实例,开启后另起一个终端

bin/zookeeper-server-start.sh config/zookeeper.properties

启动 Kafka Broker;在另一个终端中,启动 Kafka Broker

bin/kafka-server-start.sh config/server.properties

另起一个终端3;Kafka 使用主题来组织消息。可以使用 Kafka 提供的脚本创建主题。例如,创建一个名为 test-topic 的主题:

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Created topic test-topic.

生产消息:可以使用 Kafka 提供的生产者工具向主题中发送消息。打开一个终端并运行,然后在终端4中输入消息并按回车键发送消息

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-console-producer.sh --topic test-topic --bootstrap-server localhost:9092
>huhy
>

消费消息;在另一个终端3中,可以运行消费者工具来读取消息,只有最开始两个终端是不能终端,后两个有交互界面可以直接用

root@huhy:~/kafka_2.13-3.7.1# bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server localhost:9092
huhy

获取信息如下

在这里插入图片描述

管理 Kafka

查看主题:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
test-topic

查看主题详情:

bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server localhost:9092
Topic: test-topic       TopicId: rXHPQIqJRkO5lQDOsco3NQ PartitionCount: 1       ReplicationFactor: 1    Configs:Topic: test-topic       Partition: 0    Leader: 0       Replicas: 0     Isr: 0

删除主题;

bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092

验证查看

root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --delete --topic test-topic --bootstrap-server localhost:9092
root@huhy:~/kafka_2.13-3.7.1# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets

停止 Kafka Broker

bin/kafka-server-stop.sh

停止 Zookeeper:

bin/zookeeper-server-stop.sh

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/49305.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java语言程序设计——篇八(1)

🌿🌿🌿跟随博主脚步,从这里开始→博主主页🌿🌿🌿 Java常用核心类 主要内容Object: 终极父类toString( )方法equals( )方法getClass( )方法hashCode( )方法clone( )方法finalize( )方法实战演练 …

8. kubernetes资源——ingress

kubernetes资源——ingress 一、ingress介绍1、作用2、实现方式3、核心组件 二、部署ingress1、下载ingress_1.9.6.yaml文件2、事先导入镜像3、部署ingress 三、通过ingress发布k8s中的服务1、创建服务2、创建ingress规则发布服务3、测试访问 一、ingress介绍 1、作用 ingres…

若依+AI项目开发(二)

后端代码分析 二次开发 开始执行 生成成功 创建子模块

docker安装jenkins,并配置jdk、node和maven

拉取jenkins镜像 docker pull jenkins/jenkins:2.468-jdk21 创建一个文件夹,用于二次打包jenkins镜像 mkdir -p /data/jenkins cd /data/jenkins 提前准备好jdk和maven,并放到/data/jenkins下 由于3.8.x以上版本的maven只支持https协议,我们…

深入理解SQL中的INNER JOIN操作

本文介绍了INNER JOIN的定义、使用场景、计算方法及与其他JOIN的比较。INNER JOIN是关系数据库中常用的操作,用于返回两个表中匹配的行,只有在连接条件满足时才返回数据。本文详细解释了INNER JOIN的语法及其在一对多、多对多关系中的应用,通…

Redis实战---分布式锁

1. 什么是Redis分布式锁? 分布式锁,顾名思义,就是分布式系统中使用的锁,在单体应用中我们使用synchronized、ReentrantLock来解决线程时间的共享资源的访问问题,而在分布式系统中,资源贡献问题已经由线程之…

【Ubuntu】安装 Snipaste 截图软件

Snipaste 下载安装并使用 Snipastefor more information报错解决方案每次启动软件需要输入的命令如下添加开机自启动 下载 下载地址 安装并使用 Snipaste 进入终端输入命令 # 1、进入到 Snipaste-2.8.9-Beta-x86_64.AppImage 所在目录(根据自己的下载目录而定&…

Corsearch 用 ClickHouse 替换 MySQL 进行内容和品牌保护

本文字数:3357;估计阅读时间:9 分钟 作者:ClickHouse Team 本文在公众号【ClickHouseInc】首发 Chase Richards 自 2011 年在初创公司 Marketly 担任工程负责人,直到 2020 年公司被收购。他现在是品牌保护公司 Corsear…

JAVA笔记十六

十六、异常Exception 1.概念 异常:非正常情况,包括空的引用、数组下标越界、内存溢出等 Java提供了异常对象描述这类异常情况。 Java提供了异常机制来进行处理,通过异常机制来处理程序运行期间出现的错误,可以更好地提升程序的…

波特率和比特率的区别联系【理解】

波特率(Baud rate):表示单位时间内载波调制状态变化的次数 ,单位为波特(Baud); 【值得注意的是】单位“波特”本身就已经是代表每秒的调制数,不能用“波特每秒”(Baud per second)为…

MySQL练手 --- 1141. 查询近30天活跃用户数

题目链接:1141. 查询近30天活跃用户数 思路: 题目要求:统计截至 2019-07-27(包含2019-07-27),近 30 天的每日活跃用户数(当天只要有一条活动记录,即为活跃用户) 要计算…

react中简单的配置路由

1.安装react-router-dom npm install react-router-dom 2.新建文件 src下新建page文件夹,该文件夹下新建login和index文件夹用于存放登录页面和首页,再在对应文件夹下分别新建入口文件index.js; src下新建router文件用于存放路由配置文件…

「Ant Design」Antd 中卡片如何完全不展示内容区域、按需展示内容区域、不展示标题

前言 下面是默认的 Antd 卡片&#xff0c;由以下区域组成 处理 Antd 的 Card 展示形式大致有下面三种 卡片完全不展示内容区域 const App () > (<Card title"Default size card" extra{<a href"#">More</a>} style{{ width: 300 }}b…

nginx的学习(二):负载均衡和动静分离

简介 nginx的负载均衡和动静分离的简单使用 负载均衡配置 外部访问linux的ip地址:80/edu/a.html地址&#xff0c;会轮询访问Tomcat8080和Tomcat8081服务。 Tomcat的准备 准备两个Tomcat&#xff0c;具体准备步骤在nginx的学习一的反向代理例子2中&#xff0c;在Tomcat8080…

崖山异构数据库迁移利器YMP初体验-Oracle迁移YashanDB

前言 首届YashanDB「迁移体验官」开放后&#xff0c;陆续收到「体验官」们的投稿&#xff0c;小崖在此把优秀的投稿文章分享给大家~今天分享的用户文章是《崖山异构数据库迁移利器YMP初体验-Oracle迁移YashanDB》&#xff08;作者&#xff1a;小草&#xff09;&#xff0c;满满…

让你的程序有记忆功能。

目录 环境 代码 环境 大语言模型&#xff1a; gpt-40-mini Mem0: Empower your AI applications with long-term memory and personalization OpenAPI-Key: Mem0-Key&#xff1a; 代码 import osfrom dotenv import load_dotenv from openai import OpenAI from m…

网络安全领域五大注入攻击类型介绍

在网络安全领域&#xff0c;注入攻击是一种常见的攻击方式&#xff0c;攻击者通过向应用程序发送恶意数据来操控应用程序的行为。以下跟随博主通过具体样例一起来掌握以下五种知名的注入攻击类型。 1. SQL注入&#xff08;SQL Injection&#xff09; 1.1. 概述 SQL注入是最常见…

OAuth2 + Gateway统一认证一步步实现(公司项目能直接使用),密码模式授权码模式

文章目录 认证的具体实现环境的搭建基础版授权服务搭建引入依赖创建数据表yml配置配置SpringSecurity定义认证授权的配置类授权服务器存储客户端信息修改授权服务配置&#xff0c;支持密码模式 基础版授权服务测试授权码模式测试密码模式测试**测试校验token接口** 整合JWT使用…

内网对抗-隧道技术篇防火墙组策略FRPNPSChiselSocks代理端口映射C2上线

知识点&#xff1a; 1、隧道技术篇-传输层-工具项目-Frp&Nps&Chisel 2、隧道技术篇-传输层-端口转发&Socks建立&C2上线Frp Frp是专注于内网穿透的高性能的反向代理应用&#xff0c;支持TCP、UDP、HTTP、HTTPS等多种协议。可以将内网服务以安全、便捷的方式通过…