Summary of this chapter
The previous article described how the osdc layer wraps a request and uses the Messenger machinery to send it out.
This article looks in detail at how the OSD server side receives that request.
OSD initialization
When the OSD starts, it creates a Messenger named ms_public. This messenger is bound to the public network and is responsible for receiving client requests. ms_public starts the corresponding receive threads and registers the functions that handle incoming messages.
//ceph_osd.cc
Messenger *ms_public = Messenger::create(g_ceph_context, public_msg_type,
                                         entity_name_t::OSD(whoami), "client", nonce);
Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msg_type,
                                          entity_name_t::OSD(whoami), "cluster", nonce);

ms_public->set_default_policy(Messenger::Policy::stateless_registered_server(0));
ms_public->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
                                 client_byte_throttler.get(),
                                 client_msg_throttler.get());
ms_public->set_policy(entity_name_t::TYPE_MON,
                      Messenger::Policy::lossy_client(osd_required));
ms_public->set_policy(entity_name_t::TYPE_MGR,
                      Messenger::Policy::lossy_client(osd_required));

ms_cluster->set_default_policy(Messenger::Policy::stateless_server(0));
ms_cluster->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(0));
ms_cluster->set_policy(entity_name_t::TYPE_OSD,
                       Messenger::Policy::lossless_peer(osd_required));
ms_cluster->set_policy(entity_name_t::TYPE_CLIENT,
                       Messenger::Policy::stateless_server(0));
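Before diving into the internals, it helps to see the whole client-facing lifecycle in one place. The sketch below condenses the calls this article walks through, in the order ceph_osd.cc and OSD::init() perform them (all names come from the excerpts in this post; error handling and the heartbeat/objecter messengers are omitted, so this is not complete code):

// Condensed sketch of the ms_public lifecycle.
Messenger *ms_public = Messenger::create(g_ceph_context, public_msg_type,
                                         entity_name_t::OSD(whoami), "client", nonce);
ms_public->set_default_policy(Messenger::Policy::stateless_registered_server(0));
ms_public->bindv(public_addrs);     // bind listening addresses (may be postponed)
ms_public->start();                 // start the messenger threads
osdptr->init();                     // calls add_dispatcher_tail(this), which triggers
                                    // AsyncMessenger::ready(): the Processor accept
                                    // loops and the DispatchQueue threads start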
Inside Messenger::create(), an AsyncMessenger object is constructed:
Messenger *Messenger::create(CephContext *cct, const std::string &type,
                             entity_name_t name, std::string lname,
                             uint64_t nonce)
{
  if (type == "random" || type.find("async") != std::string::npos)
    return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
  lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
  return nullptr;
}
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               const std::string &type, std::string mname,
                               uint64_t _nonce)
  : SimplePolicyMessenger(cct, name),
    dispatch_queue(cct, this, mname),
    nonce(_nonce)
{
  std::string transport_type = "posix";
  if (type.find("rdma") != std::string::npos)
    transport_type = "rdma";
  else if (type.find("dpdk") != std::string::npos)
    transport_type = "dpdk";

  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
    "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  local_worker = stack->get_worker();
  local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
                                                     local_worker, true, true);
  init_local_connection();
  reap_handler = new C_handle_reap(this);
  unsigned processor_num = 1;
  if (stack->support_local_listen_table())
    processor_num = stack->get_num_worker();
  for (unsigned i = 0; i < processor_num; ++i)
    processors.push_back(new Processor(this, stack->get_worker(i), cct));
}
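The transport backend is chosen purely from the messenger type string. Below is a standalone sketch of that selection; the ms_type values shown are the commonly used ones, and this is an illustration rather than Ceph code:

// Toy replica of the transport_type selection in the constructor above.
#include <iostream>
#include <string>

static std::string pick_transport(const std::string &type) {
  std::string transport_type = "posix";   // default: kernel TCP sockets
  if (type.find("rdma") != std::string::npos)
    transport_type = "rdma";
  else if (type.find("dpdk") != std::string::npos)
    transport_type = "dpdk";
  return transport_type;
}

int main() {
  for (const char *t : {"async+posix", "async+rdma", "async+dpdk"})
    std::cout << t << " -> " << pick_transport(t) << "\n";
}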
The messengers are then bound to their IP addresses, and ms_public is passed into the OSD constructor. The OSD wires everything up later, when its init() is called.
if (ms_public->bindv(public_addrs) < 0)
  forker.exit(1);
if (ms_cluster->bindv(cluster_addrs) < 0)
  forker.exit(1);

osdptr = new OSD(g_ceph_context,
                 std::move(store),
                 whoami,
                 ms_cluster,
                 ms_public,
                 ms_hb_front_client,
                 ms_hb_back_client,
                 ms_hb_front_server,
                 ms_hb_back_server,
                 ms_objecter,
                 &mc,
                 data_path,
                 journal_path,
                 poolctx);

int err = osdptr->pre_init();
if (err < 0) {
  derr << TEXT_RED << " ** ERROR: osd pre_init failed: " << cpp_strerror(-err)
       << TEXT_NORMAL << dendl;
  forker.exit(1);
}

ms_public->start();
ms_hb_front_client->start();
ms_hb_back_client->start();
ms_hb_front_server->start();
ms_hb_back_server->start();
ms_cluster->start();
ms_objecter->start();
The detailed flow of bindv()
int Messenger::bindv(const entity_addrvec_t& addrs)
{
  return bind(addrs.legacy_addr());
}
int AsyncMessenger::bind(const entity_addr_t &bind_addr)
{
  ldout(cct,10) << __func__ << " " << bind_addr << dendl;
  // old bind() can take entity_addr_t(). new bindv() can take a
  // 0.0.0.0-like address but needs type and family to be set.
  auto a = bind_addr;
  if (a == entity_addr_t()) {
    a.set_type(entity_addr_t::TYPE_LEGACY);
    if (cct->_conf->ms_bind_ipv6) {
      a.set_family(AF_INET6);
    } else {
      a.set_family(AF_INET);
    }
  }
  return bindv(entity_addrvec_t(a));
}
int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
{
  lock.lock();

  if (!pending_bind && started) {
    ldout(cct,10) << __func__ << " already started" << dendl;
    lock.unlock();
    return -1;
  }

  ldout(cct,10) << __func__ << " " << bind_addrs << dendl;

  if (!stack->is_ready()) {
    ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
    pending_bind_addrs = bind_addrs;
    pending_bind = true;
    lock.unlock();
    return 0;
  }

  lock.unlock();

  // bind to a socket
  std::set<int> avoid_ports;
  entity_addrvec_t bound_addrs;
  unsigned i = 0;
  for (auto &&p : processors) {
    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
    if (r) {
      // Note: this is related to local tcp listen table problem.
      // Posix(default kernel implementation) backend shares listen table
      // in the kernel, so all threads can use the same listen table naturally
      // and only one thread need to bind. But other backends(like dpdk) uses local
      // listen table, we need to bind/listen tcp port for each worker. So if the
      // first worker failed to bind, it could be think the normal error then handle
      // it, like port is used case. But if the first worker successfully to bind
      // but the second worker failed, it's not expected and we need to assert
      // here
      ceph_assert(i == 0);
      return r;
    }
    ++i;
  }
  _finish_bind(bind_addrs, bound_addrs);
  return 0;
}
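Note the pending_bind branch: if the NetworkStack is not ready yet, the addresses are only remembered and the real bind is replayed later from AsyncMessenger::ready() (shown below). A minimal standalone sketch of that pattern, using a hypothetical LazyBinder type purely to illustrate the control flow:

// Toy version of the postponed-bind handshake between bindv() and ready().
#include <iostream>
#include <optional>
#include <string>
#include <utility>
#include <vector>

struct LazyBinder {
  bool stack_ready = false;
  std::optional<std::vector<std::string>> pending_bind_addrs;

  int bindv(const std::vector<std::string> &addrs) {
    if (!stack_ready) {                 // "Network Stack is not ready ... postponed"
      pending_bind_addrs = addrs;
      return 0;                         // caller sees success; bind happens later
    }
    for (auto &a : addrs)
      std::cout << "binding " << a << "\n";
    return 0;
  }

  void ready() {                        // ready() replays the remembered bind
    stack_ready = true;
    if (pending_bind_addrs)
      bindv(*std::exchange(pending_bind_addrs, std::nullopt));
  }
};

int main() {
  LazyBinder b;
  b.bindv({"10.0.0.1:6800"});           // postponed
  b.ready();                            // actual bind happens here
}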
When osdptr->init() runs, it registers the OSD as a dispatcher on client_messenger (ms_public) and cluster_messenger (ms_cluster) via add_dispatcher_tail()/add_dispatcher_head().
//ceph_osd.cc
// start osd
err = osdptr->init();
//OSD.cc
int OSD::init()
{
  // i'm ready!
  client_messenger->add_dispatcher_tail(&mgrc);
  client_messenger->add_dispatcher_tail(this);
  cluster_messenger->add_dispatcher_head(this);
}

//Messenger.h
void add_dispatcher_tail(Dispatcher *d) {
  bool first = dispatchers.empty();
  dispatchers.push_back(d);
  if (d->ms_can_fast_dispatch_any())
    fast_dispatchers.push_back(d);
  if (first)
    ready();
}
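add_dispatcher_tail() only records the Dispatcher and, if ms_can_fast_dispatch_any() says so, also puts it on the fast-dispatch list. For orientation, a minimal Dispatcher could look roughly like this; this is a sketch, with hook signatures following Dispatcher.h in recent Ceph releases, and the message-type check is only an example rather than how OSD itself is structured:

// Sketch of a minimal Dispatcher as consumed by add_dispatcher_tail().
// Includes and types assume the Ceph source tree.
#include "msg/Dispatcher.h"
#include "msg/Message.h"

class MyDispatcher : public Dispatcher {
public:
  explicit MyDispatcher(CephContext *cct) : Dispatcher(cct) {}

  // slow path: called from the DispatchQueue "ms_dispatch" thread
  bool ms_dispatch(Message *m) override {
    m->put();            // we own a ref; drop it
    return true;         // handled
  }

  // fast path: lets the messenger call ms_fast_dispatch() directly
  bool ms_can_fast_dispatch_any() const override { return true; }
  bool ms_can_fast_dispatch(const Message *m) const override {
    return m->get_type() == CEPH_MSG_OSD_OP;   // example policy only
  }
  void ms_fast_dispatch(Message *m) override { m->put(); }

  // connection lifecycle hooks
  bool ms_handle_reset(Connection *con) override { return false; }
  void ms_handle_remote_reset(Connection *con) override {}
  bool ms_handle_refused(Connection *con) override { return false; }
};

When the first dispatcher is added, ready() is invoked; on the AsyncMessenger side that looks like this: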
//AsyncMessenger.cc
void AsyncMessenger::ready()
{
  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;

  stack->ready();
  if (pending_bind) {
    int err = bindv(pending_bind_addrs);
    if (err) {
      lderr(cct) << __func__ << " postponed bind failed" << dendl;
      ceph_abort();
    }
  }

  std::lock_guard l{lock};
  for (auto &&p : processors)
    p->start();
  dispatch_queue.start();
}
// start the message-receiving threads
void DispatchQueue::start()
{
  ceph_assert(!stop);
  ceph_assert(!dispatch_thread.is_started());
  dispatch_thread.create("ms_dispatch");
  local_delivery_thread.create("ms_local");
}
//DispatchQueue.h
class DispatchThread : public Thread {
  DispatchQueue *dq;
public:
  explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
  void *entry() override {
    dq->entry();
    return 0;
  }
} dispatch_thread;

ceph::mutex local_delivery_lock;
ceph::condition_variable local_delivery_cond;
bool stop_local_delivery;
std::queue<std::pair<ceph::ref_t<Message>, int>> local_messages;

class LocalDeliveryThread : public Thread {
  DispatchQueue *dq;
public:
  explicit LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {}
  void *entry() override {
    dq->run_local_delivery();
    return 0;
  }
} local_delivery_thread;
/*
 * This function delivers incoming messages to the Messenger.
 * Connections with messages are kept in queues; when beginning a message
 * delivery the highest-priority queue is selected, the connection from the
 * front of the queue is removed, and its message read. If the connection
 * has remaining messages at that priority level, it is re-placed on to the
 * end of the queue. If the queue is empty; it's removed.
 * The message is then delivered and the process starts again.
 */
void DispatchQueue::entry()
{
  std::unique_lock l{lock};
  while (true) {
    while (!mqueue.empty()) {
      QueueItem qitem = mqueue.dequeue();
      if (!qitem.is_code())
        remove_arrival(qitem.get_message());
      l.unlock();

      if (qitem.is_code()) {
        if (cct->_conf->ms_inject_internal_delays &&
            cct->_conf->ms_inject_delay_probability &&
            (rand() % 10000)/10000.0 < cct->_conf->ms_inject_delay_probability) {
          utime_t t;
          t.set_from_double(cct->_conf->ms_inject_internal_delays);
          ldout(cct, 1) << "DispatchQueue::entry inject delay of " << t
                        << dendl;
          t.sleep();
        }
        switch (qitem.get_code()) {
        case D_BAD_REMOTE_RESET:
          msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
          break;
        case D_CONNECT:
          msgr->ms_deliver_handle_connect(qitem.get_connection());
          break;
        case D_ACCEPT:
          msgr->ms_deliver_handle_accept(qitem.get_connection());
          break;
        case D_BAD_RESET:
          msgr->ms_deliver_handle_reset(qitem.get_connection());
          break;
        case D_CONN_REFUSED:
          msgr->ms_deliver_handle_refused(qitem.get_connection());
          break;
        default:
          ceph_abort();
        }
      } else {
        const ref_t<Message>& m = qitem.get_message();
        if (stop) {
          ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
        } else {
          uint64_t msize = pre_dispatch(m);
          msgr->ms_deliver_dispatch(m);
          post_dispatch(m, msize);
        }
      }

      l.lock();
    }
    if (stop)
      break;

    // wait for something to be put on queue
    cond.wait(l);
  }
}
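Stripped of the throttling, delay injection and connection-event handling, this loop is a classic priority queue drained by one thread under a mutex and condition variable. Below is a self-contained toy version of just that pattern; it is not the Ceph implementation, which additionally keys the queue by connection for fairness:

// Toy dispatch queue: highest priority first, delivery outside the lock.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <map>
#include <mutex>
#include <queue>
#include <thread>

class ToyDispatchQueue {
  std::mutex lock;
  std::condition_variable cond;
  // highest priority first, FIFO within each priority
  std::map<int, std::queue<std::function<void()>>, std::greater<int>> mqueue;
  bool stop = false;

public:
  void enqueue(int prio, std::function<void()> fn) {
    std::lock_guard l{lock};
    mqueue[prio].push(std::move(fn));
    cond.notify_one();
  }
  void shutdown() {
    std::lock_guard l{lock};
    stop = true;
    cond.notify_all();
  }
  void entry() {
    std::unique_lock l{lock};
    while (true) {
      while (!mqueue.empty()) {
        auto it = mqueue.begin();               // highest-priority bucket
        auto fn = std::move(it->second.front());
        it->second.pop();
        if (it->second.empty())
          mqueue.erase(it);                     // "if the queue is empty, it's removed"
        l.unlock();
        fn();                                   // deliver outside the lock
        l.lock();
      }
      if (stop)
        break;
      cond.wait(l);                             // wait for something to be queued
    }
  }
};

int main() {
  ToyDispatchQueue dq;
  dq.enqueue(63, [] { std::cout << "normal-priority op\n"; });
  dq.enqueue(127, [] { std::cout << "high-priority op\n"; });  // delivered first
  std::thread t([&] { dq.entry(); });
  dq.shutdown();      // already-queued items are still drained before exit
  t.join();
}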
//DispatchQueue.cc
void DispatchQueue::run_local_delivery()
{
  std::unique_lock l{local_delivery_lock};
  while (true) {
    if (stop_local_delivery)
      break;
    if (local_messages.empty()) {
      local_delivery_cond.wait(l);
      continue;
    }
    auto p = std::move(local_messages.front());
    local_messages.pop();
    l.unlock();
    const ref_t<Message>& m = p.first;
    int priority = p.second;
    fast_preprocess(m);
    if (can_fast_dispatch(m)) {
      fast_dispatch(m);
    } else {
      enqueue(m, priority, 0);
    }
    l.lock();
  }
}

void fast_dispatch(Message* m) {
  return fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
}
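The fast_dispatch(Message*) overload is worth a note: constructing ceph::ref_t<Message>(m, false) adopts the reference the caller already holds instead of taking an extra one (ceph::ref_t is an intrusive smart pointer; the behaviour matches boost::intrusive_ptr). A standalone illustration with a toy ref-counted type:

// Toy illustration of the "consume ref" construction used above:
// add_ref=false adopts an existing reference instead of incrementing.
#include <boost/intrusive_ptr.hpp>
#include <iostream>

struct Msg { int nref = 0; };
void intrusive_ptr_add_ref(Msg *m) { ++m->nref; }
void intrusive_ptr_release(Msg *m) { if (--m->nref == 0) delete m; }

int main() {
  Msg *raw = new Msg;
  intrusive_ptr_add_ref(raw);                 // caller holds one ref (nref == 1)
  boost::intrusive_ptr<Msg> p(raw, false);    // adopt it: nref stays 1, not 2
  std::cout << p->nref << "\n";               // prints 1
}                                             // p releases the last ref, Msg is deleted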
void DispatchQueue::fast_dispatch(const ref_t<Message>& m)
{
  uint64_t msize = pre_dispatch(m);
  msgr->ms_fast_dispatch(m);
  post_dispatch(m, msize);
}
//Dispatcher.h
/**
 * Perform a "fast dispatch" on a given message. See
 * ms_can_fast_dispatch() for the requirements.
 *
 * @param m The Message to fast dispatch.
 */
virtual void ms_fast_dispatch(Message *m) { ceph_abort(); }

/* ms_fast_dispatch2 because otherwise the child must define both */
virtual void ms_fast_dispatch2(const MessageRef &m) {
  /* allow old style dispatch handling that expects a Message * with a floating ref */
  return ms_fast_dispatch(MessageRef(m).detach()); /* XXX N.B. always consumes ref */
}
//OSD.cc
void OSD::ms_fast_dispatch(Message *m)
{
  auto dispatch_span = tracing::osd::tracer.start_trace(__func__);
  FUNCTRACE(cct);
  if (service.is_stopping()) {
    m->put();
    return;
  }

  // peering event?
  switch (m->get_type()) {
  case CEPH_MSG_PING:
    dout(10) << "ping from " << m->get_source() << dendl;
    m->put();
    return;
  case MSG_OSD_FORCE_RECOVERY:
    handle_fast_force_recovery(static_cast<MOSDForceRecovery*>(m));
    return;
  case MSG_OSD_SCRUB2:
    handle_fast_scrub(static_cast<MOSDScrub2*>(m));
    return;
  case MSG_OSD_PG_CREATE2:
    return handle_fast_pg_create(static_cast<MOSDPGCreate2*>(m));
  case MSG_OSD_PG_NOTIFY:
    return handle_fast_pg_notify(static_cast<MOSDPGNotify*>(m));
  case MSG_OSD_PG_INFO:
    return handle_fast_pg_info(static_cast<MOSDPGInfo*>(m));
  case MSG_OSD_PG_REMOVE:
    return handle_fast_pg_remove(static_cast<MOSDPGRemove*>(m));

  // these are single-pg messages that handle themselves
  case MSG_OSD_PG_LOG:
  case MSG_OSD_PG_TRIM:
  case MSG_OSD_PG_NOTIFY2:
  case MSG_OSD_PG_QUERY2:
  case MSG_OSD_PG_INFO2:
  case MSG_OSD_BACKFILL_RESERVE:
  case MSG_OSD_RECOVERY_RESERVE:
  case MSG_OSD_PG_LEASE:
  case MSG_OSD_PG_LEASE_ACK:
    {
      MOSDPeeringOp *pm = static_cast<MOSDPeeringOp*>(m);
      if (require_osd_peer(pm)) {
        enqueue_peering_evt(
          pm->get_spg(),
          PGPeeringEventRef(pm->get_event()));
      }
      pm->put();
      return;
    }
  }

  OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
  {
#ifdef WITH_LTTNG
    osd_reqid_t reqid = op->get_reqid();
#endif
    tracepoint(osd, ms_fast_dispatch, reqid.name._type,
               reqid.name._num, reqid.tid, reqid.inc);
  }
  op->osd_parent_span = tracing::osd::tracer.add_span("op-request-created", dispatch_span);

  if (m->trace)
    op->osd_trace.init("osd op", &trace_endpoint, &m->trace);

  // note sender epoch, min req's epoch
  op->sent_epoch = static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch();
  op->min_epoch = static_cast<MOSDFastDispatchOp*>(m)->get_min_epoch();
  ceph_assert(op->min_epoch <= op->sent_epoch); // sanity check!

  service.maybe_inject_dispatch_delay();

  if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) ||
      m->get_type() != CEPH_MSG_OSD_OP) {
    // queue it directly
    enqueue_op(
      static_cast<MOSDFastDispatchOp*>(m)->get_spg(),
      std::move(op),
      static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch());
  } else {
    // legacy client, and this is an MOSDOp (the *only* fast dispatch
    // message that didn't have an explicit spg_t); we need to map
    // them to an spg_t while preserving delivery order.
    auto priv = m->get_connection()->get_priv();
    if (auto session = static_cast<Session*>(priv.get()); session) {
      std::lock_guard l{session->session_dispatch_lock};
      op->get();
      session->waiting_on_map.push_back(*op);
      OSDMapRef nextmap = service.get_nextmap_reserved();
      dispatch_session_waiting(session, nextmap);
      service.release_map(nextmap);
    }
  }
  OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false);
}
void OSD::dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap)
{
  ceph_assert(ceph_mutex_is_locked(session->session_dispatch_lock));
  auto i = session->waiting_on_map.begin();
  while (i != session->waiting_on_map.end()) {
    OpRequestRef op = &(*i);
    ceph_assert(ms_can_fast_dispatch(op->get_req()));
    auto m = op->get_req<MOSDFastDispatchOp>();
    if (m->get_min_epoch() > osdmap->get_epoch()) {
      break;
    }
    session->waiting_on_map.erase(i++);
    op->put();

    spg_t pgid;
    if (m->get_type() == CEPH_MSG_OSD_OP) {
      pg_t actual_pgid = osdmap->raw_pg_to_pg(
        static_cast<const MOSDOp*>(m)->get_pg());
      if (!osdmap->get_primary_shard(actual_pgid, &pgid)) {
        continue;
      }
    } else {
      pgid = m->get_spg();
    }
    enqueue_op(pgid, std::move(op), m->get_map_epoch());
  }

  if (session->waiting_on_map.empty()) {
    clear_session_waiting_on_map(session);
  } else {
    register_session_waiting_on_map(session);
  }
}
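The key invariant here is the early break: ops on a session are drained in arrival order, and the first op whose min_epoch is newer than the OSD's current map stops the drain, so later ops cannot overtake it. A standalone toy of that gating:

// Toy version of the epoch gating in dispatch_session_waiting():
// drain in order, stop at the first op that still needs a newer map.
#include <deque>
#include <iostream>

struct ToyOp { unsigned min_epoch; const char *name; };

void dispatch_waiting(std::deque<ToyOp> &waiting, unsigned map_epoch) {
  while (!waiting.empty() && waiting.front().min_epoch <= map_epoch) {
    std::cout << "epoch " << map_epoch << ": dispatch " << waiting.front().name << "\n";
    waiting.pop_front();
  }
}

int main() {
  std::deque<ToyOp> waiting{{5, "op-a"}, {7, "op-b"}, {6, "op-c"}};
  dispatch_waiting(waiting, 6);  // only op-a; op-b needs epoch 7 and blocks op-c
  dispatch_waiting(waiting, 8);  // map caught up: op-b, then op-c
}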