Skip to content

Refactor _SharedCache to handle context vs non-context ownership#38620

Draft
shunping wants to merge 4 commits into
apache:masterfrom
shunping:refactor-shared-cache-2
Draft

Refactor _SharedCache to handle context vs non-context ownership#38620
shunping wants to merge 4 commits into
apache:masterfrom
shunping:refactor-shared-cache-2

Conversation

@shunping
Copy link
Copy Markdown
Collaborator

@shunping shunping commented May 24, 2026

Currently, _SharedCache registers all live owners as joint owners of every cached subprocess key. This works fine when all owners are within a unified context (e.g. a test class wrapping multiple runs), but it causes unintended resource sharing/leakage when there are concurrent, independent, non-context owners (such as a long-lived runner).

Here is a resource leak scenario:

  1. The Prism runner starts and registers a persistent non-context owner owner_1.
  2. The pipeline requests an external transform, spinning up a short-lived Java Expansion Service that registers its own owner owner_2 and requests its startup command key.
  3. In the old implementation, get() automatically added all currently registered live owners to the cache key's owner set.
    for owner in self._live_owners:
    self._cache[key].owners.add(owner)

    As a result, owner_1 (the Prism runner) was registered as a joint owner of the short-lived Expansion Service's cache key.
  4. When the pipeline finished using the external transform and called purge(owner_2), the short-lived Expansion Service subprocess was never terminated because owner_1 was still registered as an owner (entry.owners is not empty).
    if not entry.owners:
    to_delete.append(entry.obj)
  5. This resulted in orphaned, zombie Expansion Service processes accumulating in the background, binding up localhost ports and exhausting resources.

This PR refactors _SharedCache in subprocess_server.py to explicitly distinguish between context owners (e.g., cache_subprocesses which should hold ownership over everything created under its block) and non-context owners (e.g., individual subprocess server instances like PrismRunner or expansion service subprocesses).

Additionally, get() now takes an optional owner arg. Context owners automatically own all created keys, while independent non-context owners only own the keys they explicitly request.


For example, we run YAML example test with pytest apache_beam/yaml/examples/testing/examples_test.py --log-cli-level=INFO -s.

  • Without this PR, we see all expansion service servers created in these 46 tests and prism server being destroyed at the end of pytest execution.
    Really destroying service at localhost:49623 with cmd: ('/Users/shunping/.apache_beam/cache/prism/bin/prism', '--job_port', '49623', '--log_level', 'info', '--log_kind', 'json', '--serve_http=false')
    WARNING:apache_beam.utils.subprocess_server:Really destroying service at localhost:49745 with cmd: ['/Users/shunping/.apache_beam/cache/venvs/e08b2501f55abf367196f6578e3dbed1fe2a6d9244fc96ad5d82f3d5c621c1cf/bin/python', '-m', 'apache_beam.runners.portability.expansion_service_main', '--port', '49745', '--fully_qualified_name_glob=*', '--pickle_library=cloudpickle', '--requirements_file=/Users/shunping/.apache_beam/cache/venvs/e08b2501f55abf367196f6578e3dbed1fe2a6d9244fc96ad5d82f3d5c621c1cf-requirements.txt', '--serve_loopback_worker']
    WARNING:apache_beam.utils.subprocess_server:Really destroying service at localhost:49855 with cmd: ['/Users/shunping/.apache_beam/cache/venvs/5052f7801853faa5017d022495ad2a30f1ae0a321fa50de56227e75028f0e612/bin/python', '-m', 'apache_beam.runners.portability.expansion_service_main', '--port', '49855', '--fully_qualified_name_glob=*', '--pickle_library=cloudpickle', '--requirements_file=/Users/shunping/.apache_beam/cache/venvs/5052f7801853faa5017d022495ad2a30f1ae0a321fa50de56227e75028f0e612-requirements.txt', '--serve_loopback_worker']
    WARNING:apache_beam.utils.subprocess_server:Really destroying service at localhost:49888 with cmd: ['/Users/shunping/.apache_beam/cache/venvs/aedc40ba446114c3eb019ef9c411c0ab2204da67a2f4c732530c1c6aba7eef31/bin/python', '-m', 'apache_beam.runners.portability.expansion_service_main', '--port', '49888', '--fully_qualified_name_glob=*', '--pickle_library=cloudpickle', '--requirements_file=/Users/shunping/.apache_beam/cache/venvs/aedc40ba446114c3eb019ef9c411c0ab2204da67a2f4c732530c1c6aba7eef31-requirements.txt', '--serve_loopback_worker']
    
- With this PR, each expansion service server is destroyed as soon as the corresponding test is over. Prism server is cleaned up before pytest ends.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant