列存储(Insert)
- 概述
- 相关函数
- CStoreUpdate::ExecUpdate 函数
- JunkFilter 结构体
- CStoreInsert::BatchInsert 函数
- bulkload_rows::append_one_vector 函数
- bulkload_rows::append_in_column_orientation我函数
- ExecVecUpdate 函数
- CStoreUpdate::EndUpdate 函数
声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 OpenGauss1.1.0 的开源代码和《OpenGauss数据库源码解析》一书以及OpenGauss社区学习文档
概述
在先前的学习中分别介绍了列存储(创建表)和列存储(Insert),相比大家对列存储也有了初步的认识和了解。本文接下来将学习列存储(update)的相关知识。
列存储和行存储是数据库中两种不同的数据存储方式,它们在数据更新操作上有一些显著的区别:
- 行存储(Row-based storage):
- 存储方式: 行存储以行为单位存储数据,整个行的数据被存储在一起。
- 更新操作: 当执行更新操作时,行存储需要更新整行的数据。即使只修改了一部分字段,也需要更新整个行,这可能导致额外的存储和 I/O 操作。
- 列存储(Column-based storage):
- 存储方式: 列存储以列为单位存储数据,每一列的数据被存储在一起。
- 更新操作: 列存储在更新操作上具有优势,因为它只需要更新涉及到的列。当只修改表中的一部分字段时,只有相关列需要被更新,而其他列保持不变。这可以减少更新操作的开销。
相关函数
按照传统,我们按照一个实际案例来学习,创建一个名为 sales 的列存储表,用于存储销售数据,表的结构如下:
--------创建表
CREATE TABLE columnar_test (id INT,name VARCHAR(50),age INT,city VARCHAR(50)
) WITH (ORIENTATION = COLUMN);--------插入数据
INSERT INTO columnar_test (id, name, age, city)
VALUES(1, 'Alice', 25, 'New York'),(2, 'Bob', 30, 'San Francisco'),(3, 'Charlie', 28, 'Los Angeles'),(4, 'David', 35, 'Chicago'),(5, 'Eva', 22, 'Miami');postgres=# select * from columnar_test;id | name | age | city
----+---------+-----+---------------1 | Alice | 25 | New York2 | Bob | 30 | San Francisco3 | Charlie | 28 | Los Angeles4 | David | 35 | Chicago5 | Eva | 22 | Miami
(5 rows)--------更新数据
UPDATE columnar_test SET name = 'kuchiki' where id = 1;
CStoreUpdate::ExecUpdate 函数
CStoreUpdate::ExecUpdate 函数主要用于执行更新操作。首先,它确保输入的 VectorBatch 和相关的执行信息不为空,并进行必要的断言检查。然后,通过 JunkFilter(废弃数据过滤器)执行删除操作,将批量数据中的符合删除条件的行进行删除。接着,调整批量数据的列数以适应执行环境,并检查批量数据是否符合表的约束条件。最后,根据是分区表还是非分区表,调用相应的批量插入方法(BatchInsert)执行更新后的插入操作。函数返回更新的行数。这一系列操作确保了在更新数据时的一致性和有效性。
函数接受两个参数:
- VectorBatch* batch: 表示待更新的批量数据,以 VectorBatch 的形式传入。VectorBatch 通常是一组行存储的数据,包含多个列。
- int options: 表示更新操作的选项,以整数形式传入。这个参数可能包含有关更新行为的一些额外信息,比如是否跳过写入日志(WAL)等。
函数入参调试信息如下所示:
(gdb) p *batch
$1 = {<BaseObject> = {<No data fields>}, m_rows = 1, m_cols = 6, m_checkSel = false, m_sel = 0x7f8e87f29450, m_arr = 0x7f8e87f29880, m_sysColumns = 0x0,m_pCompressBuf = 0x0}
(gdb) p *batch.m_sel
$2 = true
(gdb) p *batch.m_arr
$4 = {<BaseObject> = {<No data fields>}, m_rows = 1, m_desc = {<BaseObject> = {<No data fields>}, typeId = 23, typeMod = -1, encoded = false}, m_const = false,m_flag = 0x7f8e87e8df40 "", m_buf = 0x7f8e87f22088, m_vals = 0x7f8e87f20060, m_addVar = (Datum (ScalarVector::*)(ScalarVector * const, Datum, int)) 0x173136a<ScalarVector::AddHeaderVar(unsigned long, int)>}
(gdb) p options
$5 = 0
VectorBatch 类的部分成员函数解释如下:
class VectorBatch : public BaseObject {
public:// 批量数据中的行数。int m_rows;// 批量数据中的列数。int m_cols;// 是否检查选择向量。bool m_checkSel;// 选择向量,用于标记批量数据中哪些行是有效的。bool* m_sel;// ScalarVector数组,每个ScalarVector对应批量数据中的一列。ScalarVector* m_arr;// SysColumns,用于存储系统列的容器。SysColContainer* m_sysColumns;// 压缩缓冲区,用于存储批量数据的压缩信息。StringInfo m_pCompressBuf;
};
CStoreUpdate::ExecUpdate 函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_update.cpp
)
uint64 CStoreUpdate::ExecUpdate(_in_ VectorBatch* batch, _in_ int options)
{// 断言:确保输入的VectorBatch和相关的执行信息不为空Assert(batch && m_resultRelInfo && m_delete);// 断言:确保(m_isPartition && m_partionInsert)或者(!m_isPartition && m_insert)条件成立Assert((m_isPartition && m_partionInsert) || (!m_isPartition && m_insert));// 获取废弃数据过滤器JunkFilter* junkfilter = m_resultRelInfo->ri_junkFilter;// 调用删除操作,通过JunkFilter执行删除,并删除批量数据中符合条件的行m_delete->PutDeleteBatch(batch, junkfilter);// 记录批量数据的原始列数int oriCols = batch->m_cols;// 调整批量数据的列数以适应执行环境batch->m_cols = junkfilter->jf_cleanTupType->natts;// 检查批量数据是否符合表的约束条件if (m_relation->rd_att->constr)ExecVecConstraints(m_resultRelInfo, batch, m_estate);// 根据是分区表还是非分区表,调用相应的批量插入方法,执行更新后的插入操作if (m_isPartition)m_partionInsert->BatchInsert(batch, options);elsem_insert->BatchInsert(batch, options);// 恢复批量数据的原始列数batch->m_cols = oriCols;// 返回更新的行数return (uint64)(uint32)batch->m_rows;
}
其中 CStoreUpdate::ExecUpdate 函数的调用关系如下所示:
┌──cstore_update.cpp─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐│198 { ││199 Assert(sortTupDesc && m_resultRelInfo && m_delete); ││200 ││201 JunkFilter* junkfilter = m_resultRelInfo->ri_junkFilter; ││202 ││203 // init delete sort state ││204 m_delete->InitSortState(sortTupDesc, junkfilter->jf_xc_part_id, junkfilter->jf_junkAttNo); ││205 } ││206 ││207 uint64 CStoreUpdate::ExecUpdate(_in_ VectorBatch* batch, _in_ int options) ││208 { │
B+>│209 Assert(batch && m_resultRelInfo && m_delete); ││210 Assert((m_isPartition && m_partionInsert) || (!m_isPartition && m_insert)); ││211 ││212 JunkFilter* junkfilter = m_resultRelInfo->ri_junkFilter; ││213 ││214 // delete ││215 m_delete->PutDeleteBatch(batch, junkfilter); ││216 ││217 int oriCols = batch->m_cols; ││218 batch->m_cols = junkfilter->jf_cleanTupType->natts; ││219 ││220 // Check the constraints of the batch ││221 if (m_relation->rd_att->constr) ││222 ExecVecConstraints(m_resultRelInfo, batch, m_estate); │└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
multi-thre Thread 0x7f8e8 In: CStoreUpdate::ExecUpdate Line: 209 PC: 0x1b66ed5
#0 CStoreUpdate::ExecUpdate (this=0x7f8e907491f8, batch=0x7f8e87f293e8, options=0) at cstore_update.cpp:209
#1 0x000000000169360d in ExecVecUpdate<CStoreUpdate> (state=0x7f8e90748060, update_op=0x7f8e907491f8, batch=0x7f8e87f293e8, estate=0x7f8e8824e060,can_set_tag=true, options=0) at vecmodifytable.cpp:111
#2 0x0000000001691e54 in ExecVecModifyTable (node=0x7f8e90748060) at vecmodifytable.cpp:662
#3 0x000000000173425f in VectorEngine (node=0x7f8e90748060) at vecexecutor.cpp:171
#4 0x0000000001687fd5 in ExecVecToRow (state=0x7f8e8675a060) at vectortorow.cpp:149
#5 0x000000000159a439 in ExecProcNodeByType (node=0x7f8e8675a060) at execProcnode.cpp:677
#6 0x000000000159a8dd in ExecProcNode (node=0x7f8e8675a060) at execProcnode.cpp:769
#7 0x0000000001595232 in ExecutePlan (estate=0x7f8e8824e060, planstate=0x7f8e8675a060, operation=CMD_UPDATE, sendTuples=false, numberTuples=0,direction=ForwardScanDirection, dest=0x7f8e88245be8) at execMain.cpp:2124
#8 0x0000000001591d6a in standard_ExecutorRun (queryDesc=0x7f8e8825d060, direction=ForwardScanDirection, count=0) at execMain.cpp:608
#9 0x000000000139a5d4 in explain_ExecutorRun (queryDesc=0x7f8e8825d060, direction=ForwardScanDirection, count=0) at auto_explain.cpp:116
#10 0x000000000159188f in ExecutorRun (queryDesc=0x7f8e8825d060, direction=ForwardScanDirection, count=0) at execMain.cpp:484
---Type <return> to continue, or q <return> to quit---
JunkFilter 结构体
在 CStoreUpdate::ExecUpdate 函数中调用了,调用了一个 JunkFilter 结构体来获取废弃数据过滤器:
// 获取废弃数据过滤器JunkFilter* junkfilter = m_resultRelInfo->ri_junkFilter;// 调用删除操作,通过JunkFilter执行删除,并删除批量数据中符合条件的行m_delete->PutDeleteBatch(batch, junkfilter);// 记录批量数据的原始列数int oriCols = batch->m_cols;// 调整批量数据的列数以适应执行环境batch->m_cols = junkfilter->jf_cleanTupType->natts;
下面解释一下 JunkFilter 结构体的作用: JunkFilter 结构体用于存储有关垃圾属性(junk attributes)的信息。垃圾属性是元组中仅用于在执行器中存储中间信息的属性,不应包含在生成的元组中。例如,在执行 UPDATE 查询时,规划器向目标列表添加一个“垃圾”条目,以便由 ExecutePlan() 返回的元组包含额外的属性:待更新的元组的 ctid。这是执行更新所需的,但我们不希望 ctid 成为存储的新元组的一部分!因此,我们应用一个“垃圾过滤器”来移除垃圾属性并形成真正的输出元组。垃圾过滤器代码还提供了从输入元组中提取垃圾属性值的例程。函数源码如下:(路径:src/include/nodes/execnodes.h
)
/* * JunkFilter结构体,用于存储垃圾属性信息*/
typedef struct JunkFilter {NodeTag type; /* 结点类型 */List* jf_targetList; /* 原始目标列表(包括垃圾属性) */TupleDesc jf_cleanTupType; /* "清理"元组的元组描述符(不包括垃圾属性) */AttrNumber* jf_cleanMap; /* 非垃圾属性的“原始”元组属性号和“清理”元组属性号的对应关系映射 */TupleTableSlot* jf_resultSlot; /* 用于保存清理后的元组的元组槽 */AttrNumber jf_junkAttNo; /* 未被垃圾过滤器代码使用,可以被调用者用于记住特定垃圾属性的属性号 */
#ifdef PGXC/* * 在PGXC中,类似于jf_junkAttNo,jf_xc_node_id和jf_xc_wholerow用于保存xc_node_id和wholerow的垃圾属性号。* 在PG中,jf_junkAttNo仅用于ctid或wholerow之一,不需要同时包含两者;ctid用于物理关系,而wholerow用于视图。*/AttrNumber jf_xc_node_id; /* xc_node_id的垃圾属性号 */AttrNumber jf_xc_wholerow; /* xc_wholerow的垃圾属性号 */AttrNumber jf_xc_part_id; /* xc_part_id的垃圾属性号 */AttrNumber jf_xc_bucket_id; /* xc_bucket_id的垃圾属性号 */List* jf_primary_keys; /* 主键列表,用于PGXC */
#endif
} JunkFilter;
CStoreInsert::BatchInsert 函数
CStoreInsert::BatchInsert 函数是用于批量插入数据的函数 ,通过向列存储表中插入数据来进行大批量的数据加载。
该函数主要有三个步骤:
- 部分聚簇键的情况下:
- 将数据放入排序容器中,当排序容器已满或插入结束时执行排序。
- 重置排序容器,并获取下一批值。
- 重置并释放所有内存块。
- 没有部分聚簇键的情况下:
- 缓存数据直到 batchrows 被填满。
- 如果 batchrows 已满,执行批量插入。根据是否启用 DELTA,执行插入操作或者普通批量插入。
- 将 pBatch 的数据追加到 m_bufferedBatchRows 中,直到所有数据都已追加。
- 已经结束批量插入:
- 如果启用 DELTA,执行 InsertDeltaTable 操作,否则执行普通批量插入。
- 重置 m_bufferedBatchRows,释放相关内存块。
函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_delete.cpp
)
// BatchInsert
// 用于列存表的向量接口插入大批量数据
void CStoreInsert::BatchInsert(_in_ VectorBatch* pBatch, _in_ int options)
{// 确保 pBatch 不为空或者已经结束插入Assert(pBatch || IsEnd());// 切换内存上下文,以防止在大批量插入期间内存泄漏MemoryContext oldCnxt = MemoryContextSwitchTo(m_tmpMemCnxt);// 步骤 1: 表具有部分聚簇键// 我们需要将数据放入排序容器中,然后批量插入数据if (NeedPartialSort()) {// 确保 m_tmpBatchRows 不为空Assert(m_tmpBatchRows);// 如果有 pBatch,确保列数与表的属性数一致if (pBatch) {Assert(pBatch->m_cols == m_relation->rd_att->natts);m_sorter->PutVecBatch(m_relation, pBatch);}// 如果排序器已满或者已经结束,执行排序if (m_sorter->IsFull() || IsEnd()) {m_sorter->RunSort();// 重置并获取下一批值DoBatchInsert(options);m_sorter->Reset(IsEnd());// 重置并释放所有内存块m_tmpBatchRows->reset(false);}}// 步骤 2: 表没有部分聚簇键// 我们需要缓存数据直到 batchrows 被填满else {// 确保 m_bufferedBatchRows 不为空Assert(m_bufferedBatchRows);// 如果 batchrows 已满,执行批量插入if (IsEnd()) {// 根据是否启用 DELTA,执行插入操作if (ENABLE_DELTA(m_bufferedBatchRows)) {InsertDeltaTable(m_bufferedBatchRows, options);} else {BatchInsertCommon(m_bufferedBatchRows, options);}m_bufferedBatchRows->reset(true);}// 需要缓存数据直到 batchrows 被填满if (pBatch) {// 确保 pBatch 的行数不超过 BatchMaxSizeAssert(pBatch->m_rows <= BatchMaxSize);// 确保 pBatch 的列数和表的属性数不为零Assert(pBatch->m_cols && m_relation->rd_att->natts);// 确保 m_bufferedBatchRows 的最大行数大于零Assert(m_bufferedBatchRows->m_rows_maxnum > 0);// 确保 m_bufferedBatchRows 的最大行数是 BatchMaxSize 的整数倍Assert(m_bufferedBatchRows->m_rows_maxnum % BatchMaxSize == 0);int startIdx = 0;// 将 pBatch 的数据追加到 m_bufferedBatchRows 中while (m_bufferedBatchRows->append_one_vector(RelationGetDescr(m_relation), pBatch, &startIdx, m_cstorInsertMem)) {// 执行批量插入BatchInsertCommon(m_bufferedBatchRows, options);m_bufferedBatchRows->reset(true);}// 确保所有数据都已追加Assert(startIdx == pBatch->m_rows);}}// 步骤 3: 如果已经结束批量插入,必须更新索引数据FlushIndexDataIfNeed();// 重置内存上下文MemoryContextReset(m_tmpMemCnxt);// 切回原始内存上下文(void)MemoryContextSwitchTo(oldCnxt);
}
bulkload_rows::append_one_vector 函数
bulkload_rows::append_one_vector 函数的主要作用是将一个向量追加到 batchrows 中,采用了一些策略来控制内存分配和数据的追加。其中,采用了三个元组(第一个元组、最后一个元组和一个随机元组)的采样,通过计算它们的大小来确定平均大小 tuple_size。接着,根据内存信息参数 m_memInfo 和总内存大小的限制,决定是执行列扩展还是按行方向追加数据。如果需要内存扩展,则进行相应的处理,并记录相关日志。函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_vector.cpp
)
/** @Description: 将一个向量追加到 batchrows 中* @IN p_batch: 向量批次* @IN/OUT start_idx: 起始索引,将被更新* @IN tup_desc: 元组描述* @IN m_memInfo: 内存信息参数,用于控制内存分配* @Return: 如果行数或总内存达到限制,返回 true* @See also:*/
bool bulkload_rows::append_one_vector(TupleDesc tup_desc, VectorBatch* p_batch, int* start_idx, MemInfoArg* m_memInfo)
{// 确保向量批次的行数大于零Assert(p_batch->m_rows > 0);// 检查是否还有待处理的向量if (*start_idx < p_batch->m_rows) {// 使用第一个元组、最后一个元组和一个随机元组作为采样来确定平均大小Size first_tuple_size = (this->*m_form_sample_tuple_size_func)(tup_desc, p_batch, *start_idx);Size last_tuple_size = (this->*m_form_sample_tuple_size_func)(tup_desc, p_batch, (p_batch->m_rows - 1));int random_pos = (((unsigned int)random()) % (p_batch->m_rows - *start_idx)) + *start_idx;Size random_tuple_size = (this->*m_form_sample_tuple_size_func)(tup_desc, p_batch, random_pos);Size tuple_size = MaxTriple(first_tuple_size, last_tuple_size, random_tuple_size);// 如果启用了内存信息参数,并且插入内存不为零if (m_memInfo != NULL && m_memInfo->MemInsert > 0 &&((unsigned long)(unsigned int)m_memInfo->MemInsert < m_using_blocks_total_rawsize / 1024ULL)) {// 计算可用的扩展内存,最大为 dywlm_client_get_memory(),且不超过工作内存的 10%int64 spreadMem = Min(dywlm_client_get_memory() * 1024L, m_memInfo->MemInsert * 1024L) / 1024;// 如果扩展内存超过插入内存的 10%if (spreadMem > m_memInfo->MemInsert * 0.1) {// 增加插入内存,并调整上下文的最大空间大小m_memInfo->MemInsert += spreadMem;m_memInfo->spreadNum++;AllocSet context = (AllocSet)m_context->parent;context->maxSpaceSize += spreadMem * 1024L;MEMCTL_LOG(DEBUG2,"CStoreInsert(Batch) 自动内存扩展 %ldKB 成功,工作内存为 %dKB,扩展次数为 %d。",spreadMem,m_memInfo->MemInsert,m_memInfo->spreadNum);} else {// 内存扩展失败,记录日志并返回 trueMEMCTL_LOG(LOG,"CStoreInsert(Batch) 自动内存扩展 %ldKB 失败,工作内存为 %dKB。",spreadMem,m_memInfo->MemInsert);return true;}}// 如果剩余总内存和每个元组的平均大小足够,执行列扩展if ((m_using_blocks_total_rawsize < BULKLOAD_MAX_MEMSIZE) &&((BULKLOAD_MAX_MEMSIZE - m_using_blocks_total_rawsize) / (p_batch->m_rows - *start_idx) > tuple_size)) {return (this->*m_form_append_column_func)(tup_desc, p_batch, start_idx);} else {// 否则,按行方向追加return append_in_row_orientation(tup_desc, p_batch, start_idx);}} else {// 快速路径判断是否已处理完 p_batchreturn full_rownum() || full_rowsize();}
}
调试信息如下所示:
(gdb) p first_tuple_size
$1 = 28
(gdb) p last_tuple_size
$2 = 28
(gdb) p random_pos
$3 = 0
(gdb) p random_tuple_size
$4 = 28
(gdb) p tuple_size
$5 = 28
bulkload_rows::append_in_column_orientation我函数
bulkload_rows::append_in_column_orientation 函数是 bulkload_rows 结构体中的模板方法,用于在列存储场景下,将一个向量批次按列方向追加到批量行数据结构中。函数通过遍历属性,处理每个属性的一批值,将其解码并追加到相应的列向量中,同时进行最小/最大值的比较。此外,函数还处理了被删除的列的情况,并根据实际情况更新行数和处理游标。函数的主要目的是构建列存储的内部数据结构,以便有效地处理大规模数据的批量加载。函数源码如下:(路径:src/gausskernel/storage/cstore/cstore_vector.cpp
)
/** @Description: 在列方向追加一个向量* @IN p_batch: 一个向量批次* @IN/OUT start_idx: 起始位置* @IN tup_desc: 元组描述* @Return: 如果该 batchrow 已满,返回 true* @See also:*/
template <bool hasDroppedColumn>
bool bulkload_rows::append_in_column_orientation(TupleDesc tup_desc, VectorBatch* p_batch, int* start_idx)
{// 断言,确保向量批次非空,属性数量匹配,且当前行数未达到最大值Assert(p_batch);Assert(m_attr_num == tup_desc->natts);Assert(m_rows_curnum < m_rows_maxnum);// 获取元组属性数量和最大行数int const nattrs = tup_desc->natts;int const maxRows = Min(m_rows_maxnum - m_rows_curnum, p_batch->m_rows - *start_idx);// 获取 ScalarVector 和 bulkload_vector 的指针,以及元组属性信息ScalarVector* scalar_vector = p_batch->m_arr;bulkload_vector* vector = m_vectors;Form_pg_attribute* attrs = tup_desc->attrs;// 遍历属性for (int attrIdx = 0; attrIdx < nattrs; ++attrIdx) {// 如果有被删除的列,并且当前列为被删除的列if (hasDroppedColumn && (*attrs)->attisdropped) {/* 跳到下一个属性 *//* 对于被删除的列,设置所有值为 NULL */int destRowIdx = m_rows_curnum;for (int rowCnt = 0; rowCnt < maxRows; ++rowCnt) {vector->m_values_nulls.set_null(destRowIdx++);}++scalar_vector;++vector;++attrs;continue;}int srcRowIdx = *start_idx;int destRowIdx = m_rows_curnum;/* 处理一个属性的一批值 */for (int rowCnt = 0; rowCnt < maxRows; ++rowCnt) {if (unlikely(scalar_vector->IsNull(srcRowIdx))) {/* 追加一个 NULL 值 */vector->m_values_nulls.set_null(destRowIdx);} else {/* 从标量向量解码值 */Datum value = (vector->*(vector->m_decode))(scalar_vector, srcRowIdx);/* 将此值追加到向量中 */value = (vector->*(vector->m_append))(this, value, (*attrs)->attlen);/* 比较最小/最大值 */vector->m_minmax.m_compare(vector->m_minmax.m_min_buf,vector->m_minmax.m_max_buf,value,&(vector->m_minmax.m_first_compare),&(vector->m_minmax.m_varstr_maxlen));}/* 移动到该属性的下一个值 */++srcRowIdx;++destRowIdx;}/* 跳到下一个属性 */++scalar_vector;++vector;++attrs;}/* 更新行数和处理游标 */m_rows_curnum += maxRows;*start_idx += maxRows;// 返回是否已满return full_rownum() || full_rowsize();
}
这里附上以上代码的调试信息:
(gdb) p nattrs
$1 = 4
(gdb) p maxRows
$2 = 1
(gdb) p * scalar_vector
$3 = {<BaseObject> = {<No data fields>}, m_rows = 1, m_desc = {<BaseObject> = {<No data fields>}, typeId = 23, typeMod = -1, encoded = false}, m_const = false,m_flag = 0x7f15adf9df40 "", m_buf = 0x7f15ae012088, m_vals = 0x7f15ae010060, m_addVar = (Datum (ScalarVector::*)(ScalarVector * const, Datum, int)) 0x173136a<ScalarVector::AddHeaderVar(unsigned long, int)>}
(gdb) p * vector
$4 = {<BaseObject> = {<No data fields>}, m_blocks = {m_head = 0x7f15ae6a2488, m_current = 0x7f15ae6a2488, m_block_num = 1, m_block_size = 8192}, m_attlen = 4,m_decode = (Datum (bulkload_vector::*)(const bulkload_vector * const, ScalarVector *,int)) 0x1b75c74 <bulkload_vector::decode_integer(ScalarVector*, int) const>, m_append = (Datum (bulkload_vector::*)(bulkload_vector * const,bulkload_rows *, Datum, int)) 0x1b7585a <bulkload_vector::append_int32(bulkload_rows*, unsigned long, int)>, m_values_nulls = {m_vals_points = 0x0,m_null_bitmap = 0x7f15ae558060 "", m_vals_num = 0, m_has_null = false, m_all_null = true}, m_minmax = {m_compare = 0x1b68ef1<CompareInt32(char*, char*, Datum, bool*, int*)>, m_finish_compare = 0x1b67159 <FinishCompareFixedLength(char const*, char const*, CUDesc*)>,m_min_buf = '\000' <repeats 31 times>, m_max_buf = '\000' <repeats 31 times>, m_varstr_maxlen = 0, m_first_compare = true}}
(gdb) p ** attrs@4
$5 = {{attrelid = 32784, attname = {data = "id", '\000' <repeats 61 times>}, atttypid = 23, attstattarget = -1, attlen = 4, attnum = 1, attndims = 0,attcacheoff = 0, atttypmod = -1, attbyval = true, attstorage = 112 'p', attalign = 105 'i', attnotnull = false, atthasdef = false, attisdropped = false,attislocal = true, attcmprmode = 127 '\177', attinhcount = 0, attcollation = 0, attkvtype = 0 '\000'}, {attrelid = 32784, attname = {data = "name", '\000' <repeats 59 times>}, atttypid = 1043, attstattarget = -1, attlen = -1, attnum = 2, attndims = 0, attcacheoff = -1, atttypmod = 54,attbyval = false, attstorage = 120 'x', attalign = 105 'i', attnotnull = false, atthasdef = false, attisdropped = false, attislocal = true,attcmprmode = 127 '\177', attinhcount = 0, attcollation = 100, attkvtype = 0 '\000'}, {attrelid = 32784, attname = {data = "age", '\000' <repeats 60 times>}, atttypid = 23, attstattarget = -1, attlen = 4, attnum = 3, attndims = 0, attcacheoff = -1, atttypmod = -1,attbyval = true, attstorage = 112 'p', attalign = 105 'i', attnotnull = false, atthasdef = false, attisdropped = false, attislocal = true,attcmprmode = 127 '\177', attinhcount = 0, attcollation = 0, attkvtype = 0 '\000'}, {attrelid = 32784, attname = {data = "city", '\000' <repeats 59 times>}, atttypid = 1043, attstattarget = -1, attlen = -1, attnum = 4, attndims = 0, attcacheoff = -1, atttypmod = 54,attbyval = false, attstorage = 120 'x', attalign = 105 'i', attnotnull = false, atthasdef = false, attisdropped = false, attislocal = true,attcmprmode = 127 '\177', attinhcount = 0, attcollation = 100, attkvtype = 0 '\000'}}# 执行 Datum value = (vector->*(vector->m_decode))(scalar_vector, srcRowIdx);前后 value 值的变化
(gdb) p value
$16 = 139731090381120的变化
(gdb) p value
$17 = 1
ExecVecUpdate 函数
ExecVecUpdate 函数用于执行矢量化的更新操作。函数接受一个 VecModifyTableState 状态对象、一个表示更新操作的 update_op 对象、一个包含更新数据的矢量批次 batch、执行计划状态对象 estate、一个布尔值 can_set_tag 和一些选项。函数调用 update_op 的 ExecUpdate 方法来执行实际的更新操作,并返回更新的行数。如果 can_set_tag 为真,它会更新执行计划状态对象中的 es_processed 字段,表示处理的总行数。最后,函数返回空指针。函数源码如下:(路径:src/gausskernel/runtime/vecexecutor/vecnode/vecmodifytable.cpp
)
/** @Description: 模板函数,用于执行矢量化的更新操作。* @TemplateParameters:* - T: 表示更新操作的类型,可能是 CStoreUpdate 或其他类型。* @Parameters:* - state: VecModifyTableState 状态对象,表示矢量修改表的状态。* - update_op: 更新操作的对象,可能是 CStoreUpdate 或其他类型。* - batch: 包含更新数据的矢量批次。* - estate: 执行计划状态对象,表示当前执行计划的状态。* - can_set_tag: 一个布尔值,如果为真,表示可以更新执行计划状态的处理行数字段。* - options: 更新操作的选项。* @Return: 返回空指针。*/
template <class T>
VectorBatch* ExecVecUpdate(VecModifyTableState* state, T* update_op, VectorBatch* batch, EState* estate, bool can_set_tag, int options)
{// 执行更新操作,获取更新的行数。uint64 updateRows = update_op->ExecUpdate(batch, options);// 如果 can_set_tag 为真,则更新执行计划状态对象的处理行数字段。if (can_set_tag) {(estate->es_processed) += updateRows;}// 返回空指针。return NULL;
}
CStoreUpdate::EndUpdate 函数
CStoreUpdate::EndUpdate 函数是 CStoreUpdate 类的方法,用于结束更新操作。首先,通过断言确保删除操作对象存在。然后,执行删除操作,将删除的行数记录下来。接着,根据表是否是分区表来判断使用分区插入对象或普通插入对象。如果是分区表,则结束分区插入批量插入操作;如果是普通表,则设置插入操作结束标志,并执行批量插入操作。该函数完成了更新操作的收尾工作,包括删除和插入的处理。函数源码如下所示:(路径:src/gausskernel/storage/cstore/cstore_update.cpp
)
/** @Description: 结束更新操作,执行相应的清理工作。* @Parameters:* - options: 更新操作的选项。*/
void CStoreUpdate::EndUpdate(int options)
{// 断言删除操作对象存在。Assert(m_delete);// 断言插入操作对象存在,并且根据是否是分区表判断使用分区插入对象或普通插入对象。Assert((m_isPartition && m_partionInsert) || (!m_isPartition && m_insert));// 结束删除操作。m_delete->ExecDelete();// 如果是分区表,结束分区插入批量插入。if (m_isPartition) {m_partionInsert->EndBatchInsert();}// 如果是普通表,设置插入操作结束标志,并执行批量插入操作。else {m_insert->SetEndFlag();m_insert->BatchInsert(nullptr, options);}
}
函数调用关系如下所示:
┌──cstore_update.cpp─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐│231 ││232 return (uint64)(uint32)batch->m_rows; ││233 } ││234 ││235 void CStoreUpdate::EndUpdate(_in_ int options) ││236 { │
B+>│237 Assert(m_delete); ││238 Assert((m_isPartition && m_partionInsert) || (!m_isPartition && m_insert)); ││239 ││240 // end delete ││241 m_delete->ExecDelete(); ││242 ││243 // end insert ││244 if (m_isPartition) { ││245 m_partionInsert->EndBatchInsert(); ││246 } else { ││247 m_insert->SetEndFlag(); ││248 m_insert->BatchInsert((VectorBatch*)NULL, options); ││249 } ││250 } ││251 ││252 ││253 ││254 ││255 │└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
multi-thre Thread 0x7f15b In: CStoreUpdate::EndUpdate Line: 237 PC: 0x1b6705b
#0 CStoreUpdate::EndUpdate (this=0x7f15ae54b1f8, options=0) at cstore_update.cpp:237
#1 0x00000000016927a1 in ExecVecModifyTable (node=0x7f15ae54a060) at vecmodifytable.cpp:762
#2 0x000000000173425f in VectorEngine (node=0x7f15ae54a060) at vecexecutor.cpp:171
#3 0x0000000001687fd5 in ExecVecToRow (state=0x7f15ae544060) at vectortorow.cpp:149
#4 0x000000000159a439 in ExecProcNodeByType (node=0x7f15ae544060) at execProcnode.cpp:677
#5 0x000000000159a8dd in ExecProcNode (node=0x7f15ae544060) at execProcnode.cpp:769
#6 0x0000000001595232 in ExecutePlan (estate=0x7f15ae540060, planstate=0x7f15ae544060, operation=CMD_UPDATE, sendTuples=false, numberTuples=0,direction=ForwardScanDirection, dest=0x7f15ae465be8) at execMain.cpp:2124
#7 0x0000000001591d6a in standard_ExecutorRun (queryDesc=0x7f15ae47c860, direction=ForwardScanDirection, count=0) at execMain.cpp:608
#8 0x000000000139a5d4 in explain_ExecutorRun (queryDesc=0x7f15ae47c860, direction=ForwardScanDirection, count=0) at auto_explain.cpp:116
#9 0x000000000159188f in ExecutorRun (queryDesc=0x7f15ae47c860, direction=ForwardScanDirection, count=0) at execMain.cpp:484
#10 0x000000000146f859 in ProcessQuery (plan=0x7f15b31e0e30, sourceText=0x7f15ae464060 "UPDATE columnar_test SET name = 'kuchiki' where id = 1;", params=0x0,dest=0x7f15ae465be8, completionTag=0x7f15b2d47f90 "") at pquery.cpp:291
---Type <return> to continue, or q <return> to quit---