如何实现消息幂等
设计幂等的消费逻辑的关键是确保每条消息只被处理一次,即使在网络故障或消费者重启的情况下。通常使用唯一的消息ID和持久化存储来记录处理过的消息ID。
实现步骤
- 连接kafka和redis
- 检查消息ID
- 处理消息
- 标记消息已处理
package mainimport ("context""crypto/md5""encoding/hex""fmt""github.com/confluentinc/confluent-kafka-go/kafka""github.com/go-redis/redis/v8""log""time"
)// 初始化Redis客户端
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379",Password: "", // no password setDB: 0, // use default DB
})// 计算消息的唯一ID(可以使用消息的内容或其他标识)
func calculateMessageID(message []byte) string {hash := md5.Sum(message)return hex.EncodeToString(hash[:])
}// 检查消息ID是否已处理
func isMessageProcessed(messageID string) bool {result, err := rdb.Get(ctx, messageID).Result()if err == redis.Nil {return false} else if err != nil {log.Fatalf("Failed to get message ID from Redis: %v", err)}return result == "processed"
}// 标记消息ID为已处理
func markMessageAsProcessed(messageID string) {err := rdb.Set(ctx, messageID, "processed", 0).Err()if err != nil {log.Fatalf("Failed to set message ID in Redis: %v", err)}
}// 处理消息的逻辑
func processMessage(message []byte) {// 在这里添加具体的消息处理逻辑fmt.Printf("Processing message: %s\n", string(message))
}
// 初始化Kafka消费者,读取消息,检查消息ID,处理未处理的消息,并将消息ID标记为已处理。
func main() {consumer, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "localhost","group.id": "myGroup","auto.offset.reset": "earliest",})if err != nil {log.Fatalf("Failed to create consumer: %v", err)}defer consumer.Close()consumer.Subscribe("myTopic", nil)for {msg, err := consumer.ReadMessage(-1)if err == nil {messageID := calculateMessageID(msg.Value)if !isMessageProcessed(messageID) {processMessage(msg.Value)markMessageAsProcessed(messageID)} else {fmt.Printf("Message %s already processed\n", messageID)}} else {fmt.Printf("Consumer error: %v (%v)\n", err, msg)}time.Sleep(1 * time.Second) // 可选:添加延迟以防止消息消费过快}
}