Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,28 @@
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Schema.UnresolvedColumn;
import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.tools.RelBuilder;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -88,80 +86,70 @@ public MergeTableAsUtil(ConvertContext context) {
}

/**
* Rewrites the query operation to include only the fields that may be persisted in the sink.
* Reshapes the query so its output columns line up with the sink's persistable columns:
* reordering them and filling sink columns the query does not produce with {@code NULL}.
* Returns the query unchanged when it already matches the sink 1:1. A sink column the query
* does not produce that is declared {@code NOT NULL} raises a {@link ValidationException}.
*/
public PlannerQueryOperation maybeRewriteQuery(
CatalogManager catalogManager,
FlinkPlannerImpl flinkPlanner,
PlannerQueryOperation origQueryOperation,
SqlNode origQueryNode,
ResolvedCatalogBaseTable<?> sinkTable) {
FlinkCalciteSqlValidator sqlValidator = flinkPlanner.getOrCreateSqlValidator();
SqlRewriterUtils rewriterUtils = new SqlRewriterUtils(sqlValidator);
FlinkTypeFactory typeFactory = (FlinkTypeFactory) sqlValidator.getTypeFactory();

// Only fields that may be persisted will be included in the select query
RowType sinkRowType =
((RowType) sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType());

Map<String, Integer> sourceFields =
IntStream.range(0, origQueryOperation.getResolvedSchema().getColumnNames().size())
final RelNode queryRelNode = origQueryOperation.getCalciteTree();
final RelOptCluster cluster = queryRelNode.getCluster();
final RexBuilder rexBuilder = cluster.getRexBuilder();
final FlinkTypeFactory typeFactory = (FlinkTypeFactory) cluster.getTypeFactory();

// Only fields that may be persisted are included in the sink.
final RowType sinkRowType =
(RowType) sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType();

final List<String> sourceColumns = origQueryOperation.getResolvedSchema().getColumnNames();
final Map<String, Integer> sourceFields =
IntStream.range(0, sourceColumns.size())
.boxed()
.collect(
Collectors.toMap(
origQueryOperation.getResolvedSchema().getColumnNames()
::get,
Function.identity()));
.collect(Collectors.toMap(sourceColumns::get, Function.identity()));

// assignedFields contains the new sink fields that are not present in the source
// and that will be included in the select query
LinkedHashMap<Integer, SqlNode> assignedFields = new LinkedHashMap<>();

// targetPositions contains the positions of the source fields that will be
// included in the select query
List<Object> targetPositions = new ArrayList<>();
final List<RexNode> projects = new ArrayList<>();
final List<String> fieldNames = new ArrayList<>();
// The projection is a no-op when the query already produces the sink columns 1:1 in order.
boolean rewriteNeeded = sinkRowType.getFieldCount() != sourceColumns.size();

// The loop cannot stop once a rewrite is detected: the projection must cover every sink
// field, and every missing NOT NULL column must still be validated.
int pos = -1;
for (RowType.RowField targetField : sinkRowType.getFields()) {
pos++;
fieldNames.add(targetField.getName());

if (!sourceFields.containsKey(targetField.getName())) {
final Integer sourcePos = sourceFields.get(targetField.getName());
if (sourcePos == null) {
if (!targetField.getType().isNullable()) {
throw new ValidationException(
"Column '"
+ targetField.getName()
+ "' has no default value and does not allow NULLs.");
}

assignedFields.put(
pos,
validator.maybeCast(
SqlLiteral.createNull(SqlParserPos.ZERO),
typeFactory.createUnknownType(),
projects.add(
rexBuilder.makeNullLiteral(
typeFactory.createFieldTypeFromLogicalType(targetField.getType())));
rewriteNeeded = true;
} else {
targetPositions.add(sourceFields.get(targetField.getName()));
projects.add(rexBuilder.makeInputRef(queryRelNode, sourcePos));
if (sourcePos != pos) {
rewriteNeeded = true;
}
}
Comment thread
raminqaf marked this conversation as resolved.
}

// rewrite query
SqlCall newSelect =
rewriterUtils.rewriteCall(
rewriterUtils,
sqlValidator,
(SqlCall) origQueryNode,
typeFactory.buildRelNodeRowType(sinkRowType),
assignedFields,
targetPositions,
() -> "Unsupported node type " + origQueryNode.getKind());

return (PlannerQueryOperation)
SqlNodeToOperationConversion.convert(flinkPlanner, catalogManager, newSelect)
.orElseThrow(
() ->
new TableException(
"Unsupported node type "
+ newSelect.getClass().getSimpleName()));
if (!rewriteNeeded) {
return origQueryOperation;
}

final RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(cluster, null);
final RelNode projected =
relBuilder.push(queryRelNode).project(projects, fieldNames, true).build();
return new PlannerQueryOperation(projected, () -> escapeExpression.apply(origQueryNode));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,17 @@ public class SqlCreateTableAsConverter extends AbstractCreateTableConverter<SqlC
public Operation convertSqlNode(SqlCreateTableAs sqlCreateTableAs, ConvertContext context) {
final FlinkPlannerImpl flinkPlanner = context.getFlinkPlanner();
final CatalogManager catalogManager = context.getCatalogManager();
SqlNode asQuerySqlNode = sqlCreateTableAs.getAsQuery();
SqlNode validatedAsQuery = flinkPlanner.validate(asQuerySqlNode);
final SqlNode asQuerySqlNode = sqlCreateTableAs.getAsQuery();

PlannerQueryOperation query =
(PlannerQueryOperation)
SqlNodeToOperationConversion.convert(
flinkPlanner, catalogManager, validatedAsQuery)
flinkPlanner, catalogManager, asQuerySqlNode)
.orElseThrow(
() ->
new TableException(
"CTAS unsupported node type "
+ validatedAsQuery
+ asQuerySqlNode
.getClass()
.getSimpleName()));
ResolvedCatalogTable tableWithResolvedSchema =
Expand All @@ -67,12 +66,7 @@ public Operation convertSqlNode(SqlCreateTableAs sqlCreateTableAs, ConvertContex
// If needed, rewrite the query to include the new sink fields in the select list
query =
new MergeTableAsUtil(context)
.maybeRewriteQuery(
catalogManager,
flinkPlanner,
query,
validatedAsQuery,
tableWithResolvedSchema);
.maybeRewriteQuery(query, asQuerySqlNode, tableWithResolvedSchema);

ObjectIdentifier identifier = getIdentifier(sqlCreateTableAs, context);
CreateTableOperation createTableOperation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,7 @@ public Operation convertSqlNode(SqlReplaceTableAs sqlReplaceTableAs, ConvertCont
query =
new MergeTableAsUtil(context)
.maybeRewriteQuery(
context.getCatalogManager(),
flinkPlanner,
query,
sqlReplaceTableAs.getAsQuery(),
tableWithResolvedSchema);
query, sqlReplaceTableAs.getAsQuery(), tableWithResolvedSchema);

ObjectIdentifier identifier = getIdentifier(sqlReplaceTableAs, context);
CreateTableOperation createTableOperation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
import org.apache.flink.table.planner.utils.TableFunc0;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -195,6 +196,21 @@ void before() throws TableAlreadyExistException, DatabaseNotExistException {
sqlWithNonPersistedLast, "base_mtbl_with_non_persisted_last");
}

@Test
void testCreateMaterializedTableAsSelectWithSetSemanticTablePtf() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);
final String sql =
"CREATE MATERIALIZED TABLE mt_ptf\n"
+ "WITH (\n"
+ " 'connector' = 'filesystem',\n"
+ " 'format' = 'json'\n"
+ ")\n"
+ "AS SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";

final Operation operation = parse(sql);
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
}

@Test
void testCreateMaterializedTable() {
final String sql =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** IT Case for CREATE TABLE AS SELECT statement over a {@link ProcessTableFunction}. */
class CTASITCase extends StreamingTestBase {

@BeforeEach
@Override
public void before() throws Exception {
super.before();
final String dataId = TestValuesTableFactory.registerData(TestData.smallData3());
tEnv().executeSql(
String.format(
"CREATE TABLE source(a int, b bigint, c string)"
+ " WITH ('connector' = 'values', 'bounded' = 'true', 'data-id' = '%s')",
dataId));
tEnv().createTemporarySystemFunction("f", SetSemanticTableFunction.class);
}

@Test
void testCreateTableAsSelectOverSetSemanticPtfWithReorderedColumns() throws Exception {
tEnv().executeSql(
"CREATE TABLE sink (`out`, `a`) WITH ('connector' = 'values') AS "
+ "SELECT * FROM f(r => TABLE source PARTITION BY a, i => 1)")
.await();

final ResolvedSchema schema = tEnv().from("sink").getResolvedSchema();
assertThat(schema.getColumnNames()).containsExactly("out", "a");

assertThat(TestValuesTableFactory.getResultsAsStrings("sink"))
.containsExactlyInAnyOrder(
"+I[{+I[1, 1, Hi], 1}, 1]",
"+I[{+I[2, 2, Hello], 1}, 2]",
"+I[{+I[3, 2, Hello world], 1}, 3]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.types.AbstractDataType;
Expand Down Expand Up @@ -253,6 +254,28 @@ void testCreateOrReplaceTableASWithTableNotExist() throws Exception {
verifyCatalogTable(expectCatalogTable, getCatalogTable("not_exist_target"));
}

@Test
void testReplaceTableAsSelectOverSetSemanticPtf() throws Exception {
tEnv().createTemporarySystemFunction("f", SetSemanticTableFunction.class);
tEnv().executeSql(
"REPLACE TABLE target WITH ('connector' = 'values', 'bounded' = 'true') AS "
+ "SELECT * FROM f(r => TABLE source PARTITION BY a, i => 1)")
.await();

assertThat(TestValuesTableFactory.getResultsAsStrings("target"))
.containsExactlyInAnyOrder(
"+I[1, {+I[1, 1, Hi], 1}]",
"+I[2, {+I[2, 2, Hello], 1}]",
"+I[3, {+I[3, 2, Hello world], 1}]");

// The partition key `a` is passed through and the PTF appends its `out` column.
CatalogTable expectCatalogTable =
getExpectCatalogTable(
new String[] {"a", "out"},
new AbstractDataType[] {DataTypes.INT(), DataTypes.STRING()});
verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
}

private CatalogTable getExpectCatalogTable(
String[] cols, AbstractDataType<?>[] fieldDataTypes) {
return getExpectCatalogTable(cols, fieldDataTypes, getDefaultTargetTableOptions());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.MyCtasTable], fields=[EXPR$0, a, b])
+- LogicalProject(EXPR$0=[null:INTEGER], a=[$0], b=[$1])
LogicalSink(table=[default_catalog.default_database.MyCtasTable], fields=[votes, a, b, metadata_col])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a breaking change? The EXPR$0 to votes makes sense, but the additional metadata_col?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a breaking change. metadata_col appears only because I extended this test's DDL to also declare a persisted metadata column and a virtual one, to add CTAS coverage for those column kinds. It's a change to the test input, not to planner behavior.
I verified on master: running this exact DDL there already produces the same plan, including metadata_col:

  LogicalSink(table=[…MyCtasTable], fields=[votes, a, b, metadata_col])                                                             
  +- LogicalProject(votes=[null:INTEGER], a=[$0], b=[$1], metadata_col=[null:BIGINT])

+- LogicalProject(votes=[null:INTEGER], a=[$0], b=[$1], metadata_col=[null:BIGINT])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.MyCtasTable], fields=[EXPR$0, a, b])
+- Calc(select=[null:INTEGER AS EXPR$0, a, b])
Sink(table=[default_catalog.default_database.MyCtasTable], fields=[votes, a, b, metadata_col])
+- Calc(select=[null:INTEGER AS votes, a, b, null:BIGINT AS metadata_col])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.MyCtasTable], fields=[EXPR$0, a, b])
+- Calc(select=[null:INTEGER AS EXPR$0, a, b])
Sink(table=[default_catalog.default_database.MyCtasTable], fields=[votes, a, b, metadata_col])
+- Calc(select=[null:INTEGER AS votes, a, b, null:BIGINT AS metadata_col])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
Loading