From 41d152d52e007e5d00505ba4933e4ce2c49144e9 Mon Sep 17 00:00:00 2001 From: SuchodolskiEdvin Date: Tue, 2 Jun 2026 14:36:48 +0000 Subject: [PATCH] Support incremental_strategy parameter and new insert_overwrite strategy - updated proto with new parameters - added new tests - added validation for chosen incremental_strategies - added new insert_overwrite strategy logic --- cli/api/dbadapters/execution_sql.ts | 98 ++++++++++++--- cli/api/execution_sql_test.ts | 30 +++++ cli/api/goldens/insert_overwrite_extend.sql | 97 +++++++++++++++ cli/api/goldens/insert_overwrite_ignore.sql | 23 ++++ core/actions/incremental_table.ts | 56 +++++++++ core/actions/incremental_table_test.ts | 129 ++++++++++++++++++++ core/actions/table_test.ts | 24 ++++ core/actions/view_test.ts | 24 ++++ core/main_test.ts | 3 + protos/configs.proto | 16 +++ protos/core.proto | 15 +++ 11 files changed, 497 insertions(+), 18 deletions(-) create mode 100644 cli/api/goldens/insert_overwrite_extend.sql create mode 100644 cli/api/goldens/insert_overwrite_ignore.sql diff --git a/cli/api/dbadapters/execution_sql.ts b/cli/api/dbadapters/execution_sql.ts index 3745e7a2f..a11a61171 100644 --- a/cli/api/dbadapters/execution_sql.ts +++ b/cli/api/dbadapters/execution_sql.ts @@ -154,24 +154,8 @@ from (${query}) as insertions`; this.buildIncrementalSchemaChangeTasks(tasks, table); // Fall through to run the static DML after the procedure alters the schema case dataform.OnSchemaChange.IGNORE: - default: - tasks.add( - Task.statement( - table.uniqueKey && table.uniqueKey.length > 0 - ? 
this.mergeInto( - table.target, - tableMetadata?.fields.map(f => f.name), - this.getIncrementalQuery(table), - table.uniqueKey, - table.bigquery && table.bigquery.updatePartitionFilter - ) - : this.insertInto( - table.target, - tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``), - this.getIncrementalQuery(table) - ) - ) - ); + const columns = tableMetadata?.fields.map(f => f.name) || []; + tasks.add(Task.statement(this.getIncrementalDmlStatement(table, columns))); break; } } @@ -470,6 +454,84 @@ when matched then when not matched then insert (${backtickedColumns.join(",")}) values (${backtickedColumns.join(",")})`; } + + private insertOverwrite( + target: dataform.ITarget, + columns: string[], + query: string, + partitionBy: string, + updatePartitionFilter: string + ): string { + const uniqueId = this.uniqueIdGenerator(); + const stagingTableUnqualified = `staging_table_temp_${uniqueId}`; + const backtickedColumns = columns.map(column => `\`${column}\``); + const resolveTargetTable = this.resolveTarget(target); + + return `CREATE OR REPLACE TEMP TABLE \`${stagingTableUnqualified}\` AS ( + ${query} +); + +BEGIN + DECLARE partitions_for_replacement DEFAULT ( + ARRAY( + SELECT DISTINCT ${partitionBy} + FROM \`${stagingTableUnqualified}\` + WHERE ${partitionBy} IS NOT NULL + ) + ); + + MERGE ${resolveTargetTable} T + USING \`${stagingTableUnqualified}\` S + ON FALSE + WHEN NOT MATCHED BY SOURCE AND ${partitionBy} IN UNNEST(partitions_for_replacement) ${updatePartitionFilter ? 
`and T.${updatePartitionFilter}` : ""} THEN + DELETE + WHEN NOT MATCHED BY TARGET THEN + INSERT (${backtickedColumns.join(",")}) VALUES (${backtickedColumns.join(",")}); +END; + +DROP TABLE IF EXISTS \`${stagingTableUnqualified}\`;`; + } + + private getIncrementalDmlStatement( + table: dataform.ITable, + columns: string[] + ): string { + const incrementalQuery = this.getIncrementalQuery(table); + + switch (table.incrementalStrategy) { + case dataform.IncrementalStrategy.INSERT_OVERWRITE: + return this.insertOverwrite( + table.target, + columns, + incrementalQuery, + table.bigquery && table.bigquery.partitionBy, + table.bigquery && table.bigquery.updatePartitionFilter + ); + case dataform.IncrementalStrategy.MERGE: + return this.mergeInto( + table.target, + columns, + incrementalQuery, + table.uniqueKey, + table.bigquery && table.bigquery.updatePartitionFilter + ); + default: + if (table.uniqueKey && table.uniqueKey.length > 0) { + return this.mergeInto( + table.target, + columns, + incrementalQuery, + table.uniqueKey, + table.bigquery && table.bigquery.updatePartitionFilter + ); + } + return this.insertInto( + table.target, + columns.map(column => `\`${column}\``), + incrementalQuery + ); + } + } } export function collectEvaluationQueries( diff --git a/cli/api/execution_sql_test.ts b/cli/api/execution_sql_test.ts index 4a8cf5568..b9e4c65c9 100644 --- a/cli/api/execution_sql_test.ts +++ b/cli/api/execution_sql_test.ts @@ -86,4 +86,34 @@ suite("ExecutionSql with 'onSchemaChange'", () => { const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_ignore.sql", "utf8"); expect(procedureSql).to.equal(expectedSql.trim()); }); + + test("generates INSERT_OVERWRITE script for IGNORE strategy", () => { + const table = { + ...baseTable, + incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE, + bigquery: { + partitionBy: "DATE(ts)", + updatePartitionFilter: "ts >= '2024-01-01'" + } + }; + const tasks = executionSql.publishTasks(table, { fullRefresh: 
false }, tableMetadata); + const sql = tasks.build().map(t => t.statement).join("\n;\n"); + const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_ignore.sql", "utf8"); + expect(sql).to.equal(expectedSql.trim()); + }); + + test("generates INSERT_OVERWRITE script for EXTEND strategy", () => { + const table = { + ...baseTable, + incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE, + onSchemaChange: dataform.OnSchemaChange.EXTEND, + bigquery: { + partitionBy: "DATE(ts)" + } + }; + const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); + const sql = tasks.build().map(t => t.statement).join("\n;\n"); + const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_extend.sql", "utf8"); + expect(sql).to.equal(expectedSql.trim()); + }); }); diff --git a/cli/api/goldens/insert_overwrite_extend.sql b/cli/api/goldens/insert_overwrite_extend.sql new file mode 100644 index 000000000..1e15315a1 --- /dev/null +++ b/cli/api/goldens/insert_overwrite_extend.sql @@ -0,0 +1,97 @@ +CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`() +OPTIONS(strict_mode=false) +BEGIN + +-- Create empty table to extract schema of new query. 
+CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS ( + SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0 +); + + +-- Compare schemas +DECLARE dataform_columns ARRAY; +DECLARE temp_table_columns ARRAY>; +DECLARE columns_added ARRAY>; +DECLARE columns_removed ARRAY; + +SET dataform_columns = ( + SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), []) + FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS` + WHERE table_name = 'incremental_on_schema_change' +); + +SET temp_table_columns = ( + SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), []) + FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS` + WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty' +); + +SET columns_added = ( + SELECT IFNULL(ARRAY_AGG(column_info), []) + FROM UNNEST(temp_table_columns) AS column_info + WHERE column_info.column_name NOT IN UNNEST(dataform_columns) +); +SET columns_removed = ( + SELECT IFNULL(ARRAY_AGG(column_name), []) + FROM UNNEST(dataform_columns) AS column_name + WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col) +); + + +-- Apply schema change strategy (EXTEND). +IF ARRAY_LENGTH(columns_removed) > 0 THEN + RAISE USING MESSAGE = FORMAT( + "Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T", + columns_removed + ); +END IF; + +IF ARRAY_LENGTH(columns_added) > 0 THEN + EXECUTE IMMEDIATE ( + "ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " || + ( + SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ") + FROM UNNEST(columns_added) AS column_info + ) + ); +END IF; + + + +-- Cleanup temporary tables. 
+DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`; + +END +; +BEGIN + CALL `project-id.dataset-id.df_osc_test_uuid`(); +EXCEPTION WHEN ERROR THEN + DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`; + DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`; + RAISE; +END; +DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid` +; +CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS ( + select 1 as id, 'a' as field1, 'new' as field2 +); + +BEGIN + DECLARE partitions_for_replacement DEFAULT ( + ARRAY( + SELECT DISTINCT DATE(ts) + FROM `staging_table_temp_test_uuid` + WHERE DATE(ts) IS NOT NULL + ) + ); + + MERGE `project-id.dataset-id.incremental_on_schema_change` T + USING `staging_table_temp_test_uuid` S + ON FALSE + WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) THEN + DELETE + WHEN NOT MATCHED BY TARGET THEN + INSERT (`id`,`field1`) VALUES (`id`,`field1`); +END; + +DROP TABLE IF EXISTS `staging_table_temp_test_uuid` diff --git a/cli/api/goldens/insert_overwrite_ignore.sql b/cli/api/goldens/insert_overwrite_ignore.sql new file mode 100644 index 000000000..7f694f057 --- /dev/null +++ b/cli/api/goldens/insert_overwrite_ignore.sql @@ -0,0 +1,23 @@ +CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS ( + select 1 as id, 'a' as field1, 'new' as field2 +); + +BEGIN + DECLARE partitions_for_replacement DEFAULT ( + ARRAY( + SELECT DISTINCT DATE(ts) + FROM `staging_table_temp_test_uuid` + WHERE DATE(ts) IS NOT NULL + ) + ); + + MERGE `project-id.dataset-id.incremental_on_schema_change` T + USING `staging_table_temp_test_uuid` S + ON FALSE + WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) and T.ts >= '2024-01-01' THEN + DELETE + WHEN NOT MATCHED BY TARGET THEN + INSERT (`id`,`field1`) VALUES (`id`,`field1`); +END; + +DROP TABLE IF EXISTS `staging_table_temp_test_uuid` diff 
--git a/core/actions/incremental_table.ts b/core/actions/incremental_table.ts index 9abd4477b..6cafdf1cb 100644 --- a/core/actions/incremental_table.ts +++ b/core/actions/incremental_table.ts @@ -217,6 +217,30 @@ export class IncrementalTable extends ActionBuilder { } : {}), }); this.proto.onSchemaChange = this.mapOnSchemaChange(config.onSchemaChange); + this.proto.incrementalStrategy = this.mapIncrementalStrategy(config.incrementalStrategy); + + switch (this.proto.incrementalStrategy) { + case dataform.IncrementalStrategy.INSERT_OVERWRITE: + if (!this.proto.bigquery || !this.proto.bigquery.partitionBy) { + this.session.compileError( + new Error("incrementalStrategy 'insert_overwrite' requires 'partitionBy' to be set."), + config.filename, + this.proto.target + ); + } + break; + case dataform.IncrementalStrategy.MERGE: + if (!this.proto.uniqueKey || this.proto.uniqueKey.length === 0) { + this.session.compileError( + new Error("incrementalStrategy 'merge' requires 'uniqueKey' to be set."), + config.filename, + this.proto.target + ); + } + break; + default: + break; + } if (config.reservation) { if (!this.proto.actionDescriptor) { @@ -736,6 +760,38 @@ export class IncrementalTable extends ActionBuilder { throw new Error(`OnSchemaChange value "${onSchemaChange}" is not supported`); } } + + private mapIncrementalStrategy( + incrementalStrategy?: string | number + ): dataform.IncrementalStrategy { + if (!incrementalStrategy) { + return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED; + } + + if (typeof incrementalStrategy === "number") { + switch (incrementalStrategy) { + case dataform.ActionConfig.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED: + return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED; + case dataform.ActionConfig.IncrementalStrategy.MERGE: + return dataform.IncrementalStrategy.MERGE; + case dataform.ActionConfig.IncrementalStrategy.INSERT_OVERWRITE: + return dataform.IncrementalStrategy.INSERT_OVERWRITE; + default: + 
throw new Error(`IncrementalStrategy value "${incrementalStrategy}" is not supported`); + } + } + + switch (incrementalStrategy.toString().toUpperCase()) { + case "INCREMENTAL_STRATEGY_UNSPECIFIED": + return dataform.IncrementalStrategy.INCREMENTAL_STRATEGY_UNSPECIFIED; + case "MERGE": + return dataform.IncrementalStrategy.MERGE; + case "INSERT_OVERWRITE": + return dataform.IncrementalStrategy.INSERT_OVERWRITE; + default: + throw new Error(`IncrementalStrategy value "${incrementalStrategy}" is not supported`); + } + } } /** diff --git a/core/actions/incremental_table_test.ts b/core/actions/incremental_table_test.ts index 9b444fbb9..b26d39d70 100644 --- a/core/actions/incremental_table_test.ts +++ b/core/actions/incremental_table_test.ts @@ -62,6 +62,7 @@ actions: onSchemaChange: "IGNORE", query: "SELECT 1", incrementalQuery: "SELECT 1", + incrementalStrategy: "INCREMENTAL_STRATEGY_UNSPECIFIED", type: "incremental", enumType: "INCREMENTAL", protected: true, @@ -185,6 +186,7 @@ SELECT 1` fileName: `definitions/${testParameters.filename}`, query: "\n\n\nSELECT 1", incrementalQuery: "\n\n\nSELECT 1", + incrementalStrategy: "INCREMENTAL_STRATEGY_UNSPECIFIED", actionDescriptor: { ...exampleActionDescriptor.outputActionDescriptor, reservation: "reservation", @@ -251,6 +253,7 @@ SELECT 1`; fileName: `definitions/${minimalIncrementalTableName}.sqlx`, query: "\n\n\nSELECT 1", incrementalQuery: "\n\n\nSELECT 1", + incrementalStrategy: "INCREMENTAL_STRATEGY_UNSPECIFIED", } ]); }) @@ -348,6 +351,7 @@ actions: fileName: "definitions/filename.sql", query: "SELECT 1", incrementalQuery: "SELECT 1", + incrementalStrategy: "INCREMENTAL_STRATEGY_UNSPECIFIED", actionDescriptor: { bigqueryLabels: { key: "val" @@ -964,6 +968,7 @@ select \${incremental()} as is_incremental` hermeticity: "NON_HERMETIC", onSchemaChange: "IGNORE", fileName: "definitions/incremental.js", + incrementalStrategy: "INCREMENTAL_STRATEGY_UNSPECIFIED", jitCode: '(ctx) => Promise.resolve({query: "select 1", 
incrementalQuery: "select 1"})', actionDescriptor: { compilationMode: "ACTION_COMPILATION_MODE_JIT" @@ -987,4 +992,128 @@ select \${incremental()} as is_incremental` expect(result.compile.compiledGraph.graphErrors.compilationErrors.some(e => e.message.includes("Cannot mix AoT and JiT compilation"))).equals(true); }); }); + + suite("incrementalStrategy", () => { + test("compiles successfully with insert_overwrite and partitionBy", () => { + const projectDir = tmpDirFixture.createNewTmpDir(); + fs.writeFileSync( + path.join(projectDir, "workflow_settings.yaml"), + VALID_WORKFLOW_SETTINGS_YAML + ); + fs.mkdirSync(path.join(projectDir, "definitions")); + fs.writeFileSync( + path.join(projectDir, "definitions/incremental.sqlx"), + `config { + type: "incremental", + incrementalStrategy: "insert_overwrite", + partitionBy: "DATE(ts)" + } + SELECT 1` + ); + + const result = runMainInVm(coreExecutionRequestFromPath(projectDir)); + + expect(result.compile.compiledGraph.graphErrors.compilationErrors).deep.equals([]); + expect(result.compile.compiledGraph.tables[0].incrementalStrategy).equals( + dataform.IncrementalStrategy.INSERT_OVERWRITE + ); + }); + + test("compilation fails with insert_overwrite and missing partitionBy", () => { + const projectDir = tmpDirFixture.createNewTmpDir(); + fs.writeFileSync( + path.join(projectDir, "workflow_settings.yaml"), + VALID_WORKFLOW_SETTINGS_YAML + ); + fs.mkdirSync(path.join(projectDir, "definitions")); + fs.writeFileSync( + path.join(projectDir, "definitions/incremental.sqlx"), + `config { + type: "incremental", + incrementalStrategy: "insert_overwrite" + } + SELECT 1` + ); + + const result = runMainInVm(coreExecutionRequestFromPath(projectDir)); + + expect(result.compile.compiledGraph.graphErrors.compilationErrors.length).greaterThan(0); + expect(result.compile.compiledGraph.graphErrors.compilationErrors[0].message).contains( + "incrementalStrategy 'insert_overwrite' requires 'partitionBy' to be set." 
+ ); + }); + + test("compiles successfully with merge and uniqueKey", () => { + const projectDir = tmpDirFixture.createNewTmpDir(); + fs.writeFileSync( + path.join(projectDir, "workflow_settings.yaml"), + VALID_WORKFLOW_SETTINGS_YAML + ); + fs.mkdirSync(path.join(projectDir, "definitions")); + fs.writeFileSync( + path.join(projectDir, "definitions/incremental.sqlx"), + `config { + type: "incremental", + incrementalStrategy: "merge", + uniqueKey: ["id"] + } + SELECT 1` + ); + + const result = runMainInVm(coreExecutionRequestFromPath(projectDir)); + + expect(result.compile.compiledGraph.graphErrors.compilationErrors).deep.equals([]); + expect(result.compile.compiledGraph.tables[0].incrementalStrategy).equals( + dataform.IncrementalStrategy.MERGE + ); + }); + + test("compilation fails with merge and missing uniqueKey", () => { + const projectDir = tmpDirFixture.createNewTmpDir(); + fs.writeFileSync( + path.join(projectDir, "workflow_settings.yaml"), + VALID_WORKFLOW_SETTINGS_YAML + ); + fs.mkdirSync(path.join(projectDir, "definitions")); + fs.writeFileSync( + path.join(projectDir, "definitions/incremental.sqlx"), + `config { + type: "incremental", + incrementalStrategy: "merge" + } + SELECT 1` + ); + + const result = runMainInVm(coreExecutionRequestFromPath(projectDir)); + + expect(result.compile.compiledGraph.graphErrors.compilationErrors.length).greaterThan(0); + expect(result.compile.compiledGraph.graphErrors.compilationErrors[0].message).contains( + "incrementalStrategy 'merge' requires 'uniqueKey' to be set." 
+ ); + }); + + test("compilation fails with invalid incrementalStrategy", () => { + const projectDir = tmpDirFixture.createNewTmpDir(); + fs.writeFileSync( + path.join(projectDir, "workflow_settings.yaml"), + VALID_WORKFLOW_SETTINGS_YAML + ); + fs.mkdirSync(path.join(projectDir, "definitions")); + fs.writeFileSync( + path.join(projectDir, "definitions/incremental.sqlx"), + `config { + type: "incremental", + incrementalStrategy: "invalid_strategy" + } + SELECT 1` + ); + + const result = runMainInVm(coreExecutionRequestFromPath(projectDir)); + + expect(result.compile.compiledGraph.graphErrors.compilationErrors.length).greaterThan(0); + expect(result.compile.compiledGraph.graphErrors.compilationErrors[0].message).contains( + 'IncrementalStrategy value "invalid_strategy" is not supported' + ); + }); + }); }); diff --git a/core/actions/table_test.ts b/core/actions/table_test.ts index 38a010d1e..aaaa3c098 100644 --- a/core/actions/table_test.ts +++ b/core/actions/table_test.ts @@ -484,6 +484,30 @@ ${exampleBuiltInAssertionsAsYaml.inputActionConfigBlock} ); }); + test("fails compilation if incrementalStrategy is set on standard table", () => { + const projectDir = tmpDirFixture.createNewTmpDir(); + fs.writeFileSync( + path.join(projectDir, "workflow_settings.yaml"), + VALID_WORKFLOW_SETTINGS_YAML + ); + fs.mkdirSync(path.join(projectDir, "definitions")); + fs.writeFileSync( + path.join(projectDir, "definitions/table.sqlx"), + `config { + type: "table", + incrementalStrategy: "merge" + } + SELECT 1` + ); + + const result = runMainInVm(coreExecutionRequestFromPath(projectDir)); + + expect(result.compile.compiledGraph.graphErrors.compilationErrors.length).greaterThan(0); + expect(result.compile.compiledGraph.graphErrors.compilationErrors[0].message).contains( + 'Unexpected property "incrementalStrategy"' + ); + }); + suite("Iceberg table options", () => { const setupFiles = ( projectDir: string, diff --git a/core/actions/view_test.ts b/core/actions/view_test.ts index 
b72c4444e..cc4c3fbdd 100644 --- a/core/actions/view_test.ts +++ b/core/actions/view_test.ts @@ -282,6 +282,30 @@ ${exampleBuiltInAssertionsAsYaml.inputActionConfigBlock} ); }); + test("fails compilation if incrementalStrategy is set on view", () => { + const projectDir = tmpDirFixture.createNewTmpDir(); + fs.writeFileSync( + path.join(projectDir, "workflow_settings.yaml"), + VALID_WORKFLOW_SETTINGS_YAML + ); + fs.mkdirSync(path.join(projectDir, "definitions")); + fs.writeFileSync( + path.join(projectDir, "definitions/view.sqlx"), + `config { + type: "view", + incrementalStrategy: "merge" + } + SELECT 1` + ); + + const result = runMainInVm(coreExecutionRequestFromPath(projectDir)); + + expect(result.compile.compiledGraph.graphErrors.compilationErrors.length).greaterThan(0); + expect(result.compile.compiledGraph.graphErrors.compilationErrors[0].message).contains( + 'Unexpected property "incrementalStrategy"' + ); + }); + suite("jit compilation", () => { test("jit compilation is supported", () => { const projectDir = tmpDirFixture.createNewTmpDir(); diff --git a/core/main_test.ts b/core/main_test.ts index c23f62f2e..068500ce2 100644 --- a/core/main_test.ts +++ b/core/main_test.ts @@ -1201,6 +1201,7 @@ publish("name", { incrementalPostOps: ["post_op"], incrementalPreOps: ["pre_op"], incrementalQuery: "SELECT 1", + incrementalStrategy: "INCREMENTAL_STRATEGY_UNSPECIFIED", protected: false, onSchemaChange: "IGNORE" } @@ -1266,6 +1267,7 @@ publish("name", { ...(tableType === "incremental" ? 
{ incrementalQuery: "SELECT * FROM `defaultProject.defaultDataset.operation`", + incrementalStrategy: "INCREMENTAL_STRATEGY_UNSPECIFIED", protected: false, onSchemaChange: "IGNORE" } @@ -1315,6 +1317,7 @@ publish("name", { incrementalQuery: "SELECT 1", protected: true, onSchemaChange: "IGNORE", + incrementalStrategy: "INCREMENTAL_STRATEGY_UNSPECIFIED", } ]); }); diff --git a/protos/configs.proto b/protos/configs.proto index ad8668c96..be682f3e7 100644 --- a/protos/configs.proto +++ b/protos/configs.proto @@ -382,6 +382,18 @@ message ActionConfig { SYNCHRONIZE = 3; } + enum IncrementalStrategy { + // Strategy is unspecified. Defaults to MERGE if uniqueKey is configured, + // or APPEND otherwise. + INCREMENTAL_STRATEGY_UNSPECIFIED = 0; + // MERGE strategy. Matches rows based on uniqueKey to update existing rows + // or insert new ones. + MERGE = 1; + // INSERT_OVERWRITE strategy. Overwrites entire partitions in the target + // table that are present in the new staging data. + INSERT_OVERWRITE = 2; + } + message IncrementalTableConfig { // The name of the incremental table. string name = 1; @@ -485,6 +497,10 @@ message ActionConfig { // If unset, the value from workflow_settings.yaml is used. If neither is set, default BigQuery behavior applies. // Dataform CLI only (GCP Dataform support pending). string reservation = 27; + + // Optional. The incremental strategy to use when updating the table. + // Defaults to MERGE if uniqueKey is configured, or APPEND otherwise. + IncrementalStrategy incremental_strategy = 28; } message AssertionConfig { diff --git a/protos/core.proto b/protos/core.proto index 91657f3cd..a65896937 100644 --- a/protos/core.proto +++ b/protos/core.proto @@ -175,6 +175,18 @@ enum OnSchemaChange { SYNCHRONIZE = 3; } +enum IncrementalStrategy { + // Strategy is unspecified. Defaults to MERGE if uniqueKey is configured, + // or APPEND otherwise. + INCREMENTAL_STRATEGY_UNSPECIFIED = 0; + // MERGE strategy. 
Matches rows based on uniqueKey to update existing rows + // or insert new ones. + MERGE = 1; + // INSERT_OVERWRITE strategy. Overwrites entire partitions in the target + // table that are present in the new staging data. + INSERT_OVERWRITE = 2; +} + message Table { // For legacy compatability reasons, both incremental tables and views are // configured in compiled graphs via the Table proto. @@ -211,6 +223,9 @@ message Table { string jit_code = 38; + // The incremental strategy to use when updating the incremental table. + IncrementalStrategy incremental_strategy = 39; + // Warehouse specific features. BigQueryOptions bigquery = 22;