From ae46c9d0ed96ad39a30a719ce62a15211ec9450c Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Thu, 9 Apr 2026 14:53:19 -0400 Subject: [PATCH 01/14] Design spec for SQL writer memory optimization --- .../specs/sql-writer-memory-optimization.md | 191 ++++++++++++++++++ 1 file changed, 191 insertions(+) create mode 100644 claude/specs/sql-writer-memory-optimization.md diff --git a/claude/specs/sql-writer-memory-optimization.md b/claude/specs/sql-writer-memory-optimization.md new file mode 100644 index 00000000..f74fa5b9 --- /dev/null +++ b/claude/specs/sql-writer-memory-optimization.md @@ -0,0 +1,191 @@ +# SQL Writer Memory Optimization + +## Problem + +When exporting millions of rows to a SQL database, `SqlTableWriter` in +`commcare_export/writers.py` uses excessive memory because: + +1. **Per-row schema checking**: `make_table_compatible()` is called for every + row, creating `MigrationContext` and `Operations` objects each time. On + schema changes it calls `metadata.clear()` and re-reflects the table, + creating churn in SQLAlchemy's `MetaData` object graph. + +2. **Per-row SQL compilation**: `upsert()` compiles a new `INSERT` (and + potentially `UPDATE`) statement object for every row. + +3. **No commit batching**: The entire export runs without intermediate commits, + so the database driver may buffer transaction state for the full duration. + +## Design + +### 1. Limit `make_table_compatible()` to the first N rows + +**Constant**: `SCHEMA_CHECK_ROWS = 10` + +In `write_table()`, only call `make_table_compatible()` for rows where +`i < SCHEMA_CHECK_ROWS`. After that, assume the schema is stable. + +This also addresses SQLAlchemy `MetaData` object growth: the +`metadata.clear()` / `get_table()` cycle that causes MetaData churn only runs +inside `make_table_compatible()`. Once we stop calling it after row 10, the +MetaData object holds only the stable reflected table and no further +MigrationContext/Operations objects are allocated. + +**Logging**: After the schema-check phase completes (at row `SCHEMA_CHECK_ROWS` +or when all rows have been consumed, whichever comes first), log a message at +`DEBUG` level confirming that schema checks are complete and listing the final +column set. + +#### Error handling + +After row 10, a row may contain a value that is incompatible with the current +column type, or a column that doesn't exist in the table. When a batch insert +fails: + +1. Check whether the failure is due to a schema mismatch (missing column or + type incompatibility). +2. If so, call `make_table_compatible()` for the failing row, then retry the + entire batch. Only do this **once per batch** -- if the retry also fails, + raise the original exception. +3. If the failure is not schema-related, raise immediately. + +This keeps the common path fast while still handling late schema evolution. + +### 2. Batch writes and commits + +**Constant**: `BATCH_SIZE = 1000` + +Replace the current row-at-a-time loop with batched processing: + +``` +accumulate rows into a batch (list of row dicts) +when batch is full (or rows are exhausted): + try: + bulk upsert the batch + except schema error: + run make_table_compatible on the failing row + retry the batch once + commit +``` + +#### Bulk upsert strategy + +The current `upsert()` method does `INSERT`, then falls back to `UPDATE` on +`IntegrityError`. For batched operation, use **dialect-specific bulk upsert**: + +- **PostgreSQL**: Use `sqlalchemy.dialects.postgresql.insert` with + `on_conflict_do_update()` on the `id` primary key. Pass the full batch to + `connection.execute(statement, batch_list)`. + +- **MySQL**: Use `sqlalchemy.dialects.mysql.insert` with + `on_duplicate_key_update()`. Pass the full batch to + `connection.execute(statement, batch_list)`. + +- **MSSQL**: Use a `MERGE` statement via raw SQL, or fall back to row-by-row + upsert within the batch. MSSQL doesn't have a clean SQLAlchemy upsert API. + +- **Other dialects** (Oracle, etc.): Fall back to row-by-row `upsert()` within + the batch, same as current behavior. The commit-per-batch improvement still + applies. + +Implement a method `bulk_upsert(table, batch)` that dispatches on +`self.is_postgres` / `self.is_mysql` / `self.is_mssql` and falls back to +row-by-row for unsupported dialects. + +#### Commits + +Call `self.connection.execute(text("COMMIT"))` (or use the connection's +transaction API) after each batch. This bounds the transaction size and lets the +database release resources. + +#### Filtering None values + +The current `upsert()` strips `None` values from `row_dict` so that columns +that don't yet exist aren't included. For bulk inserts, all rows in a batch must +have the **same set of keys**. To handle this: + +- After schema checking is complete (row 10+), build the batch using a stable + set of column names (the headings from `table_spec.headings`). +- Include `None` values as-is -- the columns exist by this point (all columns + are nullable). +- During the schema-check phase (rows 0-9), continue using row-by-row + `upsert()` with None-stripping, since columns may not exist yet. + +### 3. Revised `write_table()` flow + +```python +def write_table(self, table_spec): + table_name = table_spec.name + headings = table_spec.headings + data_type_dict = dict(zip_longest(headings, table_spec.data_types)) + + table = None + batch = [] + + for i, row in enumerate(table_spec.rows): + row_dict = dict(zip(headings, row)) + + if i == 0: + table = self.get_table(table_name) + if table is None: + table = self.create_table(table_name, row_dict, data_type_dict) + + if i < SCHEMA_CHECK_ROWS: + # Schema-check phase: row-by-row with full compatibility checks + table = self.make_table_compatible(table, row_dict, data_type_dict) + self.upsert(table, row_dict) + else: + # Batched phase + batch.append(row_dict) + if len(batch) >= BATCH_SIZE: + self._flush_batch(table, batch) + batch = [] + + # Flush any remaining rows + if batch: + self._flush_batch(table, batch) + + # Commit any remaining schema-check-phase rows + if table is not None: + self._commit() + + +def _flush_batch(self, table, batch): + try: + self.bulk_upsert(table, batch) + except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.ProgrammingError): + # Likely a schema mismatch; try to fix and retry once + for row_dict in batch: + table = self.make_table_compatible(table, row_dict, ...) + self.bulk_upsert(table, batch) # retry; raise on second failure + self._commit() +``` + +### 4. What this does NOT change + +- **API fetching**: Already lazy/paginated. No changes needed. +- **minilinq pipeline**: Already uses generators. No changes needed. +- **`JValueTableWriter`**: Used for JSON output, not SQL. Out of scope. +- **`StreamingMarkdownTableWriter`**: Used for terminal display. Out of scope. +- **The `upsert()` method**: Kept as-is for the schema-check phase and for + dialect fallback within batches. + +## Files to modify + +- `commcare_export/writers.py`: All changes are in `SqlTableWriter`. + - Add constants `SCHEMA_CHECK_ROWS` and `BATCH_SIZE`. + - Add `bulk_upsert()` method. + - Add `_flush_batch()` method. + - Add `_commit()` helper. + - Rewrite `write_table()` as described above. + +## Testing + +- Existing tests in `tests/test_writers.py` must continue to pass (they + exercise the full write_table path with small row counts that stay within the + schema-check phase). +- Add a test that writes > `SCHEMA_CHECK_ROWS` rows to verify the batched + path is exercised. +- Add a test that introduces a new column after row 10 to verify the + schema-mismatch retry logic. +- Test with PostgreSQL and MySQL dialects (at minimum) to verify bulk upsert. From 854510e836d79d451819cfd64c434259236877a5 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 10 Apr 2026 15:06:01 -0400 Subject: [PATCH 02/14] Add SCHEMA_CHECK_ROWS, BATCH_SIZE constants and _commit() helper Co-Authored-By: Claude Opus 4.6 (1M context) --- commcare_export/writers.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index 54efe9c8..388c9e7b 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -16,6 +16,8 @@ logger = logging.getLogger(__name__) MAX_COLUMN_SIZE = 2000 +SCHEMA_CHECK_ROWS = 10 +BATCH_SIZE = 1000 def ensure_text(v, convert_none=False): @@ -605,6 +607,11 @@ def upsert(self, table, row_dict): ) self.connection.execute(update) + def _commit(self): + # Explicit commit works for all DB types. Replace with explicit + # transactions when upgrading to SQLAlchemy 2.0 + self.connection.execute(sqlalchemy.text('COMMIT')) + def write_table(self, table_spec: TableSpec) -> None: table_name = table_spec.name headings = table_spec.headings From f240b5ecc39f5779b9e2f46ca72de7c12d0e76ff Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 10 Apr 2026 15:06:01 -0400 Subject: [PATCH 03/14] Add bulk_upsert() with dialect-specific INSERT ON CONFLICT Co-Authored-By: Claude Opus 4.6 (1M context) --- commcare_export/writers.py | 29 +++++++++++++++++++++++++++++ tests/test_writers.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index 388c9e7b..1cce5d7a 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -612,6 +612,35 @@ def _commit(self): # transactions when upgrading to SQLAlchemy 2.0 self.connection.execute(sqlalchemy.text('COMMIT')) + def bulk_upsert(self, table, batch): + if not batch: + return + if self.is_postgres: + from sqlalchemy.dialects.postgresql import insert + + stmt = insert(table).values(batch) + update_cols = {c.name: c for c in stmt.excluded if c.name != 'id'} + stmt = stmt.on_conflict_do_update( + index_elements=['id'], + set_=update_cols, + ) + self.connection.execute(stmt) + elif self.is_mysql: + from sqlalchemy.dialects.mysql import insert + + stmt = insert(table).values(batch) + update_cols = { + c.name: stmt.inserted[c.name] + for c in table.columns + if c.name != 'id' + } + stmt = stmt.on_duplicate_key_update(**update_cols) + self.connection.execute(stmt) + else: + # MSSQL and others: fall back to row-by-row + for row_dict in batch: + self.upsert(table, row_dict) + def write_table(self, table_spec: TableSpec) -> None: table_name = table_spec.name headings = table_spec.headings diff --git a/tests/test_writers.py b/tests/test_writers.py index 9b144a2e..9620ec7b 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -709,6 +709,42 @@ def test_mssql_nvarchar_length_downsize(self, writer): ) assert result['some_data'] == ('some_data', 'nvarchar', -1) + def test_bulk_upsert(self, writer): + # Create table via normal write_table path + with writer: + writer.write_table( + TableSpec( + name='foo_bulk_upsert', + headings=['id', 'a', 'b'], + rows=[ + ['row1', 'val1', 'x'], + ['row2', 'val2', 'y'], + ], + ) + ) + + # bulk_upsert: update row1, insert row3 + with writer: + table = writer.get_table('foo_bulk_upsert') + batch = [ + {'id': 'row1', 'a': 'updated1', 'b': 'ux'}, + {'id': 'row3', 'a': 'val3', 'b': 'z'}, + ] + writer.bulk_upsert(table, batch) + writer._commit() + + with writer: + result = { + row['id']: dict(row) + for row in writer.connection.execute( + 'SELECT id, a, b FROM foo_bulk_upsert' + ) + } + assert len(result) == 3 + assert result['row1'] == {'id': 'row1', 'a': 'updated1', 'b': 'ux'} + assert result['row2'] == {'id': 'row2', 'a': 'val2', 'b': 'y'} + assert result['row3'] == {'id': 'row3', 'a': 'val3', 'b': 'z'} + def test_emoji(self, writer): with writer: writer.write_table( From 13938a1a6223257b6949ca4f129e6a457472a814 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 10 Apr 2026 15:06:01 -0400 Subject: [PATCH 04/14] Add _flush_batch() with schema-mismatch retry logic Co-Authored-By: Claude Opus 4.6 (1M context) --- commcare_export/writers.py | 18 ++++++++++++++++++ tests/test_writers.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index 1cce5d7a..11f1faa3 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -641,6 +641,24 @@ def bulk_upsert(self, table, batch): for row_dict in batch: self.upsert(table, row_dict) + def _flush_batch(self, table, batch, data_type_dict): + try: + self.bulk_upsert(table, batch) + except ( + sqlalchemy.exc.CompileError, + sqlalchemy.exc.OperationalError, + sqlalchemy.exc.ProgrammingError, + ): + # Likely a schema mismatch; fix schema and retry once + for row_dict in batch: + table = self.make_table_compatible( + table, + row_dict, + data_type_dict, + ) + self.bulk_upsert(table, batch) + self._commit() + def write_table(self, table_spec: TableSpec) -> None: table_name = table_spec.name headings = table_spec.headings diff --git a/tests/test_writers.py b/tests/test_writers.py index 9620ec7b..e2ac4d9e 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -3,6 +3,7 @@ import io import tempfile import zipfile +from itertools import zip_longest import openpyxl import sqlalchemy @@ -745,6 +746,41 @@ def test_bulk_upsert(self, writer): assert result['row2'] == {'id': 'row2', 'a': 'val2', 'b': 'y'} assert result['row3'] == {'id': 'row3', 'a': 'val3', 'b': 'z'} + def test_flush_batch_retry_on_new_column(self, writer): + # Create table with columns [id, a] + with writer: + writer.write_table( + TableSpec( + name='foo_flush_retry', + headings=['id', 'a'], + rows=[['row1', 'val1']], + ) + ) + + # _flush_batch with a batch containing new column 'b' + with writer: + table = writer.get_table('foo_flush_retry') + headings = ['id', 'a', 'b'] + data_type_dict = dict(zip_longest(headings, [])) + batch = [ + {'id': 'row2', 'a': 'val2', 'b': 'new_col_val'}, + ] + writer._flush_batch(table, batch, data_type_dict) + + with writer: + result = { + row['id']: dict(row) + for row in writer.connection.execute( + 'SELECT id, a, b FROM foo_flush_retry' + ) + } + assert len(result) == 2 + assert result['row2'] == { + 'id': 'row2', + 'a': 'val2', + 'b': 'new_col_val', + } + def test_emoji(self, writer): with writer: writer.write_table( From 3f49fca22d2fc7ec8421e41c20f0ae51839f0cce Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 10 Apr 2026 15:06:01 -0400 Subject: [PATCH 05/14] Rewrite write_table() with two-phase schema-check and batched writes Co-Authored-By: Claude Opus 4.6 (1M context) --- commcare_export/writers.py | 51 ++++++++++++++++++-- tests/test_writers.py | 95 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 5 deletions(-) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index 11f1faa3..62aadbef 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -663,8 +663,14 @@ def write_table(self, table_spec: TableSpec) -> None: table_name = table_spec.name headings = table_spec.headings data_type_dict = dict(zip_longest(headings, table_spec.data_types)) + + table = None + batch = [] + schema_check_complete = False + for i, row in enumerate(table_spec.rows): row_dict = dict(zip(headings, row)) + if i == 0: table = self.get_table(table_name) if table is None: @@ -673,11 +679,46 @@ def write_table(self, table_spec: TableSpec) -> None: row_dict, data_type_dict, ) - # Checks the data type for every cell in every row. Maybe we - # can use a future version of the data dictionary to avoid - # this? - table = self.make_table_compatible(table, row_dict, data_type_dict) - self.upsert(table, row_dict) + + if i < SCHEMA_CHECK_ROWS: + # Schema-check phase: row-by-row with full compatibility + # checks + table = self.make_table_compatible( + table, + row_dict, + data_type_dict, + ) + self.upsert(table, row_dict) + else: + assert table is not None # So that mypy knows it's a Table + if not schema_check_complete: + schema_check_complete = True + self._commit() + logger.debug( + "Schema check complete for table '%s'. Final columns: %s", + table_name, + [c.name for c in table.columns], + ) + batch.append(row_dict) + if len(batch) >= BATCH_SIZE: + self._flush_batch(table, batch, data_type_dict) + batch = [] + + if table is None: + return + + if batch: + self._flush_batch(table, batch, data_type_dict) + else: + # All rows in schema-check phase; commit them + self._commit() + + if not schema_check_complete: + logger.debug( + "Schema check complete for table '%s'. Final columns: %s", + table_name, + [c.name for c in table.columns], + ) def _get_columns_for_data(self, row_dict, data_type_dict): return [self.get_id_column()] + [ diff --git a/tests/test_writers.py b/tests/test_writers.py index e2ac4d9e..f5aab103 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -11,6 +11,7 @@ import pytest from commcare_export.specs import TableSpec from commcare_export.writers import ( + SCHEMA_CHECK_ROWS, CsvTableWriter, Excel2007TableWriter, JValueTableWriter, @@ -795,3 +796,97 @@ def test_emoji(self, writer): } ) ) + + def test_batched_write(self, writer): + num_rows = SCHEMA_CHECK_ROWS + 15 + rows = [[f'id_{i}', f'a_{i}', i] for i in range(num_rows)] + with writer: + writer.write_table( + TableSpec( + name='foo_batched_write', + headings=['id', 'a', 'b'], + rows=rows, + ) + ) + + with writer: + result = list( + writer.connection.execute( + 'SELECT id, a, b FROM foo_batched_write' + ) + ) + assert len(result) == num_rows + result_dict = {row['id']: dict(row) for row in result} + for i in range(num_rows): + assert result_dict[f'id_{i}'] == { + 'id': f'id_{i}', + 'a': f'a_{i}', + 'b': i, + } + + def test_batched_upsert(self, writer): + num_rows = SCHEMA_CHECK_ROWS + 5 + rows = [[f'id_{i}', f'a_{i}', i] for i in range(num_rows)] + with writer: + writer.write_table( + TableSpec( + name='foo_batched_upsert', + headings=['id', 'a', 'b'], + rows=rows, + ) + ) + + # Second write: update all existing + add 5 new + rows2 = [ + [f'id_{i}', f'updated_{i}', i + 100] for i in range(num_rows + 5) + ] + with writer: + writer.write_table( + TableSpec( + name='foo_batched_upsert', + headings=['id', 'a', 'b'], + rows=rows2, + ) + ) + + with writer: + result = list( + writer.connection.execute( + 'SELECT id, a, b FROM foo_batched_upsert' + ) + ) + assert len(result) == num_rows + 5 + result_dict = {row['id']: dict(row) for row in result} + for i in range(num_rows + 5): + assert result_dict[f'id_{i}'] == { + 'id': f'id_{i}', + 'a': f'updated_{i}', + 'b': i + 100, + } + + def test_late_schema_change_via_write_table(self, writer): + rows = [] + for i in range(SCHEMA_CHECK_ROWS): + rows.append([f'id_{i}', f'a_{i}', None]) + for i in range(SCHEMA_CHECK_ROWS, SCHEMA_CHECK_ROWS + 5): + rows.append([f'id_{i}', f'a_{i}', f'b_{i}']) + + with writer: + writer.write_table( + TableSpec( + name='foo_late_schema', + headings=['id', 'a', 'b'], + rows=rows, + ) + ) + + with writer: + result = list( + writer.connection.execute( + 'SELECT id, a, b FROM foo_late_schema' + ) + ) + assert len(result) == SCHEMA_CHECK_ROWS + 5 + result_dict = {row['id']: dict(row) for row in result} + for i in range(SCHEMA_CHECK_ROWS, SCHEMA_CHECK_ROWS + 5): + assert result_dict[f'id_{i}']['b'] == f'b_{i}' From 5be70f7c55b6cd913234ec98080a9400f4d85811 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 10 Apr 2026 17:39:44 -0400 Subject: [PATCH 06/14] bulk_upsert to use same keys as upsert --- commcare_export/writers.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index 62aadbef..6583c865 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -615,6 +615,20 @@ def _commit(self): def bulk_upsert(self, table, batch): if not batch: return + # SQLAlchemy requires all dicts in `batch` to have the same keys + # for `insert(table).values(batch)`. We need to drop the columns + # whose values are always `None` to reproduce the behavior of + # `SqlTableWriter.insert()`. `batch_keys` are the columns where + # _any_ row has a value set. + batch_keys = { + k for row_dict in batch + for k, v in row_dict.items() + if v is not None + } + batch = [ + {k: row_dict[k] for k in batch_keys} + for row_dict in batch + ] if self.is_postgres: from sqlalchemy.dialects.postgresql import insert From bf574fb04d943232fd8babd745d2d1ecadfd26f9 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Fri, 10 Apr 2026 15:42:06 -0400 Subject: [PATCH 07/14] Clean up SQL writer memory optimization spec --- .../specs/sql-writer-memory-optimization.md | 191 ------------------ 1 file changed, 191 deletions(-) delete mode 100644 claude/specs/sql-writer-memory-optimization.md diff --git a/claude/specs/sql-writer-memory-optimization.md b/claude/specs/sql-writer-memory-optimization.md deleted file mode 100644 index f74fa5b9..00000000 --- a/claude/specs/sql-writer-memory-optimization.md +++ /dev/null @@ -1,191 +0,0 @@ -# SQL Writer Memory Optimization - -## Problem - -When exporting millions of rows to a SQL database, `SqlTableWriter` in -`commcare_export/writers.py` uses excessive memory because: - -1. **Per-row schema checking**: `make_table_compatible()` is called for every - row, creating `MigrationContext` and `Operations` objects each time. On - schema changes it calls `metadata.clear()` and re-reflects the table, - creating churn in SQLAlchemy's `MetaData` object graph. - -2. **Per-row SQL compilation**: `upsert()` compiles a new `INSERT` (and - potentially `UPDATE`) statement object for every row. - -3. **No commit batching**: The entire export runs without intermediate commits, - so the database driver may buffer transaction state for the full duration. - -## Design - -### 1. Limit `make_table_compatible()` to the first N rows - -**Constant**: `SCHEMA_CHECK_ROWS = 10` - -In `write_table()`, only call `make_table_compatible()` for rows where -`i < SCHEMA_CHECK_ROWS`. After that, assume the schema is stable. - -This also addresses SQLAlchemy `MetaData` object growth: the -`metadata.clear()` / `get_table()` cycle that causes MetaData churn only runs -inside `make_table_compatible()`. Once we stop calling it after row 10, the -MetaData object holds only the stable reflected table and no further -MigrationContext/Operations objects are allocated. - -**Logging**: After the schema-check phase completes (at row `SCHEMA_CHECK_ROWS` -or when all rows have been consumed, whichever comes first), log a message at -`DEBUG` level confirming that schema checks are complete and listing the final -column set. - -#### Error handling - -After row 10, a row may contain a value that is incompatible with the current -column type, or a column that doesn't exist in the table. When a batch insert -fails: - -1. Check whether the failure is due to a schema mismatch (missing column or - type incompatibility). -2. If so, call `make_table_compatible()` for the failing row, then retry the - entire batch. Only do this **once per batch** -- if the retry also fails, - raise the original exception. -3. If the failure is not schema-related, raise immediately. - -This keeps the common path fast while still handling late schema evolution. - -### 2. Batch writes and commits - -**Constant**: `BATCH_SIZE = 1000` - -Replace the current row-at-a-time loop with batched processing: - -``` -accumulate rows into a batch (list of row dicts) -when batch is full (or rows are exhausted): - try: - bulk upsert the batch - except schema error: - run make_table_compatible on the failing row - retry the batch once - commit -``` - -#### Bulk upsert strategy - -The current `upsert()` method does `INSERT`, then falls back to `UPDATE` on -`IntegrityError`. For batched operation, use **dialect-specific bulk upsert**: - -- **PostgreSQL**: Use `sqlalchemy.dialects.postgresql.insert` with - `on_conflict_do_update()` on the `id` primary key. Pass the full batch to - `connection.execute(statement, batch_list)`. - -- **MySQL**: Use `sqlalchemy.dialects.mysql.insert` with - `on_duplicate_key_update()`. Pass the full batch to - `connection.execute(statement, batch_list)`. - -- **MSSQL**: Use a `MERGE` statement via raw SQL, or fall back to row-by-row - upsert within the batch. MSSQL doesn't have a clean SQLAlchemy upsert API. - -- **Other dialects** (Oracle, etc.): Fall back to row-by-row `upsert()` within - the batch, same as current behavior. The commit-per-batch improvement still - applies. - -Implement a method `bulk_upsert(table, batch)` that dispatches on -`self.is_postgres` / `self.is_mysql` / `self.is_mssql` and falls back to -row-by-row for unsupported dialects. - -#### Commits - -Call `self.connection.execute(text("COMMIT"))` (or use the connection's -transaction API) after each batch. This bounds the transaction size and lets the -database release resources. - -#### Filtering None values - -The current `upsert()` strips `None` values from `row_dict` so that columns -that don't yet exist aren't included. For bulk inserts, all rows in a batch must -have the **same set of keys**. To handle this: - -- After schema checking is complete (row 10+), build the batch using a stable - set of column names (the headings from `table_spec.headings`). -- Include `None` values as-is -- the columns exist by this point (all columns - are nullable). -- During the schema-check phase (rows 0-9), continue using row-by-row - `upsert()` with None-stripping, since columns may not exist yet. - -### 3. Revised `write_table()` flow - -```python -def write_table(self, table_spec): - table_name = table_spec.name - headings = table_spec.headings - data_type_dict = dict(zip_longest(headings, table_spec.data_types)) - - table = None - batch = [] - - for i, row in enumerate(table_spec.rows): - row_dict = dict(zip(headings, row)) - - if i == 0: - table = self.get_table(table_name) - if table is None: - table = self.create_table(table_name, row_dict, data_type_dict) - - if i < SCHEMA_CHECK_ROWS: - # Schema-check phase: row-by-row with full compatibility checks - table = self.make_table_compatible(table, row_dict, data_type_dict) - self.upsert(table, row_dict) - else: - # Batched phase - batch.append(row_dict) - if len(batch) >= BATCH_SIZE: - self._flush_batch(table, batch) - batch = [] - - # Flush any remaining rows - if batch: - self._flush_batch(table, batch) - - # Commit any remaining schema-check-phase rows - if table is not None: - self._commit() - - -def _flush_batch(self, table, batch): - try: - self.bulk_upsert(table, batch) - except (sqlalchemy.exc.OperationalError, sqlalchemy.exc.ProgrammingError): - # Likely a schema mismatch; try to fix and retry once - for row_dict in batch: - table = self.make_table_compatible(table, row_dict, ...) - self.bulk_upsert(table, batch) # retry; raise on second failure - self._commit() -``` - -### 4. What this does NOT change - -- **API fetching**: Already lazy/paginated. No changes needed. -- **minilinq pipeline**: Already uses generators. No changes needed. -- **`JValueTableWriter`**: Used for JSON output, not SQL. Out of scope. -- **`StreamingMarkdownTableWriter`**: Used for terminal display. Out of scope. -- **The `upsert()` method**: Kept as-is for the schema-check phase and for - dialect fallback within batches. - -## Files to modify - -- `commcare_export/writers.py`: All changes are in `SqlTableWriter`. - - Add constants `SCHEMA_CHECK_ROWS` and `BATCH_SIZE`. - - Add `bulk_upsert()` method. - - Add `_flush_batch()` method. - - Add `_commit()` helper. - - Rewrite `write_table()` as described above. - -## Testing - -- Existing tests in `tests/test_writers.py` must continue to pass (they - exercise the full write_table path with small row counts that stay within the - schema-check phase). -- Add a test that writes > `SCHEMA_CHECK_ROWS` rows to verify the batched - path is exercised. -- Add a test that introduces a new column after row 10 to verify the - schema-mismatch retry logic. -- Test with PostgreSQL and MySQL dialects (at minimum) to verify bulk upsert. From f7fba6e210df482015b1fad08826c524aa9a14c0 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Tue, 28 Apr 2026 17:48:28 -0400 Subject: [PATCH 08/14] Use itertools --- commcare_export/writers.py | 69 +++++++++++--------------------------- 1 file changed, 19 insertions(+), 50 deletions(-) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index 6583c865..30bd2ea6 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -3,6 +3,7 @@ import logging from tempfile import NamedTemporaryFile import zipfile +import itertools from itertools import zip_longest from typing import Optional @@ -678,61 +679,29 @@ def write_table(self, table_spec: TableSpec) -> None: headings = table_spec.headings data_type_dict = dict(zip_longest(headings, table_spec.data_types)) - table = None - batch = [] - schema_check_complete = False - - for i, row in enumerate(table_spec.rows): - row_dict = dict(zip(headings, row)) + rows = (dict(zip(headings, row)) for row in table_spec.rows) + first_row = next(rows, None) + if first_row is None: + return + row_stream = itertools.chain([first_row], rows) - if i == 0: - table = self.get_table(table_name) - if table is None: - table = self.create_table( - table_name, - row_dict, - data_type_dict, - ) + table = self.get_table(table_name) + if table is None: + table = self.create_table(table_name, first_row, data_type_dict) - if i < SCHEMA_CHECK_ROWS: - # Schema-check phase: row-by-row with full compatibility - # checks - table = self.make_table_compatible( - table, - row_dict, - data_type_dict, - ) - self.upsert(table, row_dict) - else: - assert table is not None # So that mypy knows it's a Table - if not schema_check_complete: - schema_check_complete = True - self._commit() - logger.debug( - "Schema check complete for table '%s'. Final columns: %s", - table_name, - [c.name for c in table.columns], - ) - batch.append(row_dict) - if len(batch) >= BATCH_SIZE: - self._flush_batch(table, batch, data_type_dict) - batch = [] + for row_dict in itertools.islice(row_stream, SCHEMA_CHECK_ROWS): + table = self.make_table_compatible(table, row_dict, data_type_dict) + self.upsert(table, row_dict) + self._commit() - if table is None: - return + logger.debug( + "Schema check complete for table '%s'. Final columns: %s", + table_name, + [c.name for c in table.columns], + ) - if batch: + for batch in itertools.batched(row_stream, BATCH_SIZE): self._flush_batch(table, batch, data_type_dict) - else: - # All rows in schema-check phase; commit them - self._commit() - - if not schema_check_complete: - logger.debug( - "Schema check complete for table '%s'. Final columns: %s", - table_name, - [c.name for c in table.columns], - ) def _get_columns_for_data(self, row_dict, data_type_dict): return [self.get_id_column()] + [ From 0aeff206dc7ab7c3ee12b573386f1a2abc55bf9a Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Tue, 28 Apr 2026 18:08:52 -0400 Subject: [PATCH 09/14] itertools.batched was added in Python 3.12 --- commcare_export/writers.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index 30bd2ea6..a806f435 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -700,7 +700,7 @@ def write_table(self, table_spec: TableSpec) -> None: [c.name for c in table.columns], ) - for batch in itertools.batched(row_stream, BATCH_SIZE): + for batch in _batched(row_stream, BATCH_SIZE): self._flush_batch(table, batch, data_type_dict) def _get_columns_for_data(self, row_dict, data_type_dict): @@ -716,3 +716,9 @@ def _get_columns_for_data(self, row_dict, data_type_dict): and column_name != 'id' ) ] + + +# Use itertools.batched when Python is always >= 3.12 +def _batched(iterable, n): + while batch := list(itertools.islice(iterable, n)): + yield batch From dd54c0fc8101d8a5a934b56f7263830be17509b7 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Wed, 6 May 2026 17:30:46 -0400 Subject: [PATCH 10/14] Explicitly mention the number of rows (They might not be the first rows in the table. They are the first rows in the export.) --- commcare_export/writers.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index a806f435..6cb058dd 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -695,7 +695,9 @@ def write_table(self, table_spec: TableSpec) -> None: self._commit() logger.debug( - "Schema check complete for table '%s'. Final columns: %s", + "Schema check complete for %s rows in table '%s'. " + "Final columns: %s", + SCHEMA_CHECK_ROWS, table_name, [c.name for c in table.columns], ) From d3f6566262200fbbbe00c08b8541500b14e1fe15 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Wed, 6 May 2026 17:35:05 -0400 Subject: [PATCH 11/14] Add DataError to caught exceptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - sqlalchemy.exc.CompileError — raised at statement-compile time, before the DB sees anything. Triggered by things like stmt.excluded/stmt.inserted referencing a column that isn't in the table metadata, or a type that can't be rendered for the dialect. - sqlalchemy.exc.OperationalError — DB-side. For schema mismatches this is mostly the MySQL path: "Unknown column 'x' in 'field list'", and also things like "Data too long for column" on some MySQL versions/drivers. (It also covers non-schema issues like deadlocks/timeouts, which the retry won't actually fix.) - sqlalchemy.exc.ProgrammingError — DB-side. Postgres/MSSQL "column … does not exist", "relation does not exist", and similar DDL-shape errors come through here. - sqlalchemy.exc.DataError — DB-side. It's the sibling DBAPI subclass for "value doesn't fit the column" — string longer than VARCHAR(n), number out of range for the integer type, invalid date, numeric overflow, etc. On Postgres this is exactly how value too long for type character varying(N) and numeric field overflow arrive; psycopg2 raises StringDataRightTruncation / NumericValueOutOfRange, which SQLAlchemy maps to DataError. --- commcare_export/writers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index 6cb058dd..350d385c 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -663,6 +663,7 @@ def _flush_batch(self, table, batch, data_type_dict): sqlalchemy.exc.CompileError, sqlalchemy.exc.OperationalError, sqlalchemy.exc.ProgrammingError, + sqlalchemy.exc.DataError, ): # Likely a schema mismatch; fix schema and retry once for row_dict in batch: From 804873eefbffe3ed83b879d95841853b09807aa3 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Wed, 6 May 2026 17:42:07 -0400 Subject: [PATCH 12/14] Replace set comprehension with nested for loops --- commcare_export/writers.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index 350d385c..80e01b2a 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -621,11 +621,11 @@ def bulk_upsert(self, table, batch): # whose values are always `None` to reproduce the behavior of # `SqlTableWriter.insert()`. `batch_keys` are the columns where # _any_ row has a value set. - batch_keys = { - k for row_dict in batch - for k, v in row_dict.items() - if v is not None - } + batch_keys = set() + for row_dict in batch: + for key, value in row_dict.items(): + if value is not None: + batch_keys.add(key) batch = [ {k: row_dict[k] for k in batch_keys} for row_dict in batch From ca0221e591231a754ed1eaa57d926e5473b5ed82 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Wed, 6 May 2026 21:18:34 -0400 Subject: [PATCH 13/14] Preserve existing values for None in bulk_upsert Wrap the conflict-update SET clause in COALESCE so that a None in the inserted row leaves the existing column value untouched, matching the per-row upsert() which strips Nones before building the UPDATE. Restrict update_cols to batch_keys so columns where every row in the batch is None are not touched at all. Iterate over table.columns and filter by batch_keys instead of using batch_keys directly, since a new column in batch_keys is not yet on the table and table.c[name] raises KeyError. Letting the INSERT fail on the missing column lets _flush_batch retry after schema repair. Co-Authored-By: Claude Opus 4.7 (1M context) --- commcare_export/writers.py | 44 ++++++++++++++++++------------- tests/test_writers.py | 53 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 18 deletions(-) diff --git a/commcare_export/writers.py b/commcare_export/writers.py index 80e01b2a..c49cafbc 100644 --- a/commcare_export/writers.py +++ b/commcare_export/writers.py @@ -626,35 +626,43 @@ def bulk_upsert(self, table, batch): for key, value in row_dict.items(): if value is not None: batch_keys.add(key) - batch = [ - {k: row_dict[k] for k in batch_keys} - for row_dict in batch - ] + batch = [{k: row_dict[k] for k in batch_keys} for row_dict in batch] if self.is_postgres: from sqlalchemy.dialects.postgresql import insert stmt = insert(table).values(batch) - update_cols = {c.name: c for c in stmt.excluded if c.name != 'id'} - stmt = stmt.on_conflict_do_update( - index_elements=['id'], - set_=update_cols, - ) - self.connection.execute(stmt) + new_row = stmt.excluded elif self.is_mysql: from sqlalchemy.dialects.mysql import insert stmt = insert(table).values(batch) - update_cols = { - c.name: stmt.inserted[c.name] - for c in table.columns - if c.name != 'id' - } - stmt = stmt.on_duplicate_key_update(**update_cols) - self.connection.execute(stmt) + new_row = stmt.inserted else: # MSSQL and others: fall back to row-by-row for row_dict in batch: self.upsert(table, row_dict) + return + + # Use COALESCE so that a None in the inserted row preserves the + # existing column value, matching the per-row upsert() which + # strips Nones before building the UPDATE. + # Only reference columns that already exist on the table. New + # columns in batch_keys would raise KeyError here; the INSERT + # itself will then fail and _flush_batch retries after fixing + # the schema. + update_cols = { + c.name: sqlalchemy.func.coalesce(new_row[c.name], c) + for c in table.columns + if c.name != 'id' and c.name in batch_keys + } + if self.is_postgres: + stmt = stmt.on_conflict_do_update( + index_elements=['id'], + set_=update_cols, + ) + else: + stmt = stmt.on_duplicate_key_update(**update_cols) + self.connection.execute(stmt) def _flush_batch(self, table, batch, data_type_dict): try: @@ -697,7 +705,7 @@ def write_table(self, table_spec: TableSpec) -> None: logger.debug( "Schema check complete for %s rows in table '%s'. " - "Final columns: %s", + 'Final columns: %s', SCHEMA_CHECK_ROWS, table_name, [c.name for c in table.columns], diff --git a/tests/test_writers.py b/tests/test_writers.py index f5aab103..0c2dc204 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -747,6 +747,59 @@ def test_bulk_upsert(self, writer): assert result['row2'] == {'id': 'row2', 'a': 'val2', 'b': 'y'} assert result['row3'] == {'id': 'row3', 'a': 'val3', 'b': 'z'} + def test_bulk_upsert_preserves_existing_values_for_none(self, writer): + with writer: + writer.write_table( + TableSpec( + name='foo_bulk_upsert_none', + headings=['id', 'a', 'b', 'c'], + rows=[ + ['row1', 'val1', 'x', 'keep1'], + ['row2', 'val2', 'y', 'keep2'], + ], + ) + ) + + # Update row1 with None for b (whole-batch None for column c). + # Both should be preserved at their existing values rather than + # clobbered to NULL. row3 is a new insert; its None values land + # as NULL since there is no prior row. + with writer: + table = writer.get_table('foo_bulk_upsert_none') + batch = [ + {'id': 'row1', 'a': 'updated1', 'b': None, 'c': None}, + {'id': 'row3', 'a': 'val3', 'b': 'z', 'c': None}, + ] + writer.bulk_upsert(table, batch) + writer._commit() + + with writer: + result = { + row['id']: dict(row) + for row in writer.connection.execute( + 'SELECT id, a, b, c FROM foo_bulk_upsert_none' + ) + } + assert len(result) == 3 + assert result['row1'] == { + 'id': 'row1', + 'a': 'updated1', + 'b': 'x', + 'c': 'keep1', + } + assert result['row2'] == { + 'id': 'row2', + 'a': 'val2', + 'b': 'y', + 'c': 'keep2', + } + assert result['row3'] == { + 'id': 'row3', + 'a': 'val3', + 'b': 'z', + 'c': None, + } + def test_flush_batch_retry_on_new_column(self, writer): # Create table with columns [id, a] with writer: From 59deacfb9dd0741be826b25c88772a48ffc05b74 Mon Sep 17 00:00:00 2001 From: Norman Hooper Date: Wed, 6 May 2026 21:43:07 -0400 Subject: [PATCH 14/14] Add type annotations to module-level mutable containers Newer mypy with disallow_any_generics flags untyped dict/list literals assigned at module scope. Co-Authored-By: Claude Opus 4.7 (1M context) --- commcare_export/env.py | 2 +- commcare_export/excel_query.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/commcare_export/env.py b/commcare_export/env.py index e52b76b6..63e6cf55 100644 --- a/commcare_export/env.py +++ b/commcare_export/env.py @@ -16,7 +16,7 @@ logger = logging.getLogger(__name__) -JSONPATH_CACHE = {} +JSONPATH_CACHE: dict[str, Any] = {} class CannotBind(Exception): diff --git a/commcare_export/excel_query.py b/commcare_export/excel_query.py index 8fbf112f..8d9528e6 100644 --- a/commcare_export/excel_query.py +++ b/commcare_export/excel_query.py @@ -696,7 +696,7 @@ def check_columns(parsed_sheets, columns): raise MissingColumnException(errors_by_sheet) -blacklisted_tables = [] +blacklisted_tables: list[str] = [] def blacklist(table_name):