diff --git a/.gitignore b/.gitignore index ec5476fa210..208fd5eeaaf 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,4 @@ gocd/templates/vendor/ gocd/generated-pipelines/ Brewfile.lock.json .zed/ +dump.rdb diff --git a/pyproject.toml b/pyproject.toml index e330d09d3c7..a9e9615446e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,6 +4,7 @@ version = "26.7.0.dev0" dependencies = [ "blinker>=1.9", "click>=8.1.7", + "clickhouse-connect>=0.8.0", "clickhouse-driver>=0.2.10", "confluent-kafka>=2.7.0", "datadog>=0.49.1", @@ -125,6 +126,8 @@ exclude = ["^rust_snuba/", "^tests/datasets/", "^tests/query/"] [[tool.mypy.overrides]] module = [ "_strptime", + "clickhouse_connect", + "clickhouse_connect.*", "clickhouse_driver", "clickhouse_driver.errors", "confluent_kafka", diff --git a/snuba/admin/clickhouse/common.py b/snuba/admin/clickhouse/common.py index 25c71940b75..d74ee6ea715 100644 --- a/snuba/admin/clickhouse/common.py +++ b/snuba/admin/clickhouse/common.py @@ -7,7 +7,13 @@ from snuba import settings from snuba.clickhouse.native import ClickhousePool -from snuba.clusters.cluster import ClickhouseClientSettings, ClickhouseCluster +from snuba.clusters.cluster import ( + ClickhouseClientSettings, + ClickhouseCluster, + ClickhouseNode, + connection_cache, + use_clickhouse_connect_driver, +) from snuba.datasets.storage import ReadableTableStorage from snuba.datasets.storages.factory import get_storage from snuba.datasets.storages.storage_key import StorageKey @@ -43,7 +49,7 @@ def is_valid_node(host: str, port: int, cluster: ClickhouseCluster, storage_name }, ) - return any(node.host_name == host and node.port == port for node in nodes) + return any(node.host_name == host and node.native_port == port for node in nodes) def _get_storage(storage_name: str) -> ReadableTableStorage: @@ -71,7 +77,7 @@ def _validate_node( "host": clickhouse_host, "port": clickhouse_port, "query_host": cluster.get_query_node().host_name, - "query_port": cluster.get_query_node().port, + 
"query_port": cluster.get_query_node().native_port, }, ) @@ -89,24 +95,38 @@ def _build_validated_pool( password: str, client_settings: ClickhouseClientSettings, ) -> ClickhousePool: - # Single chokepoint for admin ClickhousePool construction. ClickhousePool - # ships the user/password in the first hello packet of the native protocol, - # so an unvalidated host means credentials reach whatever listener answers. - # All admin helpers must go through here — never call ClickhousePool - # directly from this module. The regression test - # test_no_direct_clickhouse_pool_construction_in_admin enforces this. + # Single chokepoint for admin ClickhousePool acquisition. A pool ships the + # user/password to the node (the native protocol's first hello packet, or + # the HTTP auth header), so an unvalidated host means credentials reach + # whatever listener answers. All admin helpers must go through here — never + # acquire a pool from the connection cache directly in this module. The + # regression test test_no_direct_clickhouse_pool_construction_in_admin + # enforces this. _validate_node(clickhouse_host, clickhouse_port, cluster, storage_name) - return ClickhousePool( - clickhouse_host, - clickhouse_port, + # Go through the shared connection cache so the driver (native vs + # clickhouse-connect/HTTP) is selected by the runtime config, behind the + # abstract ClickhousePool type, just like the cluster's own connections. 
+ return connection_cache.get_node_connection( + client_settings, + ClickhouseNode(clickhouse_host, clickhouse_port, http_port=cluster.get_http_port()), username, password, database, - max_pool_size=2, - client_settings=client_settings.value.settings, + secure=False, + ca_certs=None, + verify=False, ) +def _driver_cache_token() -> str: + # Part of the admin connection cache keys so that flipping the + # use_clickhouse_connect_driver runtime flag re-resolves admin connections + # to the new driver, instead of returning a pool pinned to whichever driver + # was active when the entry was first cached. This keeps admin traffic + # switchable at runtime, like the cluster query/reader paths. + return "connect" if use_clickhouse_connect_driver() else "native" + + def get_ro_node_connection( clickhouse_host: str, clickhouse_port: int, @@ -115,7 +135,7 @@ def get_ro_node_connection( ) -> ClickhousePool: storage = _get_storage(storage_name) - key = f"{storage.get_storage_key()}-{clickhouse_host}" + key = f"{storage.get_storage_key()}-{clickhouse_host}-{_driver_cache_token()}" if key in NODE_CONNECTIONS: return NODE_CONNECTIONS[key] @@ -162,8 +182,9 @@ def get_ro_node_connection( def get_ro_query_node_connection( storage_name: str, client_settings: ClickhouseClientSettings ) -> ClickhousePool: - if storage_name in CLUSTER_CONNECTIONS: - return CLUSTER_CONNECTIONS[storage_name] + key = f"{storage_name}-{_driver_cache_token()}" + if key in CLUSTER_CONNECTIONS: + return CLUSTER_CONNECTIONS[key] storage = _get_storage(storage_name) cluster = storage.get_cluster() @@ -172,7 +193,7 @@ def get_ro_query_node_connection( connection_id.hostname, connection_id.tcp_port, storage_name, client_settings ) - CLUSTER_CONNECTIONS[storage_name] = connection + CLUSTER_CONNECTIONS[key] = connection return connection @@ -184,7 +205,7 @@ def get_sudo_node_connection( ) -> ClickhousePool: storage = _get_storage(storage_name) - key = f"{storage.get_storage_key()}-{clickhouse_host}-sudo" + key = 
f"{storage.get_storage_key()}-{clickhouse_host}-sudo-{_driver_cache_token()}" if key in NODE_CONNECTIONS: return NODE_CONNECTIONS[key] @@ -216,7 +237,7 @@ def get_clusterless_node_connection( cluster = storage.get_cluster() database = cluster.get_database() - key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-{database}" + key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-{database}-{_driver_cache_token()}" if key in NODE_CONNECTIONS: return NODE_CONNECTIONS[key] @@ -245,7 +266,7 @@ def get_ro_clusterless_node_connection( cluster = storage.get_cluster() database = cluster.get_database() - key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-ro-{database}" + key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-ro-{database}-{_driver_cache_token()}" if key in NODE_CONNECTIONS: return NODE_CONNECTIONS[key] diff --git a/snuba/admin/clickhouse/copy_tables.py b/snuba/admin/clickhouse/copy_tables.py index 52cdd90bb78..c3d44f27766 100644 --- a/snuba/admin/clickhouse/copy_tables.py +++ b/snuba/admin/clickhouse/copy_tables.py @@ -128,7 +128,9 @@ def copy_tables( skip_on_cluster: bool = False, cluster_name_override: Optional[str] = None, ) -> CopyTablesResponse: - settings = ClickhouseClientSettings.QUERY + # Table copies can run long, so use the unbounded INTERNAL profile rather + # than the 30s user-read QUERY profile. 
+ settings = ClickhouseClientSettings.INTERNAL source_connection = get_clusterless_node_connection( source_host, 9000, storage_name, client_settings=settings ) diff --git a/snuba/admin/clickhouse/nodes.py b/snuba/admin/clickhouse/nodes.py index 59a2f4b8536..9dd252ac735 100644 --- a/snuba/admin/clickhouse/nodes.py +++ b/snuba/admin/clickhouse/nodes.py @@ -48,7 +48,7 @@ def _get_nodes(storage_key: StorageKey, local: bool = True) -> Sequence[Node]: return [] else: return [ - {"host": node.host_name, "port": node.port} + {"host": node.host_name, "port": node.native_port} for node in ( cluster.get_local_nodes() if local else cluster.get_distributed_nodes() ) @@ -62,7 +62,7 @@ def _get_query_node(storage_key: StorageKey) -> Optional[Node]: try: cluster = get_storage(storage_key).get_cluster() query_node = cluster.get_query_node() - return {"host": query_node.host_name, "port": query_node.port} + return {"host": query_node.host_name, "port": query_node.native_port} except (AssertionError, KeyError, UndefinedClickhouseCluster): return None diff --git a/snuba/cli/cleanup.py b/snuba/cli/cleanup.py index 10ce232eb62..4616f2ac425 100644 --- a/snuba/cli/cleanup.py +++ b/snuba/cli/cleanup.py @@ -70,6 +70,7 @@ def cleanup( from snuba.cleanup import logger, run_cleanup from snuba.clickhouse.native import ClickhousePool + from snuba.clusters.cluster import ClickhouseNode, connection_cache storage = get_writable_storage(StorageKey(storage_name)) @@ -81,16 +82,20 @@ def cleanup( cluster = storage.get_cluster() database = cluster.get_database() + connection: ClickhousePool if clickhouse_host and clickhouse_port: - connection = ClickhousePool( - clickhouse_host, - clickhouse_port, + # Go through the shared connection cache so the driver (native vs + # clickhouse-connect/HTTP) is selected by the runtime config, behind + # the abstract ClickhousePool type. 
+ connection = connection_cache.get_node_connection( + ClickhouseClientSettings.CLEANUP, + ClickhouseNode(clickhouse_host, clickhouse_port, http_port=cluster.get_http_port()), clickhouse_user, clickhouse_password, database, - clickhouse_secure, - clickhouse_ca_certs, - clickhouse_verify, + secure=clickhouse_secure, + ca_certs=clickhouse_ca_certs, + verify=clickhouse_verify, ) elif not cluster.is_single_node(): raise click.ClickException("Provide ClickHouse host and port for cleanup") diff --git a/snuba/cli/optimize.py b/snuba/cli/optimize.py index f5d91dbe4a9..4d42453d90f 100644 --- a/snuba/cli/optimize.py +++ b/snuba/cli/optimize.py @@ -80,6 +80,7 @@ def optimize( from snuba.clickhouse.native import ClickhousePool from snuba.clickhouse.optimize.optimize import logger + from snuba.clusters.cluster import ClickhouseNode, connection_cache setup_logging(log_level) setup_sentry() @@ -100,17 +101,25 @@ def optimize( # passing this information won't be necessary, and running this command once # will ensure that optimize is performed on all of the individual nodes for # that cluster. + connection: ClickhousePool if clickhouse_host and clickhouse_port: - connection = ClickhousePool( - clickhouse_host, - clickhouse_port, + # Go through the shared connection cache so the driver (native vs + # clickhouse-connect/HTTP) is selected by the runtime config, behind + # the abstract ClickhousePool type. The OPTIMIZE timeout is carried by + # the client settings profile the cache reads. 
+ connection = connection_cache.get_node_connection( + ClickhouseClientSettings.OPTIMIZE, + ClickhouseNode( + clickhouse_host, + clickhouse_port, + http_port=storage.get_cluster().get_http_port(), + ), clickhouse_user, clickhouse_password, database, - clickhouse_secure, - clickhouse_ca_certs, - clickhouse_verify, - send_receive_timeout=ClickhouseClientSettings.OPTIMIZE.value.timeout, + secure=clickhouse_secure, + ca_certs=clickhouse_ca_certs, + verify=clickhouse_verify, ) elif not storage.get_cluster().is_single_node(): raise click.ClickException("Provide Clickhouse host and port for optimize") diff --git a/snuba/cli/querylog_to_csv.py b/snuba/cli/querylog_to_csv.py index 497bde52738..3d7c5ea43c5 100644 --- a/snuba/cli/querylog_to_csv.py +++ b/snuba/cli/querylog_to_csv.py @@ -1,4 +1,5 @@ import csv +import os from datetime import datetime from typing import NamedTuple, Optional, Sequence, Tuple @@ -7,8 +8,12 @@ from snuba import settings from snuba.admin.notifications.slack.client import SlackClient -from snuba.clickhouse.native import ClickhousePool -from snuba.clusters.cluster import ClickhouseClientSettings +from snuba.clusters.cluster import ( + DEFAULT_CLICKHOUSE_HTTP_PORT, + ClickhouseClientSettings, + ClickhouseNode, + connection_cache, +) from snuba.environment import setup_logging, setup_sentry logger = structlog.get_logger().bind(module=__name__) @@ -160,13 +165,21 @@ def querylog_to_csv( query = get_query_results(event_type, [database], tables, start_time, end_time) (clickhouse_user, clickhouse_password) = get_credentials() - connection = ClickhousePool( - host=clickhouse_host, - port=clickhouse_port, - user=clickhouse_user, - password=clickhouse_password, - database=database, - client_settings=ClickhouseClientSettings.QUERY.value.settings, + # Go through the shared connection cache so the driver (native vs + # clickhouse-connect/HTTP) is selected by the runtime config, behind the + # abstract ClickhousePool type. 
There is no cluster here to read an + # http_port from, so use the configured CLICKHOUSE_HTTP_PORT (the same env + # var the cluster config reads), defaulting to the well-known port. + http_port = int(os.environ.get("CLICKHOUSE_HTTP_PORT", DEFAULT_CLICKHOUSE_HTTP_PORT)) + connection = connection_cache.get_node_connection( + ClickhouseClientSettings.QUERY, + ClickhouseNode(clickhouse_host, clickhouse_port, http_port=http_port), + clickhouse_user, + clickhouse_password, + database, + secure=False, + ca_certs=None, + verify=False, ) results = connection.execute(query) filename = format_filename(table) diff --git a/snuba/clickhouse/connect.py b/snuba/clickhouse/connect.py new file mode 100644 index 00000000000..0dc8ce92444 --- /dev/null +++ b/snuba/clickhouse/connect.py @@ -0,0 +1,336 @@ +from __future__ import annotations + +import logging +from threading import Lock +from typing import Any, Mapping, Optional, Sequence + +import clickhouse_connect +import sentry_sdk +from clickhouse_connect import common as clickhouse_connect_common +from clickhouse_connect.driver.client import Client +from clickhouse_connect.driver.exceptions import ClickHouseError, OperationalError +from clickhouse_connect.driver.httputil import get_pool_manager + +from snuba import environment, settings, state +from snuba.clickhouse.errors import ClickhouseError +from snuba.clickhouse.native import ( + ClickhousePool, + ClickhouseProfile, + ClickhouseResult, + Params, +) +from snuba.utils.metrics.wrapper import MetricsWrapper + +logger = logging.getLogger("snuba.clickhouse.connect") + +metrics = MetricsWrapper(environment.metrics, "clickhouse.connect") + +# Fallback send/receive timeout (seconds) used when a client settings profile +# does not specify one. Matches clickhouse-connect's own default. Per-profile +# timeouts (e.g. 30s for reads, longer for migrations) are honored as-is, the +# same way the native driver uses them. 
+DEFAULT_SEND_RECEIVE_TIMEOUT_SECONDS = 300 + +# Default ClickHouse HTTP port, used when a caller does not pass one. +DEFAULT_CLICKHOUSE_HTTP_PORT = 8123 + +# clickhouse-connect raises a ProgrammingError by default when it is asked to +# send a setting it considers unknown or readonly. The native driver simply +# forwards whatever settings it is given to the server, so to preserve parity +# we tell clickhouse-connect to drop unrecognized settings instead of failing. +clickhouse_connect_common.set_setting("invalid_setting_action", "drop") + + +class ClickhouseConnectPool(ClickhousePool): + """ + HTTP based ClickHouse client backed by ``clickhouse-connect``. + + It subclasses :class:`snuba.clickhouse.native.ClickhousePool` and overrides + the ``execute`` / ``execute_robust`` / ``close`` interface so it is a true + drop-in replacement. The decision of which pool to instantiate is made by + the connection cache (see :mod:`snuba.clusters.cluster`), one level above + the individual drivers. + + Unlike the native pool, this class does not maintain its own queue of + connections: ``clickhouse-connect`` manages an HTTP connection pool (via + ``urllib3``) for us. A single :class:`Client` is created lazily and reused + across threads, with the pool sized by the ``clickhouse_connect_pool_size`` config. + """ + + def __init__( + self, + host: str, + user: str, + password: str, + database: str, + http_port: int = DEFAULT_CLICKHOUSE_HTTP_PORT, + secure: bool = False, + ca_certs: Optional[str] = None, + verify: Optional[bool] = False, + connect_timeout: int = 1, + send_receive_timeout: Optional[int] = 35, + client_settings: Mapping[str, Any] = {}, + ) -> None: + # No native connection queue here; clickhouse-connect manages its own + # HTTP pool. ``port`` is the abstract base attribute (it holds the + # cluster's configured HTTP port for this driver).
The pool size is not + # a construction parameter: it is always taken from the + # ``clickhouse_connect_pool_size`` runtime config (see _get_client), so + # it can be tuned at runtime without rebuilding pools. + self.host = host + self.port = http_port + self.user = user + self.password = password + self.database = database + self.secure = secure + self.ca_certs = ca_certs + self.verify = verify + self.connect_timeout = connect_timeout + self.send_receive_timeout = send_receive_timeout + self.client_settings = client_settings + + self.__client: Optional[Client] = None + self.__lock = Lock() + + def _get_client(self) -> Client: + # The client (and its handshake with the server) is created lazily so + # that simply constructing a pool does not open a connection. + if self.__client is None: + with self.__lock: + if self.__client is None: + # Pool size always comes from the clickhouse_connect_pool_size + # runtime config, falling back to the configured + # CLICKHOUSE_MAX_POOL_SIZE. The value is read once, when the + # (cached) client is first created. + pool_size = ( + state.get_int_config( + "clickhouse_connect_pool_size", settings.CLICKHOUSE_MAX_POOL_SIZE + ) + or settings.CLICKHOUSE_MAX_POOL_SIZE + ) + pool_mgr = get_pool_manager( + ca_cert=self.ca_certs, + verify=bool(self.verify), + maxsize=pool_size, + # All requests go to a single host, so a single pool is + # enough. Keep a small margin for safety. + num_pools=2, + ) + self.__client = clickhouse_connect.get_client( + host=self.host, + port=self.port, + username=self.user, + password=self.password, + database=self.database, + interface="https" if self.secure else "http", + secure=self.secure, + verify=bool(self.verify), + ca_cert=self.ca_certs, + connect_timeout=self.connect_timeout, + # Honor the per-profile timeout as-is, like the native + # driver does (reads get 30s, migrations/DDL keep their + # longer timeouts). Fall back to the default when a + # profile does not set one. 
+ send_receive_timeout=( + self.send_receive_timeout + if self.send_receive_timeout is not None + else DEFAULT_SEND_RECEIVE_TIMEOUT_SECONDS + ), + settings=dict(self.client_settings), + pool_mgr=pool_mgr, + # The native driver applies no implicit row limit; match + # that behavior here. + query_limit=0, + # Sessions serialize queries on the server. We share a + # single client across threads, so sessions must be + # disabled to allow concurrent queries. + autogenerate_session_id=False, + ) + return self.__client + + def _build_query_settings( + self, + settings: Optional[Mapping[str, Any]], + query_id: Optional[str], + capture_trace: bool, + ) -> Optional[Mapping[str, Any]]: + query_settings = dict(settings) if settings else {} + if query_id is not None: + query_settings["query_id"] = query_id + if capture_trace: + # We still ask the server to emit trace logs, but unlike the native + # driver clickhouse-connect does not surface them (it only reads the + # X-ClickHouse-Summary header), so ``trace_output`` ends up empty on + # this path. See the note in _execute_once. Practically this means + # the snuba-admin trace view and its profile-events parsing return + # nothing when the HTTP driver is enabled; every other admin query + # path is driver-agnostic. Reconstructing traces over HTTP would + # require querying system.text_log by query_id (a separate feature). 
+ query_settings["send_logs_level"] = "trace" + return query_settings or None + + def _execute_once( + self, + query: str, + params: Params, + with_column_types: bool, + query_id: Optional[str], + settings: Optional[Mapping[str, Any]], + columnar: bool, + capture_trace: bool, + ) -> ClickhouseResult: + client = self._get_client() + query_settings = self._build_query_settings(settings, query_id, capture_trace) + + with sentry_sdk.start_span(description=query, op="db.clickhouse") as span: + span.set_data(sentry_sdk.consts.SPANDATA.DB_SYSTEM, "clickhouse") + span.set_data("query_id", query_id) + span.set_data("settings", query_settings) + query_result = client.query( + query, + parameters=params if params else None, + settings=query_settings, + column_oriented=columnar, + ) + + summary = query_result.summary or {} + + def _int(key: str) -> int: + value = summary.get(key) + try: + return int(value) if value is not None else 0 + except (TypeError, ValueError): + return 0 + + elapsed_ns = summary.get("elapsed_ns") + try: + elapsed = float(elapsed_ns) / 1e9 if elapsed_ns is not None else 0.0 + except (TypeError, ValueError): + elapsed = 0.0 + + profile_data = ClickhouseProfile( + blocks=0, + bytes=_int("read_bytes"), + elapsed=elapsed, + progress_bytes=_int("read_bytes"), + rows=_int("read_rows"), + ) + + results: Sequence[Any] = query_result.result_set + + # trace_output is always empty here: clickhouse-connect has no mechanism + # for capturing the server's send_logs_level output (it only parses the + # X-ClickHouse-Summary header for the profile above). This is a known, + # accepted limitation of the HTTP path — see _build_query_settings. 
+ if with_column_types: + meta = [ + (name, column_type.name) + for name, column_type in zip(query_result.column_names, query_result.column_types) + ] + return ClickhouseResult( + results=results, + meta=meta, + profile=profile_data, + trace_output="", + ) + + return ClickhouseResult( + results=results, + profile=profile_data, + trace_output="", + ) + + def execute( + self, + query: str, + params: Params = None, + with_column_types: bool = False, + query_id: Optional[str] = None, + settings: Optional[Mapping[str, Any]] = None, + types_check: bool = False, + columnar: bool = False, + capture_trace: bool = False, + retryable: bool = True, + ) -> ClickhouseResult: + """ + Execute a clickhouse query. + + Unlike :class:`snuba.clickhouse.native.ClickhouseNativePool`, this + method does not implement any retry logic of its own. Retries (stale + keep-alive sockets, transport errors and HTTP 429/503/504 responses) + are handled internally by clickhouse-connect. Notably this means the + native pool's ``TOO_MANY_SIMULTANEOUS_QUERIES`` backoff is *not* + replicated: clickhouse-connect does not retry that error, so it is + surfaced directly to the caller. + + The ``retryable`` argument is accepted for interface parity with the + native pool but has no effect here. + """ + try: + return self._execute_once( + query, + params, + with_column_types, + query_id, + settings, + columnar, + capture_trace, + ) + except OperationalError as e: + # Connection/transport level failures. Mirrors the native pool's + # handling of NetworkError/SocketTimeoutError by emitting the + # connection_error metric before surfacing the error. 
+ metrics.increment( + "connection_error", + tags={ + "host": self.host, + "port": str(self.port), + "user": self.user, + "database": self.database, + }, + ) + raise ClickhouseError(str(e), code=getattr(e, "code", None) or -1) from e + except ClickHouseError as e: + # ClickHouseError is the base class for every clickhouse-connect + # error (DatabaseError, ProgrammingError, DataError, ...). The + # native pool likewise wraps the whole clickhouse_driver errors.Error + # family into ClickhouseError, preserving the server error code when + # there is one. + raise ClickhouseError(str(e), code=getattr(e, "code", None) or -1) from e + + def execute_robust( + self, + query: str, + params: Params = None, + with_column_types: bool = False, + query_id: Optional[str] = None, + settings: Optional[Mapping[str, Any]] = None, + types_check: bool = False, + columnar: bool = False, + capture_trace: bool = False, + retryable: bool = True, + ) -> ClickhouseResult: + """ + Mirrors :meth:`ClickhouseNativePool.execute_robust`. Since retries are + delegated to clickhouse-connect, this is equivalent to :meth:`execute`. + """ + return self.execute( + query, + params=params, + with_column_types=with_column_types, + query_id=query_id, + settings=settings, + types_check=types_check, + columnar=columnar, + capture_trace=capture_trace, + retryable=retryable, + ) + + def close(self) -> None: + # Take the same lock _get_client uses so a concurrent lazy init can't + # race with teardown (one thread closing the client while another is + # creating or about to use it).
+ with self.__lock: + if self.__client is not None: + self.__client.close() + self.__client = None diff --git a/snuba/clickhouse/native.py b/snuba/clickhouse/native.py index a737a5e30d4..7ea3f40a315 100644 --- a/snuba/clickhouse/native.py +++ b/snuba/clickhouse/native.py @@ -4,6 +4,7 @@ import queue import re import time +from abc import ABC, abstractmethod from contextlib import contextmanager from dataclasses import dataclass, field from datetime import date, datetime @@ -72,7 +73,61 @@ def capture_logging() -> Generator[StringIO, None, None]: buffer.close() -class ClickhousePool(object): +class ClickhousePool(ABC): + """ + Abstract base for a pool of ClickHouse connections. + + Concrete implementations: + - :class:`ClickhouseNativePool` — native protocol via clickhouse-driver + - ``ClickhouseConnectPool`` (snuba.clickhouse.connect) — HTTP protocol via + clickhouse-connect + + Callers receive connections typed as ``ClickhousePool`` and only rely on the + methods/attributes declared here, so the two drivers are interchangeable. 
+ """ + + host: str + port: int + user: str + password: str + database: str + + @abstractmethod + def execute( + self, + query: str, + params: Params = None, + with_column_types: bool = False, + query_id: Optional[str] = None, + settings: Optional[Mapping[str, Any]] = None, + types_check: bool = False, + columnar: bool = False, + capture_trace: bool = False, + retryable: bool = True, + ) -> ClickhouseResult: + raise NotImplementedError + + @abstractmethod + def execute_robust( + self, + query: str, + params: Params = None, + with_column_types: bool = False, + query_id: Optional[str] = None, + settings: Optional[Mapping[str, Any]] = None, + types_check: bool = False, + columnar: bool = False, + capture_trace: bool = False, + retryable: bool = True, + ) -> ClickhouseResult: + raise NotImplementedError + + @abstractmethod + def close(self) -> None: + raise NotImplementedError + + +class ClickhouseNativePool(ClickhousePool): def __init__( self, host: str, @@ -400,7 +455,14 @@ def transform_uuid(value: UUID) -> str: ) -class NativeDriverReader(Reader): +class ClickhouseReader(Reader): + """ + Reader for ClickHouse queries. It adapts a :class:`ClickhouseResult` into the + JSON-flavored ``Result``. It is driver-agnostic: it wraps the abstract + :class:`ClickhousePool`, so the same reader works for both the native and the + clickhouse-connect (HTTP) pools. 
+ """ + def __init__( self, cache_partition_id: Optional[str], diff --git a/snuba/clusters/cluster.py b/snuba/clusters/cluster.py index 60fd982a62d..9046c537fdf 100644 --- a/snuba/clusters/cluster.py +++ b/snuba/clusters/cluster.py @@ -18,9 +18,13 @@ import structlog -from snuba import settings +from snuba import settings, state from snuba.clickhouse.http import HTTPBatchWriter, InsertStatement, JSONRow -from snuba.clickhouse.native import ClickhousePool, NativeDriverReader +from snuba.clickhouse.native import ( + ClickhouseNativePool, + ClickhousePool, + ClickhouseReader, +) from snuba.clusters.storage_sets import ( DEV_STORAGE_SETS, StorageSetKey, @@ -33,6 +37,11 @@ logger = structlog.get_logger().bind(module=__name__) +# Well-known default ClickHouse HTTP port, used by by-host helpers (e.g. CLI +# tools) that only know a node's native address and have no cluster config to +# read an http_port from. +DEFAULT_CLICKHOUSE_HTTP_PORT = 8123 + class ClickhouseClientSettingsType(NamedTuple): settings: Mapping[str, Any] @@ -65,7 +74,16 @@ class ClickhouseClientSettings(Enum): ) DELETE = ClickhouseClientSettingsType({"mutations_sync": 1}, None) OPTIMIZE = ClickhouseClientSettingsType({}, settings.OPTIMIZE_QUERY_TIMEOUT) - QUERY = ClickhouseClientSettingsType({}, None) + # User-facing read queries get a 30s timeout. Migrations, DDL and other + # long-running operations keep their own (default or longer) timeouts + # above/below. + QUERY = ClickhouseClientSettingsType({}, 30) + # Internal/maintenance queries that are NOT user-facing reads and must not + # inherit QUERY's 30s cap: cluster topology discovery (system.clusters), + # storage-routing load lookups, delete-throttling system-table checks, the + # span-export job and admin table copies. These can legitimately run long, + # so they stay unbounded (their behavior before QUERY got a read timeout). 
+ INTERNAL = ClickhouseClientSettingsType({}, None) QUERYLOG = ClickhouseClientSettingsType({}, None) TRACING = ClickhouseClientSettingsType({"readonly": 2}, None) REPLACE = ClickhouseClientSettingsType( @@ -103,12 +121,17 @@ class ClickhouseClientSettings(Enum): @dataclass(frozen=True) class ClickhouseNode: host_name: str - port: int + native_port: int shard: Optional[int] = None replica: Optional[int] = None + # The node's HTTP port, used by the clickhouse-connect (HTTP) driver. It is + # optional because nodes built outside a cluster context (e.g. in tests, or + # the replacer's load balancer, which never open HTTP connections) do not + # need one. Cluster-produced nodes always carry the cluster's HTTP port. + http_port: Optional[int] = None def __str__(self) -> str: - return f"{self.host_name}:{self.port}" + return f"{self.host_name}:{self.native_port}" class ClickhouseNodeType(Enum): @@ -166,6 +189,23 @@ def get_batch_writer( ClickhouseWriterOptions = Optional[Mapping[str, Any]] +def use_clickhouse_connect_driver() -> bool: + """ + Whether ClickHouse connections should use the clickhouse-connect (HTTP) + driver instead of the native protocol. + + Controlled by a runtime config flag (defaulting to the + ``USE_CLICKHOUSE_CONNECT_DRIVER`` setting) so the migration can be rolled + out and rolled back without a deploy. + """ + default = 1 if settings.USE_CLICKHOUSE_CONNECT_DRIVER else 0 + return bool(state.get_int_config("use_clickhouse_connect_driver", default)) + + +# The driver discriminator is part of the cache key so that the native and the +# HTTP pool for the same node can be cached side by side. The node's HTTP port +# is part of ``ClickhouseNode`` itself, so it does not need a separate key +# element.
CacheKey = Tuple[ ClickhouseNode, ClickhouseClientSettings, @@ -175,6 +215,7 @@ def get_batch_writer( bool, Optional[str], Optional[bool], + str, ] @@ -194,8 +235,26 @@ def get_node_connection( ca_certs: Optional[str], verify: Optional[bool], ) -> ClickhousePool: + """ + Return a cached connection pool for the node, typed as the abstract + :class:`ClickhousePool`. The driver is decided here, from the + ``use_clickhouse_connect_driver`` runtime config: when it is enabled the + clickhouse-connect (HTTP) pool is built (connecting on the node's + ``http_port``), otherwise the native one (connecting on the node's + ``native_port``). Both variants are cached side by side (the driver is + part of the cache key). + + This is the single place pools are instantiated and the single place the + driver is selected, so every caller — the cluster query/node connections + as well as the admin and CLI by-host helpers — goes through it and gets + one shared, runtime-selected pool behind the abstract + :class:`ClickhousePool` type. Pool sizing is left to the pools themselves + (the connect pool reads the ``clickhouse_connect_pool_size`` runtime + config). + """ + use_connect = use_clickhouse_connect_driver() with self.__lock: - settings, timeout = client_settings.value + client_settings_dict, timeout = client_settings.value cache_key = ( node, client_settings, @@ -205,20 +264,48 @@ def get_node_connection( secure, ca_certs, verify, + "http" if use_connect else "native", ) if cache_key not in self.__cache: - self.__cache[cache_key] = ClickhousePool( - node.host_name, - node.port, - user, - password, - database, - client_settings=settings, - send_receive_timeout=timeout, - secure=secure, - ca_certs=ca_certs, - verify=verify, - ) + pool: ClickhousePool + if use_connect: + # Imported here so that the native code path never imports + # clickhouse-connect. 
+ from snuba.clickhouse.connect import ClickhouseConnectPool + + pool = ClickhouseConnectPool( + host=node.host_name, + # Fall back to the default HTTP port only for nodes that + # were built without one (e.g. by-host helpers that have + # no cluster http_port to draw on). + http_port=( + node.http_port + if node.http_port is not None + else DEFAULT_CLICKHOUSE_HTTP_PORT + ), + user=user, + password=password, + database=database, + client_settings=client_settings_dict, + send_receive_timeout=timeout, + secure=secure, + ca_certs=ca_certs, + verify=verify, + ) + else: + pool = ClickhouseNativePool( + node.host_name, + node.native_port, + user, + password, + database, + client_settings=client_settings_dict, + send_receive_timeout=timeout, + secure=secure, + ca_certs=ca_certs, + verify=verify, + ) + self.__cache[cache_key] = pool return self.__cache[cache_key] @@ -272,7 +359,7 @@ def __init__( self.__port = port self.__max_connections = max_connections or _DEFAULT_MAX_CONNECTIONS self.__block_connections = block_connections - self.__query_node = ClickhouseNode(host, port) + self.__query_node = ClickhouseNode(host, port, http_port=http_port) self.__user = user self.__password = password self.__database = database @@ -283,11 +370,12 @@ def __init__( self.__single_node = single_node self.__cluster_name = cluster_name self.__distributed_cluster_name = distributed_cluster_name - self.__reader: Optional[Reader] = None - self.__deleter: Optional[Reader] = None self.__connection_cache = connection_cache self.__cache_partition_id = cache_partition_id self.__query_settings_prefix = query_settings_prefix + # The local node used by the deleter is static cluster topology; cache + # it so get_deleter() does not re-run a system.clusters lookup per call. + self.__delete_local_node: Optional[ClickhouseNode] = None def __str__(self) -> str: return str(self.__query_node) @@ -316,8 +404,12 @@ def get_node_connection( Get a Clickhouse connection using the client settings provided. 
Reuse any connection to the same node with the same settings otherwise establish a new connection. - """ + The driver is selected inside ``ConnectionCache.get_node_connection`` + from the ``use_clickhouse_connect_driver`` runtime config (HTTP pool + when enabled, native otherwise). The choice applies to every caller + (reads, migrations, replacer, optimize, ...), not just the read path. + """ return self.__connection_cache.get_node_connection( client_settings, node, @@ -330,25 +422,32 @@ def get_node_connection( ) def get_deleter(self) -> Reader: - if not self.__deleter: - # we need the connection to the storage nodes, not - # the distributed nodes - local_node = self.get_local_nodes()[0] - self.__deleter = NativeDriverReader( - cache_partition_id=f"{self.__cache_partition_id}_deletes", - client=self.get_node_connection(ClickhouseClientSettings.DELETE, local_node), - query_settings_prefix=self.__query_settings_prefix, - ) - return self.__deleter + # we need the connection to the storage nodes, not the distributed + # nodes. The node lookup is cached (it can run a system.clusters query + # on multi-node clusters) while the connection is resolved per call so + # the driver can still switch at runtime. + if self.__delete_local_node is None: + self.__delete_local_node = self.get_local_nodes()[0] + return ClickhouseReader( + cache_partition_id=f"{self.__cache_partition_id}_deletes", + client=self.get_node_connection( + ClickhouseClientSettings.DELETE, self.__delete_local_node + ), + query_settings_prefix=self.__query_settings_prefix, + ) def get_reader(self) -> Reader: - if not self.__reader: - self.__reader = NativeDriverReader( - cache_partition_id=self.__cache_partition_id, - client=self.get_query_connection(ClickhouseClientSettings.QUERY), - query_settings_prefix=self.__query_settings_prefix, - ) - return self.__reader + """ + Return a reader for the query node. 
The driver-agnostic ClickhouseReader + wraps whichever pool (native or HTTP) get_query_connection selects from + the ``use_clickhouse_connect_driver`` runtime config, so the driver can + be switched at runtime. + """ + return ClickhouseReader( + cache_partition_id=self.__cache_partition_id, + client=self.get_query_connection(ClickhouseClientSettings.QUERY), + query_settings_prefix=self.__query_settings_prefix, + ) def get_batch_writer( self, @@ -419,15 +518,24 @@ def get_distributed_nodes(self) -> Sequence[ClickhouseNode]: def get_connection_id(self) -> ConnectionId: return ConnectionId( hostname=self.__query_node.host_name, - tcp_port=self.__query_node.port, + tcp_port=self.__query_node.native_port, http_port=self.__http_port, database_name=self.__database, ) def __get_cluster_nodes(self, cluster_name: str) -> Sequence[ClickhouseNode]: + # system.clusters only reports the native port; every node in the + # cluster shares the cluster's configured HTTP port, so stamp it on each + # discovered node so it is self-describing for the HTTP driver. 
return [ - ClickhouseNode(*host) - for host in self.get_query_connection(ClickhouseClientSettings.QUERY) + ClickhouseNode( + host_name=host[0], + native_port=host[1], + shard=host[2], + replica=host[3], + http_port=self.__http_port, + ) + for host in self.get_query_connection(ClickhouseClientSettings.INTERNAL) .execute( "select host_name, port, shard_num, replica_num from system.clusters where cluster=%(cluster_name)s", {"cluster_name": cluster_name}, diff --git a/snuba/manual_jobs/extract_span_data.py b/snuba/manual_jobs/extract_span_data.py index d7ca7767cc0..9dc1ccfb6c0 100644 --- a/snuba/manual_jobs/extract_span_data.py +++ b/snuba/manual_jobs/extract_span_data.py @@ -112,7 +112,7 @@ def _generate_spans_query(self) -> str: def execute(self, logger: JobLogger) -> None: cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM) - connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY) + connection = cluster.get_query_connection(ClickhouseClientSettings.INTERNAL) query = f""" INSERT INTO FUNCTION gcs('{self._gcp_bucket_name}/{self._output_file_path}', diff --git a/snuba/migrations/check_dangerous.py b/snuba/migrations/check_dangerous.py index 2436d70910c..c45b608ebde 100644 --- a/snuba/migrations/check_dangerous.py +++ b/snuba/migrations/check_dangerous.py @@ -27,7 +27,9 @@ def check_dangerous_operation(op: SqlOperation, columns_state: ColumnStatesMapTy table = op.get_table_name() nodes = op.get_nodes() for node in nodes: - old_type = columns_state.get((node.host_name, node.port, table, col.name), None) + old_type = columns_state.get( + (node.host_name, node.native_port, table, col.name), None + ) if old_type: _check_dangerous(old_type, col.type) except DangerousOperationError as err: diff --git a/snuba/migrations/connect.py b/snuba/migrations/connect.py index 05044c68a28..901eda46219 100644 --- a/snuba/migrations/connect.py +++ b/snuba/migrations/connect.py @@ -161,9 +161,9 @@ def check_for_inactive_replicas(clusters: 
List[ClickhouseCluster]) -> None: inactive_replica_info = [] for cluster in clusters: for node in cluster.get_local_nodes(): - if (node.host_name, node.port) in checked_nodes: + if (node.host_name, node.native_port) in checked_nodes: continue - checked_nodes.add((node.host_name, node.port)) + checked_nodes.add((node.host_name, node.native_port)) conn = cluster.get_node_connection(ClickhouseClientSettings.MIGRATE, node) tables_with_inactive = conn.execute( f"SELECT table, total_replicas, active_replicas FROM system.replicas " @@ -197,9 +197,9 @@ def get_column_states() -> ColumnStatesMapType: except UndefinedClickhouseCluster: continue for node in (*local_nodes, *distributed_nodes, query_node): - if (node.host_name, node.port) in checked_nodes: + if (node.host_name, node.native_port) in checked_nodes: continue - checked_nodes.add((node.host_name, node.port)) + checked_nodes.add((node.host_name, node.native_port)) conn = cluster.get_node_connection(ClickhouseClientSettings.MIGRATE, node) column_types = conn.execute( @@ -208,6 +208,6 @@ def get_column_states() -> ColumnStatesMapType: for row in column_types: table, col_name, type = row - column_states[(node.host_name, node.port, table, col_name)] = type + column_states[(node.host_name, node.native_port, table, col_name)] = type return column_states diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index a1aebf43709..4ac7fe2434d 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -99,6 +99,11 @@ # pool.get() when ClickHouse is hung but not dropping connections. CLICKHOUSE_POOL_GET_TIMEOUT_SECONDS = 5 +# Default for routing ClickHouse queries through clickhouse-connect (over the +# HTTP protocol) instead of the native clickhouse-driver protocol. This default +# can be overridden at runtime with the `use_clickhouse_connect_driver` config. 
+USE_CLICKHOUSE_CONNECT_DRIVER = False + CLUSTERS: Sequence[Mapping[str, Any]] = [ { "host": os.environ.get("CLICKHOUSE_HOST", "127.0.0.1"), diff --git a/snuba/web/delete_query.py b/snuba/web/delete_query.py index 075917d1751..4d3d1c74b59 100644 --- a/snuba/web/delete_query.py +++ b/snuba/web/delete_query.py @@ -199,7 +199,7 @@ def _num_ongoing_mutations(cluster: ClickhouseCluster, tables: Sequence[str]) -> ) """ return int( - cluster.get_query_connection(ClickhouseClientSettings.QUERY).execute(query).results[0][0] + cluster.get_query_connection(ClickhouseClientSettings.INTERNAL).execute(query).results[0][0] ) @@ -218,7 +218,7 @@ def _num_parts_currently_mutating(cluster: ClickhouseCluster) -> int: WHERE metric = 'PartMutation' """ return int( - cluster.get_query_connection(ClickhouseClientSettings.QUERY).execute(query).results[0][0] + cluster.get_query_connection(ClickhouseClientSettings.INTERNAL).execute(query).results[0][0] ) diff --git a/snuba/web/rpc/storage_routing/load_retriever.py b/snuba/web/rpc/storage_routing/load_retriever.py index 43b8775f0dc..cd6a0fa2638 100644 --- a/snuba/web/rpc/storage_routing/load_retriever.py +++ b/snuba/web/rpc/storage_routing/load_retriever.py @@ -131,12 +131,12 @@ def get_cluster_loadinfo( """ cluster_load = float( - cluster.get_query_connection(ClickhouseClientSettings.QUERY) + cluster.get_query_connection(ClickhouseClientSettings.INTERNAL) .execute(cluster_load_query) .results[0][0] ) concurrent_queries = int( - cluster.get_query_connection(ClickhouseClientSettings.QUERY) + cluster.get_query_connection(ClickhouseClientSettings.INTERNAL) .execute(concurrent_queries_query) .results[0][0] ) diff --git a/tests/admin/test_system_queries.py b/tests/admin/test_system_queries.py index d395bc743fa..772a3a9342e 100644 --- a/tests/admin/test_system_queries.py +++ b/tests/admin/test_system_queries.py @@ -299,14 +299,15 @@ def test_clusterless_rejects_unvalidated_host( helper_name: str, client_settings: ClickhouseClientSettings ) -> 
None: """ - Regression for EAP-488: the clusterless helpers used to construct a + Regression for EAP-488: the clusterless helpers used to acquire a ClickhousePool against any attacker-supplied host/port, which leaked the - configured ClickHouse user/password in the first hello packet of the - native protocol. Both helpers must now call _validate_node before - constructing the pool, so an invalid host produces InvalidNodeError and - no credentials ever leave the process. + configured ClickHouse user/password to the node (the native protocol's + first hello packet, or the HTTP auth header). Both helpers must now call + _validate_node before acquiring the pool, so an invalid host produces + InvalidNodeError and no credentials ever leave the process. """ from snuba.admin.clickhouse import common + from snuba.clusters.cluster import connection_cache helper = getattr(common, helper_name) @@ -316,7 +317,9 @@ def test_clusterless_rejects_unvalidated_host( "_validate_node", side_effect=InvalidNodeError("host not in cluster"), ) as mock_validate, - patch.object(common, "ClickhousePool") as mock_pool, + # connection_cache is the shared singleton; patching the method on the + # object affects the reference bound inside common as well. + patch.object(connection_cache, "get_node_connection") as mock_pool, ): # Clear any cached connection for this storage so the cache lookup # can't short-circuit validation. 
@@ -326,9 +329,9 @@ def test_clusterless_rejects_unvalidated_host( with pytest.raises(InvalidNodeError): helper("attacker.example.com", 9009, "errors", client_settings) - assert mock_validate.called, "_validate_node must run before pool construction" + assert mock_validate.called, "_validate_node must run before pool acquisition" assert not mock_pool.called, ( - "ClickhousePool must not be constructed for an unvalidated host — " + "A pool must not be acquired for an unvalidated host — " "doing so would transmit ClickHouse credentials to the attacker" ) @@ -385,11 +388,24 @@ def test_sudo_mode_skips_experimental_analyzer(sql_query: str, sudo_mode: bool) ) +# Names that acquire a ClickhousePool (and therefore send credentials to a +# node): the connection cache accessor and direct construction of either pool +# implementation. Any of these in admin code must sit behind _validate_node. +_POOL_ACQUISITION_NAMES = frozenset( + { + "get_node_connection", + "ClickhouseNativePool", + "ClickhouseConnectPool", + } +) + + def _find_clickhouse_pool_calls(tree: ast.AST) -> list[tuple[ast.Call, list[str]]]: """ - Walks an AST and returns every `ClickhousePool(...)` call site, paired - with the chain of enclosing function names (outermost first) so the - regression guard below can assert *where* construction happens. + Walks an AST and returns every pool-acquisition call site (cache accessor + or direct pool construction), paired with the chain of enclosing function + names (outermost first) so the regression guard below can assert *where* + acquisition happens. 
""" results: list[tuple[ast.Call, list[str]]] = [] @@ -416,7 +432,7 @@ def visit_Call(self, node: ast.Call) -> None: if isinstance(func, ast.Attribute) else None ) - if name == "ClickhousePool": + if name in _POOL_ACQUISITION_NAMES: results.append((node, list(self.scope))) self.generic_visit(node) @@ -426,19 +442,21 @@ def visit_Call(self, node: ast.Call) -> None: def test_no_direct_clickhouse_pool_construction_in_admin() -> None: """ - Defense-in-depth for EAP-488: ClickhousePool ships the configured - user/password in the first hello packet of the native protocol, so any - admin code path that constructs one against a caller-supplied host - leaks credentials to whatever listener answers. `_build_validated_pool` - in snuba/admin/clickhouse/common.py is the single chokepoint that runs - `_validate_node` first — every other admin module must go through it. - - This test enforces that structural invariant by AST-walking every - snuba/admin/**/*.py file: - - * common.py may only construct ClickhousePool from inside - `_build_validated_pool`. - * No other admin module may construct ClickhousePool at all. + Defense-in-depth for EAP-488: a ClickhousePool ships the configured + user/password to the node (the native protocol's first hello packet, or + the HTTP auth header), so any admin code path that acquires one against a + caller-supplied host leaks credentials to whatever listener answers. + `_build_validated_pool` in snuba/admin/clickhouse/common.py is the single + chokepoint that runs `_validate_node` first — every other admin module + must go through it. + + "Acquiring" a pool means either constructing one of the pool + implementations directly or fetching one from the shared connection cache + via `get_node_connection`. This test enforces that structural invariant by + AST-walking every snuba/admin/**/*.py file: + + * common.py may only acquire a pool from inside `_build_validated_pool`. + * No other admin module may acquire a pool at all. 
If this fails, a new caller has likely re-introduced the vulnerability. """ @@ -456,13 +474,13 @@ def test_no_direct_clickhouse_pool_construction_in_admin() -> None: if py_file == common_path: if scope != ["_build_validated_pool"]: offenders.append( - f"{location} constructs ClickhousePool inside " + f"{location} acquires a ClickhousePool inside " f"{'.'.join(scope) or ''} — must be inside " "_build_validated_pool so _validate_node runs first." ) else: offenders.append( - f"{location} constructs ClickhousePool directly — call " + f"{location} acquires a ClickhousePool directly — call " "_build_validated_pool (or a helper that wraps it) so " "_validate_node guards the host before credentials are sent." ) diff --git a/tests/clickhouse/optimize/test_optimize_tracker.py b/tests/clickhouse/optimize/test_optimize_tracker.py index ee03e5e38a9..406f55164a9 100644 --- a/tests/clickhouse/optimize/test_optimize_tracker.py +++ b/tests/clickhouse/optimize/test_optimize_tracker.py @@ -7,7 +7,7 @@ import pytest from snuba import settings -from snuba.clickhouse.native import ClickhousePool, ClickhouseResult +from snuba.clickhouse.native import ClickhouseNativePool, ClickhouseResult from snuba.clickhouse.optimize import optimize from snuba.clickhouse.optimize.optimize import run_optimize_cron_job from snuba.clickhouse.optimize.optimize_tracker import ( @@ -298,10 +298,10 @@ def test_merge_info() -> None: ] ) - with patch.object(ClickhousePool, "execute") as mock_clickhouse_execute: + with patch.object(ClickhouseNativePool, "execute") as mock_clickhouse_execute: mock_clickhouse_execute.return_value = merge_query_result merge_info = optimize.get_current_large_merges( - clickhouse=ClickhousePool("127.0.0.1", 9000, "user", "password", "database"), + clickhouse=ClickhouseNativePool("127.0.0.1", 9000, "user", "password", "database"), database="default", table="errors_local", ) @@ -322,7 +322,7 @@ def test_merge_info() -> None: assert merge_info[0].estimated_time == 8020.61436897 / 
(0.9895385071013121 + 0.0001) busy = optimize.is_busy_merging( - clickhouse=ClickhousePool("127.0.0.1", 9000, "user", "password", "database"), + clickhouse=ClickhouseNativePool("127.0.0.1", 9000, "user", "password", "database"), database="default", table="errors_local", ) diff --git a/tests/clickhouse/test_connect.py b/tests/clickhouse/test_connect.py new file mode 100644 index 00000000000..1ab43d453d2 --- /dev/null +++ b/tests/clickhouse/test_connect.py @@ -0,0 +1,338 @@ +from typing import Any, cast +from unittest import mock + +import pytest + +from snuba.clickhouse.connect import ClickhouseConnectPool +from snuba.clickhouse.errors import ClickhouseError +from snuba.clickhouse.formatter.nodes import FormattedQuery + +# Error code returned by ClickHouse when the maximum number of simultaneous +# queries has been exceeded. +TOO_MANY_SIMULTANEOUS_QUERIES = 202 + + +class FakeColumnType: + def __init__(self, name: str) -> None: + self.name = name + + +class FakeQueryResult: + def __init__( + self, + result_set: Any, + column_names: Any = (), + column_types: Any = (), + summary: Any = None, + ) -> None: + self.result_set = result_set + self.column_names = column_names + self.column_types = column_types + self.summary = summary or {} + + +def _make_pool(client: mock.Mock) -> ClickhouseConnectPool: + pool = ClickhouseConnectPool( + host="host", + user="test", + password="test", + database="test", + ) + # Avoid creating a real client / connection. 
+ pool._get_client = lambda: client # type: ignore[method-assign] + return pool + + +def test_execute_maps_result_and_profile() -> None: + client = mock.Mock() + client.query.return_value = FakeQueryResult( + result_set=[[1, "a"], [2, "b"]], + column_names=("id", "name"), + column_types=(FakeColumnType("UInt64"), FakeColumnType("String")), + summary={"read_rows": "2", "read_bytes": "128", "elapsed_ns": "1500000000"}, + ) + + pool = _make_pool(client) + result = pool.execute("SELECT id, name FROM t", with_column_types=True) + + assert result.results == [[1, "a"], [2, "b"]] + assert result.meta == [("id", "UInt64"), ("name", "String")] + assert result.profile is not None + assert result.profile["rows"] == 2 + assert result.profile["bytes"] == 128 + assert result.profile["elapsed"] == 1.5 + + +def test_execute_passes_query_id_and_settings() -> None: + client = mock.Mock() + client.query.return_value = FakeQueryResult(result_set=[]) + + pool = _make_pool(client) + pool.execute( + "SELECT 1", + query_id="my-query-id", + settings={"max_threads": 4}, + ) + + _, kwargs = client.query.call_args + assert kwargs["settings"]["query_id"] == "my-query-id" + assert kwargs["settings"]["max_threads"] == 4 + + +def test_too_many_simultaneous_queries_not_retried() -> None: + # We delegate all retries to clickhouse-connect, which does not retry the + # TOO_MANY_SIMULTANEOUS_QUERIES error. It should be surfaced directly, + # mapped to a ClickhouseError that preserves the error code. + from clickhouse_connect.driver.exceptions import DatabaseError + + client = mock.Mock() + client.query.side_effect = DatabaseError("too many", code=TOO_MANY_SIMULTANEOUS_QUERIES) + + pool = _make_pool(client) + try: + pool.execute("SELECT 1") + raise AssertionError("expected a ClickhouseError to be raised") + except ClickhouseError as error: + assert error.code == TOO_MANY_SIMULTANEOUS_QUERIES + # No retry on top of clickhouse-connect's own handling. 
+ assert client.query.call_count == 1 + + +def test_operational_error_mapped_without_extra_retries() -> None: + # Connection-level retries are clickhouse-connect's responsibility; we only + # map the surfaced error onto ClickhouseError without retrying again. + from clickhouse_connect.driver.exceptions import OperationalError + + client = mock.Mock() + client.query.side_effect = OperationalError("connection refused") + + pool = _make_pool(client) + with pytest.raises(ClickhouseError): + pool.execute("SELECT 1", retryable=True) + + assert client.query.call_count == 1 + + +def test_generic_clickhouse_error_wrapped() -> None: + # Any clickhouse-connect error (here a ProgrammingError) must be wrapped in + # a snuba ClickhouseError, matching how the native pool wraps the whole + # clickhouse_driver errors.Error family. + from clickhouse_connect.driver.exceptions import ProgrammingError + + client = mock.Mock() + client.query.side_effect = ProgrammingError("bad query") + + pool = _make_pool(client) + with pytest.raises(ClickhouseError): + pool.execute("SELECT 1") + + assert client.query.call_count == 1 + + +def test_timeouts_are_passed_through() -> None: + import clickhouse_connect + + # The per-profile timeout is honored as-is (no capping), the same way the + # native driver uses it. A large timeout (e.g. from MIGRATE) is preserved. 
+ pool = ClickhouseConnectPool( + host="host", + user="test", + password="test", + database="test", + connect_timeout=60, + send_receive_timeout=300000, + ) + + with ( + mock.patch.object(clickhouse_connect, "get_client") as get_client, + mock.patch("snuba.clickhouse.connect.get_pool_manager"), + ): + pool._get_client() + + _, kwargs = get_client.call_args + assert kwargs["send_receive_timeout"] == 300000 + assert kwargs["connect_timeout"] == 60 + + +def test_send_receive_timeout_default_when_unset() -> None: + import clickhouse_connect + + pool = ClickhouseConnectPool( + host="host", + user="test", + password="test", + database="test", + send_receive_timeout=None, + ) + + with ( + mock.patch.object(clickhouse_connect, "get_client") as get_client, + mock.patch("snuba.clickhouse.connect.get_pool_manager"), + ): + pool._get_client() + + _, kwargs = get_client.call_args + assert kwargs["send_receive_timeout"] == 300 + + +def test_read_query_client_settings_use_30s_timeout() -> None: + # Read queries (the QUERY profile) get a 30s timeout on both drivers. + from snuba.clusters.cluster import ClickhouseClientSettings + + assert ClickhouseClientSettings.QUERY.value.timeout == 30 + + +def test_internal_profile_is_unbounded() -> None: + # The 30s read timeout must not leak onto internal/maintenance queries + # (topology discovery, routing load lookups, delete throttling checks, the + # span-export job, table copies). They use the INTERNAL profile, which stays + # unbounded so long-running operations aren't capped at 30s. 
+ from snuba.clusters.cluster import ClickhouseClientSettings + + assert ClickhouseClientSettings.INTERNAL.value.timeout is None + + +def test_pool_size_defaults_to_setting() -> None: + import clickhouse_connect + + from snuba import settings + + pool = ClickhouseConnectPool(host="host", user="test", password="test", database="test") + + with ( + mock.patch.object(clickhouse_connect, "get_client"), + mock.patch("snuba.clickhouse.connect.get_pool_manager") as get_pool_manager, + ): + pool._get_client() + + _, kwargs = get_pool_manager.call_args + assert kwargs["maxsize"] == settings.CLICKHOUSE_MAX_POOL_SIZE + + +@pytest.mark.redis_db +def test_pool_size_runtime_override() -> None: + import clickhouse_connect + + from snuba import state + + state.set_config("clickhouse_connect_pool_size", 42) + + pool = ClickhouseConnectPool(host="host", user="test", password="test", database="test") + + with ( + mock.patch.object(clickhouse_connect, "get_client"), + mock.patch("snuba.clickhouse.connect.get_pool_manager") as get_pool_manager, + ): + pool._get_client() + + _, kwargs = get_pool_manager.call_args + assert kwargs["maxsize"] == 42 + + +def test_clickhouse_reader_wraps_connect_pool() -> None: + # The single driver-agnostic ClickhouseReader wraps the abstract pool, so it + # works with the connect pool just like the native one. + from snuba.clickhouse.native import ClickhouseReader + + pool = _make_pool(mock.Mock()) + reader = ClickhouseReader(cache_partition_id=None, client=pool, query_settings_prefix=None) + assert isinstance(reader, ClickhouseReader) + + +def test_with_totals_handled_over_http() -> None: + # WITH TOTALS works on the HTTP driver through clickhouse-connect's own + # parsing: its Native-format reader concatenates every response block, + # including the trailing totals block, into result_set, so the totals row + # arrives last — exactly what the driver-agnostic ClickhouseReader expects + # when it pops the last row as "totals". 
No native-driver-specific handling + # is involved. + from snuba.clickhouse.native import ClickhouseReader + + class FakeFormattedQuery: + def get_sql(self) -> str: + return "SELECT project_id, count() FROM t GROUP BY project_id WITH TOTALS" + + client = mock.Mock() + # Two data rows followed by the totals row, the way clickhouse-connect + # surfaces a WITH TOTALS response over HTTP. + client.query.return_value = FakeQueryResult( + result_set=[[1, 10], [2, 20], [0, 30]], + column_names=("project_id", "count()"), + column_types=(FakeColumnType("UInt64"), FakeColumnType("UInt64")), + ) + + pool = _make_pool(client) + reader = ClickhouseReader(cache_partition_id=None, client=pool, query_settings_prefix=None) + + result = reader.execute(cast(FormattedQuery, FakeFormattedQuery()), with_totals=True) + + # The trailing row is split out as totals; only the real rows remain in data. + assert result["data"] == [ + {"project_id": 1, "count()": 10}, + {"project_id": 2, "count()": 20}, + ] + assert result["totals"] == {"project_id": 0, "count()": 30} + + +def test_connect_type_names_drive_reader_transforms() -> None: + # The connect pool exposes clickhouse-connect's column_type.name in the + # result meta, and the driver-agnostic ClickhouseReader runs that through + # the same Date / DateTime / UUID regex transforms used for the native + # driver. This pins that clickhouse-connect's type-name format matches what + # those regexes expect (including parametrized types like DateTime('UTC') + # and Nullable(UUID)), so transformations are not silently skipped on the + # HTTP path. + from datetime import date, datetime + from uuid import UUID as UUIDClass + + from clickhouse_connect.datatypes.registry import get_from_name + + from snuba.clickhouse.native import ClickhouseReader + + class FakeFormattedQuery: + def get_sql(self) -> str: + return "SELECT d, dt, dt_tz, uid, nuid" + + # Column types named exactly as clickhouse-connect produces them. 
The + # assertion documents that clickhouse-connect uses the canonical ClickHouse + # type strings (the same the native driver returns), so the reader regexes + # match identically for both drivers. + col_types = [ + get_from_name("Date"), + get_from_name("DateTime"), + get_from_name("DateTime('UTC')"), + get_from_name("UUID"), + get_from_name("Nullable(UUID)"), + ] + assert [c.name for c in col_types] == [ + "Date", + "DateTime", + "DateTime('UTC')", + "UUID", + "Nullable(UUID)", + ] + + d = date(2023, 1, 2) + dt = datetime(2023, 1, 2, 3, 4, 5) + uid = UUIDClass("00000000-0000-0000-0000-000000000001") + + client = mock.Mock() + client.query.return_value = FakeQueryResult( + result_set=[[d, dt, dt, uid, uid]], + column_names=("d", "dt", "dt_tz", "uid", "nuid"), + column_types=tuple(col_types), + ) + + pool = _make_pool(client) + reader = ClickhouseReader(cache_partition_id=None, client=pool, query_settings_prefix=None) + result = reader.execute(cast(FormattedQuery, FakeFormattedQuery())) + + # Date/DateTime (incl. the parametrized tz variant) become ISO strings and + # UUID (incl. Nullable) becomes a string. Had the regexes failed to match + # the connect type names, these would still be the original objects. 
+ row = result["data"][0] + assert row["d"] == "2023-01-02T00:00:00+00:00" + assert row["dt"] == "2023-01-02T03:04:05+00:00" + assert row["dt_tz"] == "2023-01-02T03:04:05+00:00" + assert row["uid"] == "00000000-0000-0000-0000-000000000001" + assert row["nuid"] == "00000000-0000-0000-0000-000000000001" diff --git a/tests/clickhouse/test_native.py b/tests/clickhouse/test_native.py index 1eabd3518f1..12df495bcc7 100644 --- a/tests/clickhouse/test_native.py +++ b/tests/clickhouse/test_native.py @@ -9,7 +9,7 @@ from snuba import state from snuba.clickhouse.errors import ClickhouseError -from snuba.clickhouse.native import ClickhousePool, transform_datetime +from snuba.clickhouse.native import ClickhouseNativePool, transform_datetime def test_transform_datetime() -> None: @@ -27,7 +27,7 @@ def test_robust_concurrency_limit() -> None: connection = mock.Mock() connection.execute.side_effect = ClickhouseError("some error", extra_data={"code": 1}) - pool = ClickhousePool("host", 100, "test", "test", "test") + pool = ClickhouseNativePool("host", 100, "test", "test", "test") pool.pool = queue.LifoQueue(1) pool.pool.put(connection, block=False) @@ -62,7 +62,7 @@ def test_concurrency_limit() -> None: state.set_config("simultaneous_queries_sleep_seconds", 0.5) - pool = ClickhousePool("host", 100, "test", "test", "test") + pool = ClickhouseNativePool("host", 100, "test", "test", "test") pool.pool = queue.LifoQueue(1) pool.pool.put(connection, block=False) @@ -100,7 +100,7 @@ def test_execute_retries(retryable: bool, expected: int) -> None: socket_timeout_connection = mock.Mock() socket_timeout_connection.execute.side_effect = errors.SocketTimeoutError - pool = ClickhousePool(CLUSTER_HOST, CLUSTER_PORT, "test", "test", TEST_DB_NAME) + pool = ClickhouseNativePool(CLUSTER_HOST, CLUSTER_PORT, "test", "test", TEST_DB_NAME) with mock.patch.object(pool, "_create_conn", lambda: socket_timeout_connection): pool.pool = queue.LifoQueue(1) diff --git a/tests/clusters/fake_cluster.py 
b/tests/clusters/fake_cluster.py index 4be4782d5b9..16b09404999 100644 --- a/tests/clusters/fake_cluster.py +++ b/tests/clusters/fake_cluster.py @@ -1,6 +1,6 @@ from typing import Any, List, Mapping, MutableMapping, Optional, Sequence, Set, Tuple -from snuba.clickhouse.native import ClickhousePool, ClickhouseResult, Params +from snuba.clickhouse.native import ClickhouseNativePool, ClickhouseResult, Params from snuba.clusters.cluster import ( ClickhouseClientSettings, ClickhouseCluster, @@ -13,7 +13,7 @@ class ServerExplodedException(SerializableException): pass -class FakeClickhousePool(ClickhousePool): +class FakeClickhousePool(ClickhouseNativePool): def __init__(self, host_name: str) -> None: self.__queries: List[str] = [] self.host = host_name @@ -128,7 +128,7 @@ def get_node_connection( self, client_settings: ClickhouseClientSettings, node: ClickhouseNode, - ) -> ClickhousePool: + ) -> ClickhouseNativePool: settings, timeout = client_settings.value cache_key = (node, client_settings) if cache_key not in self.__connections: diff --git a/tests/clusters/test_cluster.py b/tests/clusters/test_cluster.py index e8ef94dbdb6..35ee5815fd6 100644 --- a/tests/clusters/test_cluster.py +++ b/tests/clusters/test_cluster.py @@ -5,7 +5,7 @@ import pytest from snuba import settings -from snuba.clickhouse.native import ClickhousePool, ClickhouseResult +from snuba.clickhouse.native import ClickhouseNativePool, ClickhouseResult from snuba.clusters import cluster from snuba.clusters.storage_sets import StorageSetKey from snuba.datasets.storages.factory import get_storage @@ -177,13 +177,13 @@ def test_disabled_cluster() -> None: @pytest.mark.clickhouse_db def test_get_local_nodes() -> None: importlib.reload(cluster) - with patch.object(ClickhousePool, "execute") as execute: + with patch.object(ClickhouseNativePool, "execute") as execute: execute.return_value = ClickhouseResult([("host_1", 9000, 1, 1), ("host_2", 9000, 2, 1)]) local_cluster = 
get_storage(StorageKey("errors")).get_cluster() assert len(local_cluster.get_local_nodes()) == 1 assert local_cluster.get_local_nodes()[0].host_name == "host_1" - assert local_cluster.get_local_nodes()[0].port == 9000 + assert local_cluster.get_local_nodes()[0].native_port == 9000 assert local_cluster.get_local_nodes()[0].shard is None assert local_cluster.get_local_nodes()[0].replica is None @@ -260,6 +260,48 @@ def test_cache_connections() -> None: ) != cluster_3.get_query_connection(cluster.ClickhouseClientSettings.QUERY) +@pytest.mark.redis_db +@pytest.mark.clickhouse_db +def test_get_node_connection_selects_driver() -> None: + from snuba import state + from snuba.clickhouse.connect import ClickhouseConnectPool + from snuba.clickhouse.native import ClickhouseNativePool, ClickhouseReader + + test_cluster = cluster.ClickhouseCluster( + "127.0.0.1", + 8000, + "default", + "", + "default", + 8001, + False, + None, + False, + {"events"}, + True, + ) + + # The driver is selected at the pool level; the reader is the single + # driver-agnostic ClickhouseReader regardless. + # Default: native pool. + state.set_config("use_clickhouse_connect_driver", 0) + native_pool = test_cluster.get_query_connection(cluster.ClickhouseClientSettings.QUERY) + assert isinstance(native_pool, ClickhouseNativePool) + assert isinstance(test_cluster.get_reader(), ClickhouseReader) + + # Flip on at runtime: HTTP pool. + state.set_config("use_clickhouse_connect_driver", 1) + http_pool = test_cluster.get_query_connection(cluster.ClickhouseClientSettings.QUERY) + assert isinstance(http_pool, ClickhouseConnectPool) + + # Flip back: native pool again. 
+ state.set_config("use_clickhouse_connect_driver", 0) + assert isinstance( + test_cluster.get_query_connection(cluster.ClickhouseClientSettings.QUERY), + ClickhouseNativePool, + ) + + @patch("snuba.settings.SLICED_CLUSTERS", SLICED_CLUSTERS_CONFIG) @pytest.mark.clickhouse_db def test_sliced_cluster() -> None: @@ -267,14 +309,14 @@ def test_sliced_cluster() -> None: res_cluster = cluster.get_cluster(StorageSetKey.GENERIC_METRICS_DISTRIBUTIONS, 1) - assert res_cluster.is_single_node() == True + assert res_cluster.is_single_node() is True assert res_cluster.get_database() == "slice_1_default" assert res_cluster.get_host() == "host_slice" assert res_cluster.get_port() == 9001 res_cluster_default = cluster.get_cluster(StorageSetKey.GENERIC_METRICS_DISTRIBUTIONS, 0) - assert res_cluster_default.is_single_node() == True + assert res_cluster_default.is_single_node() is True assert res_cluster_default.get_database() == "default" assert res_cluster_default.get_host() == "host_slice" assert res_cluster_default.get_port() == 9000 diff --git a/tests/test_clickhouse.py b/tests/test_clickhouse.py index 6278d374657..44880267110 100644 --- a/tests/test_clickhouse.py +++ b/tests/test_clickhouse.py @@ -5,7 +5,7 @@ from snuba.clickhouse.columns import Array, UInt from snuba.clickhouse.columns import SchemaModifiers as Modifier -from snuba.clickhouse.native import ClickhousePool +from snuba.clickhouse.native import ClickhouseNativePool from snuba.clusters.cluster import ClickhouseClientSettings from snuba.datasets.storages.factory import get_storage, get_writable_storage from snuba.datasets.storages.storage_key import StorageKey @@ -35,7 +35,7 @@ def test_reconnect(FakeClient: Client) -> None: errors.NetworkError, '{"data": "to my face"}', ] - cp = ClickhousePool("0:0:0:0", 9000, "default", "", "default") + cp = ClickhouseNativePool("0:0:0:0", 9000, "default", "", "default") cp.execute("SHOW TABLES") assert FakeClient.return_value.execute.mock_calls == [ call( diff --git 
a/tests/web/rpc/v1/routing_strategies/test_cluster_loadinfo.py b/tests/web/rpc/v1/routing_strategies/test_cluster_loadinfo.py index ade72da039e..871b89f1d20 100644 --- a/tests/web/rpc/v1/routing_strategies/test_cluster_loadinfo.py +++ b/tests/web/rpc/v1/routing_strategies/test_cluster_loadinfo.py @@ -42,7 +42,7 @@ def test_get_cluster_loadinfo_if_cache_fails() -> None: @pytest.mark.redis_db @pytest.mark.clickhouse_db def test_get_cluster_load_error_handling() -> None: - with patch("snuba.clusters.cluster.ClickhousePool.execute") as mock_execute: + with patch("snuba.clusters.cluster.ClickhouseNativePool.execute") as mock_execute: mock_execute.side_effect = Exception("Test error") load_info = get_cluster_loadinfo() assert load_info is not None diff --git a/tests/web/test_cache_partitions.py b/tests/web/test_cache_partitions.py index 82884b3780e..27ace9ad806 100644 --- a/tests/web/test_cache_partitions.py +++ b/tests/web/test_cache_partitions.py @@ -1,22 +1,22 @@ import pytest -from snuba.clickhouse.native import ClickhousePool, NativeDriverReader +from snuba.clickhouse.native import ClickhouseNativePool, ClickhouseReader from snuba.web.db_query import _get_cache_partition @pytest.mark.redis_db def test_cache_partition() -> None: - pool = ClickhousePool("127.0.0.1", 9000, "", "", "") - reader1 = NativeDriverReader(None, pool, None) - reader2 = NativeDriverReader(None, pool, None) + pool = ClickhouseNativePool("127.0.0.1", 9000, "", "", "") + reader1 = ClickhouseReader(None, pool, None) + reader2 = ClickhouseReader(None, pool, None) default_cache = _get_cache_partition(reader1) another_default_cache = _get_cache_partition(reader2) assert id(default_cache) == id(another_default_cache) - reader3 = NativeDriverReader("non_default", pool, None) - reader4 = NativeDriverReader("non_default", pool, None) + reader3 = ClickhouseReader("non_default", pool, None) + reader4 = ClickhouseReader("non_default", pool, None) nondefault_cache = _get_cache_partition(reader3) 
another_nondefault_cache = _get_cache_partition(reader4) diff --git a/uv.lock b/uv.lock index 15315dcfaec..4ea0a24ef9b 100644 --- a/uv.lock +++ b/uv.lock @@ -2,8 +2,10 @@ version = 1 revision = 3 requires-python = ">=3.13" resolution-markers = [ - "sys_platform == 'darwin'", - "sys_platform == 'linux'", + "python_full_version >= '3.14' and sys_platform == 'darwin'", + "python_full_version < '3.14' and sys_platform == 'darwin'", + "python_full_version >= '3.14' and sys_platform == 'linux'", + "python_full_version < '3.14' and sys_platform == 'linux'", ] supported-markers = [ "sys_platform == 'darwin'", @@ -96,6 +98,25 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28" }, ] +[[package]] +name = "clickhouse-connect" +version = "1.3.0" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +dependencies = [ + { name = "certifi", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "lz4", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "urllib3", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "zstandard", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, +] +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/clickhouse_connect-1.3.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:9164b2d021eb4f8474767a301e196ffdb045ee960e9b68fc00c53b975ea10e79" }, + { url = "https://pypi.devinfra.sentry.io/wheels/clickhouse_connect-1.3.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:2432a22aab181a483f3dc6b398c8f641099f2d71bb9289ec84de9643a3b96493" }, + { url = "https://pypi.devinfra.sentry.io/wheels/clickhouse_connect-1.3.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:dee1b1d91258859260b7a2059fea87e1ee52f006e4a99a76ef21aa67a31face5" }, + { 
url = "https://pypi.devinfra.sentry.io/wheels/clickhouse_connect-1.3.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:225de4ab00609e599f2529a8a5256da5f473ce9544a04ab9b18b8fdd5baf9005" }, + { url = "https://pypi.devinfra.sentry.io/wheels/clickhouse_connect-1.3.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:c18c0773a85f26c7eaeb59e0fd0a142e464312fda1c54fc7feae6115eb1759d4" }, + { url = "https://pypi.devinfra.sentry.io/wheels/clickhouse_connect-1.3.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:61f2f32403ac23354a572b160fa0a51ad5e76ba88aea37ebe5371b8863659339" }, +] + [[package]] name = "clickhouse-driver" version = "0.2.10" @@ -486,6 +507,19 @@ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/jsonschema_specifications-2023.12.1-py3-none-any.whl", hash = "sha256:87e4fdf3a94858b8a2ba2778d9ba57d8a9cafca7c7489c46ba0d30a8bc6a9c3c" }, ] +[[package]] +name = "lz4" +version = "4.4.5" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/lz4-4.4.5-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:b424df1076e40d4e884cfcc4c77d815368b7fb9ebcd7e634f937725cd9a8a72a" }, + { url = "https://pypi.devinfra.sentry.io/wheels/lz4-4.4.5-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:533298d208b58b651662dd972f52d807d48915176e5b032fb4f8c3b6f5fe535c" }, + { url = "https://pypi.devinfra.sentry.io/wheels/lz4-4.4.5-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:451039b609b9a88a934800b5fc6ee401c89ad9c175abf2f4d9f8b2e4ef1afc64" }, + { url = "https://pypi.devinfra.sentry.io/wheels/lz4-4.4.5-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:c8e71b14938082ebaf78144f3b3917ac715f72d14c076f384a4c062df96f9df6" }, + { url = 
"https://pypi.devinfra.sentry.io/wheels/lz4-4.4.5-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:3b84a42da86e8ad8537aabef062e7f661f4a877d1c74d65606c49d835d36d668" }, + { url = "https://pypi.devinfra.sentry.io/wheels/lz4-4.4.5-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:0bba042ec5a61fa77c7e380351a61cb768277801240249841defd2ff0a10742f" }, +] + [[package]] name = "markupsafe" version = "3.0.2" @@ -1044,6 +1078,7 @@ source = { editable = "." } dependencies = [ { name = "blinker", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "click", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "clickhouse-connect", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "clickhouse-driver", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "confluent-kafka", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "datadog", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -1113,6 +1148,7 @@ dev = [ requires-dist = [ { name = "blinker", specifier = ">=1.9" }, { name = "click", specifier = ">=8.1.7" }, + { name = "clickhouse-connect", specifier = ">=0.8.0" }, { name = "clickhouse-driver", specifier = ">=0.2.10" }, { name = "confluent-kafka", specifier = ">=2.7.0" }, { name = "datadog", specifier = ">=0.49.1" }, @@ -1376,7 +1412,8 @@ name = "watchdog" version = "3.0.0" source = { registry = "https://pypi.devinfra.sentry.io/simple" } resolution-markers = [ - "sys_platform == 'linux'", + "python_full_version >= '3.14' and sys_platform == 'linux'", + "python_full_version < '3.14' and sys_platform == 'linux'", ] wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/watchdog-3.0.0-py3-none-manylinux2014_aarch64.whl", hash = "sha256:0e06ab8858a76e1219e68c7573dfeba9dd1c0219476c5a44d5333b01d7e1743a" }, @@ -1388,7 +1425,8 @@ name = "watchdog" 
version = "6.0.0" source = { registry = "https://pypi.devinfra.sentry.io/simple" } resolution-markers = [ - "sys_platform == 'darwin'", + "python_full_version >= '3.14' and sys_platform == 'darwin'", + "python_full_version < '3.14' and sys_platform == 'darwin'", ] wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/watchdog-6.0.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:76aae96b00ae814b181bb25b1b98076d5fc84e8a53cd8885a318b42b6d3a5134" }, @@ -1406,3 +1444,16 @@ dependencies = [ wheels = [ { url = "https://pypi.devinfra.sentry.io/wheels/werkzeug-3.1.6-py3-none-any.whl", hash = "sha256:7ddf3357bb9564e407607f988f683d72038551200c704012bb9a4c523d42f131" }, ] + +[[package]] +name = "zstandard" +version = "0.25.0" +source = { registry = "https://pypi.devinfra.sentry.io/simple" } +wheels = [ + { url = "https://pypi.devinfra.sentry.io/wheels/zstandard-0.25.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:a1a4ae2dec3993a32247995bdfe367fc3266da832d82f8438c8570f989753de1" }, + { url = "https://pypi.devinfra.sentry.io/wheels/zstandard-0.25.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:bfc4e20784722098822e3eee42b8e576b379ed72cca4a7cb856ae733e62192ea" }, + { url = "https://pypi.devinfra.sentry.io/wheels/zstandard-0.25.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:8e735494da3db08694d26480f1493ad2cf86e99bdd53e8e9771b2752a5c0246a" }, + { url = "https://pypi.devinfra.sentry.io/wheels/zstandard-0.25.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:05df5136bc5a011f33cd25bc9f506e7426c0c9b3f9954f056831ce68f3b6689f" }, + { url = "https://pypi.devinfra.sentry.io/wheels/zstandard-0.25.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:223415140608d0f0da010499eaa8ccdb9af210a543fac54bce15babbcfc78439" }, + { url = "https://pypi.devinfra.sentry.io/wheels/zstandard-0.25.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", 
hash = "sha256:e09bb6252b6476d8d56100e8147b803befa9a12cea144bbe629dd508800d1ad0" }, +]