1、关键组件
主从同步的实现逻辑主要在HAService中,在它的构造函数中实例化了几个对象同时在start()方法内执行启动:
public class HAService {public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {this.defaultMessageStore = defaultMessageStore;this.acceptSocketService =new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());this.groupTransferService = new GroupTransferService();this.haClient = new HAClient();}......public void start() throws Exception {this.acceptSocketService.beginAccept();this.acceptSocketService.start();this.groupTransferService.start();this.haClient.start();}
}
首先了解一下HAService的构造函数中的内容究竟是干什么的:
- AcceptSocketService:主要是处理从节点的连接,调用AcceptSocketService#beginAccept()方法,这一步主要是进行端口绑定,在端口上监听从节点的连接请求;调用AcceptSocketService#start()方法启动服务,这一步主要为了处理从节点的连接请求,与从节点建立连接(可以看做是运行在master节点的)。
- GroupTransferService:主要用于在主从同步的时候,等待数据传输完毕(可以看做是运行在master节点的。
- HAClient:里面与master节点建立连接,向master汇报主从同步进度并存储master发送过来的同步数据(可以看做是运行在slave从节点的)。
了解完HAService中的组件,而且看到在start()方法中启动了各个组件,那么HAService在何时被启动的呢?
还记得之前在记录broker时,看过BrokerController#initialize()初始化方法内,同时也构建了DefaultMessageStore对象,它作为HAService构造函数的入参,定义的start()方法中就包含HAService的启动
1).构建DefaultMessageStore以及start()启动
//BrokerController.class
public class BrokerController {private MessageStore messageStore;//broekr初始化public boolean initialize() throws CloneNotSupportedException {.......this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);.......}//borker启动public void start() throws Exception {if (this.messageStore != null) {this.messageStore.start();}}}
}
2)实例化HAServer以及start()启动
//DefaultMessageStore.class
public class DefaultMessageStore implements MessageStore {private final HAService haService;......public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {......//实例化HAServiceif (!messageStoreConfig.isEnableDLegerCommitLog()) {this.haService = new HAService(this);} else {this.haService = null;}......}public void start() throws Exception {......if (!messageStoreConfig.isEnableDLegerCommitLog()) {//启动HAthis.haService.start();this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());}......}
}
2.主从同步流程
2.1.绑定端口,监听连接请求
AcceptSocketService#beginAccept方法里面首先获取了ServerSocketChannel,然后进行端口绑定,并在selector上面注册了OP_ACCEPT事件的监听,监听从节点的连接请求:
class AcceptSocketService extends ServiceThread {/*** 监听从节点的连接** @throws Exception If fails.*/public void beginAccept() throws Exception {// 创建ServerSocketChannelthis.serverSocketChannel = ServerSocketChannel.open();// 获取selectorthis.selector = RemotingUtil.openSelector();this.serverSocketChannel.socket().setReuseAddress(true);// 绑定端口:10912this.serverSocketChannel.socket().bind(this.socketAddressListen);// 设置非阻塞this.serverSocketChannel.configureBlocking(false);// 注册OP_ACCEPT连接事件的监听this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);}
}
2.2master节点处理连接
因为继承了ServiceThread,所以被调用start()启动方法后,会另外开启一个线程执行run()代码,这块就是处理连接请求:
public class HAService {class AcceptSocketService extends ServiceThread {@Overridepublic void run() {log.info(this.getServiceName() + " service started");// 如果服务未停止while (!this.isStopped()) {try {this.selector.select(1000);// 获取监听到的事件Set<SelectionKey> selected = this.selector.selectedKeys();// 处理事件if (selected != null) {for (SelectionKey k : selected) {// 如果是连接事件if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();if (sc != null) {HAService.log.info("HAService receive new connection, "+ sc.socket().getRemoteSocketAddress());try {// 创建HAConnection,建立连接HAConnection conn = new HAConnection(HAService.this, sc);// 启动conn.start();//添加连接HAService.this.addConnection(conn);}...}}
}
- 从selector中获取到监听到的事件;
- 如果是OP_ACCEPT连接事件,创建与从节点的连接对象HAConnection,与从节点建立连接,然后调用HAConnection的start方法进行启动,并创建的HAConnection对象加入到连接集合中,HAConnection中封装了Master节点和从节点的数据同步逻辑;
2.3HAClient
HAClient同样也继承了ServiceThread
public void run() {log.info(this.getServiceName() + " service started");//是否执行while (!this.isStopped()) {try {//连接Masterif (this.connectMaster()) {//判断时间间隔是否合法if (this.isTimeToReportOffset()) {// 发送同步偏移量,传入的参数是当前的主从复制偏移量currentReportedOffsetboolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);//返回不对则关闭连接if (!result) {this.closeMaster();}}......
}}
2.3.1slave与主节点建立连接
connectMaster()方法执行连接主节点操作
class HAClient extends ServiceThread {// 当前的主从复制进度private long currentReportedOffset = 0;private boolean connectMaster() throws ClosedChannelException {if (null == socketChannel) {String addr = this.masterAddress.get();if (addr != null) {// 将地址转为SocketAddressSocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);if (socketAddress != null) {// 连接masterthis.socketChannel = RemotingUtil.connect(socketAddress);if (this.socketChannel != null) {// 注册OP_READ可读事件监听this.socketChannel.register(this.selector, SelectionKey.OP_READ);}}}// 获取CommitLog中当前最大的偏移量this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();// 更新上次写入时间this.lastWriteTimestamp = System.currentTimeMillis();}return this.socketChannel != null;}
}
2.3.2处理网络可读事件
processReadEvent()方法中处理了可读事件,也就是处理Master节点发送的同步数据, 首先从socketChannel中读取数据到byteBufferRead中,byteBufferRead是读缓冲区,读取数据的方法会返回读取到的字节数,对字节数大小进行判断:
class HAClient extends ServiceThread {// 读缓冲区,会将从socketChannel读入缓冲区private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);private boolean processReadEvent() {int readSizeZeroTimes = 0;while (this.byteBufferRead.hasRemaining()) {try {// 从socketChannel中读取数据到byteBufferRead中,返回读取到的字节数int readSize = this.socketChannel.read(this.byteBufferRead);if (readSize > 0) {// 重置readSizeZeroTimesreadSizeZeroTimes = 0;// 处理数据boolean result = this.dispatchReadRequest();if (!result) {log.error("HAClient, dispatchReadRequest error");return false;}} else if (readSize == 0) {// 记录读取到空数据的次数if (++readSizeZeroTimes >= 3) {break;}} else {log.info("HAClient, processReadEvent read socket < 0");return false;}} catch (IOException e) {log.info("HAClient, processReadEvent read socket exception", e);return false;}}return true;}}
- 如果可读字节数大于0表示有数据需要处理,调用dispatchReadRequest方法进行处理;
- 如果可读字节数为0表示没有可读数据,此时记录读取到空数据的次数,如果连续读到空数据的次数大于3次,将终止本次处理;
2.3.3消息写入ComitLog
dispatchReadRequest方法中会将从节点读取到的数据写入CommitLog,dispatchPosition记录了已经处理的数据在读缓冲区中的位置,从读缓冲区byteBufferRead获取剩余可读取的字节数,如果可读数据的字节数大于一个消息头的字节数(12个字节),表示有数据还未处理完毕,反之表示消息已经处理完毕结束处理。
private boolean dispatchReadRequest() {// 消息头大小final int msgHeaderSize = 8 + 4; // phyoffset + sizeint readSocketPos = this.byteBufferRead.position();// 开启循环不断读取数据while (true) {......// 如果可读取的字节数大于一个消息头的字节数 + 消息体大小if (diff >= (msgHeaderSize + bodySize)) {byte[] bodyData = new byte[bodySize];this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);this.byteBufferRead.get(bodyData);// 从读缓冲区中根据消息的位置,读取消息内容,将消息追加到从节点的CommitLog中HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);this.byteBufferRead.position(readSocketPos);// 更新dispatchPosition的值为消息头大小+消息体大小this.dispatchPosition += msgHeaderSize + bodySize;if (!reportSlaveMaxOffsetPlus()) {return false;}continue;}}if (!this.byteBufferRead.hasRemaining()) {this.reallocateByteBuffer();}break;}return true;}
2.4向Master发送主从同步消息拉取偏移量
在HAClient#run()中与主节点建立连接后,会向主节点发送同步消息拉取偏移量,调用reportSlaveMaxOffset()
private boolean reportSlaveMaxOffset(final long maxOffset) {this.reportOffset.position(0);this.reportOffset.limit(8); // 设置数据传输大小为8个字节this.reportOffset.putLong(maxOffset);// 设置同步偏移量this.reportOffset.position(0);this.reportOffset.limit(8);for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {try {// 向Master节点发送拉取偏移量this.socketChannel.write(this.reportOffset);} catch (IOException e) {log.error(this.getServiceName()+ "reportSlaveMaxOffset this.socketChannel.write exception", e);return false;}}// 更新发送时间lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();return !this.reportOffset.hasRemaining();}
2.5HAConnection
前面知道HAClient中Slave节点会定时向Master节点汇报从节点的消息同步偏移量,那么Master节点是如何处理的呢?
HAConnection中封装了Master节点与从节点的网络通信处理,分别在ReadSocketService(负责读Socket的服务)和WriteSocketService(负责读Socket的服务)。
暂时不做深究了有兴趣的可以去看看。这边值注意的一点是,消息消费时用的是netty,而主从同步时用的是java.nio下原生的SocketChannel