消息队列设计一个幂等的消费逻辑golang版

如何实现消息幂等

设计幂等的消费逻辑的关键是确保每条消息只被处理一次,即使在网络故障或消费者重启的情况下。通常使用唯一的消息ID和持久化存储来记录处理过的消息ID。

实现步骤

  1. 连接kafka和redis
  2. 检查消息ID
  3. 处理消息
  4. 标记消息已处理
package mainimport ("context""crypto/md5""encoding/hex""fmt""github.com/confluentinc/confluent-kafka-go/kafka""github.com/go-redis/redis/v8""log""time"
)// 初始化Redis客户端
var ctx = context.Background()
var rdb = redis.NewClient(&redis.Options{Addr:     "localhost:6379",Password: "", // no password setDB:       0,  // use default DB
})// 计算消息的唯一ID(可以使用消息的内容或其他标识)
func calculateMessageID(message []byte) string {hash := md5.Sum(message)return hex.EncodeToString(hash[:])
}// 检查消息ID是否已处理
func isMessageProcessed(messageID string) bool {result, err := rdb.Get(ctx, messageID).Result()if err == redis.Nil {return false} else if err != nil {log.Fatalf("Failed to get message ID from Redis: %v", err)}return result == "processed"
}// 标记消息ID为已处理
func markMessageAsProcessed(messageID string) {err := rdb.Set(ctx, messageID, "processed", 0).Err()if err != nil {log.Fatalf("Failed to set message ID in Redis: %v", err)}
}// 处理消息的逻辑
func processMessage(message []byte) {// 在这里添加具体的消息处理逻辑fmt.Printf("Processing message: %s\n", string(message))
}
// 初始化Kafka消费者,读取消息,检查消息ID,处理未处理的消息,并将消息ID标记为已处理。
func main() {consumer, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "localhost","group.id":          "myGroup","auto.offset.reset": "earliest",})if err != nil {log.Fatalf("Failed to create consumer: %v", err)}defer consumer.Close()consumer.Subscribe("myTopic", nil)for {msg, err := consumer.ReadMessage(-1)if err == nil {messageID := calculateMessageID(msg.Value)if !isMessageProcessed(messageID) {processMessage(msg.Value)markMessageAsProcessed(messageID)} else {fmt.Printf("Message %s already processed\n", messageID)}} else {fmt.Printf("Consumer error: %v (%v)\n", err, msg)}time.Sleep(1 * time.Second) // 可选:添加延迟以防止消息消费过快}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/863599.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# StringBuilder

以下是一些基本的 StringBuilder 使用方法:创建 StringBuilder 实例:追加字符串:插入字符串:删除字符串:替换字符串:清空 StringBuilder:转换 StringBuilder 为字符串:使用容量&…

MySQL1(初始数据库 概念 DDL建库建表 数据库的三大范式 表约束)

目录 一、初始数据库 二、概念 三、DDL建库建表 1. 数据库结构 2. SQL语句分类 3. DDL语句操作数据库 注释: 查看数据库: ​编辑创建数据库: 删除数据库: 选择数据库: 4. 数据库表的字段类型 4.1 字符串…

java将html转成图片

java 将html转成图片 1.导入jar2.代码3.展示结果4.注意事项 最近有一个需求需要根据指定的样式生成图片&#xff0c;使用java原生技术有些麻烦&#xff0c;所以上网搜了下案例&#xff0c;最后发现最好用的还是html2image&#xff0c;这里进行简单总结下。 1.导入jar <!-- 用…

android adb命令常用大全

android adb命令大全 1、取全量日志&#xff1a; adb logcat -v threadtime > 1.log2、dos窗口日志异常&#xff0c;使用命令清理下 adb logcat -G 200M3、直接在dos命令窗口查询某个关键字 adb logcat | findStr "KeyWord"4、查看版本号&#xff1a; adb sh…

腾讯视频VIP会员账号怎么扫码登录一个帐号登录几个人的设备?

腾讯视频VIP会员账号怎么扫码登录&#xff1f; 腾讯视频VIP会员账号要想实现扫码登录&#xff0c;仅支持在电脑Web网页版和WindowsPC软件上登录腾讯视频时&#xff0c;才可以实现手机QQ扫码登录腾讯视频。 腾讯视频VIP与SVIP会员有什么区别&#xff1f; 腾讯视频VIP会员&…

前端小白必学:对Cookie、localStorage 和 sessionStorage 的简单理解

前言 Cookie、localStorage 和 sessionStorage 作为Web开发领域中广泛采用的三种客户端数据存储技术&#xff0c;它们各自拥有独特的优势、应用场景及限制条件&#xff0c;共同支撑起前端数据管理的多样化需求。也是面试常考题之一&#xff0c;今天就和大家简单谈一下我对它们…

Python 量化

当涉及到量化金融和数据分析时&#xff0c;Python是一种非常流行的编程语言&#xff0c;因为它拥有丰富的库和工具&#xff0c;适用于处理金融数据、执行统计分析、制定交易策略等任务。下面是一些常用的Python量化金融模块及其功能的详细介绍&#xff1a; ### 1. **pandas** …

揭开大语言模型(LLM)内部运作的算法逻辑

本文探讨了 Anthropic 的突破性技术&#xff0c;以揭示大型语言模型 (LLM) 的内部工作原理&#xff0c;揭示其不透明的本质。通过深入研究LLM Claude Sonnet 的“大脑”&#xff0c;Anthropic 增强了人工智能的安全性和可解释性&#xff0c;为人工智能的决策过程提供了更深入的…

应用部署方式演变

应用部署方式演变 1.传统部署2.虚拟化部署3.容器化部署 1.传统部署 传统的应用程序部署是将多个应用程序直接部署在操作系统上&#xff0c;一旦其中的某个应用程序出现内存泄漏&#xff0c;那么该程序就会大量吞噬系统内容空间&#xff0c;导致其他应用程序无法正常运行。 2.虚…

如何让两个不同网段的直连地址通信(有点意思)

群里一个朋友出了个题&#xff1a;两个路由器接口直连&#xff0c;一个接口IP是1.1.1.1/30&#xff0c;一个接口IP是2.2.2.2/30&#xff0c;如何让它们通信&#xff1f; 群里的朋友们纷纷献策&#xff1a; 1、用PPP方式连接&#xff0c;直接通 2、配对端IP地址同网段的s…

鱼叉式钓鱼

鱼叉式网络钓鱼&#xff1a; 鱼叉式网络钓鱼是一种网络钓鱼形式&#xff0c;它针对特定个人或组织发送定制消息&#xff0c;旨在引发特定反应&#xff0c;例如泄露敏感信息或安装恶意软件。这些攻击高度个性化&#xff0c;使用从各种来源收集的信息&#xff0c;例如社交媒体资…

Face Adapter - 一键面部表情迁移、换脸工具 本地一键整合包下载

Face Adapter是一款高效的人脸编辑适配器&#xff0c;由浙江大学和腾讯联合开发&#xff0c;适用于预先训练的扩散模型&#xff0c;专门针对人脸再现和交换任务。 只需要上传一张源脸和一张参考人脸&#xff0c;就能按照参考人脸的风格生成相同的面部的表情&#xff0c;一键生…

掌握Python编程的深层技能

一、Python基础语法、变量、列表、字典等运用 1.运行python程序的两种方式 1.交互式即时得到程序的运行结果 2.脚本方式把程序写到文件里(约定俗称文件名后缀为.py),然后用python解释器解释执行其中的内容2.python程序运行的三个步骤 python3.8 C:\a\b\c.py 1.先启动python3…

Golang-channel理解

channel golang-channel语雀笔记整理 channelgolang channel的设计动机&#xff1f;chanel的数据结构/设计思考 golang channel的设计动机&#xff1f; channel是一种不同协程之间实现异步通信的数据结构。golang中有一种很经典的说法是要基于通信实现共享内存&#xff0c;而不…

机器学习基础:开源库学习-Numpy科学计算库

目录 Numpy科学计算库 什么是多维数组 数组基础 高维数组 操作和创建数组 Numpy介绍 创建数组 数组的属性 二维数组 三维数组 数组元素的数据类型 创建特殊的数组 np.arange() np.ones() np.zeros() np.eye() np.linspace() np.logspace() asarray() 数组运…

AUTOSAR汽车电子嵌入式编程精讲300篇-智能网联汽车CAN总线-基于电压信号的CAN总线入侵检测系统设计与实现

目录 前言 入侵检测系统研究现状 入侵检测系统建模 CAN总线 入侵检测威胁模型 Deep SVDD模型 入侵检测系统方案设计 挑战和解决方案 差分信号的采集与处理 差分信号的特征提取 入侵检测模型的设计 入侵检测系统性能评估 实验环境设置 不同的车辆状态 不同数量的…

一致性哈希算法golang版本

什么是一致性哈希 一致性哈希&#xff08;Consistent Hashing&#xff09;是一种分布式系统中常用的算法&#xff0c;用于在节点&#xff08;如缓存服务器&#xff09;之间均匀分配数据。它的核心思想是将所有可能的哈希值组织成一个环形结构&#xff0c;并将数据和节点通过哈…

005 参数绑定处理

文章目录 参数绑定默认支持的参数类型参数绑定使用要求简单类型RequestParam注解 绑定POJO类型绑定集合或者数组类型 参数绑定示例JSP代码Controller代码PO代码 自定义日期参数绑定Converter代码Converter配置 文件类型参数绑定加入依赖包上传页面配置Multipart解析器Controlle…

thymeleaf+mybatis(本文章用于期末考前10分钟速看)

期末速看 pom&#xff08;了解&#xff09;application.propertiessql代码Controller控制层视图service&#xff1a; 服务层mapper&#xff08;dao&#xff09;&#xff1a;持久层entity层(model层&#xff0c;domain层、 bean)&#xff1a;对应数据库表&#xff0c;实体类 效果…

谈谈你对AQS的理解

AQS概述 AQS&#xff0c;全称为AbstractQueuedSynchronizer&#xff0c;是Java并发包&#xff08;java.util.concurrent&#xff09;中一个核心的框架&#xff0c;主要用于构建阻塞式锁和相关的同步器&#xff0c;也是构建锁或者其他同步组件的基础框架。AQS提供了一种基于FIF…