【Kafka】高级特性:主题

目录

  • 主题的管理
    • 创建主题
    • 查看主题
    • 修改主题
    • 删除主题
  • 增加分区
  • 分区副本的分配
  • 必要参数配置
  • KafkaAdminClient应用
    • 功能
    • 操作示例

主题的管理

使用kafka-topics.sh脚本。

下面是使用脚本的一些选项

选项说明
–config <String: name=value>为创建的或修改的主题指定配置信息。
–create创建一个新主题
–delete删除一个主题
–delete-config <String: name>删除现有主题的一个主题配置条目。这些条目就 是在–config中给出的配置条目。
–alter更改主题的分区数量,副本分配和/或配置条目。
–describe列出给定主题的细节。
–disable-rack-aware禁用副本分配的机架感知。
–force抑制控制台提示信息
–help打印帮助信息
–replica-assignment
<String:broker_id_for_part1_replica1
:broker_id_for_part1_replica2
,broker_id_for_part2_replica1
:broker_id_for_part2_replica2 , …>
当创建或修改主题的时候手动指定partition-to-broker的分配关系。
–replication-factor <Integer:replication factor>要创建的主题分区副本数。1表示只有一个副本, 也就是Leader副本。
–topic <String: topic>要创建、修改或描述的主题名称。除了创建,修改和描述在这里还可以使用正则表达式。
–topics-with-overridesif set when describing topics, only show topics that have overridden configs
–unavailable-partitionsif set when describing topics, only show partitions whose leader is not available
–under-replicated-partitionsif set when describing topics, only show under replicated partitions
–zookeeper <String: urls>必需的参数:连接zookeeper的字符串,逗号分隔的多个host:port列表。多个URL可以故障转移。

创建主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x - -partitions 1 --replication-factor 1 kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760

查看主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --list 
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x 
kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides -- describe

修改主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576 kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760 kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01

删除主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x

删除主题时,只是给主题添加删除的标记,要过一段时间删除。

增加分区

通过命令行工具操作,主题的分区只能增加,不能减少。否则报错

ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic myTop1 currently has 2 partitions, 1 would not be an increase.

通过–alter修改主题的分区数,增加分区。

kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 -- partitions 2

分区副本的分配

副本分配的三个目标:

  1. 均衡地将副本分散于各个broker上
  2. 对于某个broker上分配的分区,它的其他副本在其他broker上
  3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  1. 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置进行轮询。
  2. 其余副本通过增加偏移进行分配。

必要参数配置

使用kafka-topics.sh --config xx=xx --config yy=yy

配置给主题的参数如下:

属性默认值服务器默认属性说明
cleanup.policydeletelog.cleanup.policy要么是”delete“要么是”compact“; 这个字符串指明了当他们的回收时间或者尺寸限制到达时,针对旧日志部分的利用方式。默认 方式(“delete”)将会丢弃旧的部分;”compact“将会进行日志压缩。
compression.typenoneproducer用于压缩数据的压缩类 型。默认是无压缩。正确的选项 值是none、gzip、snappy、 lz4。压缩最好用于批量处理,批 量处理消息越多,压缩性能越 好。
max.message.bytes1000000max.message.byteskafka追加消息的最大字节数。注 意如果你增大这个字节数,也必 须增大consumer的fetch字节 数,这样consumer才能fetch到 这些最大字节数的消息。
min.cleanable.dirty.ratio0.5min.cleanable.dirty.ratio此项配置控制log压缩器试图进行 清除日志的频率。默认情况下, 将避免清除压缩率超过50%的日 志。这个比率避免了最大的空间 浪费
min.insync.replicas1min.insync.replicas当producer设置 request.required.acks为-1时, min.insync.replicas指定replicas 的最小数目(必须确认每一个 repica的写数据都是成功的), 如果这个数目没有达到, producer会产生异常。
retention.bytesNonelog.retention.bytes如果使用“delete”的retention 策 略,这项配置就是指在删除日志之前,日志所能达到的最大尺寸。默认情况下,没有尺寸限制而只有时间限制
retention.ms7 dayslog.retention.minutes如果使用“delete”的retention策 略,这项配置就是指删除日志前 日志保存的时间。
segment.bytes1GBlog.segment.byteskafka中log日志是分成一块块存储的,此配置是指log
segment.index.bytes10MBlog.index.size.max.bytes此配置是有关offsets和文件位置 之间映射的索引文件的大小;一 般不需要修改这个配置
segment.jitter.ms0log.roll.jitter.{ms,hours}The maximum jitter to subtract from logRollTimeMillis.
segment.ms7 dayslog.roll.hours即使log的分块文件没有达到需要删除、压缩的大小,一旦log 的时间达到这个上限,就会强制新建一个log分块文件
unclean.leader.election.enabletrue指明了是否能够使不在ISR中的replicas设置用来作为leader

KafkaAdminClient应用

除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个AdminClient,在kafka-client包下,一个抽象类,具体的实现是org.apache.kafka.clients.admin.KafkaAdminClient

功能

KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):

  1. 创建主题: createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options)

  2. 删除主题:deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options)

  3. 列出所有主题:listTopics(final ListTopicsOptions options)

  4. 查询主题:describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options)

  5. 查询集群信息:describeCluster(DescribeClusterOptions options)

  6. 查询配置信息:describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options)

  7. 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options)

  8. 修改副本的日志目录:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, final AlterReplicaLogDirsOptions options)

  9. 查询节点的日志目录信息:describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options)

  10. 查询副本的日志目录信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas,DescribeReplicaLogDirsOptions options)

  11. 增加分区:createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options)

其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。

用到的参数:

属性说明重要性
bootstrap.servers向Kafka集群建立初始连接用到的host/port列表。客户端会使用这里列出的所有服务器进行集群其他服务器的发现,而不管是否指定了哪个服务器用作引导。这个列表仅影响用来发现集群所有服务器的初始主机。
字符串形式:host1:port1,host2:port2,…
由于这组服务器仅用于建立初始链接,然后发现集群中的所有服务器,因此没有必要将集群中的所有地址写在这里。
一般最好两台,以防其中一台宕掉。
high
client.id生产者发送请求的时候传递给broker的id字符串。用于在broker的请求日志中追踪什么应用发送了什么消息。一般该id是跟业务有关的字符串。medium
connections.max.idle.ms当连接空闲时间达到这个值,就关闭连接。long型数据,默认:300000medium
request.timeout.ms客户端等待服务端响应的最大时间。如果该时间超时,则客户端要么重新发起请求,要么如果重试耗尽,请求失败。int类型值,默认:120000medium
security.protocol跟broker通信的协议:PLAINTEXT, SSL,SASL_PLAINTEXT, SASL_SSL.string类型值,默认:PLAINTEXTmedium
reconnect.backoff.ms重新连接主机的等待时间。避免了重连的密集循环。该等待时间应用于该客户端到broker的所有连接。long型值,默认:50medium
retries重试的次数,达到此值,失败。int类型值,默认5。low
retry.backoff.ms在发生失败的时候如果需要重试,则该配置表示客户端等待多长时间再发起重试。该时间的存在避免了密集循环。long型值,默认值:100。low
receive.buffer.bytesTCP接收缓存(SO_RCVBUF),如果设置为-1,则使用操作系统默认的值。int类型值,默认65536medium
send.buffer.bytes用于TCP发送数据时使用的缓冲大小(SO_SNDBUF),-1表示使用OS默认的缓冲区大小。int类型值,默认值:131072medium
reconnect.backoff.max.ms对于每个连续的连接失败,每台主机的退避将成倍增加,直至达到此最大值。在计算退避增量之后,添加20%的随机抖动以避免连接风暴。
long型值,默认1000
low

操作示例

主要操作步骤:

  1. 客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
  2. 客户端发送请求至Kafka Broker。
  3. Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
  4. 客户端接收相应的回执并进行解析处理。

和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和响应类的两个父类。

综上,如果要自定义实现一个功能,只需要三个步骤:

  1. 自定义XXXOptions;
  2. 自定义XXXResult返回值;
  3. 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。

示例:

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;public class MyAdminClient {private KafkaAdminClient client;@Beforepublic void before() {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "node1:9092");configs.put("client.id", "admin_001");client = (KafkaAdminClient) KafkaAdminClient.create(configs);}@Afterpublic void after() {// 关闭admin客户端client.close();}@Testpublic void testListTopics() throws ExecutionException, InterruptedException {// 列出主题
//        final ListTopicsResult listTopicsResult = client.listTopics();ListTopicsOptions options = new ListTopicsOptions();// 列出内部主题options.listInternal(true);// 设置请求超时时间,单位是毫秒options.timeoutMs(500);final ListTopicsResult listTopicsResult = client.listTopics(options);//        final Set<String> strings = listTopicsResult.names().get();
//
//        strings.forEach(name -> {
//            System.out.println(name);
//        });// 将请求变成同步的请求,直接获取结果final Collection<TopicListing> topicListings = listTopicsResult.listings().get();topicListings.forEach(new Consumer<TopicListing>() {@Overridepublic void accept(TopicListing topicListing) {// 该主题是否是内部主题final boolean internal = topicListing.isInternal();// 该主题的名字final String name = topicListing.name();System.out.println("主题是否是内部主题:" + internal);System.out.println("主题的名字:" + name);System.out.println(topicListing);System.out.println("=====================================");}});}@Testpublic void testDescribeLogDirs() throws ExecutionException, InterruptedException {final DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0));final Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap= describeLogDirsResult.all().get();integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() {@Overridepublic void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) {System.out.println("broker.id = " + integer);
//                log.dirs可以设置多个目录stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() {@Overridepublic void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) {System.out.println("logdir = " + s);final Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfos = logDirInfo.replicaInfos;replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {@Overridepublic void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {System.out.println("主题分区:" + topicPartition.partition());System.out.println("主题:" + topicPartition.topic());
//                                final boolean isFuture = replicaInfo.isFuture;
//                                final long offsetLag = replicaInfo.offsetLag;
//                                final long size = replicaInfo.size;}});}});}});}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/650885.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C语言常见面试题:什么是枚举,枚举的作用是什么?

枚举是一种特殊的数据类型&#xff0c;它是一组具命名的整型常量的集合。枚举的作用如下&#xff1a; 限制用户不能随意赋值&#xff1a;枚举类型可以限制用户只能使用定义时列举的值进行赋值&#xff0c;而不能随意赋值。这样可以增加代码的可读性和可维护性。方便管理公共的…

[题单练习] 大模拟题

看完题后不知所措 P1058 [NOIP2008 普及组] 立体图 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 简单来说&#xff0c;题目要我们按照输入&#xff0c;把立体图画出来&#xff0c;先放张图来震撼一下 看题解&#xff0c;题解的思路如下&#xff1a; 1. 先把一个积木块存入…

栈和队列的动态实现(C语言实现)

✅✅✅✅✅✅✅✅✅✅✅✅✅✅✅✅ ✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨✨ &#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1f33f;&#x1…

图像与二进制数据间的转换

概述 此文档主要是记录图像与二进制互相转换的方法&#xff0c;此文档记录于20220811 进行图片和二进制的互相转换 若想转为base64&#xff0c;可以看 图像与base64互转 的内容。 Python 资料&#xff1a;python 图片和二进制转换的三种方式_脸不大的CVer的博客-CSDN博客_p…

统计学-R语言-7.3

文章目录 前言总体方差的检验一个总体方差的检验两个总体方差比的检验 非参数检验总体分布的检验正态性检验的图示法Shapiro-Wilk和K-S正态性检验总体位置参数的检验 练习 前言 本篇文章继续对总体方差的检验进行介绍。 总体方差的检验 一个总体方差的检验 在生产和生活的许多…

1 月 26日算法练习

文章目录 九宫幻方穿越雷区走迷宫 九宫幻方 小明最近在教邻居家的小朋友小学奥数&#xff0c;而最近正好讲述到了三阶幻方这个部分&#xff0c;三阶幻方指的是将1~9不重复的填入一个33的矩阵当中&#xff0c;使得每一行、每一列和每一条对角线的和都是相同的。 三阶幻方又被称…

IS-IS:07 ISIS缺省路由

IS-IS 有两种缺省路由&#xff0c;第一种缺省路由是由 level-1 路由器在特定条件下自动产生的&#xff0c;它的下一跳是离它最近的 &#xff08;cost 最小&#xff09;level-1-2路由器。第二种缺省路由是 IS-IS 路由器上使用 default-route-advertise 命令产生并发布的。 本次实…

第十七讲_HarmonyOS应用开发Stage模型应用组件

HarmonyOS应用开发Stage模型应用组件 1. 应用级配置2. Module级配置3. Stage模型的组件3.1 AbilityStage3.1.1 AbilityStage的创建和配置3.1.2 AbilityStage的生命周期回调3.1.3 AbilityStage的事件回调&#xff1a; 3.2 UIAbility3.2.1 UIAbility生命周期3.2.3 UIAbility启动模…

CSAPP fall2015 深入理解计算机系统 Cache lab详解

Cache Lab cache lab 缓存实验 代码下载 从CSAPP上面下载对应的lab代码 http://csapp.cs.cmu.edu/3e/labs.html 环境准备 需要安装 valgrind。可以参考文章Valgrind centos。 安装好以后执行valgrind --version可以看到版本号。 Cache simulator cache simulator not a …

ART: Automatic multi-step reasoning and tool-use for large language models 导读

ART: Automatic multi-step reasoning and tool-use for large language models 本文介绍了一种名为“自动推理和工具使用&#xff08;ART&#xff09;”的新框架&#xff0c;用于解决大型语言模型&#xff08;LLM&#xff09;在处理复杂任务时需要手动编写程序的问题。该框架可…

【音视频原理】音频编解码原理 ③ ( 音频 比特率 / 码率 | 音频 帧 / 帧长 | 音频 帧 采样排列方式 - 交错模式 和 非交错模式 )

文章目录 一、音频 比特率 / 码率1、音频 比特率2、音频 比特率 案例3、音频 码率4、音频 码率相关因素5、常见的 音频 码率6、视频码率 - 仅做参考 二、音频 帧 / 帧长1、音频帧2、音频 帧长度 三、音频 帧 采样排列方式 - 交错模式 和 非交错模式1、交错模式2、非交错模式 一…

排序问题上机考试刷题

排序与查找可以说是计算机领域最经典的问题&#xff0c;排序和查找问题在考研机试真题中经常出现。排序考点在历年机试考点中分布广泛。排序既是考生必须掌握的基本算法&#xff0c;又是考生 学习其他大部分算法的前提和基础。首先学习对基本类型的排序。对基本类型排序&#x…

【C++中的STL】函数对象

函数对象 函数对象概念谓词概念 内建函数对象算术仿函数关系仿函数逻辑仿函数&#xff08;基本用不到&#xff09; 函数对象概念 重载函数调用操作符的类&#xff0c;其对象常称为函数对象&#xff0c;函数对象使用重载的()时。行为类似函数调用&#xff0c;也叫仿函数。 函数…

分析crash日志

每一天都要快乐的进步~~ 文章目录 在分析 crash 日志时&#xff0c;通常需要关注以下信息&#xff1a; 1️⃣ 错误信息&#xff1a;了解 crash 的具体错误信息&#xff0c;这有助于定位问题的根源所在。 2️⃣ 堆栈跟踪&#xff1a;查看堆栈跟踪&#xff0c;确定 crash 发生的…

4.F1 评分机器学习模型性能的常用的评估指标

F1评分作为机器学习领域中的一个综合性评价指标&#xff0c;旨在在准确率和召回率之间寻求平衡&#xff0c;进而提供对模型性能全面评估的手段。本文将深入探讨F1评分的定义、计算方法、应用领域、案例研究以及未来发展方向&#xff0c;力求为读者提供详实而全面的了解。 一.F…

C#学习笔记_StringBuilder+程序效率测试

String问题&#xff1a;当程序中进行过多字符串处理操作时&#xff0c;会在内存中产生过多垃圾信息&#xff0c;影响程序效率。 StringBuilder简介 StringBuilder为一个类&#xff0c;属于引用类型。StringBuilder与string的区别在于&#xff0c;StringBuilder对于字符串的操…

osgEarth真HelloWorld

osgEarth真HelloWorld vcpkg installtests vcpkg install osgEarth安装指南 https://docs.osgearth.org/en/latest/install.html&#xff0c; 预先设置ports/osg/portfile.cmake GL3 否则调用osg相关功能时会出现如下提示 OpenSceneGraph does not define OSG_GL3_AVAILABLE; …

语音方向精典论文品读_HuBERT

英文名称: HuBERT: Self-Supervised Speech Representation Learning by Masked Prediction of Hidden Units 中文名称: HuBERT&#xff1a;通过隐藏单元的屏蔽预测进行自监督语音表示学习 链接: http://arxiv.org/abs/2106.07447v1 代码: https:// github.com/pytorch/fairseq…

vertica10.0.0单点安装_ubuntu18.04

ubuntu的软件包格式为deb&#xff0c;而rpm格式的包归属于红帽子Red Hat。 由于项目一直用的vertica-9.3.1-4.x86_64.RHEL6.rpm&#xff0c;未进行其他版本适配&#xff0c;而官网又下载不到vertica-9.3.1-4.x86_64.deb&#xff0c;尝试通过alian命令将rpm转成deb&#xff0c;但…

盘古信息IMS OS 数垒制造操作系统+ 产品及生态部正式营运

启新址吉祥如意&#xff0c;登高楼再谱新篇。2024年1月22日&#xff0c;广东盘古信息科技股份有限公司新办公楼层正式投入使用并举行了揭牌仪式&#xff0c;以崭新的面貌、奋进的姿态开启全新篇章。 盘古信息总部位于东莞市南信产业园&#xff0c;现根据公司战略发展需求、赋能…