上一篇《postgres源码解析54 Brin Index–1》讲解了brin index的基本概念以及页布局,本篇及后续文章会从源码角度对索引的构建、维护等方面进行深入讲解。
1 关键数据结构
2 brinbuild执行流程图
3 brinbuild 函数详解
1 首先调用brin_metapage_init初始化brin meta元数据页,并构造对应的XLOG日志写入WAL buffer中;
2 紧接着调用brinRevmapInitialize初始化brin revmap映射页、BrinBuildState结构体用于记录后续brin tuple状态信息;
3 按heap表物理块的顺序扫描,构造对应的brin index 元组信息,元组的构造流程由回调函数brinbuildCallback实现;
4 调用form_and_insert_tuple将索引元组插入brin regular常规页中,同时将此元组的TID信息记录至brin revmap映射页中;
5 为此插入动作构造XLOG日志并插入至WAL buffer中;
6 最后释放锁资源,如发生页扩展情况需更新对应的FSM信息;
/*
 * brinbuild() -- build a new BRIN index.
 *
 * Creates and WAL-logs the metapage, sets up the revmap access object and
 * the build state, scans the heap in physical block order feeding every
 * tuple to brinbuildCallback, flushes the last pending range summary, and
 * returns the heap/index tuple counts in a freshly palloc'd
 * IndexBuildResult.
 */
IndexBuildResult *
brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
{
    IndexBuildResult *stats;
    BrinBuildState *buildstate;
    BrinRevmap *rmAccess;
    BlockNumber rangeSize;
    Buffer      metabuf;
    Page        metapage;
    double      heapTupleCount;
    double      indexTupleCount;

    /* The index relation must be empty: we are called exactly once per index. */
    if (RelationGetNumberOfBlocks(index) != 0)
        elog(ERROR, "index \"%s\" already contains data",
             RelationGetRelationName(index));

    /*
     * No critical section is needed while creating the metapage: if anything
     * fails, creation of the whole relation is rolled back.
     */
    metabuf = ReadBuffer(index, P_NEW);
    Assert(BufferGetBlockNumber(metabuf) == BRIN_METAPAGE_BLKNO);
    LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

    metapage = BufferGetPage(metabuf);
    brin_metapage_init(metapage, BrinGetPagesPerRange(index),
                       BRIN_CURRENT_VERSION);
    MarkBufferDirty(metabuf);

    /* Emit the create-index WAL record and stamp the metapage with its LSN. */
    if (RelationNeedsWAL(index))
    {
        xl_brin_createidx createrec;
        XLogRecPtr  lsn;

        createrec.version = BRIN_CURRENT_VERSION;
        createrec.pagesPerRange = BrinGetPagesPerRange(index);

        XLogBeginInsert();
        XLogRegisterData((char *) &createrec, SizeOfBrinCreateIdx);
        XLogRegisterBuffer(0, metabuf, REGBUF_WILL_INIT | REGBUF_STANDARD);

        lsn = XLogInsert(RM_BRIN_ID, XLOG_BRIN_CREATE_INDEX);

        PageSetLSN(metapage, lsn);
    }

    UnlockReleaseBuffer(metabuf);

    /* Set up the revmap access object and the (deformed tuple) build state. */
    rmAccess = brinRevmapInitialize(index, &rangeSize, NULL);
    buildstate = initialize_brin_buildstate(index, rmAccess, rangeSize);

    /*
     * Scan the heap.  Synchronized scans are disallowed because the blocks
     * must arrive in physical order.
     */
    heapTupleCount = table_index_build_scan(heap, index, indexInfo,
                                            false, true,
                                            brinbuildCallback,
                                            (void *) buildstate, NULL);

    /* Flush the summary of the final page range. */
    form_and_insert_tuple(buildstate);

    /* Grab the tuple count before tearing the build state down. */
    indexTupleCount = buildstate->bs_numtuples;
    brinRevmapTerminate(buildstate->bs_rmAccess);
    terminate_brin_buildstate(buildstate);

    /* Hand the statistics back to the caller. */
    stats = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
    stats->heap_tuples = heapTupleCount;
    stats->index_tuples = indexTupleCount;

    return stats;
}
4 brin_form_tuple
brin tuple在内存中形式为BrinMemTuple,磁盘形式为BrinTuple,因此在写入磁盘前需要将BrinMemTuple转换成BrinTuple;其执行流程为:
1 首先根据brdesc->bd_totalstored为values、nulls、phony_nullbitmap与untoasted_values数组申请内存空间;
2 遍历brdesc->bd_tupdesc->natts个属性,检查tuple是否存在空值;如果某属性全部为空值(bv_allnulls),则需要将nulls数组中对应的元素置为true;
3 后续依次将tuple中的数据读出,并填充至values数组中;
4 遍历完brin index所有属性后,开始计算磁盘形式Brin tuple的长度len;
5 申请大小为len的内存空间rettuple,填充rettuple->bt_blkno与rettuple->bt_info属性,后续调用heap_fill_tuple将values数组中的数值依次填充至rettuple的数据域区;
6 后续填充bitmap区域,设置null 位码;
7 最后更新bt_info标识信息,返回rettuple地址。
/*
 * Generate a new on-disk tuple to be inserted in a BRIN index.
 *
 * brdesc: BRIN descriptor; supplies the index tuple descriptor, the per-column
 *		opclass info (oi_nstored, oi_typcache) and bd_totalstored.
 * blkno:  block number stored in the resulting tuple's bt_blkno.
 * tuple:  in-memory (deformed) tuple to convert.
 * size:   output; receives the MAXALIGN'ed total length of the result.
 *
 * Returns a tuple allocated with palloc0.  The temporary values/nulls arrays
 * and any detoasted or compressed copies made here are freed before
 * returning.
 *
 * See brin_form_placeholder_tuple if you touch this.
 */
BrinTuple *
brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno, BrinMemTuple *tuple,
                Size *size)
{
    Datum      *values;
    bool       *nulls;
    bool        anynulls = false;
    BrinTuple  *rettuple;
    int         keyno;
    int         idxattno;
    uint16      phony_infomask = 0;
    bits8      *phony_nullbitmap;
    Size        len,
                hoff,
                data_len;

#ifdef TOAST_INDEX_HACK
    Datum      *untoasted_values;
    int         nuntoasted = 0;
    int         i;
#endif

    Assert(brdesc->bd_totalstored > 0);

    values = (Datum *) palloc(sizeof(Datum) * brdesc->bd_totalstored);
    nulls = (bool *) palloc0(sizeof(bool) * brdesc->bd_totalstored);
    phony_nullbitmap = (bits8 *)
        palloc(sizeof(bits8) * BITMAPLEN(brdesc->bd_totalstored));

#ifdef TOAST_INDEX_HACK
    untoasted_values = (Datum *) palloc(sizeof(Datum) * brdesc->bd_totalstored);
#endif

    /*
     * Set up the values/nulls arrays for heap_fill_tuple
     */
    idxattno = 0;
    for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
    {
        int         datumno;

        /*
         * "allnulls" is set when there's no nonnull value in any row in the
         * column; when this happens, there is no data to store.  Thus set the
         * nullable bits for all data elements of this column and we're done.
         */
        if (tuple->bt_columns[keyno].bv_allnulls)
        {
            for (datumno = 0;
                 datumno < brdesc->bd_info[keyno]->oi_nstored;
                 datumno++)
                nulls[idxattno++] = true;
            anynulls = true;
            continue;
        }

        /*
         * The "hasnulls" bit is set when there are some null values in the
         * data.  We still need to store a real value, but the presence of
         * this means we need a null bitmap.
         */
        if (tuple->bt_columns[keyno].bv_hasnulls)
            anynulls = true;

        /*
         * Now obtain the values of each stored datum.  Note that some values
         * might be toasted, and we cannot rely on the original heap values
         * sticking around forever, so we must detoast them.  Also try to
         * compress them.
         */
        for (datumno = 0;
             datumno < brdesc->bd_info[keyno]->oi_nstored;
             datumno++)
        {
            Datum       value = tuple->bt_columns[keyno].bv_values[datumno];

#ifdef TOAST_INDEX_HACK

            /* We must look at the stored type, not at the index descriptor. */
            TypeCacheEntry *atttype = brdesc->bd_info[keyno]->oi_typcache[datumno];

            /* Do we need to free the value at the end? */
            bool        free_value = false;

            /*
             * For non-varlena types we don't need to do anything special.
             * We don't need to care about NULL values here, thanks to
             * bv_allnulls above.
             */
            if (atttype->typlen != -1)
            {
                values[idxattno++] = value;
                continue;
            }

            /*
             * If value is stored EXTERNAL, must fetch it so we are not
             * depending on outside storage.
             *
             * XXX Is this actually true?  Could it be that the summary is
             * NULL even for range with non-NULL data?  E.g. degenerate bloom
             * filter may be thrown away, etc.
             */
            if (VARATT_IS_EXTERNAL(DatumGetPointer(value)))
            {
                value = PointerGetDatum(heap_tuple_fetch_attr((struct varlena *)
                                                              DatumGetPointer(value)));
                free_value = true;
            }

            /*
             * If value is above size target, and is of a compressible
             * datatype, try to compress it in-line.
             */
            if (!VARATT_IS_EXTENDED(DatumGetPointer(value)) &&
                VARSIZE(DatumGetPointer(value)) > TOAST_INDEX_TARGET &&
                (atttype->typstorage == 'x' || atttype->typstorage == 'm'))
            {
                Datum       cvalue = toast_compress_datum(value);

                if (DatumGetPointer(cvalue) != NULL)
                {
                    /* successful compression */
                    if (free_value)
                        pfree(DatumGetPointer(value));

                    value = cvalue;
                    free_value = true;
                }
            }

            /*
             * If we untoasted / compressed the value, we need to free it
             * after forming the index tuple.
             */
            if (free_value)
                untoasted_values[nuntoasted++] = value;

#endif

            values[idxattno++] = value;
        }
    }

    /* Assert we did not overrun temp arrays */
    Assert(idxattno <= brdesc->bd_totalstored);

    /* compute total space needed */
    len = SizeOfBrinTuple;
    if (anynulls)
    {
        /*
         * We need a double-length bitmap on an on-disk BRIN index tuple; the
         * first half stores the "allnulls" bits, the second stores
         * "hasnulls".
         */
        len += BITMAPLEN(brdesc->bd_tupdesc->natts * 2);
    }

    len = hoff = MAXALIGN(len);

    data_len = heap_compute_data_size(brtuple_disk_tupdesc(brdesc),
                                      values, nulls);
    len += data_len;

    len = MAXALIGN(len);

    rettuple = palloc0(len);
    rettuple->bt_blkno = blkno;
    rettuple->bt_info = hoff;

    /*
     * Assert that hoff fits in the space available
     * (article note: BRIN_OFFSET_MASK = 0x1F, i.e. 00011111).
     */
    Assert((rettuple->bt_info & BRIN_OFFSET_MASK) == hoff);

    /*
     * The infomask and null bitmap as computed by heap_fill_tuple are useless
     * to us.  However, that function will not accept a null infomask; and we
     * need to pass a valid null bitmap so that it will correctly skip
     * outputting null attributes in the data area.
     */
    heap_fill_tuple(brtuple_disk_tupdesc(brdesc),
                    values,
                    nulls,
                    (char *) rettuple + hoff,
                    data_len,
                    &phony_infomask,
                    phony_nullbitmap);

    /* done with these */
    pfree(values);
    pfree(nulls);
    pfree(phony_nullbitmap);

#ifdef TOAST_INDEX_HACK
    for (i = 0; i < nuntoasted; i++)
        pfree(DatumGetPointer(untoasted_values[i]));
#endif

    /*
     * Now fill in the real null bitmasks.  allnulls first.
     */
    if (anynulls)
    {
        bits8      *bitP;
        int         bitmask;

        rettuple->bt_info |= BRIN_NULLS_MASK;

        /*
         * Note that we reverse the sense of null bits in this module: we
         * store a 1 for a null attribute rather than a 0.  So we must reverse
         * the sense of the att_isnull test in brin_deconstruct_tuple as well.
         *
         * bitP starts one byte before the bitmap and bitmask at HIGHBIT, so
         * the first loop iteration advances onto (and zeroes) the first
         * bitmap byte.
         */
        bitP = ((bits8 *) ((char *) rettuple + SizeOfBrinTuple)) - 1;
        bitmask = HIGHBIT;
        for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
        {
            if (bitmask != HIGHBIT)
                bitmask <<= 1;
            else
            {
                bitP += 1;
                *bitP = 0x0;
                bitmask = 1;
            }

            if (!tuple->bt_columns[keyno].bv_allnulls)
                continue;

            *bitP |= bitmask;
        }

        /* hasnulls bits follow, continuing from the same bit position */
        for (keyno = 0; keyno < brdesc->bd_tupdesc->natts; keyno++)
        {
            if (bitmask != HIGHBIT)
                bitmask <<= 1;
            else
            {
                bitP += 1;
                *bitP = 0x0;
                bitmask = 1;
            }

            if (!tuple->bt_columns[keyno].bv_hasnulls)
                continue;

            *bitP |= bitmask;
        }
    }

    if (tuple->bt_placeholder)
        rettuple->bt_info |= BRIN_PLACEHOLDER_MASK;

    *size = len;
    return rettuple;
}
5 brin_doinsert 函数
经过函数brin_form_tuple对内存形式BrinMemTuple加工生成磁盘形式Brin tuple,后进入真正的插入操作,由brin_doinsert函数实现。执行流程如下:
1 首先进行安全检查,判断待插入元组大小是否超过单个brin index元组的最大阈值BrinMaxItemSize;如果超过,则通过ereport(ERROR)报错并终止本次操作(其后的return InvalidOffsetNumber仅用于消除编译器告警,实际不会执行到);
2 调用brinRevmapExtend,确保revmap映射页足够长、能够容纳heapBlk所在页域对应的映射条目;如果不够,需进行扩展或者重用某映射页;
3 如果待插入常规页所在缓冲块有效,获取缓冲块排它锁;检查此常规页是否有足够空间容纳待插入索引,不能的话则释放缓冲块排它锁,将缓冲块号*buffer置为InvalidBuffer;
4 如果缓冲块号为InvalidBuffer,则循环调用brin_getinsertbuffer函数找到可用的brin buffer;
5 对步骤2中的revmap buffer施加缓冲块排它锁;
6 获取步骤4中brin buffer对应常规页地址page和常规页块号blk;
7 调用PageAddItem函数向page中插入索引元组,返回其偏移量off;
8 将上述blk与off信息封装成TID插入revmap映射页中,并为此操作生成对应的XLOG;
9 释放锁资源,如果发生brin index常规页扩展情况,需要更新对应的FSM信息。
/*
 * Insert the on-disk BRIN tuple 'tup' (of MAXALIGN'ed size 'itemsz') into a
 * regular page of index 'idxrel' and point the revmap entry for the range
 * containing 'heapBlk' at it.
 *
 * '*buffer' may carry a pinned regular page to try first; on return it holds
 * the (still pinned, unlocked) buffer that received the tuple.  Returns the
 * offset number at which the tuple was stored.
 */
OffsetNumber
brin_doinsert(Relation idxrel, BlockNumber pagesPerRange,
              BrinRevmap *revmap, Buffer *buffer, BlockNumber heapBlk,
              BrinTuple *tup, Size itemsz)
{
    Buffer      mapbuf;
    Page        regpage;
    BlockNumber regblk;
    OffsetNumber newoff;
    ItemPointerData newtid;
    Size        fsm_space = 0;
    bool        extended = false;

    Assert(itemsz == MAXALIGN(itemsz));

    /* Oversized items are rejected outright. */
    if (itemsz > BrinMaxItemSize)
    {
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
                        itemsz, BrinMaxItemSize, RelationGetRelationName(idxrel))));
        return InvalidOffsetNumber; /* keep compiler quiet */
    }

    /* The revmap must be long enough to hold the entry for heapBlk. */
    brinRevmapExtend(revmap, heapBlk);

    /*
     * Try the caller-supplied buffer first.  Another backend (or ourselves)
     * may have extended the revmap over the page we held a pin on, so it
     * cannot be assumed to still be a regular page; if it lacks room for the
     * item, let go of it and look for another one below.
     */
    if (BufferIsValid(*buffer))
    {
        LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
        if (br_page_get_freespace(BufferGetPage(*buffer)) < itemsz)
        {
            UnlockReleaseBuffer(*buffer);
            *buffer = InvalidBuffer;
        }
    }

    /*
     * No usable buffer yet: keep asking brin_getinsertbuffer until it hands
     * one over.  'extended' stays false when the caller's buffer is reused.
     */
    while (!BufferIsValid(*buffer))
        *buffer = brin_getinsertbuffer(idxrel, InvalidBuffer, itemsz, &extended);

    /* Lock the revmap page that covers heapBlk. */
    mapbuf = brinLockRevmapPageForUpdate(revmap, heapBlk);

    regpage = BufferGetPage(*buffer);
    regblk = BufferGetBlockNumber(*buffer);

    /* Perform the insertion itself. */
    START_CRIT_SECTION();

    if (extended)
        brin_page_init(regpage, BRIN_PAGETYPE_REGULAR);

    newoff = PageAddItem(regpage, (Item) tup, itemsz, InvalidOffsetNumber,
                         false, false);
    if (newoff == InvalidOffsetNumber)
        elog(ERROR, "failed to add BRIN tuple to new page");
    MarkBufferDirty(*buffer);

    /* Remember the remaining space; needed for the FSM update at the end. */
    if (extended)
        fsm_space = br_page_get_freespace(regpage);

    /* Record the new tuple's TID in the revmap. */
    ItemPointerSet(&newtid, regblk, newoff);
    brinSetHeapBlockItemptr(mapbuf, pagesPerRange, heapBlk, newtid);
    MarkBufferDirty(mapbuf);

    /* WAL-log the insertion. */
    if (RelationNeedsWAL(idxrel))
    {
        xl_brin_insert insrec;
        XLogRecPtr  lsn;
        uint8       info = XLOG_BRIN_INSERT;
        uint8       regflags = REGBUF_STANDARD;

        if (extended)
        {
            info |= XLOG_BRIN_INIT_PAGE;
            regflags |= REGBUF_WILL_INIT;
        }

        insrec.heapBlk = heapBlk;
        insrec.pagesPerRange = pagesPerRange;
        insrec.offnum = newoff;

        XLogBeginInsert();
        XLogRegisterData((char *) &insrec, SizeOfBrinInsert);

        XLogRegisterBuffer(0, *buffer, regflags);
        XLogRegisterBufData(0, (char *) tup, itemsz);

        XLogRegisterBuffer(1, mapbuf, 0);

        lsn = XLogInsert(RM_BRIN_ID, info);

        PageSetLSN(regpage, lsn);
        PageSetLSN(BufferGetPage(mapbuf), lsn);
    }

    END_CRIT_SECTION();

    /* The tuple is safely on the page, so both locks can be dropped. */
    LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
    LockBuffer(mapbuf, BUFFER_LOCK_UNLOCK);

    BRIN_elog((DEBUG2, "inserted tuple (%u,%u) for range starting at %u",
               regblk, newoff, heapBlk));

    /* A newly initialized page must be made known to the free space map. */
    if (extended)
    {
        RecordPageWithFreeSpace(idxrel, regblk, fsm_space);
        FreeSpaceMapVacuumRange(idxrel, regblk, regblk + 1);
    }

    return newoff;
}