-
Notifications
You must be signed in to change notification settings - Fork 19
Optimize SqlTableWriter #269
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ae46c9d
854510e
f240b5e
13938a1
3f49fca
5be70f7
bf574fb
f7fba6e
0aeff20
dd54c0f
d3f6566
804873e
ca0221e
59deacf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |||||||||||||||||||||||||||||||||||||||
| import logging | ||||||||||||||||||||||||||||||||||||||||
| from tempfile import NamedTemporaryFile | ||||||||||||||||||||||||||||||||||||||||
| import zipfile | ||||||||||||||||||||||||||||||||||||||||
| import itertools | ||||||||||||||||||||||||||||||||||||||||
| from itertools import zip_longest | ||||||||||||||||||||||||||||||||||||||||
| from typing import Optional | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
@@ -16,6 +17,8 @@ | |||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| logger = logging.getLogger(__name__) | ||||||||||||||||||||||||||||||||||||||||
| MAX_COLUMN_SIZE = 2000 | ||||||||||||||||||||||||||||||||||||||||
| SCHEMA_CHECK_ROWS = 10 | ||||||||||||||||||||||||||||||||||||||||
| BATCH_SIZE = 1000 | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def ensure_text(v, convert_none=False): | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -605,25 +608,111 @@ def upsert(self, table, row_dict): | |||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
| self.connection.execute(update) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def _commit(self): | ||||||||||||||||||||||||||||||||||||||||
| # Explicit commit works for all DB types. Replace with explicit | ||||||||||||||||||||||||||||||||||||||||
| # transactions when upgrading to SQLAlchemy 2.0 | ||||||||||||||||||||||||||||||||||||||||
| self.connection.execute(sqlalchemy.text('COMMIT')) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def bulk_upsert(self, table, batch): | ||||||||||||||||||||||||||||||||||||||||
| if not batch: | ||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||
| # SQLAlchemy requires all dicts in `batch` to have the same keys | ||||||||||||||||||||||||||||||||||||||||
| # for `insert(table).values(batch)`. We need to drop the columns | ||||||||||||||||||||||||||||||||||||||||
| # whose values are always `None` to reproduce the behavior of | ||||||||||||||||||||||||||||||||||||||||
| # `SqlTableWriter.insert()`. `batch_keys` are the columns where | ||||||||||||||||||||||||||||||||||||||||
| # _any_ row has a value set. | ||||||||||||||||||||||||||||||||||||||||
| batch_keys = set() | ||||||||||||||||||||||||||||||||||||||||
| for row_dict in batch: | ||||||||||||||||||||||||||||||||||||||||
| for key, value in row_dict.items(): | ||||||||||||||||||||||||||||||||||||||||
| if value is not None: | ||||||||||||||||||||||||||||||||||||||||
| batch_keys.add(key) | ||||||||||||||||||||||||||||||||||||||||
| batch = [{k: row_dict[k] for k in batch_keys} for row_dict in batch] | ||||||||||||||||||||||||||||||||||||||||
| if self.is_postgres: | ||||||||||||||||||||||||||||||||||||||||
| from sqlalchemy.dialects.postgresql import insert | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| stmt = insert(table).values(batch) | ||||||||||||||||||||||||||||||||||||||||
| new_row = stmt.excluded | ||||||||||||||||||||||||||||||||||||||||
| elif self.is_mysql: | ||||||||||||||||||||||||||||||||||||||||
| from sqlalchemy.dialects.mysql import insert | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| stmt = insert(table).values(batch) | ||||||||||||||||||||||||||||||||||||||||
| new_row = stmt.inserted | ||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||
| # MSSQL and others: fall back to row-by-row | ||||||||||||||||||||||||||||||||||||||||
| for row_dict in batch: | ||||||||||||||||||||||||||||||||||||||||
| self.upsert(table, row_dict) | ||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| # Use COALESCE so that a None in the inserted row preserves the | ||||||||||||||||||||||||||||||||||||||||
| # existing column value, matching the per-row upsert() which | ||||||||||||||||||||||||||||||||||||||||
| # strips Nones before building the UPDATE. | ||||||||||||||||||||||||||||||||||||||||
| # Only reference columns that already exist on the table. New | ||||||||||||||||||||||||||||||||||||||||
| # columns in batch_keys would raise KeyError here; the INSERT | ||||||||||||||||||||||||||||||||||||||||
| # itself will then fail and _flush_batch retries after fixing | ||||||||||||||||||||||||||||||||||||||||
| # the schema. | ||||||||||||||||||||||||||||||||||||||||
| update_cols = { | ||||||||||||||||||||||||||||||||||||||||
| c.name: sqlalchemy.func.coalesce(new_row[c.name], c) | ||||||||||||||||||||||||||||||||||||||||
| for c in table.columns | ||||||||||||||||||||||||||||||||||||||||
| if c.name != 'id' and c.name in batch_keys | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| if self.is_postgres: | ||||||||||||||||||||||||||||||||||||||||
| stmt = stmt.on_conflict_do_update( | ||||||||||||||||||||||||||||||||||||||||
| index_elements=['id'], | ||||||||||||||||||||||||||||||||||||||||
| set_=update_cols, | ||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||
| stmt = stmt.on_duplicate_key_update(**update_cols) | ||||||||||||||||||||||||||||||||||||||||
| self.connection.execute(stmt) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def _flush_batch(self, table, batch, data_type_dict): | ||||||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||||||
| self.bulk_upsert(table, batch) | ||||||||||||||||||||||||||||||||||||||||
| except ( | ||||||||||||||||||||||||||||||||||||||||
| sqlalchemy.exc.CompileError, | ||||||||||||||||||||||||||||||||||||||||
| sqlalchemy.exc.OperationalError, | ||||||||||||||||||||||||||||||||||||||||
| sqlalchemy.exc.ProgrammingError, | ||||||||||||||||||||||||||||||||||||||||
| sqlalchemy.exc.DataError, | ||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||
| # Likely a schema mismatch; fix schema and retry once | ||||||||||||||||||||||||||||||||||||||||
| for row_dict in batch: | ||||||||||||||||||||||||||||||||||||||||
| table = self.make_table_compatible( | ||||||||||||||||||||||||||||||||||||||||
| table, | ||||||||||||||||||||||||||||||||||||||||
| row_dict, | ||||||||||||||||||||||||||||||||||||||||
| data_type_dict, | ||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay this answers my earlier question on the spec. |
||||||||||||||||||||||||||||||||||||||||
| self.bulk_upsert(table, batch) | ||||||||||||||||||||||||||||||||||||||||
| self._commit() | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def write_table(self, table_spec: TableSpec) -> None: | ||||||||||||||||||||||||||||||||||||||||
| table_name = table_spec.name | ||||||||||||||||||||||||||||||||||||||||
| headings = table_spec.headings | ||||||||||||||||||||||||||||||||||||||||
| data_type_dict = dict(zip_longest(headings, table_spec.data_types)) | ||||||||||||||||||||||||||||||||||||||||
| for i, row in enumerate(table_spec.rows): | ||||||||||||||||||||||||||||||||||||||||
| row_dict = dict(zip(headings, row)) | ||||||||||||||||||||||||||||||||||||||||
| if i == 0: | ||||||||||||||||||||||||||||||||||||||||
| table = self.get_table(table_name) | ||||||||||||||||||||||||||||||||||||||||
| if table is None: | ||||||||||||||||||||||||||||||||||||||||
| table = self.create_table( | ||||||||||||||||||||||||||||||||||||||||
| table_name, | ||||||||||||||||||||||||||||||||||||||||
| row_dict, | ||||||||||||||||||||||||||||||||||||||||
| data_type_dict, | ||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
| # Checks the data type for every cell in every row. Maybe we | ||||||||||||||||||||||||||||||||||||||||
| # can use a future version of the data dictionary to avoid | ||||||||||||||||||||||||||||||||||||||||
| # this? | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| rows = (dict(zip(headings, row)) for row in table_spec.rows) | ||||||||||||||||||||||||||||||||||||||||
| first_row = next(rows, None) | ||||||||||||||||||||||||||||||||||||||||
| if first_row is None: | ||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||
| row_stream = itertools.chain([first_row], rows) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| table = self.get_table(table_name) | ||||||||||||||||||||||||||||||||||||||||
| if table is None: | ||||||||||||||||||||||||||||||||||||||||
| table = self.create_table(table_name, first_row, data_type_dict) | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+691
to
+699
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Would the following work? I think the only difference would be that Not a big deal at all; the way you have it is not much harder to understand.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, and neither would |
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| for row_dict in itertools.islice(row_stream, SCHEMA_CHECK_ROWS): | ||||||||||||||||||||||||||||||||||||||||
| table = self.make_table_compatible(table, row_dict, data_type_dict) | ||||||||||||||||||||||||||||||||||||||||
| self.upsert(table, row_dict) | ||||||||||||||||||||||||||||||||||||||||
| self._commit() | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| logger.debug( | ||||||||||||||||||||||||||||||||||||||||
| "Schema check complete for %s rows in table '%s'. " | ||||||||||||||||||||||||||||||||||||||||
| 'Final columns: %s', | ||||||||||||||||||||||||||||||||||||||||
| SCHEMA_CHECK_ROWS, | ||||||||||||||||||||||||||||||||||||||||
| table_name, | ||||||||||||||||||||||||||||||||||||||||
| [c.name for c in table.columns], | ||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| for batch in _batched(row_stream, BATCH_SIZE): | ||||||||||||||||||||||||||||||||||||||||
| self._flush_batch(table, batch, data_type_dict) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def _get_columns_for_data(self, row_dict, data_type_dict): | ||||||||||||||||||||||||||||||||||||||||
| return [self.get_id_column()] + [ | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -638,3 +727,9 @@ def _get_columns_for_data(self, row_dict, data_type_dict): | |||||||||||||||||||||||||||||||||||||||
| and column_name != 'id' | ||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
| ] | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| # Use itertools.batched when Python is always >= 3.12 | ||||||||||||||||||||||||||||||||||||||||
| def _batched(iterable, n): | ||||||||||||||||||||||||||||||||||||||||
| while batch := list(itertools.islice(iterable, n)): | ||||||||||||||||||||||||||||||||||||||||
| yield batch | ||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this all possible exceptions when there is a incompatibility?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
d3f6566