MongoDB源码阅读之ReplSet源码分析

1. ReplSet源码结构

rs_config.h replSet间同步设置的工具类

rs_member.h 心跳检测类和replSet成员状态的定义

rs_sync.h 同步数据类

rs.h 定义了几乎所有replSet相关的类(Member:replSet中的节点成员,

GhostSync:备份同步类,ReplSet:管理所有Member的核心类)

2. ReplSet的结构分析

60L`9Y@OK2F{ESE(L~V{D@P

RSBase定义了线程锁。

ReplSetImpl完成了大部分Replication Set的操作:心跳检测,选举,设置主节点,查询节点信息。

Member记录其他节点的信息,Manager查询节点信息,ReplSetHealthPoolTask心跳检测,ReplSetConfig同步设置,Consensus选举主节点,StateBox设置主节点。

ReplSet封装了很多方便的操作。

3. 创建并设置ReplSet

在mongod开始运行后(mongoDBMain),会启动监听程序同时,根据运行参数中是否标明了”--replSet”,调用startReplication来创建ReplSet。

void startReplication() {
        /* if we are going to be a replica set, we aren't doing other forms of replication. */
        if( !cmdLine._replSet.empty() ) {  //看看参数里面有没有--replSet
            if( replSettings.slave || replSettings.master ) {   //这个参数不能与—slave与—master共存
                log() << "***" << endl;
                log() << "ERROR: can't use --slave or --master replication options with --replSet" << endl;
                log() << "***" << endl;
            }
            newRepl(); //将自己标示成为一个新的Replication节点

            replSet = true;
            ReplSetCmdline *replSetCmdline = new ReplSetCmdline(cmdLine._replSet);
            boost::thread t( boost::bind( &startReplSets, replSetCmdline) );  //启动线程完成真正的ReplSet创建

            return;
        }
        //……
   }

    void startReplSets(ReplSetCmdline *replSetCmdline) {  //线程中执行的函数
        Client::initThread("rsStart");
        try {
            verify( theReplSet == 0 );
            if( replSetCmdline == 0 ) {
                verify(!replSet);
                return;
            }
            replLocalAuth();
            (theReplSet = new ReplSet(*replSetCmdline))->go(); //创建一个ReplSet赋值到一个全局的指针中,随即调用go启动
        }
        catch(std::exception& e) {
            log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog;
            if( theReplSet )
                theReplSet->fatal();
        }
        cc().shutdown();
    }

 

在构造ReplSetImpl时,会调用ReplSetImpl::loadConfig去设置ReplSet。

 

void ReplSetImpl::loadConfig() {
        startupStatus = LOADINGCONFIG; //标注状态为读取config
        startupStatusMsg.set("loading " + rsConfigNs + " config (LOADINGCONFIG)");
        LOG(1) << "loadConfig() " << rsConfigNs << endl;

        while1 ) {   //循环获取配置
            try {
                vector<ReplSetConfig> configs;
                try {
                    configs.push_back( ReplSetConfig(HostAndPort::me()) );  //尝试从本机端口获取配置
                }
                catch(DBException& e) {
                    log() << "replSet exception loading our local replset configuration object : " << e.toString() << rsLog;
                }

                //这里的种子是用户配置的
                for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) {   
                    try {
                        configs.push_back( ReplSetConfig(*i) );   //尝试从其他设备上获取配置
                    }
                    catch( DBException& e ) {
                        log() << "replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog;
                    }
                }
                {
                    scoped_lock lck( replSettings.discoveredSeeds_mx );
                    if( replSettings.discoveredSeeds.size() > 0 ) {    //其他线程搜索的其他设备(心跳检测中提供的节点)
                        for (set<string>::iterator i = replSettings.discoveredSeeds.begin(); 
                             i != replSettings.discoveredSeeds.end(); 
                             i++) {
                            try {
                                configs.push_back( ReplSetConfig(HostAndPort(*i)) );   //从新搜索的设备中获取配置
                            }
                            catch( DBException& ) {
                                log(1) << "replSet exception trying to load config from discovered seed " << *i << rsLog;
                                replSettings.discoveredSeeds.erase(*i);
                            }
                        }
                    }
                }

                if (!replSettings.reconfig.isEmpty()) {
                    try {
                        configs.push_back(ReplSetConfig(replSettings.reconfig, true)); //从shell获取配置
                    }
                    catch( DBException& re) {
                        log() << "replSet couldn't load reconfig: " << re.what() << rsLog;
                        replSettings.reconfig = BSONObj();
                    }
                }

                int nok = 0;
                int nempty = 0;
                for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) {
                    if( i->ok() )   //判断得到的设置是否可用
                        nok++;
                    if( i->empty() )   //判断得到配置是否为空
                        nempty++;    
                }
                if( nok == 0 ) {

                    if( nempty == (int) configs.size() ) {   //如果没有获取到配置信息
                        startupStatus = EMPTYCONFIG;   //标注状态
                        startupStatusMsg.set("can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)");
                        log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog;
                        static unsigned once;
                        if( ++once == 1 ) {
                            log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog;
                        }
                        if( _seeds->size() == 0 ) {
                            LOG(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog;
                        }
                    }
                    else {
                        startupStatus = EMPTYUNREACHABLE;
                        startupStatusMsg.set("can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE)");
                        log() << "replSet can't get " << rsConfigNs << " config from self or any seed (yet)" << rsLog;
                    }

                    sleepsecs(10);  //十秒后重试
                    continue;
                }

                if( !_loadConfigFinish(configs) ) {     //找到了配置信息但是配置失败
                    log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try again." << rsLog;
                    sleepsecs(20);  //二十秒后重试
                    continue;
                }
            }
            catch(DBException& e) {
                startupStatus = BADCONFIG;
                startupStatusMsg.set("replSet error loading set config (BADCONFIG)");
                log() << "replSet error loading configurations " << e.toString() << rsLog;
                log() << "replSet error replication will not start" << rsLog;
                sethbmsg("error loading set config");
                _fatal();
                throw;
            }
            break;   //设置成功跳出循环
        }
        startupStatusMsg.set("? started");
        startupStatus = STARTED;   //标注状态为开始运行
    }

从代码上看,设置过程为先去各个地方找到可用的配置。之后,统计其中可用的配置,如果没有可用的配置也没有不为空的配置,则输出log提示用户设置。当找到了可用的配置之后,将其数组发送到_loadConfigFinish函数进行设置。

ReplSet的设置工作:

// Our own config must be the first one.
    bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) {
        int v = -1;    //当前版本号
        ReplSetConfig *highest = 0;    
        int myVersion = -2000;    
        int n = 0;
        for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) {   //循环取出配置对象
            ReplSetConfig& cfg = *i;
            DEV log(1) << n+1 << " config shows version " << cfg.version << rsLog; 
            if( ++n == 1 ) myVersion = cfg.version;
            if( cfg.ok() && cfg.version > v ) {    //如果可用,并且配置版本比当前高,则存储起来
                highest = &cfg;
                v = cfg.version;
            }
        }
        verify( highest );

        if( !initFromConfig(*highest) )   //将最高配置对象,进行初始化
            return false;

        if( highest->version > myVersion && highest->version >= 0 ) {
            log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog;
            highest->saveConfigLocally(BSONObj());
        }
        return true;
    }

 

4. ReplSet间的心跳检测

在配置ReplSet的过程中,通过调用initFromConfig来更新配置。这个函数中会先去找到新添加的节点,如果有新的节点,为新的节点添加心跳检测的task,如果没有新的节点,则认为是更新配置,这样停掉之前所有的心跳检测task开启新的task for health。

MWI[ZFN(_[}9}V3ZJ`PU)FA

心跳检测过程的示意图:

//开启一个心跳检测task
    void ReplSetImpl::startHealthTaskFor(Member *m) {
        DEV log() << "starting rsHealthPoll for " << m->fullName() << endl;
        ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo());   //建立心跳检测task
        healthTasks.insert(task);     //插入心跳检测task集合
        task::repeat(task, 2000);     //每隔两秒心跳检测一次 (在一次检测完成以后等待两秒后再次检测)
    }

 

以发送请求方说明:

心跳检测的目是查看其他Repl上的节点是不是存活。

存活的话调用up函数如下:

 

void ReplSetHealthPollTask::up(const BSONObj& info, HeartbeatInfo& mem) {
            HeartbeatInfo::numPings++;
            mem.authIssue = false;

            if( mem.upSince == 0 ) {
                log() << "replSet member " << h.toString() << " is up" << rsLog;
                mem.upSince = mem.lastHeartbeat;
            }
            mem.health = 1.0;
            mem.lastHeartbeatMsg = info["hbmsg"].String();
            if( info.hasElement("opTime") )
                mem.opTime = info["opTime"].Date();

            // see if this member is in the electable set
            
//如果返回的心跳包含投票信息做下面的操作
            if( info["e"].eoo() ) {    
                // for backwards compatibility
                const Member *member = theReplSet->findById(mem.id());
                if (member && member->config().potentiallyHot()) {   //对方是否为活着的节点
                    theReplSet->addToElectable(mem.id());     //目标节点获得投票权
                }
                else {
                    theReplSet->rmFromElectable(mem.id());    //目标节点失去投票权
                }
            }
            // add this server to the electable set if it is within 10
            
// seconds of the latest optime we know of
            else if( info["e"].trueValue() &&
                     mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
                unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
                if (lastOp > 0 && mem.opTime >= lastOp - 10) {
                    theReplSet->addToElectable(mem.id());
                }
            }
            else {
                theReplSet->rmFromElectable(mem.id());
            }

            //如果返回的信息中包含设置信息,做同步设置的工作
            be cfg = info["config"];
            if( cfg.ok() ) {
                // received a new config
                boost::function<void()> f =
                    boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
                theReplSet->mgr->send(f);
            }
        }
    };

连接失败时,表示目标节点离线。

 

void ReplSetHealthPollTask::down(HeartbeatInfo& mem, string msg) {
            //设置各种健康状态
            mem.authIssue = false;
            mem.health = 0.0;
            mem.ping = 0;
            //打印log信息
            if( mem.upSince || mem.downSince == 0 ) {
                mem.upSince = 0;
                mem.downSince = jsTime();
                mem.hbstate = MemberState::RS_DOWN;
                log() << "replSet info " << h.toString() << " is down (or slow to respond): " << msg << rsLog;
            }
            //剥夺节点投票权
            mem.lastHeartbeatMsg = msg;
            theReplSet->rmFromElectable(mem.id());
        }

如果在配置阶段接收到了不是自己列表中节点发出的心跳检测请求,可以检测新节点的配置信息。具体代码在heartbeat.cpp中的CmdReplSetHeartbeat(line:104)中实现。

5. ReplSet间的投票过程

投票过程是伴随在心跳检测和消息传递的过程中的,下面的代码阅读都本着只关心头片相关过程的代码阅读,其他枝节部分就略去了。

投票过程的序列图:

0{DW)`W4H1ZRLTE2WGTAA0I

a) 消息传递

消息传递的过程是由心跳检测来完成的,在心跳检测时,单个节点既是发送方也是接收方。

作为发送方,向其他节点发出:

 

//heartbeat.cpp line 134
    bool requestHeartbeat(string setName, string fromstring memberFullName, BSONObj& result,
                          int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
        if( replSetBlind ) {
            return false;
        }

        //从这段代码中看出,节点向外传出的是节点名称、节点的设置版本号和路径
        BSONObj cmd = BSON( "replSetHeartbeat" << setName <<
                            "v" << myCfgVersion <<
                            "pv" << 1 <<
                            "checkEmpty" << checkEmpty <<
                            "from" << from );

        // generally not a great idea to do outbound waiting calls in a
        
// write lock. heartbeats can be slow (multisecond to respond), so
        
// generally we don't want to be locked, at least not without
        
// thinking acarefully about it first.
        massert(15900"can't heartbeat: too much lock",
                !Lock::somethingWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() );

        ScopedConn conn(memberFullName);
        return conn.runCommand("admin",
                               cmd,
                               result,
                               0,
                               &AuthenticationTable::getInternalSecurityAuthenticationTable());
    }


//消息返回后
        bool ReplSetHealthPollTask::_requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) {
            if (tries++ % threshold == (threshold - 1)) {
                ScopedConn conn(h.toString());
                conn.reconnect();
            }

            Timer timer;
            time_t before = curTimeMicros64() / 1000000;

            //调用函数传出消息
            bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(),
                                       h.toString(), info, theReplSet->config().version, theirConfigVersion);
        //info是传出心跳检测后的返回值
       
//根据返回值同步记录同步的偏差时间
            mem.ping = (unsigned int)timer.millis();

            // we set this on any response - we don't get this far if
            
// couldn't connect because exception is thrown
            time_t after = mem.lastHeartbeat = before + (mem.ping / 1000);

            if ( info["time"].isNumber() ) {
                long long t = info["time"].numberLong();
                if( t > after )
                    mem.skew = (int) (t - after);
                else if( t < before )
                    mem.skew = (int) (t - before); // negative
            }
            else {
                // it won't be there if remote hasn't initialized yet
                if( info.hasElement("time") )
                    warning() << "heatbeat.time isn't a number: " << info << endl;
                mem.skew = INT_MIN;
            }

            {
                //记录下其他节点的状态
                be state = info["state"];
                if( state.ok() )
                    mem.hbstate = MemberState(state.Int());
            }

            return ok;
        }

 

 

//消息返回后的另一部分状态处理
        void ReplSetHealthPollTask::doWork() {
            if ( !theReplSet ) {
                LOG(2) << "replSet not initialized yet, skipping health poll this round" << rsLog;
                return;
            }

            HeartbeatInfo mem = m;
            HeartbeatInfo old = mem;
            try {
                BSONObj info;
                int theirConfigVersion = -10000;

                bool ok = _requestHeartbeat(mem, info, theirConfigVersion);

                // weight new ping with old pings
                
// on the first ping, just use the ping value
                if (old.ping != 0) {
                    mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2));
                }

                //……
            }
            catch(...) {
                //……
            }
            m = mem;

            //通知更新节点信息
            theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) );

            static time_t last = 0;
            time_t now = time(0);
            //判断节点信息是否被改变
            bool changed = mem.changed(old);
            if( changed ) {
                if( old.hbstate != mem.hbstate )
                    log() << "replSet member " << h.toString() << " is now in state " << mem.hbstate.toString() << rsLog;
            }
            //当其他节点信息信息改变或者前后两次连接服务器的时间大于4秒,则更新一下自己节点的状态(在函数中发出投票请求)
            if( changed || now-last>4 ) {
                last = now;
                theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
            }
        }

作为接收方:

//接收到数据后会调用这个函数
        virtual bool CmdReplSetHeartbeat::run(const string& , BSONObj& cmdObj, intstring& errmsg, BSONObjBuilder& result, bool fromRepl) {
            //略过鉴权等工作
            
//……
            result.append("set", theReplSet->name());       //名称
            result.append("state", theReplSet->state().s);     //节点状态
            result.append("e", theReplSet->iAmElectable());  //是否可以参加投票
            result.append("hbmsg", theReplSet->hbmsg());   //心跳的一些信息(具体还没看太明白)
            result.append("time", (long long) time(0));       //当前的服务器时间
            result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); //最后一次写操作的时间
            int v = theReplSet->config().version;    //设置的版本信息
            result.append("v", v);
            if( v > cmdObj["v"].Int() )
                result << "config" << theReplSet->config().asBson();

            return true;
        }

 

b) 检测那些节点可以投票

结合之前的心跳检测和后面的消息传递,ReplSet会根据心跳检测的结果调用addToElectable和rmFromElectable来添加和删除投票节点。

值得一提的是,在Manager::msgCheckNewState()被调用时,会去判断当前节点是否可以参与投票:

void Manager::checkElectableSet() {
        unsigned otherOp = rs->lastOtherOpTime().getSecs();
        
        // make sure the electable set is up-to-date
        if (rs->elect.aMajoritySeemsToBeUp() &&          //看看是否有选上的可能
            rs->iAmPotentiallyHot() &&                  //看看自己是不是活跃节点
            (otherOp == 0 || rs->lastOpTimeWritten.getSecs() >= otherOp - 10)) {       //上次写操作是否在10秒以内
            theReplSet->addToElectable(rs->selfId());
        }
        else {
            theReplSet->rmFromElectable(rs->selfId());
        }

//……
}

 c) 确定是不是要发起投票

当确定了那些节点可以投票以后,就要判断是不是要发起投票了。

/** called as the health threads get new results */
    void Manager::msgCheckNewState() {
        {
            //判断之前的节点状态
            
//……
            const Member *p = rs->box.getPrimary();         //
            if( p && p != rs->_self ) {
                if( !p->hbinfo().up() ||
                        !p->hbinfo().hbstate.primary() ) {
                    p = 0;
                    rs->box.setOtherPrimary(0);
                }
            }

            const Member *p2;
            {
                bool two;
                //判断一下是不是还有别的主节点
                
//如果有两个主节点说明就等待其他节点自己解决谁是主节点的问题
                
//返回的节点是确定的主节点
                p2 = findOtherPrimary(two);
                if( two ) {
                    /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
                    log() << "replSet info two primaries (transiently)" << rsLog;
                    return;
                }
            }

            if( p2 ) {
                noteARemoteIsPrimary(p2);
                return;
            }

            /* didn't find anyone who wants to be primary */
           //如果包含一个主节点
            if( p ) {
                /* we are already primary */
                //主节点不是自己,说明有人能做主
                if( p != rs->_self ) {
                    rs->sethbmsg("error p != rs->self in checkNewState");
                    log() << "replSet " << p->fullName() << rsLog;
                    log() << "replSet " << rs->_self->fullName() << rsLog;
                    return;
                }
                
                //如果自己是主节点,需要将自己降级
                if( rs->elect.shouldRelinquish() ) {
                    log() << "can't see a majority of the set, relinquishing primary" << rsLog;
                    rs->relinquish();
                }

                return;
            }

            if( !rs->iAmPotentiallyHot() ) { // if not we never try to be primary
                OCCASIONALLY log() << "replSet I don't see a primary and I can't elect myself" << endl;
                return;
            }

            //看自己有没有可能成为主节点 
            /* no one seems to be primary.  shall we try to elect ourself? */
            if( !rs->elect.aMajoritySeemsToBeUp() ) {
                static time_t last;
                static int n;
                int ll = 0;
                if( ++n > 5 ) ll++;
                if( last + 60 > time(0 ) ) ll++;
                log(ll) << "replSet can't see a majority, will not try to elect self" << rsLog;
                last = time(0);
                return;
            }

            if( !rs->iAmElectable() ) {
                return;
            }

            busyWithElectSelf = true// don't try to do further elections & such while we are already working on one.
        }
        try {
            //开始投票
            rs->elect.electSelf();
        }
        catch(RetryAfterSleepException&) {
            /* we want to process new inbounds before trying this again.  so we just put a checkNewstate in the queue for eval later. */
            requeue();
        }
        catch(...) {
            log() << "replSet error unexpected assertion in rs manager" << rsLog;
        }
        busyWithElectSelf = false;
    }

 

d) 投票

作为心跳检测的接收方,当其他节点信息做了改变或者对某个节点前后连接时差大于4秒,就有可能调用Manager::msgCheckNewState更改自己状态并且发出投票请求。

与其说是投票不如说是就是一次询问的过程,就是节点向其他节点询问自己的状态是否符合当主机的条件。

//询问其他节点
    bool Consensus::weAreFreshest(bool& allUp, int& nTies) {
        const OpTime ord = theReplSet->lastOpTimeWritten;
        nTies = 0;
        verify( !ord.isNull() );
        //组织请求数据
        BSONObj cmd = BSON(
                          "replSetFresh" << 1 <<          
                          "set" << rs.name() <<                 //当前节点名称
                          "opTime" << Date_t(ord.asDate()) <<    //最后一次写入的时间
                          "who" << rs._self->fullName() <<       //当前节点路径
                          "cfgver" << rs._cfg->version <<         //当前节点设置的版本号
                          "id" << rs._self->id());                 //当前节点的id
        list<Target> L;
        int ver;
        /* the following queries arbiters, even though they are never fresh.  wonder if that makes sense.
           it doesn't, but it could, if they "know" what freshness it one day.  so consider removing
           arbiters from getTargets() here.  although getTargets is used elsewhere for elections; there
           arbiters are certainly targets - so a "includeArbs" bool would be necessary if we want to make
           not fetching them herein happen.
           
*/
        //获得投票列表
        rs.getTargets(L, ver);

        //发出请求
        multiCommand(cmd, L);
        int nok = 0;
        allUp = true;

        //处理请求(稍后分析)
        
//……
    }
//收到请求后的处理,忽略了验证处理和比较版本配置版本号的代码
        bool CmdReplSetFresh::shouldVeto(const BSONObj& cmdObj, string& errmsg) {
            // don't veto older versions
            if (cmdObj["id"].eoo()) {
                // they won't be looking for the veto field
                return false;
            }

            unsigned id = cmdObj["id"].Int();
            const Member* primary = theReplSet->box.getPrimary();           //当前的主服务器
            const Member* hopeful = theReplSet->findById(id);               //希望成为主机的服务器(从上面的代码看是发送请求方服务器)
            const Member *highestPriority = theReplSet->getMostElectable();    //当前节点心目中的主机

            
//以下判断发现不符合条件的就否决投票
            if( !hopeful ) {  //没有目标服务器
                errmsg = str::stream() << "replSet couldn't find member with id " << id;
                return true;
            }
            //如果当前的服务器是主机,而自己刚刚进行级别的调整
            else if( theReplSet->isPrimary() && theReplSet->lastOpTimeWritten >= hopeful->hbinfo().opTime ) {  
                // hbinfo is not updated, so we have to check the primary's last optime separately
                errmsg = str::stream() << "I am already primary, " << hopeful->fullName() <<
                    " can try again once I've stepped down";
                return true;
            }
            //如果主服务器,刚刚进行级别调整
            else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
                // other members might be aware of more up-to-date nodes
                errmsg = str::stream() << hopeful->fullName() << " is trying to elect itself but " <<
                    primary->fullName() << " is already primary and more up-to-date";
                return true;
            }
            //当前节点记录的最高优先级节点的优先级比目标节点高
            else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
                errmsg = str::stream() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName();
                return true;
            }

            //当前节点不允许投票或者当前节点记录的最高优先级节点的优先级比目标节点高(不明白为什么又判断一遍)
            if ( !theReplSet->isElectable(id) ||
                (highestPriority && highestPriority->config().priority > hopeful->config().priority)) {
                return true;
            }

            return false;
        }
//得到投票数据以后解析
bool Consensus::weAreFreshest(bool& allUp, int& nTies) {
       //省略发送请求的部分
       
//请求返回后存储在list<Target>中
        int nok = 0;
        allUp = true;
        for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
        //i存储这其他节点返回的结果
            if( i->ok ) {
                nok++;
                if( i->result["fresher"].trueValue() ) {      //当前服务器不是最新的
                    log() << "not electing self, we are not freshest" << rsLog;
                    return false;
                }
                //根据返回的操作时间,统计与自己时间相同的节点(这部分从代码上是这个意思,不过后面的解析没有太懂,从其他地方的注释看,是个正在开发的功能)
                OpTime remoteOrd( i->result["opTime"].Date() );
                if( remoteOrd == ord )
                    nTies++;     
                verify( remoteOrd <= ord );

                //查看是当前节点是否否决自己成为主节点
                if( i->result["veto"].trueValue() ) {
                    BSONElement msg = i->result["errmsg"];
                    if (!msg.eoo()) {
                        log() << "not electing self, " << i->toHost << " would veto with '" <<
                            msg.String() << "'" << rsLog;
                    }
                    else {
                        log() << "not electing self, " << i->toHost << " would veto" << rsLog;
                    }
                    return false;
                }
            }
            else {
                DEV log() << "replSet freshest returns " << i->result.toString() << rsLog;
                allUp = false;
            }
        }
        LOG(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog;
        verify( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working...
        return true;
     }

 如果没有其他服务器投反对票,那么就向其他服务器发送设置自己为主节点的信息。之后,会向其他节点发送消息通知自己当选的消息,其他节点返回是否赞成,当赞成票过半的时候,自己当选。

void Consensus::_electSelf() {
           //略去投票部分
            
//……

             
//分发投票结果的确认部分
            BSONObj electCmd = BSON(
                                   "replSetElect" << 1 <<
                                   "set" << rs.name() <<              //节点名称
                                   "who" << me.fullName() <<        //节点路径
                                   "whoid" << me.hbinfo().id() <<      //节点的id
                                   "cfgver" << rs._cfg->version <<      //节点的配置信息
                                   "round" << OID::gen() /* this is just for diagnostics */
                               );

            int configVersion;
            list<Target> L;
            rs.getTargets(L, configVersion);
            multiCommand(electCmd, L);    //请求发送

 

//处理确认请求
    void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) {
        BSONObjBuilder& b = *_b;
        DEV log() << "replSet received elect msg " << cmd.toString() << rsLog;
        else LOG(2) << "replSet received elect msg " << cmd.toString() << rsLog;
        string set = cmd["set"].String();
        unsigned whoid = cmd["whoid"].Int();
        int cfgver = cmd["cfgver"].Int();
        OID round = cmd["round"].OID();
        int myver = rs.config().version;

        const Member* primary = rs.box.getPrimary();
        const Member* hopeful = rs.findById(whoid);
        const Member* highestPriority = rs.getMostElectable();

        int vote = 0;
        ifset != rs.name() ) {
            log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog;
        }
        else if( myver < cfgver ) {    //如果接收方的版本号小于目标节点的版本号
            
// we are stale.  don't vote  不做其他处理
        }
        else if( myver > cfgver ) {    //如果接收方的版本号大于目标节点版本号,对方版本号过期,不赞成
            
// they are stale!
            log() << "replSet electCmdReceived info got stale version # during election" << rsLog;
            vote = -10000;
        }
        else if( !hopeful ) {    //目标节点id没有在接收方节点中挂名,不赞成
            log() << "replSet electCmdReceived couldn't find member with id " << whoid << rsLog;
            vote = -10000;
        }
        //如果主节点是自己,并且自己的最后写入时间比目标节点新,不赞成
        else if( primary && primary == rs._self && rs.lastOpTimeWritten >= hopeful->hbinfo().opTime ) {
            // hbinfo is not updated, so we have to check the primary's last optime separately
            log() << "I am already primary, " << hopeful->fullName()
                  << " can try again once I've stepped down" << rsLog;
            vote = -10000;
        }
        //如果接收方认为的主节点的写入时间比目标节点的写入时间新,不赞成
        else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
            // other members might be aware of more up-to-date nodes
            log() << hopeful->fullName() << " is trying to elect itself but " <<
                  primary->fullName() << " is already primary and more up-to-date" << rsLog;
            vote = -10000;
        }
        //接收方认为当选呼声最高的节点比目标节点的当选呼声高,不赞成
        else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
            log() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName();
            vote = -10000;
        }
        else {
            //如果满足上面的所有条件
            try {
                vote = yea(whoid);     //算出赞成度
                dassert( hopeful->id() == whoid );
                rs.relinquish();
                log() << "replSet info voting yea for " <<  hopeful->fullName() << " (" << whoid << ')' << rsLog;
            }
            catch(VoteException&) {
                log() << "replSet voting no for " << hopeful->fullName() << " already voted for another" << rsLog;
            }
        }
        //组织返回数据
        b.append("vote", vote);  
        b.append("round", round);
    }
//处理返回结果 
           {
                for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
                    DEV log() << "replSet elect res: " << i->result.toString() << rsLog;
                    if( i->ok ) {
                        int v = i->result["vote"].Int();
                        tally += v;   //统计赞成结果
                    }
                }
                if( tally*2 <= totalVotes() ) {  //赞成结果小于投票总数的一半,终止成为主节点
                    log() << "replSet couldn't elect self, only received " << tally << " votes" << rsLog;
                }
                else if( time(0) - start > 30 ) {  //投票时间大于三十秒,终止成为主节点
                    
// defensive; should never happen as we have timeouts on connection and operation for our conn
                    log() << "replSet too much time passed during our election, ignoring result" << rsLog;
                }
                else if( configVersion != rs.config().version ) {   //传出的版本号与返回的版本号不一致(说明投票过程中有版本修改),终止成为主节点
                    log() << "replSet config version changed during our election, ignoring result" << rsLog;
                }
                else {
                    /* succeeded. */
                    log(1) << "replSet election succeeded, assuming primary role" << rsLog;
                    success = true;
                    rs.assumePrimary();  //当选为主节点
                }
            }
 e) 总结投票过程

在心跳检测中,节点间互相传递着信息。通过这些信息,节点能了解到其他节点的情况(配置版本,是否能连接上等等)。单个节点统计着这些信息,当某个节点设置发生改变,或者网络连接出现异常的时候,开始发送投票事件。

投票时,首先根据心跳检测记录的信息判断哪些节点可以被连接到,即有投票权。之后向所有可以连接到的节点,发出自己能否成为主节点的请求。其他节点投是否否决票,如果一致通过,该节点进行下一步操作:向其他节点发出自己当选的请求,其他节点根据情况确定请求,如果返回的赞成数大于投票总数的一半,该节点当选。

转载于:https://www.cnblogs.com/biosli/archive/2012/10/19/2730776.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/262369.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# 字符串性能

Written By Dr Herbie [2] Translated By Allen Lee Introduction 你在代码中处理字符串的方法可能会对性能产生令人吃惊的影响。程序中需要考虑两个由于使用字符串而产生的问题&#xff1a;临时字符串变量的使用和字符串连接。Background1.String是引用类型&#xff0c;在堆上…

手把手教你部署VSAN见证虚拟设备 (Cormac)

译者注&#xff1a;本文翻译自Cormac的博客&#xff0c;并未严格地逐字逐句的直译&#xff0c;如有谬误&#xff0c;万望见谅。原文见此http://cormachogan.com/2015/09/14/step-by-step-deployment-of-the-vsan-witness-appliance/现在开始在之前的帖子中我曾经介绍过见证虚拟…

PyCharm编程环境的中英文字体分别设置的好处多----一石三鸟地解决中文字体不一致、英文字体不涵盖中文字符、编程字体实用性兼顾美观性的三个问题

在编程环境&#xff08;例如Python的PyCharm&#xff09;中&#xff0c;我们希望编程环境的字体具有如下特性&#xff1a; &#xff08;1&#xff09;字体覆盖中文和英文字符。 &#xff08;2&#xff09;等宽字体&#xff0c;并且是TrueType字体&#xff0c;方便辨识代码中缩…

WordPress博客系统的安全

随着计算机网络的流行&#xff0c;越来越多的人开始创建自己的博客&#xff0c;论起博客系统&#xff0c;全球用的最多的博客系统就是wordpress&#xff08;以下简称WP&#xff09;。但是如果用过WP的人都应该知道&#xff0c;WP的站点想要做的好看&#xff0c;插件是必不可少的…

mysql为什么行数据库_关系数据表中的行称为什么?

在一个二维表中&#xff0c;水平方向的行称为元组&#xff0c;每一行是一个元组&#xff1b;元组对应表中的一个具体记录。数据元组也称为记录。一个数据表中的每一个记录均有一个惟一的编号(记录号)。一个记录也就是数据表中的一行。元组(tuple)是关系数据库中的基本概念&…

maven安装以及eclipse配置maven

http://jingyan.baidu.com/article/295430f136e8e00c7e0050b9.html 必须先下载并安装JDK&#xff0c;配置JDK的环境变量JAVA_HOME&#xff0c;否则maven将无法使用 eclipse安装maven插件后必须重新定位maven到本地maven目录 如下定位&#xff1a; 为了使得Eclipse中安装的Maven…

JS 问题集锦

【1】js页面跳转 和 js打开新窗口方法 第一种&#xff1a;<script language"javascript" type"text/javascript">window.location.href"http://www.xlfun.com/login.php?backurl"window.location.href; </script>第二种&#xff1a…

重温Observer模式--热水器·改(转载)

引言 在 C#中的委托和事件 一文的后半部分&#xff0c;我向大家讲述了Observer(观察者)模式&#xff0c;并使用委托和事件实现了这个模式。实际上&#xff0c;不使用委托和事件&#xff0c;一样可以实现Observer模式。在本文中&#xff0c;我将使用GOF的经典方式&#xff0c;再…

龙图 VP 李翀:数据化运营及云计算下的运维

文 | 龙图 VP 李翀原文地址&#xff1a;当游戏遇上大数据和云计算——谈数据化运营及云计算下的运维龙图做了8年的游戏&#xff0c;真正快速发展起来是在移动互联网这个时代。随着刀塔快速的扩张&#xff0c;我们在各个领域都遇到了非常多的之前没有想到过的状态。发现云计算其…

鼠标移动时,光标相对于对象的位置

鼠标在一个物体上移动时&#xff0c;能够计算出光标相对于任何其它对象的相对位置。 Code<Grid x:Name"LayoutRoot" Background"White"> <Grid HorizontalAlignment"Left" VerticalAlignment"Top" Name"grid1&q…

怎么判断一个字符串的最长回文子串是否在头尾_【Leetcode每日打卡】最长回文串...

干货预警&#xff1a;所有文章都会首发于我的公众号【甜姨的奇妙冒险】&#xff0c;欢迎watch。一、来历&#xff1a;力扣从3月开始开启了每日一题打卡活动&#xff0c;于是跟风加入了打卡大军&#xff0c;这两天写评论、发题解&#xff0c;没想到反响还不错&#xff0c;收到了…

.NET平台下WEB应用程序的部署(安装数据库和自动配置)

.NET平台下WEB应用程序的部署(安装数据库和自动配置)李洪根在.NET平台下&#xff0c;部署 Web 解决方案是比较方便的。我们可以利用Visual Studio.NET 2003添加一个WEB安装项目&#xff0c;在部署的“文件系统编辑器”中添加项目的主输出和内容文件&#xff0c;非常简易地完成安…

苹果原壁纸高清_全面屏壁纸高清 | 电影经典台词截图高清

iPhonex手机壁纸高清(苹果xs壁纸&#xff0c;苹果xr壁纸&#xff0c;iPhone11壁纸&#xff0c;安卓全屏壁纸)火影壁纸高清&#xff0c;经典电影台词截图拼接壁纸&#xff0c;电影《去他妈的世界》壁纸&#xff0c;欧美电影截图加上经典台词&#xff0c;2020好看的手机壁纸&…

python字符串切片用法_详解Python字符串切片

在python中,我们定义好一个字符串,如下所示。 在python中定义个字符串然后把它赋值给一个变量。 我们可以通过下标访问单个的字符,跟所有的语言一样,下标从0开始(==,我自己都觉得写的好脑残了) 这个时候呢,我们可以通过切片的方式来截取出我们定义的字符串的一部分。 使用…

azkaban config: nodes:_关于Nordic SDK的sdk.config.h

使用MDK对Nordic SDK开发&#xff0c;离不开sdk.config.h。请看以下两个视图&#xff1a;Text Editor视图Configuration Wiard视图咋一看挺神奇的&#xff0c;配置视图话&#xff0c;方便。具体实现和规则就不一一介绍了&#xff0c;网页搜“MDK中Configuration Wiard的使用”。…

SilverLight开发系列第1步:搭建开发环境

来自 http://www.cnblogs.com/kaima/archive/2008/08/17/1269637.html 在缺乏SilverLight中文教程的今天&#xff0c;新人要入门不容易&#xff0c;所以我根据自己阅读英文原档和实践经验&#xff0c;总结一个系列。首先介绍的是开发环境的搭建。 个人强烈推荐安装英文版的开发…

php使用smtp.sina.com邮箱发送邮件

2019独角兽企业重金招聘Python工程师标准>>> 需要写一个smtp发送类 <?php set_time_limit(120);class smtp {/* Public Variables */var $smtp_port;var $time_out;var $host_name;var $log_file;var $relay_host;var $debug;var $auth;var $user;var $pass;var…

java 不存在就创建_Java 判断多级路径是否存在,不存在就创建

Java 判断多级路径是否存在&#xff0c;不存在就创建方案一&#xff1a;(带文件名的Path&#xff1a;如&#xff1a;D:\news\2014\12\abc.text)public boolean isexitsPath(String path)throws InterruptedException{String [] pathspath.split("\\\\");StringBuffer…

asynchttpclient 超时_DNF:95更新前还能免费获得一件超时空装备?但这个任务一定完成...

现在距离95版本更新还有16天的时间&#xff0c;在前段时间体验服更新的内容中&#xff0c;相信玩家们已经对新版本了解的差不多了。最受玩家关注的就是装备升级方面的一些内容&#xff0c;都说95版本是一个土豪专属副本&#xff0c;但是小编并不那么认为&#xff01;从材料上来…