记录
命令(终端操作kafka)
# 验证kafka是否启动
ps -ef | grep kafka # ps -ef 命令用于显示所有正在运行的进程的详细信息
lsof -i :9092# 启动kafka
brew services start zookeeper
brew services start kafka# 创建topic
kafka-topics --create --topic test --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
解释:kafka-topics:用于管理主题。–create:创建一个新的主题。–topic test:主题的名称为 test。–partitions 1:有 1 个分区(partition)。–replication-factor 1:主题的副本因子为 1。表示没有冗余,数据仅存储在一个节点上。–bootstrap-server localhost:9092:localhost:9092 表示 Kafka 服务器运行在本地主机的 9092 端口。# 查看主题
kafka-topics --list --bootstrap-server localhost:9092#订阅(消费者) 新建一个终端,输入
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning#发布(生产者) 新建一个终端,输入
kafka-console-producer --bootstrap-server localhost:9092 --topic test# 删除Topic
kafka-topics --delete --topic test --bootstrap-server localhost:9092
代码操作Kafka
简单版本
生产者:
packagemainimport("github.com/IBM/sarama""log"
)funcmain() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 5config.Producer.Return.Successes = trueproducer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)iferr != nil {log.Fatalf("Failed to start producer: %s", err)}deferproducer.Close()msg := &sarama.ProducerMessage{Topic: "test_topic",Value: sarama.StringEncoder("Hello, Kafka!"),}partition, offset, err := producer.SendMessage(msg)iferr != nil {log.Fatalf("Failed to send message: %s", err)}log.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", "test_topic", partition, offset)
}
消费者
package mainimport ("fmt""github.com/IBM/sarama""log"
)func main() {config := sarama.NewConfig()config.Consumer.Return.Errors = true// 创建消费者consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)if err != nil {log.Fatalf("Failed to start consumer: %s", err)}defer consumer.Close()// 订阅 Kafka 主题partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)if err != nil {log.Fatalf("Failed to start partition consumer: %s", err)}defer partitionConsumer.Close()// 消费消息for msg := range partitionConsumer.Messages() {log.Printf("Consumed message: %s, from partition(%d), offset(%d)\n", string(msg.Value), msg.Partition, msg.Offset)}
}
多点配置版本
生产者
packagemainimport("fmt""github.com/IBM/sarama""log""time"
)funcmain() {// 配置生产者config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认config.Producer.Retry.Max = 5 // 最大重试次数config.Producer.Return.Successes = true // 返回成功的消息config.Producer.Return.Errors = true // 返回失败的消息config.Producer.Timeout = 10 * time.Second // 设置生产者的超时时间config.Net.MaxOpenRequests = 5 // 控制最大请求数config.Version = sarama.V2_8_0_0 // 配置 Kafka 版本(可根据实际 Kafka 版本调整)// 创建生产者实例producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)iferr != nil {log.Fatalf("Failed to start producer: %s", err)}deferproducer.Close()// 循环发送消息fori := 1; ; i++ {// 构造消息msg := &sarama.ProducerMessage{Topic: "test_topic", // 目标主题Value: sarama.StringEncoder(fmt.Sprintf("Message #%d: Hello, Kafka! www.zpf0000.com", i)), // 动态生成消息内容}// 发送消息partition, offset, err := producer.SendMessage(msg)iferr != nil {// 错误处理:打印错误并继续发送下一条消息log.Printf("Failed to send message: %s", err)continue}// 成功发送消息后记录日志log.Printf("Message #%d is stored in topic(%s)/partition(%d)/offset(%d)", i, "test_topic", partition, offset)// 模拟消息生产间隔(例如每秒发送一条消息)time.Sleep(1 * time.Second)}
}
消费者
packagemainimport("github.com/IBM/sarama""log""os""os/signal""syscall""time"
)funcmain() {// 配置消费者config := sarama.NewConfig()config.Consumer.Return.Errors = true // 启用错误返回config.Consumer.Offsets.Initial = sarama.OffsetNewest // 从最新消息开始消费config.Version = sarama.V2_8_0_0 // 配置 Kafka 版本(可根据实际 Kafka 版本调整)// 创建 Kafka 消费者实例consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)iferr != nil {log.Fatalf("Failed to start consumer: %s", err)}deferconsumer.Close()// 订阅主题的分区partitionConsumer, err := consumer.ConsumePartition("test_topic", 0, sarama.OffsetNewest)iferr != nil {log.Fatalf("Failed to start partition consumer: %s", err)}deferpartitionConsumer.Close()// 用于捕获系统信号(例如 Ctrl+C),在接收到信号时优雅地退出sigChan := make(chanos.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)// 用来跟踪消费者的状态,确保及时处理错误gofunc() {forerr := rangepartitionConsumer.Errors() {log.Printf("Error: %s", err.Error())}}()// 监听消息并处理log.Println("Consumer is ready, waiting for messages...")for{select{casemsg := <-partitionConsumer.Messages():// 打印收到的消息log.Printf("Received message: %s, from partition(%d), offset(%d)\n", string(msg.Value), msg.Partition, msg.Offset)// 处理消息(可以根据需求扩展处理逻辑)// 模拟消息处理时间time.Sleep(500 * time.Millisecond) // 例如处理消息需要 500 毫秒// 在这里,可以对消息进行确认或其他操作,例如处理完消息后将其存入数据库等case<-sigChan:// 捕获到退出信号,优雅退出log.Println("Received shutdown signal, exiting...")return}}
}
链接
https://cloud.tencent.com/developer/article/1547380 # 优质博客一篇
https://kafka1x.apachecn.org/documentation.html#producerapi # 官方中文文档