Skip to content

feat(client): bound concurrent in-flight data updates to cap peak memory#917

Open
Kyzgor wants to merge 1 commit into
permitio:masterfrom
Kyzgor:fix/bound-concurrent-data-updates
Open

feat(client): bound concurrent in-flight data updates to cap peak memory#917
Kyzgor wants to merge 1 commit into
permitio:masterfrom
Kyzgor:fix/bound-concurrent-data-updates

Conversation

@Kyzgor

@Kyzgor Kyzgor commented Jun 8, 2026

Copy link
Copy Markdown

Fixes Issue

Related to #844 and #770. This is hardening; the primary leak in those issues is fixed separately
in the fetch-worker PR (#916). There is no "Closes" here, this is a defense-in-depth bound rather
than the reported-bug fix.

Changes proposed

Bounds how many data-update entries fetch and write to the policy store concurrently, so a burst
of updates cannot stack an unbounded number of in-flight datasets in memory.

  • opal_client/config.py: new DATA_UPDATER_MAX_CONCURRENT_UPDATES (default 10).
  • opal_client/data/updater.py: an asyncio.Semaphore acquired as the outermost context manager
    around each entry's fetch and write in _update_policy_data, plus a constructor override
    max_concurrent_data_updates.
  • opal_client/tests/data_updater_backpressure_test.py (new): asserts peak concurrent writes
    stays at or below the cap under a burst, and that every update is still applied (no drops).

Why

trigger_data_update fires each update fire-and-forget into an unbounded TasksPool, and
_update_policy_data fetches inside a per-dst_path hierarchical lock whose waiters queue without
bound. Under load (a reconnect storm across a fleet, frequent periodic updates, or a slow policy
store) the count of in-flight updates, each holding its fetched dataset, is bounded only by the
arrival rate, so the peak working set can grow large before the backlog drains.

This is a different failure mode from the per-worker frame retention fixed in #916 (that one is a
steady-state high-water mark; this one is a transient burst spike). They are complementary.

Check List (Check all the applicable boxes)

  • I sign off on contributing this submission to open-source
  • My code follows the code style of this project.
  • My change requires changes to the documentation.
  • I have updated the documentation accordingly.
  • All new and existing tests passed.
  • This PR does not contain plagiarized content.
  • The title of my pull request is a short description of the requested changes.

Screenshots

N/A. The behaviour is memory and concurrency, quantified below.

Note to reviewers

Design choices, and what is preserved:

  • The semaphore is acquired around the fetch and write of each update entry only, as the outermost
    context manager inside the existing async with. It does not wrap _send_reports (which uses
    the same TasksPool), so report delivery is never throttled behind data writes.
  • It blocks rather than drops, so no update is lost. This is not a lossy queue.
  • The hierarchical lock (per-dst_path write ordering) is untouched.

Empirical results from a local harness driving the real DataUpdater -> fetch path:

Concurrency (N) Without cap (peak RSS) With cap=5 (peak RSS)
10 +27 MB +19 MB
30 +49 MB +21 MB
60 +98 MB +21 MB

Peak concurrent writes pin to the cap regardless of N, and peak RSS stops scaling with load. A
larger A/B (45 updates x 4 MB with a slow store): 1,202 MB drops to ~340 MB at cap=5. The
regression test fails if the semaphore is removed (observed peak concurrency jumps to about the
burst size).

On the default value: 10 is a balance, high enough not to throttle typical multi-entry configs
and low enough to bound memory under pathological bursts. It is fully configurable, and I am happy
to adjust the default if you prefer a different point, or to make it opt-in (an unbounded default).

CI note: the security/snyk check tends to error on fork PRs (org token/quota) rather than
indicating a code problem.

`trigger_data_update` fires each update fire-and-forget into an unbounded
`TasksPool`, and `_update_policy_data` fetches inside a per-path lock whose
waiters queue without bound. Under a burst — a reconnect storm across a fleet,
frequent periodic updates, or a slow policy store — the number of in-flight
updates (each holding its fetched dataset) is bounded only by the arrival rate,
inflating the peak working set.

Add an `asyncio.Semaphore` (`DATA_UPDATER_MAX_CONCURRENT_UPDATES`, default 10)
acquired around the fetch+write of each update entry, so peak concurrency — and
hence peak memory — is bounded. The semaphore blocks rather than drops, so
no update is lost, and per-path write ordering (the hierarchical lock) is
preserved. Adds a regression test asserting peak concurrent writes stays at or
below the cap under a burst.

Related to permitio#844, permitio#770
@netlify

netlify Bot commented Jun 8, 2026

Copy link
Copy Markdown

Deploy Preview for opal-docs canceled.

Name Link
🔨 Latest commit 76e8c62
🔍 Latest deploy log https://app.netlify.com/projects/opal-docs/deploys/6a2707013b75d60008996185

1 similar comment
@netlify

netlify Bot commented Jun 8, 2026

Copy link
Copy Markdown

Deploy Preview for opal-docs canceled.

Name Link
🔨 Latest commit 76e8c62
🔍 Latest deploy log https://app.netlify.com/projects/opal-docs/deploys/6a2707013b75d60008996185

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant