看完上一章之后,有没有很好奇,生产者发送完消息之后,server是如何存储,这一章节就来学习
入口
SendMessageProcessor.processRequest
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {final RemotingCommand response = preSend(ctx, request, requestHeader);final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();if (response.getCode() != -1) {return CompletableFuture.completedFuture(response);}final byte[] body = request.getBody();int queueIdInt = requestHeader.getQueueId();TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (queueIdInt < 0) {queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());}MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setTopic(requestHeader.getTopic());msgInner.setQueueId(queueIdInt);if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {return CompletableFuture.completedFuture(response);}msgInner.setBody(body);msgInner.setFlag(requestHeader.getFlag());MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));msgInner.setPropertiesString(requestHeader.getProperties());msgInner.setBornTimestamp(requestHeader.getBornTimestamp());msgInner.setBornHost(ctx.channel().remoteAddress());msgInner.setStoreHost(this.getStoreHost());msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));CompletableFuture<PutMessageResult> putMessageResult = null;Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (transFlag != null && Boolean.parseBoolean(transFlag)) {if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()+ "] sending transaction message is forbidden");return CompletableFuture.completedFuture(response);}putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);} else {// 使用defaultMessageStore.aysncPutMessage存储putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);}return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);}
实际真正的负责存储就是DefaultMessageStore, 不过在讲述DefaultMessageStore的时候,我们是自底往上学,因为DefaultMessageStore比较复杂,从顶往下学容易学乱。先从地基开始,然后再看高楼大厦
MappedFile
public class MappedFile extends ReferenceResource {// 内存页大小:4kpublic static final int OS_PAGE_SIZE = 1024 * 4;protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);// 当前进程下 所有的 mappedFile占用的总虚拟内存大小private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);// 当前进程下 所有的 mappedFile个数private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);// 当前mappedFile数据写入点protected final AtomicInteger wrotePosition = new AtomicInteger(0);protected final AtomicInteger committedPosition = new AtomicInteger(0);// 当前mappedFIle数据落盘位点(flushedPosition 之前的数据 都是安全数据,flushedPosition~wrotePosition之间的数据 属于脏页)private final AtomicInteger flushedPosition = new AtomicInteger(0);// 文件大小protected int fileSize;// 文件通道protected FileChannel fileChannel;/*** Message will put to here first, and then reput to FileChannel if writeBuffer is not null.*/protected ByteBuffer writeBuffer = null;protected TransientStorePool transientStorePool = null;// 文件名称(commitLog ConsumeQueue:文件名就是 第一条消息的 物理偏移量 索引文件: 年月日小时分钟秒.. )private String fileName;// 文件名转longprivate long fileFromOffset;// 文件对象private File file;// 内存映射缓冲区,访问虚拟内存private MappedByteBuffer mappedByteBuffer;// 该文件下 保存的第一条 msg 的存储时间private volatile long storeTimestamp = 0;// 当前文件如果是 目录内 有效文件的 首文件的话,该值为trueprivate boolean firstCreateInQueue = false;
-
构造方法
-
appendMessage方法
-
appendMessage(byte[] data)
-
flush
MappedFileQueue
public class MappedFileQueue {private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);private static final int DELETE_FILES_BATCH_MAX = 10;// mfq 管理的目录(CommitLog: ../store/commitlog 或者 consumeQueue: ../store/xxx_topic/0)private final String storePath;// 目录下每个文件大小(commitLog文件 默认 1g consumeQueue 文件 默认 600w字节)private final int mappedFileSize;// list,目录下的每个 mappedFile 都加入该listprivate final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,内部线程处理完后 会返回给我们结果 结果 就是 mappedFile对象。private final AllocateMappedFileService allocateMappedFileService;// 目录的刷盘位点(它的值: curMappedFile.fileName + curMappedFile.wrotePosition)private long flushedWhere = 0;private long committedWhere = 0;// 当前目录下最后一条msg存储时间private volatile long storeTimestamp = 0;
-
load方法
-
getLastMappedFile
/*** 参数1:startOffset ,文件起始偏移量* 参数2:needCreate ,当 list 为空时,是否创建mappedFile*/public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {// 该值控制是否创建 mappedFile,当需要创建mappedFile时,它充当文件名的结尾// 两种情况会创建:// 1. list内没有mappedFile// 2. list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..long createOffset = -1;MappedFile mappedFileLast = getLastMappedFile();if (mappedFileLast == null) {// 情况1 list内没有mappedFile// createOffset 取值 必须是 mappedFileSize 的倍数 或者 0createOffset = startOffset - (startOffset % this.mappedFileSize);}if (mappedFileLast != null && mappedFileLast.isFull()) { // 情况2 list最后一个 mappedFile (当前顺序写的mappedFile) 它写满了..// 上一个文件名 转long + mappedFileSizecreateOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}if (createOffset != -1 && needCreate) {// 这里面是创建 新的 mappedFile 的逻辑。// 获取待创建文件的 绝对路径(下次即将要创建的文件名)String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);// 获取 下下次 要创建的文件的 绝对路径String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {// 创建mappedFile的服务,内部有自己的线程,咱们通过向他提交 request ,// 内部线程处理完后 会返回给我们结果 结果 就是 mappedFile对象。// 当mappedFileSize >= 1g 的话,这里创建的mappedFile 会执行它的 预热方法。mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast;}
- flush
/*** @param flushLeastPages (0 表示强制刷新, > 0 脏页数据必须达到 flushLeastPages 才刷新)* @return boolean true 表示本次刷盘无数据落盘 false 表示本次刷盘有数据落盘*/public boolean flush(final int flushLeastPages) {boolean result = true;// 获取当前正在刷盘的文件 (正在顺序写的mappedFile)MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {// 获取mappedFile 最后一条消息的存储时间long tmpTimeStamp = mappedFile.getStoreTimestamp();// 调用mf 刷盘 ,返回mf的最新的落盘位点int offset = mappedFile.flush(flushLeastPages);// mf起始偏移量 + mf最新的落盘位点long where = mappedFile.getFileFromOffset() + offset;// true 表示本次刷盘无数据落盘 false 表示本次刷盘有数据落盘result = where == this.flushedWhere;// 将最新的目录刷盘位点 赋值给 flushedWherethis.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}return result;}