Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
747936a
feat(clickhouse): Add clickhouse-connect (HTTP) driver behind runtime…
claude Jun 18, 2026
925c465
refactor(clickhouse): Delegate connect-pool retries to clickhouse-con…
claude Jun 18, 2026
15c08f2
fix(clickhouse): Match native exception handling and cap timeout at 30s
claude Jun 18, 2026
2cfaab9
feat(clickhouse): Add runtime override for connect pool size
claude Jun 18, 2026
0635e19
refactor(clickhouse): Move driver selection out of the native pool
claude Jun 18, 2026
740897e
refactor(clickhouse): Introduce a driver-selecting pool
claude Jun 18, 2026
1e8affa
refactor(clickhouse): Always return a DriverSelectingPool from the cache
claude Jun 18, 2026
2bf6855
refactor(clickhouse): Always use the default ClickHouse HTTP port
claude Jun 18, 2026
0500c2f
refactor(clickhouse): Select reader (native vs HTTP) in get_reader
claude Jun 18, 2026
e124ee5
style(tests): Fix pre-existing E712 comparisons in test_cluster
claude Jun 18, 2026
5bf8da9
refactor(clickhouse): Apply driver selection to all connections, not …
claude Jun 18, 2026
a87be31
fix(clickhouse): Align connect timeouts with native; 30s read timeout
claude Jun 18, 2026
d8cd5e4
refactor(clickhouse): Abstract base classes for pools and readers
claude Jun 18, 2026
778ccc9
fix(clickhouse): Use the cluster's configured http_port for the conne…
claude Jun 18, 2026
8ecbb7f
perf(clickhouse): Cache deleter's local-node lookup
claude Jun 18, 2026
fac0556
fix(clickhouse): Resolve driver once per get_reader/get_deleter
claude Jun 18, 2026
30915ab
fix(clickhouse): Match FakeClickhouseCluster override signature; drop…
claude Jun 18, 2026
d88912b
refactor(clickhouse): Have pools build their own reader; cluster stay…
claude Jun 18, 2026
1c28b61
refactor(clickhouse): Collapse to a single ClickhouseReader; regenera…
claude Jun 18, 2026
69af580
fix(clickhouse): Include http_port in the connection cache key
claude Jun 18, 2026
830b6f1
refactor(clickhouse): Route by-host pool acquisition through Connecti…
claude Jun 18, 2026
348135c
fix(clickhouse): Single runtime-sized connect pool; driver-aware admi…
claude Jun 18, 2026
b144516
refactor(clickhouse): Decide driver inside ConnectionCache; lock pool…
claude Jun 18, 2026
4d788a1
test(clickhouse): Pin connect type-name compatibility with reader tra…
claude Jun 18, 2026
3f7d05c
refactor(clickhouse): Make ClickhouseNode carry native_port and http_…
claude Jun 18, 2026
c485a37
Merge remote-tracking branch 'origin/master' into claude/clickhouse-d…
claude Jun 18, 2026
58ac330
[getsentry/action-github-commit] Auto commit
getsantry[bot] Jun 18, 2026
d080300
fix(tests): Satisfy strict mypy in connect/admin test files
claude Jun 18, 2026
9a71608
docs(clickhouse): Document the HTTP-path tracing limitation
claude Jun 18, 2026
fe24fb7
fix(clickhouse): Scope the 30s read timeout to user reads only
claude Jun 18, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ gocd/templates/vendor/
gocd/generated-pipelines/
Brewfile.lock.json
.zed/
dump.rdb
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "26.7.0.dev0"
dependencies = [
"blinker>=1.9",
"click>=8.1.7",
"clickhouse-connect>=0.8.0",
"clickhouse-driver>=0.2.10",
"confluent-kafka>=2.7.0",
"datadog>=0.49.1",
Expand Down Expand Up @@ -125,6 +126,8 @@ exclude = ["^rust_snuba/", "^tests/datasets/", "^tests/query/"]
[[tool.mypy.overrides]]
module = [
"_strptime",
"clickhouse_connect",
"clickhouse_connect.*",
"clickhouse_driver",
"clickhouse_driver.errors",
"confluent_kafka",
Expand Down
63 changes: 42 additions & 21 deletions snuba/admin/clickhouse/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@

from snuba import settings
from snuba.clickhouse.native import ClickhousePool
from snuba.clusters.cluster import ClickhouseClientSettings, ClickhouseCluster
from snuba.clusters.cluster import (
ClickhouseClientSettings,
ClickhouseCluster,
ClickhouseNode,
connection_cache,
use_clickhouse_connect_driver,
)
from snuba.datasets.storage import ReadableTableStorage
from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
Expand Down Expand Up @@ -43,7 +49,7 @@ def is_valid_node(host: str, port: int, cluster: ClickhouseCluster, storage_name
},
)

return any(node.host_name == host and node.port == port for node in nodes)
return any(node.host_name == host and node.native_port == port for node in nodes)


def _get_storage(storage_name: str) -> ReadableTableStorage:
Expand Down Expand Up @@ -71,7 +77,7 @@ def _validate_node(
"host": clickhouse_host,
"port": clickhouse_port,
"query_host": cluster.get_query_node().host_name,
"query_port": cluster.get_query_node().port,
"query_port": cluster.get_query_node().native_port,
},
)

Expand All @@ -89,24 +95,38 @@ def _build_validated_pool(
password: str,
client_settings: ClickhouseClientSettings,
) -> ClickhousePool:
# Single chokepoint for admin ClickhousePool construction. ClickhousePool
# ships the user/password in the first hello packet of the native protocol,
# so an unvalidated host means credentials reach whatever listener answers.
# All admin helpers must go through here — never call ClickhousePool
# directly from this module. The regression test
# test_no_direct_clickhouse_pool_construction_in_admin enforces this.
# Single chokepoint for admin ClickhousePool acquisition. A pool ships the
# user/password to the node (the native protocol's first hello packet, or
# the HTTP auth header), so an unvalidated host means credentials reach
# whatever listener answers. All admin helpers must go through here — never
# acquire a pool from the connection cache directly in this module. The
# regression test test_no_direct_clickhouse_pool_construction_in_admin
# enforces this.
_validate_node(clickhouse_host, clickhouse_port, cluster, storage_name)
return ClickhousePool(
clickhouse_host,
clickhouse_port,
# Go through the shared connection cache so the driver (native vs
# clickhouse-connect/HTTP) is selected by the runtime config, behind the
# abstract ClickhousePool type, just like the cluster's own connections.
return connection_cache.get_node_connection(
client_settings,
ClickhouseNode(clickhouse_host, clickhouse_port, http_port=cluster.get_http_port()),
username,
password,
database,
max_pool_size=2,
client_settings=client_settings.value.settings,
secure=False,
ca_certs=None,
verify=False,
)


Comment thread
sentry[bot] marked this conversation as resolved.
def _driver_cache_token() -> str:
    """Return the driver tag that is embedded in admin connection cache keys.

    The tag is ``"connect"`` when ``use_clickhouse_connect_driver()`` is
    truthy and ``"native"`` otherwise. Including it in the cache key means
    that flipping the runtime flag makes admin helpers look up (and, on a
    miss, build) a connection for the newly selected driver, rather than
    returning a pool pinned to whichever driver was active when the entry
    was first cached. This keeps admin traffic switchable at runtime, like
    the cluster query/reader paths.
    """
    if use_clickhouse_connect_driver():
        return "connect"
    return "native"


def get_ro_node_connection(
clickhouse_host: str,
clickhouse_port: int,
Expand All @@ -115,7 +135,7 @@ def get_ro_node_connection(
) -> ClickhousePool:
storage = _get_storage(storage_name)

key = f"{storage.get_storage_key()}-{clickhouse_host}"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Admin pool size limit removed

Medium Severity

Admin ClickHouse connections no longer cap the native pool at two connections. _build_validated_pool now uses the shared connection_cache, which builds pools with the default CLICKHOUSE_MAX_POOL_SIZE (25) or the HTTP runtime pool size, so targeted admin nodes can see far more simultaneous connections than before.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 9a71608. Configure here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is intentional, not an oversight. Earlier in the PR the admin helper built its own ClickhouseNativePool(..., max_pool_size=2); we deliberately removed that per-caller cap so admin acquires the one shared, runtime-config-sized pool from ConnectionCache like every other caller (the connect pool always sizes from clickhouse_connect_pool_size, the native pool from CLICKHOUSE_MAX_POOL_SIZE). Keeping a hardcoded 2 would mean reintroducing the per-caller max_pool_size plumbing through the cache that we just took out, and would give admin a different-sized pool than everything else hitting the same node.

Admin is low-traffic (human operators running system queries / tracing), so sharing the standard sizing is acceptable, and pool size is now runtime-tunable via clickhouse_connect_pool_size if it ever needs limiting. Leaving as-is.


Generated by Claude Code

key = f"{storage.get_storage_key()}-{clickhouse_host}-{_driver_cache_token()}"
if key in NODE_CONNECTIONS:
return NODE_CONNECTIONS[key]

Expand Down Expand Up @@ -162,8 +182,9 @@ def get_ro_node_connection(
def get_ro_query_node_connection(
storage_name: str, client_settings: ClickhouseClientSettings
) -> ClickhousePool:
if storage_name in CLUSTER_CONNECTIONS:
return CLUSTER_CONNECTIONS[storage_name]
key = f"{storage_name}-{_driver_cache_token()}"
if key in CLUSTER_CONNECTIONS:
return CLUSTER_CONNECTIONS[key]

storage = _get_storage(storage_name)
cluster = storage.get_cluster()
Expand All @@ -172,7 +193,7 @@ def get_ro_query_node_connection(
connection_id.hostname, connection_id.tcp_port, storage_name, client_settings
)

CLUSTER_CONNECTIONS[storage_name] = connection
CLUSTER_CONNECTIONS[key] = connection
return connection


Expand All @@ -184,7 +205,7 @@ def get_sudo_node_connection(
) -> ClickhousePool:
storage = _get_storage(storage_name)

key = f"{storage.get_storage_key()}-{clickhouse_host}-sudo"
key = f"{storage.get_storage_key()}-{clickhouse_host}-sudo-{_driver_cache_token()}"
if key in NODE_CONNECTIONS:
return NODE_CONNECTIONS[key]

Expand Down Expand Up @@ -216,7 +237,7 @@ def get_clusterless_node_connection(
cluster = storage.get_cluster()
database = cluster.get_database()

key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-{database}"
key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-{database}-{_driver_cache_token()}"
if key in NODE_CONNECTIONS:
return NODE_CONNECTIONS[key]

Expand Down Expand Up @@ -245,7 +266,7 @@ def get_ro_clusterless_node_connection(
cluster = storage.get_cluster()
database = cluster.get_database()

key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-ro-{database}"
key = f"{storage.get_storage_key()}-{clickhouse_host}-clusterless-ro-{database}-{_driver_cache_token()}"
if key in NODE_CONNECTIONS:
return NODE_CONNECTIONS[key]

Expand Down
4 changes: 3 additions & 1 deletion snuba/admin/clickhouse/copy_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ def copy_tables(
skip_on_cluster: bool = False,
cluster_name_override: Optional[str] = None,
) -> CopyTablesResponse:
settings = ClickhouseClientSettings.QUERY
# Table copies can run long, so use the unbounded INTERNAL profile rather
# than the 30s user-read QUERY profile.
settings = ClickhouseClientSettings.INTERNAL
source_connection = get_clusterless_node_connection(
source_host, 9000, storage_name, client_settings=settings
)
Expand Down
4 changes: 2 additions & 2 deletions snuba/admin/clickhouse/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _get_nodes(storage_key: StorageKey, local: bool = True) -> Sequence[Node]:
return []
else:
return [
{"host": node.host_name, "port": node.port}
{"host": node.host_name, "port": node.native_port}
for node in (
cluster.get_local_nodes() if local else cluster.get_distributed_nodes()
)
Expand All @@ -62,7 +62,7 @@ def _get_query_node(storage_key: StorageKey) -> Optional[Node]:
try:
cluster = get_storage(storage_key).get_cluster()
query_node = cluster.get_query_node()
return {"host": query_node.host_name, "port": query_node.port}
return {"host": query_node.host_name, "port": query_node.native_port}
except (AssertionError, KeyError, UndefinedClickhouseCluster):
return None

Expand Down
17 changes: 11 additions & 6 deletions snuba/cli/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def cleanup(

from snuba.cleanup import logger, run_cleanup
from snuba.clickhouse.native import ClickhousePool
from snuba.clusters.cluster import ClickhouseNode, connection_cache

storage = get_writable_storage(StorageKey(storage_name))

Expand All @@ -81,16 +82,20 @@ def cleanup(
cluster = storage.get_cluster()
database = cluster.get_database()

connection: ClickhousePool
if clickhouse_host and clickhouse_port:
connection = ClickhousePool(
clickhouse_host,
clickhouse_port,
# Go through the shared connection cache so the driver (native vs
# clickhouse-connect/HTTP) is selected by the runtime config, behind
# the abstract ClickhousePool type.
connection = connection_cache.get_node_connection(
ClickhouseClientSettings.CLEANUP,
ClickhouseNode(clickhouse_host, clickhouse_port, http_port=cluster.get_http_port()),
clickhouse_user,
clickhouse_password,
database,
clickhouse_secure,
clickhouse_ca_certs,
clickhouse_verify,
secure=clickhouse_secure,
ca_certs=clickhouse_ca_certs,
verify=clickhouse_verify,
)
elif not cluster.is_single_node():
raise click.ClickException("Provide ClickHouse host and port for cleanup")
Expand Down
23 changes: 16 additions & 7 deletions snuba/cli/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def optimize(

from snuba.clickhouse.native import ClickhousePool
from snuba.clickhouse.optimize.optimize import logger
from snuba.clusters.cluster import ClickhouseNode, connection_cache

setup_logging(log_level)
setup_sentry()
Expand All @@ -100,17 +101,25 @@ def optimize(
# passing this information won't be necessary, and running this command once
# will ensure that optimize is performed on all of the individual nodes for
# that cluster.
connection: ClickhousePool
if clickhouse_host and clickhouse_port:
connection = ClickhousePool(
clickhouse_host,
clickhouse_port,
# Go through the shared connection cache so the driver (native vs
# clickhouse-connect/HTTP) is selected by the runtime config, behind
# the abstract ClickhousePool type. The OPTIMIZE timeout is carried by
# the client settings profile the cache reads.
connection = connection_cache.get_node_connection(
ClickhouseClientSettings.OPTIMIZE,
ClickhouseNode(
clickhouse_host,
clickhouse_port,
http_port=storage.get_cluster().get_http_port(),
),
clickhouse_user,
clickhouse_password,
database,
clickhouse_secure,
clickhouse_ca_certs,
clickhouse_verify,
send_receive_timeout=ClickhouseClientSettings.OPTIMIZE.value.timeout,
secure=clickhouse_secure,
ca_certs=clickhouse_ca_certs,
verify=clickhouse_verify,
)
elif not storage.get_cluster().is_single_node():
raise click.ClickException("Provide Clickhouse host and port for optimize")
Expand Down
31 changes: 22 additions & 9 deletions snuba/cli/querylog_to_csv.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import csv
import os
from datetime import datetime
from typing import NamedTuple, Optional, Sequence, Tuple

Expand All @@ -7,8 +8,12 @@

from snuba import settings
from snuba.admin.notifications.slack.client import SlackClient
from snuba.clickhouse.native import ClickhousePool
from snuba.clusters.cluster import ClickhouseClientSettings
from snuba.clusters.cluster import (
DEFAULT_CLICKHOUSE_HTTP_PORT,
ClickhouseClientSettings,
ClickhouseNode,
connection_cache,
)
from snuba.environment import setup_logging, setup_sentry

logger = structlog.get_logger().bind(module=__name__)
Expand Down Expand Up @@ -160,13 +165,21 @@ def querylog_to_csv(
query = get_query_results(event_type, [database], tables, start_time, end_time)

(clickhouse_user, clickhouse_password) = get_credentials()
connection = ClickhousePool(
host=clickhouse_host,
port=clickhouse_port,
user=clickhouse_user,
password=clickhouse_password,
database=database,
client_settings=ClickhouseClientSettings.QUERY.value.settings,
# Go through the shared connection cache so the driver (native vs
# clickhouse-connect/HTTP) is selected by the runtime config, behind the
# abstract ClickhousePool type. There is no cluster here to read an
# http_port from, so use the configured CLICKHOUSE_HTTP_PORT (the same env
# var the cluster config reads), defaulting to the well-known port.
http_port = int(os.environ.get("CLICKHOUSE_HTTP_PORT", DEFAULT_CLICKHOUSE_HTTP_PORT))
connection = connection_cache.get_node_connection(
ClickhouseClientSettings.QUERY,
ClickhouseNode(clickhouse_host, clickhouse_port, http_port=http_port),
clickhouse_user,
clickhouse_password,
database,
secure=False,
ca_certs=None,
verify=False,
)
Comment thread
sentry[bot] marked this conversation as resolved.
results = connection.execute(query)
filename = format_filename(table)
Expand Down
Loading
Loading