golang封装调用kafka的工具包

封装一个golang调用kafka的工具包,包含了consumer,producer,auth,在自己的生产环境上做过验证。可以做参考作用,也可以直接使用。

部分代码

// Run 执行消费动作
func (cg *ConsumerGroup) Run(ctx context.Context) {defer cg.close()for {select {case err := <-cg.consumer.Errors():cg.logger.WithError(err).Errorln("Error channel")cg.handleConsumeError(err)case <-ctx.Done():err := ctx.Err()if err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {// 正常退出return}cg.logger.WithError(err).Errorln("上下文异常退出")default:if err := cg.consumer.Consume(ctx, cg.options.topics, cg.options); err != nil {cg.logger.WithError(err).Errorln("Consume Error channel")cg.handleConsumeError(err)}}}
}
func (kc *KafkaClient) CreateTopic(ctx context.Context, topicName string, ops ...TopicOption) (err error) {topicConf := &sarama.TopicDetail{NumPartitions:     3,ReplicationFactor: 1,ConfigEntries: map[string]*string{"cleanup.policy": &TopicTTLPolicy,"retention.ms":   &TopicTTLRetention,},}for _, op := range ops {op(topicConf)}return kc.cli.CreateTopic(topicName, topicConf, false)
}

代码太多,全部写出来不现实,详细代码在这个下载包里,有test文件可以查看是如何调用的

另外需要自己处理消费时的速度,比如用channel控制同时消费的数量
golang调用kafka下载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/829885.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux——(关于权限常见的3个问题)

文章目录 1.修改文件或者目录的拥有者和所属组1.1chown指令1.2chgrp指令 2.常见的权限三个问题2.1对应一个目录&#xff0c;如果要进入&#xff0c;需要什么权限&#xff1f;2.2为什么我们创建的文件默认权限不是7772.2.1关于Linux下的权限掩码 2.3文件能否被删除取决于什么2.3…

Beyond Chain-of-Thought: A Survey of Chain-of-X Paradigms for LLMs论文阅读笔记(未完待续)

地址&#xff1a;https://arxiv.org/html/2404.15676v1 一些论文合集&#xff1a;https://github.com/atfortes/Awesome-LLM-Reasoning 背景 思维链 &#xff08;Chain-of-Thought&#xff0c;CoT&#xff09; 是一种被广泛采用的提示方法&#xff0c;它激发了大型语言模型 …

《HCIP-openEuler实验指导手册》2.1安装和测试Nginx

知识点 Nginx (发音为 “engine x”) 是一个开源的高性能 HTTP 和反向代理服务器&#xff0c;也是一个 IMAP/POP3/SMTP 代理服务器。由 Igor Sysoev 创建并维护&#xff0c;其设计用于处理高并发连接&#xff0c;具有高度的可扩展性和灵活性。 安装步骤 yum方式安装 dn…

读书笔记--数据管理知识体系的阅读总结感悟

最近继续研读DAMA数据管理知识体系之数据管理章节,结合自身在应用系统建设、数据治理工作实践,有所感悟并记录如下,供大家参考。数据管理工作需要技术和非技术的双重技能,由业务人员和信息技术人员相互协作,共同来承担责任,确保组织管理的数据是高质量的,主要驱动力是使…

go语言实现简单ngnix样例

目录 1、代码实现样例&#xff1a; 2、postman调用ngnix&#xff0c;转发&#xff1a; 1、代码实现样例&#xff1a; package mainimport ("bytes""encoding/json""io""log""net/http""net/http/httputil""…

Chapter 1-17. Introduction to Congestion in Storage Networks

Q: Isn’t increasing network capacity the ultimate solution to network congestion? Increasing network capacity is the solution when a lack of network capacity is the root cause of congestion. There are many more reasons for network congestion and in thos…

ruoyi-nbcio-plus基于vue3的flowable收回任务后重新进行提交表单的处理

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 http://218.75.87.38:9666/ 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a; h…

双目视觉(双目相机)

1.时间同步 需要一个单独的硬件&#xff08;单片机&#xff09;单独给每一个相机发送触发信息&#xff0c;然后再接收返回。 2.相机选型&#xff1a; &#xff08;1&#xff09;相机的分辨率 根据对图像精度的要求来选择相机的分辨率。 &#xff08;2&#xff09;颜色 通…

Python AI库 Pandas的常见操作的扩展知识

Python AI库 Pandas的常见操作的扩展知识 本文默认读者具备以下技能&#xff1a; 熟悉python基础知识&#xff0c;vscode或其它编辑工具 熟悉表格文件的基本操作 具备自主扩展学习能力 前文中对Pandas的数据结构以及基础操作做了介绍,本文中会在前文的基础上,对常见的操作进…

selenium拉动滚动条

selenium拉动滚动条 # 导包 from selenium import webdriver from time import sleep # 获取浏览器驱动对象 driver webdriver.Edge() # 最大化浏览器 driver.maximize_window() # 隐式等待 driver.implicitly_wait(30) # 打开url url r"C:\Users\黄永生\Desktop\软件测…

计算机找不到vcruntime140_1.dll,无法继续执行代码快速解决方法

vcruntime140_1.dll是一个重要的Windows操作系统中的动态链接库&#xff08;DLL&#xff09;文件&#xff0c;它是微软Visual C Redistributable软件包的组成部分。以下是该文件的详细介绍&#xff1a; 名称含义&#xff1a;“vcruntime”代表Visual C Runtime&#xff0c;表明…

static page 项目

static page 项目 作者&#xff1a;不染心 博客地址&#xff1a;https://blog.csdn.net/qq_38234785 源码地址&#xff1a;https://mbd.pub/o/bread/ZpWVlJps 未经允许&#xff0c;不得转载 文档版本v1&#xff0c;还没写完持续更新 一、引言 1. 软件概述和背景 本软件是…

STM32f103 HAL库读保护以及解除

读保护 void Flash_EnableReadProtection(void) {FLASH_OBProgramInitTypeDef OBInit;__HAL_FLASH_PREFETCH_BUFFER_DISABLE();HAL_FLASHEx_OBGetConfig(&OBInit);if(OBInit.RDPLevel OB_RDP_LEVEL_0){OBInit.OptionType | OPTIONBYTE_RDP;OBInit.RDPLevel OB_RDP_LEVEL…

FIR滤波器——DSP学习笔记三(包含一个滤波器设计的简明案例)

​​​​​​ 背景知识 FIR滤波器的特性与优点 可精确地实现线性相位响应&#xff08;Linear phase response&#xff09;&#xff0c;无相位失真&#xff1b; 总是稳定的&#xff0c;所有极点都位于原点 线性相位FIR滤波器的性质、类型及零点位置 冲击响应满足&#xff1a;奇…

iframe父子页面通信

目录 一、 创建父页面和子页面 父页面&#xff08;parent.html&#xff09;&#xff1a; 子页面&#xff08;child.html&#xff09;&#xff1a; 二、 实现父子页面之间的通信 在父页面的 在子页面的 三、扩展&#xff1a;postMessage() 方法的语法&#xff1a; 一、 …

(学习日记)2024.05.03:UCOSIII第五十七节:User文件夹函数概览(uCOS-III->Source文件夹)第三部分

之前的章节都是针对某个或某些知识点进行的专项讲解,重点在功能和代码解释。 回到最初开始学μC/OS-III系统时,当时就定下了一个目标,不仅要读懂,还要读透,改造成更适合中国宝宝体质的使用方式。在学完野火的教程后,经过几经思考,最后决定自己锦上添花,再续上几章。 这…

【PyTorch】torch.gather() 用法

gather常被用于image做mask的操作中&#xff0c;对哪些地方进行赋值0/1 API&#xff1a; torch.gather — PyTorch 2.2 documentation torch.gather(input, dim, index, outNone) → Tensor gather()的意义&#xff1a; 顾名思义&#xff0c;聚集、集合&#xff1a;gather…

计算机网络组成—物理层

一、物理层基本概念 物理层解决如何在连接各种计算机的传输媒体上传输数据比特流&#xff0c;而不是指具体的传输媒体。 1物理层接口特性 机械特性&#xff1a;定义物理连接的特性&#xff0c;规定物理连接时所采用的规格、接口形状、引线数目、引脚数量和排列情况电气特性&…

VS2019配合QT5.9开发IRayAT430相机SDK

环境配置 VS2019 QT5.9 编译器版本 MSVC2017_64添加系统环境变量&#xff08;完毕后重启电脑&#xff09; 从VS2019中下载Qt插件 从VS2019中添加单个编译组件 上述操作完成后用VS打开工程文件&#xff0c;工程文件地址 &#xff1a; C:\Users\86173\Desktop\IRCNETSDK_W…

python基础知识分享

1程序结构 1.1分支结构 单分支结构: 例如 if 条件表达式: 语句块 双分支结构: 例如 if 条件表达式: 语句块1 else: 语句块2 多分支路结构: 例如: if 条件表达式: if 条件表达式1: 语句块1 elif 条件表达式2: 语句块2 elif 条件表达式3: 语句块3 ....... else: …