依赖Kafka的Go单元测试例解

Kafka[1]是Apache基金会开源的一个分布式事件流处理平台,是Java阵营(最初为Scala)中的一款杀手级应用,其提供的高可靠性、高吞吐量和低延迟的数据传输能力,让其到目前为止依旧是现代企业级应用系统以及云原生应用系统中使用的重要中间件。

在日常开发Go程序时,我们经常会遇到一些依赖Kafka的代码[2],如何对这些代码进行测试,尤其是单测是摆在Go开发者前面的一个现实问题!

有人说用mock,是个路子。但看过我的《单测时尽量用fake object[3]》一文的童鞋估计已经走在了寻找kafka fake object的路上了!Kafka虽好,但身形硕大,不那么灵巧。找到一个合适的fake object不容易。在这篇文章中,我们就来聊聊如何测试那些依赖kafka的代码,再往本质一点说,就是和大家以找找那些合适的kafka fake object。

1. 寻找fake object的策略

在《单测时尽量用fake object[4]》一文中,我们提到过,如果测试的依赖提供了tiny版本或某些简化版,我们可以直接使用这些版本作为fake object的候选,就像etcd提供了用于测试的自身简化版的实现(embed)[5]那样。

但Kafka并没有提供tiny版本,我们也只能选择《单测时尽量用fake object[6]》一文提到的另外一个策略,那就是利用容器来充当fake object,这是目前能搞到任意依赖的fake object的最简单路径了。也许以后WASI(WebAssembly System Interface)[7]成熟了,让wasm脱离浏览器并可以在本地系统上飞起,到时候换用wasm也不迟。

下面我们就按照使用容器的策略来找一找适合的kafka container。

2. testcontainers-go

我们第一站就来到了testcontainers-go[8]。testcontainers-go是一个Go语言开源项目,专门用于简化创建和清理基于容器的依赖项,常用于Go项目的单元测试、自动化集成或冒烟测试中。通过testcontainers-go提供的易于使用的API,开发人员能够以编程方式定义作为测试的一部分而运行的容器,并在测试完成时清理这些资源。

注:testcontainers[9]不仅提供Go API,它还覆盖了主流的编程语言,包括:Java、.NET、Python、Node.js、Rust[10]等。

在几个月之前,testcontainers-go[11]项目还没有提供对Kafka的直接支持,我们需要自己使用testcontainers.GenericContainer来自定义并启动kafka容器。2023年9月,以KRaft模式运行的Kafka容器才被首次引入testcontainers-go项目[12]

目前testcontainers-go使用的kafka镜像版本是confluentinc/confluent-local:7.5.0[13]。Confluent[14]是在kafka背后的那家公司,基于kafka提供商业化支持。今年初,Confluent还收购了Immerok,将apache的另外一个明星项目Flink招致麾下。

confluent-local[15]并不是一个流行的kafka镜像,它只是一个使用KRaft模式的零配置的、包含Confluent Community RestProxy的Apache Kafka,并且镜像是实验性的,仅应用于本地开发工作流,不应该用在支持生产工作负载。

生产中最常用的开源kafka镜像是confluentinc/cp-kafka镜像[16],它是基于开源Kafka项目构建的,但在此基础上添加了一些额外的功能和工具,以提供更丰富的功能和更易于部署和管理的体验。cp-kafka镜像的版本号并非kafka的版本号,其对应关系需要cp-kafka镜像官网查询。

另外一个开发领域常用的kafka镜像是bitnami的kafka镜像。Bitnami是一个提供各种开源软件的预打包镜像和应用程序栈的公司。Bitnami Kafka镜像是基于开源Kafka项目构建的,是一个可用于快速部署和运行Kafka的Docker镜像。Bitnami Kafka镜像与其内部的Kakfa的版本号保持一致。

下面我们就来看看如何使用testcontainers-go的kafka来作为依赖kafka的Go单元测试用例的fake object。

这第一个测试示例改编自testcontainers-go/kafka module的example_test.go:

// testcontainers/kafka_setup/kafka_test.gopackage mainimport ("context""fmt""testing""github.com/testcontainers/testcontainers-go/modules/kafka"
)func TestKafkaSetup(t *testing.T) {ctx := context.Background()kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"))if err != nil {panic(err)}// Clean up the containerdefer func() {if err := kafkaContainer.Terminate(ctx); err != nil {panic(err)}}()state, err := kafkaContainer.State(ctx)if err != nil {panic(err)}if kafkaContainer.ClusterID != "test-cluster" {t.Errorf("want test-cluster, actual %s", kafkaContainer.ClusterID)}if state.Running != true {t.Errorf("want true, actual %t", state.Running)}brokers, _ := kafkaContainer.Brokers(ctx)fmt.Printf("%q\n", brokers)
}

在这个例子中,我们直接调用kafka.RunContainer创建了一个名为test-cluster的kafka实例,如果没有通过WithImage向RunContainer传入自定义镜像,那么默认我们将启动一个confluentinc/confluent-local:7.5.0的容器(注意:随着时间变化,该默认容器镜像的版本也会随之改变)。

通过RunContainer返回的kafka.KafkaContainer我们可以获取到关于kafka容器的各种信息,比如上述代码中的ClusterID、kafka Broker地址信息等。有了这些信息,我们后续便可以与以容器形式启动的kafka建立连接并做数据的写入和读取操作了。

我们先来看这个测试的运行结果,与预期一致:

$ go test 
2023/12/16 21:45:52 github.com/testcontainers/testcontainers-go - Connected to docker: ... ...Resolved Docker Host: unix:///var/run/docker.sockResolved Docker Socket Path: /var/run/docker.sockTest SessionID: 19e47867b733f4da4f430d78961771ae3a1cc66c5deca083b4f6359c6d4b2468Test ProcessID: 41b9ef62-2617-4189-b23a-1bfa4c06dfec
2023/12/16 21:45:52 Creating container for image docker.io/testcontainers/ryuk:0.5.1
2023/12/16 21:45:53 Container created: 8f2240042c27
2023/12/16 21:45:53 Starting container: 8f2240042c27
2023/12/16 21:45:53 Container started: 8f2240042c27
2023/12/16 21:45:53 Waiting for container id 8f2240042c27 image: docker.io/testcontainers/ryuk:0.5.1. Waiting for: &{Port:8080/tcp timeout:<nil> PollInterval:100ms}
2023/12/16 21:45:53 Creating container for image confluentinc/confluent-local:7.5.0
2023/12/16 21:45:53 Container created: a39a495aed0b
2023/12/16 21:45:53 Starting container: a39a495aed0b
2023/12/16 21:45:53 Container started: a39a495aed0b
["localhost:1037"]
2023/12/16 21:45:58 Terminating container: a39a495aed0b
2023/12/16 21:45:58 Container terminated: a39a495aed0b
PASS
ok   demo 6.236s

接下来,在上面用例的基础上,我们再来做一个Kafka连接以及数据读写测试:

// testcontainers/kafka_consumer_and_producer/kafka_test.gopackage mainimport ("bytes""context""errors""net""strconv""testing""time""github.com/testcontainers/testcontainers-go/modules/kafka"kc "github.com/segmentio/kafka-go" // kafka client
)func createTopics(brokers []string, topics ...string) error {// to create topics when auto.create.topics.enable='false'conn, err := kc.Dial("tcp", brokers[0])if err != nil {return err}defer conn.Close()controller, err := conn.Controller()if err != nil {return err}var controllerConn *kc.ConncontrollerConn, err = kc.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err != nil {return err}defer controllerConn.Close()var topicConfigs []kc.TopicConfigfor _, topic := range topics {topicConfig := kc.TopicConfig{Topic:             topic,NumPartitions:     1,ReplicationFactor: 1,}topicConfigs = append(topicConfigs, topicConfig)}err = controllerConn.CreateTopics(topicConfigs...)if err != nil {return err}return nil
}func newWriter(brokers []string, topic string) *kc.Writer {return &kc.Writer{Addr:                   kc.TCP(brokers...),Topic:                  topic,Balancer:               &kc.LeastBytes{},AllowAutoTopicCreation: true,RequiredAcks:           0,}
}func newReader(brokers []string, topic string) *kc.Reader {return kc.NewReader(kc.ReaderConfig{Brokers:  brokers,Topic:    topic,GroupID:  "test-group",MaxBytes: 10e6, // 10MB})
}func TestProducerAndConsumer(t *testing.T) {ctx := context.Background()kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"))if err != nil {t.Fatalf("want nil, actual %v\n", err)}// Clean up the containerdefer func() {if err := kafkaContainer.Terminate(ctx); err != nil {t.Fatalf("want nil, actual %v\n", err)}}()state, err := kafkaContainer.State(ctx)if err != nil {t.Fatalf("want nil, actual %v\n", err)}if state.Running != true {t.Errorf("want true, actual %t", state.Running)}brokers, err := kafkaContainer.Brokers(ctx)if err != nil {t.Fatalf("want nil, actual %v\n", err)}topic := "test-topic"w := newWriter(brokers, topic)defer w.Close()r := newReader(brokers, topic)defer r.Close()err = createTopics(brokers, topic)if err != nil {t.Fatalf("want nil, actual %v\n", err)}time.Sleep(5 * time.Second)messages := []kc.Message{{Key:   []byte("Key-A"),Value: []byte("Value-A"),},{Key:   []byte("Key-B"),Value: []byte("Value-B"),},{Key:   []byte("Key-C"),Value: []byte("Value-C"),},{Key:   []byte("Key-D"),Value: []byte("Value-D!"),},}const retries = 3for i := 0; i < retries; i++ {ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)defer cancel()// attempt to create topic prior to publishing the messageerr = w.WriteMessages(ctx, messages...)if errors.Is(err, kc.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {time.Sleep(time.Millisecond * 250)continue}if err != nil {t.Fatalf("want nil, actual %v\n", err)}break}var getMessages []kc.Messagefor i := 0; i < len(messages); i++ {m, err := r.ReadMessage(context.Background())if err != nil {t.Fatalf("want nil, actual %v\n", err)}getMessages = append(getMessages, m)}for i := 0; i < len(messages); i++ {if !bytes.Equal(getMessages[i].Key, messages[i].Key) {t.Errorf("want %s, actual %s\n", string(messages[i].Key), string(getMessages[i].Key))}if !bytes.Equal(getMessages[i].Value, messages[i].Value) {t.Errorf("want %s, actual %s\n", string(messages[i].Value), string(getMessages[i].Value))}}
}

我们使用segmentio/kafka-go这个客户端[17]来实现kafka的读写。关于如何使用segmentio/kafka-go这个客户端,可以参考我之前写的《Go社区主流Kafka客户端简要对比[18]》。

这里我们在TestProducerAndConsumer这个用例中,先通过testcontainers-go的kafka.RunContainer启动一个Kakfa实例,然后创建了一个topic: “test-topic”。我们在写入消息前也可以不单独创建这个“test-topic”,Kafka默认启用topic自动创建,并且segmentio/kafka-go的高级API:Writer也支持AllowAutoTopicCreation的设置。不过topic的创建需要一些时间,如果要在首次写入消息时创建topic,此次写入可能会失败,需要retry。

向topic写入一条消息(实际上是一个批量Message,包括四个key-value pair)后,我们调用ReadMessage从上述topic中读取消息,并将读取的消息与写入的消息做比较。

注:近期发现kafka-go的一个可能导致内存暴涨的问题[19],在kafka ack返回延迟变大的时候,可能触发该问题。

下面是执行该用例的输出结果:

$ go test
2023/12/17 17:43:54 github.com/testcontainers/testcontainers-go - Connected to docker: Server Version: 24.0.7API Version: 1.43Operating System: CentOS Linux 7 (Core)Total Memory: 30984 MBResolved Docker Host: unix:///var/run/docker.sockResolved Docker Socket Path: /var/run/docker.sockTest SessionID: f76fe611c753aa4ef1456285503b0935a29795e7c0fab2ea2588029929215a08Test ProcessID: 27f531ee-9b5f-4e4f-b5f0-468143871004
2023/12/17 17:43:54 Creating container for image docker.io/testcontainers/ryuk:0.5.1
2023/12/17 17:43:54 Container created: 577309098f4c
2023/12/17 17:43:54 Starting container: 577309098f4c
2023/12/17 17:43:54 Container started: 577309098f4c
2023/12/17 17:43:54 Waiting for container id 577309098f4c image: docker.io/testcontainers/ryuk:0.5.1. Waiting for: &{Port:8080/tcp timeout:<nil> PollInterval:100ms}
2023/12/17 17:43:54 Creating container for image confluentinc/confluent-local:7.5.0
2023/12/17 17:43:55 Container created: 1ee11e11742b
2023/12/17 17:43:55 Starting container: 1ee11e11742b
2023/12/17 17:43:55 Container started: 1ee11e11742b
2023/12/17 17:44:15 Terminating container: 1ee11e11742b
2023/12/17 17:44:15 Container terminated: 1ee11e11742b
PASS
ok   demo 21.505s

我们看到默认情况下,testcontainer能满足与kafka交互的基本需求,并且testcontainer提供了一系列Option(WithXXX)可以对container进行定制,以满足一些扩展性的要求,但是这需要你对testcontainer提供的API有更全面的了解。

除了开箱即用的testcontainer之外,我们还可以使用另外一种方便的基于容器的技术:docker-compose来定制和启停我们需要的kafka image[20]。接下来,我们就来看看如何使用docker-compose建立fake kafka object。

3. 使用docker-compose建立fake kafka

3.1 一个基础的基于docker-compose的fake kafka实例模板

这次我们使用bitnami提供的kafka镜像,我们先建立一个“等价”于上面“testcontainers-go”提供的kafka module的kafka实例,下面是docker-compose.yml:

// docker-compose/bitnami/plaintext/docker-compose.ymlversion: "2"services:kafka:image: docker.io/bitnami/kafka:3.6network_mode: "host"volumes:- "kafka_data:/bitnami"environment:# KRaft settings- KAFKA_CFG_NODE_ID=0- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9093# Listeners- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT# borrow from testcontainer- KAFKA_CFG_BROKER_ID=0- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1- KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1- KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0- KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807
volumes:kafka_data:driver: local

我们看到其中一些配置“借鉴”了testcontainers-go的kafka module,我们启动一下该容器:

$ docker-compose up -d
[+] Running 2/2✔ Volume "plaintext_kafka_data"  Created                                                                                    0.0s ✔ Container plaintext-kafka-1    Started                                                                                    0.1s

依赖该容器的go测试代码与前面的TestProducerAndConsumer差不多,只是在开始处去掉了container的创建过程:

// docker-compose/bitnami/plaintext/kafka_test.gofunc TestProducerAndConsumer(t *testing.T) {brokers := []string{"localhost:9092"}topic := "test-topic"w := newWriter(brokers, topic)defer w.Close()r := newReader(brokers, topic)defer r.Close()err := createTopics(brokers, topic)if err != nil {t.Fatalf("want nil, actual %v\n", err)}time.Sleep(5 * time.Second)... ...
}

运行该测试用例,我们看到预期的结果:

go test
write message ok  Value-A
write message ok  Value-B
write message ok  Value-C
write message ok  Value-D!
PASS
ok   demo 15.143s

不过对于单元测试来说,显然我们不能手动来启动和停止kafka container,我们需要为每个用例填上setup和teardown,这样也能保证用例间的相互隔离,于是我们增加了一个docker_compose_helper.go文件,在这个文件中我们提供了一些帮助testcase启停kafka的helper函数:

// docker-compose/bitnami/plaintext/docker_compose_helper.gopackage mainimport ("fmt""os/exec""strings""time"
)// helpler function for operating docker container through docker-compose commandconst (defaultCmd     = "docker-compose"defaultCfgFile = "docker-compose.yml"
)func execCliCommand(cmd string, opts ...string) ([]byte, error) {cmds := cmd + " " + strings.Join(opts, " ")fmt.Println("exec command:", cmds)return exec.Command(cmd, opts...).CombinedOutput()
}func execDockerComposeCommand(cmd string, cfgFile string, opts ...string) ([]byte, error) {var allOpts = []string{"-f", cfgFile}allOpts = append(allOpts, opts...)return execCliCommand(cmd, allOpts...)
}func UpKakfa(composeCfgFile string) ([]byte, error) {b, err := execDockerComposeCommand(defaultCmd, composeCfgFile, "up", "-d")if err != nil {return nil, err}time.Sleep(10 * time.Second)return b, nil
}func UpDefaultKakfa() ([]byte, error) {return UpKakfa(defaultCfgFile)
}func DownKakfa(composeCfgFile string) ([]byte, error) {b, err := execDockerComposeCommand(defaultCmd, composeCfgFile, "down", "-v")if err != nil { return nil, err}time.Sleep(10 * time.Second)return b, nil
}func DownDefaultKakfa() ([]byte, error) {return DownKakfa(defaultCfgFile)
}

眼尖的童鞋可能看到:在UpKakfa和DownKafka函数中我们使用了硬编码的“time.Sleep”来等待10s,通常在镜像已经pull到本地后这是有效的,但却不是最精确地等待方式,testcontainers-go/wait[21]中提供了等待容器内程序启动完毕的多种策略,如果你想用更精确的等待方式,可以了解一下wait包。

基于helper函数,我们改造一下TestProducerAndConsumer用例:

// docker-compose/bitnami/plaintext/kafka_test.go
func TestProducerAndConsumer(t *testing.T) {_, err := UpDefaultKakfa()if err != nil {t.Fatalf("want nil, actual %v\n", err)}t.Cleanup(func() {DownDefaultKakfa()})... ...
}

我们在用例开始处通过UpDefaultKakfa使用docker-compose将kafka实例启动起来,然后注册了Cleanup函数[22],用于在test case执行结束后销毁kafka实例。

下面是新版用例的执行结果:

$ go test
exec command: docker-compose -f docker-compose.yml up -d
write message ok  Value-A
write message ok  Value-B
write message ok  Value-C
write message ok  Value-D!
exec command: docker-compose -f docker-compose.yml down -v
PASS
ok   demo 36.402s

使用docker-compose的最大好处就是可以通过docker-compose.yml文件对要fake的object进行灵活的定制,这种定制与testcontainers-go的差别就是你无需去研究testcontiners-go的API。

下面是使用tls连接与kafka建立连接并实现读写的示例。

3.2 建立一个基于TLS连接的fake kafka实例

Kafka的配置复杂是有目共睹的,为了建立一个基于TLS连接,我也是花了不少时间做“试验”,尤其是listeners以及证书的配置,不下点苦功夫读文档还真是配不出来。

下面是一个基于bitnami/kafka镜像配置出来的基于TLS安全通道上的kafka实例:

// docker-compose/bitnami/tls/docker-compose.yml# config doc:  https://github.com/bitnami/containers/blob/main/bitnami/kafka/README.mdversion: "2"services:kafka:image: docker.io/bitnami/kafka:3.6network_mode: "host"#ports:#- "9092:9092"environment:# KRaft settings- KAFKA_CFG_NODE_ID=0- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@localhost:9094# Listeners- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,SECURED://:9093,CONTROLLER://:9094- KAFKA_CFG_ADVERTISED_LISTENERS=SECURED://:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,SECURED:SSL,PLAINTEXT:PLAINTEXT- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SECURED# SSL settings- KAFKA_TLS_TYPE=PEM- KAFKA_TLS_CLIENT_AUTH=none- KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=# borrow from testcontainer- KAFKA_CFG_BROKER_ID=0- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1- KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1- KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=1- KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS=0- KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES=9223372036854775807volumes:# server.cert, server.key and ca.crt- "kafka_data:/bitnami"- "./kafka.keystore.pem:/opt/bitnami/kafka/config/certs/kafka.keystore.pem:ro"- "./kafka.keystore.key:/opt/bitnami/kafka/config/certs/kafka.keystore.key:ro"- "./kafka.truststore.pem:/opt/bitnami/kafka/config/certs/kafka.truststore.pem:ro"
volumes:kafka_data:driver: local

这里我们使用pem格式的证书和key,在上面配置中,volumes下面挂载的kafka.keystore.pem、kafka.keystore.key和kafka.truststore.pem分别对应了以前在Go中常用的名字:server-cert.pem(服务端证书), server-key.pem(服务端私钥)和ca-cert.pem(CA证书)。

这里整理了一个一键生成的脚本docker-compose/bitnami/tls/kafka-generate-cert.sh,我们执行该脚本生成所有需要的证书并放到指定位置(遇到命令行提示,只需要一路回车即可):

$bash kafka-generate-cert.sh 
.........++++++
.............................++++++
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:
State or Province Name (full name) []:
Locality Name (eg, city) [Default City]:
Organization Name (eg, company) [Default Company Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (eg, your name or your server's hostname) []:
Email Address []:Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
Signature ok
subject=/C=XX/L=Default City/O=Default Company Ltd
Getting Private key
.....................++++++
.........++++++
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:
State or Province Name (full name) []:
Locality Name (eg, city) [Default City]:
Organization Name (eg, company) [Default Company Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (eg, your name or your server's hostname) []:
Email Address []:Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
Signature ok
subject=/C=XX/L=Default City/O=Default Company Ltd
Getting CA Private Key

接下来,我们来改造用例,使之支持以tls方式建立到kakfa的连接:

//docker-compose/bitnami/tls/kafka_test.gofunc createTopics(brokers []string, tlsConfig *tls.Config, topics ...string) error {dialer := &kc.Dialer{Timeout:   10 * time.Second,DualStack: true,TLS:       tlsConfig,}conn, err := dialer.DialContext(context.Background(), "tcp", brokers[0])if err != nil {fmt.Println("creating topic: dialer dial error:", err)return err}defer conn.Close()fmt.Println("creating topic: dialer dial ok")... ...
}func newWriter(brokers []string, tlsConfig *tls.Config, topic string) *kc.Writer {w := &kc.Writer{Addr:                   kc.TCP(brokers...),Topic:                  topic,Balancer:               &kc.LeastBytes{},AllowAutoTopicCreation: true,Async:                  true,//RequiredAcks:           0,Completion: func(messages []kc.Message, err error) {for _, message := range messages {if err != nil {fmt.Println("write message fail", err)} else {fmt.Println("write message ok", string(message.Topic), string(message.Value))}}},}if tlsConfig != nil {w.Transport = &kc.Transport{TLS: tlsConfig,}}return w
}func newReader(brokers []string, tlsConfig *tls.Config, topic string) *kc.Reader {dialer := &kc.Dialer{Timeout:   10 * time.Second,DualStack: true,TLS:       tlsConfig,}return kc.NewReader(kc.ReaderConfig{Dialer:   dialer,Brokers:  brokers,Topic:    topic,GroupID:  "test-group",MaxBytes: 10e6, // 10MB})
}func TestProducerAndConsumer(t *testing.T) {var err error_, err = UpDefaultKakfa()if err != nil {t.Fatalf("want nil, actual %v\n", err)}t.Cleanup(func() {DownDefaultKakfa()})brokers := []string{"localhost:9093"}topic := "test-topic"tlsConfig, _ := newTLSConfig()w := newWriter(brokers, tlsConfig, topic)defer w.Close()r := newReader(brokers, tlsConfig, topic)defer r.Close()err = createTopics(brokers, tlsConfig, topic)if err != nil {fmt.Printf("create topic error: %v, but it may not affect the later action, just ignore it\n", err)}time.Sleep(5 * time.Second)... ...
}func newTLSConfig() (*tls.Config, error) {/*// 加载 CA 证书caCert, err := ioutil.ReadFile("/path/to/ca.crt")if err != nil {return nil, err}// 加载客户端证书和私钥cert, err := tls.LoadX509KeyPair("/path/to/client.crt", "/path/to/client.key")if err != nil {return nil, err}// 创建 CertPool 并添加 CA 证书caCertPool := x509.NewCertPool()caCertPool.AppendCertsFromPEM(caCert)*/// 创建并返回 TLS 配置return &tls.Config{//RootCAs:      caCertPool,//Certificates: []tls.Certificate{cert},InsecureSkipVerify: true,}, nil
}

在上述代码中,我们按照segmentio/kafka-go为createTopics、newWriter和newReader都加上了tls.Config参数,此外在测试用例中,我们用newTLSConfig创建一个tls.Config的实例,在这里我们一切简化处理,采用InsecureSkipVerify=true的方式与kafka broker服务端进行握手,既不验证服务端证书,也不做双向认证(mutual TLS)。

下面是修改代码后的测试用例执行结果:

$ go test
exec command: docker-compose -f docker-compose.yml up -d
creating topic: dialer dial ok
creating topic: get controller ok
creating topic: dial control listener ok
create topic error: EOF, but it may not affect the later action, just ignore it
write message error: [3] Unknown Topic Or Partition: the request is for a topic or partition that does not exist on this broker
write message ok  Value-A
write message ok  Value-B
write message ok  Value-C
write message ok  Value-D!
exec command: docker-compose -f docker-compose.yml down -v
PASS
ok   demo 38.473s

这里我们看到:createTopics虽然连接kafka的各个listener都ok,但调用topic创建时,返回EOF,但这的确不影响后续action的执行,不确定这是segmentio/kafka-go的问题,还是kafka实例的问题。另外首次写入消息时,也因为topic或partition未建立而失败,retry后消息正常写入。

通过这个例子我们看到,基于docker-compose建立fake object有着更广泛的灵活性,如果做好容器启动和停止的精准wait机制的话,我可能会更多选择这种方式。

4. 小结

本文介绍了如何在Go编程中进行依赖Kafka的单元测试,并探讨了寻找适合的Kafka fake object的策略。

对于Kafka这样的复杂系统来说,找到合适的fake object并不容易。因此,本文推荐使用容器作为fake object的策略,并分别介绍了使用testcontainers-go项目和使用docker-compose作为简化创建和清理基于容器的依赖项的工具。相对于刚刚加入testcontainers-go项目没多久的kafka module而言,使用docker-compose自定义fake object更加灵活一些。但无论哪种方法,开发人员都需要对kafka的配置有一个较为整体和深入的理解。

文中主要聚焦使用testcontainers-go和docker-compose建立fake kafka的过程,而用例并没有建立明确的sut(被测目标),比如针对某个函数的白盒单元测试。

文本涉及的源码可以在这里[23]下载。


Gopher部落知识星球[24]在2024年将继续致力于打造一个高品质的Go语言学习和交流平台。我们将继续提供优质的Go技术文章首发和阅读体验。同时,我们也会加强代码质量和最佳实践的分享,包括如何编写简洁、可读、可测试的Go代码。此外,我们还会加强星友之间的交流和互动。欢迎大家踊跃提问,分享心得,讨论技术。我会在第一时间进行解答和交流。我衷心希望Gopher部落可以成为大家学习、进步、交流的港湾。让我相聚在Gopher部落,享受coding的快乐! 欢迎大家踊跃加入!

81ed1a8e3b2ec36c2a8bfb5fe42030dc.jpegbb41fd881ae5dae346eabf6602abe4c2.png

239e04916eecca9989a0935430490b0c.png66959bfd13c6d62cd14cfecb26086f2d.jpeg

著名云主机服务厂商DigitalOcean发布最新的主机计划,入门级Droplet配置升级为:1 core CPU、1G内存、25G高速SSD,价格5$/月。有使用DigitalOcean需求的朋友,可以打开这个链接地址[25]:https://m.do.co/c/bff6eed92687 开启你的DO主机之路。

Gopher Daily(Gopher每日新闻) - https://gopherdaily.tonybai.com

我的联系方式:

  • 微博(暂不可用):https://weibo.com/bigwhite20xx

  • 微博2:https://weibo.com/u/6484441286

  • 博客:tonybai.com

  • github: https://github.com/bigwhite

  • Gopher Daily归档 - https://github.com/bigwhite/gopherdaily

c31927feb87c64b0be8ce6876880cd97.jpeg

商务合作方式:撰稿、出书、培训、在线课程、合伙创业、咨询、广告合作。

参考资料

[1] 

Kafka: https://kafka.apache.org

[2] 

依赖Kafka的代码: https://tonybai.com/2023/09/04/slog-in-action-file-logging-rotation-and-kafka-integration/

[3] 

单测时尽量用fake object: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/

[4] 

单测时尽量用fake object: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/

[5] 

用于测试的自身简化版的实现(embed): https://github.com/etcd-io/etcd/blob/main/tests/integration/embed

[6] 

单测时尽量用fake object: https://tonybai.com/2023/04/20/provide-fake-object-for-external-collaborators/

[7] 

WASI(WebAssembly System Interface): https://wasi.dev/

[8] 

testcontainers-go: https://golang.testcontainers.org/

[9] 

testcontainers: https://testcontainers.com

[10] 

Rust: https://tonybai.com/2023/02/22/rust-vs-go-in-2023/

[11] 

testcontainers-go: https://github.com/testcontainers/testcontainers-go/

[12] 

以KRaft模式运行的Kafka容器才被首次引入testcontainers-go项目: https://github.com/testcontainers/testcontainers-go/pull/1610

[13] 

confluentinc/confluent-local:7.5.0: https://hub.docker.com/r/confluentinc/confluent-local

[14] 

Confluent: https://www.confluent.io

[15] 

confluent-local: https://hub.docker.com/r/confluentinc/confluent-local

[16] 

confluentinc/cp-kafka镜像: https://hub.docker.com/r/confluentinc/cp-kafka

[17] 

使用segmentio/kafka-go这个客户端: https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients

[18] 

Go社区主流Kafka客户端简要对比: https://tonybai.com/2022/03/28/the-comparison-of-the-go-community-leading-kakfa-clients

[19] 

发现kafka-go的一个可能导致内存暴涨的问题: https://github.com/segmentio/kafka-go/pull/1117

[20] 

docker-compose来定制和启停我们需要的kafka image: https://tonybai.com/2021/11/26/build-all-in-one-runtime-environment-with-docker-compose

[21] 

testcontainers-go/wait: https://pkg.go.dev/github.com/testcontainers/testcontainers-go@v0.26.0/wait

[22] 

注册了Cleanup函数: https://tonybai.com/2020/03/08/some-changes-in-go-1-14/

[23] 

这里: https://github.com/bigwhite/experiments/tree/master/unit-testing-deps-on-kafka

[24] 

Gopher部落知识星球: https://public.zsxq.com/groups/51284458844544

[25] 

链接地址: https://m.do.co/c/bff6eed92687

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/612143.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud微服务 【实用篇】| RabbitMQ快速入门、SpringAMQP

目录 一&#xff1a;初始RabbitMQ 1. 同步和异步通讯 1.1 同步调用 1.2 异步调用 2. MQ常见框架 二&#xff1a;RabbitMQ快速入门 1. RabbitMQ概述和安装 2. 常见消息队列模型 3. 快速入门案例 三&#xff1a;SpringAMQP 1. Basic Queue 简单队列模型 2. Work Queu…

ORACLE RAC DG文件路径错乱解决办法

最近接手了一个客户的RAC-RAC dg环境的维护,登录上去之后发现dg延迟了8天,由于主库的空间非常紧张,归档日志早就删除了,所以准备使用rman基于scn点的备份恢复的方案恢复dg同步 在备份完成之后,使用新的控制文件进行数据恢复的时候报错datafile 43 not found: 检查了一下发现当…

SpringBoot中使用单例模式+ScheduledExecutorService实现异步多线程任务(若依源码学习)

场景 若依前后端分离版手把手教你本地搭建环境并运行项目&#xff1a; 若依前后端分离版手把手教你本地搭建环境并运行项目_本地运行若依前后端分离-CSDN博客 设计模式-单例模式-饿汉式单例模式、懒汉式单例模式、静态内部类在Java中的使用示例&#xff1a; 设计模式-单例模…

Python requirements.txt 详解

文章目录 1 概述1.1 作用1.2 注意 2 操作2.1 生成 requirements.txt2.2 安装 requirements.txt 3 示例3.1 新建 Django 项目3.2 找到 Scripts 目录&#xff0c;执行生成 requirements.txt 命令 1 概述 1.1 作用 作用&#xff1a;记录 当前项目下 所有 依赖包及其版本号&#…

不知道题目是啥

本题是学校的集训里的题&#xff0c;所有不知道题目名字是啥&#xff0c;直接看题目就好 解题思路&#xff1a;因为字符串只含有小写字母&#xff0c;所以可以创建两个数组分别来存s和t的每个字母出现次数&#xff0c;然后遍历数组&#xff0c;如果s字符串中的某个字母比t的小&…

输电线路分布式故障诊断装置的四大特点介绍-深圳鼎信

输电线路分布式故障诊断装置是一种利用行波测距、无线通信等技术手段实现电网故障定位的设备。这对于电网的故障处理和恢复具有重要意义&#xff0c;可以帮助运维人员提高故障处理的效率&#xff0c;缩短故障处理时间&#xff0c;减少停电时间&#xff0c;提高用户的供电可靠性…

premiere简约大气3D动画logo片头Pr模板Mogrt免费下载

Premiere简约大气3D动画logo片头pr模板mogrt下载&#xff0c;无需插件&#xff0c;高清分辨率&#xff0c;易于自定义&#xff0c;包括教程&#xff0c;不包括音频和图像。免费下载&#xff1a;https://prmuban.com/37065.html

Linux学习(1):目录结构、编辑器和用户管理

Linux学习&#xff08;1&#xff09;&#xff1a;目录结构、编辑器和用户管理 1 Linux目录结构2 vi和vim编辑器2.1 快捷键练习 3 用户管理3.1 添加用户3.2 删除用户即主目录3.3 切换用户 4 用户组 1 Linux目录结构 在linux世界里&#xff0c;一切皆为文件。 linux目录结构&a…

test fuzz-05-模糊测试 kelinci AFL-based fuzzing for Java

拓展阅读 开源 Auto generate mock data for java test.(便于 Java 测试自动生成对象信息) 开源 Junit performance rely on junit5 and jdk8.(java 性能测试框架。性能测试。压测。测试报告生成。) test fuzz-01-模糊测试&#xff08;Fuzz Testing&#xff09; test fuzz-…

Gin CORS 跨域请求资源共享与中间件

Gin CORS 跨域请求资源共享与中间件 文章目录 Gin CORS 跨域请求资源共享与中间件一、同源策略1.1 什么是浏览器的同源策略&#xff1f;1.2 同源策略判依据1.3 跨域问题三种解决方案 二、CORS:跨域资源共享简介(后端技术)三 CORS基本流程1.CORS请求分类2.基本流程 四、CORS两种…

Java项目:02 基于ssm超市订单管理系统

项目介绍 基于ssm超市订单管理系统 环境&#xff1a;jdk1.8&#xff0c;mysql5.7&#xff0c;tomcat8.5&#xff0c;maven3.6 软件&#xff1a;IDEA 功能&#xff1a;超市后台管理系统&#xff0c;有订单管理&#xff0c;供应商管理&#xff0c;用户管理&#xff0c;密码修改&…

阿赵UE学习笔记——9、材质和材质实例

阿赵UE学习笔记目录 大家好&#xff0c;我是阿赵。   继续学习虚幻引擎&#xff0c;这次来了解一下UE里面关于材质的一些概念性的东西。 一、材质 材质这个概念&#xff0c;在所有三维软件里面都会有&#xff0c;比如3Dsmax里面的材质球&#xff0c;或者Unity里面的Material…

解决docker run报错:Error response from daemon: No command specified.

将docker镜像export/import之后&#xff0c;对新的镜像执行docker run时报错&#xff1a; docker: Error response from daemon: No command specified. 解决方法&#xff1a; 方案1&#xff1a; 查看容器的command&#xff1a; docker ps --no-trunc 在docker run命令上增加…

【Python】AttributeError: module ‘torch.nn‘ has no attribute ‘HardSigmoid‘

AttributeError: module ‘torch.nn’ has no attribute ‘HardSigmoid’ 这个错误是因为PyTorch的torch.nn模块中并没有HardSigmoid这个函数。是拼写的大小写问题&#xff0c;换成nn.Hardsigmoid()即可。 如下述代码出错。 import torch import torch.nn as nn hard_sigmoid…

自动化的力量可实现更好的供应商风险管理

长期以来&#xff0c;公司一直依赖制造商、服务提供商、供应商或顾问等丰富的外部各方网络来促进整体运营并从外部专业知识或产品中获益。虽然这些合作伙伴关系通常是互惠互利的&#xff0c;但公司也需要意识到第三方甚至第四方供应商带来的潜在风险&#xff0c;并考虑整个供应…

VSCode使用MinGW编译器,配置C/C++环境

目录 一、安装VSCode 二、安装MinGW编译器 1、配置环境变量 2、测试配置是否成功 三、配置VSCode 1、安装所需扩展 2、新建代码存放文件夹 3、添加配置文件 4、配置文件内容 &#xff08;1&#xff09;c_cpp_properties.json &#xff08;2&#xff09;launch.json …

基于Java SSM框架实现线上教学平台系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现线上教学平台演示 摘要 在社会快速发展的影响下&#xff0c;使线上教学平台的管理和运营比过去十年更加理性化。依照这一现实为基础&#xff0c;设计一个快捷而又方便的网上线上教学平台系统是一项十分重要并且有价值的事情。对于传统的线上教学平台控制…

走进shell

Linux系统启动时&#xff0c;会自动创建多个虚拟控制台。虚拟控制台是运行在Linux系统内存中的终端会话。 打开Linux控制台Terminal使用tty命令查看当前使用的虚拟控制台。 注&#xff1a;tty 表示电传打字机(teletypewriter) $ tty /dev/pts/0表示当前使用的是/dev/pts/0 虚拟…

(1)(1.13) SiK无线电高级配置(五)

文章目录 前言 10 可用频率范围 11 DUTY_CYCLE 设置 12 低延迟模式 13 先听后说 (LBT) 14 升级无线电固件 15 MAVLink协议说明 前言 本文提供 SiK 遥测无线电(SiK Telemetry Radio)的高级配置信息。它面向"高级用户"和希望更好地了解无线电如何运行的用户。 1…

C#基础:通过QQ邮件发送验证码到指定邮箱

一、控制台程序 using System; using System.Net; using System.Net.Mail;public class EmailSender {public void SendEmail(string toAddress, string subject, string body){// 设置发件人邮箱地址以及授权码string fromAddress "xxxxxqq.com";string password …