kafka实现延迟队列

前言

首先说一下延迟队列这个东西,实际上实现他的方法有很多,kafka实现并不是一个最好的选择,例如redis的zset可以实现,rocketmq天然的可以实现,rabbitmq也可以实现。如果切换前几种方案成本高的情况下,那么就使用kafka实现,实际上kafka实现延迟队列也是借用了rocketmq的延迟队列思想,rocketmq的延迟时间是固定的几个,并不是自定义的,但是kafka可以实现自定义的延迟时间,但是不能过多,因为是依据topic实现的,接下来我使用go实现简单的kafka的延迟队列。

实现方案

1、首先创建两个topic、一个delayTopic、一个realTopic

2、生产者把消息先发送到delayTopic

3、延迟服务再把delayTopic里面的消息超过我们所设置的时间写入到realTopic

4、消费者再消费realTopic里面的数据即可

具体实现

1、生产者发送消息到延迟队列
msg := &sarama.ProducerMessage{Topic:     kafka.DelayTopic,Timestamp: time.Now(),Key:       sarama.StringEncoder("rta_key"),Value:     sarama.StringEncoder(riStr),}partition, offset, err := kafka.KafkaDelayQueue.SendMessage(msg)
2、延迟服务的消费者(消费延迟队列里面的数据到real队列)
const (DelayTime  = time.Minute * 5DelayTopic = "delayTopic"RealTopic  = "realTopic"
)// KafkaDelayQueueProducer 延迟队列生产者,包含了生产者和延迟服务
type KafkaDelayQueueProducer struct {producer   sarama.SyncProducer // 生产者delayTopic string              // 延迟服务主题
}// NewKafkaDelayQueueProducer 创建延迟队列生产者
// producer 生产者
// delayServiceConsumerGroup 延迟服务消费者组
// delayTime 延迟时间
// delayTopic 延迟服务主题
// realTopic 真实队列主题
func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup,delayTime time.Duration, delayTopic, realTopic string, log *log) *KafkaDelayQueueProducer {var (signals = make(chan os.Signal, 1))signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)// 启动延迟服务consumer := NewDelayServiceConsumer(producer, delayTime, realTopic, log)log.Info("[NewKafkaDelayQueueProducer] delay queue consumer start")go func() {for {if err := delayServiceConsumerGroup.Consume(context.Background(),[]string{delayTopic}, consumer); err != nil {log.Error("[NewKafkaDelayQueueProducer] delay queue consumer failed,err: ", zap.Error(err))break}time.Sleep(2 * time.Second)log.Info("[NewKafkaDelayQueueProducer] 检测消费函数是否一直执行")// 检查是否接收到中断信号,如果是则退出循环select {case sin := <-signals:consumer.Logger.Info("[NewKafkaDelayQueueProducer]get signal,", zap.Any("signal", sin))returndefault:}}log.Info("[NewKafkaDelayQueueProducer] consumer func exit")}()log.Info("[NewKafkaDelayQueueProducer] return KafkaDelayQueueProducer")return &KafkaDelayQueueProducer{producer:   producer,delayTopic: delayTopic,}
}// SendMessage 发送消息
func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {msg.Topic = q.delayTopicreturn q.producer.SendMessage(msg)
}// DelayServiceConsumer 延迟服务消费者
type DelayServiceConsumer struct {producer  sarama.SyncProducerdelay     time.DurationrealTopic stringLogger    *log.DomobLog
}func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration,realTopic string, log *log.DomobLog) *DelayServiceConsumer {return &DelayServiceConsumer{producer:  producer,delay:     delay,realTopic: realTopic,Logger:    log,}
}func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {c.Logger.Info("[delaye ConsumerClaim] cc")for message := range claim.Messages() {// 如果消息已经超时,把消息发送到真实队列now := time.Now()c.Logger.Info("[delay ConsumeClaim] out",zap.Any("send real topic res", now.Sub(message.Timestamp) >= c.delay),zap.Any("message.Timestamp", message.Timestamp),zap.Any("c.delay", c.delay),zap.Any("claim.Messages len", len(claim.Messages())),zap.Any("sub:", now.Sub(message.Timestamp)),zap.Any("meskey:", message.Key),zap.Any("message:", string(message.Value)),)if now.Sub(message.Timestamp) >= c.delay {c.Logger.Info("[delay ConsumeClaim] jinlai", zap.Any("mes", string(message.Value)))_, _, err := c.producer.SendMessage(&sarama.ProducerMessage{Topic:     c.realTopic,Timestamp: message.Timestamp,Key:       sarama.ByteEncoder(message.Key),Value:     sarama.ByteEncoder(message.Value),})if err != nil {c.Logger.Info("[delay ConsumeClaim] delay already send to real topic failed", zap.Error(err))return nil}if err == nil {session.MarkMessage(message, "")c.Logger.Info("[delay ConsumeClaim] delay already send to real topic success")continue}}// 否则休眠一秒time.Sleep(time.Second)return nil}c.Logger.Info("[delay ConsumeClaim] ph",zap.Any("partitiion", claim.Partition()),zap.Any("HighWaterMarkOffset", claim.HighWaterMarkOffset()))c.Logger.Info("[delay ConsumeClaim] delay consumer end")return nil
}func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error {return nil
}func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error {return nil
}

这个方法整体逻辑就是不断消费延迟队列里面的消息,判断消息时间是否大于现在,如果大于现在说明消息超时了,就把该消息发送到真实的队列里面去了,真实队列是一直在消费的。如果没超时的话就不会标记消息,还会重新消费,消费成功会标记该消息。

重点:我在测试的时候是一秒拉一次消息,但这个也不是太准时,不过最终结果差距不大,想知道具体怎么消费的可以自己debug

3、真实队列里面的消费逻辑

type ConsumerRta struct {Logger *log
}func ConsumerToRequestRta(consumerGroup sarama.ConsumerGroup, lg *log) {var (signals = make(chan os.Signal, 1)wg = &sync.WaitGroup{})signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, os.Interrupt)wg.Add(1)// 启动消费者协程go func() {defer wg.Done()consumer := NewConsumerRta(lg)consumer.Logger.Info("[ConsumerToRequestRta] consumer group start")// 执行消费者组消费for {if err := consumerGroup.Consume(context.Background(), []string{kafka.RealTopic}, consumer); err != nil {consumer.Logger.Error("[ConsumerToRequestRta] Error from consumer group:", zap.Error(err))break}time.Sleep(2 * time.Second) // 等待一段时间后重试// 检查是否接收到中断信号,如果是则退出循环select {case sin := <-signals:consumer.Logger.Info("get signal,", zap.Any("signal", sin))returndefault:}}}()wg.Wait()lg.Info("[ConsumerToRequestRta] consumer end & exit")
}func NewConsumerRta(lg *log) *ConsumerRta {return &ConsumerRta{Logger: lg,}
}func (c *ConsumerRta) ConsumeClaim(session sarama.ConsumerGroupSession,claim sarama.ConsumerGroupClaim) error {for message := range claim.Messages() {// 消费逻辑session.MarkMessage(message, "")return nil}return nil
}func (c *ConsumerRta) Setup(sarama.ConsumerGroupSession) error {return nil
}func (c *ConsumerRta) Cleanup(sarama.ConsumerGroupSession) error {return nil
}
4、kafka配置
type KafkaConfig struct {BrokerList []stringTopic      []stringGroupId    []stringCfg        *sarama.ConfigPemPath    stringKeyPath    stringCaPemPath  string
}var (Producer           sarama.SyncProducerConsumerGroupReal  sarama.ConsumerGroupConsumerGroupDelay sarama.ConsumerGroupKafkaDelayQueue    *KafkaDelayQueueProducer
)func NewKafkaConfig(cfg KafkaConfig) (err error) {Producer, err = sarama.NewSyncProducer(cfg.BrokerList, cfg.Cfg)if err != nil {return err}ConsumerGroupReal, err = sarama.NewConsumerGroup(cfg.BrokerList, cfg.GroupId[0], cfg.Cfg)if err != nil {return err}ConsumerGroupDelay, err = sarama.NewConsumerGroup(cfg.BrokerList, cfg.GroupId[1], cfg.Cfg)if err != nil {return err}return nil
}func GetKafkaDelayQueue(log *log) {KafkaDelayQueue = NewKafkaDelayQueueProducer(Producer, ConsumerGroupDelay, DelayTime, DelayTopic, RealTopic, log)
}

这个里面我没有怎么封装,可以自行封装,使用的是IBM的sarama客户端

总结

基本上就是以上三步实现,里面的一些log日志可以传递自己的log日志即可,使用的是消费者组消费的,添加上自己的topic和groupid即可

重点:以上实现延迟时间可能不是太精准,我使用的时候还是有点小小的误差,不过误差不大,强相关业务还是使用其他专业实现延迟队列mq,或使用自行方案

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/658804.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

OSDI 2023: LVMT: An Efficient Authenticated Storage for Blockchain

我们使用以下6个分类标准对本文的研究选题进行分析: 1. 研究方向: 区块链可扩展性: 提高交易吞吐量和减少确认时间的研究,例如零知识证明、分片和状态通道。密码学技术: 开发或改进用于区块链应用的新密码原语,例如椭圆曲线、承诺方案和累加器。区块链存储和效率: 优化区块…

02神经网络的学习及代码实现

“学习”是指从训练数据中自动获取最优权重参数的过程。引入损失函数指标&#xff0c;学习的目的是以该损失函数为基准&#xff0c;找出尽可能小的损失函数的值。 1、从数据中学习 从数据中学习规律&#xff0c;模式&#xff0c;避免人为介入。 先从图像中提取特征量&#x…

【GAMES101】Lecture 13 光线追踪 Whitted-Style

目录 光线追踪 基本的光线追踪算法 Whitted-Style光线追踪 求曲面交点 求三角形交点 Mller Trumbore Algorithm&#xff08;MT算法&#xff09; 光线追踪 这里讲一下为什么我们需要光线追踪&#xff0c;主要是因为光栅化没有办法很好的处理全局的光照效果&#xff0c;就…

MySQL备份和恢复(二)mysqldump

注意&#xff1a;mysqldump是完全备份 一、mysqldump备份命令 1、 备份数据库 含创建库语句 &#xff08;1&#xff09;备份指定数据库 完全备份一个或多个完整的库&#xff0c; mysqldump -uroot -p[密码] --databases 库名1 [库名2].. >/备份路径/备份文件名.sql#导出…

如何恢复已删除的照片?

在这篇综合文章中发现恢复丢失照片的有效且免费的方法。无论您使用的是智能手机、iPhone、Windows 计算机、Mac、SD 卡还是数码相机&#xff0c;我们都提供有关如何恢复已删除照片的分步说明。此外&#xff0c;学习一些有价值的技巧&#xff0c;以防止将来意外删除照片。 意外…

2024.1.28周报

目录 摘要 ABSTRACT 一、文献阅读 1、题目 2、摘要 3、解决的问题 4、算法模型 5、总结 二、PINN方法 三、PINN神经网络源码 总结 摘要 本周我阅读了一篇题目为Physics Informed Deep Learning (Part I): Data-driven Solutions of Nonlinear Partial Differential…

配置vite自动按需引入 vant 组件

为什么学 按需加载可以减少包体积,优化加载性能 学习内容 全局注册组件 import 需要的组件import 组件样式使用 app.use 注册组件 Tree Shaking 介绍使用 什么是 tree shaking&#xff1f; Tree shaking是一种优化技术&#xff0c;用于减少JavaScript或其他编程语言中未被使用…

【2024程序员必看】鸿蒙应用开发行业分析

鸿蒙操作系统沉浸四年&#xff0c;这次终于迎来了破局的机会&#xff0c;自从2023年华为秋季发布会上宣布鸿蒙 Next操作系统不在兼容Android后&#xff0c;就有不少大厂开始陆续与华为达成了鸿蒙原生应用的开发合作&#xff0c;据1月18日华为官方宣布110多天的产业合力“突进”…

python+selenium自动化测试项目实战

说明&#xff1a;本项目采用流程控制思想&#xff0c;未引用unittest&pytest等单元测试框架 一.项目介绍 目的 测试某官方网站登录功能模块可以正常使用 用例 1.输入格式正确的用户名和正确的密码&#xff0c;验证是否登录成功&#xff1b; 2.输入格式正确的用户名和不…

【C语言】(12)指针

指针在C语言中是一个非常重要的概念&#xff0c;它为程序员提供了直接访问内存的能力&#xff0c;使得数据操作更加灵活高效。理解并正确使用指针是掌握C语言的关键之一。 1. 指针的基本概念 指针本质上是一个变量&#xff0c;其存储的是另一个变量的内存地址。通过指针&…

单例模式有几种写法?请谈谈你的理解?

为什么有单例模式&#xff1f; 单例模式&#xff08;Singleton&#xff09;&#xff0c;也叫单子模式&#xff0c;是一种常用的软件设计模式。在应用这个模式时&#xff0c;单例对象的类必须保证只有一个实例存在。许多时候整个系统只需要拥有一个全局对象&#xff0c;这样有利…

测试用例的书写方式以及测试模板大全

一个优秀的测试用例&#xff0c;应该包含以下信息&#xff1a; 1 &#xff09; 软件或项目的名称 2 &#xff09; 软件或项目的版本&#xff08;内部版本号&#xff09; 3 &#xff09; 功能模块名 4 &#xff09; 测试用例的简单描述&#xff0c;即该用例执行的目的或方法…

SpringMVC实现对网页的访问,在请求控制器中创建处理请求的方法

目录 测试HelloWorld RequestMapping注解 RequestMapping注解的位置 RequestMapping注解的value属性 RequestMapping注解的method属性 SpringMVC支持路径中的占位符&#xff08;重点&#xff09; SpringMVC获取请求参数 1、通过ServletAPI获取 2、通过控制器方法的形参…

头歌C++之while循环性质

目录 第1关:求1到n间所有整数的和 本关必读 本关任务 测试说明 第2关:计算x的n次方 本关必读 本关任务 测试说明 第3关:求给定正整数的“亲密对数” 本关必读 本关任务 测试说明 第4关:判断正整数n的各位数字中是否包含数字3或4

Spring-boot项目+Rancher6.3部署+Nacos配置中心+Rureka注册中心+Harbor镜像仓库+NFS存储

目录 一、项目概述二、环境三、部署流程3.1 Harbor部署3.1.1 docker安装3.1.2 docker-compose安装3.1.3 安装证书3.1.4 Harbor下载配置安装 3.2 NFS存储搭建3.3 Rancher平台配置3.3.1 NFS存储相关配置3.3.2 Harbor相关配置3.3.3 Nacos部署及相关配置3.3.4 工作负载deployment配…

Vue3+vite引入Tailwind CSS

Tailwind CSS 是一个为快速创建定制化 UI 组件而设计的实用型框架。与其他 CSS 框架或库不同&#xff0c;Tailwind CSS 组件没有预先设置好样式。可以使用 Tailwind 的低级实用类来为 CSS 元素设置样式&#xff0c;如 margin、flex、color 等。 自从 2017 年发布以来&#xff…

嵌入式学习第十五天

内存管理: 1.malloc void *malloc(size_t size); 功能: 申请堆区空间 参数: size:申请堆区空间的大小 返回值: 返回获得的空间的首地址 失败返回NULL 2.free void free(void *ptr); 功能: 释放堆区空间 注…

五大架构风格之一:数据流风格

数据流风格详细介绍 系统架构数据流风格是一种软件体系结构风格&#xff0c;它强调了系统内部不同部分之间的数据流动。这种风格侧重于描述系统中的数据处理过程&#xff0c;以及数据是如何从一个组件传递到另一个组件的。以下是系统架构数据流风格的详细介绍&#xff1a; 1 基…

vue3项目下载@element-plus/icons-vue苦笑不得的乌龙

一、背景 node.js版本&#xff1a;v16.20.1 npm版本&#xff1a;8.19.4 pnpm版本&#xff1a;8.0.0 二、心路历程 pnpm install element-plus/icons-vue 用命令下载element-plus/icons-vue的时候&#xff0c;报错并提醒如图 是&#xff0c;我按照提示执行了&#xff0c;结…

【美团】1688事业部-高级开发工程师JAVA-杭州

所属部门: 淘天集团 | 学历: 本科 | 工作年限: 1 年 职位描述 参与建设基于产业带LBS物理信息实时数据的全新B类专业导购模式&#xff0c;建设1688的产业地图导购中心化场景&#xff0c;为线上下做融合提供交互技术窗口&#xff1b;主导构建商家可提报和小二可运营的榜单技术支…