Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed a bug where a caller request-level timeout or cancellation that fired mid-flight during a cold control-plane metadata (container/collection) read could preempt the cross-region failover retry before the retry policy was consulted, surfacing a cancellation instead of failing over to the next preferred region. The SDK now grants one bounded, cancellation-shielded cross-region attempt for such metadata reads (configurable via `AZURE_COSMOS_METADATA_FAILOVER_GRACE_SECONDS`, default 10s; set to `0` to disable). See [issue 46471](https://github.com/Azure/azure-sdk-for-python/issues/46471).

#### Other Changes

Expand Down
11 changes: 11 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,17 @@ class _Constants:
TIMEOUT_ERROR_THRESHOLD_PPAF_DEFAULT: int = 10
# -------------------------------------------------------------------------

# Bounded grace window (in seconds) granted to a cold control-plane metadata
# (collection) read so that a cross-region failover attempt can run even when
# the caller's request-level timeout/cancellation fires mid-flight. See
# azure-sdk-for-python#46471 / azure-cosmos-dotnet-v3#5805. A value <= 0
# disables the grace window and restores the prior (preempted) behavior. The
# value is clamped to MetadataFailoverGraceSecondsMax.
METADATA_FAILOVER_GRACE_SECONDS = "AZURE_COSMOS_METADATA_FAILOVER_GRACE_SECONDS"
METADATA_FAILOVER_GRACE_SECONDS_DEFAULT: float = 10.0
METADATA_FAILOVER_GRACE_SECONDS_MAX: float = 86400.0
# -------------------------------------------------------------------------

# Controls how the SDK handles invalid UTF-8 bytes in HTTP response bodies.
# Accepted values: "REPLACE", "IGNORE". Anything else (including unset)
# leaves strict decoding in effect, which is the historical default.
Expand Down
135 changes: 135 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_metadata_failover_grace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# The MIT License (MIT)
# Copyright (c) Microsoft Corporation. All rights reserved.

"""Caller-cancellation-shielded grace execution for cold control-plane metadata reads.

When a cold collection-metadata read (container cache warm-up) is routed at an
unhealthy preferred region, the SDK escalates through internal HTTP timeouts. If the
caller's request-level timeout / cancellation fires during that escalation, the retry
loop exits before the cross-region failover policy can route the next attempt to a
healthy region, so the customer surfaces a cancellation instead of a successful
failover.

This module provides the primitives used by the sync/async retry loops to grant ONE
bounded, cancellation-shielded cross-region attempt for such metadata reads, mirroring
the .NET ``MetadataDetachedExecutor`` (Azure/azure-cosmos-dotnet-v3#5844) and the
direction in Azure/azure-sdk-for-python#46471. The attempt runs detached from the
caller's cancellation; on success the caller receives the failover result, otherwise
the original cancellation is surfaced.
"""
import logging
import os
import threading
from typing import Any, Callable, Optional, Sequence, Tuple

from ._constants import _Constants
from .documents import _OperationType
from ._request_object import RequestObject
from .http_constants import ResourceType

logger = logging.getLogger("azure.cosmos._metadata_failover_grace")


def get_grace_seconds() -> float:
"""Resolve the bounded grace window for a metadata cross-region failover attempt.

Reads ``AZURE_COSMOS_METADATA_FAILOVER_GRACE_SECONDS`` (default
:attr:`_Constants.METADATA_FAILOVER_GRACE_SECONDS_DEFAULT`). A value ``<= 0``
disables the grace window (restores the prior preempted behavior). Values are
clamped to ``[0, METADATA_FAILOVER_GRACE_SECONDS_MAX]``. Malformed values fall
back to the default.

:returns: the grace window in seconds (``0`` means disabled).
:rtype: float
"""
raw = os.environ.get(_Constants.METADATA_FAILOVER_GRACE_SECONDS)
if raw is None:
value = _Constants.METADATA_FAILOVER_GRACE_SECONDS_DEFAULT
else:
try:
value = float(raw)
except (TypeError, ValueError):
value = _Constants.METADATA_FAILOVER_GRACE_SECONDS_DEFAULT
value = max(value, 0.0)
value = min(value, _Constants.METADATA_FAILOVER_GRACE_SECONDS_MAX)
return value


def is_metadata_failover_candidate(args: Sequence[Any]) -> bool:
"""Return True if the current request is a cold collection-metadata read.

The grace attempt is intentionally scoped to read-only collection
(``ResourceType.Collection``) reads, matching .NET which wires the detached
executor only into the collection-cache path. Data-plane operations keep their
existing cancellation semantics.

:param args: the positional args passed to the retry loop; ``args[0]`` is the
:class:`~azure.cosmos._request_object.RequestObject` when present.
:type args: Sequence[Any]
:returns: whether a metadata cross-region grace attempt is applicable.
:rtype: bool
"""
if not args:
return False
request = args[0]
if not isinstance(request, RequestObject):
return False
resource_type = getattr(request, "resource_type", None)
operation_type = getattr(request, "operation_type", None)
if resource_type != ResourceType.Collection:
return False
if operation_type is None:
return False
return _OperationType.IsReadOnlyOperation(operation_type)


def run_grace_attempt_sync(
attempt: Callable[[], Any],
grace_seconds: float,
) -> Tuple[bool, Optional[Any], Optional[BaseException]]:
"""Run a single cross-region metadata attempt detached from caller cancellation.

The attempt executes on a daemon thread bounded by ``grace_seconds`` so that the
caller's cancellation cannot preempt the cross-region failover decision. The
attempt performs exactly one request against the next preferred region (the
request was routed synchronously by the caller before this is invoked); it does
not re-enter the retry loop. If the grace window expires the thread is left
running in the background (its single in-flight request completes and its
retry-policy side-effects still benefit subsequent callers) and the caller
surfaces the original cancellation.

.. note::
On grace expiry the abandoned daemon thread may still be reading the shared
:class:`~azure.cosmos._request_object.RequestObject` while it completes its
single in-flight send. Callers must not mutate or reuse that request object
after the cancellation is surfaced; in practice the request belongs to the
failed operation and is discarded once it raises.

:param attempt: zero-arg callable performing exactly one cross-region attempt.
:type attempt: Callable
:param float grace_seconds: maximum time to wait for the attempt to complete.
:returns: ``(succeeded, result, exception)``. ``succeeded`` is True only when the
attempt completed within the grace window without raising.
:rtype: tuple[bool, object, BaseException]
"""
box: dict = {}

def _runner() -> None:
try:
box["result"] = attempt()
box["ok"] = True
except BaseException as exc: # pylint: disable=broad-except
box["exception"] = exc
box["ok"] = False

thread = threading.Thread(
target=_runner, name="cosmos-metadata-failover-grace", daemon=True)
thread.start()
thread.join(grace_seconds)

if thread.is_alive():
# Grace window expired; leave the detached attempt running in the background.
return False, None, None
if box.get("ok"):
return True, box.get("result"), None
return False, None, box.get("exception")
46 changes: 46 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from . import _session_retry_policy
from . import _timeout_failover_retry_policy
from . import exceptions
from . import _metadata_failover_grace
from ._constants import _Constants
from ._cosmos_http_logging_policy import _log_diagnostics_error
from ._global_partition_endpoint_manager_per_partition_automatic_failover import \
Expand Down Expand Up @@ -298,6 +299,51 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs): # pylin
_record_failure_if_request_not_cancelled(args[0], global_endpoint_manager, pk_range_wrapper)
_handle_service_response_retries(request, client, service_response_retry_policy, e, *args)

except BaseException as e: # pylint: disable=broad-except
# A caller request-level timeout / cancellation can fire mid-flight during a
# cold control-plane metadata (collection) read and preempt the retry loop
# before the cross-region failover policy is consulted, surfacing the
# cancellation instead of a successful failover to the next preferred region
# (see azure-sdk-for-python#46471 / azure-cosmos-dotnet-v3#5805). For metadata
# reads only, give the failover policy one bounded, cancellation-shielded
# attempt against the next region. Process-control signals and the SDK's own
# timeout error are never intercepted.
if isinstance(e, (KeyboardInterrupt, SystemExit, GeneratorExit,
exceptions.CosmosClientTimeoutError)):
raise
if not _metadata_failover_grace.is_metadata_failover_candidate(args):
raise
Comment on lines +302 to +315
grace_seconds = _metadata_failover_grace.get_grace_seconds()
if grace_seconds <= 0:
raise
if not timeout_failover_retry_policy.ShouldRetry(e):
raise
last_error = e

def _grace_attempt():
return ExecuteFunction(function, global_endpoint_manager, *args, **kwargs)

succeeded, grace_result, _grace_exc = _metadata_failover_grace.run_grace_attempt_sync(
_grace_attempt, grace_seconds)
if not succeeded:
# Surface the original caller cancellation; the detached attempt (if any)
# keeps running in the background to warm caches for subsequent callers.
if _grace_exc is not None:
logger.debug(
"Metadata cross-region failover grace attempt failed before the "
"caller cancellation was surfaced: %r", _grace_exc)
raise

if not client.last_response_headers:
client.last_response_headers = {}
client.last_response_headers[
HttpHeaders.ThrottleRetryCount
] = resourceThrottle_retry_policy.current_retry_attempt_count
client.last_response_headers[
HttpHeaders.ThrottleRetryWaitTimeInMs
] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds
return grace_result

def _record_success_if_request_not_cancelled(
request_params: RequestObject,
global_endpoint_manager: _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from .. import _session_retry_policy
from .. import _timeout_failover_retry_policy
from .. import exceptions
from .. import _metadata_failover_grace
from .._constants import _Constants
from .._container_recreate_retry_policy import ContainerRecreateRetryPolicy
from .._request_object import RequestObject
Expand All @@ -56,6 +57,24 @@
# pylint: disable=protected-access, disable=too-many-lines, disable=too-many-statements, disable=too-many-branches
# cspell:ignore ppaf, ppcb


def _observe_detached_grace_task(task: "asyncio.Task") -> None:
"""Retrieve a detached grace task's result/exception so it is marked observed.

When the metadata cross-region failover grace window expires, the shielded
attempt keeps running detached. Retrieving its eventual exception here prevents
asyncio from logging "Task exception was never retrieved" for that background task.

:param asyncio.Task task: the detached grace task.
"""
if task.cancelled():
return
try:
task.exception()
except asyncio.CancelledError:
pass


# args [0] is the request object
# args [1] is the connection policy
# args [2] is the pipeline client
Expand Down Expand Up @@ -314,6 +333,58 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg
pk_range_wrapper)
_handle_service_response_retries(request, client, service_response_retry_policy, e, *args)

except asyncio.CancelledError as e:
# A caller request-level timeout / cancellation can fire mid-flight during a
# cold control-plane metadata (collection) read and preempt the retry loop
# before the cross-region failover policy is consulted, surfacing the
# cancellation instead of a successful failover to the next preferred region
# (see azure-sdk-for-python#46471 / azure-cosmos-dotnet-v3#5805). For metadata
# reads only, give the failover policy one bounded, cancellation-shielded
# attempt against the next region. The attempt runs on a shielded task so the
# caller's cancellation cannot preempt the failover decision.
if not _metadata_failover_grace.is_metadata_failover_candidate(args):
raise
grace_seconds = _metadata_failover_grace.get_grace_seconds()
if grace_seconds <= 0:
raise
if not timeout_failover_retry_policy.ShouldRetry(e):
raise
last_error = e
# Run the cross-region attempt as an explicit task so that, if the grace
# window expires, the still-running detached task can be observed (its
# eventual exception retrieved) instead of raising an "exception was never
# retrieved" warning. The shield keeps the caller's cancellation from
# tearing down the detached task.
grace_task = asyncio.ensure_future(
ExecuteFunctionAsync(function, global_endpoint_manager, *args, **kwargs))
try:
grace_result = await asyncio.wait_for(
asyncio.shield(grace_task), timeout=grace_seconds)
except BaseException as grace_exc: # pylint: disable=broad-except
# Grace window expired or the cross-region attempt failed. Make sure the
# detached attempt's eventual result/exception is observed, then surface
# the original caller cancellation. The detached attempt (if still
# running) continues in the background to warm caches for subsequent
# callers.
if grace_task.done():
_observe_detached_grace_task(grace_task)
else:
grace_task.add_done_callback(_observe_detached_grace_task)
logger.debug(
"Metadata cross-region failover grace attempt did not complete "
"before the caller cancellation was surfaced: %r", grace_exc)
raise e from None
Comment on lines +360 to +376

if not client.last_response_headers:
client.last_response_headers = {}
client.last_response_headers[
HttpHeaders.ThrottleRetryCount
] = resourceThrottle_retry_policy.current_retry_attempt_count
client.last_response_headers[
HttpHeaders.ThrottleRetryWaitTimeInMs
] = resourceThrottle_retry_policy.cumulative_wait_time_in_milliseconds
return grace_result

async def _record_success_if_request_not_cancelled(
request_params: RequestObject,
global_endpoint_manager: _GlobalPartitionEndpointManagerForPerPartitionAutomaticFailoverAsync,
Expand Down
Loading
Loading