ZooKeeper In Depth (4): What a Client Request Goes Through on the Server

Author: 泊浮目 · Senior R&D Engineer at 沃趣科技

Produced by: 沃趣科技

1. Introduction

When we send a data-update request to zk, what does its processing flow look like? And what consensus algorithm does zk use to guarantee consistency? With these questions in mind, let's get into today's article.

2. Design Pattern: Chain of Responsibility

Before analyzing the source, a quick primer on the Chain of Responsibility pattern is in order, because it is central to this article. In short: the pattern links multiple objects into a chain of responsibility, then walks them in chain order to find the one that will actually handle the request.

What is the benefit? It loosely couples the sender of a request and its handlers: handlers can freely pass the request along until the right one is found. If a handler receives a request that is not its concern, it simply forwards it; no extra handling logic is required.
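
To make the pattern concrete, here is a minimal, generic sketch in Java. All the names (Handler, ForwardingHandler, ChainDemo) are invented for illustration; ZK's processors below follow the same shape through RequestProcessor.processRequest.

// Minimal Chain of Responsibility sketch: each handler either handles
// the request or forwards it to its successor.
interface Handler {
    void handle(String request);
}

class ForwardingHandler implements Handler {
    private final String concern; // the kind of request this handler owns
    private final Handler next;   // successor in the chain, may be null

    ForwardingHandler(String concern, Handler next) {
        this.concern = concern;
        this.next = next;
    }

    @Override
    public void handle(String request) {
        if (concern.equals(request)) {
            System.out.println(concern + " handler processed: " + request);
        } else if (next != null) {
            next.handle(request); // not ours, pass it along with no extra logic
        }
    }
}

class ChainDemo {
    public static void main(String[] args) {
        Handler chain = new ForwardingHandler("read",
                new ForwardingHandler("write", null));
        chain.handle("write"); // -> write handler processed: write
    }
}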

3. Tracing the Request Logic

Let's start with the ZooKeeperServer class and look at its subclasses. The (common) zk server roles we care about are:

  • LeaderZooKeeperServer

  • FollowerZooKeeperServer

  • ObserverZooKeeperServer

3.1 Implementation at a Glance

3.1.1 LeaderZooKeeperServer

The entry point is LeaderZooKeeperServer.setupRequestProcessors. For a better reading experience, I will first present the logical organization as a view; readers who enjoy source code can jump to the implementation details in 3.2.

|-- LeaderRequestProcessor
|   \-- processRequest // checks whether the session has expired
|-- PrepRequestProcessor
|   \-- processRequest // validates parameters and builds a transaction when required
|-- ProposalRequestProcessor
    \-- processRequest // initiates the proposal
        \-- // transactional requests
        |   |-- SyncRequestProcessor
        |   |   \-- processRequest // records the request in the transaction log; triggers a snapshot if needed
        |   |-- AckRequestProcessor
        |   |   \-- processRequest // confirms the transaction log write and ACKs the Proposal's vote tracker
        |   |-- CommitProcessor
        |   |   \-- processRequest // waits for the cluster-wide Proposal vote until the request is committable
        |   |-- ToBeAppliedRequestProcessor
        |   |   \-- processRequest // stores committable Proposals already handled by CommitProcessor; removed only after FinalRequestProcessor finishes
        |   \-- FinalRequestProcessor
        |       \-- processRequest // replies to the request and mutates the in-memory database
        \-- // non-transactional requests
            |-- CommitProcessor
            |   \-- processRequest // pass-through; non-transactional requests need no vote
            |-- ToBeAppliedRequestProcessor
            |   \-- processRequest // pass-through; works in tandem with CommitProcessor
            \-- FinalRequestProcessor
                \-- processRequest // replies to the request and mutates the in-memory database

3.1.2 FollowerZooKeeperServer

// handles client requests
|-- FollowerRequestProcessor
|   \-- processRequest // transactions go through CommitProcessor and are forwarded to the leader; otherwise straight to FinalRequestProcessor
|-- CommitProcessor
|   \-- processRequest // waits for the cluster-wide Proposal vote until the request is committable
\-- FinalRequestProcessor
    \-- processRequest // replies to the request and mutates the in-memory database

// dedicated to handling proposals initiated by the leader
|-- SyncRequestProcessor
|   \-- processRequest // records the request in the transaction log; triggers a snapshot if needed
\-- SendAckRequestProcessor
    \-- processRequest // ACKs the proposal originator, signalling that this server finished writing the log

3.1.3 ObserverZooKeeperServer

// handles client requests
|-- ObserverRequestProcessor
|   \-- processRequest // almost identical to FollowerRequestProcessor: transactions go through CommitProcessor and are forwarded to the leader; otherwise straight to FinalRequestProcessor
|-- CommitProcessor
|   \-- processRequest // waits for the cluster-wide Proposal vote until the request is committable
\-- FinalRequestProcessor
    \-- processRequest // replies to the request and mutates the in-memory database

3.2 Implementation Details

The source analysis below is based on version 3.5.7.

3.2.1 LeaderZooKeeperServer

@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false,
            getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
            commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}

3.2.2 LeaderRequestProcessor

@Override
public void processRequest(Request request) throws RequestProcessorException {
    // Check if this is a local session and we are trying to create
    // an ephemeral node, in which case we upgrade the session
    Request upgradeRequest = null;
    try {
        upgradeRequest = lzks.checkUpgradeSession(request);
    } catch (KeeperException ke) {
        if (request.getHdr() != null) {
            LOG.debug("Updating header");
            request.getHdr().setType(OpCode.error);
            request.setTxn(new ErrorTxn(ke.code().intValue()));
        }
        request.setException(ke);
        LOG.info("Error creating upgrade request " + ke.getMessage());
    } catch (IOException ie) {
        LOG.error("Unexpected error in upgrade", ie);
    }
    if (upgradeRequest != null) {
        nextProcessor.processRequest(upgradeRequest);
    }

    nextProcessor.processRequest(request);
}

The logic here is clear: it checks whether a local session is trying to create an ephemeral node, in which case the session must be upgraded; if the upgrade fails, the exception is attached to the request.

3.2.3 PrepRequestProcessor

This class runs to more than 1,000 lines, so we will pick a few representative pieces to dissect. Before that, look at the class comment:

This request processor is generally at the start of a RequestProcessor change. It sets up any transactions associated with requests that change the state of the system. It counts on ZooKeeperServer to update outstandingRequests, so that it can take into account transactions that are in the queue to be applied when generating a transaction.

In short, it generally sits at the head of the request-processing chain, and it sets up the transactions for requests that change the system's state.

OpCode.create2

For create-type requests the logic is roughly:

case OpCode.create2:
    CreateRequest create2Request = new CreateRequest();
    pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
    break;

Jumping to pRequest2Txn:

protected void pRequest2Txn(int type, long zxid, Request request,
        Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
            Time.currentWallTime(), type));

    switch (type) {
        case OpCode.create:
        case OpCode.create2:
        case OpCode.createTTL:
        case OpCode.createContainer: {
            pRequest2TxnCreate(type, request, record, deserialize);
            break;
        }
        // ... remaining cases omitted

Jumping to pRequest2TxnCreate:

private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
    if (deserialize) {
        ByteBufferInputStream.byteBuffer2Record(request.request, record);
    }

    int flags;
    String path;
    List<ACL> acl;
    byte[] data;
    long ttl;
    if (type == OpCode.createTTL) {
        CreateTTLRequest createTtlRequest = (CreateTTLRequest)record;
        flags = createTtlRequest.getFlags();
        path = createTtlRequest.getPath();
        acl = createTtlRequest.getAcl();
        data = createTtlRequest.getData();
        ttl = createTtlRequest.getTtl();
    } else {
        CreateRequest createRequest = (CreateRequest)record;
        flags = createRequest.getFlags();
        path = createRequest.getPath();
        acl = createRequest.getAcl();
        data = createRequest.getData();
        ttl = -1;
    }
    CreateMode createMode = CreateMode.fromFlag(flags);
    validateCreateRequest(path, createMode, request, ttl);
    String parentPath = validatePathForCreate(path, request.sessionId);

    List<ACL> listACL = fixupACL(path, request.authInfo, acl);
    ChangeRecord parentRecord = getRecordForPath(parentPath);

    checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
    int parentCVersion = parentRecord.stat.getCversion();
    if (createMode.isSequential()) {
        path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }
    validatePath(path, request.sessionId);
    try {
        if (getRecordForPath(path) != null) {
            throw new KeeperException.NodeExistsException(path);
        }
    } catch (KeeperException.NoNodeException e) {
        // ignore this one
    }
    boolean ephemeralParent = EphemeralType.get(parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
    if (ephemeralParent) {
        throw new KeeperException.NoChildrenForEphemeralsException(path);
    }
    int newCversion = parentRecord.stat.getCversion() + 1;
    if (type == OpCode.createContainer) {
        request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
    } else if (type == OpCode.createTTL) {
        request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
    } else {
        request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(),
                newCversion));
    }
    StatPersisted s = new StatPersisted();
    if (createMode.isEphemeral()) {
        s.setEphemeralOwner(request.sessionId);
    }
    parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
    parentRecord.childCount++;
    parentRecord.stat.setCversion(newCversion);
    addChangeRecord(parentRecord);
    addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
}

The logic can be roughly summarized as:

  1. Assemble the request

  2. Validate the request: reject undefined request types and unreasonable parameters

  3. Check that the parent path exists

  4. Check the ACL

  5. Check that the final path is legal (for sequential nodes, a counter suffix is appended first; see the sketch after this list)

  6. Put the change records into the outstandingChanges queue

  7. Hand the request to the next processor
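
One detail from the code above deserves a worked example: a sequential znode gets its name by appending the parent's cversion as a zero-padded, 10-digit suffix. A minimal sketch; the path and cversion values here are made up for illustration:

import java.util.Locale;

public class SequentialNameDemo {
    // Mirrors the suffix logic in pRequest2TxnCreate:
    // path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion)
    static String sequentialPath(String requestedPath, int parentCVersion) {
        return requestedPath + String.format(Locale.ENGLISH, "%010d", parentCVersion);
    }

    public static void main(String[] args) {
        // Hypothetical values: a client asked for /locks/lock- while the
        // parent /locks had cversion 37 at transaction-build time.
        System.out.println(sequentialPath("/locks/lock-", 37)); // /locks/lock-0000000037
    }
}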

OpCode.multi

A transactional request:

case OpCode.multi:
    MultiTransactionRecord multiRequest = new MultiTransactionRecord();
    try {
        ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
    } catch (IOException e) {
        request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
                Time.currentWallTime(), OpCode.multi));
        throw e;
    }
    List<Txn> txns = new ArrayList<Txn>();
    // Each op in a multi-op must have the same zxid!
    long zxid = zks.getNextZxid();
    KeeperException ke = null;

    // Store off current pending change records in case we need to rollback
    Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

    for (Op op : multiRequest) {
        Record subrequest = op.toRequestRecord();
        int type;
        Record txn;

        /* If we've already failed one of the ops, don't bother
         * trying the rest as we know it's going to fail and it
         * would be confusing in the logfiles.
         */
        if (ke != null) {
            type = OpCode.error;
            txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
        }
        /* Prep the request and convert to a Txn */
        else {
            try {
                pRequest2Txn(op.getType(), zxid, request, subrequest, false);
                type = request.getHdr().getType();
                txn = request.getTxn();
            } catch (KeeperException e) {
                ke = e;
                type = OpCode.error;
                txn = new ErrorTxn(e.code().intValue());

                if (e.code().intValue() > Code.APIERROR.intValue()) {
                    LOG.info("Got user-level KeeperException when processing {} aborting" +
                            " remaining multi ops. Error Path:{} Error:{}",
                            request.toString(), e.getPath(), e.getMessage());
                }

                request.setException(e);

                /* Rollback change records from failed multi-op */
                rollbackPendingChanges(zxid, pendingChanges);
            }
        }

        // FIXME: I don't want to have to serialize it here and then
        //        immediately deserialize in next processor. But I'm
        //        not sure how else to get the txn stored into our list.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        txn.serialize(boa, "request");
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

        txns.add(new Txn(type, bb.array()));
    }

    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
            Time.currentWallTime(), request.type));
    request.setTxn(new MultiTxn(txns));
    break;

The code looks ugly, but the logic is simple enough:

  • Iterate over all the ops and assemble them one by one (each must pass the same series of checks: valid request, parent path exists, ACL, legal path). If no exception occurs along the way, the result is one request wrapping all the transaction records; otherwise it becomes a request marked as an error, and the change records in the current scope (a Map) are rolled back. Either way, the request is passed on to the next processor.

OpCode.sync

// All the rest don't need to create a Txn - just verify session
case OpCode.sync:
    zks.sessionTracker.checkSession(request.sessionId,
            request.getOwner());
    break;

A non-transactional request: it just validates the session and can then be passed to the next processor.

3.2.4 ProposalRequestProcessor

For a transactional request it initiates a Proposal and hands the request to CommitProcessor; in addition, ProposalRequestProcessor hands the transactional request to SyncRequestProcessor.

public void processRequest(Request request) throws RequestProcessorException {
    // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = " + request.sessionId);
    // request.addRQRec(">prop");

    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */

    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest)request);
    } else {
        nextProcessor.processRequest(request);
        if (request.getHdr() != null) {
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            syncProcessor.processRequest(request);
        }
    }
}

Next, look at propose:

/**
 * create a proposal and send it out to all the members
 *
 * @param request
 * @return the proposal that is queued to send to all the members
 */
public Proposal propose(Request request) throws XidRolloverException {
    /**
     * Address the rollover issue. All lower 32bits set indicate a new leader
     * election. Force a re-election instead. See ZOOKEEPER-1277
     */
    if ((request.zxid & 0xffffffffL) == 0xffffffffL) {
        String msg =
                "zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
        shutdown(msg);
        throw new XidRolloverException(msg);
    }

    byte[] data = SerializeUtils.serializeRequest(request);
    proposalStats.setLastBufferSize(data.length);
    QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);

    Proposal p = new Proposal();
    p.packet = pp;
    p.request = request;

    synchronized(this) {
        p.addQuorumVerifier(self.getQuorumVerifier());

        if (request.getHdr().getType() == OpCode.reconfig) {
            self.setLastSeenQuorumVerifier(request.qv, true);
        }

        if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion()) {
            p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Proposing:: " + request);
        }

        lastProposed = p.packet.getZxid();
        outstandingProposals.put(lastProposed, p);
        sendPacket(pp);
    }
    return p;
}

What gets submitted here is a QuorumPacket, which implements the Record interface and sets its type to PROPOSAL. Its comment reads:

/**
 * This message type is sent by a leader to propose a mutation.
 */
public final static int PROPOSAL = 2;

Clearly, this is a mutation request that only the Leader may initiate. Recapping the logic (in code order):

  1. Assemble the QuorumPacket to be sent

  2. Record the Proposal in the outstandingProposals map

  3. Broadcast the packet to the followers via sendPacket

3.2.5 CommitProcessor

As the name suggests, this is the transaction committer. It only cares about transactional requests: it waits until the cluster-wide Proposal vote allows them to be committed. Thanks to CommitProcessor, every server can process transactions strictly in order.

That part of the code is rather bare-bones, so we will not spend space analyzing it; with the information above, readers can still follow the shape of the whole request chain.
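
Still, to make the gating idea concrete, here is a deliberately tiny, single-threaded caricature. It is a sketch, not the real CommitProcessor (which is multi-threaded and handles sessions, errors, and shutdown); MiniCommitProcessor and its fields are invented names:

import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;

// Caricature of commit gating: reads flow straight through, while a write
// at the head of the line holds everything behind it until its COMMIT
// arrives; this is what preserves transaction ordering.
class MiniCommitProcessor {
    private final Queue<Request> queuedRequests = new ArrayDeque<>();    // arrival order
    private final Queue<Request> committedRequests = new ArrayDeque<>(); // COMMITs from the quorum
    private final RequestProcessor next;

    MiniCommitProcessor(RequestProcessor next) {
        this.next = next;
    }

    void processRequest(Request request) { queuedRequests.add(request); }

    void commit(Request committed) { committedRequests.add(committed); }

    // Would be driven in a loop by a worker thread.
    void pollOnce() throws RequestProcessor.RequestProcessorException {
        Request head = queuedRequests.peek();
        if (head == null) {
            return;
        }
        if (head.getHdr() == null) {
            // Non-transactional: no vote to wait for, pass it along.
            next.processRequest(queuedRequests.poll());
        } else if (!committedRequests.isEmpty()) {
            // The pending write's vote finished: emit the committed form.
            queuedRequests.poll();
            next.processRequest(committedRequests.poll());
        }
        // Otherwise a write is pending but uncommitted, so we wait.
    }
}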

3.2.6 SyncRequestProcessor

The logic is very simple: record the request in the transaction log, and trigger a snapshot when needed.

public void processRequest(Request request) {
    // request.addRQRec(">sync");
    queuedRequests.add(request);
}

// The thread's core method: it consumes the queuedRequests queue.
@Override
public void run() {
    try {
        int logCount = 0;

        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount/2);
        while (true) {
            Request si = null;
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount/2);
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    }
                                }
                            };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally {
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}

3.2.7 ToBeAppliedRequestProcessor

The core of this processor is the toBeApplied queue, which stores committable Proposals that CommitProcessor has already handled; they are removed only after FinalRequestProcessor has finished processing them.

/*
 * (non-Javadoc)
 *
 * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
 */
public void processRequest(Request request) throws RequestProcessorException {
    next.processRequest(request);

    // The only requests that should be on toBeApplied are write
    // requests, for which we will have a hdr. We can't simply use
    // request.zxid here because that is set on read requests to equal
    // the zxid of the last write op.
    if (request.getHdr() != null) {
        long zxid = request.getHdr().getZxid();
        Iterator<Proposal> iter = leader.toBeApplied.iterator();
        if (iter.hasNext()) {
            Proposal p = iter.next();
            if (p.request != null && p.request.zxid == zxid) {
                iter.remove();
                return;
            }
        }
        LOG.error("Committed request not found on toBeApplied: "
                + request);
    }
}

3.2.8 FinalRequestProcessor

For reasons of space, just a brief description: being the last processor in the chain, it replies to the request and is responsible for making transactional requests take effect, i.e. changing the state of the in-memory database.
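
A compressed, hypothetical sketch of what this last stage boils down to (the real class spans hundreds of lines, dispatches on every OpCode, and manages watches; MiniFinalProcessor and its helpers are invented names):

import org.apache.zookeeper.server.Request;

// Last link of the chain: make the transaction visible, then answer the client.
class MiniFinalProcessor {
    void processRequest(Request request) {
        if (request.getHdr() != null) {
            // Transactional request: mutate the in-memory database, roughly
            // zks.getZKDatabase().processTxn(request.getHdr(), request.getTxn()),
            // which also feeds the committedLog kept for fast synchronization.
            applyToMemoryDb(request);
        }
        // Every request, read or write, ends with a reply to the client.
        reply(request);
    }

    private void applyToMemoryDb(Request request) { /* elided */ }

    private void reply(Request request) { /* build the reply header (zxid, err) and send it */ }
}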

3.2.9 FollowerZooKeeperServer

First, look at how it assembles its processors:

@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true, getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this,
            new SendAckRequestProcessor((Learner)getFollower()));
    syncProcessor.start();
}

As you can see, there are two request chains here:

  1. FollowerRequestProcessor -> CommitProcessor -> FinalRequestProcessor

  2. SyncRequestProcessor -> SendAckRequestProcessor

So when a request arrives, which processor handles it? A rough trace:

  • firstProcessor (i.e. FollowerRequestProcessor) is the main handling path; it is driven by the parent class ZooKeeperServer and handles client requests

  • syncProcessor (i.e. SyncRequestProcessor) is entered through logRequest; it is driven by Learner and handles requests coming from the leader

At this point one might ask: this is clearly a Follower, so why does the request come in through Learner? The class signature explains:

/**
 * This class is the superclass of two of the three main actors in a ZK
 * ensemble: Followers and Observers. Both Followers and Observers share
 * a good deal of code which is moved into Peer to avoid duplication.
 */
public class Learner {

To avoid duplicated code, the shared logic was pulled up into this common superclass.

3.2.10 FollowerRequestProcessor

The Follower's normal processor. It checks whether a request is transactional: transactions are sent to the Leader, everything else it handles itself.

FollowerRequestProcessor.run

@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                        'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request);

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this follower has pending, so we
            // add it to pendingSyncs.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getFollower().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getFollower().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                // Don't forward local sessions to the leader.
                if (!request.isLocalSession()) {
                    zks.getFollower().request(request);
                }
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("FollowerRequestProcessor exited loop!");
}

Handing the request over to CommitProcessor may look puzzling at first: transactional messages are forwarded to the leader anyway, so why keep such a processor locally? As noted earlier, CommitProcessor waits until the cluster-wide Proposal vote allows a request to be committed; queuing the request locally before shipping it to the leader is what lets the follower pair its pending client request with the leader's eventual COMMIT and reply to its own client (the comment in run() above says as much).

3.2.11 SendAckRequestProcessor

public void processRequest(Request si) {
    if (si.type != OpCode.sync) {
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
                null);
        try {
            learner.writePacket(qp, false);
        } catch (IOException e) {
            LOG.warn("Closing connection to leader, exception during packet send", e);
            try {
                if (!learner.sock.isClosed()) {
                    learner.sock.close();
                }
            } catch (IOException e1) {
                // Nothing to do, we are shutting things down, so an exception here is irrelevant
                LOG.debug("Ignoring error closing the connection", e1);
            }
        }
    }
}

The logic is very simple: ACK the proposal originator, signalling that this server has finished writing the transaction log.

3.2.12 ObserverZooKeeperServer

/**
 * Set up the request processors for an Observer:
 * firstProcesor->commitProcessor->finalProcessor
 */
@Override
protected void setupRequestProcessors() {
    // We might consider changing the processor behaviour of
    // Observers to, for example, remove the disk sync requirements.
    // Currently, they behave almost exactly the same as followers.
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor,
            Long.toString(getServerId()), true,
            getZooKeeperServerListener());
    commitProcessor.start();
    firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
    ((ObserverRequestProcessor) firstProcessor).start();

    /*
     * Observer should write to disk, so that the it won't request
     * too old txn from the leader which may lead to getting an entire
     * snapshot.
     *
     * However, this may degrade performance as it has to write to disk
     * and do periodic snapshot which may double the memory requirements
     */
    if (syncRequestProcessorEnabled) {
        syncProcessor = new SyncRequestProcessor(this, null);
        syncProcessor.start();
    }
}

The logic is clear (probably because this code was added after 3.3.0). The normal request chain is:

  1. ObserverRequestProcessor

  2. CommitProcessor

  3. FinalRequestProcessor

When syncRequestProcessorEnabled is on (the default), the Observer will also write the transaction log and take snapshots. This costs some performance and raises memory requirements, but it keeps the observer from having to fetch an entire snapshot from the leader after falling too far behind.
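
For reference, this switch corresponds to the observer's syncEnabled option, which (if memory serves) can be set in zoo.cfg or via the zookeeper.observer.syncEnabled Java system property; treat the exact names as an assumption to verify against your version's documentation:

# zoo.cfg sketch; hypothetical excerpt, verify the option name for your release
# Observers write the transaction log and snapshots by default (true).
# Setting false trades crash-recovery speed for less disk and memory pressure.
syncEnabled=false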

Then look at ObserverRequestProcessor: it is practically identical to FollowerRequestProcessor, and any engineer with standards would look for a way to deduplicate this code.

@Override
public void run() {
    try {
        while (!finished) {
            Request request = queuedRequests.take();
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
                        'F', request, "");
            }
            if (request == Request.requestOfDeath) {
                break;
            }
            // We want to queue the request to be processed before we submit
            // the request to the leader so that we are ready to receive
            // the response
            nextProcessor.processRequest(request);

            // We now ship the request to the leader. As with all
            // other quorum operations, sync also follows this code
            // path, but different from others, we need to keep track
            // of the sync operations this Observer has pending, so we
            // add it to pendingSyncs.
            switch (request.type) {
            case OpCode.sync:
                zks.pendingSyncs.add(request);
                zks.getObserver().request(request);
                break;
            case OpCode.create:
            case OpCode.create2:
            case OpCode.createTTL:
            case OpCode.createContainer:
            case OpCode.delete:
            case OpCode.deleteContainer:
            case OpCode.setData:
            case OpCode.reconfig:
            case OpCode.setACL:
            case OpCode.multi:
            case OpCode.check:
                zks.getObserver().request(request);
                break;
            case OpCode.createSession:
            case OpCode.closeSession:
                // Don't forward local sessions to the leader.
                if (!request.isLocalSession()) {
                    zks.getObserver().request(request);
                }
                break;
            }
        }
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("ObserverRequestProcessor exited loop!");
}

That wraps up the source analysis, based on version 3.5.7.

4. Distributed Transactions: How ZK Handles Transactions

Having walked through the source, you should now have a reasonable picture of ZK's request-processing flow. Next, let's lay out the life of a transactional request. Starting from the Leader's ProposalRequestProcessor, it roughly divides into three phases:

  1. Sync

  2. Proposal

  3. Commit

4.1 Sync

Mainly done by ProposalRequestProcessor: it makes every machine participating in the proposal (Leader and Followers) record the transaction log.

4.2 Proposal

Every transactional request must be approved by more than half of the voters (Leader + Followers):

  1. The Leader checks that the server-side ZXID is usable; if so, it initiates the Proposal, otherwise it throws XidRolloverException (see org.apache.zookeeper.server.quorum.Leader.propose)

  2. The Proposal is built from the request header, the transaction body, and the ZXID (see org.apache.zookeeper.server.quorum.Leader.propose)

  3. It is broadcast to all Follower servers (see org.apache.zookeeper.server.quorum.Leader.sendPacket)

  4. The participants write their logs and ACK the Leader, until a majority is reached or the attempt times out (see org.apache.zookeeper.server.quorum.Leader.processAck; a back-of-the-envelope majority check is sketched below)

  5. The request is placed into the toBeApplied queue (see org.apache.zookeeper.server.quorum.Leader.tryToCommit)

  6. The commit is broadcast (COMMIT to Followers, INFORM to Observers), instructing them to commit the Proposal (see org.apache.zookeeper.server.quorum.Leader.commit && inform)

Up to this point, the SyncRequestProcessor -> AckRequestProcessor leg is complete.
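
As an illustration of step 4, here is a majority check in miniature. The names are invented; in the real code the per-proposal ack bookkeeping lives in Leader.processAck and the quorum test in the QuorumVerifier implementations:

import java.util.HashSet;
import java.util.Set;

// Minimal sketch of majority-based ack counting over a flat voter set.
class MiniQuorumTracker {
    private final int voterCount; // leader + followers; observers do not vote
    private final Set<Long> ackedServerIds = new HashSet<>();

    MiniQuorumTracker(int voterCount) {
        this.voterCount = voterCount;
    }

    /** Records an ACK; true once strictly more than half of the voters acked. */
    boolean ack(long serverId) {
        ackedServerIds.add(serverId);
        return ackedServerIds.size() > voterCount / 2;
    }
}

// In a 5-node ensemble a proposal becomes committable at the 3rd distinct ACK:
// new MiniQuorumTracker(5): ack(1) -> false, ack(2) -> false, ack(3) -> true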

4.3 Commit

Next, the CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor leg.

  1. When a request reaches CommitProcessor it is placed in a queue, and a worker thread takes the requests out one by one. When it takes out a transactional request, it sets a pending object until the vote finishes. This guarantees transactional ordering, and also lets CommitProcessor easily know whether the cluster has a transaction in flight.

  2. Once the vote passes, the commit flow is woken up: the request is put into the committedRequests queue and then sent, one at a time, into ToBeAppliedRequestProcessor.

  3. ToBeAppliedRequestProcessor waits until FinalRequestProcessor has finished, then removes the Proposal from the toBeApplied queue.

  4. FinalRequestProcessor first checks whether the request at the head of the outstandingChanges queue has a zxid less than or equal to the current request's:

  • If it is smaller, that record is removed. This means the most recently applied transaction reached consensus earlier than the current one; the current change record is invalid, though it is still recorded in committedLog.

  • Equal is the normal case, since the entry was added back in PrepRequestProcessor. The transaction is then applied to the in-memory database, which maintains a committedLog (default cap 500) holding the most recent successful Proposals for fast synchronization.

If the server crashes at this step, the data is corrected when the machine comes back up by replaying the write-ahead log produced during the proposal phase; through PlayBackListener the replayed transactions are converted back into proposals and saved into committedLog, ready for synchronization.

5. Summary

Putting it all together, ZK's way of handling transactions looks a lot like two-phase commit. This is in fact the ZAB protocol; in the next article we will cover its implementation in detail, along with its other use: distributed leader election.
