Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 80 additions & 18 deletions cli/api/dbadapters/execution_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,8 @@ from (${query}) as insertions`;
this.buildIncrementalSchemaChangeTasks(tasks, table);
// Fall through to run the static DML after the procedure alters the schema
case dataform.OnSchemaChange.IGNORE:
default:
tasks.add(
Task.statement(
table.uniqueKey && table.uniqueKey.length > 0
? this.mergeInto(
table.target,
tableMetadata?.fields.map(f => f.name),
this.getIncrementalQuery(table),
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
)
: this.insertInto(
table.target,
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
this.getIncrementalQuery(table)
)
)
);
const columns = tableMetadata?.fields.map(f => f.name) || [];
tasks.add(Task.statement(this.getIncrementalDmlStatement(table, columns)));
break;
}
}
Expand Down Expand Up @@ -470,6 +454,84 @@ when matched then
when not matched then
insert (${backtickedColumns.join(",")}) values (${backtickedColumns.join(",")})`;
}

private insertOverwrite(
target: dataform.ITarget,
columns: string[],
query: string,
partitionBy: string,
updatePartitionFilter: string
): string {
const uniqueId = this.uniqueIdGenerator();
const stagingTableUnqualified = `staging_table_temp_${uniqueId}`;
const backtickedColumns = columns.map(column => `\`${column}\``);
const resolveTargetTable = this.resolveTarget(target);

return `CREATE OR REPLACE TEMP TABLE \`${stagingTableUnqualified}\` AS (
${query}
);

BEGIN
DECLARE partitions_for_replacement DEFAULT (
ARRAY(
SELECT DISTINCT ${partitionBy}
FROM \`${stagingTableUnqualified}\`
WHERE ${partitionBy} IS NOT NULL
)
);

MERGE ${resolveTargetTable} T
USING \`${stagingTableUnqualified}\` S
ON FALSE
WHEN NOT MATCHED BY SOURCE AND ${partitionBy} IN UNNEST(partitions_for_replacement) ${updatePartitionFilter ? `and T.${updatePartitionFilter}` : ""} THEN
DELETE
WHEN NOT MATCHED BY TARGET THEN
INSERT (${backtickedColumns.join(",")}) VALUES (${backtickedColumns.join(",")});
END;

DROP TABLE IF EXISTS \`${stagingTableUnqualified}\`;`;
}

private getIncrementalDmlStatement(
table: dataform.ITable,
columns: string[]
): string {
const incrementalQuery = this.getIncrementalQuery(table);

switch (table.incrementalStrategy) {
case dataform.IncrementalStrategy.INSERT_OVERWRITE:
return this.insertOverwrite(
table.target,
columns,
incrementalQuery,
table.bigquery && table.bigquery.partitionBy,
table.bigquery && table.bigquery.updatePartitionFilter
);
case dataform.IncrementalStrategy.MERGE:
return this.mergeInto(
table.target,
columns,
incrementalQuery,
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
);
default:
if (table.uniqueKey && table.uniqueKey.length > 0) {
return this.mergeInto(
table.target,
columns,
incrementalQuery,
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
);
}
return this.insertInto(
table.target,
columns.map(column => `\`${column}\``),
incrementalQuery
);
}
}
}

export function collectEvaluationQueries(
Expand Down
30 changes: 30 additions & 0 deletions cli/api/execution_sql_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,34 @@ suite("ExecutionSql with 'onSchemaChange'", () => {
const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_ignore.sql", "utf8");
expect(procedureSql).to.equal(expectedSql.trim());
});

test("generates INSERT_OVERWRITE script for IGNORE strategy", () => {
const table = {
...baseTable,
incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE,
bigquery: {
partitionBy: "DATE(ts)",
updatePartitionFilter: "ts >= '2024-01-01'"
}
};
const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata);
const sql = tasks.build().map(t => t.statement).join("\n;\n");
const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_ignore.sql", "utf8");
expect(sql).to.equal(expectedSql.trim());
});

test("generates INSERT_OVERWRITE script for EXTEND strategy", () => {
const table = {
...baseTable,
incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE,
onSchemaChange: dataform.OnSchemaChange.EXTEND,
bigquery: {
partitionBy: "DATE(ts)"
}
};
const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata);
const sql = tasks.build().map(t => t.statement).join("\n;\n");
const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_extend.sql", "utf8");
expect(sql).to.equal(expectedSql.trim());
});
});
97 changes: 97 additions & 0 deletions cli/api/goldens/insert_overwrite_extend.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`()
OPTIONS(strict_mode=false)
BEGIN

-- Create empty table to extract schema of new query.
CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS (
SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0
);


-- Compare schemas
DECLARE dataform_columns ARRAY<STRING>;
DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
DECLARE columns_removed ARRAY<STRING>;

SET dataform_columns = (
SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'incremental_on_schema_change'
);

SET temp_table_columns = (
SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty'
);

SET columns_added = (
SELECT IFNULL(ARRAY_AGG(column_info), [])
FROM UNNEST(temp_table_columns) AS column_info
WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
);
SET columns_removed = (
SELECT IFNULL(ARRAY_AGG(column_name), [])
FROM UNNEST(dataform_columns) AS column_name
WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
);


-- Apply schema change strategy (EXTEND).
IF ARRAY_LENGTH(columns_removed) > 0 THEN
RAISE USING MESSAGE = FORMAT(
"Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T",
columns_removed
);
END IF;

IF ARRAY_LENGTH(columns_added) > 0 THEN
EXECUTE IMMEDIATE (
"ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " ||
(
SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")
FROM UNNEST(columns_added) AS column_info
)
);
END IF;



-- Cleanup temporary tables.
DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;

END
;
BEGIN
CALL `project-id.dataset-id.df_osc_test_uuid`();
EXCEPTION WHEN ERROR THEN
DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`;
RAISE;
END;
DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`
;
CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS (
select 1 as id, 'a' as field1, 'new' as field2
);

BEGIN
DECLARE partitions_for_replacement DEFAULT (
ARRAY(
SELECT DISTINCT DATE(ts)
FROM `staging_table_temp_test_uuid`
WHERE DATE(ts) IS NOT NULL
)
);

MERGE `project-id.dataset-id.incremental_on_schema_change` T
USING `staging_table_temp_test_uuid` S
ON FALSE
WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) THEN
DELETE
WHEN NOT MATCHED BY TARGET THEN
INSERT (`id`,`field1`) VALUES (`id`,`field1`);
END;

DROP TABLE IF EXISTS `staging_table_temp_test_uuid`
23 changes: 23 additions & 0 deletions cli/api/goldens/insert_overwrite_ignore.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS (
select 1 as id, 'a' as field1, 'new' as field2
);

BEGIN
DECLARE partitions_for_replacement DEFAULT (
ARRAY(
SELECT DISTINCT DATE(ts)
FROM `staging_table_temp_test_uuid`
WHERE DATE(ts) IS NOT NULL
)
);

MERGE `project-id.dataset-id.incremental_on_schema_change` T
USING `staging_table_temp_test_uuid` S
ON FALSE
WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) and T.ts >= '2024-01-01' THEN
DELETE
WHEN NOT MATCHED BY TARGET THEN
INSERT (`id`,`field1`) VALUES (`id`,`field1`);
END;

DROP TABLE IF EXISTS `staging_table_temp_test_uuid`
56 changes: 56 additions & 0 deletions core/actions/incremental_table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,30 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> {
} : {}),
});
this.proto.onSchemaChange = this.mapOnSchemaChange(config.onSchemaChange);
this.proto.incrementalStrategy = this.mapIncrementalStrategy(config.incrementalStrategy);

switch (this.proto.incrementalStrategy) {
case dataform.IncrementalStrategy.INSERT_OVERWRITE:
if (!this.proto.bigquery || !this.proto.bigquery.partitionBy) {
this.session.compileError(
new Error("incrementalStrategy 'insert_overwrite' requires 'partitionBy' to be set."),
config.filename,
this.proto.target
);
}
break;
case dataform.IncrementalStrategy.MERGE:
if (!this.proto.uniqueKey || this.proto.uniqueKey.length === 0) {
this.session.compileError(
new Error("incrementalStrategy 'merge' requires 'uniqueKey' to be set."),
config.filename,
this.proto.target
);
}
break;
default:
break;
}

if (config.reservation) {
if (!this.proto.actionDescriptor) {
Expand Down Expand Up @@ -736,6 +760,38 @@ export class IncrementalTable extends ActionBuilder<dataform.Table> {
throw new Error(`OnSchemaChange value "${onSchemaChange}" is not supported`);
}
}

private mapIncrementalStrategy(
incrementalStrategy?: string | number
): dataform.IncrementalStrategy {
if (!incrementalStrategy) {
return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED;
}

if (typeof incrementalStrategy === "number") {
switch (incrementalStrategy) {
case dataform.ActionConfig.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED:
return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED;
case dataform.ActionConfig.IncrementalStrategy.MERGE:
return dataform.IncrementalStrategy.MERGE;
case dataform.ActionConfig.IncrementalStrategy.INSERT_OVERWRITE:
return dataform.IncrementalStrategy.INSERT_OVERWRITE;
default:
throw new Error(`IncrementalStrategy value "${incrementalStrategy}" is not supported`);
}
}

switch (incrementalStrategy.toString().toUpperCase()) {
case "INCREMENTAL_STRATEGY_UNSPECIFIED":
return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED;
case "MERGE":
return dataform.IncrementalStrategy.MERGE;
case "INSERT_OVERWRITE":
return dataform.IncrementalStrategy.INSERT_OVERWRITE;
default:
throw new Error(`IncrementalStrategy value "${incrementalStrategy}" is not supported`);
}
}
}

/**
Expand Down
Loading
Loading