核心实现是使用CountDownLatch来实现的,先取集群节点总数一半以上数量的CountDownLatch
再发送请求调用其他节点,在这个过程中对于正常响应的节点进行latch.countDown();
最后再统计数量是否为0再决定是否抛异常
// 请求参数final String content = json.toString();// 这里再去通知其他节点,// peers就是各个集群节点,peers.majorityCount() 取peers.size() / 2 + 1 个数的CountDownLatchfinal CountDownLatch latch = new CountDownLatch(peers.majorityCount());// 遍历除了自己之外的节点for (final String server : peers.allServersIncludeMyself()) {if (isLeader(server)) {latch.countDown();continue;}// 调用各个节点 发送请求final String url = buildUrl(server, API_ON_PUB);HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",datum.key, server, result.getCode());return;}latch.countDown();}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);}@Overridepublic void onCancel() {}});}if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {// only majority servers return success can we consider this update successLoggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);}