状态机
可以放大观看。
HashJoinState
Hash Join运行期状态结构体
typedef struct HashJoinState
{JoinState js; /* 基类;its first field is NodeTag */ExprState *hashclauses;//hash连接条件List *hj_OuterHashKeys; /* 外表条件链表;list of ExprState nodes */List *hj_InnerHashKeys; /* 内表连接条件;list of ExprState nodes */List *hj_HashOperators; /* 操作符OIDs链表;list of operator OIDs */HashJoinTable hj_HashTable;//Hash表uint32 hj_CurHashValue;//当前的Hash值int hj_CurBucketNo;//当前的bucket编号int hj_CurSkewBucketNo;//行倾斜bucket编号HashJoinTuple hj_CurTuple;//当前元组TupleTableSlot *hj_OuterTupleSlot;//outer relation slotTupleTableSlot *hj_HashTupleSlot;//Hash tuple slotTupleTableSlot *hj_NullOuterTupleSlot;//用于外连接的outer虚拟slotTupleTableSlot *hj_NullInnerTupleSlot;//用于外连接的inner虚拟slotTupleTableSlot *hj_FirstOuterTupleSlot;//int hj_JoinState;//JoinState状态bool hj_MatchedOuter;//是否匹配bool hj_OuterNotEmpty;//outer relation是否为空
} HashJoinState;
HashJoinTable
typedef struct HashJoinTableData
{int nbuckets; /* 内存中的hash桶数;# buckets in the in-memory hash table */int log2_nbuckets; /* 2的对数(nbuckets必须是2的幂);its log2 (nbuckets must be a power of 2) */int nbuckets_original; /* 首次hash时的桶数;# buckets when starting the first hash */int nbuckets_optimal; /* 优化后的桶数(每个批次);optimal # buckets (per batch) */int log2_nbuckets_optimal; /* 2的对数;log2(nbuckets_optimal) *//* buckets[i] is head of list of tuples in i'th in-memory bucket *///bucket [i]是内存中第i个桶中的元组链表的head itemunion{/* unshared array is per-batch storage, as are all the tuples *///未共享数组是按批处理存储的,所有元组均如此struct HashJoinTupleData **unshared;/* shared array is per-query DSA area, as are all the tuples *///共享数组是每个查询的DSA区域,所有元组均如此dsa_pointer_atomic *shared;}buckets;bool keepNulls; /*如不匹配则存储NULL元组,该值为T;true to store unmatchable NULL tuples *///关于skew优化的变量bool skewEnabled; /*是否使用倾斜优化?;are we using skew optimization? */HashSkewBucket **skewBucket; /* 倾斜的hash表桶数;hashtable of skew buckets */int skewBucketLen; /* skewBucket数组大小;size of skewBucket array (a power of 2!) */int nSkewBuckets; /* 活动的倾斜桶数;number of active skew buckets */int *skewBucketNums; /* 活动倾斜桶数组索引;array indexes of active skew buckets */int nbatch; /* 批次数;number of batches */int curbatch; /* 当前批次,第一轮为0;current batch #; 0 during 1st pass */int nbatch_original; /* 在开始inner扫描时的批次;nbatch when we started inner scan */int nbatch_outstart; /* 在开始outer扫描时的批次;nbatch when we started outer scan */bool growEnabled; /* 关闭nbatch增加的标记;flag to shut off nbatch increases */double totalTuples; /* 从inner plan获得的元组数;# tuples obtained from inner plan */double partialTuples; /* 通过hashjoin获得的inner元组数;# tuples obtained from inner plan by me */double skewTuples; /* 倾斜元组数;# tuples inserted into skew tuples *//** 这些数组在散列连接的生命周期内分配,但仅当nbatch > 1时分配。* 只有当第一次将元组写入文件时,文件才会打开(否则它的指针将保持NULL)。* 注意,第0个数组元素永远不会被使用,因为批次0的元组永远不会转储.*/BufFile **innerBatchFile; /* 每个批次的inner虚拟临时文件缓存;buffered virtual temp file per batch */BufFile **outerBatchFile; /* 每个批次的outer虚拟临时文件缓存;buffered virtual temp file per batch *//** 有关正在散列的数据类型的特定于数据类型的散列函数的信息。* 这些数组的长度与散列连接子句(散列键)的数量相同。*/FmgrInfo *outer_hashfunctions; /* outer hash函数FmgrInfo结构体;lookup data for hash functions */FmgrInfo *inner_hashfunctions; /* inner hash函数FmgrInfo结构体;lookup data for hash functions */bool *hashStrict; /* 每个hash操作符是严格?is each hash join operator strict? */Size spaceUsed; /* 元组使用的当前内存空间大小;memory space currently used by tuples */Size spaceAllowed; /* 空间使用上限;upper limit for space used */Size spacePeak; /* 峰值的空间使用;peak space used */Size spaceUsedSkew; /* 倾斜哈希表的当前空间使用情况;skew hash table's current space usage */Size spaceAllowedSkew; /* 倾斜哈希表的使用上限;upper limit for skew hashtable */MemoryContext hashCxt; /* 整个散列连接存储的上下文;context for whole-hash-join storage */MemoryContext batchCxt; /* 该批次存储的上下文;context for this-batch-only storage *//* used for dense allocation of tuples (into linked chunks) *///用于密集分配元组(到链接块中)HashMemoryChunk chunks; /* 整个批次使用一个链表;one list for the whole batch *//* Shared and private state for Parallel Hash. *///并行hash使用的共享和私有状态HashMemoryChunk current_chunk; /* 后台进程的当前chunk;this backend's current chunk */dsa_area *area; /* 用于分配内存的DSA区域;DSA area to allocate memory from */ParallelHashJoinState *parallel_state;//并行执行状态ParallelHashJoinBatchAccessor *batches;//并行访问器dsa_pointer current_chunk_shared;//当前chunk的开始指针
} HashJoinTableData;
其他
在inner join第一次扫描中,可以从执行器得到tuple。
如果batch数目 > 1,那么不属于第一批的tuple将被保存在batch的inner的临时文件中。
在outer join中同理,不过我们将tuple写入batch的outer临时文件中
完成第一次扫描后,堆每批剩余的tuple做如下操作:
1、从inner 批处理文件中读取元组,加载到hash table中的bucket
2、从outer 批处理文件中读取元组,匹配hash bucket,然后输出结果。
参考
postgresql-13源码