Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace Setting
extern const SettingsObjectStorageGranularityLevel cluster_table_function_split_granularity;
extern const SettingsBool parallel_replicas_for_cluster_engines;
extern const SettingsString object_storage_cluster;
extern const SettingsBool object_storage_remote_initiator;
extern const SettingsString object_storage_remote_initiator_cluster;
extern const SettingsInt64 delta_lake_snapshot_start_version;
extern const SettingsInt64 delta_lake_snapshot_end_version;
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
Expand Down Expand Up @@ -720,7 +722,9 @@ QueryProcessingStage::Enum StorageObjectStorageCluster::getQueryProcessingStage(
ContextPtr context, QueryProcessingStage::Enum to_stage, const StorageSnapshotPtr & storage_snapshot, SelectQueryInfo & query_info) const
{
/// Full query if fall back to pure storage.
if (getClusterName(context).empty())
if (getClusterName(context).empty() // Not cluster request
&& !(context->getSettingsRef()[Setting::object_storage_remote_initiator] // Not request with remote initiator
&& !context->getSettingsRef()[Setting::object_storage_remote_initiator_cluster].value.empty()))
Comment on lines +725 to +727
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep unsupported storages at FetchColumns

When object_storage_remote_initiator is set for a configuration that does not support cluster mode, such as icebergLocal or other local data-lake storages where getClusterName is empty because isClusterSupported is false, this new exception makes getQueryProcessingStage report WithMergeableState. IStorageCluster::read still immediately falls back to pure_storage for those configurations, and StorageObjectStorage::read ignores processed_stage, so aggregate queries can be planned as if partial aggregation happened remotely even though only raw rows were read. Please only return the distributed stage when the subsequent read path will actually use the remote/cluster execution path.

Useful? React with 👍 / 👎.

return QueryProcessingStage::Enum::FetchColumns;

/// Distributed storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ void TableFunctionObjectStorageClusterFallback<Definition, Base>::parseArguments
const auto & settings = context->getSettingsRef();

is_cluster_function = !settings[Setting::object_storage_cluster].value.empty() && typename Base::Configuration().isClusterSupported();
is_remote = settings[Setting::object_storage_remote_initiator];
// Remote initiator requires 'object_storage_cluster' or 'object_storage_remote_initiator_cluster'
is_remote = settings[Setting::object_storage_remote_initiator]
&& (!settings[Setting::object_storage_cluster].value.empty()
|| !settings[Setting::object_storage_remote_initiator_cluster].value.empty());

if (is_cluster_function)
{
Expand Down
75 changes: 75 additions & 0 deletions tests/integration/test_s3_cluster/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1509,3 +1509,78 @@ def test_object_storage_remote_initiator_without_cluster_function(started_cluste
assert users[1:] == ["s0_0_0\tdefault",
"s0_0_1\tfoo",
"s0_1_0\tfoo"]


def _sum_remote_result_rows(node, query_id):
    """Sum `result_rows` over all finished non-initial queries spawned by `query_id`.

    Flushes logs on 'cluster_all' and then reads system.query_log from every
    replica of that cluster. Returns the sum as an int.
    """
    node.query("SYSTEM FLUSH LOGS ON CLUSTER 'cluster_all'")
    output = node.query(
        f"""
        SELECT sum(result_rows)
        FROM clusterAllReplicas('cluster_all', system.query_log)
        WHERE type='QueryFinish' AND initial_query_id='{query_id}'
        AND is_initial_query = 0
        ORDER BY ALL
        FORMAT TSV
        """
    ).splitlines()
    return int(output[0])


def test_object_storage_remote_initiator_aggregation(started_cluster):
    """Check that aggregation is executed on the remote nodes when a plain s3()
    call is combined with `object_storage_remote_initiator` and
    `object_storage_remote_initiator_cluster` (no `object_storage_cluster`).

    Two scenarios are covered: a global `sum` and a `GROUP BY` aggregation.
    For each one the final result is verified, and the number of rows returned
    by the secondary queries is read from system.query_log to confirm that the
    nodes returned aggregated rows.
    """
    node = started_cluster.instances["s0_0_0"]

    # Remote initiator without cluster request.
    # Check that aggregation works on nodes (global aggregate, no GROUP BY).
    query_id = uuid.uuid4().hex

    result = node.query(
        f"""
        SELECT sum(value) from s3(
            'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
        SETTINGS
            object_storage_remote_initiator=1,
            object_storage_remote_initiator_cluster='cluster_with_dots_and_user'
        """,
        query_id=query_id,
    )

    assert result == "67802152770\n"

    # Data processed on cluster 'hidden_cluster_with_username_and_password'.
    # Cluster contains two nodes, each returns one row.
    # NOTE(review): the SETTINGS above name 'cluster_with_dots_and_user' as the
    # remote initiator cluster; confirm the cluster name in this comment.
    assert _sum_remote_result_rows(node, query_id) == 2

    # Remote initiator without cluster request.
    # Check that aggregation works on nodes (GROUP BY aggregation).
    query_id = uuid.uuid4().hex

    result = node.query(
        f"""
        SELECT value % 2 as bit, sum(value) from s3(
            'http://minio1:9001/root/data/{{clickhouse,database}}/*', 'minio', '{minio_secret_key}', 'CSV',
            'name String, value UInt32, polygon Array(Array(Tuple(Float64, Float64)))')
        GROUP BY bit
        ORDER BY bit
        SETTINGS
            object_storage_remote_initiator=1,
            object_storage_remote_initiator_cluster='cluster_with_dots_and_user'
        """,
        query_id=query_id,
    )

    assert result == "0\t41117771522\n1\t26684381248\n"

    # Data processed on cluster 'hidden_cluster_with_username_and_password'.
    # Cluster contains two nodes, each returns up to two rows (one per `bit`
    # value), so at least two and at most four rows in total.
    result_rows = _sum_remote_result_rows(node, query_id)
    assert 2 <= result_rows <= 4
Loading