Kafka的基本介绍以及扩展

文章目录

    • 基本操作
      • 新增Topic
      • 查询Topic
      • 修改Topic
      • 删除Topic
    • 生产者和消费者
      • 创建生产者
      • 创建消费者
    • Broker扩展
    • Producer扩展
    • Topic、Partition、Message扩展
    • 存储策略
    • 容错机制

基本操作

新增Topic

指定两个分区,两个副本,replication不能大于集群中的broker数

[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --partitions 2 --replication-factor 2 --topic hello
Created topic hello.

查询Topic

[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --list --zookeeper hadoop01:2181
hello# 查看详细信息
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181
Topic: hello	PartitionCount: 2	ReplicationFactor: 2	Configs: Topic: hello	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2Topic: hello	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0

image-20240312094710653

修改Topic

修改partition的数量,只能增加

[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --alter --zookeeper hadoop01:2181 --partitions 5 --topic hello
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded![root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181
Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs: Topic: hello	Partition: 0	Leader: 1	Replicas: 1,2	Isr: 1,2Topic: hello	Partition: 1	Leader: 2	Replicas: 2,0	Isr: 2,0Topic: hello	Partition: 2	Leader: 0	Replicas: 0,2	Isr: 0,2Topic: hello	Partition: 3	Leader: 1	Replicas: 1,2	Isr: 1,2Topic: hello	Partition: 4	Leader: 2	Replicas: 2,0	Isr: 2,0

删除Topic

删除topic,删除操作是不可逆的,从1.0开始默认开启删除功能,之前的版本只会标记为删除状态,需要设置delete.topic.enable为true才可以真正删除

[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --delete --zookeeper hadoop01:2181 --topic helloTopic hello is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

生产者和消费者

创建生产者

bin/kafka-console-prodecer.sh

创建消费者

bin/kafka-console-consumer.sh
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --partitions 5 --replication-factor 2 --topic hello
Created topic hello.# producer
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic hello# consumer 这个只消费最新的消息
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh  --bootstrap-server hadoop01:9092 --topic hello# 消费之前的消息
[root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-console-consumer.sh  --bootstrap-server hadoop01:9092 --topic hello --from-beginning

Broker扩展

配置文件server.properties

# The number of messages to accept before forcing a flush of data to disk
# 根据条数选择刷新磁盘的时机
log.flush.interval.messages=10000# 根据消息的间隔时间刷新
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000# The minimum age of a log file to be eligible for deletion due to age  日志保存时间
log.retention.hours=168# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies  每隔5分钟检查文件是否满足删除的条件
log.retention.check.interval.ms=300000

Producer扩展

  • Partitioner:根据用户设置的算法(比如根据消息的key来设计算法到底分发到哪个分区里面)来计算发送到哪个分区-Partition,默认是随机
  • 数据通信方式:同步发送和异步发送,同步是指生产者发送数据后,要等待接收方发回响应后再发送下一个数据的通讯方式;异步指发送生产者发送消息后不等接收方响应就立即发送下一条数据的方式,通信方式通过acks的配置来控制。
    • acks:默认为1.表示需要Leader节点回复收到消息
    • acks:all,表示需要所有的Leader节点以及所有的副本节点回复收到消息(acks=-1)
    • acks:0,不需要回复

Topic、Partition、Message扩展

  • 每个Partition在存储层面是Append Log文件,新消息都会被直接追加到log文件的尾部,每条消息在log文件中的位置称为offset(偏移量)
  • 越多的Partition可以容纳更多的Consumer,有效提升并发消费的能力
  • 业务类型增加了可以增加Topic,数据量大需要增加Partition
  • Message:offset,类型是long表示此消息在一个Partition中的起始位置,可以认为offset是Partition中的messageId,自增;MessageSize,类似为int32,表示此消息的字节大小;data,类型为bytes,表示message的具体内容

image-20240312125048098

存储策略

  • 在kafka中每个topic包含1到多个partition,每个partition存储一部分Message,每条Message包含三个属性,其中有一个是Offset
  • Offset相当于这个partition中的message的唯一ID,可以通过分段+索引的方式去找到这个message;分段就是segment文件,每个partition由多个segment文件组成;索引就是index,每个index里面都会记录每个segment文件中的第一条数据的偏移量,然后根据这个偏移量就可以去segment文件中找到对应的消息
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
# 这个配置就表示每个segment文件的大小,超过这个大小就会再创建一个新的文件
log.segment.bytes=1073741824

kafka消息的存储流程:producer生产的消息会被发送到Topic的多个Partition上面,Topic收到消息之后会往partition的最后一个segment文件中添加这条消息,文件达到一定大小后会创建新的文件

image-20240312130302894

image-20240312125947693

容错机制

  • 一个Broker宕机后对集群的影响不大

    # 模拟节点宕机
    [root@hadoop01 config]# jps
    41728 NameNode
    53523 Kafka
    42246 ResourceManager
    59789 Jps
    41998 SecondaryNameNode
    52655 QuorumPeerMain
    [root@hadoop01 config]# kill 53523
    [root@hadoop01 config]# jps
    41728 NameNode
    59809 Jps
    42246 ResourceManager
    41998 SecondaryNameNode
    52655 QuorumPeerMain# 连接到kafka
    [root@hadoop01 zookeeper3.8.4]# bin/zkCli.sh
    [zk: localhost:2181(CONNECTED) 0] ls /
    [admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
    [zk: localhost:2181(CONNECTED) 1] ls /brokers 
    [ids, seqid, topics]
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids 
    [1, 2]
    [zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://hadoop02:9092"],"jmx_port":-1,"host":"hadoop02","timestamp":"1710206078306","port":9092,"version":4}
    [zk: localhost:2181(CONNECTED) 5] 
    # zookeeper会重新选举leader
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic hello
    Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs: Topic: hello	Partition: 0	Leader: 2	Replicas: 0,2	Isr: 2Topic: hello	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1Topic: hello	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1Topic: hello	Partition: 3	Leader: 1	Replicas: 0,1	Isr: 1Topic: hello	Partition: 4	Leader: 1	Replicas: 1,2	Isr: 1,2
    You have new mail in /var/spool/mail/root
    • 当kafka集群中新增一个Broker节点,zookeeper会自动识别并在适当的时机选择此节点提供Leader服务
    # 重新启动
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-server-start.sh -daemon config/server.properties 
    You have new mail in /var/spool/mail/root
    [root@hadoop01 kafka_2.12-2.4.0]# jps
    41728 NameNode
    60640 Kafka
    60707 Jps
    42246 ResourceManager
    41998 SecondaryNameNode
    52655 QuorumPeerMain# 进入zookeeper观察
    [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
    [0, 1, 2]
    [zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
    {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://hadoop01:9092"],"jmx_port":-1,"host":"hadoop01","timestamp":"1710221534732","port":9092,"version":4}# 查询kafka topic信息
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic hello
    Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs: Topic: hello	Partition: 0	Leader: 2	Replicas: 0,2	Isr: 2,0Topic: hello	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0Topic: hello	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1Topic: hello	Partition: 3	Leader: 1	Replicas: 0,1	Isr: 1,0Topic: hello	Partition: 4	Leader: 1	Replicas: 1,2	Isr: 1,2
    You have new mail in /var/spool/mail/root
  • 新启动的几点不会是任何分区的leader,所以要重新均匀分配,其实不分配也可以,在kafka中有对应的配置

     [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-leader-election.sh --bootstrap-server hadoop01:9092 --election-type preferred --all-topic-partitions
    You have new mail in /var/spool/mail/root
    [root@hadoop01 kafka_2.12-2.4.0]# bin/kafka-topics.sh --describe --zookeeper hadoop01:2181 --topic hello
    Topic: hello	PartitionCount: 5	ReplicationFactor: 2	Configs: Topic: hello	Partition: 0	Leader: 0	Replicas: 0,2	Isr: 2,0Topic: hello	Partition: 1	Leader: 1	Replicas: 1,0	Isr: 1,0Topic: hello	Partition: 2	Leader: 2	Replicas: 2,1	Isr: 2,1Topic: hello	Partition: 3	Leader: 0	Replicas: 0,1	Isr: 1,0Topic: hello	Partition: 4	Leader: 1	Replicas: 1,2	Isr: 1,2

在kafka中的Broker是无状态的,本身是不保存任何信息的,Broker的所有信息都放在zookeeper里面了,所以,Broker进程挂掉或者启动,对集群的影响不大!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/738966.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【MAC】MacOS M2 芯片的Mysql 数据库安装与使用

1.下载 https://downloads.mysql.com/archives/community/ 选择ARM的 2.安装 在安装到最后一步:configuration 一定要选择Use Legacy Password Encryption。 一定要记得输入密码,这个密码也是登陆mysql的密码,非常重要。备注:…

Huggingface中Transformer模型使用

NLP自从Transformer模型出现后,处理方式有大统一的趋势,首先回答几个基础问题: 1、自然语言处理究竟要做一件什么事呢?自然语言处理最终解决的是分类问题,但是它不仅仅输出一个分类的预测结果,关键的在于构…

私立医院的革命者:大数据解决方案全面解析

第一部分:背景 在信息化飞速发展的今天,医疗行业正经历着一场深刻的数字化转型。特别是对于私立医院来说,要在这个变革的浪潮中立于不败之地,就必须拥抱新技术,优化服务流程,提高医疗质量。大数据技术&…

Mac M1:通过docker安装RocketMQ、RocketMQ-Dashboard

0. 引言 最近本地启动以前docker安装的rocketmq发现报错了,因为是从老mac迁移过来的,发现支持的芯片还是amd的,于是重新在docker下安装rocketmq,并记录下步骤,方便大家后续参考。 1. 步骤 1、先下载项目源码 git c…

基于eleiment-plus的表格select控件

控件不是我写的&#xff0c;来源于scui,但在使用中遇到了一些问题&#xff0c;希望能把过程记录下来&#xff0c;同时把这个问题修复掉。 在使用的时候对控件进行二级封装&#xff0c;比如我的一个商品组件&#xff0c;再很多地方可以用到&#xff0c;于是 <template>&l…

【Python】一文详细介绍 plt.rc_context() 在 Matplotlib 中的原理、作用、注意事项

【Python】一文详细介绍 plt.rc_context() 在 Matplotlib 中的原理、作用、注意事项 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&a…

2024.3.11 C++作业

1、提示并输入一个字符串&#xff0c;统计该字符中大写、小写字母个数、数字个数、空格个数以及其他字符个数要求使用C风格字符串完成 #include <iostream>using namespace std;int main() {char str[20];cout << "please enter the str:";gets(str);in…

linux GitLab 私有仓库的搭建

下载地址 gitLab 安装包下载地址&#xff1a;https://about.gitlab.com/install/ 环境准备&#xff1a; 环境&#xff1a;CentOS7.6 安装包&#xff1a;gitlab-ce-8.9.5-ce.0.el7.x86_64.rpm 硬件配置&#xff1a; 4G 安装步骤&#xff1a; 安装&#xff1a; [rootserver3 ~]…

Docker学习——Dock镜像

什么是Docker镜像 Docker 镜像类似于虚拟机镜像&#xff0c;可以将它理解为一个只读的模板。 一个镜像可以包含一个基本的操作系统环境&#xff0c;里面仅安装了 Apache 应用程序&#xff08;或 用户需要的其他软件&#xff09; 可以把它称为一个 Apache 镜像。镜像是创建 Do…

如何把一款App从无到有运营起来?都需要哪些资源?

引言 在这个数字化的时代&#xff0c;移动应用程序&#xff08;App&#xff09;如同现代社会的魔法手段&#xff0c;将我们与世界连接在一起。无论是寻找爱情的那一刻&#xff0c;还是享受美食的时光&#xff1b;无论是在城市喧嚣中找到宁静的那一刻&#xff0c;还是在孤寂时刻…

2024 年广东省职业院校技能大赛(高职组) “云计算应用”赛项样题⑤

2024 年广东省职业院校技能大赛&#xff08;高职组&#xff09; “云计算应用”赛项样题⑤ 模块一 私有云&#xff08;50 分&#xff09;任务 1 私有云服务搭建&#xff08;10 分&#xff09;任务 2 私有云服务运维&#xff08;25 分&#xff09;任务 3 私有云运维开发&#xf…

软件杯 图像检索算法

文章目录 1 前言2 图像检索介绍(1) 无监督图像检索(2) 有监督图像检索 3 图像检索步骤4 应用实例5 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 图像检索算法 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff…

c++之旅——第六弹

大家好啊&#xff0c;这里是c之旅第五弹&#xff0c;跟随我的步伐来开始这一篇的学习吧&#xff01; 如果有知识性错误&#xff0c;欢迎各位指正&#xff01;&#xff01;一起加油&#xff01;&#xff01; 创作不易&#xff0c;希望大家多多支持哦&#xff01; 一,静态成员&…

【rk3229 android7.1.2 替换默认输入法】

问题平台描述 问题描述解决方法 郑重声明:本人原创博文&#xff0c;都是实战&#xff0c;均经过实际项目验证出货的 转载请标明出处:攻城狮2015 Platform: Rockchip CPU:rk3229 OS:Android 7.1.2 Kernel: 3.10 问题描述 国内客户&#xff0c;觉得安卓自带的输入法不好用&#x…

智能警用装备柜管理系统|智能化可视化管理

智能警用装备柜管理系统|智能化可视化管理 我司&#xff08;JIONCH集驰&#xff09;警用装备管理系统&#xff08;智装备DW-S304&#xff09;是依托互云计算、大数据、RFID技术、数据库技术、AI、视频分析技术对警用装备进行统一管理、分析的信息化、智能化、规范化的系统。 智…

返回值不同算方法重载么?为什么?

1、典型回答 返回值不同不算方法重载 方法重载&#xff08;Overloading&#xff09;是指在同一个类中定义了多个同名方法&#xff0c;但它们的参数列表不同&#xff0c;方法重载要求方法&#xff1a; 名称相同参数类型、参数个数或参数顺序&#xff0c;至少有一个不同 方法…

分布式之LoadBalancer

一、LoadBalancer介绍 Spring Cloud LoadBalancer是Spring Cloud官方自己提供的客户端负载均衡器,抽象和实现&#xff0c;用来替代Ribbon&#xff08;已经停更&#xff09;&#xff0c; 二、Ribbon和Loadbalance 对比 组件组件提供的负载策略支持负载的客户端Ribbon随机 Ran…

[MYSQL数据库]--约束

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 目录 一、空属性…

C++的学习

代码练习 输入一个字符串&#xff0c;统计其中大写字母、小写字母、数字、空格以及其他字符的个数 #include <iostream>using namespace std;int main() {cout << "请输入一个字符串" << endl;string str;getline(cin,str);int capital 0;int l…

HTTP/1.1 协议优化方案探讨

前言 HTTP/1.1 是目前广泛应用的网络协议之一&#xff0c;虽然已经存在多年&#xff0c;但我们仍然可以通过优化来提升其性能和效率。本文将从优化思路的角度出发&#xff0c;探讨如何在 HTTP/1.1 协议下实现优化&#xff0c;包括避免发送重复 HTTP 请求、减少 HTTP 请求次数、…