Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Backups/BackupsWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ struct BackupsWorker::BackupStarter

/// The "internal" option can only be used by a query that was initiated by another query (e.g., ON CLUSTER query).
/// It should not be allowed for an initial query explicitly specified by a user.
if (is_internal_backup && (query_context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY))
if (is_internal_backup && !query_context->isDDLOrOnClusterInternal())
throw Exception(ErrorCodes::ACCESS_DENIED, "Setting 'internal' cannot be set explicitly");

on_cluster = !backup_query->cluster.empty() || is_internal_backup;
Expand Down Expand Up @@ -846,7 +846,7 @@ struct BackupsWorker::RestoreStarter

/// The "internal" option can only be used by a query that was initiated by another query (e.g., ON CLUSTER query).
/// It should not be allowed for an initial query explicitly specified by a user.
if (is_internal_restore && (query_context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY))
if (is_internal_restore && !query_context->isDDLOrOnClusterInternal())
throw Exception(ErrorCodes::ACCESS_DENIED, "Setting 'internal' cannot be set explicitly");

/// RESTORE is a write operation, it should be forbidden in strict readonly mode (readonly=1).
Expand Down
4 changes: 2 additions & 2 deletions src/Databases/DatabaseReplicated.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1276,7 +1276,7 @@ BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, Contex
if (is_readonly)
throw Exception(ErrorCodes::NO_ZOOKEEPER, "Database is in readonly mode, because it cannot connect to ZooKeeper");

if (!flags.internal && (query_context->getClientInfo().query_kind != ClientInfo::QueryKind::INITIAL_QUERY))
if (!flags.internal && query_context->isDDLOrOnClusterInternal())
throw Exception(ErrorCodes::INCORRECT_QUERY, "It's not initial query. ON CLUSTER is not allowed for Replicated database.");

checkQueryValid(query, query_context);
Expand Down Expand Up @@ -1423,7 +1423,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
{
auto query_context = Context::createCopy(getContext());
query_context->makeQueryContext();
query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
query_context->setDDLOrOnClusterInternal(true);
query_context->setQueryKindReplicatedDatabaseInternal();
query_context->setCurrentDatabase(getDatabaseName());
query_context->setCurrentQueryId("");
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/ClientInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class ClientInfo
{
NO_QUERY = 0, /// Uninitialized object.
INITIAL_QUERY = 1,
SECONDARY_QUERY = 2, /// Query that was initiated by another query for distributed or ON CLUSTER query execution.
SECONDARY_QUERY = 2, /// Query that was initiated by another query for distributed query execution.
};

ClientInfo();
Expand Down
1 change: 1 addition & 0 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,7 @@ ContextData::ContextData(const ContextData &o) :
global_context(o.global_context),
buffer_context(o.buffer_context),
is_internal_query(o.is_internal_query),
is_ddl_or_on_cluster_internal(o.is_ddl_or_on_cluster_internal),
is_view_inner_query(o.is_view_inner_query),
positional_arguments_already_resolved(o.positional_arguments_already_resolved),
temp_data_on_disk(o.temp_data_on_disk),
Expand Down
8 changes: 8 additions & 0 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,11 @@ class ContextData

/// A flag, used to distinguish between user query and internal query to a database engine (MaterializedPostgreSQL).
bool is_internal_query = false;
/// Set for queries created internally by the server for DDL replication (ON CLUSTER, DatabaseReplicated)
/// and internal backup coordination.
/// Unlike query_kind == SECONDARY_QUERY (which comes from the client and can be spoofed),
/// this flag can only be set server-side and is safe to use for security-sensitive checks.
bool is_ddl_or_on_cluster_internal = false;
/// True when this context belongs to the inner query of an expanded view.
/// Positional arguments inside views must be resolved even on remote/secondary nodes where
/// enable_positional_arguments would otherwise be skipped (views are expanded on remote nodes,
Expand Down Expand Up @@ -1448,6 +1453,9 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
bool isInternalQuery() const { return is_internal_query; }
void setInternalQuery(bool internal) { is_internal_query = internal; }

bool isDDLOrOnClusterInternal() const { return is_ddl_or_on_cluster_internal; }
void setDDLOrOnClusterInternal(bool value) { is_ddl_or_on_cluster_internal = value; }

bool isViewInnerQuery() const { return is_view_inner_query; }
void setIsViewInnerQuery(bool value) { is_view_inner_query = value; }

Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/DDLTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ ContextMutablePtr DDLTaskBase::makeQueryContext(ContextPtr from_context, const Z
auto query_context = Context::createCopy(from_context);
query_context->makeQueryContext();
query_context->setCurrentQueryId(""); // generate random query_id
query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
query_context->setDDLOrOnClusterInternal(true);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve worker mode for cluster table functions

When a DDL worker executes CREATE ... ON CLUSTER ... AS SELECT ... FROM fileCluster/urlCluster/s3Cluster, this context is no longer marked as SECONDARY_QUERY, while the *Cluster table functions still use query_kind == SECONDARY_QUERY to choose their worker-local storage path (for example TableFunctionFileCluster.cpp:23 and TableFunctionURLCluster.cpp:13). As a result, each DDL worker can fan out to the whole cluster again instead of reading only its assigned local input, multiplying remote reads and potentially duplicating the data inserted by CTAS on every replica; keep the secondary query kind for execution semantics or update those table-function checks to also recognize this new internal DDL flag.

Useful? React with 👍 / 👎.

if (entry.settings)
query_context->applySettingsChanges(*entry.settings);
return query_context;
Expand Down Expand Up @@ -598,7 +598,7 @@ void DatabaseReplicatedTask::parseQueryFromEntry(ContextPtr context)
ContextMutablePtr DatabaseReplicatedTask::makeQueryContext(ContextPtr from_context, const ZooKeeperPtr & zookeeper)
{
auto query_context = DDLTaskBase::makeQueryContext(from_context, zookeeper);
query_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
query_context->setDDLOrOnClusterInternal(true);
query_context->setQueryKindReplicatedDatabaseInternal();
query_context->setCurrentDatabase(database->getDatabaseName());

Expand Down
12 changes: 6 additions & 6 deletions src/Interpreters/InterpreterCreateQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
}
else
{
bool is_on_cluster = getContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_on_cluster = getContext()->isDDLOrOnClusterInternal();
if (create.uuid != UUIDHelpers::Nil && !is_on_cluster && !internal)
throw Exception(ErrorCodes::INCORRECT_QUERY, "Ordinary database engine does not support UUID");

Expand Down Expand Up @@ -1393,7 +1393,7 @@ void InterpreterCreateQuery::assertOrSetUUID(ASTCreateQuery & create, const Data
const auto * kind_upper = create.is_dictionary ? "DICTIONARY" : "TABLE";
bool is_replicated_database_internal = database->getEngineName() == "Replicated" && getContext()->getClientInfo().is_replicated_database_internal;
bool from_path = create.attach_from_path.has_value();
bool is_on_cluster = getContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_on_cluster = getContext()->isDDLOrOnClusterInternal();

if (database->getEngineName() == "Replicated" && create.uuid != UUIDHelpers::Nil && !is_replicated_database_internal && !internal && !is_on_cluster && !create.attach)
{
Expand Down Expand Up @@ -1598,7 +1598,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
fs::path user_files = fs::path(getContext()->getUserFilesPath()).lexically_normal();
fs::path root_path = fs::path(getContext()->getPath()).lexically_normal();

if (getContext()->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
if (!getContext()->isDDLOrOnClusterInternal())
{
fs::path data_path = fs::path(*create.attach_from_path).lexically_normal();
if (data_path.is_relative())
Expand All @@ -1618,7 +1618,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
"Data directory {} must be inside {} to attach it", String(data_path), String(user_files));
}
}
else if (create.attach && !create.attach_short_syntax && getContext()->getClientInfo().query_kind != ClientInfo::QueryKind::SECONDARY_QUERY)
else if (create.attach && !create.attach_short_syntax && !getContext()->isDDLOrOnClusterInternal())
{
auto log = getLogger("InterpreterCreateQuery");
LOG_WARNING(log, "ATTACH TABLE query with full table definition is not recommended: "
Expand Down Expand Up @@ -1922,9 +1922,9 @@ bool InterpreterCreateQuery::doCreateTable(ASTCreateQuery & create,
/// Before actually creating the table, check if it will lead to cyclic dependencies.
checkTableCanBeAddedWithNoCyclicDependencies(create, query_ptr, getContext());

/// Initial queries in Replicated database at this point have query_kind = ClientInfo::QueryKind::SECONNDARY_QUERY,
/// Initial queries in Replicated database at this point have is_ddl_or_on_cluster_internal = true,
/// so we need to check whether the query is initial through getZooKeeperMetadataTransaction()->isInitialQuery()
bool is_initial_query = getContext()->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY ||
bool is_initial_query = !getContext()->isDDLOrOnClusterInternal() ||
(getContext()->getZooKeeperMetadataTransaction() && getContext()->getZooKeeperMetadataTransaction()->isInitialQuery());
bool is_predefined_database = DatabaseCatalog::isPredefinedDatabase(create.getDatabase());
if (!internal && is_initial_query && !is_predefined_database)
Expand Down
4 changes: 2 additions & 2 deletions src/Interpreters/InterpreterDropQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ BlockIO InterpreterDropQuery::executeToTableImpl(const ContextPtr & context_, AS
"Table {} is not a Dictionary",
table_id.getNameForLogs());

bool secondary_query = getContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool secondary_query = getContext()->isDDLOrOnClusterInternal();
if (!secondary_query && settings[Setting::ignore_drop_queries_probability] != 0 && ast_drop_query.kind == ASTDropQuery::Kind::Drop
&& std::uniform_real_distribution<>(0.0, 1.0)(thread_local_rng) <= settings[Setting::ignore_drop_queries_probability])
{
Expand Down Expand Up @@ -718,7 +718,7 @@ void InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind kind, ContextPtr

if (ignore_sync_setting)
drop_context->setSetting("database_atomic_wait_for_drop_and_detach_synchronously", false);
drop_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
drop_context->setDDLOrOnClusterInternal(true);
if (auto txn = current_context->getZooKeeperMetadataTransaction())
{
/// For Replicated database
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/Kafka/StorageKafkaUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ void registerStorageKafka(StorageFactory & factory)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Either specify both zookeeper path and replica name or none of them");

const auto is_on_cluster = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
const auto is_replicated_database = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY
const auto is_on_cluster = args.getLocalContext()->isDDLOrOnClusterInternal();
const auto is_replicated_database = args.getLocalContext()->isDDLOrOnClusterInternal()
&& DatabaseCatalog::instance().getDatabase(args.table_id.database_name)->getEngineName() == "Replicated";

// UUID macro is only allowed:
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/registerStorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ static TableZnodeInfo extractZooKeeperPathAndReplicaNameFromEngineArgs(

if (has_valid_arguments)
{
bool is_replicated_database = local_context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
bool is_replicated_database = local_context->isDDLOrOnClusterInternal() &&
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ ObjectStorageQueueMetadata::tryAcquireBucket(const Bucket & bucket, const Proces

void ObjectStorageQueueMetadata::alterSettings(const SettingsChanges & changes, const ContextPtr & context)
{
bool is_initial_query = context->getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY ||
bool is_initial_query = !context->isDDLOrOnClusterInternal() ||
(context->getZooKeeperMetadataTransaction() && context->getZooKeeperMetadataTransaction()->isInitialQuery());

const fs::path alter_settings_lock_path = zookeeper_path / "alter_settings_lock";
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/ObjectStorageQueue/registerQueueStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ StoragePtr createQueueStorage(const StorageFactory::Arguments & args)
auto database = DatabaseCatalog::instance().tryGetDatabase(args.table_id.database_name);
const String database_engine = database ? database->getEngineName() : "";

bool is_on_cluster = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
bool is_on_cluster = args.getLocalContext()->isDDLOrOnClusterInternal();
bool is_replicated_database = args.getLocalContext()->isDDLOrOnClusterInternal() &&
database_engine == "Replicated";

/// Allow implicit {uuid} macros only for keeper_path in ON CLUSTER queries
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/TableZnodeInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ namespace ErrorCodes

TableZnodeInfo TableZnodeInfo::resolve(const String & requested_path, const String & requested_replica_name, const StorageID & table_id, const ASTCreateQuery & query, LoadingStrictnessLevel mode, const ContextPtr & context)
{
bool is_on_cluster = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
bool is_replicated_database = context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY &&
bool is_on_cluster = context->isDDLOrOnClusterInternal();
bool is_replicated_database = context->isDDLOrOnClusterInternal() &&
DatabaseCatalog::instance().getDatabase(table_id.database_name)->getEngineName() == "Replicated";

/// Allow implicit {uuid} macros only for zookeeper_path in ON CLUSTER queries
Expand Down
Loading