-- Schema and seed data for the examples below: t1 and t2 each have an integer
-- primary key plus one non-indexed integer column, and hold rows (1,1)..(7,7).
create table t1 (c1 int primary key, c2 int);
create table t2 (d1 int primary key, d2 int);

insert into t1 (c1, c2) values
  (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7);

insert into t2 (d1, d2) values
  (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7);
从MySQL 8.0.20(含)开始,MySQL在实际执行过程中大量使用了HashJoin,只要是以前能够用到BNL(Block Nested Loop Join)的地方,都会选择HashJoin的方式来代替。通常认为,HashJoin只适用于等值连接,譬如下面的语句:
explain format=tree SELECT * FROM t1 JOIN t2 ON t1.c2=t2.d2;
/*explain的结果如下*/
| EXPLAIN |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
| -> Inner hash join (t2.d2 = t1.c2) (cost=6.10 rows=7)
-> Table scan on t2 (cost=0.05 rows=7)
-> Hash
-> Table scan on t1 (cost=0.95 rows=7)
|
因为上面join列都没有使用到索引,所以对于这个语句,Mysql会优先使用HashJoin
,但是如果使用到了索引,那么就依旧会采用Nested Loop Join
的方式,比如下面的语句:
explain format=tree SELECT * FROM t1 JOIN t2 ON t1.c1=t2.d2;
| EXPLAIN |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| -> Nested loop inner join (cost=3.40 rows=7)
-> Filter: (t2.d2 is not null) (cost=0.95 rows=7)
-> Table scan on t2 (cost=0.95 rows=7)
-> Single-row index lookup on t1 using PRIMARY (c1=t2.d2) (cost=0.26 rows=1)
|
重点来了:在MySQL中,即使join条件不是等值条件,也同样会采用HashJoin的方式,比如下面的语句:
explain format=tree SELECT * FROM t1 JOIN t2 ON t1.c2 > t2.d2;
----------------------------------------------+
| EXPLAIN |
+-------------------------------------------------------------------------------------------------------------+
/*可以看到,join条件变成了filter的方式,但是join方式还是采用了HashJoin的方式*/
| -> Filter: (t1.c2 > t2.d2) (cost=6.10 rows=16)
-> Inner hash join (no condition) (cost=6.10 rows=16)
-> Table scan on t2 (cost=0.07 rows=7)
-> Hash
-> Table scan on t1 (cost=0.95 rows=7)
|
可以看到,join条件变成了filter
的方式,但是join
方式还是采用了HashJoin
的方式,那么Mysql是怎么实现非等值情况下的HashJoin
的呢?这里我们分析下代码。
本示例代码采用如下的sql语句来进行示例(join列上没有索引,因此会走HashJoin):
SELECT * FROM t1 JOIN t2 ON t1.c2=t2.d2
上述语句在Mysql里面使用到的Iterator
如下:
HashJoinIterator
TableScanIterator
TableScanIterator
在HashJoin中,主要流程如下:
左表扫描数据 -> 构建Hash表 -> 右表扫描数据 -> 在Hash进行probe操作 -> 将结果返回给用户
重要函数调用的流程图大体如下:
左表扫描数据并构建Hash表的流程
sql_union.cc
里面的ExecuteIteratorQuery
函数,会在执行前对iterator
进行如下操作: if (m_root_iterator->Init()) {
return true;
}
HashJoinIterator
的Init
函数介绍如下: // Prepare to read the build input into the hash map.
// m_build_input is the left (build) table object, here the TableScanIterator of t1. Init() only does the preparation needed before scanning the table.
if (m_build_input->Init()) {
DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
return true;
}
// We always start out by doing everything in memory.
m_hash_join_type = HashJoinType::IN_MEMORY;
m_write_to_probe_row_saving = false;
//省略部分代码
// Build the hash table
// 扫描左表,第一次进行构建Hash表的操作,这是一个比较重要的函数
if (BuildHashTable()) {
DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
return true;
}
if (m_state == State::END_OF_ROWS) {
// BuildHashTable() decided that the join is done (the build input is
// empty, and we are in an inner-/semijoin. Anti-/outer join must output
// NULL-complemented rows from the probe input).
return false;
}
// 进行右表的准备工作。
return InitProbeIterator();
BuildHashTable()
函数如下: if (!m_build_iterator_has_more_rows) {
m_state = State::END_OF_ROWS;
return false;
}
//省略部分代码
// 初始化m_row_buffer成员变量,该变量本质上存储了一个std::unordered_multimap对象
if (InitRowBuffer()) {
return true;
}
const bool reject_duplicate_keys = RejectDuplicateKeys();
const bool store_rows_with_null_in_join_key = m_join_type == JoinType::OUTER;
// If Init() is called multiple times (e.g., if hash join is inside an
// dependent subquery), we must clear the NULL row flag, as it may have been
// set by the previous executing of this hash join.
m_build_input->SetNullRowFlag(/*is_null_row=*/false);
PFSBatchMode batch_mode(m_build_input.get());
//在Init阶段,Mysql会尽可能的多从底层读取数据构建Hash表。
for (;;) { // Termination condition within loop.
int res = m_build_input->Read();
if (res == 1) {
DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
return true;
}
//省略部分代码
//将底层扫描的数据放在m_row_buffer map里面,该函数下面会详细介绍。
const hash_join_buffer::StoreRowResult store_row_result =
m_row_buffer.StoreRow(thd(), reject_duplicate_keys,
store_rows_with_null_in_join_key);
//校验返回的结果,分别为:存储成功,buffer满了需要下盘和致命错误三种情况。
switch (store_row_result) {
case hash_join_buffer::StoreRowResult::ROW_STORED:
break;
case hash_join_buffer::StoreRowResult::BUFFER_FULL: {
// The row buffer is full, so start spilling to disk (if allowed). Note
// that the row buffer checks for OOM _after_ the row was inserted, so
// we should always manage to insert at least one row.
DBUG_ASSERT(!m_row_buffer.empty());
// If we are not allowed to spill to disk, just go on to reading from
// the probe iterator.
if (!m_allow_spill_to_disk) {
if (m_join_type != JoinType::INNER) {
// Enable probe row saving, so that unmatched probe rows are written
// to the probe row saving file. After the next refill of the hash
// table, we will read rows from the probe row saving file, ensuring
// that we only read unmatched probe rows.
InitWritingToProbeRowSavingFile();
}
SetReadingProbeRowState();
return false;
}
// Ideally, we would use the estimated row count from the iterator. But
// not all iterators has the row count available (i.e.
// RemoveDuplicatesIterator), so get the row count directly from the
// QEP_TAB.
const QEP_TAB *last_table_in_join =
m_build_input_tables.tables().back().qep_tab;
if (InitializeChunkFiles(
last_table_in_join->position()->prefix_rowcount,
m_row_buffer.size(), kMaxChunks, m_probe_input_tables,
m_build_input_tables,
/*include_match_flag_for_probe=*/m_join_type == JoinType::OUTER,
&m_chunk_files_on_disk)) {
DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
return true;
}
// Write out the remaining rows from the build input out to chunk files.
// The probe input will be written out to chunk files later; we will do
// it _after_ we have checked the probe input for matches against the
// rows that are already written to the hash table. An alternative
// approach would be to write out the remaining rows from the build
// _and_ the rows that already are in the hash table. In that case, we
// could also write out the entire probe input to disk here as well. But
// we don not want to waste the rows that we already have stored in
// memory.
//
// We never write out rows with NULL in condition for the build/right
// input, as these rows will never match in a join condition.
if (WriteRowsToChunks(thd(), m_build_input.get(), m_build_input_tables,
m_join_conditions, kChunkPartitioningHashSeed,
&m_chunk_files_on_disk,
true /* write_to_build_chunks */,
false /* write_rows_with_null_in_join_key */,
&m_temporary_row_and_join_key_buffer)) {
DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
return true;
}
// Flush and position all chunk files from the build input at the
// beginning.
for (ChunkPair &chunk_pair : m_chunk_files_on_disk) {
if (chunk_pair.build_chunk.Rewind()) {
DBUG_ASSERT(
thd()->is_error()); // my_error should have been called.
return true;
}
}
SetReadingProbeRowState();
return false;
}
case hash_join_buffer::StoreRowResult::FATAL_ERROR:
// An unrecoverable error. Most likely, malloc failed, so report OOM.
// Note that we cannot say for sure how much memory we tried to allocate
// when failing, so just report 'join_buffer_size' as the amount of
// memory we tried to allocate.
my_error(ER_OUTOFMEMORY, MYF(ME_FATALERROR),
thd()->variables.join_buff_size);
return true;
}
}
HashJoinRowBuffer::StoreRow
函数
// Stores the row currently held in the table buffers into the in-memory hash
// map, keyed by the join key built from m_join_conditions.
//
// Returns ROW_STORED when the row was inserted or deliberately skipped (NULL in
// the join key, or a rejected duplicate key), BUFFER_FULL when the row was
// inserted but m_mem_root.allocated_size() now exceeds m_max_mem_available, and
// FATAL_ERROR when ArrayAlloc returns nullptr or StoreFromTableBuffers reports
// failure.
//
// When m_join_conditions is empty the loop below appends nothing, so
// join_key_size is 0 and join_key_data stays nullptr: every row is inserted
// under the same empty key.
StoreRowResult HashJoinRowBuffer::StoreRow(
THD *thd, bool reject_duplicate_keys,
bool store_rows_with_null_in_condition) {
// Make the key from the join conditions.
m_buffer.length(0);
// Append the key part of every join condition to m_buffer. Per the walkthrough, all conditions here are equalities and append_join_key_for_hash_join() copies the value according to its data type - NOTE(review): confirm against its definition.
for (const HashJoinCondition &hash_join_condition : m_join_conditions) {
bool null_in_join_condition =
hash_join_condition.join_condition()->append_join_key_for_hash_join(
thd, m_tables.tables_bitmap(), hash_join_condition, &m_buffer);
if (null_in_join_condition && !store_rows_with_null_in_condition) {
// SQL NULL values will never match in an inner join or semijoin, so skip
// the row.
return StoreRowResult::ROW_STORED;
}
}
// TODO(efroseth): We should probably use an unordered_map instead of multimap
// for these cases so we do not have to hash and lookup twice.
// Some joins reject duplicate keys, so check whether the key is already present before inserting.
if (reject_duplicate_keys &&
contains(Key(pointer_cast<const uchar *>(m_buffer.ptr()),
m_buffer.length()))) {
return StoreRowResult::ROW_STORED;
}
// Allocate the join key on the same MEM_ROOT that the hash table is
// allocated on, so it has the same lifetime as the rest of the contents in
// the hash map (until Clear() is called on the HashJoinBuffer).
// Copy the join key out of m_buffer into join_key_data so it can be placed in the map; m_buffer is reused for the row contents below.
const size_t join_key_size = m_buffer.length();
uchar *join_key_data = nullptr;
if (join_key_size > 0) {
join_key_data = m_mem_root.ArrayAlloc<uchar>(join_key_size);
if (join_key_data == nullptr) {
return StoreRowResult::FATAL_ERROR;
}
memcpy(join_key_data, m_buffer.ptr(), join_key_size);
}
// Copy the contents of the row into m_buffer.
// Save the contents of all columns marked for reading.
if (StoreFromTableBuffers(m_tables, &m_buffer)) {
return StoreRowResult::FATAL_ERROR;
}
// Give the row the same lifetime as the hash map, by allocating it on the
// same MEM_ROOT as the hash map is allocated on.
// Copy the row data out of m_buffer into `row` so it can be inserted into the map. Per the walkthrough, BufferRow is just an alias of Key in this version - NOTE(review): confirm against the hash_join_buffer header.
const size_t row_size = m_buffer.length();
uchar *row = nullptr;
if (row_size > 0) {
row = m_mem_root.ArrayAlloc<uchar>(row_size);
if (row == nullptr) {
return StoreRowResult::FATAL_ERROR;
}
memcpy(row, m_buffer.ptr(), row_size);
}
// Wrap the join key and the row data as Key and BufferRow objects and insert them into the map.
m_last_row_stored = m_hash_map->emplace(Key(join_key_data, join_key_size),
BufferRow(row, row_size));
// The memory check happens after the insert, so the row just stored is kept even when BUFFER_FULL is returned.
if (m_mem_root.allocated_size() > m_max_mem_available) {
return StoreRowResult::BUFFER_FULL;
}
return StoreRowResult::ROW_STORED;
}
InitProbeIterator
函数
// Initializes the probe input. Returns true if m_probe_input->Init() fails,
// false otherwise. Starts PSI batch mode on the probe input when
// m_probe_input_batch_mode is set.
bool HashJoinIterator::InitProbeIterator() {
DBUG_ASSERT(m_state == State::READING_ROW_FROM_PROBE_ITERATOR);
// In the walkthrough's example m_probe_input is a TableScanIterator, so this is only a plain Init(); no probing of the hash table happens here.
if (m_probe_input->Init()) {
return true;
}
if (m_probe_input_batch_mode) {
m_probe_input->StartPSIBatchMode();
}
return false;
}
右表扫描数据并进行probe
操作。
首先绝大多数情况下,probe
操作是在Read
阶段执行的,如下ExecuteIteratorQuery
函数里面的Read
循环。
for (;;) {
//在这个例子里面,就是HashJoinIterator进行Read操作。
int error = m_root_iterator->Read();
DBUG_EXECUTE_IF("bug13822652_1", thd->killed = THD::KILL_QUERY;);
if (error > 0 || thd->is_error()) // Fatal error
return true;
else if (error < 0)
break;
else if (thd->killed) // Aborted by user
{
thd->send_kill_message();
return true;
}
++*send_records_ptr;
if (query_result->send_data(thd, *fields)) {
return true;
}
thd->get_stmt_da()->inc_current_row_for_condition();
}
HashJoinIterator::Read
函数如下:
// Drives the hash join state machine (abridged excerpt). Loops over m_state
// until a joined row is ready (returns 0), the state is END_OF_ROWS
// (returns -1), or an error or kill is detected (returns 1).
int HashJoinIterator::Read() {
for (;;) {
if (thd()->killed) { // Aborted by user.
thd()->send_kill_message();
return 1;
}
switch (m_state) {
case State::LOADING_NEXT_CHUNK_PAIR:
if (ReadNextHashJoinChunk()) {
return 1;
}
break;
// The branch taken in most cases.
case State::READING_ROW_FROM_PROBE_ITERATOR:
if (ReadRowFromProbeIterator()) {
return 1;
}
break;
// Some cases are omitted here; per the walkthrough, most of them handle data that does not fit in memory and has to be spilled to disk.
case State::READING_FIRST_ROW_FROM_HASH_TABLE:
// Falls through: start fetching rows from the hash table to return to the client.
case State::READING_FROM_HASH_TABLE: {
// Per the walkthrough, this copies the row stored in the hash table back into the qep_tab->table() members (row data in qep_tab->table()->record) - NOTE(review): confirm against ReadNextJoinedRowFromHashTable().
const int res = ReadNextJoinedRowFromHashTable();
if (res == 0) {
// A joined row is ready, so send it to the client.
return 0;
}
if (res == -1) {
// No more matching rows in the hash table, or antijoin found a
// matching row. Read a new row from the probe input.
continue;
}
// An error occurred, so abort the join.
DBUG_ASSERT(res == 1);
return res;
}
case State::END_OF_ROWS:
return -1;
}
}
// Unreachable.
DBUG_ASSERT(false);
return 1;
}
HashJoinIterator::ReadRowFromProbeIterator
函数如下:
// Reads one row from the probe input and, when a row is available, looks it
// up in the hash table (abridged excerpt). In the portion shown, returns true
// when the probe read reports an error and false after a successful lookup.
bool HashJoinIterator::ReadRowFromProbeIterator() {
DBUG_ASSERT(m_current_chunk == -1);
// Read the next row from the probe (right) table.
int result = m_probe_input->Read();
if (result == 1) {
DBUG_ASSERT(thd()->is_error()); // my_error should have been called.
return true;
}
if (result == 0) {
RequestRowId(m_probe_input_tables.tables());
// A row from the probe iterator is ready.
// Probe the hash table with this row.
LookupProbeRowInHashTable();
return false;
}
// The remaining code is omitted; only the simplest scenario is covered here.
}
void HashJoinIterator::LookupProbeRowInHashTable()
函数如下:
// Positions m_hash_map_iterator / m_hash_map_end on the hash table entries to
// be read for the current probe row, then updates m_state. With no join
// conditions the range is the whole row buffer; otherwise it comes from a
// find() or equal_range() lookup on the key built from the probe row.
void HashJoinIterator::LookupProbeRowInHashTable() {
if (m_join_conditions.empty()) {
// Skip the call to equal_range in case we don't have any join conditions.
// This can save up to 20% in case of multi-table joins.
m_hash_map_iterator = m_row_buffer.begin();
m_hash_map_end = m_row_buffer.end();
// With no join condition (effectively a Cartesian product) no lookup is done; the state is updated so the next loop iteration reads joined rows straight from the hash table.
m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
return;
}
// Extract the join key from the probe input, and use that key as the lookup
// key in the hash table.
// As in HashJoinRowBuffer::StoreRow, build a key usable for the hash map from the probe row.
bool null_in_join_key = ConstructJoinKey(
thd(), m_join_conditions, m_probe_input_tables.tables_bitmap(),
&m_temporary_row_and_join_key_buffer);
if (null_in_join_key) {
if (m_join_type == JoinType::ANTI || m_join_type == JoinType::OUTER) {
// SQL NULL was found, and we will never find a matching row in the hash
// table. Let us indicate that, so that a null-complemented row is
// returned.
m_hash_map_iterator = m_row_buffer.end();
m_hash_map_end = m_row_buffer.end();
m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
} else {
SetReadingProbeRowState();
}
return;
}
hash_join_buffer::Key key(
pointer_cast<const uchar *>(m_temporary_row_and_join_key_buffer.ptr()),
m_temporary_row_and_join_key_buffer.length());
// Small trick: when only the first match matters, find() is used instead of equal_range().
if ((m_join_type == JoinType::SEMI || m_join_type == JoinType::ANTI) &&
m_extra_condition == nullptr) {
// find() has a better average complexity than equal_range() (constant vs.
// linear in the number of matching elements). And for semijoins, we are
// only interested in the first match anyways, so this may give a nice
// speedup. An exception to this is if we have any "extra" conditions that
// needs to be evaluated after the hash table lookup, but before the row is
// returned; we may need to read through the entire hash table to find a row
// that satisfies the extra condition(s).
m_hash_map_iterator = m_row_buffer.find(key);
m_hash_map_end = m_row_buffer.end();
} else {
auto range = m_row_buffer.equal_range(key);
m_hash_map_iterator = range.first;
m_hash_map_end = range.second;
}
// The candidate range is set; switch state so the next loop iteration starts reading joined rows from the hash table.
m_state = State::READING_FIRST_ROW_FROM_HASH_TABLE;
}
关于HashJoinIterator::ReadNextJoinedRowFromHashTable()
函数
// Steps through the candidate rows in the hash table until one passes the
// extra conditions or the candidates run out (abridged excerpt). Returns 0
// when a row is ready (a joined row or a NULL-complemented row), -1 when there
// is no row to return for the current probe row, and 1 on error.
int HashJoinIterator::ReadNextJoinedRowFromHashTable() {
int res;
bool passes_extra_conditions = false;
do {
// Read the current row out of the hash table back into the table's record buffer. Per the walkthrough, rows were stored packed and are unpacked here, while the probe row is already in its record buffer and needs no extra work - NOTE(review): confirm against ReadJoinedRow().
res = ReadJoinedRow();
// ReadJoinedRow() can only return 0 (row is ready) or -1 (EOF).
DBUG_ASSERT(res == 0 || res == -1);
// Evaluate any extra conditions that are attached to this iterator before
// we return a row.
if (res == 0) {
// These are extra conditions attached to this iterator, not filters; per the walkthrough, filters get their own dedicated iterator.
passes_extra_conditions = JoinedRowPassesExtraConditions();
if (thd()->is_error()) {
// Evaluation of extra conditions raised an error, so abort the join.
return 1;
}
if (!passes_extra_conditions) {
// Advance to the next matching row in the hash table. Note that the
// iterator stays in the state READING_FIRST_ROW_FROM_HASH_TABLE even
// though we are not actually reading the first row anymore. This is
// because WriteProbeRowToDiskIfApplicable() needs to know if this is
// the first row that matches both the join condition and any extra
// conditions; only unmatched rows will be written to disk.
++m_hash_map_iterator;
}
}
} while (res == 0 && !passes_extra_conditions);
// The row passed all extra conditions (or we are out of rows in the hash
// table), so we can now write the row to disk.
// Inner and outer joins: Write out all rows from the probe input (given that
// we have degraded into on-disk hash join).
// Semijoin and antijoin: Write out rows that do not have any matching row in
// the hash table.
// Write the probe row to disk if applicable.
if (WriteProbeRowToDiskIfApplicable()) {
return 1;
}
// NULL-complemented row handling for antijoin and outer join.
if (res == -1) {
// If we did not find a matching row in the hash table, antijoin and outer
// join should output the last row read from the probe input together with a
// NULL-complemented row from the build input. However, in case of on-disk
// antijoin, a row from the probe input can match a row from the build input
// that has already been written out to disk. So for on-disk antijoin, we
// cannot output any rows until we have started reading from chunk files.
//
// On-disk outer join is a bit more tricky; we can only output a
// NULL-complemented row if the probe row did not match anything from the
// build input while doing any of the probe phases. We can have multiple
// probe phases if e.g. a build chunk file is too big to fit in memory; we
// would have to read the build chunk in multiple smaller chunks while doing
// a probe phase for each of these smaller chunks. To keep track of this,
// each probe row is prefixed with a match flag in the chunk files.
bool return_null_complemented_row = false;
if ((on_disk_hash_join() && m_current_chunk == -1) ||
m_write_to_probe_row_saving) {
return_null_complemented_row = false;
} else if (m_join_type == JoinType::ANTI) {
return_null_complemented_row = true;
} else if (m_join_type == JoinType::OUTER &&
m_state == State::READING_FIRST_ROW_FROM_HASH_TABLE &&
!m_probe_row_match_flag) {
return_null_complemented_row = true;
}
SetReadingProbeRowState();
if (return_null_complemented_row) {
m_build_input->SetNullRowFlag(true);
return 0;
}
return -1;
}
// Some code omitted.
++m_hash_map_iterator;
return 0;
}
可以看到,在MySQL的处理中,如果join条件里面没有等值条件的话,直接的表现就是HashJoinIterator里面的m_join_conditions值为空,那么往HashTable里面emplace时创建Key对象的代码如下:
//将查询到的数据和join key封装为Key和BufferRow对象,放入到map中
m_last_row_stored = m_hash_map->emplace(Key(join_key_data, join_key_size),
BufferRow(row, row_size));
此时的join_key_data固定为nullptr,而join_key_size固定为0,也就是说,这个map里面的key只有一个,与之对应的value则有多个。实际上表现出来的结果就是做了一个简单的笛卡尔积,然后再进行条件过滤,本质上并没有进行传统HashJoin的probe操作。
原文:https://www.cnblogs.com/seancheer/p/14127503.html