Go 如何通过 Kafka 客户端库 生产与消费消息

文章目录

  • 0.前置说明
    • 1. confluent-kafka-go
    • 2. sarama
    • 3. segmentio/kafka-go
    • 4. franz-go
      • 选择建议
  • 1.启动 kafka 集群
  • 2.安装 confluent-kafka-go 库
  • 3.创建生产者
    • 特殊文件说明
    • 如何查看.log文件内容
  • 4.创建消费者

0.前置说明

Go 语言中有一些流行的 Kafka 客户端库。以下是几个常用的库及其优劣与区别:

1. confluent-kafka-go

  • 优点

    • 高性能:基于 librdkafka,性能非常高。
    • 功能全面:支持 Kafka 的所有高级功能,如事务、压缩、认证等。
    • 社区支持:由 Confluent 维护,社区活跃,文档丰富。
    • 稳定性:广泛使用于生产环境,经过大量测试和验证。
  • 缺点

    • 依赖性:依赖于 librdkafka,需要额外安装该库。
    • 复杂性:配置和使用相对复杂,特别是对于新手。

2. sarama

  • 优点

    • 纯 Go 实现:不依赖于任何 C 库,安装和使用非常方便。
    • 社区活跃:由 Shopify 维护,社区支持良好,文档齐全。
    • 灵活性:提供了丰富的配置选项,适用于各种使用场景。
  • 缺点

    • 性能:相对于 confluent-kafka-go,性能稍逊一筹。
    • 功能:不支持 Kafka 的一些高级功能,如事务。

3. segmentio/kafka-go

  • 优点

    • 纯 Go 实现:不依赖于任何 C 库,安装和使用非常方便。
    • 简洁易用:API 设计简洁,易于上手。
    • 灵活性:支持多种配置选项,适用于各种使用场景。
  • 缺点

    • 性能:相对于 confluent-kafka-go,性能稍逊一筹。
    • 功能:不支持 Kafka 的一些高级功能,如事务。

4. franz-go

  • 优点

    • 纯 Go 实现:不依赖于任何 C 库,安装和使用非常方便。
    • 高性能:在纯 Go 实现中性能较为优越。
    • 功能全面:支持 Kafka 的大部分功能,包括事务。
  • 缺点

    • 社区支持:相对于 saramaconfluent-kafka-go,社区支持稍弱。
    • 文档:文档相对较少,需要更多的社区贡献。

选择建议

  • 高性能和高级功能需求:如果你需要高性能和 Kafka 的高级功能(如事务、压缩、认证等),confluent-kafka-go 是一个不错的选择。
  • 纯 Go 实现和易用性:如果你更倾向于使用纯 Go 实现的库,并且希望安装和使用更加简便,可以选择 saramasegmentio/kafka-go
  • 平衡性能和功能:如果你希望在纯 Go 实现中获得较好的性能和功能支持,可以考虑 franz-go

本文我们就以confluent-kafka-go库为例来编写代码。

1.启动 kafka 集群

不知道如何搭建集群请点击这里 ----》Kafka 集群部署(CentOS 单机模拟版)

如果你懒得启动集群,那么直接跳过

  1. cluster目录下运行集群启动脚本 cluster.sh;
cd cluster
./cluster.sh
  1. 检查是否启动成功;
ll zookeeper-data/
total 4
drwxr-xr-x 3 root root 4096 May 27 10:20 zookeeperll broker-data/
total 12
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-1
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-2
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-3

2.安装 confluent-kafka-go 库

  1. 查看你的go工作目录
echo $GOPATH
  1. GOPATH目录下的src目录下新建 produce 项目
mkdir src/produce
cd src/produce
  1. 在你的项目目录中运行 go mod init 命令来初始化一个新的 Go 模块
go mod init produce
  1. 安装 confluent-kafka-go
go get github.com/confluentinc/confluent-kafka-go/kafka

3.创建生产者

  1. 新建文件 producer.go
touch producer.go
  1. 编写代码
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 创建生产者实例broker := "localhost:9091" // 集群地址topic := "test"            // 主题名称producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) // 创建生产者实例// 检查错误if err != nil {log.Fatalf("Failed to create producer: %s", err)}defer producer.Close()fmt.Printf("Created Producer %v\n", producer)// 生产消息message := "hello kafka"for i := 0; i < 10; i++ {producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, // 任题名称Value:          []byte(message + fmt.Sprintf("%d", i)),                             // 消息内容}, nil)}if err != nil {log.Fatalf("Failed to produce message: %v", err)}// 等待消息发送完成e := <-producer.Events() // 阻塞直到消息发送完成switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {log.Printf("Failed to deliver message: %v", ev.TopicPartition)} else {fmt.Printf("Delivered message: %s to %v\n", string(ev.Value), ev.TopicPartition)}}// 冲刷缓冲区消息producer.Flush(15 * 1000)
}

代码说明

  1. 创建生产者时需要指定集群地址以及主题信息,如果没有该主题则自动创建
  2. 生产者会异步地将消息发送到 Kafka,因此你需要处理交付报告以确保消息成功发送。

我们需要了解一下Go语言和Kafka之间的关系:Go是一种静态类型、编译型的编程语言,由Google开发并开源。它适用于构建高性能服务器端应用程序和网络服务。而Apache Kafka是一个分布式流处理平台,主要面向大规模数据传输和存储。

在这个例子中,我们有一个生产者程序,它使用Kafka的客户端库来连接到Kafka集群,然后通过创建一个生产者实例来开始发送消息。当生产者准备好要发送的消息时,它就会调用Send()方法将其添加到缓冲区中。一旦缓冲区满了或者用户主动触发了Flush()方法,生产者就会把缓冲区里的所有消息一起发送给Kafka集群。

  1. 编译运行,生产者发送消息
go build producer.go 
./producer 
Created Producer rdkafka#producer-1
Delivered message: hello kafka0 to test[0]@0
  1. 查看消息
ll cluster/broker-data/broker-1
total 20
-rw-r--r-- 1 root root    0 May 27 10:20 cleaner-offset-checkpoint
-rw-r--r-- 1 root root    4 May 27 11:36 log-start-offset-checkpoint
-rw-r--r-- 1 root root   88 May 27 10:20 meta.properties
-rw-r--r-- 1 root root   13 May 27 11:36 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root   14 May 27 11:36 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 May 27 11:21 test-0 # 我们创建的主题 数字代表分区号ll cluster/broker-data/broker-1/test-0/
total 12
-rw-r--r-- 1 root root 10485760 May 27 11:21 00000000000000000000.index
-rw-r--r-- 1 root root      251 May 27 11:21 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 May 27 11:21 00000000000000000000.timeindex
-rw-r--r-- 1 root root        8 May 27 11:21 leader-epoch-checkpoint
-rw-r--r-- 1 root root       43 May 27 11:21 partition.metadata

特殊文件说明

Kafka 的数据文件存储在每个分区的目录中,这些文件包括 .index.log.timeindexleader-epoch-checkpointpartition.metadata 文件。每个文件都有其特定的用途,下面是对这些文件的详细解释:

  1. .log 文件

    • 用途:存储实际的消息数据。
    • 描述:这是 Kafka 中最重要的文件,包含了生产者发送到 Kafka 的消息。每个 .log 文件代表一个日志段(log segment),文件名通常是该段的起始偏移量(offset)。
  2. .index 文件

    • 用途:存储消息偏移量到物理文件位置的映射。
    • 描述:这个文件是一个稀疏索引,允许 Kafka 快速查找特定偏移量的消息。通过这个索引,Kafka 可以避免从头开始扫描整个日志文件,从而提高查找效率。
  3. .timeindex 文件

    • 用途:存储消息时间戳到物理文件位置的映射。
    • 描述:这个文件允许 Kafka 根据时间戳快速查找消息。它是一个稀疏索引,类似于 .index 文件,但索引的是时间戳而不是偏移量。
  4. leader-epoch-checkpoint 文件

    • 用途:记录分区的领导者纪元(leader epoch)信息。
    • 描述:这个文件包含了每个纪元的起始偏移量。领导者纪元是 Kafka 用来跟踪分区领导者变化的机制。每次分区领导者发生变化时,纪元号会增加。这个文件帮助 Kafka 在领导者变更时进行数据恢复和一致性检查。
  5. partition.metadata 文件

    • 用途:存储分区的元数据信息。
    • 描述:这个文件包含了分区的一些基本信息,如分区的版本号等。它帮助 Kafka 管理和维护分区的元数据。

这些文件共同作用,确保 Kafka 能够高效、可靠地存储和检索消息数据。

如何查看.log文件内容

  • 执行指令
 ~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
Dumping ./00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1716780091840 size: 251 magic: 2 compresscodec: none crc: 997822510 isvalid: true
| offset: 0 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka0
| offset: 1 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka1
| offset: 2 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka2
| offset: 3 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka3
| offset: 4 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka4
| offset: 5 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka5
| offset: 6 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka6
| offset: 7 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka7
| offset: 8 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka8
| offset: 9 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka9

如上我们可以看到消息已经成功的发送。

4.创建消费者

  1. 创建消费者项目
mkdir src/consume
cd src/consume
  1. 在你的项目目录中运行 go mod init 命令来初始化一个新的 Go 模块
go mod init consume
  1. 安装 confluent-kafka-go
go get github.com/confluentinc/confluent-kafka-go/kafka
  1. 新建文件
touch consumer.go
  1. 编写代码
package mainimport ("fmt""log""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 创建消费者实例broker := "localhost:9091" // 集群地址topic := "test"            // 主题名称c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": broker,     // 集群地址"group.id":          "my-group", // 消费者组"auto.offset.reset": "earliest", // 设置偏移量 从头开始消费})// 检查错误if err != nil {log.Printf("Failed to create consumer: %s\n", err)}defer c.Close()// 描述订阅主题c.SubscribeTopics([]string{topic}, nil)fmt.Printf("Consuming topic %s\n", topic)// 消费消息for {msg, err := c.ReadMessage(-1) // 阻塞直到消息到达if err == nil {fmt.Printf("Consumed message: %s\n", msg.Value)} else {// 消费者错误fmt.Printf("Consumer error: %v (%v)\n", err, msg)}}
}
  1. 编译并运行
go build consumer.go 
./consumer 
Consuming topic test
Consumed message: hello kafka0
Consumed message: hello kafka1
Consumed message: hello kafka2
Consumed message: hello kafka3
Consumed message: hello kafka4
Consumed message: hello kafka5
Consumed message: hello kafka6
Consumed message: hello kafka7
Consumed message: hello kafka8
Consumed message: hello kafka9

可以看到已经成功的消费刚才生产的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/20208.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Uniapp小程序】自定义导航栏uni-nav-bar滚动渐变色

效果图 新建activityScrollTop.js作为mixins export default {data() {return {navBgColor: "rgba(0,0,0,0)", // 初始背景颜色为完全透明navTextColor: "rgba(0,0,0,1)", // 初始文字颜色};},onPageScroll(e) {// 设置背景const newAlpha Math.min((e.s…

踩坑:6年后为何不用GraphQL了?

GraphQL 是一项令人难以置信的技术&#xff0c;自从我在 2018 年首次开始将其投入生产以来&#xff0c;它就吸引了很多人的注意力。 在一大堆无类型的 JSON REST API 上构建了许多 React SPA 之后&#xff0c;我发现 GraphQL 是一股清新的空气。 然而&#xff0c;随着时间的推…

mybatis用map接收返回对象,不想让数据类型为tinyint自动转换为boolean,如何处理

在 MyBatis 中&#xff0c;当使用 Map 来接收查询结果时&#xff0c;MyBatis 会根据列的数据类型自动选择合适的 Java 类型来映射这些值。默认情况下&#xff0c;如果数据库列是 TINYINT(1)&#xff0c;MyBatis 可能会错误地将其映射为 boolean&#xff0c;因为它经常被误解为只…

PPP认证两种:PAP和CHAP,两次握手和三次握手

CHAP&#xff08;Challenge-Handshake Authentication Protocol&#xff0c;质询握手认证协议&#xff09;的设计理念是增强网络认证过程的安全性。在CHAP的三次握手过程中&#xff0c;不直接传送用户的明文密码&#xff0c;以此来提高安全性&#xff0c;具体步骤如下&#xff…

开源大模型源代码

开源大模型的源代码可以在多个平台上找到&#xff0c;以下是一些知名的开源大模型及其源代码的获取方式&#xff1a; 1. **艾伦人工智能研究所的开放大语言模型&#xff08;Open Language Model&#xff0c;OLMo&#xff09;**&#xff1a; - 提供了完整的模型权重、训练代…

springboot结合mybatis使用多数据源的方式

背景 最近有一个需求&#xff0c;有两个库需要做同步数据&#xff0c;一个Doris库&#xff0c;一个mysql库&#xff0c;两边的表结构一致&#xff0c;这里不能使用navicat等工具提供的数据传输之类的功能&#xff0c;只能使用代码做同步&#xff0c;springboot配置多数据…

如何设置手机的DNS

DNS 服务器 IP 地址 苹果 华为 小米 OPPO VIVO DNS 服务器 IP 地址 中国大陆部分地区会被运营商屏蔽网络导致无法访问&#xff0c;可修改手机DNS解决。 推荐 阿里的DNS (223.5.5.5&#xff09;或 114 (114.114.114.114和114.114.115.115) 更多公开DNS参考&#xff1a; 苹果…

ESP32-C3模组上实现蓝牙BLE配网功能(1)

本文内容参考&#xff1a; 《ESP32-C3 物联网工程开发实战》 乐鑫科技 蓝牙的名字由来是怎样的&#xff1f;为什么不叫它“白牙”&#xff1f; 特此致谢&#xff01; 一、蓝牙知识基础 1. 什么是蓝牙&#xff1f; &#xff08;1&#xff09;简介 蓝牙技术是一种无线数据和…

【缓存】OS层面缓存设计机制

操作系统的缓存设计机制是计算机体系结构中的一个重要组成部分&#xff0c;旨在提高系统的性能&#xff0c;特别是通过减少对慢速存储设备&#xff08;如硬盘&#xff09;的访问次数来加速数据的读取和写入。 以下是一些常见的操作系统缓存设计机制&#xff1a; CPU缓存&…

web学习笔记(六十一)

目录 如何使用公共组件来编写页面 如何使用公共组件来编写页面 1.导入公共组件nav.vue import Catenav from "/components/nav.vue"; 2.在页面插入子组件 如果使用了setup语法糖此时就可以直接在页面插入 <Catenav ></Catenav>标签&#xff0c; …

.NET 快速重构概要1

1.封装集合 在某些场景中,向类的使用者隐藏类中的完整集合是一个很好的做法,比如对集合的 add/remove 操作中包 含其他的相关逻辑时。因此,以可迭代但不直接在集合上进行操作的方式来暴露集合,是个不错的主意。 public class Order { private int _orderTotal; private Li…

Camunda BPM架构

Camunda BPM既可以单独作为流程引擎服务存在,也能嵌入到其他java应用中。Camunda BPM的核心流程引擎是一个轻量级的模块,可以被Spring管理或者加入到自定义的编程模型中,并且支持线程模型。 1,流程引擎架构 流程引擎由多个组件构成,如下所示: API服务 API服务,允许ja…

逻辑回归分类算法

文章目录 算法推导 线性回归解决连续值的回归预测&#xff1b;而逻辑回归解决离散值的分类预测&#xff1b; 算法推导 逻辑回归可以看作是两部分&#xff0c;以0、1分类问题说明&#xff1b; 线性回归部分 对于一个样本 x i x_i xi​&#xff0c;有n个特征 x i ( 1 ) x_i^{(1)…

蒙自源儿童餐新品上市,引领健康美味新潮流

随着夏日的热烈与儿童节的欢乐氛围到来&#xff0c;蒙自源品牌隆重推出儿童餐新品&#xff0c;以“快乐不分大小&#xff0c;谁还不是个宝宝”为主题&#xff0c;为广大消费者带来一场健康与美味的盛宴。新品上市活动将于5月25日举行&#xff0c;蒙自源将以其独特的产品魅力和创…

install

目录 1、 install 1.1、 //creates form with validation 1.2、 onStepChanging: function (event, currentIndex, newIndex) { 1.3、 onFinishing: function (event, currentIndex) { 1.4、 //init inst

最新 HUAWEI DevEco Studio 调试技巧

最新 HUAWEI DevEco Studio 调试技巧 前言 在我们使用 HUAWEI DevEco Studio 编辑器开发鸿蒙应用时&#xff0c;免不了要对我们的应用程序进行代码调试。我们根据实际情况&#xff0c;一般会用到以下三种方式进行代码调试。 肉眼调试法注释排错调试法控制台输出法弹出提示法断…

【算法实战】每日一题:将某个序列中内的每个元素都设为相同的值的最短次数(差分数组解法,附概念理解以及实战操作)

题目 将某个序列中内的每个元素都设为相同的值的最短次数 1.差分数组&#xff08;后面的减去前面的值存储的位置可以理解为中间&#xff09; 差分数组用于处理序列中的区间更新和查询问题。它存储序列中相邻元素之间的差值&#xff0c;而不是直接存储每个元素的值 怎么对某…

STM32 入门教程(江科大教材)#笔记2

3-4按键控制LED /** LED.c**/ #include "stm32f10x.h" // Device headervoid LED_Init(void) {/*开启时钟*/RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOA, ENABLE); //开启GPIOA的时钟/*GPIO初始化*/GPIO_InitTypeDef GPIO_InitStructure;GPIO_I…

关系数据库:关系运算

文章目录 关系运算并&#xff08;Union&#xff09;差&#xff08;Difference&#xff09;交&#xff08;Intersection&#xff09;笛卡尔积&#xff08;Extended Cartesian Product&#xff09;投影&#xff08;projection&#xff09;选择&#xff08;Selection&#xff09;除…

微信小程序中应用van-calendar时加载时间过长,以及设置min-data无效的问题解决

一、我们微信小程序中应用van-calendar时&#xff0c;如果没有设置min-data&#xff0c;那么页面的加载时间会非常长&#xff0c;所以&#xff0c;一定一定要配置min-data&#xff1b; 二、vue中min-data的写法是:min-data“new Date(2023, 0, 1)”&#xff0c;而在小程序中的写…