Kafka(六)Kafka基本客户端命令操作

转载自:https://blog.51cto.com/littledevil/2147950

主题管理

创建主题

如果配置了auto.create.topics.enable=true(这也是默认值)这样当生产者向一个没有创建的主题发送消息就会自动创建,其分区数量和副本数量也是有默认配置来控制的。

# 我们这里创建一个3个分区每个分区有2个副本的主题
kafka-topics.sh --create --zookeeper 172.16.48.171:2181/kafka --replication-factor 2 --partitions 3 --topic KafkaTest
--create表示建立
--zookeeper

表示ZK地址,可以传递多个,用逗号分隔

--zookeeper IP:PORT,IP:PORT,IP:PORT/kafka

--replication-factor表示副本数量,这里的数量是包含Leader副本和Follower副本,副本数量不能超过代理数量
--partitions表示主题的分区数量,必须传递该参数。Kafka的生产者和消费者采用多线程并行对主题的消息进行处理,每个线程处理一个分区,分区越多吞吐量就会越大,但是分区越多也意味着需要打开更多的文件句柄数量,这样也会带来一些开销。
--topic表示主题名称

image.png

在Zookeeper中可以看到如下信息

image.png

删除主题

删除有两种方式手动和自动

  • 手动方式需要删除各个节点日志路径下的该主题所有分区,并且删除zookeeper上/brokers/topics和/config/topics下的对应主题节点

  • 自动删除就是通过脚本来完成,同时需要配置服务器配置文件中的delete.topic.enable=true,默认为false也就是说通过命令删除主题只会删除ZK中的节点,日志文件不会删除需要手动清理,如果配置为true,则会自动删除日志文件。

kafka-topics.sh --delete --zookeeper 172.16.48.171:2181/kafka --topic KafkaTest

image.png

下面的两句话就是说该主题标记为删除/admin/delete_topics节点下。实际数据没有影响因为该参数没有设置为true。

查看主题

# 列出所有主题
kafka-topics.sh --list --zookeeper 172.16.48.171:2181/kafka

image.png

下面是从ZK中看到的所有主题

image.png

# 查看所有主题信息
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka

image.png

# 查看特定主题信息
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topic BBB

image.png

Replicas:是AR列表,表示副本分布在哪些代理上,且该列表第一个元素就是Leader副本所在代理

ISR:该列表是显示已经同步的副本集合,这个列表的副本都是存活的

# 通过--describe 和 --under-replicated-partitions 可以查看正在同步的主题或者同步可能发生异常,
# 也就是ISR列表长度小于AR列表,如果一切正常则不会返回任何东西,也可以通过 --tipic 指定具体主题
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --under-replicated-partitions
# 查看哪些主题建立时使用了单独的配置 
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topics-with-overrides

这里只有一个内部主题__comsumer_offsets使用了非配置文件中的设置

image.png

 

配置管理

所谓配置就是参数,比如修改主题的默认参数。

主题级别的

# 查看配置
kafka-configs.sh --describe --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BB

这里显示 Configs for topic 'BBB' are 表示它的配置有哪些,这里没有表示没有为该主题单独设置配置,都是使用的默认配置。

image.png

# 增加一个配置
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --add-config flush.messages=2

image.png

如果修改的话还是相同的命令,只是把值修改一下

image.png

# 删除配置
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --delete-config flush.messages

image.png

客户端级别

这个主要是设置流控

# 设置指定消费者的流控 --entity-name 是客户端在创建生产者或者消费者时是指定的client.id名称
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name COMSUMER_NAME

image.png

下图为ZK中对应的信息

image.png

 

分区管理

分区平衡

Leader副本在集群中应该是均衡分布,因为Leader副本对外提供读写服务,尽可能不让同一个主题的多个Leader副本在同一个代理上,但是随着时间推移比如故障转移等情况发送,Leader副本可能不均衡。有两种方式设置自动平衡,自动和手动。

自动就是在配置文件中增加 auto.leader.rebalance.enable true 如果该项为false,当某个节点故障恢复并重新上线后,它原来的Leader副本也不会转移回来,只是一个Follower副本。

手动就是通过命令来执行

kafka-preferred-replica-election.sh --zookeeper 172.16.48.171:2181/kafka

分区迁移

当下线一个节点需要将该节点上的分区副本迁移到其他可用节点上,Kafka并不会自动进行分区迁移,如果不迁移就会导致某些主题数据丢失和不可用的情况。当增加新节点时,只有新创建的主题才会分配到新节点上,之前的主题分区不会自动分配到新节点上,因为老的分区在创建时AR列表中没有这个新节点。

image.png

上面2个主题,每个主题3个分区,每个分区3个副本,我们假设现在代理2要下线,所以我们要把代理2上的这两个主题的分区数据迁移出来。

# 1. 在KAFKA目录的config目录中建立topics-to-move.json文件
{"topics":[{"topic":"AAA"},{"topic":"BBB"}],"version":1
}

 

# 2. 生成分区分配方案,只是生成一个方案信息然后输出
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "1,2" --generate

image.png

这个命令的原理是从zookeeper中读取主题元数据信息及制定的有效代理,根据分区副本分配算法重新计算指定主题的分区副本分配方案。把【Proposed partition reassignment configuration】下面的分区方案保存到一个JSON文件中,partitions-reassignment.json 文件名无所谓。

# 3. 执行方案
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute

image.png

# 4. 查看进度
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --verify

image.png

查看结果,这里已经没有代理0了。

image.png

集群扩容

上面演示了节点下线的数据迁移,这里演示一下集群扩容的数据迁移。我们还是用上面两个主题,假设代理0又重新上线了。其实扩容就是上面的反向操作

# 1. 建立JSON文件
# 该文件和之前的相同

 

# 2. 生成方案并保存到一个JSON文件中
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate

image.png

# 3. 数据迁移,这里通过--throttle做一个限流操作,如果数据过大会把网络堵塞。
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute --throttle 1024

image.png

查看进度和结果

image.png

增加分区

通常在需要提供吞吐量的时候我们会增加分区,然后如果代理数量不扩大,同时生产者和消费者线程不增大,你扩展了分区也没有用。

image.png

kafka-topics.sh --alter --zookeeper 172.16.48.171:2181/kafka --partitions 3 --topic KafkaTest03

image.png

增加副本

集群规模扩大并且想对所有主题或者指定主题提高可用性,那么可以增加原有主题的副本数量

image.png

上面是3个分区,每个分区1个副本,我们现在把每个分区扩展为3个副本

# 1. 创建JSON文件 replica-extends.json 
{"version": 1,"partitions": [{"topic": "KafkaTest04","partition": 0,"replicas": [0,1,2]},{"topic": "KafkaTest04","partition": 1,"replicas": [0,1,2]},{"topic": "KafkaTest04","partition": 2,"replicas": [0,1,2]}]
}
# 2. 执行分区副本重新分配命令
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./replica-extends.json --execute

image.png

查看状态

image.png

查看结果

image.png

 

镜像操作

Kafka有一个镜像工具kafka-mirror-maker.sh,用于将一个集群数据同步到另外一个集群中,这个非常有用,比如机房搬迁就需要进行数据同步。该工具的本质就是创建一个消费者,在源集群中需要迁移的主题消费数据,然后创建一个生产者,将消费的数据写入到目标集群中。

首先创建消费者配置文件mirror-consumer.properties(文件路径和名称是自定义的)

# 源kafka集群代理地址列表
bootstrap.servers=IP1:9092,IP2:9092,IP3:9092
# 消费者组名
group.id=mirror

其次创建生产者配置文件mirror-producer.properties(文件路径和名称是自定义的)

# 目标kafka集群地址列表
bootstrap.servers=IP1:9092,IP2:9092,IP3:9092

运行镜像命令

# 通过 --whitelist 指定需要镜像的主题,通过  --blacklist 指定不需要镜像的主题
kafka-mirror-maker.sh --consumer.config PATH/mirror-consumer.properties --producer.config PATH/mirror-producer.properties --whitelist TOPIC

由于镜像操作是启动一个生产者和消费者,所以数据同步完成后这个生产者和消费者并不会关闭,它会依然等待新数据,所以同步完成以后你需要自己查看,确认完成了则关闭生产者和消费者。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/510059.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux平台C++开发基本知识

最近工作中,需要在linux上开发C程序。有了下面的问题。 1,linux平台C开发和win32 C开发有什么区别呢? 2,除了C语言,数据结构等必须的知识外,还需要些了解什么呢? 3,如何在linux平台开…

Impala 调用Hbase 报错 LeaseException

impala调用Hbase表数据操作时,提示 LeaseException 异常,主要原因是hbase数据过大,调用期间没有汇报心跳导致 WARNINGS: LeaseException: org.apache.hadoop.hbase.regionserver.LeaseException: lease -8355984789923245890 does not exist…

mitmproxy 中间人代理工具,抓包工具,linux抓包工具 mitmproxy 使用

mitmproxy是一个支持HTTP和HTTPS的抓包程序,类似Fiddler、Charles的功能,可以在linux以命令行形式的展示抓包信息 mitmdump:它是mitmproxy的命令行接口,利用它我们可以对接Python脚本,用Python实现监听后的处理。 mitm…

Ajax学习总结+案例

一、AJAX简介 1、Asynchronous JavaScript And XML指异步 JavaScript 及 XML 2、老技术新用法。是基于JavaScript、XML、HTML、CSS新用法 二、同步和异步(理解) 三、第一个异步请求案例(熟悉编码步骤) 四、XmlHttpRequest&…

Spark 运行内存不足Not enough space to cache rdd in memory,Container killed by YARN for exceeding memory

日志报错(WARN类型最后执行成功可以忽略): 19/04/15 12:35:37 INFO memory.MemoryStore: Will not store rdd_2_5119/04/15 12:35:37 WARN memory.MemoryStore: Not enough space to cache rdd_2_51 in memory! (computed 1109.7 MB so far)…

Spark 某两个节点数据分析速度慢 - hbase数据删除(分裂) 元信息未删除导致 There is an overlap in the region chain.

基于Hbase2.0,Spark2.2 问题描述 执行Spark处理Hbase数据时,遇到某两个Excutor处理速度特别慢,如图 正常速度10多分钟 左右处理完成, 一个多小时有另外一个处理完成,还有一个在处理中。 分析原因 1.查看hbase数据分…

在MFC程序中增加控制台窗口

MFC程序中,如果想要输出调试信息,我们一般都是TRACE或者使用LOG文件,都不是很方便,第一个需要我们在调试状态下,第二个也要配置麻烦而且不直观。而使用Console来显示调试信息应该是更好的选择。下面介绍几种在MFC程序中…

mysql配置

MySQL5.6.11安装步骤(Windows7 64位) http://jingyan.baidu.com/article/f3ad7d0ffc061a09c3345bf0.html1. 下载MySQL Community Server 5.6.21,注意选择系统类型(32位/64位) 2. 解压MySQL压缩包 将以下载的MySQL压缩包…

大数据技术讲解

HDFS的体系架构 整个Hadoop的体系结构主要是通过HDFS来实现对分布式存储的底层支持,并通过MR来实现对分布式并行任务处理的程序支持。 HDFS采用主从(Master/Slave)结构模型,一个HDFS集群是由一个NameNode和若干个DataNode组…

Hbase快照Snapshot 数据备份、恢复与迁移

场景 hbase数据迁移时我们需要统计迁移时的数据量,以确保迁移后的数据的完成,但是如果hbase表数据持续增加的话,迁移时无法统计出准确的数据量,此时我们使用快照的方式进行数据迁移,以确保迁移的数量可以和某一时间节点…

HUE 打开 WorkFlow异常 Operation category READ is not supported in state standby

异常:在hue上配置的一些定时任务突然停止执行。 1.打开页面HUE->WorkFlow 发现页面异常,无法进入WorkFlow,如下图 2.查看HUE日志 查看到WebHdfsException异常,访问HDFS文件浏览器报错, [26/Jun/2019 09:29:55 080…

Hbase Native memory allocation (mmap) failed to map xxx bytes for committing reserved memory

新启动测试环境Hbase报错,报错日志如下 # # There is insufficient memory for the Java Runtime Environment to continue. # Native memory allocation (mmap) failed to map 31715688448 bytes for committing reserved memory. # An error report file with mo…

CDH Yarn资源动态分配 - 指定资源限制 公平调度具体设置

日常工作中会涉及到各种资源分配等问题,跨部门,跨业务等等,这里介绍基于CDH版本的Yarn 公平调度(实际使用DRF调度) 不同时间配置不同资源参考(计划模式):https://datamining.blog.cs…

CDH 配置YARN动态资源池的计划模式,根据时间划分资源,不同时间不同队列使用不同资源

公平调度配置可参考:http://datamining.blog.csdn.net/article/details/94554469 目录 计划模式设置 队列资源抢占分配 计划模式设置 1.创建计划规则 2.设置白天配置,可以根据业务选择具体配置计划,调整资源,选择每天&#xff…

java集合类总结

Collection:单列集合类的跟接口,用于存储一系列符合某种规则的元素,它有两个重要的子接口,分别时List和Set还有Queue。其中List的特点时元素有序,元素可重复,Set的特点时元素无序且不可重复,Que…

TCP为什么是四次挥手

TCP 3次握手 客户端向服务器发送一个SYN(包含了SYN,SEQ)。 当服务器接收到客户端发过来的SYN时,会向客户端发送一个SYNACK的数据包,其实ACK的ack等于上一次发送SYN数据包的(SYNSEQ)。 当客户…

TCP的三次握手和四次挥手详解

为什么需要“三次握手” 在谢希仁著《计算机网络》第四版中讲“三次握手”的目的是“为了防止已失效的连接请求报文段突然又传送到了服务端,因而产生错误”。在另一部经典的《计算机网络》一书中讲“三次握手”的目的是为了解决“网络中存在延迟的重复分组”的问题。…

zabbix server is not running the information displayed may not be current

页面报错如下 查看日志提示 30037:20190710:193016.878 cannot start alert manager service: Cannot bind socket to "/var/run/zabbix/zabbix_server_alerter.sock": [13] Permission denied.30039:20190710:193016.879 server #30 started [preprocessing manage…

VS2010项目配置详解

首先看一下项目设置中可以使用的宏(环境变量),常用的有: ConfigurationName 配置名字,通常是Debug或者Release IntDir 编译器使用的中间目录,产出obj文件 OutDir 链接器使用的输出目录 ProjectDir 项目目录…

Centos7 下 zabbix服务安装与部署,linux监控服务

客户端安装参考:https://mp.csdn.net/postedit/95475740 安装Zabbix 关闭 SeLinux 临时关闭 setenforce 0 永久关闭 vi /etc/selinux/config 关闭防火墙 临时关闭 systemctl stop firewalld.service 永久关闭 systemctl disable firewalld.service安装基础环…