nsq中diskqueue详解 - 第三篇

前面一篇博客 nsq中diskqueue详解 - 第二篇_YZF_Kevin的博客-CSDN博客 我们讲了diskqueue的两种文件存储格式,diskqueue的启动入口,元数据文件的读取和写入,如果你还没了解过,强烈建议先看一下

这篇博客,我们重点讲diskqueue的定义,以及最核心的ioloop循环,写入消息的处理,读取消息的处理等等

1. diskqueue的定义

先看下diskqueue的源码定义,如下(已加详细注释)

// FIFO的持久化队列
type diskQueue struct {// 64bit atomic vars need to be first for proper alignment on 32bit platformsreadPos      		int64			// 读文件的读取点writePos     		int64			// 写文件的写入点(也是当前文件的总写入字节数)readFileNum  		int64			// 当前要读取的文件编号writeFileNum 		int64			// 当前要写入的文件编号depth        		int64			// 总的消息数(注意不是当前文件的写入消息数,而是全部的)sync.RWMutex						// 读写锁,对exitFlag等标记的读写时需加锁name                string			// 队列实例名字(只是打印时用,用来跟其他队列有所区别,nsq中使用topic+channel来标识一个队列)dataPath            string 			// 文件所在路径maxBytesPerFile     int64  			// 一个文件的最大字节数(外部传入)maxBytesPerFileRead int64			// 当前文件最大可读字节数(跟maxBytesPerFile不同,因为一个文件限定100M,可能实际只写了99.97M,读取时只能以99.97M为准)minMsgSize          int32			// msg的最小字节数(外部传入)maxMsgSize          int32			// msg的最大字节数(外部传入)syncEvery           int64         	// 读写多少次触发刷到磁盘syncTimeout         time.Duration 	// 同步到文件的间隔(触发时还会判断是否有变化,有变化才会真的刷)exitFlag            int32			// 退出标记,1表正在退出,不再接收新数据写入,ioloop()协程也退出needSync            bool			// 标记需同步到磁盘nextReadPos     	int64			// 下次应该读的位置(不直接更新读位置是因为消息还没投递给外部,不算真正读完成,如果外部没接收,下次还得从readPos读。// 只有外部接收完成,才算消息投递完成,才可以把 readPos 更新为 nextReadPosnextReadFileNum 	int64			// 下次应该读的文件号(原因同上)readFile  			*os.File		// 读文件的文件指针writeFile 			*os.File		// 写文件的文件指针reader    			*bufio.Reader	// 读文件对象的readerwriteBuf  			bytes.Buffer	// 写文件对象的bufferreadChan 			chan []byte		// 只读的通道,通过ReadChan()返回给外面使用(无缓冲,压入数据后只能等外部使用者取走后才能继续)peekChan 			chan []byte		// 查看的通道,通过PeekChan()返回给外面使用// 内部使用的通道depthChan         	chan int64		// 存放当前总消息数的通道writeChan         	chan []byte		// 接收外部数据的通道(无缓冲,所以压入后阻塞等待ioloop()循环的处理)writeResponseChan 	chan error		// 接收外部数据后回应的通道(无缓冲)emptyChan         	chan int		// 清空队列信号的通道(当外部调用Empty()时,会往该通道写1,ioloop()循环中读取后会执行清空操作)emptyResponseChan 	chan error		// 清空队列信号结果的通道exitChan          	chan int		// 退出通道exitSyncChan      	chan int		// 退出结果的通道logf 				AppLogFunc		// 日志函数(外部传入)
}

都是一些变量定义,已经加了详细的注释,这里不再一一介绍,只是总结下

1. 前面的5个变量 readPos, writePos, readFileNum, writeFileNum, depth不用多介绍,上一篇博客已经讲过了,都是元数据的字段,描述队列的整体信息

2. 至于name, dataPath,maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimieout,这7个变量都是diskqueue启动时外部传入的,主要是设置队列属性

3. maxBytesPerFileRead 不同于 maxBytesPerFile:

        maxBytesPerFile 是所有消息数据文件的最大写入字节数,是个限制值,比如100M

        maxBytesPerFileRead 是当前读文件的最大可读字节数,这个是实际值,每当读一个新文件时都会获取文件的实际大小,实际只能读这么多,比如文件实际是99.97M,那只能读这么多

4.  needSync是标记是否需同步磁盘,读写次数超过限定的syncEvery时,或需新建写文件时都要置为true,ioloop()循环就会执行一次强制刷新到磁盘

5. nextReadPos和nextReadFileNum,我们一般会认为,数据读取后不就可以后移读位置了么?其实不是这样的,因为这里读取完毕后仅仅是尝试压入readChan,不代表外部真的接收了。举例:这里读取成功后尝试压入到readChan,readChan是个无缓冲的通道,但外部一直没接收最后还关闭了自己,下次再从这里读消息时我们还要投递这个消息,也就是说读取位置不能变;nextReadFileNum同理,只有上一个文件全部读取且外部接收成功了,这里才能读新文件

6. readChan和peekChan,对外提供的ReadChan()是获取一个只读通道,这里读取一个消息后压入readChan,等待外部接收后才返回,后移读位置,再读取下一个消息;对外提供的PeekChan()也是获取一个只读通道,等待外部接收后才返回,但是不后移读位置,也不会触发读下一个消息,也就是说PeekChan()正如其名,仅仅提供查看功能

2. diskqueue的核心ioloop()

我们先说下ioloop()函数的运作机制,这样大家先做到心里有数,再看源码就很轻松了

1. 建立一个定时器,触发时间为外部传入的syncTimeout,定时器触发时检测有没有读写发生,有则同步一次磁盘

2. writeChan是接收外部写入的通道,外部写入后,本函数会在select中读取到,调用writeOne()写入到当前的写文件(如果发现文件即将超上限,就新开一个文件写),count值+1

3. readChan是给外部读取的通道,本函数总是会读取一个消息往readChan中压入,等待外部接收,外部接收成功后,本函数会在select中返回,就再读取下一个消息(如果发现当前读文件已读完,就读下一个文件),count值+1

4. 每读取一个消息,每写入一个消息,count都会加1,cout值达到外部传入的syncEvery,就同步一次磁盘

5. emptyChan 用来接收外部的清空信号,本函数的select中接收到就进行清空操作

6. exitChan 用来接收外部的退出信号,本函数的select中接收到就进行退出操作

好了,上面的6个操作就是ioloop()函数的核心了,现在贴代码(已添加详细注释)

// 独立协程运行
func (d *diskQueue) ioLoop() {var dataRead []byte		// 存储每次读取的数据var err 	errorvar count 	int64		// 每read,write一次,count值+1,满syncEvery则往磁盘同步一次var r 		chan []byte	// 对外的读消息通道(为nil时说明没有数据可读)var p 		chan []byte	// 对外的查看消息通道(为nil时说明没有数据可读)// 同步定时器syncTicker := time.NewTicker(d.syncTimeout)for {// count值够了就标记需同步if count == d.syncEvery {d.needSync = true}// 发现需同步if d.needSync {err = d.sync()	// 同步到磁盘(该函数在同步磁盘后会把needSync置为false)if err != nil {d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)}count = 0		// count重新从0开始计}// 有消息可读的条件:读的是旧文件 或者 读的位置比写的位置小if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {// 读位置已经移动(说明外部取走了数据),才可读下一个消息if d.nextReadPos == d.readPos {dataRead, err = d.readOne()if err != nil {d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err)// 读出错的处理d.handleReadError()continue}}// 赋值通道r = d.readChanp = d.peekChan} else {	// 不可读的时候,把r,p置为nil,确保下面的select会直接跳过,外部使用者在select中判断时也会跳过r = nilp = nil}// go中通道的特性决定:select中对为nil的chan的读/写操作会直接跳过,// 所以只有有数据可读的时候,p,r才有值(p指向peekChan,r指向readChan)select {case p <- dataRead:// 注意,这里什么都没做,因为p仅仅是查看通道,消息不算取走case r <- dataRead:	// 把读出来的这个消息压入到readChan,外部从readChan取走以后,这里会立即返回// 每取走一个消息,count值+1count++// 重新计算下次读位置,读文件号d.moveForward()case d.depthChan <- d.depth:	// 把最新的未读消息数压入对外通道// 什么都不做,外部取走未处理消息数跟本协程没关系case <-d.emptyChan:	// 收到清空队列的信号d.emptyResponseChan <- d.deleteAllFiles()	// 删除所有的文件(清空队列),并把结果压入返回通道,因为外部调用者还在等结果count = 0 // 重新从0计数case dataWrite := <-d.writeChan: // 新接收到消息count++	// 每收到一个消息,count值+1d.writeResponseChan <- d.writeOne(dataWrite) // 消息写入到文件缓冲区,结果压入返回通道case <-syncTicker.C:// 同步定时器if count == 0 {continue	// 虽然时间到了,但数据没有变化,也不用同步}d.needSync = true // 这里仅标记,下次for循环会进行同步操作case <-d.exitChan:	// 退出信号goto exit}}exit:d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)syncTicker.Stop()// 退出完成,往exitSyncChan发信号,因为主协程还在等d.exitSyncChan <- 1
}

上面已经讲了ioloop()函数的运作机制,代码也添加了详细的注释,相信大家都能轻松看懂

我再啰嗦下几个值得注意的点

1. diskqueue的运作流程是一边新写入消息,一边读取处理,像生产者消费者机制一样

如果读慢写快,结果就是消息文件会一直新增,这倒没什么,反正消息已保存进文件了,后面再处理就是了

如果读快写慢,那么只要持续的时间够长,读的位置一定会追上写的位置,造成无消息可读,等于发生了读写追尾,这个时候读操作就要停止

所以总结下可读操作的条件:

要么读的文件号较小,写的文件号较大,读写不是同一个文件;

要么读写的是同一个文件,但读的位置比写的位置小;

2. 函数内:r表可读的通道,p表查看的通道,当不可读的时候,r,p均赋值为nil,本函数的select会直接跳过对值为nil通道的写入,外部接受者的select也会跳过对值为nil通道的读取,这是golang通道的特性之一,大家注意

3. diskqueue在退出/删除时会调用exit()函数,该函数会关闭通道exitChan,此时ioloop()的select就会返回,关闭ioloop的协程。注意:关闭通道时,所有监听该通道的select都会收到消息,这也是golang通道的特性之一

3. 新写入消息时的处理

diskqueue对外提供的写入消息接口为

Put([]byte) error

这个接口的实现不复杂,就是把对应的数据写入到当前在写文件,如果发现写入数据后,当前在写文件会超上限,那就不写了,关闭当前的写文件,新开一个写入文件从0位置开始写

源码如下(已添加详细注释)

// 把指定数据压入接收队列
func (d *diskQueue) Put(data []byte) error {d.RLock()defer d.RUnlock()// 如果队列正在退出,返回吧if d.exitFlag == 1 {return errors.New("exiting")}d.writeChan <- data			// 压入接收队列,因为无缓冲,所以会阻塞等待ioloop()循环中取走才会返回return <-d.writeResponseChan// 阻塞等待结果,ioloop()从writeChan读取执行后会立马出结果
}

可以看到,函数操作很简单,核心操作就是往d.writeChan中压入数据,由于d.writeChan是无缓冲通道,所以会阻塞等待ioloop()函数中select的取走

ioloop()函数中select对此的处理如下

case dataWrite := <-d.writeChan: // 新接收到消息count++    // 每收到一个消息,count值+1d.writeResponseChan <- d.writeOne(dataWrite) // 消息写入到文件缓冲区,结果压入返回通道

select对此的处理也很简单,count值加1后,调用了d.writeOne(dataWrite),然后把writeOne()的结果写入到writeResponseChan 

我们看下writeOne()函数的处理,源码如下(已添加详细注释)

// 写入一个消息到缓冲区(函数内部会自动创建新文件)
func (d *diskQueue) writeOne(data []byte) error {var err errordataLen 	:= int32(len(data))		// 数据长度totalBytes 	:= int64(4 + dataLen)	// 本次写入的总字节数(4字节的数据长度 + 真正数据部分)// 数据大小检测if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)}// 如果加上本次写入量会超过文件最大限制,就关闭当前文件,创建新的if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {// 如果当前已经在读这个文件if d.readFileNum == d.writeFileNum {d.maxBytesPerFileRead = d.writePos	// 标识最大可读字节数即writePos(因为不再写入这个文件了,下面会往新文件里面写了)}d.writeFileNum++	// 新文件编号d.writePos = 0		// 新文件的写入起始点// 当前文件的内容刷到磁盘err = d.sync()if err != nil {d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)}// 关闭当前文件if d.writeFile != nil {d.writeFile.Close()d.writeFile = nil}}// 要写的文件还不存在,新建if d.writeFile == nil {// 格式化文件名curFileName := d.fileName(d.writeFileNum)// 创建文件d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)if err != nil {return err}d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)// 如果已有写入位置if d.writePos > 0 {_, err = d.writeFile.Seek(d.writePos, 0) // 偏移文件游标,0表从文件开头进行偏移if err != nil {d.writeFile.Close()d.writeFile = nilreturn err}}}d.writeBuf.Reset()// 先把数据长度(4字节)写入buferr = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)if err != nil {return err}// 再把数据写入buf_, err = d.writeBuf.Write(data)if err != nil {return err}// 把buf写入(注意这里其实是写入到文件的缓冲区,并没有刷到磁盘中,只有调用writeFile.fsync()才是真正刷到磁盘)_, err = d.writeFile.Write(d.writeBuf.Bytes())if err != nil {d.writeFile.Close()d.writeFile = nilreturn err}d.writePos 	+= totalBytes	// 更新写入位置d.depth		+= 1			// 更新消息数return err
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/65805.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

JavaScript运行机制与实践应用

一、JavsScript运行机制 1、JavaScript 是一种解释型语言&#xff0c;它的执行机制主要包括以下几个步骤&#xff1a; 2、事件循环 3、JavaScript运行模型 4、JavaScript任务 5、JavaScript宏任务和微任务 6、案例分析 console.log(script start) setTimeout(function () {co…

【面试题精讲】Spring Framework有哪些模块?

首发博客地址 系列文章地址 Spring Framework是一个广泛使用的开源应用程序框架&#xff0c;用于构建企业级Java应用程序。它提供了许多不同的模块&#xff0c;用于支持各种不同的应用程序开发需求。以下是Spring Framework的一些核心模块&#xff1a; 「Spring Core Container…

RocketMQ消息队列-@RocketMQMessageListener实现原理

使用Spring-RocketMQ时&#xff0c;只需要引入rocketmq-spring-boot-starter包&#xff0c;并且定义以下消费者&#xff0c;就可以很简单的实现消息消费 Component RocketMQMessageListener(topic "first-topic", consumerGroup "my-producer-group", s…

代码随想录Day_51打卡

①、买卖股票的最佳时机含冷冻期 给定一个整数数组prices&#xff0c;其中第 prices[i] 表示第 i 天的股票价格 。​ 设计一个算法计算出最大利润。在满足以下约束条件下&#xff0c;你可以尽可能地完成更多的交易&#xff08;多次买卖一支股票&#xff09;: 卖出股票后&#…

SEAN代码(1)

代码地址 首先定义一个trainer。 trainer Pix2PixTrainer(opt)在Pix2PixTrainer内部&#xff0c;首先定义Pix2PixModel模型。 self.pix2pix_model Pix2PixModel(opt)在Pix2PixModel内部定义生成器&#xff0c;判别器。 self.netG, self.netD, self.netE self.initialize_…

Ansible学习笔记10

1、在group1的被管理机里的mariadb里创建一个abc库&#xff1b; 1&#xff09; 然后我们到agent主机上进行检查&#xff1a; 可以看到数据库已经创建成功。 再看几个其他命令&#xff1a; #a组主机重启mysql&#xff0c;并设置开机自启 ansible a -m service -a "namemy…

算法练习(10):牛客在线编程10 贪心算法

package jz.bm;import java.util.ArrayList; import java.util.Arrays;public class bm10 {/*** BM95 分糖果问题*/public int candy (int[] arr) {int res 0;int n arr.length;int[] nums new int[n];//每个人都分配一个糖果for (int i 0; i < n; i) {nums[i] 1;}//从…

HDMI 输出实验

FPGA教程学习 第十四章 HDMI 输出实验 文章目录 FPGA教程学习前言实验原理实验过程程序设计时钟模块&#xff08;video_pll&#xff09;彩条产生模块&#xff08;color_bar)配置数据查找表模块&#xff08;lut_adv7511&#xff09;I2C Master 寄存器配置模块&#xff08;i2c_c…

elasticSearch+kibana+logstash+filebeat集群改成https认证

文章目录 一、生成相关证书二、配置elasticSearh三、配置kibana四、配置logstash五、配置filebeat六、连接https es的java api 一、生成相关证书 ps&#xff1a;主节点操作 切换用户&#xff1a;su es 进入目录&#xff1a;cd /home/es/elasticsearch-7.6.2 创建文件&#x…

Pytest 框架执行用例流程浅谈

背景&#xff1a; 根据以下简单的代码示例&#xff0c;我们将从源码的角度分析其中的关键加载执行步骤&#xff0c;对pytest整体流程架构有个初步学习。 代码示例&#xff1a; import pytest def test_add(): assert 1 1 2 def test_sub(): assert 2 - 1 1 通过 pytes…

uniapp项目实践总结(八)自定义加载组件

有时候一个页面请求接口需要加载很长时间,这时候就需要一个加载页面来告知用户内容正在请求加载中,下面就写一个简单的自定义加载组件。 目录 准备工作逻辑思路实战演练效果预览准备工作 在之前的全局组件目录components下新建一个组件文件夹,命名为q-loading,组件为q-loa…

Adobe Illustrator 2023 for mac安装教程,可用。

Adobe Illustrator 是行业标准的矢量图形应用程序&#xff0c;可以为印刷、网络、视频和移动设备创建logos、图标、绘图、排版和插图。数以百万计的设计师和艺术家使用Illustrator CC创作&#xff0c;从网页图标和产品包装到书籍插图和广告牌。此版本是2023版本&#xff0c;适配…

LeetCode(力扣)236. 二叉树的最近公共祖先Python

LeetCode236. 二叉树的最近公共祖先 题目链接代码 题目链接 https://leetcode.cn/problems/lowest-common-ancestor-of-a-binary-tree/ 代码 # Definition for a binary tree node. # class TreeNode: # def __init__(self, x): # self.val x # self.…

C语言深入理解指针(非常详细)(二)

目录 指针运算指针-整数指针-指针指针的关系运算 野指针野指针成因指针未初始化指针越界访问指针指向的空间释放 如何规避野指针指针初始化注意指针越界指针不使用时就用NULL避免返回局部变量的地址 assert断言指针的使用和传址调用传址调用例子&#xff08;strlen函数的实现&a…

The Cherno——OpenGL

The Cherno——OpenGL 1. 欢迎来到OpenGL OpenGL是一种跨平台的图形接口&#xff08;API&#xff09;&#xff0c;就是一大堆我们能够调用的函数去做一些与图像相关的事情。特殊的是&#xff0c;OpenGL允许我们访问GPU&#xff08;Graphics Processing Unit 图像处理单元&…

pwngdb 中 b *$rebase(0x相对基址偏移) 是什么意思

pwngdb 中 b *$rebase(0x相对基址偏移) 是什么意思 pwngdb 是一个针对二进制漏洞利用的调试工具库&#xff0c;用于在 GDB 调试器中辅助进行漏洞开发和漏洞利用的调试。b *$rebase(0x相对基址偏移) 是 pwngdb 中的一个调试命令&#xff0c;用于在基地址重定位后设置断点。 在二…

Python小知识 - 如何使用Python的Flask框架快速开发Web应用

如何使用Python的Flask框架快速开发Web应用 现在越来越多的人把Python作为自己的第一语言来学习&#xff0c;Python的简洁易学的语法以及丰富的第三方库让人们越来越喜欢上了这门语言。本文将介绍如何使用Python的Flask框架快速开发Web应用。 Flask是一个使用Python编写的轻量级…

Spring Boot中通过maven进行多环境配置

上文 java Spring Boot将不同配置拆分入不同文件管理 中 我们说到了&#xff0c;多环境的多文件区分管理 说到多环境 其实不止我们 Spring Boot有 很多的东西都有 那么 这就有一个问题 如果 spring 和 maven 都配置了环境 而且他们配的不一样 那么 会用谁的呢&#xff1f; 此…

MySQL编写建表语句,如何优雅处理创建时间与更新时间

在 MySQL 中&#xff0c;可以使用 TIMESTAMP 或者 DATETIME 数据类型来存储日期和时间信息&#xff0c;并结合默认值和触发器来实现自动更新 createTime 和 updateTime 字段。 以下是一个示例建表语句&#xff0c;演示如何设置自动更新的 createTime 和 updateTime 字段&#…

《TCP/IP网络编程》阅读笔记--基于Windows实现Hello Word服务器端和客户端

目录 1--Hello Word服务器端 2--客户端 3--编译运行 3-1--编译服务器端 3-2--编译客户端 3-3--运行 1--Hello Word服务器端 // gcc hello_server_win.c -o hello_server_win -lwsock32 // hello_server_win 9190 #include <stdio.h> #include <stdlib.h> #i…