目录
- InitRowBuffer(101行~126行)
- InitProbeIterator(142行~153行)
- *HashJoinIterator* 的Init(155行~240行)
- InitializeChunkFiles(364行~401行)
- InitWritingToProbeRowSavingFile(1115~1119)
- InitReadingFromProbeRowSavingFile(1121~1126)
让我们接着上一篇分析BuildHashTable函数细节步骤继续吧,这些函数都在hash_join_iterator.cc
文件中。
InitRowBuffer(101行~126行)
需要注意的两个步骤
1、初始化row buffer,使用到一个seed。具体哈希计算算法我并没有特定去了解,这里也就不深入了
kHashTableSeed是个没有意义的数字,是用于计算hash的key。一个种子在内存哈希表中进行哈希运算,一个种子计算用于确定chunk 文件应该放置的行。如果两个操作使用同一个种子,将会把块文件加载到哈希表中,这样是错误的。
2、初始化row buffer后,需要调整迭代器的指向,将其头尾均指向row buffer的尾部。
......
if (m_row_buffer.Init(kHashTableSeed)) {DBUG_ASSERT(thd()->is_error()); // my_error should have been called.return true;
}m_hash_map_iterator = m_row_buffer.end();
m_hash_map_end = m_row_buffer.end();
return false;
InitProbeIterator(142行~153行)
1、初始化m_probe_input迭代器(为一个RowIterator)
2、判断m_probe_input_batch_mode是否为真(默认为false,指的是不开启批处理模式),为真的话就调用StartPSIBatchMode
if (m_probe_input->Init()) {return true;}if (m_probe_input_batch_mode) {m_probe_input->StartPSIBatchMode();}return false;
HashJoinIterator 的Init(155行~240行)
1、准备从build(驱动表)输入中读取行数据到哈希表中,用于构建哈希表
PrepareForRequestRowId(m_build_input_tables.tables(),m_tables_to_get_rowid_for);
函数调用栈:
|PrepareForRequestRowId
||prepare_for_position
涉及到使用主键去找rows,必须用主键字段扩展读取位图
然后初始化build的迭代器(为一个RowIterator)
2、初始化一些乱七八糟的变量
//默认在内存中做所有操作,并且没有任何对哈希表的重新填充操作。每个输入都只读一次,不会对磁盘写入任何数据。
m_hash_join_type = HashJoinType::IN_MEMORY;
//不需要把被驱动表读出来的行数据写到saving files中。因为只有当哈希连接生成块不能完全放到内存中 或者 哈希连接不能延伸到磁盘时,即probe输入行需要多次Read才需要开启probe行保存
m_write_to_probe_row_saving = false;
//很显然此时build迭代器读取的buffer中是有行数据的,当build input数据被消耗掉,停止hash join 迭代器请求更多行
m_build_iterator_has_more_rows = true;
//开启批处理模式
m_probe_input->EndPSIBatchModeIfStarted();
//如过外连接溢出到磁盘上操作,那么probe行可以和我们未看到的build input中的row匹配,若匹配为true。这里默认是内存里操作,所以初始化为false
m_probe_row_match_flag = false;
3、计算build row、probe row 占的内存大小
计算给定表单行数据需要占多大的byte,记录上界,这样之后pack的数据长度总是会比这个长度短。
upper_row_size 为build row、probe row两者的最大值。
size_t upper_row_size = 0;if (!m_build_input_tables.has_blob_column()) {upper_row_size =hash_join_buffer::ComputeRowSizeUpperBound(m_build_input_tables);}if (!m_probe_input_tables.has_blob_column()) {upper_row_size = std::max(upper_row_size,hash_join_buffer::ComputeRowSizeUpperBound(m_probe_input_tables));}
4、一个看不懂的操作,将一个string类型的变量reverse了一下
if (m_temporary_row_and_join_key_buffer.reserve(upper_row_size)) {my_error(ER_OUTOFMEMORY, MYF(0), upper_row_size);return true; // oom
}
5、如果一个表包含一个geometry列,确保该数据复制到行缓冲区,而不是只设置指向数据的指针。因为当hash join溢出到磁盘上,需要从块文件读回一行,行数据存储在一个临时缓冲区中,当临时缓冲区用于其他用途时,字段指向的数据将变为无效。
MarkCopyBlobsIfTableContainsGeometry(m_probe_input_tables);MarkCopyBlobsIfTableContainsGeometry(m_build_input_tables);
6、chunk 数组、标志clear操作
//m_chunk_files_on_disk数组来保存磁盘上的块文件列表,以防我们降级为磁盘上的散列联接.此时需要clear
m_chunk_files_on_disk.clear();
//build 和 probe 当前从hash join chunk中读取的是第0行
m_build_chunk_current_row = 0;
m_probe_chunk_current_row = 0;
//现在不使用任何一种hash join chunk
m_current_chunk = -1;
7、对于每个给定的表,如果需要,请求填写行ID(相当于调用file->position())
PrepareForRequestRowId(m_probe_input_tables.tables(),m_tables_to_get_rowid_for);
8、构建哈希表
// Build the hash table
if (BuildHashTable()) {DBUG_ASSERT(thd()->is_error() ||thd()->killed); // my_error should have been called.return true;
}
9、当build 和 probe 都没有row数据了,返回。
if (m_state == State::END_OF_ROWS) {// BuildHashTable() decided that the join is done (the build input is// empty, and we are in an inner-/semijoin. Anti-/outer join must output// NULL-complemented rows from the probe input).return false;}
10、如果是反连接并且join情况数组为空 并且 没有其他情况并且此时row buffer中仍然有数据。说明此时不需要输出任何东西,因为所有数据都在哈希表中。
if (m_join_type == JoinType::ANTI && m_join_conditions.empty() &&m_extra_condition == nullptr && !m_row_buffer.empty()) {// For degenerate antijoins, we know we will never output anything// if there's anything in the hash table, so we can end right away.// (We also don't need to read more than one row, but// CreateHashJoinAccessPath() has already added a LIMIT 1 for us// in this case.)m_state = State::END_OF_ROWS;return false;}
11、初始化probe迭代器
return InitProbeIterator();
InitializeChunkFiles(364行~401行)
初始化两个input的hashjoinchunk。
先估计需要多少块,有一个来源是planner估计的。此外可以假设当前行缓冲区代表了总体的行密度,将估计的剩余row数量/目前读到的row数量,就能得到chunk数。
1、缩减后的哈希表中的行 = 缩减因子* 哈希表中行数 这是一种保护措施,因为我们宁愿得到一个或两个额外的块,而不必多次重新读取探测输入。
constexpr double kReductionFactor = 0.9;
const size_t reduced_rows_in_hash_table =std::max<size_t>(1, rows_in_hash_table * kReductionFactor);
2、剩余行数 = max(哈希表中行数,planner估计的join的行数) - 哈希表中的行数
const size_t remaining_rows =std::max(rows_in_hash_table, estimated_rows_produced_by_join) -rows_in_hash_table;
3、 需要的chunk数 = max(剩余行数 / 缩减后的哈希表中的行数)
const size_t chunks_needed = std::max<size_t>(1, std::ceil(remaining_rows / reduced_rows_in_hash_table));
4、真正的chunk数 = min(最大chunk文件数,需要的chunk数) 限制每个输入的块数,这样就不会冒着达到服务器对打开文件数的限制的风险。
const size_t num_chunks = std::min(max_chunk_files, chunks_needed);
5、确保chunk数目是偶数,因为我们join的时候是按照probe和build的一对chunk进行join的
const size_t num_chunks_pow_2 = my_round_up_to_next_power(num_chunks);
6、调整chunk数目到偶数;然后对每对chunk进行初始化,一个是buildchunk一个是probechunk
chunk_pairs->resize(num_chunks_pow_2);for (ChunkPair &chunk_pair : *chunk_pairs) {if (chunk_pair.build_chunk.Init(build_tables, /*uses_match_flags=*/false) ||chunk_pair.probe_chunk.Init(probe_tables,include_match_flag_for_probe)) {my_error(ER_TEMP_FILE_WRITE_FAILURE, MYF(0));return true;}}
对于Init的两个参数解释:
tables – 行数据存储在哪个input
uses_match_flags – 是否应在每行前面加上匹配标志,表示该行是否有匹配行。
InitWritingToProbeRowSavingFile(1115~1119)
标记已经启用probe row saving,并准备probe row saving file以供写入
m_write_to_probe_row_saving = true;return m_probe_row_saving_write_file.Init(m_probe_input_tables,m_join_type == JoinType::OUTER);
至于HashJoinChunk::Init函数我们就不去细究了,它涉及到了IOcache操作。
InitReadingFromProbeRowSavingFile(1121~1126)
标记我们从probe row saving files中读取数据。然后将saving files 倒回开头
m_probe_row_saving_read_file = std::move(m_probe_row_saving_write_file);
m_probe_row_saving_read_file_current_row = 0;
m_read_from_probe_row_saving = true;
return m_probe_row_saving_read_file.Rewind();
1、将write file内容传给read file,同时使用move,避免拷贝赋值,仅仅改动指针指向
2、初始化当前应该读的行数为0,表示还没开始读
3、确定我们应当从probe row saving read files中读取数据
4、Rewind函数将file buffer 清除,准备读的新数据腾出空间