kafka中文文档 本文环境:ubuntu:18.04
kafka安装、配置与基本使用(单节点)
安装kafka
下载 0.10.0.1版本并解压缩
> tar -xzf kafka_2.11-0.10.0.1.tgz
> cd kafka_2.11-0.10.0.1.tgz
kafka简单配置
> vi config/server.properties主要注意三个地方:broker.id: 标识本机log.dirs: 是kafka接收消息存放路径zookeeper.connect: 指定连接的zookeeper集群地址
启动服务器
启动ZooKeeper服务器:
如果没有ZooKeeper服务器,可以通过与kafka打包在一起的便捷脚本来创建一个单节点ZooKeeper实例:
> bin/zookeeper-server-start.sh config/zookeeper.properties
> #后台启动: bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
[2021-02-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
启动Kafka服务器:
> bin/kafka-server-start.sh config/server.properties
> #后台启动: bin/kafka-server-start.sh -daemon config/server.properties
[2021-02-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2021-02-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
创建一个 topic
创建一个名为“test”的topic,它有一个分区和一个副本:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看所有 topic列表:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
删除一个 topic (标记):
> bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
发送一些消息
Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。
运行 producer,然后在控制台输入一些消息以发送到服务器。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
启动一个 consumer
Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
查看消费组
查看所有消费组
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer
查看单个消费组的消费详情
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test(消费组name) --new-consumer
发布-订阅模式
Topic:
-
Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据
-
对于每一个topic, Kafka集群都会维持一个分区日志,每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commitlog文件
-
分区中的每一个记录都会分配一个id号来表示顺序,我们称之为 offset,offset用来唯一的标识分区中每一条记录
-
Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制.
-
每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.
偏移量由消费者所控制:通常读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录.
生产者
生产者可以将数据发布到所选择的topic(主题)中
消费者
-
消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给 订阅消费组 中的一个消费者实例.
-
如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例
-
如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程
消费者消费是以消费组为单位,消费组之间数据互不影响,组内数据均分
Go实现(Go 1.15)
kafka_p.go
package kafka_pimport ("github.com/Shopify/sarama""log""time"
)// 定义Kafka生产者对象
type KafkaProducer struct {Config *sarama.ConfigAddress []stringTopic string
}// 定义生产者接口
type Producer interface {MsgContent() []byte
}//实例化sarama: Config
func (kP *KafkaProducer) ProducerConfigInit() {config := sarama.NewConfig()//是否开启消息发送成功后通知 successes channelconfig.Producer.Return.Successes = true//重试次数config.Producer.Retry.Max = 3//失败后再次尝试的间隔时间config.Producer.Retry.Backoff = 1 * time.Second//指定kafka版本,不指定,使用最小版本,高版本的新功能可能无法正常使用.config.Version = sarama.V0_10_0_1kP.Config = config
}//同步消息模式
func (kP *KafkaProducer) SyncProducer(p Producer) error {//初始化客户端producer, err := sarama.NewSyncProducer(kP.Address, kP.Config)if err != nil {log.Printf("sarama.NewSyncProducer err, message=%s \n", err)return err}defer producer.Close()//发送消息msg := &sarama.ProducerMessage{Topic: kP.Topic,Value: sarama.ByteEncoder(p.MsgContent()),}part, offset, err := producer.SendMessage(msg)if err != nil {log.Printf("send message(%s) err=%s \n", p.MsgContent(), err)return err} else {log.Printf("发送成功,partition=%d, offset=%d \n", part, offset)return nil}
}// 启动Kafka生产者
func (kP *KafkaProducer) StartP(p Producer) error {kP.ProducerConfigInit()return kP.SyncProducer(p)
}
kafka_c.go
package kafka_cimport ("github.com/Shopify/sarama"cluster "github.com/bsm/sarama-cluster""log""time"
)// 定义Kafka消费者对象
type KafkaConsumer struct {Config *cluster.ConfigAddress []stringTopics []stringGroupId string
}// 定义消费者接口
type Consumer interface {Consumer([]byte) error
}//实例化sarama: Config
func (kC *KafkaConsumer) ConsumerConfigInit() {config := cluster.NewConfig()//接收失败通知config.Consumer.Return.Errors = true//设置使用的kafka版本,如果低于V0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置config.Version = sarama.V0_10_0_1//初始从最新的offset开始config.Consumer.Offsets.Initial = sarama.OffsetNewestkC.Config = config
}//cluster消费组 消费
func (kC *KafkaConsumer) ClusterConsumer(c Consumer) error {//初始化客户端consumer, err := cluster.NewConsumer(kC.Address, kC.GroupId, kC.Topics, kC.Config)if err != nil {log.Printf("Failed open consumer: %s \n", err)return err}defer consumer.Close()go func() {for err := range consumer.Errors() {log.Printf("consumer err: %s \n", err)}}()go func() {for note := range consumer.Notifications() {log.Println("Rebalanced: ", note)}}()//接收消息for msg := range consumer.Messages() {if err = c.Consumer(msg.Value); err != nil {return nil}else {// MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offsetconsumer.MarkOffset(msg, "")time.Sleep(1 * time.Second)}}return nil
}// 启动Kafka消费者
func (kC *KafkaConsumer) StartC(c Consumer) error {kC.ConsumerConfigInit()return kC.ClusterConsumer(c)
}
配置kafka_config.go
package kafka_configimport "fmt"// Address
var Address = []string{"IP:9092"}var Topic = "test" // 生产者topic
var Topics = []string{"test"} // 消费者topics
var GroupId = "test-group-1" // 消费组id//发送者
type TestP struct {MsgData []byte
}// 实现发送者
func (t *TestP) MsgContent() []byte {fmt.Println(string(t.MsgData))return t.MsgData
}//接收者
type TestC struct {MsgData string
}// 实现接收者
func (t *TestC) Consumer(dataByte []byte) error {fmt.Println(string(dataByte))return nil
}type KafkaMessage struct {FileDir string `json:"dir"`FileName string `json:"file"`OperateType string `json:"operation"`OldData string `json:"old_data"`NewData string `json:"new_data"`
}type KafkaMessageP struct {UserName string `json:"user_name"`MsgId string `json:"msg_id"`Messages []KafkaMessage `json:"msg"`
}
消费者main.go
package mainimport ("golong/kafka_c""golong/kafka_config"
)func main() {//消费者c := &kafka_config.TestC{}kC := &kafka_c.KafkaConsumer{Address: kafka_config.Address,Topics: kafka_config.Topics,GroupId: kafka_config.GroupId,}_ = kC.StartC(c)
}
生产者main.go
package mainimport ("encoding/json""fmt""golong/kafka_config""golong/kafka_p"
)func main() {//生产者msgKp := &kafka_config.KafkaMessageP{}msgKp.UserName = "fxm"msgKpBytes, _ := json.Marshal(msgKp)p := &kafka_config.TestP{MsgData: msgKpBytes,}kP := &kafka_p.KafkaProducer{Address: kafka_config.Address,Topic: kafka_config.Topic,}err := kP.StartP(p)if err != nil {fmt.Println("添加异常 !!!")}
}
panic: non-positive interval for NewTicker 问题处理
//处理1: 找到这个consumer.go源码位置,上面的第二个报错有标注位置github.com/bsm/sarama-cluster.(*Consumer).cmLoop(0xc000212000, 0xc0002ba1e0)D:/work/mygo/pkg/mod/github.com/bsm/sarama-cluster@v2.1.15+incompatible/consumer.go:452 +0x61// 修改452行,// ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)ticker := time.NewTicker(c.client.config.Consumer.Offsets.AutoCommit.Interval)// 保存重新build即可//处理2:把 sarama 版本改成 从 v1.26.1 --> v1.24.1 就可以用啦 github.com/Shopify/sarama v1.24.1gomod 的配置改下版本号就可以github.com/Shopify/sarama v1.24.1github.com/bsm/sarama-cluster v2.1.15+incompatible