kafka创建topic命令_0748-5.14.4-Kafka的扩容和缩容

​文档编写目的

在Kafka集群资源使用已超出系统配置的资源时,或者有大量资源闲置造成资源浪费的时候,需要分别通过扩容Kafka和缩容Kafka来进行调整。本篇文章Fayson主要介绍如何进行Kafka的扩容和缩容,以及变更后的Kafka集群如何进行负载均衡的操作。

  • 测试环境:

1.Redhat7.2

2.采用root用户操作

3.CM为5.16.2,CDH为5.14.4

4.Kafka版本为0.10.2

5.集群启用了Kerberos,Kafka未启用Kerberos和Sentry

Kafka集群的扩容

2.1 当前Kafka集群状态

集群中有3个kafka broker

3b469c60a200ec9bc31f51fae618ed01.png

有2个topic,分别为test和test1,情况如下

1b700ddbeb84c42c667e6c694bea7ed1.png
1799198729f60e2db732fbfc3561b108.png

2.2 扩容前准备

新扩容的机器要先加入集群中,通过CM管理,按照下面的步骤进行操作

1.修改新添加的机器的hostname

[root@hadoop6 ~]# hostnamectl set-hostname cdh04.hadoop.com
8739d1b8e606a50cb3358574124c4900.png

2.修改/etc/hosts文件并同步到所有节点,这里用脚本来实现。

192.168.0.204 cdh01.hadoop.com cdh01192.168.0.205 cdh02.hadoop.com cdh02192.168.0.206 cdh03.hadoop.com cdh03192.168.0.195 cdh04.hadoop.com cdh04
4fb3a81126177b25e892074f16f30f6d.png
f044fc4ffd249601c4faa3d8c4303ace.png

3.新添加的节点关闭防火墙,设置开机自动关闭

[root@cdh04 ~]# systemctl stop firewalld[root@cdh04 ~]# systemctl disable firewalld
36ec3047a41bb7f6d0e9f294ac2a832e.png

4.新添加的节点禁用SELinux,并修改修改/etc/selinux/config

[root@cdh04 ~]# setenforce 0setenforce: SELinux is disabled[root@cdh04 ~]# vim /etc/selinux/config
cccab90340efc56fac928051f3697acf.png

5.新添加的节点关闭透明大页面,并且在/etc/rc.d/rc.local 里面加入脚本,设置自动开机关闭。

[root@cdh04 ~]# echo never > /sys/kernel/mm/transparent_hugepage/defrag[root@cdh04 ~]# echo never > /sys/kernel/mm/transparent_hugepage/enabled[root@cdh04 ~]# vim /etc/rc.d/rc.localif test -f /sys/kernel/mm/transparent_hugepage/defragthen echo never > /sys/kernel/mm/transparent_hugepage/defragfiif test -f /sys/kernel/mm/transparent_hugepage/enabledthen echo never > /sys/kernel/mm/transparent_hugepage/enabledfi
a654f6de5f7af9e30256d068be7ca0d4.png
[root@cdh04 ~]# chmod +x /etc/rc.d/rc.local

6.新添加的节点设置SWAP,并且设置开机自动更改

[root@cdh04 ~]# sysctl vm.swappiness=1vm.swappiness = 1[root@cdh04 ~]# echo vm.swappiness = 1  >> /etc/sysctl.conf
6b4bd88560bb211441332efde9213583.png

7.新添加的节点配置时钟同步,先在所有服务器卸载chrony,再安装ntp,再修改配置把时钟同步跟其他kafka broker节点保持一致

[root@cdh04 ~]# yum -y remove chrony[root@cdh04 ~]# yum -y install ntp
6184ee77e1acb403a60f2e4d67047341.png
vim /etc/ntp.conf
b318a44e2e77da7e6b9922345b8752eb.png

启动ntp服务,并设置开机启动

[root@cdh04 java]# systemctl start ntpd[root@cdh04 java]# systemctl enable ntpd

8.新添加的节点安装kerberos客户端,并进行配置

[root@cdh04 ~]# yum -y install krb5-libs krb5-workstation
4e0c813e07176e443f19de064ad15b2b.png

从其他节点拷贝/etc/krb5.conf文件到新节点

da29feb4badb76e17b40e5427e8ae86a.png

9.从其他节点拷贝集群使用的JDK到新节点/usr/java目录下

[root@cdh01 java]# scp -r jdk1.8.0_131/ cdh04:/usr/java/jdk1.8.0_131/
2b50ca74b13f98f1f583359de7e38ef9.png

10.在CM编辑主机模板kafka,只添加kafka broker角色

303c9a6ebcc6f7df1a37ade5fbab51da.png

2.3 扩容Kafka

1.添加新的节点到集群,从CM主页点击Add Hosts,如下图所示

112a4f8d0e92f59b31a6268184c08a61.png

2.点击继续

35f0f993d24924263f56af07d4257550.png

3.搜索主机,输入主机名,点击搜索并继续

9c1315993bccd67842dbe47273d3d88a.png
f7fd3ddf2fd1a2495dfd9cf4b8813e01.png

4.输入自定义存储库地址,并继续

4b955bcfa2d0dcdc5b168dd8e16c6162.png

5.点击安装JDK并继续

e1962c4210b55bae872c3f4b9cb4a3c2.png

6.输入主机密码并继续

91fb31998905affcfcd6327243a411c2.png

7.正在进行安装,安装成功后点击继续

600cb5daf4d066150d073f0c08720f28.png
c9355c61b4e8b294544da33cce362a17.png
a9c67726b86aaa239a3438c0c668e102.png

激活完成,点击继续

907c2e691cb1cbe694bc175472040241.png

8.点击继续,进入主机检查,添加主机完成

d4e512b56ac6568a3b267eaec63ada5c.png

9.应用主机模板kafka,扩容完成

601268a212b3c8b13c7d31610c9dfc34.png
8294b193a6db1ff05038b69181583fe8.png

扩容完成

45d8602200529803780b4d182220cb47.png

扩容后平衡

在扩容完成后,可以通过自带的命令来生成topic的平衡策略和执行平衡的操作。

3.1 生成平衡策略

1.查询当前创建的topic

[root@cdh01 ~]# kafka-topics --list --zookeeper cdh01.hadoop.com:2181
6bed370127b20e41ade26a5f3c8cd973.png

2.按照查询到的topic来创建文件topics-to-move.json 格式如下

{"topics": [{"topic": "test"},{"topic": "test1"}            ],"version":1}
b3274a56475e73dbd9f90592d71088fa.png

3.在CM查询kafka broker的ID

3140a0315f56d13dcc206797c64ca10a.png

4.用命令生成迁移方案,下面画红框的就是生成的平衡方案

kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --topics-to-move-json-file topics-to-move.json  --broker-list "121,124,125,126" --generate
3ef4571bcf91b9cc4418f75d1bcfb07a.png

5.把Proposed partition reassignment configuration下的内容复制保存为json文件newkafka.json,平衡方案生成完成。

0193f1d26d8ea74703b6c041f03178e9.png

3.2 执行平衡策略

1.执行下面的命令来进行平衡

[root@cdh01 ~]# kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --execute
07f42e0834b83cb4d517330fe93feb6e.png

2.用下面命令来进行查看执行的进度

kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --verify
9aa81c350ded2f4af89308ee45101e95.png

从结果可以看到已经执行完成。,并且在新添加的节点上也可以看到有topic的副本存过来了。

38846468a01d2008600c732db0e3a94c.png

缩容前的准备

1.编辑之前创建的topics-to-move.json文件,添加上系统自动生成的__consumer_offsets

[root@cdh01 ~]# vim topics-to-move.json {"topics": [{"topic": "test"},{"topic": "__consumer_offsets"},{"topic": "test1"}            ],"version":1}
4421457fb7a6f1eb1bd45cb9efdabd29.png

2.再次使用命令生成迁移计划,这里只选取121,124,126这三个broker,然后把生成计划中的126替换成125进行保存,这样就把126上的数据全部迁移到了125上。

kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --topics-to-move-json-file topics-to-move.json  --broker-list "121,124,126" --generate
a868fc635a35661c0dee1f54af9d32bf.png
884965c01ebf6abbeea117013b990b3c.png

3.执行迁移命令,进行迁移

kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --execute
d712a5ca973a2abeb8cacaaa4bb0cbe8.png

4.进行查询,迁移完成

kafka-reassign-partitions --zookeeper cdh01.hadoop.com:2181 --reassignment-json-file newkafka.json --verify
c93f80ead971c7feb498cd11f8ff392e.png

5.在要删除的broker上也可以看到,topic数据已经迁移走

1772b6f59564a6832f43fa7ac33ca8e2.png

Kafka集群的缩容

在完成上诉缩容前的准备后,现在可以进行kafka集群的缩容。

1.从CM进入Kafka的实例界面

09ddf54d7b61a8aa63da37a410a2d9bb.png

2.勾选要删除的broker,先停止该broker

fe223607e5c56e1f487c1ac996d45f95.png

3.停止完成后,进行删除

058332f8bf36aa34af8ea15d73791861.png

删除完成。

a92e02572cd27c9c68136b4b8935d29b.png

总结

1.Kafka集群的扩容和缩容可以通过CM来进行添加broker和删除broker来进行。

2.在Kafka集群扩容后,已有topic的partition不会自动均衡到新的磁盘上。可以通过kafka-reassign-partitions命令来进行数据平衡,先用命令生成平衡方案,再执行。也可以手动编辑迁移方案来进行执行。

3.新建topic的partition, 会以磁盘为单位,按照partition数量最少的来落盘。

4.在Kafka缩容前,需要把要删除的broker上的topic数据迁出,也可以通过kafka-reassign-partitions来进行迁移,手动编辑迁移方案,再通过命令执行即可。

5.kafka-reassign-partitions这个命令,只有指定了broker id上的topic才会参与partition的reassign。根据我们的需求,可以手动来编写和修改迁移计划。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/431514.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

java 对话框 显示图片_Java对话框上显示图片

手掌心其实有很多种方法可以解决图片显示大小的问题:使用photoshop修改. 优点是可以节省系统资源, 显示图片的时候,不用做处理,缺点是需要了解ps的基本操作使用JDialog 自定义对话框. 优点 可以实现复杂的效果, 缺点,代码量比较多使用ImageIcon, Image 类 实现图片的缩放,. 优点…

class-dump获取iOS私有api

转自:http://blog.csdn.net/sunyuanyang625/article/details/41440167 获取各类iOS私有api 安装工具class-dump 资源地址http://download.csdn.net/detail/map625/8191343 运行class-dump并编译src项目 编译之后在produce中找到编译好的class…

python能开发什么产品_三周学 Python ?不,三周做个产品

我的同事在看到毫无开发经验的我用三周时间,不但从零基础用上了 Python,还做出了一个客户关系管理系统,强烈邀请我分享经验。惶恐,因为我并没有出色的智商,也没有觉得三周学 Python 是一个体现自己牛叉的事情(不少人可…

爬楼梯 java_Leetcode 70.爬楼梯(Java)

题目:假设你正在爬楼梯。需要 n 阶你才能到达楼顶。每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢?注意:给定 n 是一个正整数。示例 1:输入: 2输出: 2解释: 有两种方法可以爬…

WebView之2

首先需要添加权限&#xff1a; <uses-permission android:name"android.permission.INTERNET"/>   MainActivity: package com.wyl.webview;import android.app.Activity; import android.app.ProgressDialog; import android.os.Bundle; import android.vie…

火星云分发全网视频_好用的短视频一键分发软件,让工作效率提高10倍

随着近几年新媒体行业的高速发展&#xff0c;新媒体行业的红利也越来越来&#xff0c;也有越来越多的人想要享受到这波红利&#xff0c;于是不管是个人是企业都纷纷开始进入这个市场。不过也随之诞生了一系列麻烦繁琐的问题&#xff0c;如怎么持续创作内容&#xff0c;怎么花费…

java 采样_Java编程实现beta分布的采样或抽样实例代码

本文研究的主要是Java编程实现beta分布的采样或抽样&#xff0c;具体如下。本文将使用math3提供的工具包&#xff0c;对beta分布进行采样。如下程序是对alpha81&#xff0c;beta219的beta分布函数&#xff0c;进行抽样&#xff0c;共采样10000次。package function;/*** author…

树莓派 蓝牙音响_你应该拥有一个树莓派

为什么你应该拥有一个树莓派树莓派并不是极客的玩具树莓派可以用来做什么?它能实现的实在是太多了,最常见的比如自动化脚本 各种机器人bot: QQ/wechat/微博/facebook/telegram,IM结合爬虫 telegram的bot如今被黑产们结合比特币,做成了交易系统 使用宝塔linux面板,一键安装word…

训练集的识别率一直波动_机器学习验证集为什么不再有新意?

机器学习中&#xff0c;一般将样本数据分成独立的三部分&#xff1a;训练集、验证集和测试集。其中验证集在机器学习中所起到的作用是&#xff1a;开发模型总需要调节模型的参数&#xff0c;而整个调节过程需要在验证集数据上运行训练的模型&#xff0c;从而给出其表现的反馈信…

java写一个web聊天工具_javaweb写的在线聊天应用

写这个玩意儿就是想练练手&#xff0c; 用户需要登陆才能在线聊天&#xff0c;不要依赖数据库&#xff0c; 不需要数据库的操作&#xff0c; 所有的数据都是保存在内存中&#xff0c; 如果服务器一旦重启&#xff0c;数据就没有了&#xff1b;登录界面&#xff1a;聊天界面&…

python dataframe删除重复行_详解pandas使用drop_duplicates去除DataFrame重复项参数

Pandas之drop_duplicates&#xff1a;去除重复项方法DataFrame.drop_duplicates(subsetNone, keepfirst, inplaceFalse)参数这个drop_duplicate方法是对DataFrame格式的数据&#xff0c;去除特定列下面的重复行。返回DataFrame格式的数据。subset : column label or sequence o…

O-C相关-08-动态类型与静态类型

08-动态类型与静态类型 1, 什么是动态类型和静态类型 1) 动态语言 又叫动态编程语言&#xff0c;是指程序在运行时可以改变其结构&#xff1a;新的函数可以被引进&#xff0c;已有的函数可以被删除等在结构上的变化。比如众所周知的ECMAScript(JavaScript)便是一个动态语言。除…

python中numpy是什么_什么是NumPy?

本文是对官方文档的翻译&#xff0c;原文在此What is NumPy? - NumPy v1.14 Manual​docs.scipy.org以下开始正文。NumPy是Python的一个用于科学计算的基础包。它提供了多维数组对象&#xff0c;多种衍生的对象(例如隐藏数组和矩阵)和一个用于数组快速运算的混合的程序&#x…

java for循环 等待_java – 主题:忙等待 – 空循环

参见英文答案 > Is this starvation? 2个在我们的大学课程中,我们学习了Threads并使用“Busy Waiting”方法作为在TrafficLight等待的汽车的示例.为此,我们构建了三个类&#xff1a;> TrafficLight(实现Runnable)>汽车(实现Runna…

人物角色群体攻击判定(一)

我们制作3D动作游戏的时候,常常主人公是一对多人进行攻击,或者敌人在角色前方一定范围内进行攻击. 我们怎么实现这种效果呢&#xff1f; 1. 使用触发器(多体攻击)方便调试,可视化(推荐)2. 运用点乘和叉乘,判断敌人的角度和位置(单,多)一般3. 使用Physics的Physics.OverlapSpher…

mfc cimage加载显示图片_在微信小程序里实现图片预加载组件

网页中的图片预加载我们知道在 Web 页面中实现图片的预加载其实很简单&#xff0c;通常的做法是在 JS 中使用 Image 对象即可&#xff0c;代码大致如下var image new Image() image.onload function() {console.log(图片加载完成) } image.src"//misc.360buyimg.com/lib…

java mongodb 删除字段类型_Mongodb基本数据类型、常用命令之增加、更新、删除

1.null---表示空值或者该字段不存在&#xff0c;如{"name"&#xff1a;null}2.布尔 --- 和java中的布尔一样&#xff0c;有两种&#xff1a;true,false,如{"sex":true}3.浮点数---shell中所有的数字都是浮点数&#xff0c;如{"age":12},{"m…

HDU 2242 双连通分量 考研路茫茫——空调教室

思路就是求边双连通分量&#xff0c;然后缩点&#xff0c;再用树形DP搞一下。 代码和求强连通很类似&#xff0c;有点神奇&#xff0c;_&#xff0c;慢慢消化吧 1 #include <cstdio>2 #include <cstring>3 #include <algorithm>4 #include <vector>5 #…

wxpython菜单栏嵌套窗口_如何在wxpython中使嵌套的Panel和Sizer工作

此处的逻辑是创建一个Panel,在其中添加控件,然后创建一个BoxSizer,在其中声明您在Panel中添加的每个控件在调整大小时的行为,并最终在Panel上设置应用BoxSizer的对象.您有2期.>首先,您缺少posPnlobject的上述语句的最后一部分.添加&#xff1a;posPnl.SetSizer(posPnlSzr)将…

Java多态与反射

多态通过分离做什么和怎么做&#xff0c;从另一个角度将接口与实现分离开来&#xff1b;通过多态来消除类型之间的耦合关系&#xff0c;在Java中&#xff0c;多态也叫动态绑定&#xff0c;后期绑定或运行时绑定&#xff0c;那么什么是方法绑定&#xff1f; 方法调用绑定&#x…