1.绪论
MappedFileQueue是commitLog中最核心的主组件。前面讲解commitLog的时候也曾说过,MappedFileQueue本质上就是一个MappedFile队列,而commitLog操纵Mmapped读写的时候,也是通过MappedFileQueue来实现的。
commitlog和mappedfilequeue和mappedfile的关系如图所示:
所以我们需要分析一下mappedfilequeue的具体作用。
2.MappedFileQueue
2.1 组成
public class MappedFileQueue implements Swappable {private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);protected final String storePath;//mappedfile的大小protected final int mappedFileSize;//mappedfile的队列protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<>();//mappedFile分配服务protected final AllocateMappedFileService allocateMappedFileService;//刷新和commit的指针protected long flushedWhere = 0;protected long committedWhere = 0;//存储时间戳,主要是删除过期的mappedfile使用protected volatile long storeTimestamp = 0;
}
2.2 检查每个commitfile是否损坏
其实就是遍历前n-1个commitfile的大小,判断是否为mappedFileSize,如果不是,便认为文件已经损坏。
//检查每个commitfile是否损坏public void checkSelf() {List<MappedFile> mappedFiles = new ArrayList<>(this.mappedFiles);if (!mappedFiles.isEmpty()) {Iterator<MappedFile> iterator = mappedFiles.iterator();MappedFile pre = null;//遍历每个mappedfilewhile (iterator.hasNext()) {MappedFile cur = iterator.next();if (pre != null) {//计算出每个commitfile的大小看是否为mappedFileSize(1gb)if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",pre.getFileName(), cur.getFileName());}}pre = cur;}}}
2.2 根据时间戳获取大于该时间戳或者小于时间戳的消息的时间
可以看出mappedfile中的startTimestamp为第一条消息的写入时间,stopTimestamp为最后一条消息的写入时间。并且该方法根据boundaryType进行判断:
如果boundaryType传参为LOWER,表示获取最后一条消息写入时间大于等于传入时间的mappedfile;如果boundaryType传参为UPPER,表示获取第一条消息写入时间小于等于传入时间的mappedfile
//根据时间戳,获取一段数据public MappedFile getConsumeQueueMappedFileByTime(final long timestamp, CommitLog commitLog,BoundaryType boundaryType) {//获取commitfile数组Object[] mfs = copyMappedFiles(0);if (null == mfs) {return null;}//从后往前遍历m每个mappedfilefor (int i = mfs.length - 1; i >= 0; i--) {DefaultMappedFile mappedFile = (DefaultMappedFile) mfs[i];// Figure out the earliest message store time in the consume queue mapped file.if (mappedFile.getStartTimestamp() < 0) {//从commitlog的持久化文件中获取到该commitlog开始写入数据的时间戳并且设置到commitlogfile中SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0, ConsumeQueue.CQ_STORE_UNIT_SIZE);if (null != selectMappedBufferResult) {try {ByteBuffer buffer = selectMappedBufferResult.getByteBuffer();long physicalOffset = buffer.getLong();int messageSize = buffer.getInt();long messageStoreTime = commitLog.pickupStoreTimestamp(physicalOffset, messageSize);if (messageStoreTime > 0) {mappedFile.setStartTimestamp(messageStoreTime);}} finally {selectMappedBufferResult.release();}}}// Figure out the latest message store time in the consume queue mapped file.//从commitlog的持久化文件中获取到该commitlog最后一条消息的写入的时间戳并且设置到commitlogfile中if (i < mfs.length - 1 && mappedFile.getStopTimestamp() < 0) {SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(mappedFileSize - ConsumeQueue.CQ_STORE_UNIT_SIZE, ConsumeQueue.CQ_STORE_UNIT_SIZE);if (null != selectMappedBufferResult) {try {ByteBuffer buffer = selectMappedBufferResult.getByteBuffer();long physicalOffset = buffer.getLong();int messageSize = buffer.getInt();long messageStoreTime = commitLog.pickupStoreTimestamp(physicalOffset, messageSize);if (messageStoreTime > 0) {mappedFile.setStopTimestamp(messageStoreTime);}} finally {selectMappedBufferResult.release();}}}}switch (boundaryType) {//如果boundaryType传参为LOWER,表示获取最后一条消息写入时间大于等于传入时间的mappedfilecase LOWER: {for (int i = 0; i < mfs.length; i++) {DefaultMappedFile mappedFile = (DefaultMappedFile) mfs[i];if (i < mfs.length - 1) {long stopTimestamp = mappedFile.getStopTimestamp();if (stopTimestamp >= timestamp) {return mappedFile;}}// Just return the latest one.if (i == mfs.length - 1) {return mappedFile;}}}//如果boundaryType传参为UPPER,表示获取第一条消息写入时间小于等于传入时间的mappedfilecase UPPER: {for (int i = mfs.length - 1; i >= 0; i--) {DefaultMappedFile mappedFile = (DefaultMappedFile) mfs[i];if (mappedFile.getStartTimestamp() <= timestamp) {return mappedFile;}}}}return null;}
2.3 获取修改时间大于等于传入时间的mappedfile
//获取修改时间大于等于传入时间的mappedfilepublic MappedFile getMappedFileByTime(final long timestamp) {Object[] mfs = this.copyMappedFiles(0);if (null == mfs)return null;for (int i = 0; i < mfs.length; i++) {MappedFile mappedFile = (MappedFile) mfs[i];if (mappedFile.getLastModifiedTimestamp() >= timestamp) {return mappedFile;}}return (MappedFile) mfs[mfs.length - 1];}
2.4 根据offset,删除大于传入offset后面的所有数据
//根据offset,删除大于传入offset后面的所有数据public void truncateDirtyFiles(long offset) {List<MappedFile> willRemoveFiles = new ArrayList<>();for (MappedFile file : this.mappedFiles) {long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;if (fileTailOffset > offset) {//如果offset在对应的mappedfile中,需要重置写指针、flsuh指针和commit指针if (offset >= file.getFileFromOffset()) {file.setWrotePosition((int) (offset % this.mappedFileSize));file.setCommittedPosition((int) (offset % this.mappedFileSize));file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {//删除小于offset的mappedfile,主要包括关闭niochannel,删除磁盘文件和利用buffer的clean方法清除bufferfile.destroy(1000);willRemoveFiles.add(file);}}}this.deleteExpiredFile(willRemoveFiles);}
2.5 从mappedfilequeue中删除对应的文件
//从mappedfilequeu中删除过期的文件(传入的files)void deleteExpiredFile(List<MappedFile> files) {if (!files.isEmpty()) {Iterator<MappedFile> iterator = files.iterator();while (iterator.hasNext()) {MappedFile cur = iterator.next();if (!this.mappedFiles.contains(cur)) {iterator.remove();log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());}}try {if (!this.mappedFiles.removeAll(files)) {log.error("deleteExpiredFile remove failed.");}} catch (Exception e) {log.error("deleteExpiredFile has exception.", e);}}}
2.6 broker初始化的时候,如何将磁盘文件与内存建立映射关系
其实在构建mappedfile的时候,会调用mappedfile的init方法,该方法会见了mappedByteBuffer和磁盘文件的映射关系。
//在broker重启时,需要从磁盘中将mappedfile读取到内存中public boolean doLoad(List<File> files) {//根据名称排序,mappedfile的名称其实就是文件的物理偏移量files.sort(Comparator.comparing(File::getName));for (int i = 0; i < files.size(); i++) {File file = files.get(i);if (file.isDirectory()) {continue;}if (file.length() == 0 && i == files.size() - 1) {boolean ok = file.delete();log.warn("{} size is 0, auto delete. is_ok: {}", file, ok);continue;}if (file.length() != this.mappedFileSize) {log.warn(file + "\t" + file.length()+ " length not matched message store config value, please check it manually");return false;}try {//新建一个mappedfile,这里会调用mappedfile的init方法,他会新建一个mappedbytebuffer,并且与文件建立映射关系MappedFile mappedFile = new DefaultMappedFile(file.getPath(), mappedFileSize);//初始化文件的写指针、flush指针、commit指针mappedFile.setWrotePosition(this.mappedFileSize);mappedFile.setFlushedPosition(this.mappedFileSize);mappedFile.setCommittedPosition(this.mappedFileSize);//加入到mappedfilequeue中this.mappedFiles.add(mappedFile);log.info("load " + file.getPath() + " OK");} catch (IOException e) {log.error("load file " + file + " error", e);return false;}}return true;}
2.7 根据起始偏移量获取到最后一块mappedfile
在获取到最后一块mappedfile的时候,可以根据needcreate参数判断:如果超过内存范围,是否需要重新构建一个mappedfile。
//重要,根据起始偏移量获取到最后一块mappedfile,并且根据needcreate参数判断如果超过内存范围,是否需要重新构建一个mappedfilepublic MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset = -1;//获取到mappedfilequeue中的最后一个mappedfileMappedFile mappedFileLast = getLastMappedFile();//如果最后一块mappedfile为空,表示此时为初始化,获取到新的mapperfile的起始位置if (mappedFileLast == null) {createOffset = startOffset - (startOffset % this.mappedFileSize);}//如果不为空,但是最后一块mappedfile已经满了,也需要新建一块mappedfileif (mappedFileLast != null && mappedFileLast.isFull()) {createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}if (createOffset != -1 && needCreate) {//新建mappedfilereturn tryCreateMappedFile(createOffset);}return mappedFileLast;}
根据代码可以看出,mappedfile其实是在tryCreateMappedFile这个方法中创建出来的。我们仔细研究一下mappedfile的创建。
2.8 mappedfile的创建
2.8.1 mappedfile的创建分析
创建其实分成两种形式,一是同步创建,即直接调用new方法创建mappedfile文件,另一种是调用allocateMappedFileService进行异步创建。
public MappedFile tryCreateMappedFile(long createOffset) {String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);String nextNextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset+ this.mappedFileSize);return doCreateMappedFile(nextFilePath, nextNextFilePath);}protected MappedFile doCreateMappedFile(String nextFilePath, String nextNextFilePath) {MappedFile mappedFile = null;//创建包含两种形式:一是直接创建 二是交给allocateMappedFileService进行异步创建。if (this.allocateMappedFileService != null) {//可以看出本质上还是调用的allocateMappedFileService的putRequestAndReturnMappedFile方法来进行创建的,并且创建的时候会//创建两个mappedfile,其实预热的思想,在mappedfile的基础上会创建两个mappedfile文件mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile = new DefaultMappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}//将创建结果加入到mqppedqueu中this.mappedFiles.add(mappedFile);}return mappedFile;}
2.8.2 mappedfile异步创建服务-AllocateMappedFileService
其核心就是将分配请求加入到一个队列中,然后启动线程来进行消费并且分配mappedfile。
1.基本组成
public class AllocateMappedFileService extends ServiceThread {private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static int waitTimeOut = 1000 * 5;//存储mappedfile分配请求的queueprivate ConcurrentMap<String, AllocateRequest> requestTable =new ConcurrentHashMap<>();private PriorityBlockingQueue<AllocateRequest> requestQueue =new PriorityBlockingQueue<>();private volatile boolean hasException = false;private DefaultMessageStore messageStore;
}
2.如何进行分配的
public void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped() && this.mmapOperation()) {}log.info(this.getServiceName() + " service end");}
private boolean mmapOperation() {boolean isSuccess = false;AllocateRequest req = null;try {//取出分配mappedfile的请求req = this.requestQueue.take();AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());if (null == expectedRequest) {log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "+ req.getFileSize());return true;}if (expectedRequest != req) {log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);return true;}if (req.getMappedFile() == null) {long beginTime = System.currentTimeMillis();MappedFile mappedFile;//判断是否开启了瞬时存储技术if (messageStore.isTransientStorePoolEnable()) {try {mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();//如果开启了瞬时存储技术,调用mappedflile中支持瞬时存储技术的init方法mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());} catch (RuntimeException e) {log.warn("Use default implementation.");mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());}} else {//如果没有开启,便调用mappedflile中不支持瞬时存储技术的init方法mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());}long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);if (elapsedTime > 10) {int queueSize = this.requestQueue.size();log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize+ " " + req.getFilePath() + " " + req.getFileSize());}// pre write mappedFile//进行mappedfile的预热if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog()&&this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());}req.setMappedFile(mappedFile);this.hasException = false;isSuccess = true;}}
}
可以看出AllocateMappedFileService本质上就是调用mappedfile的init方法,来对mappedfile的构造和初始化。
2.9 flush mappedfile
即找到上次flsuh的位置,调用mappedfile的flush方法,将mappedbytebuffer中的文件flush到磁盘中去。
//将mappedfile flush到磁盘中去public boolean flush(final int flushLeastPages) {boolean result = true;//找到上一次flush的位置MappedFile mappedFile = this.findMappedFileByOffset(this.getFlushedWhere(), this.getFlushedWhere() == 0);if (mappedFile != null) {long tmpTimeStamp = mappedFile.getStoreTimestamp();//调用flush方法刷新磁盘文件int offset = mappedFile.flush(flushLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.getFlushedWhere();this.setFlushedWhere(where);if (0 == flushLeastPages) {this.setStoreTimestamp(tmpTimeStamp);}}return result;}
2.10 commit mappedfile
public synchronized boolean commit(final int commitLeastPages) {boolean result = true;//找到上一次commit的位置MappedFile mappedFile = this.findMappedFileByOffset(this.getCommittedWhere(), this.getCommittedWhere() == 0);if (mappedFile != null) {//调用mappedfile的commit方法commitint offset = mappedFile.commit(commitLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.getCommittedWhere();this.setCommittedWhere(where);}return result;}
2.11 根据offset返回所在的mappedfile
//返回对应offset在的那个mappedfilepublic MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {MappedFile firstMappedFile = this.getFirstMappedFile();MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;}