Kafka 提供了一系列脚本用于命令行来操作 kafka。
1 Topic 操作
1.1 创建 Topic
创建一个名为 oldersix-topic 的 topic,副本数设置为3,分区数设置为2:
bin/kafka-topics.sh \
--create \
--zookeeper 192.168.31.162:2181 \
--replication-factor 3 \
--partitions 2 \
--topic oldersix-topic
1.2 查看下topic的情况
bin/kafka-topics.sh \
--describe \
--zookeeper 192.168.31.162:2181 \
--topic oldersix-topic
我们来看下输出内容的解释,第一行是所有分区的概要信息,之后的每一行表示每一个partition的信息。
- leader 节点负责给定 partition 的所有读写请求,同一个主题不同分区 leader 副本一般不一样(为了容灾)。
- replicas 表示某个 partition 在哪几个 broker 上存在备份。不管这个几点是不是”leader“,甚至这个节点挂了,也会列出。
- isr 是 replicas 的一个子集,它只列出当前还存活着的,并且已同步备份了该 partition 的节点。
2 生产消息 Producers
Kafka 生产者将消息发送到 topic 中去,同时负责选择将 message 发送到 topic 的哪一个partition中。通过 round-robin 做简单的负载均衡。也可以根据消息中的某一个关键字来进行区分。通常第二种方式使用的更多。
我们向新建的 oldersix-topic 中发送一些 message,kafka集群可以加上所有kafka节点
bin/kafka-console-producer.sh \
--broker-list 192.168.31.162:9092,192.168.31.162:9093,192.168.31.162:9094 \
--topic oldersix-topic
3 消费消息 Consumers
3.1 消费者组
传统的消息传递模式有2种:队列( queue) 和(publish-subscribe)
- queue模式:多个consumer从服务器中读取数据,消息只会到达一个consumer。
- publish-subscribe模式:消息会被广播给所有的consumer。
Kafka基于这2种模式提供了一种 consumer 的抽象概念:consumer group。
- queue模式:所有的 consumer 都位于同一个consumer group 下。
- publish-subscribe模式:所有的consumer都有着自己唯一的consumer group。
上图说明:由2个broker组成的kafka集群,某个主题总共有4个partition(P0-P3),分别位于不同的broker上。这个集群由2个Consumer Group消费, Consumer Group A 有2个consumer instances ,Consumer Group B有4个。
通常一个 topic 会有几个 Consumer Group ,每个 Consumer Group 都是一个逻辑上的订阅者( logical subscriber )。每个 Consumer Group 由多个 Consumer Instance 组成,从而达到可扩展和容灾的功能。
同一 Partion 的一条消息只能被同一个 Consumer Group 内的一个 Consumer 消费,但多个Consumer Group可同时消费这一消息。
3.2 消费顺序
一个 Partition 同一个时刻在一个 Consumer Group 中只能有一个 Consumer Instance 在消费,从而保证消费顺序。
Consumer Group 中的 Consumer Instance 的数量不能比一个 Topic 中的 partition 的数量多,否则,多出来的consumer消费不到消息。
Kafka 只在 Partition 的范围内保证消息消费的局部顺序性,不能在同一个 topic 中的多个partition中保证总的消费顺序性。
如果有在总体上保证消费顺序的需求,那么我们可以通过将 topic 的 partition 数量设置为1,将consumer group中的consumer instance数量也设置为1,但是这样会影响性能,所以kafka的顺序消费很少用。
3.3 Kafka 消费消息
bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.31.162:9092,192.168.31.162:9093,192.168.31.162:9094 \
--from-beginning --topic oldersix-topic
可以看到,在消费端,我们已经消费到了 Producer 发送的消息。
4 Kafka 容错性
Kafka 的 commit log 的 partitions 分布在 kafka 集群中不同的 broker 上,每个 broker 都可以请求备份其他 broker 上 partition 上的数据。kafka 集群支持配置一个 partition 备份的数量。
针对每个 partition,都有一个 broker 起到 “leader” 角色作用,0 个或多个其他的 broker 作为“follwers”角色的作用。leader 处理所有的针对这个 partition 的读写请求,而 followers 被动复制 leader 的结果,不提供读写(主要是为了保证多副本数据与消费的一致性)。如果这个 leader 失效了,其中的一个 follower 将会自动的变成新的leader。
现在我们来测试我们容错性,因为 broker1目前是 oldersix-topic 的分区 0 的leader,所以我们要将其kill。
# 查看 broker 1 进程号
ps -ef | grep server.properties
# kill 进程
kill 2346
可以看到 zookeeepr 的节点中已经没有 broker1 了。
现在再执行命令:
bin/kafka-topics.sh \
--describe --zookeeper 192.168.31.162:2181 \
--topic oldersix-topic
我们可以看到,分区 0 的 leader 节点已经变成了broker 2。要注意的是,在 Isr 中,已经没有了 1 号 broker 节点。leader的选举也是从ISR(in-sync replica)中进行的。
此时,我们依然可以 消费新消息:
bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.31.162:9092,192.168.31.162:9093,192.168.31.162:9094 \
--from-beginning --topic oldersix-topic
查看主题分区对应的 leader 信息:
get /brokers/topics/oldersix-topic/partitions/1/state
kafka 将很多集群关键信息记录在 zookeeper 里,保证自己的无状态,从而在水平扩容时非常方便。