1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Fixed per-call `read_timeout`, `connection_timeout`, and `timeout` (operation deadline) being dropped on the metadata calls a query makes before its first page.

#### Other Changes

73 changes: 73 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
@@ -124,6 +124,11 @@ def build_options(kwargs: dict[str, Any]) -> dict[str, Any]:
options[Constants.Kwargs.READ_TIMEOUT] = kwargs[Constants.Kwargs.READ_TIMEOUT]
if Constants.Kwargs.TIMEOUT in kwargs:
options[Constants.Kwargs.TIMEOUT] = kwargs[Constants.Kwargs.TIMEOUT]
# Copy (not pop) so connection_timeout stays in kwargs for the page fetch
# and is also placed in options, where the container read, partition-key
# ranges, and query plan calls read it.
if Constants.Kwargs.CONNECTION_TIMEOUT in kwargs:
options[Constants.Kwargs.CONNECTION_TIMEOUT] = kwargs[Constants.Kwargs.CONNECTION_TIMEOUT]


options[Constants.OperationStartTime] = time.time()
@@ -1082,6 +1087,71 @@ def _build_properties_cache(properties: dict[str, Any], container_link: str) ->
"partitionKey": properties.get("partitionKey", None), "container_link": container_link
}

# The three per-call timeout keys a caller can set on one request. The deadline
# tuple below adds OperationStartTime to these; the carry helpers iterate that
# 4-key tuple, not this one.
_PER_CALL_TIMEOUT_OPTION_KEYS: Tuple[str, ...] = (

🟢 Suggestion — Maintainability: _PER_CALL_TIMEOUT_OPTION_KEYS is vestigial

This 3-key tuple has no production consumer. Both helpers (_carry_per_call_timeout_options, _copy_per_call_timeouts_to_kwargs) iterate _PER_CALL_DEADLINE_OPTION_KEYS (the 4-key tuple at line 1102). The 3-key tuple is referenced only in tests/test_timeout_propagation_unit.py:122 to assert its own shape.

The two named constants look like a hierarchy ("just the timeouts" vs "timeouts + anchor"), but only the 4-key one is real. A new contributor reasonably reaching for "the canonical set of per-call timeout option keys" would import this tuple and write a carry loop that silently drops OperationStartTime — defeating the operation-deadline semantics this PR is establishing.

Suggested fix: either inline the three keys directly into _PER_CALL_DEADLINE_OPTION_KEYS and drop the 3-key tuple + its test, or, if you want to keep the distinction, add a comment that this tuple is for "name-only" use and not for iteration. The current shape is one rename away from a regression that wouldn't be caught by the existing tests (which exercise the 4-key path).
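
To make the risk concrete, here is a minimal sketch, not the SDK code. `carry`, `elapsed_against_deadline`, and the key strings are hypothetical stand-ins for the helpers and constants in this diff; the only behavior taken from the PR is that a missing OperationStartTime defaults to "now" in the deadline check.

```python
from typing import Any, Mapping, Tuple

# Hypothetical stand-ins for the SDK constants; the string values are assumptions.
OPERATION_START_TIME = "operationStartTime"
TIMEOUT_KEYS: Tuple[str, ...] = ("read_timeout", "connection_timeout", "timeout")
DEADLINE_KEYS: Tuple[str, ...] = TIMEOUT_KEYS + (OPERATION_START_TIME,)


def carry(source: Mapping[str, Any], keys: Tuple[str, ...]) -> dict:
    """Copy only the listed keys that are present in source."""
    return {key: source[key] for key in keys if key in source}


def elapsed_against_deadline(options: Mapping[str, Any], now: float) -> float:
    """Mimic the check described in the diff: a missing start time defaults to now."""
    return now - options.get(OPERATION_START_TIME, now)


now = 1000.0
# The operation began 12 s ago and the caller asked for a 10 s deadline.
options = {"timeout": 10, OPERATION_START_TIME: now - 12.0}

with_anchor = carry(options, DEADLINE_KEYS)    # 4-key tuple keeps the start time
without_anchor = carry(options, TIMEOUT_KEYS)  # 3-key tuple silently drops it

print(elapsed_against_deadline(with_anchor, now))     # 12.0, past the 10 s deadline
print(elapsed_against_deadline(without_anchor, now))  # 0.0, the deadline looks fresh
```

A loop written against the 3-key tuple still copies `timeout`, so nothing fails visibly; the metadata call simply measures the deadline from its own start.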

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

Constants.Kwargs.READ_TIMEOUT,
Constants.Kwargs.CONNECTION_TIMEOUT,
Constants.Kwargs.TIMEOUT,
)

# timeout and OperationStartTime must travel together: the deadline is checked as
# now - OperationStartTime, which defaults to now when missing, so a metadata call
# without it would measure from its own start, not the operation's.
_PER_CALL_DEADLINE_OPTION_KEYS: Tuple[str, ...] = _PER_CALL_TIMEOUT_OPTION_KEYS + (
Constants.OperationStartTime,
)


def _carry_per_call_timeout_options(source: Optional[Mapping[str, Any]], destination: dict[str, Any]) -> None:
"""Copy the per-call timeouts and the operation start time from source into destination.

Copies read_timeout, connection_timeout, timeout, and OperationStartTime. Only
keys present in source are copied, so a timeout the caller did not set stays
absent and the request uses the client default instead of None. A None or empty
source is a no-op.

:param source: The request options to read the timeouts from (may be None or empty).
:type source: ~collections.abc.Mapping[str, typing.Any] or None
:param destination: The options dict to copy the timeouts into.
:type destination: dict[str, typing.Any]
:return: None
:rtype: None
"""
if not source:
return
for key in _PER_CALL_DEADLINE_OPTION_KEYS:

🟢 Suggestion — Defensive consistency: _carry_per_call_timeout_options copies explicit None while its sibling skips it

The two helpers handle None values asymmetrically:

  • _carry_per_call_timeout_options (this function, lines 1124-1126): copies any key that is present, including explicit None.
  • _copy_per_call_timeouts_to_kwargs (lines 1149-1152): skips None values via if value is not None.

This is fine today because every terminal lift into request kwargs goes through the None-skipping helper before reaching _synchronized_request._Request, where kwargs.pop("read_timeout", connection_policy.ReadTimeout) would otherwise return None and disable the socket read timeout (the exact bug Copilot flagged earlier in this PR for the query-plan dispatcher).

The latent risk is that a future code path might use the result of _carry_per_call_timeout_options to build options that are handed directly to the request layer (or used as the source for another carry that doesn't go through _copy_per_call_timeouts_to_kwargs). The intermediate dict would carry None through and re-introduce the same bug — silently, because the helpers' names don't hint at the asymmetry.

Suggested fix: make _carry_per_call_timeout_options also skip None (mirror the if value is not None guard). One-line change; eliminates the asymmetry; no behavior change today.
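
A small sketch of the asymmetry, under stated assumptions: `carry_present` and `carry_set` are hypothetical simplifications of the two helpers, `CLIENT_DEFAULT_READ_TIMEOUT` is an invented value, and `effective_read_timeout` only imitates the `kwargs.pop("read_timeout", default)` shape quoted above.

```python
from typing import Any, Mapping, Optional

KEYS = ("read_timeout", "connection_timeout", "timeout")
CLIENT_DEFAULT_READ_TIMEOUT = 65  # hypothetical client default, for illustration only


def carry_present(source: Optional[Mapping[str, Any]], destination: dict) -> None:
    """Current behavior of the carry helper: copy every key that is present."""
    if not source:
        return
    for key in KEYS:
        if key in source:
            destination[key] = source[key]


def carry_set(source: Optional[Mapping[str, Any]], destination: dict) -> None:
    """Suggested behavior: copy a key only when its value is not None."""
    if not source:
        return
    for key in KEYS:
        value = source.get(key)
        if value is not None:
            destination[key] = value


def effective_read_timeout(request_kwargs: Mapping[str, Any]) -> Any:
    """Imitate the request layer: pop with the client default as fallback."""
    return dict(request_kwargs).pop("read_timeout", CLIENT_DEFAULT_READ_TIMEOUT)


options = {"read_timeout": None, "timeout": 10}
present: dict = {}
only_set: dict = {}
carry_present(options, present)
carry_set(options, only_set)

print(effective_read_timeout(present))   # None: the default is bypassed
print(effective_read_timeout(only_set))  # 65: the default applies
```

The fallback in `pop` only fires when the key is absent, which is why an explicit None carried through an intermediate dict would bypass the client default.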

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

if key in source:
destination[key] = source[key]


def _copy_per_call_timeouts_to_kwargs(
options: Optional[Mapping[str, Any]],
kwargs: dict[str, Any]
) -> None:
"""Copy the per-call timeouts and the operation start time from options into kwargs.

Copies read_timeout, connection_timeout, timeout, and OperationStartTime from
the request options into the kwargs the request layer reads. A value is copied
only when it is set (not None), so an unset timeout falls back to the client
default instead of None; setdefault keeps any value already in kwargs.

:param options: The request options to read the timeouts from (may be None or empty).
:type options: ~collections.abc.Mapping[str, typing.Any] or None
:param kwargs: The kwargs dict to copy the timeouts into; mutated in place.
:type kwargs: dict[str, typing.Any]
:return: None
:rtype: None
"""
if not options:
return
for key in _PER_CALL_DEADLINE_OPTION_KEYS:
value = options.get(key)
if value is not None:
kwargs.setdefault(key, value)


def format_pk_range_options(query_options: Mapping[str, Any]) -> dict[str, Any]:
"""Formats the partition key range options to be used internally from the query ones.
:param dict query_options: The query options being used.
@@ -1094,4 +1164,7 @@ def format_pk_range_options(query_options: Mapping[str, Any]) -> dict[str, Any]:
pk_range_options[Constants.ContainerRID] = query_options[Constants.ContainerRID]
if "excludedLocations" in query_options:
pk_range_options["excludedLocations"] = query_options["excludedLocations"]
# Keep the per-call timeouts so the partition-key ranges fetch uses them
# instead of the client default.
_carry_per_call_timeout_options(query_options, pk_range_options)
return pk_range_options
10 changes: 6 additions & 4 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py
@@ -116,15 +116,17 @@ class _Constants:
class Kwargs:
"""Keyword arguments used in the azure-cosmos package"""

# Whether to retry write operations if they fail. Used either at client level or request level.

💬 Observation — Scope: unrelated style change bundled with the bugfix

This hunk does two things: it adds the new CONNECTION_TIMEOUT (which the PR needs), and it converts the pre-existing RETRY_WRITE and AVAILABILITY_STRATEGY from PEP 224 attribute docstrings (triple-quoted strings after the assignment) into # comments before the assignment. The diff confirms it:

-        RETRY_WRITE: Literal["retry_write"] = "retry_write"
-        """Whether to retry write operations if they fail. Used either at client level or request level."""
+        # Whether to retry write operations if they fail. Used either at client level or request level.
+        RETRY_WRITE: Literal["retry_write"] = "retry_write"

The style conversion is unrelated to timeout propagation and has a small cost: plain # comments are not picked up by Sphinx autodoc's autoattribute directive or by IDE tooling that reads PEP 224-style attribute docstrings, whereas the previous form was. The class is private (_Constants.Kwargs), so external docs are unlikely to suffer. Still, the change makes this PR slightly harder to review (mixed-purpose hunks) and quietly downgrades IDE/Sphinx behavior for these constants.

Suggested: either revert the conversion for the two existing keys and add only CONNECTION_TIMEOUT in the existing PEP 224 style, or pull the style conversion into a separate PR. Either keeps this PR focused on the bug it advertises.
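
Why the two forms differ for tooling can be illustrated with the standard `ast` module. This is a sketch of the general mechanism, not of how Sphinx or any IDE is implemented; `attribute_docs` is a hypothetical helper, and the class bodies are trimmed copies of the constant under discussion.

```python
import ast

DOCSTRING_STYLE = '''
class Kwargs:
    RETRY_WRITE: str = "retry_write"
    """Whether to retry write operations if they fail."""
'''

COMMENT_STYLE = '''
class Kwargs:
    # Whether to retry write operations if they fail.
    RETRY_WRITE: str = "retry_write"
'''


def attribute_docs(source: str) -> dict:
    """Map each attribute name to the string literal that directly follows it."""
    class_body = ast.parse(source).body[0].body
    docs = {}
    # Pair every statement with the one after it.
    for stmt, following in zip(class_body, class_body[1:]):
        is_assign = isinstance(stmt, (ast.Assign, ast.AnnAssign))
        is_string = (
            isinstance(following, ast.Expr)
            and isinstance(following.value, ast.Constant)
            and isinstance(following.value.value, str)
        )
        if is_assign and is_string:
            target = stmt.targets[0] if isinstance(stmt, ast.Assign) else stmt.target
            docs[target.id] = following.value.value
    return docs


print(attribute_docs(DOCSTRING_STYLE))
print(attribute_docs(COMMENT_STYLE))
```

The string literal survives parsing as an expression statement, so a tool walking the syntax tree can attach it to the preceding assignment. The comment is discarded by the parser, so the second call finds nothing.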

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

RETRY_WRITE: Literal["retry_write"] = "retry_write"
"""Whether to retry write operations if they fail. Used either at client level or request level."""
EXCLUDED_LOCATIONS: Literal["excludedLocations"] = "excludedLocations"
# Availability strategy config. Used either at client level or request level.
AVAILABILITY_STRATEGY: Literal["availabilityStrategy"] = "availabilityStrategy"
"""Availability strategy config. Used either at client level or request level"""
# Socket read timeout in seconds. Used either at client level or request level.
READ_TIMEOUT: Literal["read_timeout"] = "read_timeout"
"""Socket read timeout in seconds. Used either at client level or request level."""
# Absolute timeout in seconds for the combined HTTP request and response processing.
TIMEOUT: Literal["timeout"] = "timeout"
"""Absolute timeout in seconds for the combined HTTP request and response processing."""
# Socket connect (handshake) timeout in seconds. Used either at client level or request level.
CONNECTION_TIMEOUT: Literal["connection_timeout"] = "connection_timeout"

class UserAgentFeatureFlags(IntEnum):
"""
@@ -3244,18 +3244,11 @@ def __QueryFeed( # pylint: disable=too-many-locals, too-many-statements, too-ma
"""
if options is None:
options = {}
read_timeout = options.get("read_timeout")
if read_timeout is not None:
# we currently have a gap where kwargs are not getting passed correctly down the pipeline. In order to make
# absolute time out work, we are passing read_timeout via kwargs as a temporary fix
kwargs.setdefault("read_timeout", read_timeout)

operation_start_time = options.get(Constants.OperationStartTime)
if operation_start_time is not None:
kwargs.setdefault(Constants.OperationStartTime, operation_start_time)
timeout = options.get("timeout")
if timeout is not None:
kwargs.setdefault("timeout", timeout)
# Copy the per-call timeouts and the operation start time out of options into
# kwargs, where _Request reads them. A value is copied only when set, so
# an unset timeout falls back to the client/policy default instead of
# None; setdefault keeps any explicit kwarg the caller already placed.
base._copy_per_call_timeouts_to_kwargs(options, kwargs)

🟡 Recommendation — Coverage: change-feed /pkranges setup still drops per-call timeouts

The PR description promises "a per-call timeout bounds the whole query — metadata calls included", and the new helper here lifts the per-call timeouts out of options into kwargs so the page-fetch path picks them up. Good — for regular queries.

But a few lines below, the change-feed branch builds a fresh feed_options dict and copies only excludedLocations into it:

change_feed_state: Optional[ChangeFeedState] = options.get("changeFeedState")
if change_feed_state is not None:
    feed_options = {}
    if 'excludedLocations' in options:
        feed_options['excludedLocations'] = options['excludedLocations']
    change_feed_state.populate_request_headers(self._routing_map_provider, headers, feed_options)

populate_request_headers then drives routing_provider.get_overlapping_ranges(..., feed_options) to do the /pkranges lookup (_change_feed/change_feed_state.py:289-293). Because feed_options doesn't include read_timeout / connection_timeout / timeout / OperationStartTime, this metadata /pkranges fetch falls back to the client/policy defaults — the exact symptom the PR was written to remove.

A customer calling container.query_items_change_feed(feed_range=fr, read_timeout=30, connection_timeout=2, timeout=10) on a cold client (or after a routing-cache invalidation from a 410 / partition split) will still see the misleading error message the PR's commit log calls out — just on the change-feed surface instead of the query surface.

The async sibling at aio/_cosmos_client_connection_async.py:3110-3114 has the same shape and the same gap.

Suggested fix: add base._carry_per_call_timeout_options(options, feed_options) after the excludedLocations copy in both files. Mechanically identical to what the PR already does for the hybrid all-ranges path in _execution_context/hybrid_search_aggregator.py. A parallel test modeled on test_metadata_timeout_propagation.py would lock it in.
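
A sketch of the suggested shape, assuming the carry helper behaves as shown in this diff. `build_change_feed_options` is a hypothetical function that isolates the dict-building step, and the key strings are assumptions standing in for the SDK constants.

```python
from typing import Any, Mapping, Optional

# Hypothetical stand-in for _PER_CALL_DEADLINE_OPTION_KEYS; key strings are assumptions.
DEADLINE_KEYS = ("read_timeout", "connection_timeout", "timeout", "operationStartTime")


def carry_per_call_timeout_options(
    source: Optional[Mapping[str, Any]], destination: dict
) -> None:
    """Copy the per-call timeouts and start time that are present in source."""
    if not source:
        return
    for key in DEADLINE_KEYS:
        if key in source:
            destination[key] = source[key]


def build_change_feed_options(options: Mapping[str, Any]) -> dict:
    """Build the options handed to the /pkranges lookup on the change-feed path."""
    feed_options: dict = {}
    if "excludedLocations" in options:
        feed_options["excludedLocations"] = options["excludedLocations"]
    # Suggested addition: keep the caller's timeouts for the metadata fetch.
    carry_per_call_timeout_options(options, feed_options)
    return feed_options


options = {
    "excludedLocations": ["West US"],
    "read_timeout": 30,
    "connection_timeout": 2,
    "timeout": 10,
    "operationStartTime": 1000.0,
    "maxItemCount": 5,  # unrelated option, must not leak into feed_options
}
print(build_change_feed_options(options))
```

Only the four deadline keys and `excludedLocations` reach the lookup; unrelated query options stay behind, matching what the hybrid all-ranges path already does.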

If this is intentionally out of scope, please file a tracking issue and call it out in the CHANGELOG so customers know the change-feed surface isn't covered yet.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.


# Execution context injects this via request options; keep kwargs fallback
# for compatibility with call paths that still thread internal values there.
@@ -35,6 +35,7 @@
from azure.cosmos.exceptions import CosmosHttpResponseError
from azure.cosmos.http_constants import StatusCodes
from ..._constants import _Constants as Constants
from ... import _base

# pylint: disable=protected-access

@@ -67,11 +68,16 @@ def __init__(self, client, resource_link, query, options, fetch_function,
async def _create_execution_context_with_query_plan(self):
self._fetched_query_plan = True
query_to_use = self._query if self._query is not None else "Select * from root r"
# Forward the per-call timeouts and OperationStartTime only when the caller
# set them, so an unset value falls back to the client/policy default
# instead of overriding it with None.
query_plan_kwargs = {}
_base._copy_per_call_timeouts_to_kwargs(self._options, query_plan_kwargs)
query_plan = await self._client._GetQueryPlanThroughGateway(
query_to_use,
self._resource_link,
self._options.get('excludedLocations'),
read_timeout=self._options.get('read_timeout')
**query_plan_kwargs
)
query_execution_info = _PartitionedQueryExecutionInfo(query_plan)
qe_info = getattr(query_execution_info, "_query_execution_info", None)
@@ -10,6 +10,7 @@
_FULL_TEXT_SCORE_SCOPE_KEY, _FULL_TEXT_SCORE_SCOPE_LOCAL, _FULL_TEXT_SCORE_SCOPE_DEFAULT
from azure.cosmos._routing import routing_range
from azure.cosmos import exceptions
from azure.cosmos import _base
from ..._constants import _Constants as Constants

# pylint: disable=protected-access
@@ -297,6 +298,9 @@ async def _get_target_partition_key_range(self, target_all_ranges):
feed_options = {}
if Constants.ContainerRID in self._options:
feed_options[Constants.ContainerRID] = self._options[Constants.ContainerRID]
# This path calls _ReadPartitionKeyRanges directly and skips
# format_pk_range_options, so copy the per-call timeouts here too.
_base._carry_per_call_timeout_options(self._options, feed_options)
return [item async for item in self._client._ReadPartitionKeyRanges(
collection_link=self._resource_link, feed_options=feed_options)]
query_ranges = self._partitioned_query_ex_info.get_query_ranges()
@@ -34,6 +34,7 @@
from azure.cosmos.documents import _DistinctType
from azure.cosmos.http_constants import StatusCodes, SubStatusCodes
from .._constants import _Constants as Constants
from .. import _base

# pylint: disable=protected-access

@@ -97,11 +98,16 @@ def __init__(self, client, resource_link, query, options, fetch_function, respon
def _create_execution_context_with_query_plan(self):
self._fetched_query_plan = True
query_to_use = self._query if self._query is not None else "Select * from root r"
# Forward the per-call timeouts and OperationStartTime only when the caller
# set them, so an unset value falls back to the client/policy default
# instead of overriding it with None.
query_plan_kwargs = {}
_base._copy_per_call_timeouts_to_kwargs(self._options, query_plan_kwargs)
query_plan = self._client._GetQueryPlanThroughGateway(
query_to_use,
self._resource_link,
self._options.get('excludedLocations'),
read_timeout=self._options.get('read_timeout')
**query_plan_kwargs
)
query_execution_info = _PartitionedQueryExecutionInfo(query_plan)
qe_info = getattr(query_execution_info, "_query_execution_info", None)
@@ -8,6 +8,7 @@
from azure.cosmos._execution_context import document_producer
from azure.cosmos._routing import routing_range
from azure.cosmos import exceptions
from azure.cosmos import _base
from .._constants import _Constants as Constants

# pylint: disable=protected-access
@@ -454,6 +455,9 @@ def _get_target_partition_key_range(self, target_all_ranges):
feed_options = {}
if Constants.ContainerRID in self._options:
feed_options[Constants.ContainerRID] = self._options[Constants.ContainerRID]
# This path calls _ReadPartitionKeyRanges directly and skips
# format_pk_range_options, so copy the per-call timeouts here too.
_base._carry_per_call_timeout_options(self._options, feed_options)
return list(self._client._ReadPartitionKeyRanges(
collection_link=self._resource_link, feed_options=feed_options))
query_ranges = self._partitioned_query_ex_info.get_query_ranges()
13 changes: 5 additions & 8 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
@@ -39,7 +39,8 @@
from .. import _utils as utils
from .._availability_strategy_config import _validate_request_hedging_strategy
from .._base import (_build_properties_cache, _deserialize_throughput, _replace_throughput,
build_options as _build_options, GenerateGuidId, validate_cache_staleness_value)
build_options as _build_options, _copy_per_call_timeouts_to_kwargs,
GenerateGuidId, validate_cache_staleness_value)
from .._change_feed.feed_range_internal import FeedRangeInternalEpk

from .._cosmos_responses import CosmosDict, CosmosList, CosmosAsyncItemPaged
@@ -102,13 +103,9 @@ async def _get_properties_with_options(self, options: Optional[dict[str, Any]] =
if options:
if "excludedLocations" in options:
kwargs['excluded_locations'] = options['excludedLocations']
if Constants.OperationStartTime in options:
kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime]
if Constants.Kwargs.TIMEOUT in options:
kwargs[Constants.Kwargs.TIMEOUT] = options[Constants.Kwargs.TIMEOUT]
if Constants.Kwargs.READ_TIMEOUT in options:
kwargs[Constants.Kwargs.READ_TIMEOUT] = options[Constants.Kwargs.READ_TIMEOUT]

# Forward the per-call timeouts and the operation start time so the
# container read honors them instead of the client/policy default.
_copy_per_call_timeouts_to_kwargs(options, kwargs)
return await self._get_properties(**kwargs)

async def _get_properties(self, **kwargs: Any) -> dict[str, Any]:
@@ -3038,20 +3038,11 @@ async def __QueryFeed( # pylint: disable=too-many-branches,too-many-statements,
if options is None:
options = {}

read_timeout = options.get("read_timeout")
if read_timeout is not None:
# we currently have a gap where kwargs are not getting passed correctly down the pipeline. In order to make
# absolute time out work, we are passing read_timeout via kwargs as a temporary fix
kwargs.setdefault("read_timeout", read_timeout)

operation_start_time = options.get(Constants.OperationStartTime)
if operation_start_time is not None:
# we need to set operation_state in kwargs as thats where it is looked at while sending the request
kwargs.setdefault(Constants.OperationStartTime, operation_start_time)
timeout = options.get("timeout")
if timeout is not None:
# we need to set operation_state in kwargs as that's where it is looked at while sending the request
kwargs.setdefault("timeout", timeout)
# Copy the per-call timeouts and the operation start time out of options into
# kwargs, where _Request reads them. A value is copied only when set, so
# an unset timeout falls back to the client/policy default instead of
# None; setdefault keeps any explicit kwarg the caller already placed.
base._copy_per_call_timeouts_to_kwargs(options, kwargs)

# The capture dict can arrive via two upstream paths:
# 1. The query execution context puts it into ``options`` (the
@@ -140,11 +140,13 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg
await _record_success_if_request_not_cancelled(args[0], global_endpoint_manager, pk_range_wrapper)
else:
result = await ExecuteFunctionAsync(function, *args, **kwargs)
# Check timeout after successful execution
if timeout:
elapsed = time.time() - operation_start_time
if elapsed >= timeout:
raise exceptions.CosmosClientTimeoutError(error=last_error)
# Check the deadline after a successful call. Outside the if/else so it
# also covers the normal request path (if args), matching the sync loop:
# a call that succeeds after the deadline passed must still raise.
if timeout:
elapsed = time.time() - operation_start_time
if elapsed >= timeout:
raise exceptions.CosmosClientTimeoutError(error=last_error)
if not client.last_response_headers:
client.last_response_headers = {}

11 changes: 4 additions & 7 deletions sdk/cosmos/azure-cosmos/azure/cosmos/container.py
@@ -35,7 +35,7 @@
from . import _utils as utils
from ._availability_strategy_config import _validate_request_hedging_strategy
from ._base import (_build_properties_cache, _deserialize_throughput, _replace_throughput, build_options,
GenerateGuidId, validate_cache_staleness_value)
_copy_per_call_timeouts_to_kwargs, GenerateGuidId, validate_cache_staleness_value)
from ._change_feed.feed_range_internal import FeedRangeInternalEpk
from ._constants import _Constants as Constants, TimeoutScope
from ._cosmos_client_connection import CosmosClientConnection
@@ -103,12 +103,9 @@ def _get_properties_with_options(self, options: Optional[dict[str, Any]] = None)
if options:
if "excludedLocations" in options:
kwargs['excluded_locations'] = options['excludedLocations']
if Constants.OperationStartTime in options:
kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime]
if Constants.Kwargs.TIMEOUT in options:
kwargs[Constants.Kwargs.TIMEOUT] = options[Constants.Kwargs.TIMEOUT]
if Constants.Kwargs.READ_TIMEOUT in options:
kwargs[Constants.Kwargs.READ_TIMEOUT] = options[Constants.Kwargs.READ_TIMEOUT]
# Forward the per-call timeouts and the operation start time so the
# container read honors them instead of the client/policy default.
_copy_per_call_timeouts_to_kwargs(options, kwargs)
return self._get_properties(**kwargs)

def _get_properties(self, **kwargs: Any) -> dict[str, Any]: