
Streaming to reduce memory usage #9

@frehoy


Hello!

Thanks for a very useful library.
My use case involves reading many tables with get_table_data_all and ingesting the data into BigQuery. I'm currently doing this by building a polars DataFrame from the API result, writing it to disk as a Parquet file, and then uploading that file.

I noticed that the full response is materialised as a list before being returned. That's very convenient, but it also means that all of the data has to fit in RAM at once, which gets expensive for large tables. For example, "TAB6683" uses ~16 GB of RAM.
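To illustrate the effect (with made-up rows rather than real pxweb output), this stdlib-only sketch compares peak memory, as reported by tracemalloc, when consuming a list versus a generator of the same rows. Exact figures depend on the Python version; the point is that the list's peak grows with the number of rows while the generator's stays roughly constant.

```python
import tracemalloc
from collections.abc import Iterator


def rows_as_list(n: int) -> list[dict]:
    # Materialise every row up front, like a list-returning API call.
    return [{"region": f"R{i}", "year": "2020", "value": i} for i in range(n)]


def rows_as_iter(n: int) -> Iterator[dict]:
    # Produce rows lazily; only the current row needs to be alive.
    for i in range(n):
        yield {"region": f"R{i}", "year": "2020", "value": i}


def peak_bytes(consume) -> int:
    # Return the peak traced allocation while running `consume`.
    tracemalloc.start()
    try:
        consume()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak


N = 100_000
peak_list = peak_bytes(lambda: sum(r["value"] for r in rows_as_list(N)))
peak_iter = peak_bytes(lambda: sum(r["value"] for r in rows_as_iter(N)))
print(f"list: {peak_list / 1e6:.1f} MB, generator: {peak_iter / 1e6:.3f} MB")
```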

Would you be open to a PR that adds a function like get_table_data_all_iter, which yields the records one at a time? I could then use it like this:

import json
from pxweb import PxApi
api = PxApi("scb")
# Stream each record straight to a newline-delimited JSON file
with open("TAB6683.ndjson", "w", encoding="utf-8") as f:
    for record in api.get_table_data_all_iter("TAB6683"):
        f.write(json.dumps(record, ensure_ascii=False) + "\n")
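For the polars/Parquet workflow described above, the iterator would more likely be consumed in batches than row by row. A minimal stdlib sketch of that grouping step follows; batched_rows is a hypothetical helper (not part of pxweb), and a small generator of fake rows stands in for the proposed get_table_data_all_iter.

```python
from collections.abc import Iterable, Iterator
from itertools import islice


def batched_rows(rows: Iterable[dict], batch_size: int) -> Iterator[list[dict]]:
    """Group a stream of row dicts into lists of at most `batch_size` rows."""
    it = iter(rows)
    # islice pulls at most batch_size rows from the shared iterator each time,
    # so only one batch is held in memory at once.
    while batch := list(islice(it, batch_size)):
        yield batch


# Hypothetical stand-in for api.get_table_data_all_iter("TAB6683").
fake_rows = ({"region": f"R{i}", "value": i} for i in range(7))
sizes = [len(batch) for batch in batched_rows(fake_rows, 3)]
print(sizes)  # [3, 3, 1]
```

Each batch could then be turned into a DataFrame with pl.DataFrame(batch) and written to its own file with write_parquet, so peak memory is roughly one batch rather than the whole table. On Python 3.12+, itertools.batched does the same grouping but yields tuples.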

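Here's an example implementation as a method on PxApi:

# src/pxweb/api.py
# Module-level imports this method relies on (logger, expand_wildcards,
# count_data_cells, split_value_codes, build_query and unpack_table_data are
# assumed to already exist in this module):
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor
from typing import Literal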

    def get_table_data_all_iter(
        self,
        table_id: str,
        show: Literal["code", "value", "code_value"] | None = None,
    ) -> Iterator[dict]:
        """
        Like `~.PxApi.get_table_data_all`, but yields row dicts one at a time
        instead of materialising the full dataset in memory before returning.
        Rows are yielded in sub-query order as each sub-query completes, so
        processing can start before all network calls have finished.

        Parameters
        ----------
        table_id: str
            An ID of a table to get data from.
        show: str, optional
            Set to "code_value", "code" or "value", to specify what to show in the categorical columns.

        Yields
        ------
        dict
            One dict per data cell, in the same format as `~.PxApi.get_table_data_all`.
        """
        if show not in (valid_show := {"code", "value", "code_value", None}):
            raise ValueError(
                f"Invalid value for show: {show}. Expected one of {valid_show}."
            )

        # Request every value of every variable by expanding the "*" wildcard.
        table_variables = self.get_table_variables(table_id)
        value_codes = expand_wildcards(
            {k: ["*"] for k in table_variables},
            table_variables,
        )

        # Run one query and unpack the response into row dicts.
        def fetch(query):
            return unpack_table_data(
                self._client.call(
                    endpoint=f"/tables/{table_id}/data", query=query
                ),
                show=show,
            )

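        # Split into several requests if the whole table exceeds the API's
        # per-request cell limit (maxDataCells).
        if (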
            count_data_cells(value_codes)
            > self._client.configuration["maxDataCells"]
        ):
            subqueries = [
                build_query(sub_query)
                for sub_query in split_value_codes(
                    value_codes, self._client.configuration["maxDataCells"]
                )
            ]
            if self.max_workers == 1:
                logger.debug("Fetching %s subqueries", len(subqueries))
                for subquery in subqueries:
                    yield from fetch(subquery)
            else:
                logger.debug(
                    "Fetching %s subqueries with %s workers",
                    len(subqueries),
                    self.max_workers,
                )
                # Fetch subqueries in parallel; executor.map yields results in subquery order.
                with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                    for result in executor.map(fetch, subqueries):
                        yield from result
        else:
            yield from fetch(build_query(value_codes))
