Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ Block ScanHashMapAfterProbeBlockInputStream::readImpl()
else
fillColumnsUsingCurrentPartition<false, true>(columns_left, columns_right, row_counter_column);
break;
case ASTTableJoin::Kind::Full:
if (parent.has_other_condition)
fillColumnsUsingCurrentPartition<true, false>(columns_left, columns_right, row_counter_column);
else
fillColumnsUsingCurrentPartition<false, false>(columns_left, columns_right, row_counter_column);
break;
case ASTTableJoin::Kind::RightAnti:
case ASTTableJoin::Kind::RightOuter:
if (parent.has_other_condition)
Expand Down
116 changes: 56 additions & 60 deletions dbms/src/Debug/MockExecutor/JoinBinder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,63 @@
#include <Debug/MockExecutor/ExecutorBinder.h>
#include <Debug/MockExecutor/JoinBinder.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTTablesInSelectQuery.h>

namespace DB::mock
{
namespace
{
/// Appends every column of `input_schema` to the end of `output_schema`, keeping the input order.
/// When `make_nullable` is true, a column that carries the not-null flag is first passed through
/// toNullableDAGColumnInfo(); every other column is appended unchanged.
void appendJoinSchema(DAGSchema & output_schema, const DAGSchema & input_schema, bool make_nullable)
{
    for (const auto & column : input_schema)
    {
        /// The not-null flag is only inspected when nullability conversion was requested.
        const bool needs_conversion = make_nullable && column.second.hasNotNullFlag();
        if (!needs_conversion)
        {
            output_schema.push_back(column);
            continue;
        }
        output_schema.push_back(toNullableDAGColumnInfo(column));
    }
}

/// Appends the left child's columns to `schema`.
/// Whether not-null columns are converted to nullable is decided by
/// JoinInterpreterHelper::makeLeftJoinSideNullable(tp).
void buildLeftSideJoinSchema(DAGSchema & schema, const DAGSchema & left_schema, tipb::JoinType tp)
{
    const bool left_side_nullable = JoinInterpreterHelper::makeLeftJoinSideNullable(tp);
    appendJoinSchema(schema, left_schema, left_side_nullable);
}

/// Appends the right-side contribution of a join to `schema`:
///  - (anti) left outer semi join: a single unnamed 1/0 (uint8) field is pushed back, indicating
///    whether the right table has matching row(s), see comment in ASTTableJoin::Kind for details;
///  - semi / anti semi join: the right table columns are ignored, nothing is appended;
///  - any other join type: the right child's columns are appended, with nullability decided by
///    JoinInterpreterHelper::makeRightJoinSideNullable(tp).
void buildRightSideJoinSchema(DAGSchema & schema, const DAGSchema & right_schema, tipb::JoinType tp)
{
    switch (tp)
    {
    case tipb::JoinType::TypeLeftOuterSemiJoin:
    case tipb::JoinType::TypeAntiLeftOuterSemiJoin:
    {
        /// Describe the match-flag column that stands in for the right table.
        tipb::FieldType match_flag_type{};
        match_flag_type.set_tp(TiDB::TypeTiny);
        match_flag_type.set_charset("binary");
        match_flag_type.set_collate(TiDB::ITiDBCollator::BINARY);
        match_flag_type.set_flag(0);
        match_flag_type.set_flen(-1);
        match_flag_type.set_decimal(-1);
        schema.push_back(std::make_pair("", TiDB::fieldTypeToColumnInfo(match_flag_type)));
        break;
    }
    case tipb::JoinType::TypeSemiJoin:
    case tipb::JoinType::TypeAntiSemiJoin:
        /// Right table columns are not part of the output for these join types.
        break;
    default:
        appendJoinSchema(schema, right_schema, JoinInterpreterHelper::makeRightJoinSideNullable(tp));
        break;
    }
}

/// Builds the schema made of all left columns followed by all right columns.
/// Each side's not-null columns are converted to nullable when the matching
/// JoinInterpreterHelper::make{Left,Right}JoinSideNullable(join_type) helper returns true.
/// Unlike buildRightSideJoinSchema, the right columns are always included here, whatever the join type.
DAGSchema buildOtherConditionSchema(
    const DAGSchema & left_schema,
    const DAGSchema & right_schema,
    tipb::JoinType join_type)
{
    const bool left_side_nullable = JoinInterpreterHelper::makeLeftJoinSideNullable(join_type);
    const bool right_side_nullable = JoinInterpreterHelper::makeRightJoinSideNullable(join_type);

    DAGSchema combined_schema;
    appendJoinSchema(combined_schema, left_schema, left_side_nullable);
    appendJoinSchema(combined_schema, right_schema, right_side_nullable);
    return combined_schema;
}
} // namespace

void JoinBinder::addRuntimeFilter(MockRuntimeFilter & rf)
{
Expand Down Expand Up @@ -95,22 +147,8 @@ void JoinBinder::columnPrune(std::unordered_set<String> & used_columns)

/// update output schema
output_schema.clear();

for (auto & field : children[0]->output_schema)
{
if (tp == tipb::TypeRightOuterJoin && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
}

for (auto & field : children[1]->output_schema)
{
if (tp == tipb::TypeLeftOuterJoin && field.second.hasNotNullFlag())
output_schema.push_back(toNullableDAGColumnInfo(field));
else
output_schema.push_back(field);
}
buildLeftSideJoinSchema(output_schema, children[0]->output_schema, tp);
buildRightSideJoinSchema(output_schema, children[1]->output_schema, tp);
}

void JoinBinder::fillJoinKeyAndFieldType(
Expand Down Expand Up @@ -187,11 +225,8 @@ bool JoinBinder::toTiPBExecutor(
astToPB(children[1]->output_schema, expr, cond, collator_id, context);
}

DAGSchema merged_children_schema{children[0]->output_schema};
merged_children_schema.insert(
merged_children_schema.end(),
children[1]->output_schema.begin(),
children[1]->output_schema.end());
DAGSchema merged_children_schema
= buildOtherConditionSchema(children[0]->output_schema, children[1]->output_schema, tp);

for (const auto & expr : other_conds)
{
Expand Down Expand Up @@ -293,45 +328,6 @@ void JoinBinder::toMPPSubPlan(
exchange_map[right_exchange_receiver->name] = std::make_pair(right_exchange_receiver, right_exchange_sender);
}

/// Appends the left child's columns to `schema`, keeping their order.
/// For a right outer join, a left column that carries the not-null flag is converted with
/// toNullableDAGColumnInfo() before being appended; all other columns are appended unchanged.
static void buildLeftSideJoinSchema(DAGSchema & schema, const DAGSchema & left_schema, tipb::JoinType tp)
{
    /// The join type does not change while iterating, so decide once up front.
    const bool left_may_be_null = (tp == tipb::JoinType::TypeRightOuterJoin);
    for (const auto & column : left_schema)
    {
        const bool convert_to_nullable = left_may_be_null && column.second.hasNotNullFlag();
        if (convert_to_nullable)
            schema.push_back(toNullableDAGColumnInfo(column));
        else
            schema.push_back(column);
    }
}

/// Appends the right-side contribution of a join to `schema`.
/// Note: for semi join, the right table column is ignored,
/// but for (anti) left outer semi join, a 1/0 (uint8) field is pushed back
/// indicating whether right table has matching row(s), see comment in ASTTableJoin::Kind for details.
/// For every other join type the right child's columns are appended; for a left outer join,
/// columns that carry the not-null flag are converted with toNullableDAGColumnInfo() first.
static void buildRightSideJoinSchema(DAGSchema & schema, const DAGSchema & right_schema, tipb::JoinType tp)
{
    const bool is_left_outer_semi_family
        = tp == tipb::JoinType::TypeLeftOuterSemiJoin || tp == tipb::JoinType::TypeAntiLeftOuterSemiJoin;
    if (is_left_outer_semi_family)
    {
        /// Only the unnamed match-flag column is emitted for the right side.
        tipb::FieldType match_flag_type{};
        match_flag_type.set_tp(TiDB::TypeTiny);
        match_flag_type.set_charset("binary");
        match_flag_type.set_collate(TiDB::ITiDBCollator::BINARY);
        match_flag_type.set_flag(0);
        match_flag_type.set_flen(-1);
        match_flag_type.set_decimal(-1);
        schema.push_back(std::make_pair("", TiDB::fieldTypeToColumnInfo(match_flag_type)));
        return;
    }

    /// Semi and anti semi joins contribute no right-side columns at all.
    if (tp == tipb::JoinType::TypeSemiJoin || tp == tipb::JoinType::TypeAntiSemiJoin)
        return;

    const bool right_may_be_null = (tp == tipb::JoinType::TypeLeftOuterJoin);
    for (const auto & column : right_schema)
    {
        if (right_may_be_null && column.second.hasNotNullFlag())
            schema.push_back(toNullableDAGColumnInfo(column));
        else
            schema.push_back(column);
    }
}

// compileJoin constructs a mocked Join executor node, note that all conditional expression params can be default
ExecutorBinderPtr compileJoin(
size_t & executor_index,
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,8 @@ String getJoinTypeName(const tipb::JoinType & tp)
return "LeftOuterJoin";
case tipb::JoinType::TypeRightOuterJoin:
return "RightOuterJoin";
case tipb::JoinType::TypeFullOuterJoin:
return "FullOuterJoin";
case tipb::JoinType::TypeLeftOuterSemiJoin:
return "LeftOuterSemiJoin";
case tipb::JoinType::TypeAntiSemiJoin:
Expand Down
17 changes: 11 additions & 6 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ std::pair<ASTTableJoin::Kind, size_t> getJoinKindAndBuildSideIndex(
/// 3. for non-cross left/right outer join, there is no problem in this swap.
/// 4. for cross left outer join, the build side is always right, needn't and can't swap.
/// 5. for cross right outer join, the build side is always left, so it will always swap and change to cross left outer join.
/// 6. for non-cross full outer join, keep full join kind and respect inner_idx as build side.
/// note that whatever the build side is, we can't support cross-right-outer join now.
static const std::unordered_map<
std::pair<tipb::JoinType, size_t>,
Expand All @@ -72,6 +73,8 @@ std::pair<ASTTableJoin::Kind, size_t> getJoinKindAndBuildSideIndex(
{{tipb::JoinType::TypeLeftOuterJoin, 1}, {ASTTableJoin::Kind::LeftOuter, 1}},
{{tipb::JoinType::TypeRightOuterJoin, 0}, {ASTTableJoin::Kind::LeftOuter, 0}},
{{tipb::JoinType::TypeRightOuterJoin, 1}, {ASTTableJoin::Kind::RightOuter, 1}},
{{tipb::JoinType::TypeFullOuterJoin, 0}, {ASTTableJoin::Kind::Full, 0}},
{{tipb::JoinType::TypeFullOuterJoin, 1}, {ASTTableJoin::Kind::Full, 1}},
{{tipb::JoinType::TypeSemiJoin, 0}, {ASTTableJoin::Kind::RightSemi, 0}},
{{tipb::JoinType::TypeSemiJoin, 1}, {ASTTableJoin::Kind::Semi, 1}},
{{tipb::JoinType::TypeAntiSemiJoin, 0}, {ASTTableJoin::Kind::RightAnti, 0}},
Expand Down Expand Up @@ -103,6 +106,8 @@ std::pair<ASTTableJoin::Kind, size_t> getJoinKindAndBuildSideIndex(
{{tipb::JoinType::TypeAntiLeftOuterSemiJoin, 1}, {ASTTableJoin::Kind::NullAware_LeftOuterAnti, 1}}};

RUNTIME_ASSERT(inner_index == 0 || inner_index == 1);
if (unlikely(tipb_join_type == tipb::JoinType::TypeFullOuterJoin && join_keys_size == 0))
throw TiFlashException("Cartesian full outer join is not supported yet", Errors::Coprocessor::BadRequest);
const auto & join_type_map = [is_null_aware, join_keys_size]() {
if (is_null_aware)
{
Expand Down Expand Up @@ -295,8 +300,8 @@ NamesAndTypes TiFlashJoin::genColumnsForOtherJoinFilter(
column_set_for_origin_columns.emplace(p.name);
}
};
append_origin_columns(left_cols, join.join_type() == tipb::JoinType::TypeRightOuterJoin);
append_origin_columns(right_cols, join.join_type() == tipb::JoinType::TypeLeftOuterJoin);
append_origin_columns(left_cols, makeLeftJoinSideNullable(join.join_type()));
append_origin_columns(right_cols, makeRightJoinSideNullable(join.join_type()));

/// append the columns generated by probe side prepare join actions.
/// the new columns are
Expand All @@ -310,8 +315,8 @@ NamesAndTypes TiFlashJoin::genColumnsForOtherJoinFilter(
columns_for_other_join_filter.emplace_back(p.name, make_nullable ? makeNullable(p.type) : p.type);
}
};
bool make_nullable = build_side_index == 1 ? join.join_type() == tipb::JoinType::TypeRightOuterJoin
: join.join_type() == tipb::JoinType::TypeLeftOuterJoin;
bool make_nullable = build_side_index == 1 ? makeLeftJoinSideNullable(join.join_type())
: makeRightJoinSideNullable(join.join_type());
append_new_columns(probe_prepare_join_actions->getSampleBlock(), make_nullable);

return columns_for_other_join_filter;
Expand All @@ -330,11 +335,11 @@ NamesAndTypes TiFlashJoin::genJoinOutputColumns(
}
};

append_output_columns(left_cols, join.join_type() == tipb::JoinType::TypeRightOuterJoin);
append_output_columns(left_cols, makeLeftJoinSideNullable(join.join_type()));
if (!isSemiFamily() && !isLeftOuterSemiFamily())
{
/// for (left outer) semi join, the columns from right table will be ignored
append_output_columns(right_cols, join.join_type() == tipb::JoinType::TypeLeftOuterJoin);
append_output_columns(right_cols, makeRightJoinSideNullable(join.join_type()));
}

if (!match_helper_name.empty())
Expand Down
14 changes: 12 additions & 2 deletions dbms/src/Flash/Coprocessor/JoinInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ struct JoinNonEqualConditions
/// Validate this JoinNonEqualConditions and return error message if any.
const char * validate(ASTTableJoin::Kind kind) const
{
if unlikely (!left_filter_column.empty() && !isLeftOuterJoin(kind))
if unlikely (!left_filter_column.empty() && !(isLeftOuterJoin(kind) || kind == ASTTableJoin::Kind::Full))
return "non left join with left conditions";
if unlikely (!right_filter_column.empty() && !isRightOuterJoin(kind))
if unlikely (!right_filter_column.empty() && !(isRightOuterJoin(kind) || kind == ASTTableJoin::Kind::Full))
return "non right join with right conditions";

if unlikely ((!other_cond_name.empty() || !other_eq_cond_from_in_name.empty()) && other_cond_expr == nullptr)
Expand All @@ -119,6 +119,16 @@ struct JoinNonEqualConditions

namespace JoinInterpreterHelper
{
/// Returns true when `join_type` is a right outer join or a full outer join, false for every other join type.
constexpr bool makeLeftJoinSideNullable(tipb::JoinType join_type)
{
    switch (join_type)
    {
    case tipb::JoinType::TypeRightOuterJoin:
    case tipb::JoinType::TypeFullOuterJoin:
        return true;
    default:
        return false;
    }
}

/// Returns true when `join_type` is a left outer join or a full outer join, false for every other join type.
constexpr bool makeRightJoinSideNullable(tipb::JoinType join_type)
{
    switch (join_type)
    {
    case tipb::JoinType::TypeLeftOuterJoin:
    case tipb::JoinType::TypeFullOuterJoin:
        return true;
    default:
        return false;
    }
}

struct TiFlashJoin
{
TiFlashJoin(const tipb::Join & join_, bool is_test);
Expand Down
9 changes: 5 additions & 4 deletions dbms/src/Flash/Coprocessor/collectOutputFieldTypes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
#include <Flash/Coprocessor/collectOutputFieldTypes.h>
#include <TiDB/Schema/TiDB.h>
#include <common/types.h>
Expand Down Expand Up @@ -172,9 +173,9 @@ bool collectForJoin(std::vector<tipb::FieldType> & output_field_types, const tip
// collect output_field_types for join self
for (auto & field_type : children_output_field_types[0])
{
if (executor.join().join_type() == tipb::JoinType::TypeRightOuterJoin)
if (JoinInterpreterHelper::makeLeftJoinSideNullable(executor.join().join_type()))
{
/// the type of left column for right join is always nullable
/// the type of left column for right/full join is always nullable
auto updated_field_type = field_type;
updated_field_type.set_flag(
static_cast<UInt32>(updated_field_type.flag()) & (~static_cast<UInt32>(TiDB::ColumnFlagNotNull)));
Expand Down Expand Up @@ -210,9 +211,9 @@ bool collectForJoin(std::vector<tipb::FieldType> & output_field_types, const tip
/// for semi/anti semi join, the right table column is ignored
for (auto & field_type : children_output_field_types[1])
{
if (executor.join().join_type() == tipb::JoinType::TypeLeftOuterJoin)
if (JoinInterpreterHelper::makeRightJoinSideNullable(executor.join().join_type()))
{
/// the type of right column for left join is always nullable
/// the type of right column for left/full join is always nullable
auto updated_field_type = field_type;
updated_field_type.set_flag(
updated_field_type.flag() & (~static_cast<UInt32>(TiDB::ColumnFlagNotNull)));
Expand Down
Loading