启动kafka
[root@localhost kafka_2.12-2.5.1] # [kube:] cat start_zk.sh
./bin/zookeeper-server-start.sh config/zookeeper.properties > /dev/null &
[root@localhost kafka_2.12-2.5.1] # [kube:] cat start_kafka.sh
./bin/kafka-server-start.sh config/server.properties > /dev/null &[root@localhost kafka_2.12-2.5.1] # [kube:] netstat -lntup| grep java
tcp6 0 0 :::9092 :::* LISTEN 8458/java
tcp6 0 0 :::54948 :::* LISTEN 8104/java
tcp6 0 0 :::2181 :::* LISTEN 8104/java
tcp6 0 0 :::60903 :::* LISTEN 8458/java
创建 topic
创建主题
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 3 --topic test查看主题信息
bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic test生产者生产消息
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
package mainimport ("context""fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Version = sarama.V2_0_0_0 // kafka消费者组最低版本 V0_10_2_0config.Consumer.Offsets.Initial = sarama.OffsetNewestconfig.Consumer.Offsets.AutoCommit.Enable = false // 关闭自动提交config.Consumer.Return.Errors = true// config.Consumer.Group.Rebalance.Strategy = &balanceStrategy{} 支持自定义消费者重平衡策略group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "g1", config)if err != nil {panic(err)}defer func() { _ = group.Close() }()// Track errorsgo func() {for err := range group.Errors() {fmt.Println("ERROR", err)}}()// Iterate over consumer sessions.ctx := context.Background()for {// `Consume` should be called inside an infinite loop, when a// server-side rebalance happens, the consumer session will need to be// recreated to get the new claimserr := group.Consume(ctx, []string{"test"}, &exampleConsumerGroupHandler{})if err != nil {panic(err)}}
}type exampleConsumerGroupHandler struct{}func (exampleConsumerGroupHandler) Setup(se sarama.ConsumerGroupSession) error {fmt.Printf("Setup %q %+v\n", se.MemberID(), se.Claims())return nil
}
func (exampleConsumerGroupHandler) Cleanup(se sarama.ConsumerGroupSession) error {fmt.Printf("Cleanup %q %+v\n", se.MemberID(), se.Claims())return nil
}
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("ConsumeClaim Message topic:%q partition:%d offset:%d ts:%s val:%s\n",msg.Topic, msg.Partition, msg.Offset, msg.Timestamp, msg.Value)sess.MarkMessage(msg, "")sess.Commit()}return nil
}
查看消费者组的消费情况
CURRENT-OFFSET:当前消费位移
LOG-END-OFFSET:最新可消费位移
LAG:消息堆积数量
LAG越大,堆积数越多
[root@localhost kafka_2.12-2.5.1] # [kube:] bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group g1 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
g1 test 0 10 10 0 sarama-888ca2a0-05e9-43dd-a99a-6a02c8b10c00 /0:0:0:0:0:0:0:1 sarama
g1 test 1 15 15 0 sarama-888ca2a0-05e9-43dd-a99a-6a02c8b10c00 /0:0:0:0:0:0:0:1 sarama
g1 test 2 18 18 0 sarama-28558d3b-0c75-4b0d-a473-ced5e1b51fb3 /0:0:0:0:0:0:0:1 sarama
[root@localhost kafka_2.12-2.5.1] # [kube:]