[Ceph Study Notes] How Ceph Reads and Writes Data (3)

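Chapter summary

The previous article showed how osdc packages a request and sends it out through the mechanisms of the messenger (msg) module.
This article walks through how the OSD server side receives that request.

OSD initialization

When the OSD starts, it creates a Messenger object named ms_public. This messenger is bound to the public network and is responsible for receiving client requests. ms_public starts the threads that receive messages and registers the functions that handle them.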

  // ceph_osd.cc
  Messenger *ms_public = Messenger::create(g_ceph_context, public_msg_type,
                                           entity_name_t::OSD(whoami), "client", nonce);
  Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msg_type,
                                            entity_name_t::OSD(whoami), "cluster", nonce);

  ms_public->set_default_policy(Messenger::Policy::stateless_registered_server(0));
  ms_public->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
                                   client_byte_throttler.get(),
                                   client_msg_throttler.get());
  ms_public->set_policy(entity_name_t::TYPE_MON,
                        Messenger::Policy::lossy_client(osd_required));
  ms_public->set_policy(entity_name_t::TYPE_MGR,
                        Messenger::Policy::lossy_client(osd_required));

  ms_cluster->set_default_policy(Messenger::Policy::stateless_server(0));
  ms_cluster->set_policy(entity_name_t::TYPE_MON, Messenger::Policy::lossy_client(0));
  ms_cluster->set_policy(entity_name_t::TYPE_OSD,
                         Messenger::Policy::lossless_peer(osd_required));
  ms_cluster->set_policy(entity_name_t::TYPE_CLIENT,
                         Messenger::Policy::stateless_server(0));

Messenger::create() constructs an AsyncMessenger object:

Messenger *Messenger::create(CephContext *cct, const std::string &type,
                             entity_name_t name, std::string lname,
                             uint64_t nonce)
{
  if (type == "random" || type.find("async") != std::string::npos)
    return new AsyncMessenger(cct, name, type, std::move(lname), nonce);
  lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
  return nullptr;
}

AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               const std::string &type, std::string mname,
                               uint64_t _nonce)
  : SimplePolicyMessenger(cct, name),
    dispatch_queue(cct, this, mname),
    nonce(_nonce)
{
  std::string transport_type = "posix";
  if (type.find("rdma") != std::string::npos)
    transport_type = "rdma";
  else if (type.find("dpdk") != std::string::npos)
    transport_type = "dpdk";

  auto single = &cct->lookup_or_create_singleton_object<StackSingleton>(
    "AsyncMessenger::NetworkStack::" + transport_type, true, cct);
  single->ready(transport_type);
  stack = single->stack.get();
  stack->start();
  local_worker = stack->get_worker();
  local_connection = ceph::make_ref<AsyncConnection>(cct, this, &dispatch_queue,
                                                     local_worker, true, true);
  init_local_connection();
  reap_handler = new C_handle_reap(this);
  unsigned processor_num = 1;
  if (stack->support_local_listen_table())
    processor_num = stack->get_num_worker();
  for (unsigned i = 0; i < processor_num; ++i)
    processors.push_back(new Processor(this, stack->get_worker(i), cct));
}
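
The transport choice in the constructor above comes down to a substring test on the messenger type string. The following is a minimal standalone sketch of that selection only. The helper name pick_transport is hypothetical and is not a Ceph function, and the type strings used in the demo are assumed example values.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper that mirrors the substring checks in the
// AsyncMessenger constructor; it is not part of Ceph.
std::string pick_transport(const std::string& type) {
  std::string transport_type = "posix";  // default: kernel TCP sockets
  if (type.find("rdma") != std::string::npos)
    transport_type = "rdma";
  else if (type.find("dpdk") != std::string::npos)
    transport_type = "dpdk";
  return transport_type;
}

// Usage: the type strings below are assumed example values.
void demo_pick_transport() {
  assert(pick_transport("async+posix") == "posix");
  assert(pick_transport("async+rdma") == "rdma");
}
```

Because "rdma" is tested first, a type string containing both substrings would select the RDMA stack.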

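Next, the messengers are bound to their IP addresses. When the OSD object is constructed, ms_public is passed in as an argument; the OSD then finishes setting it up when its init() is called.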

  if (ms_public->bindv(public_addrs) < 0)
    forker.exit(1);
  if (ms_cluster->bindv(cluster_addrs) < 0)
    forker.exit(1);

  osdptr = new OSD(g_ceph_context,
                   std::move(store),
                   whoami,
                   ms_cluster,
                   ms_public,
                   ms_hb_front_client,
                   ms_hb_back_client,
                   ms_hb_front_server,
                   ms_hb_back_server,
                   ms_objecter,
                   &mc,
                   data_path,
                   journal_path,
                   poolctx);

  int err = osdptr->pre_init();
  if (err < 0) {
    derr << TEXT_RED << " ** ERROR: osd pre_init failed: " << cpp_strerror(-err)
         << TEXT_NORMAL << dendl;
    forker.exit(1);
  }

  ms_public->start();
  ms_hb_front_client->start();
  ms_hb_back_client->start();
  ms_hb_front_server->start();
  ms_hb_back_server->start();
  ms_cluster->start();
  ms_objecter->start();

The bindv flow in detail

int Messenger::bindv(const entity_addrvec_t& addrs)
{
  return bind(addrs.legacy_addr());
}

int AsyncMessenger::bind(const entity_addr_t &bind_addr)
{
  ldout(cct,10) << __func__ << " " << bind_addr << dendl;
  // old bind() can take entity_addr_t(). new bindv() can take a
  // 0.0.0.0-like address but needs type and family to be set.
  auto a = bind_addr;
  if (a == entity_addr_t()) {
    a.set_type(entity_addr_t::TYPE_LEGACY);
    if (cct->_conf->ms_bind_ipv6) {
      a.set_family(AF_INET6);
    } else {
      a.set_family(AF_INET);
    }
  }
  return bindv(entity_addrvec_t(a));
}

int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
{
  lock.lock();

  if (!pending_bind && started) {
    ldout(cct,10) << __func__ << " already started" << dendl;
    lock.unlock();
    return -1;
  }

  ldout(cct,10) << __func__ << " " << bind_addrs << dendl;

  if (!stack->is_ready()) {
    ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
    pending_bind_addrs = bind_addrs;
    pending_bind = true;
    lock.unlock();
    return 0;
  }

  lock.unlock();

  // bind to a socket
  std::set<int> avoid_ports;
  entity_addrvec_t bound_addrs;
  unsigned i = 0;
  for (auto &&p : processors) {
    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
    if (r) {
      // Note: this is related to local tcp listen table problem.
      // Posix(default kernel implementation) backend shares listen table
      // in the kernel, so all threads can use the same listen table naturally
      // and only one thread need to bind. But other backends(like dpdk) uses local
      // listen table, we need to bind/listen tcp port for each worker. So if the
      // first worker failed to bind, it could be think the normal error then handle
      // it, like port is used case. But if the first worker successfully to bind
      // but the second worker failed, it's not expected and we need to assert
      // here
      ceph_assert(i == 0);
      return r;
    }
    ++i;
  }
  _finish_bind(bind_addrs, bound_addrs);
  return 0;
}
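
The part of bindv() that is easy to miss is the postponed bind: if the network stack is not ready yet, the addresses are only recorded, and the real bind happens later from ready(). Below is a toy model of that handshake. ToyMessenger and its members are illustrative names rather than Ceph types, the locking and the per-worker Processor::bind() calls are left out, and clearing pending_bind after a successful bind is a simplification made for this sketch.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Toy model of the postponed-bind path. Not Ceph code.
class ToyMessenger {
 public:
  // Returns 0 on success (including the postponed case) and -1 if the
  // messenger has already started and no bind is pending.
  int bindv(const std::vector<std::string>& addrs) {
    if (!pending_bind && started) return -1;
    if (!stack_ready) {
      pending_bind_addrs = addrs;  // remember the request for ready()
      pending_bind = true;
      return 0;
    }
    bound_addrs = addrs;   // stand-in for the real socket bind
    pending_bind = false;  // simplification made for this sketch
    return 0;
  }

  // Called once the network stack is usable; replays a postponed bind.
  int ready() {
    stack_ready = true;
    int err = 0;
    if (pending_bind) err = bindv(pending_bind_addrs);
    started = true;
    return err;
  }

  std::vector<std::string> bound_addrs;
  bool pending_bind = false;

 private:
  bool stack_ready = false;
  bool started = false;
  std::vector<std::string> pending_bind_addrs;
};
```

In this model an early bindv() call succeeds without binding anything, and the addresses only show up in bound_addrs after ready() has run.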

When osdptr runs init(), the OSD adds itself as a dispatcher to client_messenger (which is ms_public) and to cluster_messenger (which is ms_cluster).

  // ceph_osd.cc
  // start osd
  err = osdptr->init();

// OSD.cc (excerpt)
int OSD::init()
{
  // ... (rest of init() omitted in this excerpt)
  // i'm ready!
  client_messenger->add_dispatcher_tail(&mgrc);
  client_messenger->add_dispatcher_tail(this);
  cluster_messenger->add_dispatcher_head(this);
  // ...
}

// Messenger.h
void add_dispatcher_tail(Dispatcher *d) {
  bool first = dispatchers.empty();
  dispatchers.push_back(d);
  if (d->ms_can_fast_dispatch_any())
    fast_dispatchers.push_back(d);
  if (first)
    ready();
}
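
The registration logic above has two effects worth spelling out: a dispatcher that supports fast dispatch is tracked in a second list, and only the first registration triggers ready(). The following is a small sketch of that behaviour. ToyDispatcher and ToyRegistry are made-up names, the can_fast_dispatch flags are arbitrary illustration values, and add_dispatcher_head is assumed to mirror the tail variant with push_front.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Illustrative stand-in for a Dispatcher; only the fields needed
// for the registration logic are modelled.
struct ToyDispatcher {
  std::string name;
  bool can_fast_dispatch;
};

// Illustrative stand-in for the dispatcher lists kept by a Messenger.
class ToyRegistry {
 public:
  void add_dispatcher_tail(ToyDispatcher* d) {
    assert(d != nullptr);
    bool first = dispatchers.empty();
    dispatchers.push_back(d);
    if (d->can_fast_dispatch) fast_dispatchers.push_back(d);
    if (first) ready();  // only the first dispatcher starts the messenger
  }

  // Assumed to mirror add_dispatcher_tail, inserting at the front.
  void add_dispatcher_head(ToyDispatcher* d) {
    assert(d != nullptr);
    bool first = dispatchers.empty();
    dispatchers.push_front(d);
    if (d->can_fast_dispatch) fast_dispatchers.push_front(d);
    if (first) ready();
  }

  std::deque<ToyDispatcher*> dispatchers;
  std::deque<ToyDispatcher*> fast_dispatchers;
  int ready_calls = 0;  // counts how often ready() fired

 private:
  void ready() { ++ready_calls; }
};
```

Registering two dispatchers on the same registry leaves ready_calls at 1, which matches the real code starting the receive threads only once per messenger.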
// AsyncMessenger.cc
void AsyncMessenger::ready()
{
  ldout(cct,10) << __func__ << " " << get_myaddrs() << dendl;

  stack->ready();
  if (pending_bind) {
    int err = bindv(pending_bind_addrs);
    if (err) {
      lderr(cct) << __func__ << " postponed bind failed" << dendl;
      ceph_abort();
    }
  }

  std::lock_guard l{lock};
  for (auto &&p : processors)
    p->start();
  dispatch_queue.start();
}

// Start the message-receiving threads
void DispatchQueue::start()
{
  ceph_assert(!stop);
  ceph_assert(!dispatch_thread.is_started());
  dispatch_thread.create("ms_dispatch");
  local_delivery_thread.create("ms_local");
}
// DispatchQueue.h
  class DispatchThread : public Thread {
    DispatchQueue *dq;
  public:
    explicit DispatchThread(DispatchQueue *dq) : dq(dq) {}
    void *entry() override {
      dq->entry();
      return 0;
    }
  } dispatch_thread;

  ceph::mutex local_delivery_lock;
  ceph::condition_variable local_delivery_cond;
  bool stop_local_delivery;
  std::queue<std::pair<ceph::ref_t<Message>, int>> local_messages;
  class LocalDeliveryThread : public Thread {
    DispatchQueue *dq;
  public:
    explicit LocalDeliveryThread(DispatchQueue *dq) : dq(dq) {}
    void *entry() override {
      dq->run_local_delivery();
      return 0;
    }
  } local_delivery_thread;

/*
 * This function delivers incoming messages to the Messenger.
 * Connections with messages are kept in queues; when beginning a message
 * delivery the highest-priority queue is selected, the connection from the
 * front of the queue is removed, and its message read. If the connection
 * has remaining messages at that priority level, it is re-placed on to the
 * end of the queue. If the queue is empty; it's removed.
 * The message is then delivered and the process starts again.
 */
void DispatchQueue::entry()
{
  std::unique_lock l{lock};
  while (true) {
    while (!mqueue.empty()) {
      QueueItem qitem = mqueue.dequeue();
      if (!qitem.is_code())
        remove_arrival(qitem.get_message());
      l.unlock();

      if (qitem.is_code()) {
        if (cct->_conf->ms_inject_internal_delays &&
            cct->_conf->ms_inject_delay_probability &&
            (rand() % 10000)/10000.0 < cct->_conf->ms_inject_delay_probability) {
          utime_t t;
          t.set_from_double(cct->_conf->ms_inject_internal_delays);
          ldout(cct, 1) << "DispatchQueue::entry  inject delay of " << t
                        << dendl;
          t.sleep();
        }
        switch (qitem.get_code()) {
        case D_BAD_REMOTE_RESET:
          msgr->ms_deliver_handle_remote_reset(qitem.get_connection());
          break;
        case D_CONNECT:
          msgr->ms_deliver_handle_connect(qitem.get_connection());
          break;
        case D_ACCEPT:
          msgr->ms_deliver_handle_accept(qitem.get_connection());
          break;
        case D_BAD_RESET:
          msgr->ms_deliver_handle_reset(qitem.get_connection());
          break;
        case D_CONN_REFUSED:
          msgr->ms_deliver_handle_refused(qitem.get_connection());
          break;
        default:
          ceph_abort();
        }
      } else {
        const ref_t<Message>& m = qitem.get_message();
        if (stop) {
          ldout(cct,10) << " stop flag set, discarding " << m << " " << *m << dendl;
        } else {
          uint64_t msize = pre_dispatch(m);
          msgr->ms_deliver_dispatch(m);
          post_dispatch(m, msize);
        }
      }

      l.lock();
    }
    if (stop)
      break;

    // wait for something to be put on queue
    cond.wait(l);
  }
}
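
The comment above entry() describes the ordering rule: always serve the highest-priority queue first, and keep FIFO order within one priority. The sketch below models only that rule with a std::map of per-priority queues. ToyPriorityQueue and drain are made-up names; the queue used by the real DispatchQueue may schedule differently in detail, and the dispatch thread, the lock, and the condition variable are left out on purpose.

```cpp
#include <cassert>
#include <deque>
#include <iterator>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy priority queue: strict highest-priority-first, FIFO within a priority.
class ToyPriorityQueue {
 public:
  void enqueue(int priority, std::string msg) {
    queues_[priority].push_back(std::move(msg));
  }

  bool empty() const { return queues_.empty(); }

  std::string dequeue() {
    assert(!queues_.empty());
    auto it = std::prev(queues_.end());  // std::map is ordered: last key is highest
    std::string msg = std::move(it->second.front());
    it->second.pop_front();
    if (it->second.empty()) queues_.erase(it);  // drop an exhausted priority level
    return msg;
  }

 private:
  std::map<int, std::deque<std::string>> queues_;
};

// Models the inner "while (!mqueue.empty())" loop: deliver until empty.
std::vector<std::string> drain(ToyPriorityQueue& q) {
  std::vector<std::string> delivered;
  while (!q.empty()) delivered.push_back(q.dequeue());
  return delivered;
}
```

Enqueuing messages at priorities 1, 10, 5 and 10 and then draining returns both priority-10 messages first, in arrival order, followed by the priority-5 and priority-1 messages.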
// DispatchQueue.cc
void DispatchQueue::run_local_delivery()
{
  std::unique_lock l{local_delivery_lock};
  while (true) {
    if (stop_local_delivery)
      break;
    if (local_messages.empty()) {
      local_delivery_cond.wait(l);
      continue;
    }
    auto p = std::move(local_messages.front());
    local_messages.pop();
    l.unlock();
    const ref_t<Message>& m = p.first;
    int priority = p.second;
    fast_preprocess(m);
    if (can_fast_dispatch(m)) {
      fast_dispatch(m);
    } else {
      enqueue(m, priority, 0);
    }
    l.lock();
  }
}

void fast_dispatch(Message* m) {
  return fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */
}

void DispatchQueue::fast_dispatch(const ref_t<Message>& m)
{
  uint64_t msize = pre_dispatch(m);
  msgr->ms_fast_dispatch(m);
  post_dispatch(m, msize);
}

// Dispatcher.h
  /**
   * Perform a "fast dispatch" on a given message. See
   * ms_can_fast_dispatch() for the requirements.
   *
   * @param m The Message to fast dispatch.
   */
  virtual void ms_fast_dispatch(Message *m) { ceph_abort(); }

  /* ms_fast_dispatch2 because otherwise the child must define both */
  virtual void ms_fast_dispatch2(const MessageRef &m) {
    /* allow old style dispatch handling that expects a Message * with a floating ref */
    return ms_fast_dispatch(MessageRef(m).detach()); /* XXX N.B. always consumes ref */
  }
// OSD.cc
void OSD::ms_fast_dispatch(Message *m)
{
  auto dispatch_span = tracing::osd::tracer.start_trace(__func__);
  FUNCTRACE(cct);
  if (service.is_stopping()) {
    m->put();
    return;
  }

  // peering event?
  switch (m->get_type()) {
  case CEPH_MSG_PING:
    dout(10) << "ping from " << m->get_source() << dendl;
    m->put();
    return;
  case MSG_OSD_FORCE_RECOVERY:
    handle_fast_force_recovery(static_cast<MOSDForceRecovery*>(m));
    return;
  case MSG_OSD_SCRUB2:
    handle_fast_scrub(static_cast<MOSDScrub2*>(m));
    return;
  case MSG_OSD_PG_CREATE2:
    return handle_fast_pg_create(static_cast<MOSDPGCreate2*>(m));
  case MSG_OSD_PG_NOTIFY:
    return handle_fast_pg_notify(static_cast<MOSDPGNotify*>(m));
  case MSG_OSD_PG_INFO:
    return handle_fast_pg_info(static_cast<MOSDPGInfo*>(m));
  case MSG_OSD_PG_REMOVE:
    return handle_fast_pg_remove(static_cast<MOSDPGRemove*>(m));

  // these are single-pg messages that handle themselves
  case MSG_OSD_PG_LOG:
  case MSG_OSD_PG_TRIM:
  case MSG_OSD_PG_NOTIFY2:
  case MSG_OSD_PG_QUERY2:
  case MSG_OSD_PG_INFO2:
  case MSG_OSD_BACKFILL_RESERVE:
  case MSG_OSD_RECOVERY_RESERVE:
  case MSG_OSD_PG_LEASE:
  case MSG_OSD_PG_LEASE_ACK:
    {
      MOSDPeeringOp *pm = static_cast<MOSDPeeringOp*>(m);
      if (require_osd_peer(pm)) {
        enqueue_peering_evt(
          pm->get_spg(),
          PGPeeringEventRef(pm->get_event()));
      }
      pm->put();
      return;
    }
  }

  OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
  {
#ifdef WITH_LTTNG
    osd_reqid_t reqid = op->get_reqid();
#endif
    tracepoint(osd, ms_fast_dispatch, reqid.name._type,
               reqid.name._num, reqid.tid, reqid.inc);
  }
  op->osd_parent_span = tracing::osd::tracer.add_span("op-request-created", dispatch_span);

  if (m->trace)
    op->osd_trace.init("osd op", &trace_endpoint, &m->trace);

  // note sender epoch, min req's epoch
  op->sent_epoch = static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch();
  op->min_epoch = static_cast<MOSDFastDispatchOp*>(m)->get_min_epoch();
  ceph_assert(op->min_epoch <= op->sent_epoch); // sanity check!

  service.maybe_inject_dispatch_delay();

  if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) ||
      m->get_type() != CEPH_MSG_OSD_OP) {
    // queue it directly
    enqueue_op(
      static_cast<MOSDFastDispatchOp*>(m)->get_spg(),
      std::move(op),
      static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch());
  } else {
    // legacy client, and this is an MOSDOp (the *only* fast dispatch
    // message that didn't have an explicit spg_t); we need to map
    // them to an spg_t while preserving delivery order.
    auto priv = m->get_connection()->get_priv();
    if (auto session = static_cast<Session*>(priv.get()); session) {
      std::lock_guard l{session->session_dispatch_lock};
      op->get();
      session->waiting_on_map.push_back(*op);
      OSDMapRef nextmap = service.get_nextmap_reserved();
      dispatch_session_waiting(session, nextmap);
      service.release_map(nextmap);
    }
  }
  OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false);
}
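
The last branch of ms_fast_dispatch() decides how an op reaches the op queue. It is queued directly unless it is an MOSDOp from a legacy client, in which case it goes through the session's waiting list. The decision can be written as a two-input function. Route and route_op are hypothetical names introduced only for this sketch.

```cpp
#include <cassert>

// Hypothetical names for the two paths taken at the end of ms_fast_dispatch().
enum class Route {
  Direct,      // enqueue_op() is called right away
  ViaSession,  // op is parked on session->waiting_on_map first
};

// has_resend_on_split: the connection advertises RESEND_ON_SPLIT.
// is_osd_op: the message type is CEPH_MSG_OSD_OP.
constexpr Route route_op(bool has_resend_on_split, bool is_osd_op) {
  return (has_resend_on_split || !is_osd_op) ? Route::Direct
                                             : Route::ViaSession;
}

// Usage: only a legacy client sending an OSD op takes the session path.
void demo_route_op() {
  assert(route_op(false, true) == Route::ViaSession);
  assert(route_op(true, true) == Route::Direct);
}
```

Of the four input combinations, only one takes the session path, which is why the source comment calls MOSDOp the only fast-dispatch message without an explicit spg_t.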
void OSD::dispatch_session_waiting(const ceph::ref_t<Session>& session, OSDMapRef osdmap)
{
  ceph_assert(ceph_mutex_is_locked(session->session_dispatch_lock));

  auto i = session->waiting_on_map.begin();
  while (i != session->waiting_on_map.end()) {
    OpRequestRef op = &(*i);
    ceph_assert(ms_can_fast_dispatch(op->get_req()));
    auto m = op->get_req<MOSDFastDispatchOp>();
    if (m->get_min_epoch() > osdmap->get_epoch()) {
      break;
    }
    session->waiting_on_map.erase(i++);
    op->put();

    spg_t pgid;
    if (m->get_type() == CEPH_MSG_OSD_OP) {
      pg_t actual_pgid = osdmap->raw_pg_to_pg(
        static_cast<const MOSDOp*>(m)->get_pg());
      if (!osdmap->get_primary_shard(actual_pgid, &pgid)) {
        continue;
      }
    } else {
      pgid = m->get_spg();
    }
    enqueue_op(pgid, std::move(op), m->get_map_epoch());
  }

  if (session->waiting_on_map.empty()) {
    clear_session_waiting_on_map(session);
  } else {
    register_session_waiting_on_map(session);
  }
}
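
dispatch_session_waiting() preserves delivery order by stopping at the first op whose min_epoch is newer than the OSD's current map, even if later ops in the list could already be handled. The sketch below models just that loop. ToyOp and dispatch_waiting are illustrative names, and the PG mapping step and reference counting are omitted.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

// Illustrative op: an id plus the oldest map epoch it can be handled with.
struct ToyOp {
  int id;
  uint32_t min_epoch;
};

// Pops ops from the front of `waiting` while the map is new enough and
// returns their ids in dispatch order. Stops at the first op that must wait.
std::vector<int> dispatch_waiting(std::deque<ToyOp>& waiting,
                                  uint32_t osdmap_epoch) {
  std::vector<int> dispatched;
  while (!waiting.empty()) {
    const ToyOp& op = waiting.front();
    if (op.min_epoch > osdmap_epoch) break;  // needs a newer map: keep order
    dispatched.push_back(op.id);             // stand-in for enqueue_op()
    waiting.pop_front();
  }
  return dispatched;
}

// Usage: op 3 could run at epoch 6 but stays queued behind op 2.
void demo_dispatch_waiting() {
  std::deque<ToyOp> waiting{{1, 5}, {2, 7}, {3, 6}};
  std::vector<int> first = dispatch_waiting(waiting, 6);
  assert(first.size() == 1 && first[0] == 1);
  assert(waiting.size() == 2);
}
```

Once the map advances to epoch 7, a second call dispatches ops 2 and 3 in their original order and leaves the list empty, which corresponds to the clear_session_waiting_on_map() branch above.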
