这是本人学习的总结,主要学习资料如下
- 马士兵教育
- rocketMq官方文档
目录
- 1、Overview
- 2、技术亮点
- 2.1、消息写入时的自旋锁和可重入锁
- 2.2、堆外内存机制
- 2.2.1、Overview
- 2.2.2、源码
- 2.2.2.1、开启堆外内存的条件
- 2.2.2.2、堆外内存的初始化
- 2.2.2.3、写消息到堆外内存
- 2.2.2.4、堆外内存同步数据到磁盘
1、Overview
这是Broker
中类的架构图。
发送和接收消息的代码流程是从上到下的,比如接受消息的流程就是SendMessageProcessor#processRequest-> DefaultMessageStore#asyncPutMessage -> CommitLog#asyncPutMessage -> MappedFile#appendMessage
。
2、技术亮点
2.1、消息写入时的自旋锁和可重入锁
在CommitLog
的构造方法中,初始化了这么一个锁。在写入消息时会调用这个锁的lock()
和unlock()
方法。
this.putMessageLock = defaultMessageStore.getMessageStoreConfig()
.isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock()
: new PutMessageSpinLock();
默认情况下是自旋锁,我们也可以配置成可重入锁。
我们看看PutMessageSpinLock
怎么实现的。
public class PutMessageSpinLock implements PutMessageLock {//true: Can lock, false : in lock.private AtomicBoolean putMessageSpinLock = new AtomicBoolean(true);@Overridepublic void lock() {boolean flag;do {flag = this.putMessageSpinLock.compareAndSet(true, false);}while (!flag);}@Overridepublic void unlock() {this.putMessageSpinLock.compareAndSet(false, true);}
}
这个自旋实现的很简单,就是不断地循环然后通过CAS
加锁解锁,所以这个锁不会阻塞线程,不涉及操作系统上下文切换,只是CPU空转。
PutMessageReentrantLock
则更简单,它直接使用ReentrantLock
来加锁解锁。所以可能会导致线程阻塞或者挂起。
官方文档建议,异步刷盘时使用自旋锁,同步刷盘使用可重入锁。
因为异步刷盘速度快,消息到Borker
内存就可以返回发送成功,占有锁的时间较少,自旋锁能有最大的效率。
同步刷盘需要等到消息写入磁盘后才能返回发送成功,占有所得时间较长,用自旋锁会导致大量线程空转占用CPU。所以需要用可重入锁将获取锁失败的线程挂起。
2.2、堆外内存机制
2.2.1、Overview
堆外内存机制用于高并发的场景。
因为高并发会在JVM中产生大量的对象,很可能会频繁地触发GC导致STW暂停业务线程。
堆外内存是指从内存中开辟一个新的空间,这个空间的回收不受GC的控制,完全交给开发者。
这片堆外内存会被当成一个缓存,Broker
接受到的消息对象会存放到堆外内存中,然后定时从把消息从堆外内存中刷到磁盘。
因为堆外内存的垃圾回收不受GC控制,而是交给开发者,所以就能保证垃圾回收的频率够低,保证业务线程尽可能少地暂停。
这是消息写入时,普通模式和开启堆外内存时的流程图。
2.2.2、源码
2.2.2.1、开启堆外内存的条件
在MessageStoreConfig
中可以看到什么情况才会被RocketMQ
认为当前开启了堆外内存。
public boolean isTransientStorePoolEnable() {return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()&& BrokerRole.SLAVE != getBrokerRole();
}
可以看到,三个条件同时满足才能开启堆外内存。
Broker.conf
中设置transientStorePoolEnable=true
。- 刷盘方式是异步刷盘:第二个刷盘方式必须是异步刷盘。这是因为同步刷盘要求数据写到磁盘后才返回ACK给生产者,这需要较长的时间。但堆外内存的意义就是为了满足高并发,同步刷盘与之相违背,所以只能是异步刷盘。
- 当前的
Broker
必须是Master:因为主从架构中,从节点只能只能被消费者读消息不能被生产者写消息,而堆外内存只是一个写数据时的缓存,读数据还是得从磁盘中读。所以从节点开启堆外内存没意义,反而会占用内存影响性能。
2.2.2.2、堆外内存的初始化
初始化的内容比较简单,靠外部配置就足够的话,一般是在BrokerStartup#createBrokerController
中。比较复杂的则是在BrokerController#createBrokerController
中。
堆外内存和DefaultMessageStore
有关,初始化在DefaultMessageStore
的构造方法。下面是相关代码。
if (messageStoreConfig.isTransientStorePoolEnable()) {this.transientStorePool.init();
}
public void init() {// poolSize = 5 by defaultfor (int i = 0; i < poolSize; i++) {ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);final long address = ((DirectBuffer) byteBuffer).address();Pointer pointer = new Pointer(address);LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));availableBuffers.offer(byteBuffer);}
}
从代码中可以看到,初始化具体做的事是新建默认5个ByteBuffer
对象,然后存放在availableBuffers
中,availableBuffers
是一个队列。
这5个ByteBuffer
是映射到堆外内存的缓存,后续将通过这几个缓存向堆外内存中放数据。
之后在初始化BrokerController#intialize()
中,会通过线程AllocateMappedFileService
调用MappedFile#init()
方法,将上面初始化好的availableBuffers
通过transientStorePool#borrowBuffer()
传给MappedFile
的writeBuffer
。这样写消息时,MappedFile
就可以通过writeBuffer
向堆外内存写数据。
// init with off-heap
public void init(final String fileName, final int fileSize,final TransientStorePool transientStorePool) throws IOException {init(fileName, fileSize);this.writeBuffer = transientStorePool.borrowBuffer();this.transientStorePool = transientStorePool;
}
2.2.2.3、写消息到堆外内存
写消息的流程是SendMessageProcessor#processRequest-> DefaultMessageStore#asyncPutMessage -> CommitLog#asyncPutMessage -> MappedFile#appendMessage -> MappdFile#appendMessagInner
,即使开启堆外内存也是一样。
所以我们可以来看看MappedFile#appendMessagInner
怎么实现写消息到堆外内存。这里截取关键片段。
// 如果writeBuffer不为空,则开启了堆外内存,否则用正常的mappedByteBufferByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();byteBuffer.position(currentPos);AppendMessageResult result;if (messageExt instanceof MessageExtBrokerInner) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);} else if (messageExt instanceof MessageExtBatch) {result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBatch) messageExt, putMessageContext);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}
代码中可以看到,关键点是writeBuffer
。之前提到开启了堆外内存,那初始化时会将堆外内存的映射缓存传给MappedFile
中的workBuffer
;如果没开启堆外内存则writeBuffer
为null。
所以开启堆外内存就向writeBuffer
写数据到堆外内存;没有开启就向mappedByteBuffer
写数据到磁盘。
2.2.2.4、堆外内存同步数据到磁盘
堆外内存只是一个缓存,最终数据应该同步到磁盘。RocketMQ
设置了一个定时线程做这个工作,叫CommitRealTimeService
。
默认200ms
同步一次。因为MQ
就是用于异步的场景,所以写完数据至少200ms
后才能读到也是可以忍耐的。
具体的代码不展示,没什么值得说的地方。