51.Go操作kafka示例(kafka-go库)

文章目录

  • 一、简介
  • 二、生产者
  • 三、消费者

代码地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go

一、简介

之前已经介绍过一个操作kafka的go库了,28.windows安装kafka,Go操作kafka示例(sarama库) ,但是这个库比较老了,当前比较流行的库是github.com/segmentio/kafka-go,所以本次我们就使用一下它。

我们在GitHub直接输入kafka并带上language标签为Go时,可以可以看到当前get github.com/segmentio/kafka-go库是最流行的。
在这里插入图片描述

首先启动kafka的服务器,然后在项目中go get github.com/segmentio/kafka-go

接着我们就可以创建生产者和消费者了,注意:在实际工作中,一般是一个服务为生产者,另一个服务作为消费者,但是本案例中不涉及微服务,就是演示一下生成和消费的示例代码,因此写到了一个服务当中。代码文件组织如下:
在这里插入图片描述
user.go :用于测试发送和消费结构体字符串消息

package modeltype User struct {Id       int64  `json:"id"`UserName string `json:"user_name"`Age      int64  `json:"age"`
}

二、生产者

启动zookeeperkafka,并创建名为testtopic,步骤可以参考:28.windows安装kafka,Go操作kafka示例(sarama库)

producer.go

package producerimport ("context""encoding/json""fmt""golang-trick/31-kafka-go/model""time""github.com/segmentio/kafka-go"
)var (topic    = "user"Producer *kafka.Writer
)func init() {Producer = &kafka.Writer{Addr:                   kafka.TCP("localhost:9092"), //TCP函数参数为不定长参数,可以传多个地址组成集群Topic:                  topic,Balancer:               &kafka.Hash{}, // 用于对key进行hash,决定消息发送到哪个分区MaxAttempts:            0,WriteBackoffMin:        0,WriteBackoffMax:        0,BatchSize:              0,BatchBytes:             0,BatchTimeout:           0,ReadTimeout:            0,WriteTimeout:           time.Second,       // kafka有时候可能负载很高,写不进去,那么超时后可以放弃写入,用于可以丢消息的场景RequiredAcks:           kafka.RequireNone, // 不需要任何节点确认就返回Async:                  false,Completion:             nil,Compression:            0,Logger:                 nil,ErrorLogger:            nil,Transport:              nil,AllowAutoTopicCreation: false, // 第一次发消息的时候,如果topic不存在,就自动创建topic,工作中禁止使用}
}// 生产消息,发送user信息
func SendMessage(ctx context.Context, user *model.User) {msgContent, err := json.Marshal(user)if err != nil {fmt.Println(fmt.Sprintf("json marshal user err,user:%v,err:%v", user, err))}msg := kafka.Message{Topic:         "",Partition:     0,Offset:        0,HighWaterMark: 0,Key:           []byte(fmt.Sprintf("%d", user.Id)),Value:         msgContent,Headers:       nil,WriterData:    nil,Time:          time.Time{},}err = Producer.WriteMessages(ctx, msg)if err != nil {fmt.Println(fmt.Sprintf("写入kafka失败,user:%v,err:%v", user, err))}
}

main.go: 测试消息发送

package mainimport ("context""fmt""golang-trick/31-kafka-go/model""golang-trick/31-kafka-go/producer"
)func main() {ctx := context.Background()for i := 0; i < 5; i++ {user := &model.User{Id:       int64(i + 1),UserName: fmt.Sprintf("lym:%d", i),Age:      18,}producer.SendMessage(ctx, user)}producer.Producer.Close() // 消息发送完毕后,关闭生产者
}

可以看到五条消息都发送成功
在这里插入图片描述

三、消费者

consumer.go

package consumerimport ("context""encoding/json""fmt""golang-trick/24-gin-learning/class08/model""time""github.com/segmentio/kafka-go"
)var (topic    = "user"Consumer *kafka.Reader
)func init() {Consumer = kafka.NewReader(kafka.ReaderConfig{Brokers:                []string{"localhost:9092"}, // broker地址 数组GroupID:                "test",                     // 消费者组id,每个消费者组可以消费kafka的完整数据,但是同一个消费者组中的消费者根据设置的分区消费策略共同消费kafka中的数据GroupTopics:            nil,Topic:                  topic, // 消费哪个topicPartition:              0,Dialer:                 nil,QueueCapacity:          0,MinBytes:               0,MaxBytes:               0,MaxWait:                0,ReadBatchTimeout:       0,ReadLagInterval:        0,GroupBalancers:         nil,HeartbeatInterval:      0,CommitInterval:         time.Second, // offset 上报间隔PartitionWatchInterval: 0,WatchPartitionChanges:  false,SessionTimeout:         0,RebalanceTimeout:       0,JoinGroupBackoff:       0,RetentionTime:          0,StartOffset:            kafka.FirstOffset, // 仅对新创建的消费者组生效,从头开始消费,工作中可能更常用从最新的开始消费kafka.LastOffsetReadBackoffMin:         0,ReadBackoffMax:         0,Logger:                 nil,ErrorLogger:            nil,IsolationLevel:         0,MaxAttempts:            0,OffsetOutOfRangeError:  false,})
}// 消费消息
func ReadMessage(ctx context.Context) {// 消费者应该通过协程一直开着,一直消费for {if msg, err := Consumer.ReadMessage(ctx); err != nil {fmt.Println(fmt.Sprintf("读kafka失败,err:%v", err))break // 当前消息读取失败时,并不退出for终止所有后续消费,而是跳过该消息即可} else {user := &model.User{}err := json.Unmarshal(msg.Value, user)if err != nil {fmt.Println(fmt.Sprintf("json unmarshal msg value err,msg:%v,err:%v", user, err))break // 当前消息处理失败时,并不退出for终止所有后续消费,而是跳过该消息即可}fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,user=%v", msg.Topic, msg.Partition, msg.Offset, msg.Key, user))}}
}

main.go: 测试接收消息

package mainimport ("context""fmt""golang-trick/31-kafka-go/consumer""os""os/signal""syscall"
)// 需要监听信息2和15,在程序退出时,关闭Consumer
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)sig := <-cfmt.Printf("收到信号 %s ", sig.String())if consumer.Consumer != nil {consumer.Consumer.Close()}os.Exit(0)
}func main() {ctx := context.Background()//for i := 0; i < 5; i++ {//	user := &model.User{//		Id:       int64(i + 1),//		UserName: fmt.Sprintf("lym:%d", i),//		Age:      18,//	}//	producer.SendMessage(ctx, user)//}//producer.Producer.Close()go consumer.ReadMessage(ctx)listenSignal()
}

启动后,因为我们设置的从头开始消费,所以原有的五条消息消费成功,然后在等待着队列中有消息时继续消费
在这里插入图片描述
我们可以通过kafka客户端发两条消息,看看我们的消费者程序是否能消费到

在这里插入图片描述
最后关闭服务停止消费
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/214695.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于JavaWeb+SpringBoot+Vue在线拍卖系统的设计和实现

基于JavaWebSpringBootVue在线拍卖系统系统的设计和实现 源码获取入口Lun文目录前言主要技术系统设计功能截图订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 Lun文目录 摘 要 1 Abstract 1 1 系统概述 4 1.1 概述 4 1.2课题意义 4 1.3 主要内容 4 2 …

C++新经典模板与泛型编程:策略技术中的算法策略

策略技术中的算法策略 在之前博客中funcsum()函数模板中&#xff0c;实现了对数组元素的求和运算。求和在这里可以看作一种算法&#xff0c;扩展一下思路&#xff0c;对数组元素求差、求乘积、求最大值和最小值等&#xff0c;都可以看作算法。而当前的funcsum()函数模板中&…

MySQL使用教程

数据构成了我们日益数字化的社会基础。想象一下&#xff0c;从移动应用和银行系统到搜索引擎&#xff0c;再到如 ChatGPT 这样的先进人工智能聊天机器人&#xff0c;这些工具若没有数据支撑&#xff0c;将寸步难行。你有没有好奇过这些海量数据都存放在哪里呢&#xff1f;答案正…

2023年团体程序设计天梯赛——总决赛题

F-L1-1 最好的文档 有一位软件工程师说过一句很有道理的话&#xff1a;“Good code is its own best documentation.”&#xff08;好代码本身就是最好的文档&#xff09;。本题就请你直接在屏幕上输出这句话。 输入格式&#xff1a; 本题没有输入。 输出格式&#xff1a; 在一…

ALNS4VRPTWTF

文章概述 文章研究了城市物流背景下带有第三方转运设施的车辆路径问题。与经典的车辆路径问题不同&#xff0c;这些问题提供了将客户需求交付给第三方转运设施&#xff08;如城市集散中心&#xff09;的选择&#xff0c;并收取一定的费用。为了解决这些挑战&#xff0c;该研究…

LeetCode 279完全平方数 139单词拆分 卡码网 56携带矿石资源(多重背包) | 代码随想录25期训练营day45

动态规划算法6 LeetCode 279 完全平方数 2023.12.11 题目链接代码随想录讲解[链接] int numSquares(int n) {//1确定dp数组&#xff0c;其下标表示j的完全平方数的最少数量//3初始化&#xff0c;将dp[0]初始化为0&#xff0c;用于计算&#xff0c;其他值设为INT_MAX用于递推…

物料分类帐概览

原文地址&#xff1a;Overview: What is SAP Material Ledger? | SAP Blogs 物料分类账是收集物料主数据存储在物料主数据中的物料交易数据的工具。 物料分类帐使用此数据来计算价格以评估这些物料。 物料台账是实际成本核算的基础。它允许以多种货币对材料库存进行评估&am…

对象的生离死别

对象的生离死别 实验介绍 在构建一个类时&#xff0c;一般情况下需要编写构造函数、拷贝构造函数以及析构函数&#xff0c;这将直接影响程序的运行。而初始化列表是在调用构造函数时初始化参数的方式。 一个对象从实例化到销毁的历程&#xff1a; 知识点 内存分区构造函数exp…

LabVIEW开发矿井排水监控系统

LabVIEW开发矿井排水监控系统 针对矿井水害对煤矿安全生产构成的威胁&#xff0c;设计了一种基于嵌入式PLC和LabVIEW的矿井排水监控系统。该系统结合了PLC的可靠控制与单片机的应用灵活性&#xff0c;有效克服了传统排水方法中的不足&#xff0c;如测量不准确、效率低下等问题…

ESP8266模块(CH340)零基础实战

USB数据线连接ESP8266模块到电脑 先按住FLASH键,再按一下RST键,然后松开 此时电脑可识别出CH340 COM接口 CH340芯片厂商网址: wch.cn 传输比特率9600 win11自带驱动 下载Arduino IDE

一文了解什么是Selenium自动化测试?

一、Selenium是什么&#xff1f; 用官网的一句话来讲&#xff1a;Selenium automates browsers. Thats it&#xff01;简单来讲&#xff0c;Selenium是一个用于Web应用程序自动化测试工具。Selenium测试直接运行在浏览器中&#xff0c;就像真正的用户在操作浏览器一样。支持的浏…

嵌入式系统复习--概述

文章目录 基本概念嵌入式系统的组成结构嵌入式操作系统嵌入式软件开发环境硬件基础简介下一篇 基本概念 嵌入式计算机&#xff1a;把嵌入到对象体系中、实现对象体系智能化控制的带有微控制器的计算机&#xff0c;称作嵌入式计算机 嵌入式系统&#xff1a;以应用为中心&#…

harmonyOS学习笔记之@Provide装饰器和@Consume装饰器

Provide和Consume&#xff0c;应用于与后代组件的双向数据同步&#xff0c;应用于状态数据在多个层级之间传递的场景。不同于State/Link装饰器修饰的 父子组件之间通过命名参数机制传递&#xff0c;Provide和Consume摆脱参数传递机制的束缚&#xff0c;实现跨层级传递。 其中Pr…

基于Java的招聘系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

OWASP Web 安全测试指南 WSTG

Eoin Keary的前言 软件不安全的问题可能是我们这个时代最重要的技术挑战。支持业务、社交网络等的 Web 应用程序的急剧兴起只会加剧建立一种强大的方法来编写和保护我们的 Internet、Web 应用程序和数据的要求。 在开放 Web 应用程序安全项目 &#xff08;OWASP&#xff09; 中…

HarmonyOS应用开发-手写板

这是一个基于HarmonyOS做的一个手写板应用&#xff0c;只需要简单的几十行代码&#xff0c;就可以实现如下手写功能以及清空画布功能。 一、先上效果图&#xff1a; 二、上代码 Entry Component struct Index {//手写路径State pathCommands: string ;build() {Column() {//…

RocketMQ-源码架构

源码环境搭建 1、主要功能模块 RocketMQ官方Git仓库地址&#xff1a;GitHub - apache/rocketmq: Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications. RocketMQ的官方网站下载&#xff1a;下载 | R…

C++STL算法库中谓词的使用

什么是c的谓词 谓词概念&#xff1a; 谓词函数是一个判断式&#xff0c;一个返回bool值的函数或者仿函数&#xff0c;有几个入参就是几元谓词。一般做一个函数的参数使用【引用自百度百科】。 常见的可以作为谓词的东西&#xff1a;函数、函数指针、函数对象、lambda表达式&am…

2023 年浙江省职业院校技能大赛信息安全管理与评估赛项规程

*2023 年浙江省职业院校技能大赛“高职组”* *“信息安全管理与评估”赛项规程* *一、赛项名称* 赛项名称&#xff1a;信息安全管理与评估 英文名称&#xff1a;Information Security Management and Evaluation 赛项组别&#xff1a;高职 赛项归属产业&#xff1a;电子信…

热电厂发电机组常见故障及预测性维护方法

热电厂的发电机组是关键的能源生产设备&#xff0c;在电力供应中扮演着关键角色。但经过长期运行和高负荷工作&#xff0c;一旦发生故障&#xff0c;可能导致停机、设备损坏甚至引发严重事故。因此&#xff0c;实施有效的预测性维护方法对于确保发电机组的稳定运行至关重要。本…