在Go项目中二次封装Kafka客户端功能

1.摘要

在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。

在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafka Go客户端库, 该开源库地址为:GitHub - IBM/sarama: Sarama is a Go library for Apache Kafka.。

2.功能结构组织

为了能在项目中快速使用, 我在项目目录中专门新建了一个名为kafka的文件夹,在该文件夹下新建了四个文件,分别为:

kafka (目录)|----- consumer.go   (消费者方法实现)|----- producer.go   (生产者方法实现)|----- kafka.go      (定义接口)|----- kafka_test.go (单元功能测试)

为方便项目使用,在此基础上做了二次封装。

3.消费者实现

第一步首先定义了一个结构体, 里面包含了Kafka的主机、topic、接收通道和消费者对象信息:

type KafkaConsumer struct {Hosts    string          // Kafka主机IP:端口,例如:192.168.201.206:9092Ctopic   string          // topic名称Kchan    chan string     // 接收信息通道Consumer sarama.Consumer // 消费者对象
}

接下来是消费者初始化函数:

func (k *KafkaConsumer) kafkaInit() {// 定义配置选项 config := sarama.NewConfig()config.Consumer.Return.Errors = trueconfig.Version = sarama.V0_10_2_0// 初始化一个消费对象consumer, err := sarama.NewConsumer(k.Hosts, config)if err != nil {err = errors.New("NewConsumer错误,原因:" + err.Error())fmt.Println(err.Error())return}// 获取所有Topictopics, err := consumer.Topics()if err != nil {fmt.Println(err.Error())return}// 判断是否有自定义的Topicvar topicsName = ""for _, e := range topics {if e == k.Ctopic {topicsName = ebreak}}// 没有自定义的Topic则报错if topicsName == "" {err = errors.New("找不到topics内容")fmt.Println(err.Error())return}// 将消费对象保存到结构体以备后面使用k.Consumer = consumer
}

在上面的初始化函数中, 首先初始化一个消费对象, 然后获取所有的Topic名称,并判断了在这些Topic名称中是否有我自定义的名称,获取成功后则将消费对象保存到我们绑定的结构体中。

接下来是消费监控函数实现,代码如下:

func (k *KafkaConsumer) kafkaProcess() {var wg sync.WaitGroup// 遍历指定Topic分区持续监控消息Partitions, _ := k.Consumer.Partitions(k.Ctopic)for _, subPartitions := range Partitions {pc, err := k.Consumer.ConsumePartition(k.Ctopic, subPartitions, sarama.OffsetNewest)if err != nil {continue}wg.Add(1)go func() {defer wg.Done()// 这里进入另一个函数可以过滤消息内容k.processPartition(pc)}()}wg.Wait()
}

函数processPartition()的实现代码如下:

func (k *KafkaConsumer) processPartition(pc sarama.PartitionConsumer) {defer pc.AsyncClose()for msg := range pc.Messages() {// 这里可以过滤不需要的Topic的信息if strings.Contains(string(msg.Value), "group_state2") {continue}// 这里将获取到的Topic信息发送到通道k.Kchan <- string(msg.Value)}
}

4.生产者实现

为了跟消费者代码配套,这里也同步实现了生产者代码,主要功能是完成工作后,给指定Topic的生产方返回一个指定消息。

定义生产者的结构体如下:

type KafkaProducer struct {hosts         string               // Kafka主机sendmsg       string               // 消费方返回给生产方的消息ptopic        string               // TopicAsyncProducer sarama.AsyncProducer // Kafka生产者接口对象
}

对应的生产者初始化函数实现如下:

func (k *KafkaProducer) kafkaInit() {// 定义配置参数config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAllconfig.Producer.Retry.Max = 5config.Producer.Return.Successes = trueconfig.Version = sarama.V0_10_2_0// 初始化一个生产者对象producer, err := sarama.NewAsyncProducer(k.hosts, config)if err != nil {err = errors.New("NewAsyncProducer错误,原因:" + err.Error())fmt.Println(err.Error())return}// 保存对象到结构体k.AsyncProducer = producer
}

给生产者回复信息的函数实现如下:

func (k *KafkaProducer) kafkaProcess() {msg := &sarama.ProducerMessage{Topic: k.ptopic,}// 信息编码msg.Value = sarama.ByteEncoder(k.sendmsg)// 将信息发送给通道k.AsyncProducer.Input() <- msg
}

5.接口定义实现

首先对于生产者和消费者,都有对应的初始化和执行操作,因此定义接口函数如下:

// Kafka方法接口
type IKafkaMethod interface {kafkaInit()     // 初始化方法kafkaProcess()  // 执行方法
}

为了方便管理接口的赋值操作, 这里定义了一个接口管理方法, 并用Set()函数进行接口类型赋值, Run()函数负责运行对应的成员函数:

// 接口管理结构体
type KafkaManager struct {kafkaMethod IKafkaMethod  // 接口对象
}// 定义实现Set方法
func (km *KafkaManager) Set(m IKafkaMethod) {km.kafkaMethod = m  // 将指定的方法赋给接口
}// 定义实现Run方法
func (km *KafkaManager) Run() {km.kafkaMethod.kafkaInit()go km.kafkaMethod.kafkaProcess()
}

最后一部分是供外部调用的函数,首先定义一个结构体,该结构体中保存了Kafka的基础信息和三个对象指针:

type KafkaMessager struct {KafkaManager  *KafkaManager   // 接口管理对象指针KafkaProducer *KafkaProducer  // 生产者对象指针KafkaConsumer *KafkaConsumer  // 消费者对象指针Hosts         string          // Kafka主机topic         string          // topic
}// 供外部调用初始化的函数,传入Kafka主机IP和Topic,返回操作对象指针,并初始化结构体成员变量
func NewKafkaMessager(hosts, topic string) *KafkaMessager {km := &KafkaMessager{KafkaManager:  new(KafkaManager),KafkaProducer: new(KafkaProducer),KafkaConsumer: new(KafkaConsumer),Hosts:         hosts,topic:         topic,}return km
}

6.功能调用和验证

在Kafka_test.go文件中,定义一个用于单元测试的函数,格式如下:

func TestKafka(t *testing.T) {....
}

使用单元测试函数的好处是可以单独调试, 专注核心功能本身。

我使用的编辑器是Goland, 在TestKafka函数前面有个三角形小图标,点击可以选择各种调试选项,如图:

下面是我模拟用户调用的客户端代码片段:

// 这里选择我自己搭建的Kafka所在服务器,Topic为test123
// 注意:这里的hosts格式是IP:端口的格式,例如:192.168.201.206:9092
hosts := "192.168.201.206:9092"
topic := "test123"// 调用初始化函数,并将上面的内容作为参数传进去
nkm := NewKafkaMessager(hosts, topic)// 初始化消费者,当生产者发出消息,消费者自动消费
nkm.KafkaConsumer.Hosts = hosts             // 消费者host赋值
nkm.KafkaConsumer.Ctopic = topic            // 消费者topic赋值
nkm.KafkaConsumer.Kchan = make(chan string) // 初始化消息通道
nkm.KafkaManager.Set(nkm.KafkaConsumer)     // 接口赋值,设置成操作消费者方法
nkm.KafkaManager.Run()                  // 执行消费者初始化方法// 监听通道,接收生产客户端发过来的消息
recv := <- nkm.KafkaConsumer.Kchan
fmt.Println(recv)  // 打印接收到的消息

现在我们可以选择直接运行程序了,然后在Kafka的生产者控制台中输入字符:Hello,Goland发送:

可以看到,我们的程序成功接收到Kafka生产者发送过来的信息。

--- END --

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/119230.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

InstructionGPT

之前是写在[LLM&#xff1a;提示学习Prompt Learning]里的&#xff0c;抽出来单独讲一下。 基本原理 在做下游的任务时&#xff0c;我们发现GPT-3有很强大的能力&#xff0c;但是只要人类说的话不属于GPT-3的范式&#xff0c;他几乎无法理解。例如&#xff0c;我们说把句子A变…

Android Studio Logcat日志VIVO手机显示*号问题

咨询VIVO客服 1、拨盘输入 *#06# 获取串码&#xff0c;发送给客服 2、拨号盘输入*#*#112#*#*-右上角菜单-更多-一键授权 注意不要刷机&#xff0c;恢复出厂设置&#xff0c;手动取消授权哦

【Linux】Linux任务管理与守护进程

Linux任务管理与守护进程 一、任务管理1、进程组概念2、作业概念3、会话概念4、相关操作 二、守护进程1、守护进程的创建2、守护进程的库函数 一、任务管理 1、进程组概念 在Linux中&#xff0c;每个进程除了有一个进程ID之外&#xff0c;还有一个属性是进程组(PGID)&#xff…

CAD迷你看图 mac v4.4.5

CAD迷你看图是一款小巧的DWG文件浏览小工具&#xff0c;支持AutoCAD DWG/DXF等常用图纸文件&#xff0c;可脱离AutoCAD快速浏览DWG图纸&#xff0c;并提供了平移、缩放、全屏等功能。该软件采用独特的云技术&#xff0c;根据不同DWG图纸的需要自动装载相应字体&#xff0c;解决…

木马文件检测系统 毕业设计 JAVA+Vue+SpringBoot+MySQL

项目编号&#xff1a;S041&#xff0c;源码已在 Bilibili 中上架&#xff0c;需要的朋友请自行下载。 https://gf.bilibili.com/item/detail/1104375029为了帮助小白入门 Java&#xff0c;博主录制了本项目配套的《项目手把手启动教程》&#xff0c;希望能给同学们带来帮助。 …

Linux网络编程:IP协议

目录 一. IP协议的功能 二. IP协议报头 2.1 IP报头的格式 2.2 IP报头各部分含义 三. IP报文的分片问题 3.1 什么是分片 3.2 分片的原理 3.3 合并报文 四. 网段划分 4.1 网络号和主机号 4.2 网络号和主机号的划分策略 4.3 特殊的IP地址 4.4 IP地址数量不足问题 五.…

「滚雪球学Java」:Java前序篇(章节汇总)

&#x1f3c6;本文收录于「滚雪球学Java」专栏&#xff0c;专业攻坚指数级提升&#xff0c;助你一臂之力&#xff0c;带你早日登顶&#x1f680;&#xff0c;欢迎大家关注&&收藏&#xff01;持续更新中&#xff0c;up&#xff01;up&#xff01;up&#xff01;&#xf…

竞赛选题 深度学习卷积神经网络垃圾分类系统 - 深度学习 神经网络 图像识别 垃圾分类 算法 小程序

文章目录 0 简介1 背景意义2 数据集3 数据探索4 数据增广(数据集补充)5 垃圾图像分类5.1 迁移学习5.1.1 什么是迁移学习&#xff1f;5.1.2 为什么要迁移学习&#xff1f; 5.2 模型选择5.3 训练环境5.3.1 硬件配置5.3.2 软件配置 5.4 训练过程5.5 模型分类效果(PC端) 6 构建垃圾…

文心一言 VS 讯飞星火 VS chatgpt (121)-- 算法导论10.4 2题

二、用go语言&#xff0c;给定一个n结点的二叉树&#xff0c;写出一个 O(n)时间的递归过程&#xff0c;将该树每个结点的关键字输出。 文心一言&#xff1a; 在Go语言中&#xff0c;你可以使用递归函数来遍历二叉树的所有节点&#xff0c;并输出每个节点的关键字。以下是一个…

OpenLayers入门,OpenLayers从vue的assets资源路径加载geojson文件并解析数据叠加到地图上,以加载世界各国边界为例

专栏目录: OpenLayers入门教程汇总目录 前言 本章以加载世界各国边界的GeoJSON格式数据为例,讲解如何使用OpenLayers从vue的assets资源路径加载geojson文件并解析数据叠加到地图上。 二、依赖和使用 "ol": "^6.15.1"使用npm安装依赖npm install ol@…

redis爆满导致数据丢失

记一则redis爆满导致数据丢失的一场事故 某功能上线后&#xff0c;发现出现问题&#xff0c;最后定位到了 redis. 由于存储的数据过多&#xff0c;导致阿里云4G大小的 redis 爆满&#xff0c;触发了回收策略。 于是临时扩容,运维同学当时未找到阿里云配置。 后面我用工具连接了…

【递归、搜索与回溯算法】第七节.257. 二叉树的所有路径和46. 全排列

作者简介&#xff1a;大家好&#xff0c;我是未央&#xff1b; 博客首页&#xff1a;未央.303 系列专栏&#xff1a;递归、搜索与回溯算法 每日一句&#xff1a;人的一生&#xff0c;可以有所作为的时机只有一次&#xff0c;那就是现在&#xff01;&#xff01;&#xff01;&am…

人工智能的发展方向:探索智能未来的无限可能

原创 | 文 BFT机器人 人工智能&#xff0c;简称AI&#xff0c;是一门专注于研究计算机如何能像人类一样思考、学习和解决问题的科学。它的创造初衷是构建一个智能系统&#xff0c;能模仿、模拟甚至实现人工智能的各种功能和行为&#xff0c;随着科技的持续进步&#xff0c;人工…

低成本IC上岸攻略—IC设计网课白嫖篇

数字电路基础 清华大学 王红主讲&#xff1a;数字电子技术基础 西安电子科技大学 任爱锋主讲&#xff1a;数字电路与逻辑设计 模拟电路基础 上交大 郑益慧主讲&#xff1a;模拟电子技术基础 清华大学 华成英主讲&#xff1a;模拟电子技术基础 半导体物理&#xff1a; 西…

java--死循环与循环嵌套

1.死循环 可以一直执行下去的一种循环&#xff0c;如果没有干预不会停下来的 2.死循环的写法 3.循环嵌套 循环中又包含循环 4.循环嵌套的特点 外部循环每循环一次&#xff0c;内部循环会全部执行完一轮

【QT】对象树

一、QT对象树的概念 先来看一下 QObject 的构造函数&#xff1a; 通过帮助文档我们可以看到&#xff0c;QObject 的构造函数中会传入一个 Parent 父对象指针&#xff0c;children() 函数返回 QObjectList。即每一个 QObject 对象有且仅有一个父对象&#xff0c;但可以有很多个…

美颜SDK集成指南:为应用添加视频美颜功能

随着社交媒体和直播应用的兴起&#xff0c;视频美颜功能已成为用户追求的一项热门特性。用户希望能够在拍摄照片或进行实时视频直播时&#xff0c;使用美颜功能来增强其外观。为了满足这一需求&#xff0c;开发者可以考虑集成美颜SDK&#xff0c;为其应用增加这一吸引人的功能。…

uniapp接口请求api封装,规范化调用

封装规范和vue中的差不多&#xff0c;都是统一封装成一个request对象&#xff0c;然后在api.js里面调用。 先创建一个utils文件夹&#xff0c;然后里面创建一个request.js&#xff0c;代码如下&#xff1a; export const baseURL 基础url地址const request (options) > …

面试题:线程池执行的用户任务抛出异常会怎样?

文章目录 ThreadPoolExecutor.execute源码分析 ThreadPoolExecutor.submit源码分析 ScheduledThreadPoolExecutor.schedule源码分析 思考&#xff1a;ThreadPoolExecutor.execute发生异常时为什么要退出 ThreadPoolExecutor.execute 源码分析 看源码可以知道&#xff0c;Thre…

VScode连接的服务器上使用jupyter显示请选择内核源

问题复现 我实在VScode上用ssh-remote连接的服务器&#xff0c;想用.ipynb文件上写东西&#xff0c;结果窗口上方弹出一个输入框&#xff0c;“请键入以选择内核”&#xff1b; 在扩展里找到jupyter更新一下 之前左边的图标是灰色的&#xff0c;后来我下下载了新的版本&#…