一、流程:ceph-deploy部署ceph-mon组建集群
1.ceph-deploy部署ceph-mon的工作流程及首次启动
1)通过命令创建ceph-mon,命令为:ceph-deploy create mon keyring
def mon(args):if args.subcommand == 'create':mon_create(args)elif args.subcommand == 'add':mon_add(args)elif args.subcommand == 'destroy':mon_destroy(args)elif args.subcommand == 'create-initial':mon_create_initial(args)else:LOG.error('subcommand %s not implemented', args.subcommand)
2)在创建mon时,会根据传入的args参数生成配置文件ceph.conf。
def mon_create(distro, args, monitor_keyring):hostname = distro.conn.remote_module.shortname()logger = distro.conn.loggerlogger.debug('remote hostname: %s' % hostname)path = paths.mon.path(args.cluster, hostname)uid = distro.conn.remote_module.path_getuid(constants.base_path)gid = distro.conn.remote_module.path_getgid(constants.base_path)done_path = paths.mon.done(args.cluster, hostname)init_path = paths.mon.init(args.cluster, hostname, distro.init)conf_data = conf.ceph.load_raw(args)# write the configuration filedistro.conn.remote_module.write_conf( #写入配置/etc/ceph/ceph.confargs.cluster,conf_data,args.overwrite_conf,)def write_conf(cluster, conf, overwrite): #写入配置/etc/ceph/ceph.conf""" write cluster configuration to /etc/ceph/{cluster}.conf """import ospath = '/etc/ceph/{cluster}.conf'.format(cluster=cluster)tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid())if os.path.exists(path):with open(path) as f:old = f.read()if old != conf and not overwrite:raise RuntimeError('config file %s exists with different content; use --overwrite-conf to overwrite' % path)with open(tmp, 'w') as f:f.write(conf)f.flush()os.fsync(f)os.rename(tmp, path)
3)检查ceph-mon组件工作目录(/var/lib/ceph/mon/mycluster-myhostname)是否存在,不存在就创建,除了创建该目录外,还需要在该路径下创建keyring秘钥。然后执行命令"ceph-mon --cluster args.cluster --mkfs -i hostname --keyring --setuser uid --setgroup gid"启动ceph-mon进程,此时也是第一次启动ceph-mon。然后它会创建done文件并启动cepn-mon服务。
# if the mon path does not exist, create itdistro.conn.remote_module.create_mon_path(path, uid, gid) #path为/var/lib/ceph/mon/mycluster-myhostnameif not distro.conn.remote_module.path_exists(done_path):logger.debug('done path does not exist: %s' % done_path)if not distro.conn.remote_module.path_exists(paths.mon.constants.tmp_path): #如果路径不存在还需要创建keyringlogger.info('creating tmp path: %s' % paths.mon.constants.tmp_path)distro.conn.remote_module.makedir(paths.mon.constants.tmp_path)keyring = paths.mon.keyring(args.cluster, hostname)logger.info('creating keyring file: %s' % keyring)distro.conn.remote_module.write_monitor_keyring( #创建keyringkeyring,monitor_keyring,uid, gid,)user_args = []if uid != 0:user_args = user_args + [ '--setuser', str(uid) ]if gid != 0:user_args = user_args + [ '--setgroup', str(gid) ]remoto.process.run( #第一次运行时需要执行的命令distro.conn,['ceph-mon','--cluster', args.cluster,'--mkfs', '-i', hostname,'--keyring', keyring,] + user_args)# create the done file 创建done文件distro.conn.remote_module.create_done_path(done_path, uid, gid)# create init pathdistro.conn.remote_module.create_init_path(init_path, uid, gid)# start mon service 启动服务start_mon_service(distro, args.cluster, hostname) def create_mon_path(path, uid=-1, gid=-1):"""create the mon path if it does not exist"""if not os.path.exists(path):os.makedirs(path)os.chown(path, uid, gid);
4)启动之后,需要将ceph-mon加入到mon_in_quorum里面,这是一个set的数据结构,这里面包含着集群的所有ceph-mon。该mon_in_quorum里面包含着leader,其他全都是peon(普通成员)。
def mon_create_initial(args):# create them normally through mon_createargs.mon = mon_initial_membersmon_create(args)# make the sets to be able to compare latemon_in_quorum = set([]) for host in mon_initial_members:mon_name = 'mon.%s' % hostLOG.info('processing monitor %s', mon_name)sleeps = [20, 20, 15, 10, 10, 5]tries = 5rlogger = logging.getLogger(host)distro = hosts.get(host,username=args.username,callbacks=[packages.ceph_is_installed])while tries:status = mon_status_check(distro.conn, rlogger, host, args)has_reached_quorum = status.get('state', '') in ['peon', 'leader']if not has_reached_quorum:LOG.warning('%s monitor is not yet in quorum, tries left: %s' % (mon_name, tries))tries -= 1sleep_seconds = sleeps.pop()LOG.warning('waiting %s seconds before retrying', sleep_seconds)time.sleep(sleep_seconds) # Magic numberelse:mon_in_quorum.add(host) //添加进mon_in_quorumLOG.info('%s monitor has reached quorum!', mon_name)breakdistro.conn.exit()
2.ceph-mon数据存储方式
1)存储方式:mon它的数据可以通过两种方式来进行存储,一种是rocksDB存储、一种是leveldb存储,在ceph中具体使用哪一种存储方式取决于/var/lib/ceph/mon/$ceph-id目录下的kv_backend文件的内容,如果kv_backend中为rocksdb,则使用rocksdb存储,若为空或读取错误时,使用leveldb存储,它们都是一个key/value类型的数据库,区别在于rocksdb配置更灵活,支持的压缩算法比较多,除了snappy压缩外还支持zstd压缩,并且压缩比也更高。
int open(ostream &out) {string kv_type;int r = read_meta("kv_backend", &kv_type); //读取kv_backend文件,获取存储类型kv_typeif (r < 0 || kv_type.empty()) { // assume old monitors that did not mark the type were leveldb.kv_type = "leveldb";r = write_meta("kv_backend", kv_type);if (r < 0)return r;}_open(kv_type);r = db->open(out);if (r < 0)return r;
.....
}
2)存储位置:mon的数据存储在一个可配置的路径mon_data下面,mon_data默认位置为/var/lib/ceph/mon/$ceph-id目录下,该目录存放了mon的keyring秘钥、kv存储引擎名称(rocksdb)、mon支持的版本(octopus)、以及RocksDB的存储文件store.db。
Option("mon_data", Option::TYPE_STR, Option::LEVEL_ADVANCED).set_flag(Option::FLAG_NO_MON_UPDATE).set_default("/var/lib/ceph/mon/$cluster-$id") //默认mon_data配置路径为/var/lib/ceph/mon/$cluster-$id.add_service("mon").set_description("path to mon database")MonitorDBStore *store = new MonitorDBStore(g_conf()->mon_data);
ceph3:
/var/lib/ceph/mon/ceph-ceph3
# ls
done
keyring kv_backend min_mon_release store.db systemd
ceph3:
/var/lib/ceph/mon/ceph-ceph3
# cat kv_backend
rocksdb
ceph3:
/var/lib/ceph/mon/ceph-ceph3
#
3)ceph-mon数据主要包括集群健康状态、配置、osd是否存活和Paxos等数据,而存储在Rocksdb中的也正是这些数据,存储方式主要是采用SSTable(Sorted String Table)的方式存储。通过encode_pending将数据编码后存入rocksdb。
MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();if (should_stash_full())encode_full(t);encode_pending(t);
have_pending = false;if (format_version > 0) {t->put(get_service_name(), "format_version", format_version);
}
二、流程:ceph-mon加入集群后二次启动
1.启动流程
1.在ceph_mon.cc文件的main函数中,首先判断linxdfs序列号是否正确,然后设置线程名ceph-mon;接着读取启动时传入的命令行参数“/usr/bin/ceph-mon -f --cluster ceph --id ceph1 --setuser root --setgroup root”,并检验命令行参数。
int main(int argc, const char **argv)
{//检查序列号char* const linxdfspath = "/etc/linxsn/linxdfs_sn.conf";
.....ceph_pthread_setname(pthread_self(), "ceph-mon");
......
//解析命令行参数std::string val;for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {if (ceph_argparse_double_dash(args, i)) {break;} else if (ceph_argparse_flag(args, i, "--mkfs", (char*)NULL)) { //若命令行参数中有mkfs参数,则会进行370的mkfs操作mkfs = true;} else if (ceph_argparse_flag(args, i, "--compact", (char*)NULL)) {compact = true;} else if (ceph_argparse_flag(args, i, "--force-sync", (char*)NULL)) {force_sync = true;} else if (ceph_argparse_flag(args, i, "--yes-i-really-mean-it", (char*)NULL)) {yes_really = true;} else if (ceph_argparse_witharg(args, i, &val, "--osdmap", (char*)NULL)) {osdmapfn = val;} else if (ceph_argparse_witharg(args, i, &val, "--inject_monmap", (char*)NULL)) {inject_monmap = val;} else if (ceph_argparse_witharg(args, i, &val, "--extract-monmap", (char*)NULL)) {extract_monmap = val;} else {++i;}}
2.然后进行mkfs流程,该流程里面会检查并创建/var/lib/ceph/mon/$ceph_id目录,该目录包括以下几个文件:done keyring kv_backend min_mon_release systemd和子目录 store.db 。
// -- mkfs --if (mkfs) { //第一次启动时,mkfs一定会为true,并进入该if内部创建/var/lib/ceph/$ceph_id目录,同时会为该目录填充内容int err = check_mon_data_exists(); //当mkfs为true时,第一次启动会检查mon_data存在,不存在会mkdir创建if (err == -ENOENT) {if (::mkdir(g_conf()->mon_data.c_str(), 0755)) {derr << "mkdir(" << g_conf()->mon_data << ") : "<< cpp_strerror(errno) << dendl;exit(1);}} else if (err < 0) {derr << "error opening '" << g_conf()->mon_data << "': "<< cpp_strerror(-err) << dendl;exit(-err);}
3.构建monmap,将mon_data中的数据(store.db)decode解码到bufflist中,再写入到文件,以此来构建monmap。
......MonMap monmap; //构建monmap{// note that even if we don't find a viable monmap, we should go ahead// and try to build it up in the next if-else block.bufferlist mapbl;int err = obtain_monmap(*store, mapbl); //从store.db中获取monmap信息并构建monmapif (err >= 0) {try {monmap.decode(mapbl);} catch (const buffer::error& e) {derr << "can't decode monmap: " << e.what() << dendl;}} else {derr << "unable to obtain a monmap: " << cpp_strerror(err) << dendl;}dout(10) << __func__ << " monmap:\n";JSONFormatter jf(true);jf.dump_object("monmap", monmap);jf.flush(*_dout);*_dout << dendl;if (!extract_monmap.empty()) {int r = mapbl.write_file(extract_monmap.c_str());
4.创建Messager对象msgr,从monmap中获取rank并绑定到msgr上面,设置msgr信息、绑定地址等
//创建msgrMessenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,entity_name_t::MON(rank), "mon",0, // zero nonceMessenger::HAS_MANY_CONNECTIONS);msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);msgr->set_default_send_priority(CEPH_MSG_PRIO_HIGH);msgr->set_default_policy(Messenger::Policy::stateless_server(0));msgr->set_policy(entity_name_t::TYPE_MON,Messenger::Policy::lossless_peer_reuse(CEPH_FEATURE_SERVER_LUMINOUS));msgr->set_policy(entity_name_t::TYPE_OSD,Messenger::Policy::stateless_server(CEPH_FEATURE_SERVER_LUMINOUS));msgr->set_policy(entity_name_t::TYPE_CLIENT,Messenger::Policy::stateless_server(0));msgr->set_policy(entity_name_t::TYPE_MDS,Messenger::Policy::stateless_server(0));// binderr = msgr->bindv(bind_addrs);if (public_addrs != bind_addrs) {msgr->set_addrs(public_addrs);}
5.创建Monitor对象mon,设置传入的cmd信息,调用preinit进行预初始化(预初始化里面主要包括对paxos、msgr对应的服务端,客户端初始化)。
//创建mon对象mon = new Monitor(g_ceph_context, g_conf()->name.get_id(), store,msgr, mgr_msgr, &monmap); //创建mon对象mon->orig_argc = argc;mon->orig_argv = argv;err = mon->preinit(); //预初始化
int Monitor::preinit()
{paxos->init_logger();init_paxos();messenger->set_auth_client(this);messenger->set_auth_server(this);mgr_messenger->set_auth_client(this);
....
}
6.启动msgr,然后调用init对mon进行初始化同时启动mon。
msgr->start();
mgr_msgr->start();mon->init(); //初始化mon
7.当触发SIGINT、SIGTERM信号时就会释放所有mon、msgr等资源。
register_async_signal_handler_oneshot(SIGINT, handle_mon_signal);register_async_signal_handler_oneshot(SIGTERM, handle_mon_signal);if (g_conf()->inject_early_sigterm)kill(getpid(), SIGTERM);msgr->wait();mgr_msgr->wait();store->close();shutdown_async_signal_handler();delete mon;delete store;delete msgr;.....
3.加入集群
ceph-mon需要与其他监视器节点进行通信以构建监视器集群。它会尝试连接到其他已知的监视器节点,并通过消息交换建立集群中的监视器之间的通信。
3.1)建立通信连接(绑定地址、端口等)
ceph-mon模块通信依赖于AsyncMessager的异步通信,在ceph-mon.cc里面创建mon和Messenger对象(由于继承关系实质上是创建的AsyncMessenger对象),并且在初始化mon和AsyncMessager时,服务端会绑定本机ip和端口(通过配置获取),然后再调用_init_local_connection函数建立连接。
//创建Messenger对象
Messenger *Messenger::create(CephContext *cct, const string &type,entity_name_t name, string lname,uint64_t nonce, uint64_t cflags)
{int r = -1;if (type == "random") {r = 0;//r = ceph::util::generate_random_number(0, 1);}if (r == 0 || type.find("async") != std::string::npos)return new AsyncMessenger(cct, name, type, std::move(lname), nonce); //异步对象lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;return nullptr;
} // binderr = msgr->bindv(bind_addrs);if (err < 0) {derr << "unable to bind monitor to " << bind_addrs << dendl;prefork.exit(1);}
//绑定socket具体实现
int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
{lock.lock();if (!pending_bind && started) {ldout(cct,10) << __func__ << " already started" << dendl;lock.unlock();return -1;}ldout(cct,10) << __func__ << " " << bind_addrs << dendl;if (!stack->is_ready()) {ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;pending_bind_addrs = bind_addrs;pending_bind = true;lock.unlock();return 0;}lock.unlock();// bind to a socketset<int> avoid_ports;entity_addrvec_t bound_addrs;unsigned i = 0;for (auto &&p : processors) {int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);if (r) {// Note: this is related to local tcp listen table problem.// Posix(default kernel implementation) backend shares listen table// in the kernel, so all threads can use the same listen table naturally// and only one thread need to bind. But other backends(like dpdk) uses local// listen table, we need to bind/listen tcp port for each worker. So if the// first worker failed to bind, it could be think the normal error then handle// it, like port is used case. But if the first worker successfully to bind// but the second worker failed, it's not expected and we need to assert// hereceph_assert(i == 0);return r;}++i;}_finish_bind(bind_addrs, bound_addrs);return 0;
}
//启动AsyncMessenger
int AsyncMessenger::start()
{std::scoped_lock l{lock};ldout(cct,1) << __func__ << " start" << dendl;// register at least one entity, first!ceph_assert(my_name.type() >= 0);ceph_assert(!started);started = true;stopped = false;if (!did_bind) {entity_addrvec_t newaddrs = *my_addrs;for (auto& a : newaddrs.v) {a.nonce = nonce;}set_myaddrs(newaddrs);_init_local_connection(); //建立连接}return 0;
}
3.2)加入集群
ceph-mon在与其他ceph-mon建立起链接过后会进入STATE_PROBING状态,然后发送OP_PROBE消息给各个节点,等待其他节点同步完成后开始插入到集群中。
void Monitor::bootstrap()
{
.....// probe monitorsdout(10) << "probing other monitors" << dendl;for (unsigned i = 0; i < monmap->size(); i++) {if ((int)i != rank)send_mon_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined, //发送probe消息给其他节点ceph_release()),i);
......dout(10) << "bootstrap" << dendl;wait_for_paxos_write(); //等待其他节点同步完成
......if (monmap->contains(name))quorum.insert(name); //插入集群中
....
}
4.mon选举
当mon增加或减少时,ceph-mon进程会触发回调函数call_async里start_election开始进行选举,在该函数里主要做了以下几件事:
1)如果Paxos正在STATE_WRITING或者STATE_WRITING_PREVIOUS状态,则等待paxos更新完成。
2)调用_reset()重置monitor中的服务,包括probe timeout事件、health检查事件、scrub事件等,并且restart paxos以及所有的paxos service服务。
3)设置自己进入STATE_ELECTING状态,并增加l_mon_num_elections和l_mon_election_call这些统计数据。
4)调用elector的call_election()进行选举。
void Monitor::start_election()
{dout(10) << "start_election" << dendl;wait_for_paxos_write(); //等待paxos的更新完成_reset();state = STATE_ELECTING; //设置自身状态logger->inc(l_mon_num_elections);logger->inc(l_mon_election_call);clog->info() << "mon." << name << " calling monitor election";elector.call_election();
}// called by bootstrap(), or on leader|peon -> electing
void Monitor::_reset()
{dout(10) << __func__ << dendl;// disable authentication{std::lock_guard l(auth_lock);authmon()->_set_mon_num_rank(0, 0);}cancel_probe_timeout();timecheck_finish();health_events_cleanup(); //重置健康服务health_check_log_times.clear();scrub_event_cancel();leader_since = utime_t();quorum_since = {};if (!quorum.empty()) {exited_quorum = ceph_clock_now();}quorum.clear();outside_quorum.clear(); //重置选举服务quorum_feature_map.clear();scrub_reset();paxos->restart();for (auto& svc : paxos_service) {svc->restart();}
}
5)Elector::call_election (),在这里主要做了以下几件事:
5.1)从Mon store中读出mon的election_epoch存储在epoch中,更新epoch的值使其变为奇数,表明进入了选举cycle。epoch为偶数,表明已经形成了稳定的quorum。epoch为偶数时表示为稳定状态,奇数为还在选举中。
5.2)把自己加入到acked_me map中,并设置electing_me为true,希望大家选自己当leader。
5.3)向monmap中的成员发送MMonElection::OP_PROPOSE消息。
void ElectionLogic::start()
{if (!participating) {ldout(cct, 0) << "not starting new election -- not participating" << dendl;return;}ldout(cct, 5) << "start -- can i be leader?" << dendl;acked_me.clear();init();// start by trying to elect meif (epoch % 2 == 0) {bump_epoch(epoch+1); // odd == election cycle·更新epoch值为奇数} else {elector->validate_store();}electing_me = true;acked_me.insert(elector->get_my_rank()); //加入acked_meleader_acked = -1;elector->propose_to_peers(epoch); //发送OP_PROPOSE消息elector->_start();
}
6)其它的Monitor收到消息后,经过dispatch逻辑,即Monitor:: ms_dispatch() --> Monitor::_ms_dispatch() --> Monitor::dispatch_op()--> Elector::dispatch(),之后进入消息处理流程。dispatch()中调用Elector::handle_propose(),首先确保收到消息的epoch版本是处于选举的版本(奇数)并且满足对feature的要求,接着判断将自己的选举epoch设置为和消息中包含的epoch的值,最后调用ElectionLogic::receive_propose比对rank值,如果其他的Monitor它们自己的rank值更小,则自己不去确认此次选举,而是重新发起一轮选举,如果它们自己的rank值更大,则进入Elector::defer()流程,发送MMonElection::OP_ACK消息,确认该轮选举为最小的那个Monitor,这样经过rank值小的Monitor多次选举后,最终选出了rank值最小的那个Monitor,选它为leader。
bool ms_dispatch(Message *m) override {std::lock_guard l{lock};_ms_dispatch(m); //return true;}void Monitor::_ms_dispatch(Message *m)
{
......if ((is_synchronizing() ||(!s->authenticated && !exited_quorum.is_zero())) &&!src_is_mon &&m->get_type() != CEPH_MSG_PING) {waitlist_or_zap_client(op);} else {dispatch_op(op); //}return;
}
void Monitor::dispatch_op(MonOpRequestRef op)
{
......// elector messagescase MSG_MON_ELECTION:op->set_type_election();//check privileges here for simplicityif (!op->get_session()->is_capable("mon", MON_CAP_X)) {dout(0) << "MMonElection received from entity without enough caps!"<< op->get_session()->caps << dendl;return;;}if (!is_probing() && !is_synchronizing()) {elector.dispatch(op); //}
......
}void Elector::dispatch(MonOpRequestRef op)
{op->mark_event("elector:dispatch");ceph_assert(op->is_type_election());switch (op->get_req()->get_type()) {case MSG_MON_ELECTION:
......switch (em->op) {case MMonElection::OP_PROPOSE: //处理OP_PROPOSE消息handle_propose(op);return;
......
}void Elector::handle_propose(MonOpRequestRef op)
{op->mark_event("elector:handle_propose");auto m = op->get_req<MMonElection>();dout(5) << "handle_propose from " << m->get_source() << dendl;int from = m->get_source().num();ceph_assert(m->epoch % 2 == 1); // election 确保选举epoch为奇数uint64_t required_features = mon->get_required_features();mon_feature_t required_mon_features = mon->get_required_mon_features();dout(10) << __func__ << " required features " << required_features<< " " << required_mon_features<< ", peer features " << m->get_connection()->get_features()<< " " << m->mon_features<< dendl;if ((required_features ^ m->get_connection()->get_features()) &required_features) {dout(5) << " ignoring propose from mon" << from<< " without required features" << dendl;nak_old_peer(op);return;} else if (mon->monmap->min_mon_release > m->mon_release) {dout(5) << " ignoring propose from mon" << from<< " release " << (int)m->mon_release<< " < min_mon_release " << (int)mon->monmap->min_mon_release<< dendl;nak_old_peer(op);return;} else if (!m->mon_features.contains_all(required_mon_features)) {// all the features in 'required_mon_features' not in 'm->mon_features'mon_feature_t missing = required_mon_features.diff(m->mon_features);dout(5) << " ignoring propose from mon." << from<< " without required mon_features " << missing<< dendl;nak_old_peer(op);}logic.receive_propose(from, m->epoch); //比对rank值,决定选举权
}void ElectionLogic::receive_propose(int from, epoch_t mepoch)
{
......if (elector->get_my_rank() < from) {// i would win over them.if (leader_acked >= 0) { // we already acked someoneceph_assert(leader_acked < from); // and they still win, of courseldout(cct, 5) << "no, we already acked " << leader_acked << dendl;} else {// wait, i should win!if (!electing_me) {elector->trigger_new_election();}}} else { //自身rank值更大// they would win over meif (leader_acked < 0 || // haven't acked anyone yet, orleader_acked > from || // they would win over who you did ack, orleader_acked == from) { // this is the guy we're already deferring todefer(from); //确认选举} else {// ignore them!ldout(cct, 5) << "no, we already acked " << leader_acked << dendl;}}......}
5.同步数据
选举完成后,ceph-mon需要同步leader节点数据,触发MSG_MON_SYNC事件类型,经过调用栈dispatch_op->handle_sync->handle_sync_chunk→sync_finish调用apply_transaction进行数据同步。
void Monitor::sync_finish(version_t last_committed)
{
......if (sync_full) {// finalize the paxos commitsauto tx(std::make_shared<MonitorDBStore::Transaction>());paxos->read_and_prepare_transactions(tx, sync_start_version,last_committed);tx->put(paxos->get_name(), "last_committed", last_committed);dout(30) << __func__ << " final tx dump:\n";JSONFormatter f(true);tx->dump(&f);f.flush(*_dout);*_dout << dendl;store->apply_transaction(tx);}
......
}
6.健康检查
当其他节点传入的消息op类型为CEPH_MSG_PING时,mon会执行handle_ping流程去处理,处理过程是先通过op获取到请求的消息,然后构造reply消息进行回复,reply消息的内容是通过mon内置的healthMonitor获取到的状态信息。
void Monitor::dispatch_op(MonOpRequestRef op)
{
.......case CEPH_MSG_PING:handle_ping(op); return;
......
}void Monitor::handle_ping(MonOpRequestRef op)
{auto m = op->get_req<MPing>();dout(10) << __func__ << " " << *m << dendl;MPing *reply = new MPing;bufferlist payload;boost::scoped_ptr<Formatter> f(new JSONFormatter(true));f->open_object_section("pong");healthmon()->get_health_status(false, f.get(), nullptr);get_mon_status(f.get());f->close_section();stringstream ss;f->flush(ss);encode(ss.str(), payload);reply->set_payload(payload); //设置发送内容,即健康信息dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;m->get_connection()->send_message(reply); //发送回复
}
三、ceph-mon集群正常工作时的工作流程
ceph-mon集群正常运行情况下,mon数量和状态并没有发生变化,因此不会触发重新选举leader的行为,所以此时的ceph-mon更多的是监控和维护集群的状态,它会执行一些监控流程,比如监控集群状态情况、记录日志等。
1.记录日志
ceph-mon通过dout宏来将日志输出到指定文件中,日志路径可通过配置写入log_file变量中,当需要打印日志时,可通过如下方式写入日志到文件中(需要将ceph.conf中对应模块日志级别debug mgr、debug mon等调至20 dout(20)才能生效):
void LogMonitor::update_from_paxos(bool *need_bootstrap)
{
.......if (g_conf()->mon_cluster_log_to_file) { //获取配置中的log_file变量,该变量存放日志位置string log_file = channels.get_log_file(channel);dout(20) << __func__ << " logging for channel '" << channel<< "' to file '" << log_file << "'" << dendl;
......
}
2.监控集群状态
2.1)ceph-mon定期进行对集群其他节点进行状态收集,状态收集的周期默认为30s,可通过mon_data_avail_warn进行配置更改周期长度,状态收集的过程实质是更新monmap、osdmap和pgmap这些表来监控集群的状态。
Option("mon_data_avail_warn", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(30) //配置默认30s
.add_service("mon")
.set_description("issue MON_DISK_LOW health warning when mon available space below this percentage"),
2.2)每个节点的ceph-mon都会收集自身的节点状态,然后互相通信来同步各自节点的状态。
2.2.1)ceph-mon 在处理同步的流程中,根据ceph-mon发出的同步请求MMonSync::OP_CHUNK给leader进行处理,调用Monitor::handle_sync_chunk(MonOpRequestRef op)将数据发送给集群leader节点。
void Monitor::handle_sync(MonOpRequestRef op)
{auto m = op->get_req<MMonSync>();dout(10) << __func__ << " " << *m << dendl;switch (m->op) {// provider ---------case MMonSync::OP_CHUNK: //同步case MMonSync::OP_LAST_CHUNK:handle_sync_chunk(op); break;
......
}void Monitor::handle_sync_chunk(MonOpRequestRef op)
{
......if (m->op == MMonSync::OP_CHUNK) {sync_reset_timeout();sync_get_next_chunk();} else if (m->op == MMonSync::OP_LAST_CHUNK) {sync_finish(m->last_committed);}
......
}
2.2.2)选举完成后,ceph-mon需要同步leader节点数据,触发MSG_MON_SYNC事件类型,经过调用栈dispatch_op->handle_sync->handle_sync_chunk→sync_finish调用apply_transaction进行数据同步。
void Monitor::sync_finish(version_t last_committed)
{
......if (sync_full) {// finalize the paxos commitsauto tx(std::make_shared<MonitorDBStore::Transaction>());paxos->read_and_prepare_transactions(tx, sync_start_version,last_committed);tx->put(paxos->get_name(), "last_committed", last_committed);dout(30) << __func__ << " final tx dump:\n";JSONFormatter f(true);tx->dump(&f);f.flush(*_dout);*_dout << dendl;store->apply_transaction(tx);}
......
}
其他:ceph-mon通信方式分析
1)vip迁移到另外节点,ceph-mon恢复需要同步哪些数据
当vip发生迁移时,需要同步迁移ceph-mon的节点的/var/lib/ceph/mon/$cluster-$ceph-id/目录内的所有数据,因为该目录存储了ceph-mon的所有数据。可参考:https://www.bookstack.cn/read/ceph-handbook/Advance_usage-mon_backup.mdhttps://www.bookstack.cn/read/ceph-handbook/Advance_usage-mon_backup.md
2)数据通信
建立通信连接后,AsyncMessenger对象中的NetworkStack成员会默认创建三个worker(可配置),每个worker线程被创建时都会被命名为msgr-worker-0/1/2以此类推,这些线程是真正被用来进行通信的,具体通信方式是:每个线程中包含一个EventCenter去获取发生的事件,通过EventCenter内置的EpollDriver对象来获取并处理这些事件,该对象使用epoll网络模型,当某个socket有事件到来时,会被该epoll对象监测到并根据不同的事件类型进行处理,EventCenter中支持的事件类型有file事件和timer事件,主要包含事件的创建、删除以及处理超时事件。
NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
{const uint64_t InitEventNumber = 5000;num_workers = cct->_conf->ms_async_op_threads; // cct->_conf->ms_async_op_threads默认配置为3for (unsigned i = 0; i < num_workers; ++i) {Worker *w = create_worker(cct, type, i);w->center.init(InitEventNumber, i, type);workers.push_back(w);}cct->register_fork_watcher(this);
}
//线程命名为msgr-worker-%u
std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
{Worker *w = workers[worker_id]; //worker线程return [this, w]() {char tp_name[16];sprintf(tp_name, "msgr-worker-%u", w->id);ceph_pthread_setname(pthread_self(), tp_name);const unsigned EventMaxWaitUs = 30000000;w->center.set_owner(); //创建CenterDriverldout(cct, 10) << __func__ << " starting" << dendl;w->initialize();w->init_done();while (!w->done) {ldout(cct, 30) << __func__ << " calling event process" << dendl;//创建worker如下
Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i)
{if (type == "posix")return new PosixWorker(c, i);...
}
//EventCenter
class Worker : public Thread {...EventCenter center;...
}//初始化EventCenter
int EventCenter::init(int nevent, unsigned center_id, const std::string &type)
{// can't init multi timesceph_assert(this->nevent == 0);this->type = type;this->center_id = center_id;if (type == "dpdk") {
#ifdef HAVE_DPDKdriver = new DPDKDriver(cct);
#endif} else {
#ifdef HAVE_EPOLLdriver = new EpollDriver(cct); //使用epoll模型
#else
#ifdef HAVE_KQUEUEdriver = new KqueueDriver(cct);
#elsedriver = new SelectDriver(cct);
#endif
#endif}
......int fds[2];if (pipe_cloexec(fds, 0) < 0) { //创建管道int e = errno;lderr(cct) << __func__ << " can't create notify pipe: " << cpp_strerror(e) << dendl;return -e;}notify_receive_fd = fds[0];notify_send_fd = fds[1];r = net.set_nonblock(notify_receive_fd); //设置非阻塞socketif (r < 0) {return r;}r = net.set_nonblock(notify_send_fd);if (r < 0) {return r;}return r;
}
} // Used by internal threadint create_file_event(int fd, int mask, EventCallbackRef ctxt); //创建file事件uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt); //创建timer事件void delete_file_event(int fd, int mask);void delete_time_event(uint64_t id);int process_events(unsigned timeout_microseconds, ceph::timespan *working_dur = nullptr); //处理超时事件