From 6b9c71ad297dbb307e313436926e3024e48a432d Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 4 Jun 2026 17:20:04 +0200 Subject: [PATCH 1/2] Fix aggregation flow with remote initiator --- .../StorageObjectStorageCluster.cpp | 6 +- ...leFunctionObjectStorageClusterFallback.cpp | 5 +- tests/integration/test_s3_cluster/test.py | 74 +++++++++++++++++++ 3 files changed, 83 insertions(+), 2 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index e5131d06ae2e..592345b2fa30 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -36,6 +36,8 @@ namespace Setting extern const SettingsObjectStorageGranularityLevel cluster_table_function_split_granularity; extern const SettingsBool parallel_replicas_for_cluster_engines; extern const SettingsString object_storage_cluster; + extern const SettingsBool object_storage_remote_initiator; + extern const SettingsString object_storage_remote_initiator_cluster; extern const SettingsInt64 delta_lake_snapshot_start_version; extern const SettingsInt64 delta_lake_snapshot_end_version; extern const SettingsUInt64 lock_object_storage_task_distribution_ms; @@ -720,7 +722,9 @@ QueryProcessingStage::Enum StorageObjectStorageCluster::getQueryProcessingStage( ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info) const { /// Full query if fall back to pure storage. - if (getClusterName(context).empty()) + if (getClusterName(context).empty() // Not cluster request + && !(context->getSettingsRef()[Setting::object_storage_remote_initiator] // Not request with remote initiator + && !context->getSettingsRef()[Setting::object_storage_remote_initiator_cluster].value.empty())) return QueryProcessingStage::Enum::FetchColumns; /// Distributed storage. 
diff --git a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp index 8b2819e42d46..16c6656e3317 100644 --- a/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp +++ b/src/TableFunctions/TableFunctionObjectStorageClusterFallback.cpp @@ -117,7 +117,10 @@ void TableFunctionObjectStorageClusterFallback::parseArguments const auto & settings = context->getSettingsRef(); is_cluster_function = !settings[Setting::object_storage_cluster].value.empty() && typename Base::Configuration().isClusterSupported(); - is_remote = settings[Setting::object_storage_remote_initiator]; + // Remote initiator requires 'object_storage_cluster' or 'object_storage_remote_initiator_cluster' + is_remote = settings[Setting::object_storage_remote_initiator] + && (!settings[Setting::object_storage_cluster].value.empty() + || !settings[Setting::object_storage_remote_initiator_cluster].value.empty()); if (is_cluster_function) { diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index fa533379a18f..afa832ee4a9e 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1509,3 +1509,77 @@ def test_object_storage_remote_initiator_without_cluster_function(started_cluste assert users[1:] == ["s0_0_0\tdefault", "s0_0_1\tfoo", "s0_1_0\tfoo"] + + +def test_object_storage_remote_initiator_aggregation(started_cluster): + node = started_cluster.instances["s0_0_0"] + + # Remote initiator without cluster request + # Check that aggregation works on nodes + query_id = uuid.uuid4().hex + + result = node.query( + f""" + SELECT sum(value) from s3( + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + SETTINGS + object_storage_remote_initiator=1, 
object_storage_remote_initiator_cluster='cluster_with_dots_and_user' + """, + query_id = query_id, + ) + + assert result == "67802152770\n" + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'") + result_rows = node.query( + f""" + SELECT sum(result_rows) + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + AND is_initial_query = 0 + ORDER BY ALL + FORMAT TSV + """ + ).splitlines() + + # Data processed on cluster 'hidden_cluster_with_username_and_password'. + # Cluster contains two nodes, each returns one row. + assert result_rows == ["2"] + + # Remote initiator without cluster request + # Check that aggregation works on nodes + query_id = uuid.uuid4().hex + + result = node.query( + f""" + SELECT value % 2 as bit, sum(value) from s3( + 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', + 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') + GROUP BY bit + SETTINGS + object_storage_remote_initiator=1, + object_storage_remote_initiator_cluster='cluster_with_dots_and_user' + """, + query_id = query_id, + ) + + assert result == "0\t41117771522\n1\t26684381248\n" + + node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'") + result_rows = node.query( + f""" + SELECT sum(result_rows) + FROM clusterAllReplicas('cluster_all', system.query_log) + WHERE type='QueryFinish' AND initial_query_id='{query_id}' + AND is_initial_query = 0 + ORDER BY ALL + FORMAT TSV + """ + ).splitlines() + + # Data processed on cluster 'hidden_cluster_with_username_and_password'. + # Cluster contains two nodes, each returns up to two rows, at least two rows in total. 
+ result_rows = int(result_rows[0]) + assert result_rows >= 2 and result_rows <= 4 From e72ebfe1776b07ddb0b65732eff7aaa401b0bff4 Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 4 Jun 2026 18:40:18 +0200 Subject: [PATCH 2/2] Fix test --- tests/integration/test_s3_cluster/test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_s3_cluster/test.py b/tests/integration/test_s3_cluster/test.py index afa832ee4a9e..8262c6c7d5b3 100644 --- a/tests/integration/test_s3_cluster/test.py +++ b/tests/integration/test_s3_cluster/test.py @@ -1558,6 +1558,7 @@ def test_object_storage_remote_initiator_aggregation(started_cluster): 'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV', 'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))') GROUP BY bit + ORDER BY bit SETTINGS object_storage_remote_initiator=1, object_storage_remote_initiator_cluster='cluster_with_dots_and_user'