Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,28 @@
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Schema.UnresolvedColumn;
import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.calcite.SqlRewriterUtils;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.operations.SqlNodeToOperationConversion;
import org.apache.flink.table.planner.operations.converters.SqlNodeConverter.ConvertContext;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.tools.RelBuilder;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -88,80 +86,94 @@ public MergeTableAsUtil(ConvertContext context) {
}

/**
* Rewrites the query operation to include only the fields that may be persisted in the sink.
* Reshapes the query so its output columns line up with the sink table's persistable columns,
* by reordering them and filling sink columns the query does not produce with {@code NULL}.
*
* <p>In {@code CREATE TABLE ... AS SELECT} the sink columns do not always match the query
* output: the {@code CREATE TABLE} part may add columns or reorder them. The "maybe" reflects
* that some statements need no reshaping at all, in which case the original query is returned
* unchanged.
*
* <p>Examples, given a source table {@code source(a INT, c STRING)}:
*
* <ul>
* <li>Schema fully derived from the query, nothing to change:
* <pre>
* CREATE TABLE sink AS SELECT a, c FROM source
* -- query already produces (a, c); returned unchanged
* </pre>
* <li>Extra sink columns not produced by the query are filled with {@code NULL}:
* <pre>
* CREATE TABLE sink (p1 INT, p2 STRING) AS SELECT a, c FROM source
* -- projected to: NULL AS p1, NULL AS p2, a, c
* </pre>
* <li>Columns reordered to match the declared order:
* <pre>
* CREATE TABLE sink (c, a) AS SELECT a, c FROM source
* -- projected to: c, a
* </pre>
* </ul>
*
* <p>A sink column that the query does not produce and that is declared {@code NOT NULL} cannot
* be filled with {@code NULL} and raises a {@link ValidationException}.
Comment thread
raminqaf marked this conversation as resolved.
Outdated
*/
public PlannerQueryOperation maybeRewriteQuery(
CatalogManager catalogManager,
FlinkPlannerImpl flinkPlanner,
PlannerQueryOperation origQueryOperation,
SqlNode origQueryNode,
ResolvedCatalogBaseTable<?> sinkTable) {
FlinkCalciteSqlValidator sqlValidator = flinkPlanner.getOrCreateSqlValidator();
SqlRewriterUtils rewriterUtils = new SqlRewriterUtils(sqlValidator);
FlinkTypeFactory typeFactory = (FlinkTypeFactory) sqlValidator.getTypeFactory();

// Only fields that may be persisted will be included in the select query
RowType sinkRowType =
((RowType) sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType());

Map<String, Integer> sourceFields =
IntStream.range(0, origQueryOperation.getResolvedSchema().getColumnNames().size())
final RelNode queryRelNode = origQueryOperation.getCalciteTree();
final RelOptCluster cluster = queryRelNode.getCluster();
final RexBuilder rexBuilder = cluster.getRexBuilder();
final FlinkTypeFactory typeFactory = (FlinkTypeFactory) cluster.getTypeFactory();

// Only fields that may be persisted are included in the sink.
final RowType sinkRowType =
(RowType) sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType();

final List<String> sourceColumns = origQueryOperation.getResolvedSchema().getColumnNames();
final Map<String, Integer> sourceFields =
IntStream.range(0, sourceColumns.size())
.boxed()
.collect(
Collectors.toMap(
origQueryOperation.getResolvedSchema().getColumnNames()
::get,
Function.identity()));

// assignedFields contains the new sink fields that are not present in the source
// and that will be included in the select query
LinkedHashMap<Integer, SqlNode> assignedFields = new LinkedHashMap<>();
.collect(Collectors.toMap(sourceColumns::get, Function.identity()));

// targetPositions contains the positions of the source fields that will be
// included in the select query
List<Object> targetPositions = new ArrayList<>();
final List<RexNode> projects = new ArrayList<>();
final List<String> fieldNames = new ArrayList<>();
// The projection is a no-op when the query already produces the sink columns 1:1 in order.
boolean rewriteNeeded = sinkRowType.getFieldCount() != sourceColumns.size();

int pos = -1;
for (RowType.RowField targetField : sinkRowType.getFields()) {
pos++;
fieldNames.add(targetField.getName());

if (!sourceFields.containsKey(targetField.getName())) {
final Integer sourcePos = sourceFields.get(targetField.getName());
if (sourcePos == null) {
if (!targetField.getType().isNullable()) {
throw new ValidationException(
"Column '"
+ targetField.getName()
+ "' has no default value and does not allow NULLs.");
}

assignedFields.put(
pos,
validator.maybeCast(
SqlLiteral.createNull(SqlParserPos.ZERO),
typeFactory.createUnknownType(),
projects.add(
rexBuilder.makeNullLiteral(
typeFactory.createFieldTypeFromLogicalType(targetField.getType())));
rewriteNeeded = true;
} else {
targetPositions.add(sourceFields.get(targetField.getName()));
projects.add(rexBuilder.makeInputRef(queryRelNode, sourcePos));
if (sourcePos != pos) {
rewriteNeeded = true;
}
}
Comment thread
raminqaf marked this conversation as resolved.
}

// rewrite query
SqlCall newSelect =
rewriterUtils.rewriteCall(
rewriterUtils,
sqlValidator,
(SqlCall) origQueryNode,
typeFactory.buildRelNodeRowType(sinkRowType),
assignedFields,
targetPositions,
() -> "Unsupported node type " + origQueryNode.getKind());

return (PlannerQueryOperation)
SqlNodeToOperationConversion.convert(flinkPlanner, catalogManager, newSelect)
.orElseThrow(
() ->
new TableException(
"Unsupported node type "
+ newSelect.getClass().getSimpleName()));
if (!rewriteNeeded) {
return origQueryOperation;
}

final RelBuilder relBuilder = RelFactories.LOGICAL_BUILDER.create(cluster, null);
final RelNode projected =
relBuilder.push(queryRelNode).project(projects, fieldNames, true).build();
return new PlannerQueryOperation(projected, () -> escapeExpression.apply(origQueryNode));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,17 @@ public class SqlCreateTableAsConverter extends AbstractCreateTableConverter<SqlC
public Operation convertSqlNode(SqlCreateTableAs sqlCreateTableAs, ConvertContext context) {
final FlinkPlannerImpl flinkPlanner = context.getFlinkPlanner();
final CatalogManager catalogManager = context.getCatalogManager();
SqlNode asQuerySqlNode = sqlCreateTableAs.getAsQuery();
SqlNode validatedAsQuery = flinkPlanner.validate(asQuerySqlNode);
final SqlNode asQuerySqlNode = sqlCreateTableAs.getAsQuery();

PlannerQueryOperation query =
(PlannerQueryOperation)
SqlNodeToOperationConversion.convert(
flinkPlanner, catalogManager, validatedAsQuery)
flinkPlanner, catalogManager, asQuerySqlNode)
.orElseThrow(
() ->
new TableException(
"CTAS unsupported node type "
+ validatedAsQuery
+ asQuerySqlNode
.getClass()
.getSimpleName()));
ResolvedCatalogTable tableWithResolvedSchema =
Expand All @@ -67,12 +66,7 @@ public Operation convertSqlNode(SqlCreateTableAs sqlCreateTableAs, ConvertContex
// If needed, rewrite the query to include the new sink fields in the select list
query =
new MergeTableAsUtil(context)
.maybeRewriteQuery(
catalogManager,
flinkPlanner,
query,
validatedAsQuery,
tableWithResolvedSchema);
.maybeRewriteQuery(query, asQuerySqlNode, tableWithResolvedSchema);

ObjectIdentifier identifier = getIdentifier(sqlCreateTableAs, context);
CreateTableOperation createTableOperation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,7 @@ public Operation convertSqlNode(SqlReplaceTableAs sqlReplaceTableAs, ConvertCont
query =
new MergeTableAsUtil(context)
.maybeRewriteQuery(
context.getCatalogManager(),
flinkPlanner,
query,
sqlReplaceTableAs.getAsQuery(),
tableWithResolvedSchema);
query, sqlReplaceTableAs.getAsQuery(), tableWithResolvedSchema);

ObjectIdentifier identifier = getIdentifier(sqlReplaceTableAs, context);
CreateTableOperation createTableOperation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
import org.apache.flink.table.planner.utils.TestSimpleDynamicTableSourceFactory;

import org.apache.calcite.sql.SqlNode;
Expand Down Expand Up @@ -432,6 +433,53 @@ void testExplainCreateTableAs() {
.isInstanceOf(CreateTableASOperation.class);
}

@Test
void testCreateTableAsWithSetSemanticPtf() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);

// Rewritten query: unchanged. The PTF output already matches the sink columns (a, out).
final String sql =
"CREATE TABLE tbl1 WITH ('connector' = '"
+ TestSimpleDynamicTableSourceFactory.IDENTIFIER()
+ "') AS SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";

Operation ctas = parseAndConvert(sql);
Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
// The partition key `a` is passed through and the PTF appends its `out` column.
assertThat(operation)
.is(
new HamcrestCondition<>(
isCreateTableOperation(
withSchema(
Schema.newBuilder()
.column("a", DataTypes.BIGINT().notNull())
.column("out", DataTypes.STRING())
.build()))));
}

@Test
void testCreateTableAsWithSetSemanticPtfAndReorderedColumns() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);

// Rewritten query: SELECT `out`, `a`. The PTF output (a, out) reordered to sink columns.
final String sql =
"CREATE TABLE tbl1 (`out`, `a`) WITH ('connector' = '"
+ TestSimpleDynamicTableSourceFactory.IDENTIFIER()
+ "') AS SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";

Operation ctas = parseAndConvert(sql);
Operation operation = ((CreateTableASOperation) ctas).getCreateTableOperation();
assertThat(operation)
.is(
new HamcrestCondition<>(
isCreateTableOperation(
withSchema(
Schema.newBuilder()
.column("out", DataTypes.STRING())
.column("a", DataTypes.BIGINT().notNull())
.build()))));
}

private Operation parseAndConvert(String sql) {
final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
import org.apache.flink.table.planner.utils.TableFunc0;

import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -195,6 +196,21 @@ void before() throws TableAlreadyExistException, DatabaseNotExistException {
sqlWithNonPersistedLast, "base_mtbl_with_non_persisted_last");
}

@Test
void testCreateMaterializedTableAsSelectWithSetSemanticTablePtf() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);
final String sql =
"CREATE MATERIALIZED TABLE mt_ptf\n"
+ "WITH (\n"
+ " 'connector' = 'filesystem',\n"
+ " 'format' = 'json'\n"
+ ")\n"
+ "AS SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";

final Operation operation = parse(sql);
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
}

@Test
void testCreateMaterializedTable() {
final String sql =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.SetSemanticTableFunction;
import org.apache.flink.table.types.AbstractDataType;

import org.apache.calcite.sql.SqlNode;
Expand Down Expand Up @@ -290,6 +291,47 @@ void testExplainReplaceTableAs() {
.isInstanceOf(ReplaceTableAsOperation.class);
}

@Test
void testReplaceTableAsWithSetSemanticPtf() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);

String tableName = "replace_table";
// Rewritten query: unchanged. The PTF output already matches the sink columns (a, out).
String sql =
"REPLACE TABLE "
+ tableName
+ " WITH ('k1' = 'v1', 'k2' = 'v2')"
+ " as SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";
// The partition key `a` is passed through and the PTF appends its `out` column.
Schema tableSchema =
Schema.newBuilder()
.column("a", DataTypes.BIGINT().notNull())
.column("out", DataTypes.STRING())
.build();

testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
}

@Test
void testReplaceTableAsWithSetSemanticPtfAndReorderedColumns() {
functionCatalog.registerTemporarySystemFunction("f", new SetSemanticTableFunction(), false);

String tableName = "replace_table";
// Rewritten query: SELECT `out`, `a`. The PTF output (a, out) reordered to sink columns.
String sql =
Comment thread
raminqaf marked this conversation as resolved.
Outdated
"REPLACE TABLE "
+ tableName
+ " (`out`, `a`) WITH ('k1' = 'v1', 'k2' = 'v2')"
+ " as SELECT * FROM f(r => TABLE t1 PARTITION BY a, i => 1)";
Comment thread
raminqaf marked this conversation as resolved.
Outdated
Schema tableSchema =
Schema.newBuilder()
.column("out", DataTypes.STRING())
.column("a", DataTypes.BIGINT().notNull())
.build();

testCommonReplaceTableAs(sql, tableName, null, tableSchema, null, Collections.emptyList());
Comment thread
raminqaf marked this conversation as resolved.
Outdated
Comment thread
raminqaf marked this conversation as resolved.
Outdated
}

private void testCommonReplaceTableAs(
String sql, String tableName, @Nullable String tableComment) {
testCommonReplaceTableAs(
Expand Down
Loading