大数据技术之kafka (第 3 章 Kafka 架构深入 ) offset讲解

新版的 Kafka 使用一个选举出来的 controller 来监听 zookeeper,其他 node 再去和 controller 通信,这么做的目的是为了减少 zookeeper 的压力。bootstrap-servers 会自动发现其他 broker,这也是 bootstrap 的含义

前面我们讲到了消费者,接下来我们详细讲下消费者个数改变,分区重新分配是如何接着消费的

首先我们还是一样创建一个新的主题topic 主题是名字叫bigdata  这里报错原因我已经创建过了,主题已经存在

发送一条消息 hello  因为有两个分区,其中一个分区有变化,一个没有变化

我们开启一个消费者,让其中一个分区被消费

查看zookeeper  ,进入zk客户端

[root@backup01 bin]# ./zkCli.sh 
Connecting to localhost:2181
2020-04-06 08:54:43,793 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
2020-04-06 08:54:43,816 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=backup01
2020-04-06 08:54:43,816 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.8.0_172
2020-04-06 08:54:43,817 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/local/java/jdk1.8.0_172/jre
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../build/classes:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../build/lib/*.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/slf4j-api-1.7.25.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/netty-3.10.6.Final.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/log4j-1.2.17.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/jline-0.9.94.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../lib/audience-annotations-0.5.0.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../zookeeper-3.4.13.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../src/java/lib/*.jar:/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin/../conf:
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=3.10.0-862.el7.x86_64
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
2020-04-06 08:54:43,818 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/usr/local/hadoop/zookeeper/zookeeper-3.4.13/bin
2020-04-06 08:54:43,819 [myid:] - INFO  [main:ZooKeeper@442] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@277050dc
2020-04-06 08:54:43,845 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1029] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
Welcome to ZooKeeper!
JLine support is enabled
2020-04-06 08:54:43,960 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@879] - Socket connection established to localhost/127.0.0.1:2181, initiating session
2020-04-06 08:54:43,985 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1303] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1000393bf830000, negotiated timeout = 30000WATCHER::WatchedEvent state:SyncConnected type:None path:null

我们进入了zk客户端的根目录   看到的除了zookeeper其他都是kafka

这个controller是后面要聊的,是关于kafka的老大controller不是指定的,这个controller是争抢资源的,谁先抢到谁就是老大,正常情况下,先启动的就是老大

[zk: localhost:2181(CONNECTED) 1] get /controllercontroller_epoch   controller
[zk: localhost:2181(CONNECTED) 1] get /controller
{"version":1,"brokerid":0,"timestamp":"1586133981273"}
cZxid = 0x500000003
ctime = Mon Apr 06 08:46:21 CST 2020
mZxid = 0x500000003
mtime = Mon Apr 06 08:46:21 CST 2020
pZxid = 0x500000003
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2000392dd2c0000
dataLength = 54
numChildren = 0
[zk: localhost:2181(CONNECTED) 2] 

 我们看到brokerid:0   ---> backup01:9092

[zk: localhost:2181(CONNECTED) 3] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 4]

broker/ids    0    1     2

topics有5个主题

[zk: localhost:2181(CONNECTED) 2] ls /controller
[]
[zk: localhost:2181(CONNECTED) 3] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[secend, bigdata, three, first, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 6] 

 在新的版本中topics中多了一个__consumer_offsets这样一个主题

查看到消费者的默认50个分区信息

这里我也不知道为啥在zk客户端查看consumers(消费者组consumer-group)信息为空,,从上面的消息可以知道,我的生产者和消费者都是正常的,不知道新版本有什么特殊的改变,

查资料可知:Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。 

1)修改配置文件 consumer.properties 

exclude.internal.topics=false 
cd /usr/local/hadoop/kafka/kafka_2.12-2.4.1/config
vim consumer.properties

所以我们知道了新版的kafka使用__consumer_offsets想要消费生成功 必须修改consumer.properties 这个配置文件才能正常记录消费信息

__consumer_offsets这个也是个主题,正常情况下,kafka并不希望我们去获取这个主题下的数据,我们这里是为了看里面的消费情况,所以我们就设置一下

2)读取 offset  (我们使用的是0.11版本之后)

0.11.0.0 之前:
bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties from-beginning 
0.11.0.0 之后版本(含): 
bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning 

 

 这个很关键,指定配置文件,如果不指定,那就不生效

这里抛出一个问题,为什么要使用zookeeper?

假设我们要启动一个消费者A去消费这个T1这个主题,我们连的是bootstrap-server,这个A 会把消息放到 topic 为__consumer_offsets这个主题下,那A是不是相当于__consumer_offsets这个主题的生产者,我们要消费的是__consumer_offsets,如果我们这里__consumer_offsets连的bootstrap-server,感觉这个__consumer_offsets又充当了生产者,感觉自己生产数据自己消费,有点怪怪的,所以我们这里使用了zookeeper,

但是在新的版本中

[root@backup01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets zookeeper backup01:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning
Missing required argument "[bootstrap-server]"

我们从consumer.properties配置文件也可以只是这里用的是bootstrap.servers 

Missing required argument "[bootstrap-server]"那我们不能用zookeeper了

[root@backup01 kafka_2.12-2.4.1]# bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server backup01:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties frombeginning

key value形式      [test-consumer-group,__consumer_offsets,3]的hash值我们不知道,但是程序自己是能找到那个分区(50个分区中)能确定唯一的offset

假如一个消费者组的的其中一个消费者挂了,其他消费者怎么能拿到offset,接着消费呢?

因为是同一个组的,挂了之后会通知其他消费者会带着同一个组名来继续消费

我们可以发现大概1秒钟更新一次,这个提交是速度是可以改的

补充一点消费者组kafka的命令操作:

我们通过其它方式查看在kafka消费者组的信息  ,查看consumer group列表,使用--list参数

查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数:

注意:在新的版本中老版本的查询方式已经移除,这里只提供新版本的查询方式

bin/kafka-consumer-groups.sh new--consumer --bootstrap-server backup01:9092 --list

使用kafka的bin目录下面的kafka-consumer-groups.sh命令可以查看offset消费情况,注意,如果你的offset是存在kafka集群上的,就指定kafka服务器的地址bootstrap-server:

 ./bin/kafka-consumer-groups.sh --bootstrap-server backup01:9092 --describe --group console-consumer-63897
GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                            HOST            CLIENT-ID
console-consumer-63897 bigdata         0          -               3               -               consumer-console-consumer-63897-1-255b7119-e051-4ee1-ad64-d44581be1d20 /192.168.0.120  consumer-console-consumer-63897-1
console-consumer-63897 bigdata         1          -               2               -               consumer-console-consumer-63897-1-255b7119-e051-4ee1-ad64-d44581be1d20 /192.168.0.120  consumer-console-consumer-63897-1
[root@backup01 kafka_2.12-2.4.1]# ./bin/kafka-consumer-groups.sh --bootstrap-server backup01:9092 --describe --group console-consumer-69882GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                            HOST            CLIENT-ID
console-consumer-69882 bigdata         0          -               3               -               consumer-console-consumer-69882-1-7df49099-2e83-4871-8468-3279b3213edb /192.168.0.120  consumer-console-consumer-69882-1
console-consumer-69882 bigdata         1          -               2               -               consumer-console-consumer-69882-1-7df49099-2e83-4871-8468-3279b3213edb /192.168.0.120  consumer-console-consumer-69882-1

补充说明:

关于这个__consumer_offsets(后续再说,这里大概了解下)

抛出问题:

__consumer_offsets这个topic是由kafka自动创建的,默认50个,但是都存在一台kafka服务器上,这是不是就存在很明显的单点故障? 经测试,如果将存储consumer_offsets的这台机器kill掉,所有的消费者都停止消费了。请问这个问题是怎么解决的呢?

原因分析:

由于__consumer_offsets这个用于存储offset的分区是由kafka服务器默认自动创建的,那么它在创建该分区的时候,分区数和副本数的依据是什么? 分区数是固定的50,这个没什么可怀疑的,副本数呢?应该是一个默认值1,依据是,如果我们没有在server.properties文件中指定topic分区的副本数的话,它的默认值就是1。

__consumer_offsets是一个非常重要的topic,我们怎么能允许它只有一个副本呢?这样就存在单点故障,也就是如果该分区所在的集群宕机了的话,
我们的消费者就无法正常消费数据了。

我在笔记本上搭建了kafka集群,共3个Broker,来解决这个问题。下面是一些记录。

 说明:如果你的__consumer_offsets这个topic已经被创建了,而且只存在一台broker上,如果你直接使用命令删除这个topic是会报错了,提示这是kafka内置的topic,禁止删除。可以在zookeeper中删除(我是使用ZooInspector这个客户端连上zookeeper,删除和__consumer_offsets这个topic有关的目录或节点)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/576448.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

大数据技术之kafka (第 3 章 Kafka 架构深入 ) 消费者组案例

1&#xff09;需求&#xff1a;测试同一个消费者组中的消费者&#xff0c;同一时刻只能有一个消费者消费。 2&#xff09;案例实操 &#xff08;1&#xff09;在 backupo01、backupo02 上修改/usr/local/hadoop/kafka/kafka_2.12-2.4.1/config/consumer.properties 配置 文件…

大数据技术之kafka (第 3 章 Kafka 架构深入 ) 高效读写数据

1&#xff09;顺序写磁盘 Kafka 的 producer 生产数据&#xff0c;要写入到 log 文件中&#xff0c;写的过程是一直追加到文件末端&#xff0c; 为顺序写。官网有数据表明&#xff0c;同样的磁盘&#xff0c;顺序写能到 600M/s&#xff0c;而随机写只有 100K/s。这 与磁盘的机…

大数据技术之kafka (第 3 章 Kafka 架构深入) Zookeeper 在 Kafka 中的作用

Kafka 集群中有一个 broker 会被选举为 Controller&#xff0c;负责管理集群 broker 的上下线&#xff0c;所有 topic 的分区副本分配和 leader 选举等工作。 Controller 的管理工作都是依赖于 Zookeeper 的。 以下为 partition 的 leader 选举过程&#xff1a; Leader选举流…

LinkedList源码阅分析

LinkedList里面涉及到的一些操作&#xff0c;非常细致&#xff0c;以避免出现的空指针&#xff0c;理解后对于其优点与确定会有一个更加整体的认识吧。 继承关系图(对比ArrayList) 元素的存储结构 在LinkedList中&#xff0c;每一个元素都是Node存储&#xff0c;Node拥有一个存…

取消选中目标CALL

事先在游戏里选中一个怪物bp send,回到游戏里,按ESC&#xff0c;OD断下来,复制 返回到 elementc.072AFDD8 005869B2 返回到 elementc.005869B2 来自 elementc.0058E8A0072AFDEC 00588B1F 返回到 elementc.00588B1F 来自 elementc.00586980072AFE28 005A7346 返回到 el…

《深入理解java虚拟机》第1章 走近Java

1.6实战:自己编译JDK 想要一探JDK内部的实现机制&#xff0c;最便捷的路径之一就是自己编译- -套JDK,通过阅读和跟踪调试JDK源码去了解Java技术体系的原理&#xff0c;虽然门槛会高一点&#xff0c;但肯定会比阅读各种书籍、文章更加贴近本质。另外&#xff0c;JDK中的很多底层…

《深入理解java虚拟机》第2章 Java内存区域与内存溢出异常

Java与C之间有一堵由内存动态分配和垃圾收集技术所围成的“高墙”&#xff0c;墙外面的人想进去&#xff0c;墙里面的人却想出来。 2.1 概述 https://blog.csdn.net/q5706503/article/details/84640762 对于从事C、C程序开发的开发人员来说&#xff0c;在内存管理领域&#…

线性表的定义和基本运算之线性结构

一、线性表的逻辑定义和性质 线性表是最简单和最常用的一种数据结构&#xff0c;他是由n个数据元素&#xff08;结点&#xff09;a1,a2,a3,a4........an组成的有限序列。其中&#xff0c;数据元素个数那位表的长度。当n为0时称为空表&#xff0c;非空的线性表通常记为 &#x…

数据结构之指针复习

废话不多说&#xff0c;拿起键盘就是干&#xff0c;直接上代码&#xff1a; #include <stdio.h>int main() {double *p;double x 66.6;p &x; //x占8个字节&#xff0c;一个字节占8位&#xff0c;一个字节一个地址double arr[3] { 1.1,2.2,3.3 };double *q;q &a…

数据结构之结构体复习

为什么出现结构体&#xff1f; 为了表示一些复杂的数据&#xff0c;一些基本数据类型无法满足要求&#xff0c; 当要用一个变量描述一个对象的多个属性时&#xff0c;普通的内置数据类型是表示不了的&#xff0c;这个时候就可以用结构体回。结构体和类很相似&#xff0c;唯一不…

高效管理读书笔记

高效管理读书笔记一、优秀的权威宣言二、主要的内容要点2.1 有权威的领导都会关心自己的员工2.2 问责而不指责2.3 多点尤达&#xff0c;少点超人三、原书一、优秀的权威宣言 优秀的权威就是&#xff1a; 指出大部分人视而不见的问题的气质今天畅所欲言而不是空等明天的好心【…

蒙特卡罗方法介绍(一)

蒙特卡罗方法介绍(一) 一、蒙特卡罗方法的基本思想和解题步骤 1.1 蒙特卡罗方法的基本思想 蒙特卡罗方法也称随机模拟法、随机抽样技术或统计实验发&#xff0c;其基本思想是&#xff1a;为了求解数学、物理、工程技术或生产管理等方面的问题。首先&#xff0c;建立一个与求…

神策数据张涛:如何让用户标签价值落地?

本文根据神策数据副总裁张涛在《用户个性化运营—标签体系搭建新机遇》主题沙龙中演讲整理所得。 标签系统&#xff0c;在企业中已不是什么“高大上”的说辞。然而让用户标签价值真正落地企业不多&#xff0c;就像“青少年谈性”&#xff0c; 有一段话形容得再贴切不过&#xf…

蒙特卡罗方法介绍( 二)

蒙特卡罗方法介绍( 二) 一、蒙特卡罗求解定积分 蒙特卡洛方法求解定积分有两种方法&#xff0c;一种是上一节中讲的投点法&#xff0c;另外一种是期望法&#xff08;也称平均值法&#xff09;。 1.1 投点法 给出如下曲线f(x)f(x)f(x),求f(x)f(x)f(x)在a,ba,ba,b上的积分&am…

大数据技术之kafka (第 3 章 Kafka 架构深入) 分区策略在分析

如果不懂分区策略请看我之前的文章&#xff1a;https://blog.csdn.net/ywl470812087/article/details/105328015 默认的方式我们采用的是Range策略方式&#xff08;按主题给消费者消费&#xff0c;主题被谁订阅了就谁消费&#xff09; 先看下下面这个图&#xff0c;画的很丑&a…

如何达成目标笔记

如何达成目标 一、本书主要内容 推荐序一 升级你的行动工具箱 推荐序二 人们可以改变 引言 成功者和自制力的悖论 //004 自制力到底是怎样的 //007 你能做什么 //009 本书的主题 //011 1.1 准备就绪 第1章 你明白自己去往哪里吗 别说“做到最好” //017 大局与细节 //…

大数据技术之 Kafka (第 4 章 Kafka API ) Producer API

4.1.1 消息发送流程 Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中&#xff0c;涉及到了两个线程——main 线程和 Sender 线程&#xff0c;以及一个线程共享变量——RecordAccumulator。main 线程将消息发送给 RecordAccumulator&#xff0c;Sender…

《关键对话——何谓关键对话》读书笔记(一)

《关键对话——何谓关键对话》读书笔记&#xff08;一&#xff09; 利用假期的时间&#xff0c;将关键对话阅读了一遍&#xff0c;书中提到的观点&#xff0c;方法&#xff0c;场景等很适合我目前处的状态&#xff0c;有的时候读起来仿佛就是自己身临其境&#xff0c;有种感同身…

从java读取Excel继续说大道至简 .

在上一篇博客《从复杂到简单&#xff0c;大道至简》中说道我们要把复杂的问题简单化&#xff0c;也就是要把问题细分&#xff0c;让大问题变成小问题&#xff0c;这样解决起来会相对容易&#xff0c;当我们把容易的小问题解决掉了&#xff0c;大问题自动就会迎刃而解。 所以今天…

推荐算法工程师的成长之道

推荐算法工程师的成长之道 原创&#xff1a; gongyouliu 大数据与人工智能 3月20日 源链接&#xff1a;原文地址 本文&#xff0c;作者会基于自己的实践经验讲述推荐算法工程师的成长之道&#xff0c;这里的“道”有发展路径和道(道理、方法论、经验、智慧)两层意思。 所以本文…