列存储(CStore)(六)
- 概述
- CStore::GetCUDataFromRemote 函数
- CStore::CheckConsistenceOfCUDescCtl 函数
- CStore::CheckConsistenceOfCUDesc 函数
- CStore::CheckConsistenceOfCUData 函数
- 额外补充
声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 OpenGauss1.1.0 的开源代码和《OpenGauss数据库源码解析》一书以及OpenGauss社区学习文档和一些学习资料
概述
本章我们继续在【 OpenGauss源码学习 —— 列存储(CStore)(五)】的基础上进行进一步学习,本文将主要介绍 CStore 类中剩余的各别私有成员函数。
CStore::GetCUDataFromRemote 函数
CStore::GetCUDataFromRemote 函数主要用于在压缩列存储引擎(CStore)中远程加载压缩单元(CU)的数据。它通过从缓存中获取 CU 缓冲区,检查是否需要重用内存,并在获取压缩锁的情况下进行远程加载。如果 CU 缓冲区已经包含压缩数据,它将远程加载的数据进行验证,并在需要时覆盖缓冲区。最后,它启动 CU 的解压缩,并在发现 CRC 错误或 magic 错误时报告相应的错误。此函数支持并发访问,确保不同会话之间对 CU 数据的远程加载和解压缩操作的正确性。函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
/** @Description: 仅被 CStore::GetCUData() 调用,用于远程加载 CU* @IN/OUT cuDescPtr: CU 描述符指针* @IN/OUT cuPtr: CU 指针* @IN/OUT colIdx: 列索引* @IN/OUT slotId: 插槽ID,必须被锁定* @IN/OUT valSize: 值的大小* @Return: CU 未压缩返回码* @See also: CStore::GetCUData*/
CUUncompressedRetCode CStore::GetCUDataFromRemote(CUDesc* cuDescPtr, CU* cuPtr, int colIdx, int valSize, const int& slotId)
{// 获取属性信息Form_pg_attribute* attrs = m_relation->rd_att->attrs;// 初始化返回码CUUncompressedRetCode retCode = CU_OK;/* 重用内存并检查是否有其他会话同时更新它。 */if (CUCache->ReserveCstoreDataBlockWithSlotId(slotId)) {// 获取 CU 缓冲区cuPtr = CUCache->GetCUBuf(slotId);cuPtr->m_inCUCache = true;// 设置属性信息cuPtr->SetAttInfo(valSize, attrs[colIdx]->atttypmod, attrs[colIdx]->atttypid);/** 远程加载需要 CU 压缩。(cuPtr->m_compressedLoadBuf != NULL)* 如果 CU 未压缩,表示其他线程已经远程读取了 CU 并进行了解压缩。*/CUCache->AcquireCompressLock(slotId);if (cuPtr->m_cache_compressed) {// 远程加载 CUm_cuStorage[colIdx]->RemoteLoadCU(cuPtr, cuDescPtr->cu_pointer, cuDescPtr->cu_size, g_instance.attr.attr_storage.enable_adio_function, true);// 验证 CU 是否正确if (cuPtr->IsVerified(cuDescPtr->magic))m_cuStorage[colIdx]->OverwriteCU(cuPtr->m_compressedBuf, cuDescPtr->cu_pointer, cuDescPtr->cu_size, false);}CUCache->RealeseCompressLock(slotId);// 完成数据块 IO 操作CUCache->DataBlockCompleteIO(slotId);} else {if (CUCache->DataBlockWaitIO(slotId)) {// 如果在远程读取 CU 时发生 IO 错误,报错ereport(ERROR,(errcode(ERRCODE_IO_ERROR),errmodule(MOD_CACHE),errmsg("There is an IO error when remote read CU in cu_id %u of relation %s file %s offset %lu. ""slotId %d, column \"%s\" ",cuDescPtr->cu_id,RelationGetRelationName(m_relation),relcolpath(m_cuStorage[colIdx]),cuDescPtr->cu_pointer,slotId,NameStr(m_relation->rd_att->attrs[colIdx]->attname))));}}// 开始解压缩 CUretCode = CUCache->StartUncompressCU(cuDescPtr, slotId, this->m_plan_node_id, this->m_timing_on, ALIGNOF_CUSIZE);if (retCode == CU_ERR_CRC || retCode == CU_ERR_MAGIC) {// 远程加载 CRC 错误或魔数错误,报错CUCache->TerminateCU(true);ereport(ERROR,(errcode(ERRCODE_DATA_CORRUPTED),(errmsg("invalid CU in cu_id %u of relation %s file %s offset %lu, remote read %s",cuDescPtr->cu_id,RelationGetRelationName(m_relation),relcolpath(m_cuStorage[colIdx]),cuDescPtr->cu_pointer,GetUncompressErrMsg(retCode)))));}return retCode;
}
CStore::CheckConsistenceOfCUDescCtl 函数
CStore::CheckConsistenceOfCUDescCtl 函数的主要目的是在进行批量加载压缩单元(CU)时,检查不同列的压缩单元描述控制块(LoadCUDescCtl)之间的一致性,确保它们在相同的加载批次中保持一致。函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
/** @Description: 检查批量加载期间 CUDescCtl(压缩单元描述符控制块)的一致性。* @See also:*/
void CStore::CheckConsistenceOfCUDescCtl(void)
{// 获取第一列的 CUDescCtlLoadCUDescCtl* firstCUDescCtl = m_CUDescInfo[0];LoadCUDescCtl* checkCUdescCtl = NULL;// 遍历其他列的 CUDescCtl,并比较它们的字段以确保一致性for (int i = 1; i < m_colNum; ++i) {checkCUdescCtl = m_CUDescInfo[i];// 检查下一个 CU ID、上次加载数量和当前加载数量是否一致if (checkCUdescCtl->nextCUID == firstCUDescCtl->nextCUID &&checkCUdescCtl->lastLoadNum == firstCUDescCtl->lastLoadNum &&checkCUdescCtl->curLoadNum == firstCUDescCtl->curLoadNum) {continue; // 此列的 CUDescCtl 一致,继续下一列}// 如果发现不一致性,生成错误报告ereport(defence_errlevel(),(errcode(ERRCODE_INTERNAL_ERROR),errmsg("批量加载期间 CUDescCtl(表列、下一个 CU ID、上次加载数量、当前加载数量)不一致。""CUDescCtl[%d] 为 (%d %u %u %u),CUDescCtl[%d] 为 (%d %u %u %u)",0,(m_colId[0] + 1),firstCUDescCtl->nextCUID,firstCUDescCtl->lastLoadNum,firstCUDescCtl->curLoadNum,i,(m_colId[i] + 1),checkCUdescCtl->nextCUID,checkCUdescCtl->lastLoadNum,checkCUdescCtl->curLoadNum),errdetail("关系信息:名称 \"%s\",命名空间 ID %u,ID %u,relfilenode %u/%u/%u",RelationGetRelationName(m_relation),RelationGetNamespace(m_relation),RelationGetRelid(m_relation),m_relation->rd_node.spcNode,m_relation->rd_node.dbNode,m_relation->rd_node.relNode)));}
}
注解:
在这个上下文中,“一致性” 指的是不同列的压缩单元描述符控制块(LoadCUDescCtl)的一些关键字段具有相同的值,表明它们在相同的加载状态下。具体而言,这些字段包括:
- 下一个 CU ID(nextCUID): 表示下一个将要加载的压缩单元的 ID。
- 上次加载数量(lastLoadNum): 表示上一次加载的压缩单元数量。
- 当前加载数量(curLoadNum): 表示当前加载的压缩单元数量。
通过检查这些字段的值,可以确保不同列的加载状态是一致的。这是因为在某些情况下,不同列可能在不同的时间加载不同数量的压缩单元。例如,如果某列的加载比其他列更快,它可能会在其他列加载完成之前加载更多的压缩单元。
举例来说,假设有两列 A 和 B,它们都有各自的 LoadCUDescCtl。在某个时间点,A 列的 nextCUID 是 10,lastLoadNum 是 5,curLoadNum是 8,而 B 列的对应值是相同的,那么它们就是一致的。如果它们的这些值在某个时间点不同,那么就表示加载的状态不一致。
CStore::CheckConsistenceOfCUDesc 函数
CStore::CheckConsistenceOfCUDesc 函数用于检查不同列的压缩单元描述符(CUDesc)在给定的 CUDesc 索引(cudescIdx)下的一致性。一致性主要涉及以下两个关键字段:
- CU ID(cu_id): 压缩单元的唯一标识符。
- 行数(row_count): 压缩单元中的行数。
通过比较这些字段的值,可以确保不同列在相同 CUDesc 索引下的加载状态是一致的。
举例来说,如果有两列 A 和 B,在 CUDesc 索引为 3 的情况下,如果 A 列和 B 列的 CUDesc 的 cu_id 和 row_count 字段的值相同,那么它们就是一致的。如果这些值在某个时间点不同,就表示加载的状态不一致,可能是由于某些加载错误或者数据不一致导致的。
CStore::CheckConsistenceOfCUDesc 函数函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
/** @Description: 检查不同列的压缩单元描述符(CUDesc)在给定的CUDesc索引(cudescIdx)下的一致性。* 一致性主要涉及CU ID(cu_id)和行数(row_count)这两个关键字段的比较,* 确保不同列在相同CUDesc索引下的加载状态是一致的。* @Param[IN] cudescIdx: 给定的CUDesc索引* @See also:*/
void CStore::CheckConsistenceOfCUDesc(int cudescIdx) const
{CUDesc* firstCUDesc = m_CUDescInfo[0]->cuDescArray + cudescIdx; // 获取第一列的CUDescCUDesc* checkCUDesc = NULL; // 待检查的CUDesc// 遍历各列,比较CU ID和行数字段的值for (int col = 1; col < m_colNum; ++col) {checkCUDesc = m_CUDescInfo[col]->cuDescArray + cudescIdx; // 获取当前列的CUDesc// 如果CU ID和行数字段的值相同,说明一致,继续下一列的检查if (checkCUDesc->cu_id == firstCUDesc->cu_id && checkCUDesc->row_count == firstCUDesc->row_count) {continue;}// 如果不一致,报告错误信息ereport(defence_errlevel(),(errcode(ERRCODE_INTERNAL_ERROR),errmsg("Inconsistent of CUDesc(table column, CUDesc index, CU id, number of rows) during batch loading, ""CUDesc[%d] (%d %d %u %d), CUDesc[%d] (%d %d %u %d)",0,(m_colId[0] + 1),cudescIdx,firstCUDesc->cu_id,firstCUDesc->row_count,col,(m_colId[col] + 1),cudescIdx,checkCUDesc->cu_id,checkCUDesc->row_count),errdetail("relation info: name \"%s\", namespace id %u, id %u, relfilenode %u/%u/%u",RelationGetRelationName(m_relation),RelationGetNamespace(m_relation),RelationGetRelid(m_relation),m_relation->rd_node.spcNode,m_relation->rd_node.dbNode,m_relation->rd_node.relNode)));}
}
CStore::CheckConsistenceOfCUData 函数
CStore::CheckConsistenceOfCUData 函数用于检查列存储引擎中压缩单元数据(CU)与相应的压缩单元描述符(CUDesc)之间的一致性。它包括检查关键字段如数据指针、偏移指针、magic、行数和数据大小等,确保在批量加载期间数据的正确性。如果发现不一致性,函数将生成错误报告,包括详细的关系信息、列信息以及不匹配的字段值,帮助在数据存储和访问过程中及早发现和排除问题。CStore::CheckConsistenceOfCUData 函数函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
/** @Description: 检查压缩单元数据(CU)与相应的压缩单元描述符(CUDesc)之间的一致性,包括检查数据指针、偏移指针、魔数、行数和数据大小等关键字段。* @Param[IN] cuDescPtr: 压缩单元描述符指针* @Param[IN] cu: 压缩单元指针* @Param[IN] col: 列号* @See also: CStore::GetCUData*/
void CStore::CheckConsistenceOfCUData(CUDesc* cuDescPtr, CU* cu, AttrNumber col) const
{/** 该内存屏障防止乱序读取,可能导致使用未完成解压的CU。* 我们必须在GetCUData函数的每个分支中返回cuPtr之前添加内存屏障。*/
#ifdef __aarch64__pg_memory_barrier();
#endif/* 检查源数据指针 */if (cu->m_srcData == NULL) {ereport(defence_errlevel(),(errcode(ERRCODE_INTERNAL_ERROR),errmsg("CU的m_srcData指针在CheckConsistenceOfCUData中为NULL。"),errdetail("relation info: name \"%s\", namespace id %u, id %u, relfilenode %u/%u/%u",RelationGetRelationName(m_relation), RelationGetNamespace(m_relation), RelationGetRelid(m_relation),m_relation->rd_node.spcNode, m_relation->rd_node.dbNode, m_relation->rd_node.relNode),errdetail_internal("CU信息: 表列 %d, id %u, 偏移 %lu, 大小 %d, 行数 %d",col,cuDescPtr->cu_id, cuDescPtr->cu_pointer, cuDescPtr->cu_size, cuDescPtr->row_count)));}/* 检查偏移指针 */if ((cu->m_eachValSize < 0 && cu->m_offset == NULL) || (cu->HasNullValue() && cu->m_offset == NULL)) {ereport(defence_errlevel(),(errcode(ERRCODE_INTERNAL_ERROR),errmsg("CU的m_offset指针在CheckConsistenceOfCUData中为NULL。"),errdetail("relation info: name \"%s\", namespace id %u, id %u, relfilenode %u/%u/%u",RelationGetRelationName(m_relation), RelationGetNamespace(m_relation), RelationGetRelid(m_relation),m_relation->rd_node.spcNode, m_relation->rd_node.dbNode, m_relation->rd_node.relNode),errdetail_internal("CU信息: 表列 %d, id %u, 偏移 %lu, 大小 %d, 行数 %d",col,cuDescPtr->cu_id, cuDescPtr->cu_pointer, cuDescPtr->cu_size, cuDescPtr->row_count)));}/* 检查magic */if (cu->m_magic != cuDescPtr->magic) {ereport(defence_errlevel(),(errcode(ERRCODE_INTERNAL_ERROR),errmsg("缓存的CU数据与CUDesc之间的magic不匹配,CUDesc的magic %u,CU的magic %u",cuDescPtr->magic,cu->m_magic),errdetail("relation info: name \"%s\", namespace id %u, id %u, relfilenode %u/%u/%u",RelationGetRelationName(m_relation), RelationGetNamespace(m_relation), RelationGetRelid(m_relation),m_relation->rd_node.spcNode, m_relation->rd_node.dbNode, m_relation->rd_node.relNode),errdetail_internal("CU信息: 表列 %d, id %u, 偏移 %lu, 大小 %d, 行数 %d",col,cuDescPtr->cu_id, cuDescPtr->cu_pointer, cuDescPtr->cu_size, cuDescPtr->row_count)));}/* 检查行数 */if (cu->m_offsetSize > 0) {/* 参见CU::FormValuesOffset() */if ((cu->m_offsetSize / (int)sizeof(int32)) != (cuDescPtr->row_count + 1)) {ereport(defence_errlevel(),(errcode(ERRCODE_INTERNAL_ERROR),errmsg("缓存的CU数据与CUDesc之间的行数不匹配,CUDesc的行数 %d,CU的行数 %d",cuDescPtr->row_count,((cu->m_offsetSize / (int)sizeof(int32)) - 1)),errdetail("relation info: name \"%s\", namespace id %u, id %u, relfilenode %u/%u/%u",RelationGetRelationName(m_relation),RelationGetNamespace(m_relation),RelationGetRelid(m_relation),m_relation->rd_node.spcNode,m_relation->rd_node.dbNode,m_relation->rd_node.relNode),errdetail_internal("CU信息: 表列 %d, id %u, 偏移 %lu, 大小 %d, 魔数 %u",col,cuDescPtr->cu_id,cuDescPtr->cu_pointer,cuDescPtr->cu_size,cuDescPtr->magic)));}}/* 检查CU大小 */if (cu->m_cuSize != (uint32)cuDescPtr->cu_size) {ereport(defence_errlevel(),(errcode(ERRCODE_INTERNAL_ERROR),errmsg("缓存的CU数据与CUDesc之间的CU大小不匹配,CUDesc的CU大小 %u,CU的CU大小 %u",(uint32)cuDescPtr->cu_size,cu->m_cuSize),errdetail("relation info: name \"%s\", namespace id %u, id %u, relfilenode %u/%u/%u",RelationGetRelationName(m_relation),RelationGetNamespace(m_relation),RelationGetRelid(m_relation),m_relation->rd_node.spcNode,m_relation->rd_node.dbNode,m_relation->rd_node.relNode),errdetail_internal("CU信息: 表列 %d, id %u, 偏移 %lu, 行数 %d, 魔数 %u",col,cuDescPtr->cu_id,cuDescPtr->cu_pointer,cuDescPtr->row_count,cuDescPtr->magic)));}
}
额外补充
到此,以上便基本介绍完了 CStore 类中的绝大多数成员函数,最后再补充一下CStore 类中的私有成员变量吧,这些成员变量用于维护和管理数据库表的状态、元数据和其他相关信息。函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_am.cpp
)
private:// 本地控制私有内存使用。// m_scanMemContext:用于整个 cstore-scan 过程中存活的对象。// m_perScanMemCnxt:用于每个堆表扫描和解压缩期间的临时空间。MemoryContext m_scanMemContext;MemoryContext m_perScanMemCnxt;// 当前要使用的快照。Snapshot m_snapshot;// 1. 已访问的用户列 ID// 2. 已访问的系统列 ID// 3. 用于延迟读取的标志// 4. 每个用户列的 CU 存储。int *m_colId;int *m_sysColId;bool *m_lateRead;CUStorage **m_cuStorage;// 1. 已访问列的 CUDesc 信息// 2. 用于系统或常量列的虚拟 CUDescLoadCUDescCtl **m_CUDescInfo;LoadCUDescCtl *m_virtualCUDescInfo;// 已访问的 CUDesc 索引数组// 在 RoughCheck 之后,将访问哪个 CUint *m_CUDescIdx;// adio 参数int m_lastNumCUDescIdx;int m_prefetch_quantity;int m_prefetch_threshold;bool m_load_finish;// 当前扫描位置在 CU 内int *m_scanPosInCU;// Rough Check 函数RoughCheckFunc *m_RCFuncs;typedef int (CStore::*m_colFillFun)(int seq, CUDesc *cuDescPtr, ScalarVector *vec);typedef struct {m_colFillFun colFillFun[2];} colFillArray;typedef void (CStore::*FillVectorByTidsFun)(_in_ int colIdx, _in_ ScalarVector *tids, _out_ ScalarVector *vec);typedef void (CStore::*FillVectorLateReadFun)(_in_ int seq, _in_ ScalarVector *tids, _in_ CUDesc *cuDescPtr,_out_ ScalarVector *vec);FillVectorByTidsFun *m_fillVectorByTids;FillVectorLateReadFun *m_fillVectorLateRead;colFillArray *m_colFillFunArrary;typedef void (CStore::*fillMinMaxFuncPtr)(CUDesc *cuDescPtr, ScalarVector *vec, int pos);fillMinMaxFuncPtr *m_fillMinMaxFunc;ScanFuncPtr m_scanFunc; // cstore scan function ptr// 该计划的节点 IDint m_plan_node_id;// 1. 已访问的用户列数// 2. 已访问的系统列数。int m_colNum;int m_sysColNum;// 1. 加载的 CUDesc 信息或虚拟 CUDesc 信息的长度// 2. m_CUDescIdx 的长度int m_NumLoadCUDesc;int m_NumCUDescIdx;// 1. 当前删除掩码的 CU ID。// 2. m_CUDescIdx 中的当前访问游标// 3. CU 内的当前访问行游标uint32 m_delMaskCUId;int m_cursor;int m_rowCursorInCU;uint32 m_startCUID; /* 扫描开始的 CU ID。 */uint32 m_endCUID; /* 扫描结束的 CU ID。 */unsigned char m_cuDelMask[MaxDelBitmapSize];// 是否存在死行bool m_hasDeadRow;// 是否需要进行 Rough Checkbool m_needRCheck;// 仅访问常量列bool m_onlyConstCol;bool m_timing_on; /* 记录 CStoreScan 步骤的时间 */RangeScanInRedis m_rangeScanInRedis; /* 如果在重分布时是范围扫描 */// cbtree 索引标志bool m_useBtreeIndex;// 第一列的索引,从 0 开始int m_firstColIdx;// 用于延迟读取// cuDesc 数组中的批次的 cuDesc ID。int m_cuDescIdx;// 用于延迟读取// 第一个延迟读取列的索引,其中填充了 ctid。int m_laterReadCtidColIdx;
};