Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,28 @@
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Schema.UnresolvedColumn;
import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.tools.RelBuilder;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -88,80 +86,70 @@ public MergeTableAsUtil(ConvertContext context) {
}

/**
* Rewrites the query operation to include only the fields that may be persisted in the sink.
* Reshapes the query so its output columns line up with the sink's persistable columns:
* reordering them and filling sink columns the query does not produce with {@code NULL}.
* Returns the query unchanged when it already matches the sink 1:1. A sink column the query
* does not produce that is declared {@code NOT NULL} raises a {@link ValidationException}.
*/
public PlannerQueryOperation maybeRewriteQuery(
CatalogManager catalogManager,
FlinkPlannerImpl flinkPlanner,
PlannerQueryOperation origQueryOperation,
SqlNode origQueryNode,
ResolvedCatalogBaseTable<?> sinkTable) {
FlinkCalciteSqlValidator sqlValidator = flinkPlanner.getOrCreateSqlValidator();
SqlRewriterUtils rewriterUtils = new SqlRewriterUtils(sqlValidator);
FlinkTypeFactory typeFactory = (FlinkTypeFactory) sqlValidator.getTypeFactory();

// Only fields that may be persisted will be included in the select query
RowType sinkRowType =
((RowType) sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType());

Map<String, Integer> sourceFields =
IntStream.range(0, origQueryOperation.getResolvedSchema().getColumnNames().size())
final RelNode queryRelNode = origQueryOperation.getCalciteTree();
final RelOptCluster cluster = queryRelNode.getCluster();
final RexBuilder rexBuilder = cluster.getRexBuilder();
final FlinkTypeFactory typeFactory = (FlinkTypeFactory) cluster.getTypeFactory();

// Only fields that may be persisted are included in the sink.
final RowType sinkRowType =
(RowType) sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType();

final List<String> sourceColumns = origQueryOperation.getResolvedSchema().getColumnNames();
final Map<String, Integer> sourceFields =
IntStream.range(0, sourceColumns.size())
.boxed()
.collect(
Collectors.toMap(
origQueryOperation.getResolvedSchema().getColumnNames()
::get,
Function.identity()));
.collect(Collectors.toMap(sourceColumns::get, Function.identity()));

// assignedFields contains the new sink fields that are not present in the source
// and that will be included in the select query
LinkedHashMap<Integer, SqlNode> assignedFields = new LinkedHashMap<>();

// targetPositions contains the positions of the source fields that will be
// included in the select query
List<Object> targetPositions = new ArrayList<>();
final List<RexNode> projects = new ArrayList<>();
final List<String> fieldNames = new ArrayList<>();
// The projection is a no-op when the query already produces the sink columns 1:1 in order.
boolean rewriteNeeded = sinkRowType.getFieldCount() != sourceColumns.size();

// The loop cannot stop once a rewrite is detected: the projection must cover every sink
// field, and every missing NOT NULL column must still be validated.
int pos = -1;
for (RowType.RowField targetField : sinkRowType.getFields()) {
pos++;
fieldNames.add(targetField.getName());

if (!sourceFields.containsKey(targetField.getName())) {
final Integer sourcePos = sourceFields.get(targetField.getName());
if (sourcePos == null) {
if (!targetField.getType().isNullable()) {
throw new ValidationException(
"Column '"
+ targetField.getName()
+ "' has no default value and does not allow NULLs.");
}

assignedFields.put(
pos,
validator.maybeCast(
SqlLiteral.createNull(SqlParserPos.ZERO),
typeFactory.createUnknownType(),
projects.add(
rexBuilder.makeNullLiteral(
typeFactory.createFieldTypeFromLogicalType(targetField.getType())));
rewriteNeeded = true;
} else {
targetPositions.add(sourceFields.get(targetField.getName()));
projects.add(rexBuilder.makeInputRef(queryRelNode, sourcePos));
if (sourcePos != pos) {
rewriteNeeded = true;
}
}
Comment thread
raminqaf marked this conversation as resolved.
}

// rewrite query
SqlCall newSelect =
rewriterUtils.rewriteCall(
rewriterUtils,
sqlValidator,
(SqlCall) origQueryNode,
typeFactory.buildRelNodeRowType(sinkRowType),
assignedFields,
targetPositions,
() -> "Unsupported node type " + origQueryNode.getKind());

return (PlannerQueryOperation)
SqlNodeToOperationConversion.convert(flinkPlanner, catalogManager, newSelect)
.orElseThrow(
() ->
new TableException(
"Unsupported node type "
+ newSelect.getClass().getSimpleName()));
if (!rewriteNeeded) {
return origQueryOperation;
}

final RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(cluster, null);
final RelNode projected =
relBuilder.push(queryRelNode).project(projects, fieldNames, true).build();
return new PlannerQueryOperation(projected, () -> escapeExpression.apply(origQueryNode));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,17 @@ public class SqlCreateTableAsConverter extends AbstractCreateTableConverter<SqlC
public Operation convertSqlNode(SqlCreateTableAs sqlCreateTableAs, ConvertContext context) {
final FlinkPlannerImpl flinkPlanner = context.getFlinkPlanner();
final CatalogManager catalogManager = context.getCatalogManager();
SqlNode asQuerySqlNode = sqlCreateTableAs.getAsQuery();
SqlNode validatedAsQuery = flinkPlanner.validate(asQuerySqlNode);
final SqlNode asQuerySqlNode = sqlCreateTableAs.getAsQuery();

PlannerQueryOperation query =
(PlannerQueryOperation)
SqlNodeToOperationConversion.convert(
flinkPlanner, catalogManager, validatedAsQuery)
flinkPlanner, catalogManager, asQuerySqlNode)
.orElseThrow(
() ->
new TableException(
"CTAS unsupported node type "
+ validatedAsQuery
+ asQuerySqlNode
.getClass()
.getSimpleName()));
ResolvedCatalogTable tableWithResolvedSchema =
Expand All @@ -67,12 +66,7 @@ public Operation convertSqlNode(SqlCreateTableAs sqlCreateTableAs, ConvertContex
// If needed, rewrite the query to include the new sink fields in the select list
query =
new MergeTableAsUtil(context)
.maybeRewriteQuery(
catalogManager,
flinkPlanner,
query,
validatedAsQuery,
tableWithResolvedSchema);
.maybeRewriteQuery(query, asQuerySqlNode, tableWithResolvedSchema);

ObjectIdentifier identifier = getIdentifier(sqlCreateTableAs, context);
CreateTableOperation createTableOperation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,7 @@ public Operation convertSqlNode(SqlReplaceTableAs sqlReplaceTableAs, ConvertCont
query =
new MergeTableAsUtil(context)
.maybeRewriteQuery(
context.getCatalogManager(),
flinkPlanner,
query,
sqlReplaceTableAs.getAsQuery(),
tableWithResolvedSchema);
query, sqlReplaceTableAs.getAsQuery(), tableWithResolvedSchema);

ObjectIdentifier identifier = getIdentifier(sqlReplaceTableAs, context);
CreateTableOperation createTableOperation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
import org.apache.flink.table.planner.utils.TestSimpleDynamicTableSourceFactory;

import org.apache.calcite.sql.SqlNode;
Expand Down Expand Up @@ -432,6 +433,51 @@ void testExplainCreateTableAs() {
.isInstanceOf(CreateTableASOperation.class);
}

@Test
void testCreateTableAsWithSetSemanticPtf() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);

final String sql =
"CREATE TABLE tbl1 WITH ('connector' = '"
+ TestSimpleDynamicTableSourceFactory.IDENTIFIER()
+ "') AS SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";

Operation ctas = parseAndConvert(sql);
Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
// The partition key `a` is passed through and the PTF appends its `out` column.
assertThat(operation)
.is(
new HamcrestCondition<>(
isCreateTableOperation(
withSchema(
Schema.newBuilder()
.column("a", DataTypes.BIGINT().notNull())
.column("out", DataTypes.STRING())
.build()))));
}

@Test
void testCreateTableAsWithSetSemanticPtfAndReorderedColumns() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);

final String sql =
"CREATE TABLE tbl1 (`out`, `a`) WITH ('connector' = '"
+ TestSimpleDynamicTableSourceFactory.IDENTIFIER()
+ "') AS SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";

Operation ctas = parseAndConvert(sql);
Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
assertThat(operation)
.is(
new HamcrestCondition<>(
isCreateTableOperation(
withSchema(
Schema.newBuilder()
.column("out", DataTypes.STRING())
.column("a", DataTypes.BIGINT().notNull())
.build()))));
}

private Operation parseAndConvert(String sql) {
final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
import org.apache.flink.table.planner.utils.TableFunc0;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -195,6 +196,21 @@ void before() throws TableAlreadyExistException, DatabaseNotExistException {
sqlWithNonPersistedLast, "base_mtbl_with_non_persisted_last");
}

@Test
void testCreateMaterializedTableAsSelectWithSetSemanticTablePtf() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);
final String sql =
"CREATE MATERIALIZED TABLE mt_ptf\n"
+ "WITH (\n"
+ " 'connector' = 'filesystem',\n"
+ " 'format' = 'json'\n"
+ ")\n"
+ "AS SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";

final Operation operation = parse(sql);
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
}

@Test
void testCreateMaterializedTable() {
final String sql =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
import org.apache.flink.table.types.AbstractDataType;

import org.apache.calcite.sql.SqlNode;
Expand Down Expand Up @@ -290,6 +291,61 @@ void testExplainReplaceTableAs() {
.isInstanceOf(ReplaceTableAsOperation.class);
}

@Test
void testReplaceTableAsWithSetSemanticPtf() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);

String tableName = "replace_table";
String sql =
"REPLACE TABLE "
+ tableName
+ " WITH ('k1' = 'v1', 'k2' = 'v2')"
+ " AS SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";
// The partition key `a` is passed through and the PTF appends its `out` column.
Schema tableSchema =
Schema.newBuilder()
.column("a", DataTypes.BIGINT().notNull())
.column("out", DataTypes.STRING())
.build();

testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, List.of());
assertThat(
((ReplaceTableAsOperation) parseAndConvert(sql))
.getChild()
.asSerializableString())
.isEqualTo(
"SELECT `EXPR$0`.`a`, `EXPR$0`.`out`\n"
+ "FROM TABLE(`f`(`r` => (SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+ "FROM `builtin`.`default`.`t1` AS `t1`) PARTITION BY `a`, `i` => 1)) AS `EXPR$0`");
}

@Test
void testReplaceTableAsWithSetSemanticPtfAndReorderedColumns() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);

String tableName = "replace_table";
String sql =
"REPLACE TABLE "
+ tableName
+ " (`out`, `a`) WITH ('k1' = 'v1', 'k2' = 'v2')"
+ " AS SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";
Schema tableSchema =
Schema.newBuilder()
.column("out", DataTypes.STRING())
.column("a", DataTypes.BIGINT().notNull())
.build();

testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, List.of());
assertThat(
((ReplaceTableAsOperation) parseAndConvert(sql))
.getChild()
.asSerializableString())
.isEqualTo(
"SELECT `EXPR$0`.`a`, `EXPR$0`.`out`\n"
+ "FROM TABLE(`f`(`r` => (SELECT `t1`.`a`, `t1`.`b`, `t1`.`c`, `t1`.`d`\n"
+ "FROM `builtin`.`default`.`t1` AS `t1`) PARTITION BY `a`, `i` => 1)) AS `EXPR$0`");
}

private void testCommonReplaceTableAs(
String sql, String tableName, @Nullable String tableComment) {
testCommonReplaceTableAs(
Expand Down
Loading