diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index b8b1c451ad10..db4745e4f4af 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -7,6 +7,7 @@ #### Breaking Changes #### Bugs Fixed +* Fixed a bug where a caller request-level timeout or cancellation that fired mid-flight during a cold control-plane metadata (container/collection) read could preempt the cross-region failover retry before the retry policy was consulted, surfacing a cancellation instead of failing over to the next preferred region. The SDK now grants one bounded, cancellation-shielded cross-region attempt for such metadata reads (configurable via `AZURE_COSMOS_METADATA_FAILOVER_GRACE_SECONDS`, default 10s; set to `0` to disable). See [issue 46471](https://github.com/Azure/azure-sdk-for-python/issues/46471). #### Other Changes diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py index 73ba0649a859..2d93a21275de 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py @@ -90,6 +90,17 @@ class _Constants: TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT: int = 10 # ------------------------------------------------------------------------- + # Bounded grace window (in seconds) granted to a cold control-plane metadata + # (collection) read so that a cross-region failover attempt can run even when + # the caller's request-level timeout/cancellation fires mid-flight. See + # azure-sdk-for-python#46471 / azure-cosmos-dotnet-v3#5805. A value <= 0 + # disables the grace window and restores the prior (preempted) behavior. The + # value is clamped to METADATA_FAILOVER_GRACE_SECONDS_MAX. 
+ METADATA_FAILOVER_GRACE_SECONDS = "AZURE_COSMOS_METADATA_FAILOVER_GRACE_SECONDS" + METADATA_FAILOVER_GRACE_SECONDS_DEFAULT: float = 10.0 + METADATA_FAILOVER_GRACE_SECONDS_MAX: float = 86400.0 + # ------------------------------------------------------------------------- + # Controls how the SDK handles invalid UTF-8 bytes in HTTP response bodies. # Accepted values: "REPLACE", "IGNORE". Anything else (including unset) # leaves strict decoding in effect, which is the historical default. diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_failover_grace.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_failover_grace.py new file mode 100644 index 000000000000..cd82728514b3 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_failover_grace.py @@ -0,0 +1,135 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. + +"""Caller-cancellation-shielded grace execution for cold control-plane metadata reads. + +When a cold collection-metadata read (container cache warm-up) is routed to an +unhealthy preferred region, the SDK escalates through internal HTTP timeouts. If the +caller's request-level timeout / cancellation fires during that escalation, the retry +loop exits before the cross-region failover policy can route the next attempt to a +healthy region, so the customer sees a cancellation instead of a successful +failover. + +This module provides the primitives used by the sync/async retry loops to grant ONE +bounded, cancellation-shielded cross-region attempt for such metadata reads, mirroring +the .NET ``MetadataDetachedExecutor`` (Azure/azure-cosmos-dotnet-v3#5844) and the +direction in Azure/azure-sdk-for-python#46471. The attempt runs detached from the +caller's cancellation; on success the caller receives the failover result, otherwise +the original cancellation is surfaced. 
+""" +import logging +import os +import threading +from typing import Any, Callable, Optional, Sequence, Tuple + +from ._constants import _Constants +from .documents import _OperationType +from ._request_object import RequestObject +from .http_constants import ResourceType + +logger = logging.getLogger("azure.cosmos._metadata_failover_grace") + + +def get_grace_seconds() -> float: + """Resolve the bounded grace window for a metadata cross-region failover attempt. + + Reads ``AZURE_COSMOS_METADATA_FAILOVER_GRACE_SECONDS`` (default + :attr:`_Constants.METADATA_FAILOVER_GRACE_SECONDS_DEFAULT`). A value ``<= 0`` + disables the grace window (restores the prior preempted behavior). Values are + clamped to ``[0, METADATA_FAILOVER_GRACE_SECONDS_MAX]``. Malformed values fall + back to the default. + + :returns: the grace window in seconds (``0`` means disabled). + :rtype: float + """ + raw = os.environ.get(_Constants.METADATA_FAILOVER_GRACE_SECONDS) + if raw is None: + value = _Constants.METADATA_FAILOVER_GRACE_SECONDS_DEFAULT + else: + try: + value = float(raw) + except (TypeError, ValueError): + value = _Constants.METADATA_FAILOVER_GRACE_SECONDS_DEFAULT + value = max(value, 0.0) + value = min(value, _Constants.METADATA_FAILOVER_GRACE_SECONDS_MAX) + return value + + +def is_metadata_failover_candidate(args: Sequence[Any]) -> bool: + """Return True if the current request is a cold collection-metadata read. + + The grace attempt is intentionally scoped to read-only collection + (``ResourceType.Collection``) reads, matching .NET which wires the detached + executor only into the collection-cache path. Data-plane operations keep their + existing cancellation semantics. + + :param args: the positional args passed to the retry loop; ``args[0]`` is the + :class:`~azure.cosmos._request_object.RequestObject` when present. + :type args: Sequence[Any] + :returns: whether a metadata cross-region grace attempt is applicable. 
+ :rtype: bool + """ + if not args: + return False + request = args[0] + if not isinstance(request, RequestObject): + return False + resource_type = getattr(request, "resource_type", None) + operation_type = getattr(request, "operation_type", None) + if resource_type != ResourceType.Collection: + return False + if operation_type is None: + return False + return _OperationType.IsReadOnlyOperation(operation_type) + + +def run_grace_attempt_sync( + attempt: Callable[[], Any], + grace_seconds: float, +) -> Tuple[bool, Optional[Any], Optional[BaseException]]: + """Run a single cross-region metadata attempt detached from caller cancellation. + + The attempt executes on a daemon thread bounded by ``grace_seconds`` so that the + caller's cancellation cannot preempt the cross-region failover decision. The + attempt performs exactly one request against the next preferred region (the + request was routed synchronously by the caller before this is invoked); it does + not re-enter the retry loop. If the grace window expires the thread is left + running in the background (its single in-flight request completes and its + retry-policy side-effects still benefit subsequent callers) and the caller + surfaces the original cancellation. + + .. note:: + On grace expiry the abandoned daemon thread may still be reading the shared + :class:`~azure.cosmos._request_object.RequestObject` while it completes its + single in-flight send. Callers must not mutate or reuse that request object + after the cancellation is surfaced; in practice the request belongs to the + failed operation and is discarded once it raises. + + :param attempt: zero-arg callable performing exactly one cross-region attempt. + :type attempt: Callable + :param float grace_seconds: maximum time to wait for the attempt to complete. + :returns: ``(succeeded, result, exception)``. ``succeeded`` is True only when the + attempt completed within the grace window without raising. 
+ :rtype: tuple[bool, object, BaseException] + """ + box: dict = {} + + def _runner() -> None: + try: + box["result"] = attempt() + box["ok"] = True + except BaseException as exc: # pylint: disable=broad-except + box["exception"] = exc + box["ok"] = False + + thread = threading.Thread( + target=_runner, name="cosmos-metadata-failover-grace", daemon=True) + thread.start() + thread.join(grace_seconds) + + if thread.is_alive(): + # Grace window expired; leave the detached attempt running in the background. + return False, None, None + if box.get("ok"): + return True, box.get("result"), None + return False, None, box.get("exception") diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py index f8071eded6f6..4d74af09cac0 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py @@ -39,6 +39,7 @@ from . import _session_retry_policy from . import _timeout_failover_retry_policy from . import exceptions +from . import _metadata_failover_grace from ._constants import _Constants from ._cosmos_http_logging_policy import _log_diagnostics_error from ._global_partition_endpoint_manager_per_partition_automatic_failover import \ @@ -298,6 +299,51 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin _record_failure_if_request_not_cancelled(args[0], global_endpoint_manager, pk_range_wrapper) _handle_service_response_retries(request, client, service_response_retry_policy, e, *args) + except BaseException as e: # pylint: disable=broad-except + # A caller request-level timeout / cancellation can fire mid-flight during a + # cold control-plane metadata (collection) read and preempt the retry loop + # before the cross-region failover policy is consulted, surfacing the + # cancellation instead of a successful failover to the next preferred region + # (see azure-sdk-for-python#46471 / azure-cosmos-dotnet-v3#5805). 
For metadata + # reads only, give the failover policy one bounded, cancellation-shielded + # attempt against the next region. Process-control signals and the SDK's own + # timeout error are never intercepted. + if isinstance(e, (KeyboardInterrupt, SystemExit, GeneratorExit, + exceptions.CosmosClientTimeoutError)): + raise + if not _metadata_failover_grace.is_metadata_failover_candidate(args): + raise + grace_seconds = _metadata_failover_grace.get_grace_seconds() + if grace_seconds <= 0: + raise + if not timeout_failover_retry_policy.ShouldRetry(e): + raise + last_error = e + + def _grace_attempt(): + return ExecuteFunction(function, global_endpoint_manager, *args, **kwargs) + + succeeded, grace_result, _grace_exc = _metadata_failover_grace.run_grace_attempt_sync( + _grace_attempt, grace_seconds) + if not succeeded: + # Surface the original caller cancellation; the detached attempt (if any) + # keeps running in the background to warm caches for subsequent callers. + if _grace_exc is not None: + logger.debug( + "Metadata cross-region failover grace attempt failed before the " + "caller cancellation was surfaced: %r", _grace_exc) + raise + + if not client.last_response_headers: + client.last_response_headers = {} + client.last_response_headers[ + HttpHeaders.ThrottleRetryCount + ] = resourceThrottle_retry_policy.current_retry_attempt_count + client.last_response_headers[ + HttpHeaders.ThrottleRetryWaitTimeInMs + ] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds + return grace_result + def _record_success_if_request_not_cancelled( request_params: RequestObject, global_endpoint_manager: _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover, diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py index a6d44a699cb8..9654c8ab385b 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py +++ 
b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_retry_utility_async.py @@ -41,6 +41,7 @@ from .. import _session_retry_policy from .. import _timeout_failover_retry_policy from .. import exceptions +from .. import _metadata_failover_grace from .._constants import _Constants from .._container_recreate_retry_policy import ContainerRecreateRetryPolicy from .._request_object import RequestObject @@ -56,6 +57,24 @@ # pylint: disable=protected-access, disable=too-many-lines, disable=too-many-statements, disable=too-many-branches # cspell:ignore ppaf, ppcb + +def _observe_detached_grace_task(task: "asyncio.Task") -> None: + """Retrieve a detached grace task's result/exception so it is marked observed. + + When the metadata cross-region failover grace window expires, the shielded + attempt keeps running detached. Retrieving its eventual exception here prevents + asyncio from logging "Task exception was never retrieved" for that background task. + + :param asyncio.Task task: the detached grace task. + """ + if task.cancelled(): + return + try: + task.exception() + except asyncio.CancelledError: + pass + + # args [0] is the request object # args [1] is the connection policy # args [2] is the pipeline client @@ -314,6 +333,58 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg pk_range_wrapper) _handle_service_response_retries(request, client, service_response_retry_policy, e, *args) + except asyncio.CancelledError as e: + # A caller request-level timeout / cancellation can fire mid-flight during a + # cold control-plane metadata (collection) read and preempt the retry loop + # before the cross-region failover policy is consulted, surfacing the + # cancellation instead of a successful failover to the next preferred region + # (see azure-sdk-for-python#46471 / azure-cosmos-dotnet-v3#5805). For metadata + # reads only, give the failover policy one bounded, cancellation-shielded + # attempt against the next region. 
The attempt runs on a shielded task so the + # caller's cancellation cannot preempt the failover decision. + if not _metadata_failover_grace.is_metadata_failover_candidate(args): + raise + grace_seconds = _metadata_failover_grace.get_grace_seconds() + if grace_seconds <= 0: + raise + if not timeout_failover_retry_policy.ShouldRetry(e): + raise + last_error = e + # Run the cross-region attempt as an explicit task so that, if the grace + # window expires, the still-running detached task can be observed (its + # eventual exception retrieved) instead of raising an "exception was never + # retrieved" warning. The shield keeps the caller's cancellation from + # tearing down the detached task. + grace_task = asyncio.ensure_future( + ExecuteFunctionAsync(function, global_endpoint_manager, *args, **kwargs)) + try: + grace_result = await asyncio.wait_for( + asyncio.shield(grace_task), timeout=grace_seconds) + except BaseException as grace_exc: # pylint: disable=broad-except + # Grace window expired or the cross-region attempt failed. Make sure the + # detached attempt's eventual result/exception is observed, then surface + # the original caller cancellation. The detached attempt (if still + # running) continues in the background to warm caches for subsequent + # callers. 
+ if grace_task.done(): + _observe_detached_grace_task(grace_task) + else: + grace_task.add_done_callback(_observe_detached_grace_task) + logger.debug( + "Metadata cross-region failover grace attempt did not complete " + "before the caller cancellation was surfaced: %r", grace_exc) + raise e from None + + if not client.last_response_headers: + client.last_response_headers = {} + client.last_response_headers[ + HttpHeaders.ThrottleRetryCount + ] = resourceThrottle_retry_policy.current_retry_attempt_count + client.last_response_headers[ + HttpHeaders.ThrottleRetryWaitTimeInMs + ] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds + return grace_result + async def _record_success_if_request_not_cancelled( request_params: RequestObject, global_endpoint_manager: _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailoverAsync, diff --git a/sdk/cosmos/azure-cosmos/tests/test_metadata_failover_grace_unit.py b/sdk/cosmos/azure-cosmos/tests/test_metadata_failover_grace_unit.py new file mode 100644 index 000000000000..07e1d15d8d6c --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_metadata_failover_grace_unit.py @@ -0,0 +1,225 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. + +"""Sync unit tests for the metadata cross-region failover grace window. + +Covers the fix for the bug where a caller request-level timeout / cancellation that +fires mid-flight during a cold control-plane metadata (collection) read preempts the +cross-region failover decision (azure-sdk-for-python#46471 / +azure-cosmos-dotnet-v3#5805). 
+""" +# cspell:ignore ppaf + +import asyncio +import time +import unittest + +import pytest + +from azure.cosmos import _retry_utility, _metadata_failover_grace +from azure.cosmos._constants import _Constants +from azure.cosmos._request_object import RequestObject +from azure.cosmos.documents import _OperationType +from azure.cosmos.http_constants import ResourceType + + +class _FakeRetryOptions: + MaxRetryAttemptCount = 9 + FixedRetryIntervalInMilliseconds = 0 + MaxWaitTimeInSeconds = 30 + + +class _FakeConnectionPolicy: + def __init__(self, enable_discovery=True): + self.EnableEndpointDiscovery = enable_discovery + self.RetryOptions = _FakeRetryOptions() + + +class _FakeLocationCache: + def __init__(self, regions): + self.read_regional_routing_contexts = regions + self.write_regional_routing_contexts = regions + + def _get_applicable_read_regional_routing_contexts(self, *_a, **_k): + return self.read_regional_routing_contexts + + def _get_applicable_write_regional_routing_contexts(self, *_a, **_k): + return self.write_regional_routing_contexts + + +class _FakeGEM: + def __init__(self, regions): + self.location_cache = _FakeLocationCache(regions) + + def is_per_partition_automatic_failover_applicable(self, _req): + return False + + def is_circuit_breaker_applicable(self, _req): + return False + + def try_ppaf_failover_threshold(self, *_a, **_k): + return None + + def resolve_service_endpoint_for_partition(self, *_a, **_k): + return "https://next-region.example/" + + def can_use_multiple_write_locations(self, *_a, **_k): + return False + + def record_success(self, *_a, **_k): + return None + + +class _FakeClient: + def __init__(self, enable_discovery=True): + self.last_response_headers = {} + self.connection_policy = _FakeConnectionPolicy(enable_discovery) + self._container_properties_cache = {} + self.session = None + + def _UpdateSessionIfRequired(self, *_a, **_k): + return None + + +def _make_request(resource_type): + return RequestObject(resource_type, 
_OperationType.Read, {}) + + +@pytest.mark.cosmosEmulator +class TestMetadataFailoverGraceUnit(unittest.TestCase): + + def setUp(self): + self._orig = _retry_utility.ExecuteFunction + + def tearDown(self): + _retry_utility.ExecuteFunction = self._orig + + def _install_mock(self, second_behavior="ok"): + state = {"n": 0} + + def mock(function, *args, **kwargs): + state["n"] += 1 + if state["n"] == 1: + raise asyncio.CancelledError() + if second_behavior == "ok": + return ({"ok": True, "region": "B"}, {}) + if second_behavior == "raise": + raise asyncio.CancelledError() + raise AssertionError("unexpected") + + _retry_utility.ExecuteFunction = mock + return state + + # ---- helper-level tests ---- + + def test_is_metadata_failover_candidate(self): + self.assertTrue(_metadata_failover_grace.is_metadata_failover_candidate( + (_make_request(ResourceType.Collection),))) + self.assertFalse(_metadata_failover_grace.is_metadata_failover_candidate( + (_make_request(ResourceType.Document),))) + self.assertFalse(_metadata_failover_grace.is_metadata_failover_candidate(())) + write_req = RequestObject(ResourceType.Collection, _OperationType.Create, {}) + self.assertFalse(_metadata_failover_grace.is_metadata_failover_candidate((write_req,))) + + def test_get_grace_seconds_default_and_clamp(self): + env = _Constants.METADATA_FAILOVER_GRACE_SECONDS + import os + prev = os.environ.get(env) + try: + os.environ.pop(env, None) + self.assertEqual(_metadata_failover_grace.get_grace_seconds(), + _Constants.METADATA_FAILOVER_GRACE_SECONDS_DEFAULT) + os.environ[env] = "3.5" + self.assertEqual(_metadata_failover_grace.get_grace_seconds(), 3.5) + os.environ[env] = "-1" + self.assertEqual(_metadata_failover_grace.get_grace_seconds(), 0.0) + os.environ[env] = str(_Constants.METADATA_FAILOVER_GRACE_SECONDS_MAX * 10) + self.assertEqual(_metadata_failover_grace.get_grace_seconds(), + _Constants.METADATA_FAILOVER_GRACE_SECONDS_MAX) + os.environ[env] = "not-a-number" + 
self.assertEqual(_metadata_failover_grace.get_grace_seconds(), + _Constants.METADATA_FAILOVER_GRACE_SECONDS_DEFAULT) + finally: + if prev is None: + os.environ.pop(env, None) + else: + os.environ[env] = prev + + def test_run_grace_attempt_sync_success(self): + ok, result, exc = _metadata_failover_grace.run_grace_attempt_sync( + lambda: {"v": 1}, 5.0) + self.assertTrue(ok) + self.assertEqual(result, {"v": 1}) + self.assertIsNone(exc) + + def test_run_grace_attempt_sync_exception(self): + def boom(): + raise ValueError("nope") + ok, result, exc = _metadata_failover_grace.run_grace_attempt_sync(boom, 5.0) + self.assertFalse(ok) + self.assertIsNone(result) + self.assertIsInstance(exc, ValueError) + + def test_run_grace_attempt_sync_timeout(self): + ok, result, exc = _metadata_failover_grace.run_grace_attempt_sync( + lambda: time.sleep(1.0), 0.05) + self.assertFalse(ok) + self.assertIsNone(result) + self.assertIsNone(exc) + + # ---- end-to-end retry-loop tests ---- + + def test_metadata_read_cancel_triggers_grace_failover_success(self): + state = self._install_mock("ok") + result = _retry_utility.Execute( + _FakeClient(), _FakeGEM(["A", "B"]), lambda *a, **k: None, + _make_request(ResourceType.Collection)) + self.assertEqual(result[0], {"ok": True, "region": "B"}) + self.assertEqual(state["n"], 2) + + def test_docs_read_cancel_does_not_trigger_grace(self): + state = self._install_mock("ok") + with self.assertRaises(asyncio.CancelledError): + _retry_utility.Execute( + _FakeClient(), _FakeGEM(["A", "B"]), lambda *a, **k: None, + _make_request(ResourceType.Document)) + self.assertEqual(state["n"], 1) + + def test_policy_declines_propagates_original(self): + # endpoint discovery disabled -> timeout failover policy declines + state = self._install_mock("ok") + with self.assertRaises(asyncio.CancelledError): + _retry_utility.Execute( + _FakeClient(enable_discovery=False), _FakeGEM(["A", "B"]), + lambda *a, **k: None, _make_request(ResourceType.Collection)) + 
self.assertEqual(state["n"], 1) + + def test_grace_attempt_failure_surfaces_original(self): + state = self._install_mock("raise") + with self.assertRaises(asyncio.CancelledError): + _retry_utility.Execute( + _FakeClient(), _FakeGEM(["A", "B"]), lambda *a, **k: None, + _make_request(ResourceType.Collection)) + self.assertEqual(state["n"], 2) + + def test_grace_disabled_via_env_propagates_original(self): + import os + env = _Constants.METADATA_FAILOVER_GRACE_SECONDS + prev = os.environ.get(env) + os.environ[env] = "0" + try: + state = self._install_mock("ok") + with self.assertRaises(asyncio.CancelledError): + _retry_utility.Execute( + _FakeClient(), _FakeGEM(["A", "B"]), lambda *a, **k: None, + _make_request(ResourceType.Collection)) + self.assertEqual(state["n"], 1) + finally: + if prev is None: + os.environ.pop(env, None) + else: + os.environ[env] = prev + + +if __name__ == "__main__": + unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_metadata_failover_grace_unit_async.py b/sdk/cosmos/azure-cosmos/tests/test_metadata_failover_grace_unit_async.py new file mode 100644 index 000000000000..39b63fe9b7e9 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_metadata_failover_grace_unit_async.py @@ -0,0 +1,234 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. + +"""Async unit tests for the metadata cross-region failover grace window. + +Covers the fix for the bug where an ``asyncio.CancelledError`` from a caller +request-level timeout / cancellation that fires mid-flight during a cold +control-plane metadata (collection) read preempts the cross-region failover +decision (azure-sdk-for-python#46471 / azure-cosmos-dotnet-v3#5805). 
+""" +# cspell:ignore ppaf + +import asyncio +import os +import unittest + +import pytest + +from azure.cosmos import _metadata_failover_grace +from azure.cosmos._constants import _Constants +from azure.cosmos._request_object import RequestObject +from azure.cosmos.aio import _retry_utility_async +from azure.cosmos.documents import _OperationType +from azure.cosmos.http_constants import ResourceType + + +class _FakeRetryOptions: + MaxRetryAttemptCount = 9 + FixedRetryIntervalInMilliseconds = 0 + MaxWaitTimeInSeconds = 30 + + +class _FakeConnectionPolicy: + def __init__(self, enable_discovery=True): + self.EnableEndpointDiscovery = enable_discovery + self.RetryOptions = _FakeRetryOptions() + + +class _FakeLocationCache: + def __init__(self, regions): + self.read_regional_routing_contexts = regions + self.write_regional_routing_contexts = regions + + def _get_applicable_read_regional_routing_contexts(self, *_a, **_k): + return self.read_regional_routing_contexts + + def _get_applicable_write_regional_routing_contexts(self, *_a, **_k): + return self.write_regional_routing_contexts + + +class _FakeGEM: + def __init__(self, regions): + self.location_cache = _FakeLocationCache(regions) + + def is_per_partition_automatic_failover_applicable(self, _req): + return False + + def is_circuit_breaker_applicable(self, _req): + return False + + def try_ppaf_failover_threshold(self, *_a, **_k): + return None + + def resolve_service_endpoint_for_partition(self, *_a, **_k): + return "https://next-region.example/" + + def can_use_multiple_write_locations(self, *_a, **_k): + return False + + async def record_success(self, *_a, **_k): + return None + + +class _FakeClient: + def __init__(self, enable_discovery=True): + self.last_response_headers = {} + self.connection_policy = _FakeConnectionPolicy(enable_discovery) + self._container_properties_cache = {} + self.session = None + + def _UpdateSessionIfRequired(self, *_a, **_k): + return None + + +def _make_request(resource_type): + 
return RequestObject(resource_type, _OperationType.Read, {}) + + +async def _noop(*_a, **_k): + return ({}, {}) + + +@pytest.mark.cosmosEmulator +class TestMetadataFailoverGraceUnitAsync(unittest.IsolatedAsyncioTestCase): + + def setUp(self): + self._orig = _retry_utility_async.ExecuteFunctionAsync + + def tearDown(self): + _retry_utility_async.ExecuteFunctionAsync = self._orig + + def _install_mock(self, second_behavior="ok"): + state = {"n": 0} + + async def mock(function, *args, **kwargs): + state["n"] += 1 + if state["n"] == 1: + raise asyncio.CancelledError() + if second_behavior == "ok": + return ({"ok": True, "region": "B"}, {}) + if second_behavior == "raise": + raise asyncio.CancelledError() + raise AssertionError("unexpected") + + _retry_utility_async.ExecuteFunctionAsync = mock + return state + + async def test_metadata_read_cancel_triggers_grace_failover_success(self): + state = self._install_mock("ok") + result = await _retry_utility_async.ExecuteAsync( + _FakeClient(), _FakeGEM(["A", "B"]), _noop, + _make_request(ResourceType.Collection)) + self.assertEqual(result[0], {"ok": True, "region": "B"}) + self.assertEqual(state["n"], 2) + + async def test_docs_read_cancel_does_not_trigger_grace(self): + state = self._install_mock("ok") + with self.assertRaises(asyncio.CancelledError): + await _retry_utility_async.ExecuteAsync( + _FakeClient(), _FakeGEM(["A", "B"]), _noop, + _make_request(ResourceType.Document)) + self.assertEqual(state["n"], 1) + + async def test_policy_declines_propagates_original(self): + state = self._install_mock("ok") + with self.assertRaises(asyncio.CancelledError): + await _retry_utility_async.ExecuteAsync( + _FakeClient(enable_discovery=False), _FakeGEM(["A", "B"]), _noop, + _make_request(ResourceType.Collection)) + self.assertEqual(state["n"], 1) + + async def test_grace_attempt_failure_surfaces_original(self): + state = self._install_mock("raise") + with self.assertRaises(asyncio.CancelledError): + await 
_retry_utility_async.ExecuteAsync( + _FakeClient(), _FakeGEM(["A", "B"]), _noop, + _make_request(ResourceType.Collection)) + self.assertEqual(state["n"], 2) + + async def test_grace_disabled_via_env_propagates_original(self): + env = _Constants.METADATA_FAILOVER_GRACE_SECONDS + prev = os.environ.get(env) + os.environ[env] = "0" + try: + state = self._install_mock("ok") + with self.assertRaises(asyncio.CancelledError): + await _retry_utility_async.ExecuteAsync( + _FakeClient(), _FakeGEM(["A", "B"]), _noop, + _make_request(ResourceType.Collection)) + self.assertEqual(state["n"], 1) + finally: + if prev is None: + os.environ.pop(env, None) + else: + os.environ[env] = prev + + async def test_grace_timeout_surfaces_original_clean_cause(self): + env = _Constants.METADATA_FAILOVER_GRACE_SECONDS + prev = os.environ.get(env) + os.environ[env] = "0.02" + state = {"n": 0} + + async def mock(function, *args, **kwargs): + state["n"] += 1 + if state["n"] == 1: + raise asyncio.CancelledError() + await asyncio.sleep(1.0) + return ({"ok": True}, {}) + + _retry_utility_async.ExecuteFunctionAsync = mock + try: + with self.assertRaises(asyncio.CancelledError) as ctx: + await _retry_utility_async.ExecuteAsync( + _FakeClient(), _FakeGEM(["A", "B"]), _noop, + _make_request(ResourceType.Collection)) + # Original cancellation is surfaced cleanly (no misleading cause chain). + self.assertIsNone(ctx.exception.__cause__) + self.assertEqual(state["n"], 2) + finally: + if prev is None: + os.environ.pop(env, None) + else: + os.environ[env] = prev + + async def test_grace_timeout_failing_detached_attempt_is_observed(self): + # When the grace window expires and the detached attempt later raises, the + # detached task's exception must be observed (no "Task exception was never + # retrieved"). 
+ loop = asyncio.get_running_loop() + unhandled = [] + loop.set_exception_handler(lambda _loop, ctx: unhandled.append(ctx)) + env = _Constants.METADATA_FAILOVER_GRACE_SECONDS + prev = os.environ.get(env) + os.environ[env] = "0.02" + state = {"n": 0} + + async def mock(function, *args, **kwargs): + state["n"] += 1 + if state["n"] == 1: + raise asyncio.CancelledError() + await asyncio.sleep(0.1) + raise ValueError("region B also failed") + + _retry_utility_async.ExecuteFunctionAsync = mock + try: + with self.assertRaises(asyncio.CancelledError): + await _retry_utility_async.ExecuteAsync( + _FakeClient(), _FakeGEM(["A", "B"]), _noop, + _make_request(ResourceType.Collection)) + # Let the detached attempt run to its failure so the observer fires. + await asyncio.sleep(0.3) + never_retrieved = [c for c in unhandled + if "never retrieved" in str(c.get("message", "")).lower()] + self.assertEqual(never_retrieved, []) + finally: + loop.set_exception_handler(None) + if prev is None: + os.environ.pop(env, None) + else: + os.environ[env] = prev + + +if __name__ == "__main__": + unittest.main()