IS_TRANSIENT = e -> e instanceof KubernetesResourceNotFoundException;
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
index 320bffba94bf..9fdc25fa6455 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java
@@ -121,13 +121,13 @@ public boolean deletePeonJob(K8sTaskId taskId)
.withName(taskId.getK8sJobName())
.delete().isEmpty());
if (result) {
- log.info("Cleaned up k8s task: %s", taskId);
+ log.info("Cleaned up k8s job: %s", taskId);
} else {
- log.info("K8s task does not exist: %s", taskId);
+ log.info("K8s job does not exist: %s", taskId);
}
return result;
} else {
- log.info("Not cleaning up task %s due to flag: debugJobs=true", taskId);
+ log.info("Not cleaning up job %s due to flag: debugJobs=true", taskId);
return true;
}
}
@@ -271,6 +271,6 @@ private void emitK8sPodMetrics(Task task, String metric, long durationMs)
{
ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
- emitter.emit(metricBuilder.build(metric, durationMs));
+ emitter.emit(metricBuilder.setMetric(metric, durationMs));
}
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java
new file mode 100644
index 000000000000..8b0a6374ad46
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.indexing.common.task.Task;
+
+/**
+ * Implementation of {@link RunnerStrategy} that always selects the Kubernetes runner type for tasks.
+ *
+ * This strategy is specific for tasks that are intended to be executed in a Kubernetes environment.
+ * Regardless of task specifics, this strategy always returns {@link RunnerType#KUBERNETES_RUNNER_TYPE}.
+ */
+public class KubernetesRunnerStrategy implements RunnerStrategy
+{
+ @JsonCreator
+ public KubernetesRunnerStrategy()
+ {
+ }
+
+ @Override
+ public RunnerType getRunnerTypeForTask(Task task)
+ {
+ return RunnerType.KUBERNETES_RUNNER_TYPE;
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java
new file mode 100644
index 000000000000..5aa2bc4723ab
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerFactory;
+
+/**
+ * Strategy interface for selecting the appropriate runner type based on the task spec or specific context conditions.
+ *
+ *
This interface is part of a strategy pattern and is implemented by different classes that handle
+ * the logic of selecting a runner type based on various criteria. Each task submitted to the system
+ * will pass through the strategy implementation to determine the correct runner for execution.
+ *
+ *
The strategy uses {@link RunnerType} as a standardized way of referring to and managing different types of runners.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = KubernetesRunnerStrategy.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "k8s", value = KubernetesRunnerStrategy.class),
+ @JsonSubTypes.Type(name = "worker", value = WorkerRunnerStrategy.class),
+ @JsonSubTypes.Type(name = "taskType", value = TaskTypeRunnerStrategy.class)
+})
+public interface RunnerStrategy
+{
+ String WORKER_NAME = "worker";
+
+ /**
+ * Enumerates the available runner types, each associated with a specific method of task execution.
+ * These runner types are used by the strategies to make decisions and by the system to route tasks appropriately.
+ */
+ enum RunnerType
+ {
+ KUBERNETES_RUNNER_TYPE(KubernetesTaskRunnerFactory.TYPE_NAME),
+ WORKER_RUNNER_TYPE(WORKER_NAME);
+
+ private final String type;
+
+ RunnerType(String type)
+ {
+ this.type = type;
+ }
+
+ public String getType()
+ {
+ return type;
+ }
+ }
+
+ /**
+ * Analyzes the task and determines the appropriate runner type for executing it.
+ *
+ * @param task The task that needs to be executed.
+ * @return The runner type deemed most suitable for executing the task.
+ */
+ RunnerType getRunnerTypeForTask(Task task);
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java
new file mode 100644
index 000000000000..6a16314be5b9
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.indexing.common.task.Task;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Implementation of {@link RunnerStrategy} that allows dynamic selection of runner type based on task type.
+ *
+ *
This strategy checks each task's type against a set of overrides to determine the appropriate runner type.
+ * If no override is specified for a task's type, it uses a default runner.
+ *
+ *
Runner types are determined based on configurations provided at construction, including default runner
+ * type and specific overrides per task type. This strategy is designed for environments where tasks may require
+ * different execution environments (e.g., Kubernetes or worker nodes).
+ */
+public class TaskTypeRunnerStrategy implements RunnerStrategy
+{
+ @Nullable
+ private final Map overrides;
+ private final RunnerStrategy kubernetesRunnerStrategy = new KubernetesRunnerStrategy();
+ private WorkerRunnerStrategy workerRunnerStrategy;
+ private final RunnerStrategy defaultRunnerStrategy;
+ private final String defaultRunner;
+
+ @JsonCreator
+ public TaskTypeRunnerStrategy(
+ @JsonProperty("default") String defaultRunner,
+ @JsonProperty("overrides") @Nullable Map overrides
+ )
+ {
+ Preconditions.checkNotNull(defaultRunner);
+ workerRunnerStrategy = new WorkerRunnerStrategy();
+ defaultRunnerStrategy = RunnerType.WORKER_RUNNER_TYPE.getType().equals(defaultRunner) ?
+ workerRunnerStrategy : kubernetesRunnerStrategy;
+ validate(overrides);
+ this.defaultRunner = defaultRunner;
+ this.overrides = overrides;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Map getOverrides()
+ {
+ return overrides;
+ }
+
+ @JsonProperty
+ public String getDefault()
+ {
+ return defaultRunner;
+ }
+
+ @Override
+ public RunnerType getRunnerTypeForTask(Task task)
+ {
+ String runnerType = null;
+ if (overrides != null) {
+ runnerType = overrides.get(task.getType());
+ }
+
+ RunnerStrategy runnerStrategy = getRunnerSelectStrategy(runnerType);
+ return runnerStrategy.getRunnerTypeForTask(task);
+ }
+
+ private RunnerStrategy getRunnerSelectStrategy(String runnerType)
+ {
+ if (runnerType == null) {
+ return defaultRunnerStrategy;
+ }
+
+ if (WORKER_NAME.equals(runnerType)) {
+ return workerRunnerStrategy;
+ } else {
+ return kubernetesRunnerStrategy;
+ }
+ }
+
+ private void validate(Map overrides)
+ {
+ if (overrides == null) {
+ return;
+ }
+
+ boolean hasValidRunnerType =
+ overrides.values().stream().allMatch(v -> RunnerType.WORKER_RUNNER_TYPE.getType().equals(v)
+ || RunnerType.KUBERNETES_RUNNER_TYPE.getType().equals(v));
+ Preconditions.checkArgument(
+ hasValidRunnerType,
+ "Invalid config in 'overrides'. Each runner type must be either '%s' or '%s'.",
+ RunnerType.WORKER_RUNNER_TYPE.getType(),
+ RunnerType.KUBERNETES_RUNNER_TYPE.getType()
+ );
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TaskTypeRunnerStrategy{" +
+ "default=" + defaultRunner +
+ ", overrides=" + overrides +
+ '}';
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java
new file mode 100644
index 000000000000..bd06f91aa8f1
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.indexing.common.task.Task;
+
+/**
+ * Implementation of {@link RunnerStrategy} that always selects the Worker runner type for tasks.
+ *
+ * This strategy is specific for tasks that are intended to be executed in a Worker environment.
+ * Regardless of task specifics, this strategy always returns {@link RunnerType#WORKER_RUNNER_TYPE}.
+ */
+public class WorkerRunnerStrategy implements RunnerStrategy
+{
+ @JsonCreator
+ public WorkerRunnerStrategy()
+ {
+ }
+
+ @Override
+ public RunnerType getRunnerTypeForTask(Task task)
+ {
+ return RunnerType.WORKER_RUNNER_TYPE;
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
index 712bc1a47e20..862b176b1159 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
@@ -41,7 +41,10 @@
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.ForkingTaskRunner;
@@ -57,8 +60,11 @@
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -89,6 +95,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
protected final StartupLoggingConfig startupLoggingConfig;
protected final DruidNode node;
protected final ObjectMapper mapper;
+ protected final TaskLogs taskLogs;
public K8sTaskAdapter(
KubernetesClientApi client,
@@ -96,7 +103,8 @@ public K8sTaskAdapter(
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode node,
- ObjectMapper mapper
+ ObjectMapper mapper,
+ TaskLogs taskLogs
)
{
this.client = client;
@@ -105,6 +113,7 @@ public K8sTaskAdapter(
this.startupLoggingConfig = startupLoggingConfig;
this.node = node;
this.mapper = mapper;
+ this.taskLogs = taskLogs;
}
@Override
@@ -132,11 +141,39 @@ public Task toTask(Job from) throws IOException
Optional taskJson = envVars.stream().filter(x -> "TASK_JSON".equals(x.getName())).findFirst();
String contents = taskJson.map(envVar -> taskJson.get().getValue()).orElse(null);
if (contents == null) {
- throw new IOException("No TASK_JSON environment variable found in pod: " + from.getMetadata().getName());
+ log.info("No TASK_JSON environment variable found in pod: %s. Trying to load task payload from deep storage.", from.getMetadata().getName());
+ return toTaskUsingDeepStorage(from);
}
return mapper.readValue(Base64Compression.decompressBase64(contents), Task.class);
}
+ private Task toTaskUsingDeepStorage(Job from) throws IOException
+ {
+ com.google.common.base.Optional taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
+ if (!taskBody.isPresent()) {
+ throw InternalServerError.exception(
+ "Could not load task payload from deep storage for job [%s]. Check the overlord logs for any errors in uploading task payload to deep storage.",
+ from.getMetadata().getName()
+ );
+ }
+ String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset());
+ return mapper.readValue(task, Task.class);
+ }
+
+ @Override
+ public K8sTaskId getTaskId(Job from)
+ {
+ Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
+ if (annotations == null) {
+ throw DruidException.defensive().build("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
+ }
+ String taskId = annotations.get(DruidK8sConstants.TASK_ID);
+ if (taskId == null) {
+ throw DruidException.defensive().build("No task_id annotation found on pod spec for job [%s]", from.getMetadata().getName());
+ }
+ return new K8sTaskId(taskId);
+ }
+
@VisibleForTesting
abstract Job createJobFromPodSpec(PodSpec podSpec, Task task, PeonCommandContext context) throws IOException;
@@ -219,15 +256,11 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context
.build());
}
- mainContainer.getEnv().addAll(Lists.newArrayList(
+ List envVars = Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(context.getTaskDir().getAbsolutePath())
.build(),
- new EnvVarBuilder()
- .withName(DruidK8sConstants.TASK_JSON_ENV)
- .withValue(taskContents)
- .build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.JAVA_OPTS)
.withValue(Joiner.on(" ").join(context.getJavaOpts()))
@@ -244,7 +277,17 @@ void addEnvironmentVariables(Container mainContainer, PeonCommandContext context
null,
"metadata.name"
)).build()).build()
- ));
+ );
+
+ if (taskContents.length() < DruidK8sConstants.MAX_ENV_VARIABLE_KBS) {
+ envVars.add(
+ new EnvVarBuilder()
+ .withName(DruidK8sConstants.TASK_JSON_ENV)
+ .withValue(taskContents)
+ .build()
+ );
+ }
+ mainContainer.getEnv().addAll(envVars);
}
protected Container setupMainContainer(
@@ -403,6 +446,9 @@ private List generateCommand(Task task)
command.add("--loadBroadcastSegments");
command.add("true");
}
+
+ command.add("--taskId");
+ command.add(task.getId());
log.info(
"Peon Command for K8s job: %s",
ForkingTaskRunner.getMaskedCommand(startupLoggingConfig.getMaskProperties(), command)
@@ -433,5 +479,12 @@ static ResourceRequirements getResourceRequirements(ResourceRequirements require
}
return requirements;
}
+
+ @Override
+ public boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException
+ {
+ String compressedTaskPayload = Base64Compression.compressBase64(mapper.writeValueAsString(task));
+ return compressedTaskPayload.length() > DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
index 9cda8f864882..a81154a3bcba 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapter.java
@@ -43,6 +43,7 @@
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
import java.io.IOException;
import java.util.Collections;
@@ -59,10 +60,11 @@ public MultiContainerTaskAdapter(
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode druidNode,
- ObjectMapper mapper
+ ObjectMapper mapper,
+ TaskLogs taskLogs
)
{
- super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper);
+ super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper, taskLogs);
}
@Override
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
index a3d10f7dcd1c..ef0509a673f8 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
@@ -24,8 +24,8 @@
import com.fasterxml.jackson.databind.introspect.AnnotatedClass;
import com.fasterxml.jackson.databind.introspect.AnnotatedClassResolver;
import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.EnvVarSourceBuilder;
@@ -35,11 +35,13 @@
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.client.utils.Serialization;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.InternalServerError;
import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -49,12 +51,16 @@
import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.KubernetesOverlordUtils;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogs;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -85,13 +91,15 @@ public class PodTemplateTaskAdapter implements TaskAdapter
private final DruidNode node;
private final ObjectMapper mapper;
private final HashMap templates;
+ private final TaskLogs taskLogs;
public PodTemplateTaskAdapter(
KubernetesTaskRunnerConfig taskRunnerConfig,
TaskConfig taskConfig,
DruidNode node,
ObjectMapper mapper,
- Properties properties
+ Properties properties,
+ TaskLogs taskLogs
)
{
this.taskRunnerConfig = taskRunnerConfig;
@@ -99,6 +107,7 @@ public PodTemplateTaskAdapter(
this.node = node;
this.mapper = mapper;
this.templates = initializePodTemplates(properties);
+ this.taskLogs = taskLogs;
}
/**
@@ -163,15 +172,44 @@ public Task toTask(Job from) throws IOException
{
Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
if (annotations == null) {
- throw new IOE("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
+ log.info("No annotations found on pod spec for job [%s]. Trying to load task payload from deep storage.", from.getMetadata().getName());
+ return toTaskUsingDeepStorage(from);
}
String task = annotations.get(DruidK8sConstants.TASK);
if (task == null) {
- throw new IOE("No task annotation found on pod spec for job [%s]", from.getMetadata().getName());
+ log.info("No task annotation found on pod spec for job [%s]. Trying to load task payload from deep storage.", from.getMetadata().getName());
+ return toTaskUsingDeepStorage(from);
}
return mapper.readValue(Base64Compression.decompressBase64(task), Task.class);
}
+ private Task toTaskUsingDeepStorage(Job from) throws IOException
+ {
+ com.google.common.base.Optional taskBody = taskLogs.streamTaskPayload(getTaskId(from).getOriginalTaskId());
+ if (!taskBody.isPresent()) {
+ throw InternalServerError.exception(
+ "Could not load task payload from deep storage for job [%s]. Check the overlord logs for errors uploading task payloads to deep storage.",
+ from.getMetadata().getName()
+ );
+ }
+ String task = IOUtils.toString(taskBody.get(), Charset.defaultCharset());
+ return mapper.readValue(task, Task.class);
+ }
+
+ @Override
+ public K8sTaskId getTaskId(Job from)
+ {
+ Map annotations = from.getSpec().getTemplate().getMetadata().getAnnotations();
+ if (annotations == null) {
+ throw DruidException.defensive().build("No annotations found on pod spec for job [%s]", from.getMetadata().getName());
+ }
+ String taskId = annotations.get(DruidK8sConstants.TASK_ID);
+ if (taskId == null) {
+ throw DruidException.defensive().build("No task_id annotation found on pod spec for job [%s]", from.getMetadata().getName());
+ }
+ return new K8sTaskId(taskId);
+ }
+
private HashMap initializePodTemplates(Properties properties)
{
HashMap podTemplateMap = new HashMap<>();
@@ -208,9 +246,9 @@ private Optional loadPodTemplate(String key, Properties properties)
}
}
- private Collection getEnv(Task task)
+ private Collection getEnv(Task task) throws IOException
{
- return ImmutableList.of(
+ List envVars = Lists.newArrayList(
new EnvVarBuilder()
.withName(DruidK8sConstants.TASK_DIR_ENV)
.withValue(taskConfig.getBaseDir())
@@ -219,17 +257,21 @@ private Collection getEnv(Task task)
.withName(DruidK8sConstants.TASK_ID_ENV)
.withValue(task.getId())
.build(),
- new EnvVarBuilder()
- .withName(DruidK8sConstants.TASK_JSON_ENV)
- .withValueFrom(new EnvVarSourceBuilder().withFieldRef(new ObjectFieldSelector(
- null,
- StringUtils.format("metadata.annotations['%s']", DruidK8sConstants.TASK)
- )).build()).build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV)
.withValue(Boolean.toString(task.supportsQueries()))
.build()
);
+ if (!shouldUseDeepStorageForTaskPayload(task)) {
+ envVars.add(new EnvVarBuilder()
+ .withName(DruidK8sConstants.TASK_JSON_ENV)
+ .withValueFrom(new EnvVarSourceBuilder().withFieldRef(new ObjectFieldSelector(
+ null,
+ StringUtils.format("metadata.annotations['%s']", DruidK8sConstants.TASK)
+ )).build()).build()
+ );
+ }
+ return envVars;
}
private Map getPodLabels(KubernetesTaskRunnerConfig config, Task task)
@@ -239,14 +281,18 @@ private Map getPodLabels(KubernetesTaskRunnerConfig config, Task
private Map getPodTemplateAnnotations(Task task) throws IOException
{
- return ImmutableMap.builder()
- .put(DruidK8sConstants.TASK, Base64Compression.compressBase64(mapper.writeValueAsString(task)))
+ ImmutableMap.Builder podTemplateAnnotationBuilder = ImmutableMap.builder()
.put(DruidK8sConstants.TLS_ENABLED, String.valueOf(node.isEnableTlsPort()))
.put(DruidK8sConstants.TASK_ID, task.getId())
.put(DruidK8sConstants.TASK_TYPE, task.getType())
.put(DruidK8sConstants.TASK_GROUP_ID, task.getGroupId())
- .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource())
- .build();
+ .put(DruidK8sConstants.TASK_DATASOURCE, task.getDataSource());
+
+ if (!shouldUseDeepStorageForTaskPayload(task)) {
+ podTemplateAnnotationBuilder
+ .put(DruidK8sConstants.TASK, Base64Compression.compressBase64(mapper.writeValueAsString(task)));
+ }
+ return podTemplateAnnotationBuilder.build();
}
private Map getJobLabels(KubernetesTaskRunnerConfig config, Task task)
@@ -276,4 +322,11 @@ private String getDruidLabel(String baseLabel)
{
return DruidK8sConstants.DRUID_LABEL_PREFIX + baseLabel;
}
+
+ @Override
+ public boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException
+ {
+ String compressedTaskPayload = Base64Compression.compressBase64(mapper.writeValueAsString(task));
+ return compressedTaskPayload.length() > DruidK8sConstants.MAX_ENV_VARIABLE_KBS;
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
index c64acd153106..be0588c35b15 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapter.java
@@ -32,6 +32,7 @@
import org.apache.druid.k8s.overlord.common.PeonCommandContext;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
import java.io.IOException;
import java.util.Collections;
@@ -47,10 +48,11 @@ public SingleContainerTaskAdapter(
TaskConfig taskConfig,
StartupLoggingConfig startupLoggingConfig,
DruidNode druidNode,
- ObjectMapper mapper
+ ObjectMapper mapper,
+ TaskLogs taskLogs
)
{
- super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper);
+ super(client, taskRunnerConfig, taskConfig, startupLoggingConfig, druidNode, mapper, taskLogs);
}
@Override
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
index 05933604f2ba..9dacb213cf3f 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/TaskAdapter.java
@@ -21,6 +21,7 @@
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
import java.io.IOException;
@@ -31,4 +32,10 @@ public interface TaskAdapter
Task toTask(Job from) throws IOException;
+ K8sTaskId getTaskId(Job from);
+
+ /**
+ * Method for exposing to external classes whether the task has its task payload bundled by the adapter or relies on a external system
+ */
+ boolean shouldUseDeepStorageForTaskPayload(Task task) throws IOException;
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
new file mode 100644
index 000000000000..8ad631682f95
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class KubernetesAndWorkerTaskRunnerConfigTest
+{
+ @Test
+ public void test_deserializable() throws IOException
+ {
+ ObjectMapper mapper = new DefaultObjectMapper();
+ KubernetesAndWorkerTaskRunnerConfig config = mapper.readValue(
+ this.getClass().getClassLoader().getResource("kubernetesAndWorkerTaskRunnerConfig.json"),
+ KubernetesAndWorkerTaskRunnerConfig.class
+ );
+
+ Assert.assertEquals("worker", config.getRunnerStrategy());
+ Assert.assertEquals("remote", config.getWorkerType());
+ }
+
+ @Test
+ public void test_withDefaults()
+ {
+ KubernetesAndWorkerTaskRunnerConfig config = new KubernetesAndWorkerTaskRunnerConfig(null, null);
+
+ Assert.assertEquals(KubernetesTaskRunnerFactory.TYPE_NAME, config.getRunnerStrategy());
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
new file mode 100644
index 000000000000..88696017d056
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.k8s.overlord;
+
+import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
+import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
+import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
+import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+
+@RunWith(EasyMockRunner.class)
+public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport
+{
+
+ @Mock KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory;
+ @Mock HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
+ @Mock RemoteTaskRunnerFactory remoteTaskRunnerFactory;
+
+ @Test
+ public void test_useHttpTaskRunner_asDefault()
+ {
+ KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory(
+ kubernetesTaskRunnerFactory,
+ httpRemoteTaskRunnerFactory,
+ remoteTaskRunnerFactory,
+ new KubernetesAndWorkerTaskRunnerConfig(null, null),
+ new WorkerRunnerStrategy()
+ );
+
+ EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
+
+ replayAll();
+ factory.build();
+ verifyAll();
+ }
+
+ @Test
+ public void test_specifyRemoteTaskRunner()
+ {
+ KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory(
+ kubernetesTaskRunnerFactory,
+ httpRemoteTaskRunnerFactory,
+ remoteTaskRunnerFactory,
+ new KubernetesAndWorkerTaskRunnerConfig(null, "remote"),
+ new WorkerRunnerStrategy()
+ );
+
+ EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
+
+ replayAll();
+ factory.build();
+ verifyAll();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void test_specifyIncorrectTaskRunner_shouldThrowException()
+ {
+ KubernetesAndWorkerTaskRunnerFactory factory = new KubernetesAndWorkerTaskRunnerFactory(
+ kubernetesTaskRunnerFactory,
+ httpRemoteTaskRunnerFactory,
+ remoteTaskRunnerFactory,
+ new KubernetesAndWorkerTaskRunnerConfig(null, "noop"),
+ new KubernetesRunnerStrategy()
+ );
+
+ EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
+ EasyMock.expect(kubernetesTaskRunnerFactory.build()).andReturn(null);
+
+ replayAll();
+ factory.build();
+ verifyAll();
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
new file mode 100644
index 000000000000..3ab515cc6e55
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.k8s.overlord;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Futures;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
+import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
+import org.apache.druid.k8s.overlord.runnerstrategy.TaskTypeRunnerStrategy;
+import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+
+@RunWith(EasyMockRunner.class)
+public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
+{
+
+ private static final String ID = "id";
+ private static final String DATA_SOURCE = "dataSource";
+ @Mock KubernetesTaskRunner kubernetesTaskRunner;
+ @Mock HttpRemoteTaskRunner workerTaskRunner;
+
+ KubernetesAndWorkerTaskRunner runner;
+
+ private Task task;
+
+ @Before
+ public void setup()
+ {
+ task = NoopTask.create();
+ runner = new KubernetesAndWorkerTaskRunner(
+ kubernetesTaskRunner,
+ workerTaskRunner,
+ new KubernetesRunnerStrategy()
+ );
+ }
+
+ @Test
+ public void test_runOnKubernetes() throws ExecutionException, InterruptedException
+ {
+ KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
+ kubernetesTaskRunner,
+ workerTaskRunner,
+ new KubernetesRunnerStrategy()
+ );
+ TaskStatus taskStatus = TaskStatus.success(ID);
+ EasyMock.expect(kubernetesTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
+
+ replayAll();
+ Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(task).get());
+ verifyAll();
+ }
+
+ @Test
+ public void test_runOnWorker() throws ExecutionException, InterruptedException
+ {
+ KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
+ kubernetesTaskRunner,
+ workerTaskRunner,
+ new WorkerRunnerStrategy()
+ );
+ TaskStatus taskStatus = TaskStatus.success(ID);
+ EasyMock.expect(workerTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
+
+ replayAll();
+ Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(task).get());
+ verifyAll();
+ }
+
+ @Test
+ public void test_runOnKubernetesOrWorkerBasedOnStrategy() throws ExecutionException, InterruptedException
+ {
+ TaskTypeRunnerStrategy runnerStrategy = new TaskTypeRunnerStrategy("k8s", ImmutableMap.of("index_kafka", "worker"));
+ KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
+ kubernetesTaskRunner,
+ workerTaskRunner,
+ runnerStrategy
+ );
+ Task taskMock = EasyMock.createMock(Task.class);
+ TaskStatus taskStatus = TaskStatus.success(ID);
+ EasyMock.expect(taskMock.getId()).andReturn(ID).anyTimes();
+
+ EasyMock.expect(taskMock.getType()).andReturn("index_kafka").once();
+ EasyMock.expect(workerTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once();
+ EasyMock.replay(taskMock, workerTaskRunner);
+ Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(taskMock).get());
+ EasyMock.verify(taskMock, workerTaskRunner);
+ EasyMock.reset(taskMock, workerTaskRunner);
+
+ EasyMock.expect(taskMock.getType()).andReturn("compact").once();
+ EasyMock.expect(kubernetesTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once();
+ EasyMock.replay(taskMock, kubernetesTaskRunner);
+ Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(taskMock).get());
+ EasyMock.verify(taskMock, kubernetesTaskRunner);
+ }
+
+ @Test
+ public void test_getUsedCapacity()
+ {
+ EasyMock.expect(kubernetesTaskRunner.getUsedCapacity()).andReturn(1);
+ EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(1);
+
+ replayAll();
+ Assert.assertEquals(2, runner.getUsedCapacity());
+ verifyAll();
+ }
+
+ @Test
+ public void test_getTotalCapacity()
+ {
+ EasyMock.expect(kubernetesTaskRunner.getTotalCapacity()).andReturn(1);
+ EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(1);
+
+ replayAll();
+ Assert.assertEquals(2, runner.getTotalCapacity());
+ verifyAll();
+ }
+
+ @Test
+ public void test_getRunnerTaskState_kubernetes()
+ {
+ RunnerTaskState runnerTaskState = RunnerTaskState.RUNNING;
+ EasyMock.expect(kubernetesTaskRunner.getRunnerTaskState(ID)).andReturn(runnerTaskState);
+
+ replayAll();
+ Assert.assertEquals(runnerTaskState, runner.getRunnerTaskState(ID));
+ verifyAll();
+ }
+
+ @Test
+ public void test_getRunnerTaskState_worker()
+ {
+ RunnerTaskState runnerTaskState = RunnerTaskState.RUNNING;
+ EasyMock.expect(kubernetesTaskRunner.getRunnerTaskState(ID)).andReturn(null);
+ EasyMock.expect(workerTaskRunner.getRunnerTaskState(ID)).andReturn(runnerTaskState);
+
+ replayAll();
+ Assert.assertEquals(runnerTaskState, runner.getRunnerTaskState(ID));
+ verifyAll();
+ }
+
+ @Test
+ public void test_streamTaskLog_kubernetes() throws IOException
+ {
+ InputStream inputStream = IOUtils.toInputStream("inputStream", Charset.defaultCharset());
+ EasyMock.expect(kubernetesTaskRunner.streamTaskLog(ID, 0)).andReturn(Optional.of(inputStream));
+
+ replayAll();
+ Assert.assertEquals(inputStream, runner.streamTaskLog(ID, 0).get());
+ verifyAll();
+ }
+
+ @Test
+ public void test_streamTasklog_worker() throws IOException
+ {
+ InputStream inputStream = IOUtils.toInputStream("inputStream", Charset.defaultCharset());
+ EasyMock.expect(kubernetesTaskRunner.streamTaskLog(ID, 0)).andReturn(Optional.absent());
+ EasyMock.expect(workerTaskRunner.streamTaskLog(ID, 0)).andReturn(Optional.of(inputStream));
+
+ replayAll();
+ Assert.assertEquals(inputStream, runner.streamTaskLog(ID, 0).get());
+ verifyAll();
+ }
+
+ @Test
+ public void test_getBlacklistedTaskSlotCount()
+ {
+ Map kubernetesTaskSlots = ImmutableMap.of("category", 1L);
+ Map workerTaskSlots = ImmutableMap.of("category2", 2L);
+
+ EasyMock.expect(kubernetesTaskRunner.getBlacklistedTaskSlotCount()).andReturn(kubernetesTaskSlots);
+ EasyMock.expect(workerTaskRunner.getBlacklistedTaskSlotCount()).andReturn(workerTaskSlots);
+
+ replayAll();
+ Assert.assertEquals(
+ ImmutableMap.builder().putAll(kubernetesTaskSlots).putAll(workerTaskSlots).build(),
+ runner.getBlacklistedTaskSlotCount()
+ );
+ verifyAll();
+ }
+
+ @Test
+ public void test_getLazyTaskSlotCount()
+ {
+ Map kubernetesTaskSlots = ImmutableMap.of("category", 1L);
+ Map workerTaskSlots = ImmutableMap.of("category2", 2L);
+
+ EasyMock.expect(kubernetesTaskRunner.getLazyTaskSlotCount()).andReturn(kubernetesTaskSlots);
+ EasyMock.expect(workerTaskRunner.getLazyTaskSlotCount()).andReturn(workerTaskSlots);
+
+ replayAll();
+ Assert.assertEquals(
+ ImmutableMap.builder().putAll(kubernetesTaskSlots).putAll(workerTaskSlots).build(),
+ runner.getLazyTaskSlotCount()
+ );
+ verifyAll();
+ }
+
+ @Test
+ public void test_getIdleTaskSlotCount()
+ {
+ Map kubernetesTaskSlots = ImmutableMap.of("category", 1L);
+ Map workerTaskSlots = ImmutableMap.of("category2", 2L);
+
+ EasyMock.expect(kubernetesTaskRunner.getLazyTaskSlotCount()).andReturn(kubernetesTaskSlots);
+ EasyMock.expect(workerTaskRunner.getLazyTaskSlotCount()).andReturn(workerTaskSlots);
+
+ replayAll();
+ Assert.assertEquals(
+ ImmutableMap.builder().putAll(kubernetesTaskSlots).putAll(workerTaskSlots).build(),
+ runner.getLazyTaskSlotCount()
+ );
+ verifyAll();
+ }
+
+ @Test
+ public void test_getTotalTaskSlotCount()
+ {
+ Map kubernetesTaskSlots = ImmutableMap.of("category", 1L);
+ Map workerTaskSlots = ImmutableMap.of("category2", 2L);
+
+ EasyMock.expect(kubernetesTaskRunner.getLazyTaskSlotCount()).andReturn(kubernetesTaskSlots);
+ EasyMock.expect(workerTaskRunner.getLazyTaskSlotCount()).andReturn(workerTaskSlots);
+
+ replayAll();
+ Assert.assertEquals(
+ ImmutableMap.builder().putAll(kubernetesTaskSlots).putAll(workerTaskSlots).build(),
+ runner.getLazyTaskSlotCount()
+ );
+ verifyAll();
+ }
+
+ @Test
+ public void test_getKnownTasks()
+ {
+ EasyMock.expect(kubernetesTaskRunner.getKnownTasks()).andReturn(ImmutableList.of());
+ EasyMock.expect(workerTaskRunner.getKnownTasks()).andReturn(ImmutableList.of());
+
+ replayAll();
+ Assert.assertEquals(
+ 0,
+ runner.getKnownTasks().size()
+ );
+ verifyAll();
+ }
+
+ @Test
+ public void test_getPendingTasks()
+ {
+ EasyMock.expect(kubernetesTaskRunner.getPendingTasks()).andReturn(ImmutableList.of());
+ EasyMock.expect(workerTaskRunner.getPendingTasks()).andReturn(ImmutableList.of());
+
+ replayAll();
+ Assert.assertEquals(
+ 0,
+ runner.getPendingTasks().size()
+ );
+ verifyAll();
+ }
+
+ @Test
+ public void test_getRunningTasks()
+ {
+ EasyMock.expect(kubernetesTaskRunner.getRunningTasks()).andReturn(ImmutableList.of());
+ EasyMock.expect(workerTaskRunner.getRunningTasks()).andReturn(ImmutableList.of());
+
+ replayAll();
+ Assert.assertEquals(
+ 0,
+ runner.getRunningTasks().size()
+ );
+ verifyAll();
+ }
+
+ @Test
+ public void test_shutdown()
+ {
+ String reason = "reason";
+ kubernetesTaskRunner.shutdown(ID, reason);
+ workerTaskRunner.shutdown(ID, reason);
+ replayAll();
+ runner.shutdown(ID, reason);
+ verifyAll();
+ }
+
+ @Test
+ public void test_restore()
+ {
+ EasyMock.expect(kubernetesTaskRunner.restore()).andReturn(ImmutableList.of());
+ EasyMock.expect(workerTaskRunner.restore()).andReturn(ImmutableList.of());
+ replayAll();
+ Assert.assertEquals(0, runner.restore().size());
+ verifyAll();
+ }
+
+ @Test
+ public void test_getTaskLocation_kubernetes()
+ {
+ TaskLocation kubernetesTaskLocation = TaskLocation.create("host", 0, 0, false);
+ EasyMock.expect(kubernetesTaskRunner.getTaskLocation(ID)).andReturn(kubernetesTaskLocation);
+ replayAll();
+ Assert.assertEquals(kubernetesTaskLocation, runner.getTaskLocation(ID));
+ verifyAll();
+ }
+
+ @Test
+ public void test_getTaskLocation_worker()
+ {
+ TaskLocation workerTaskLocation = TaskLocation.create("host", 0, 0, false);
+ EasyMock.expect(kubernetesTaskRunner.getTaskLocation(ID)).andReturn(TaskLocation.unknown());
+ EasyMock.expect(workerTaskRunner.getTaskLocation(ID)).andReturn(workerTaskLocation);
+
+ replayAll();
+ Assert.assertEquals(workerTaskLocation, runner.getTaskLocation(ID));
+ verifyAll();
+ }
+
+ @Test
+ public void test_updateStatus()
+ {
+ kubernetesTaskRunner.updateStatus(task, TaskStatus.running(ID));
+ replayAll();
+ runner.updateStatus(task, TaskStatus.running(ID));
+ verifyAll();
+ }
+
+ @Test
+ public void test_updateLocation()
+ {
+ kubernetesTaskRunner.updateLocation(task, TaskLocation.unknown());
+ replayAll();
+ runner.updateLocation(task, TaskLocation.unknown());
+ verifyAll();
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
index 3a017e5f74ff..1c6e429a3dc3 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java
@@ -30,10 +30,10 @@
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.k8s.overlord.common.JobResponse;
import org.apache.druid.k8s.overlord.common.K8sTaskId;
+import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.common.PeonPhase;
import org.apache.druid.tasklogs.TaskLogs;
@@ -73,13 +73,13 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport
public void setup()
{
mapper = new TestUtils().getTestObjectMapper();
- task = NoopTask.create(ID, 0);
+ task = K8sTestUtils.createTask(ID, 0);
k8sTaskId = new K8sTaskId(task);
EasyMock.expect(logWatch.getOutput()).andReturn(IOUtils.toInputStream("", StandardCharsets.UTF_8)).anyTimes();
}
@Test
- public void test_run()
+ public void test_run() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
@@ -114,7 +114,7 @@ protected synchronized TaskStatus join(long timeout)
replayAll();
- TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L);
+ TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L, false);
verifyAll();
@@ -124,7 +124,51 @@ protected synchronized TaskStatus join(long timeout)
}
@Test
- public void test_run_whenCalledMultipleTimes_raisesIllegalStateException()
+ public void test_run_useTaskManager() throws IOException
+ {
+ KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
+ task,
+ kubernetesClient,
+ taskLogs,
+ mapper,
+ stateListener
+ )
+ {
+ @Override
+ protected synchronized TaskStatus join(long timeout)
+ {
+ return TaskStatus.success(ID);
+ }
+ };
+
+ Job job = new JobBuilder().withNewMetadata().withName(ID).endMetadata().build();
+
+ EasyMock.expect(kubernetesClient.launchPeonJobAndWaitForStart(
+ EasyMock.eq(job),
+ EasyMock.eq(task),
+ EasyMock.anyLong(),
+ EasyMock.eq(TimeUnit.MILLISECONDS)
+ )).andReturn(null);
+ Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
+
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
+ EasyMock.expectLastCall().once();
+ stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
+ EasyMock.expectLastCall().once();
+
+ taskLogs.pushTaskPayload(EasyMock.anyString(), EasyMock.anyObject());
+ replayAll();
+
+ TaskStatus taskStatus = peonLifecycle.run(job, 0L, 0L, true);
+
+ verifyAll();
+ Assert.assertTrue(taskStatus.isSuccess());
+ Assert.assertEquals(ID, taskStatus.getId());
+ Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState());
+ }
+
+ @Test
+ public void test_run_whenCalledMultipleTimes_raisesIllegalStateException() throws IOException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
@@ -159,12 +203,12 @@ protected synchronized TaskStatus join(long timeout)
replayAll();
- peonLifecycle.run(job, 0L, 0L);
+ peonLifecycle.run(job, 0L, 0L, false);
Assert.assertThrows(
"Task [id] failed to run: invalid peon lifecycle state transition [STOPPED]->[PENDING]",
IllegalStateException.class,
- () -> peonLifecycle.run(job, 0L, 0L)
+ () -> peonLifecycle.run(job, 0L, 0L, false)
);
verifyAll();
@@ -198,9 +242,6 @@ protected synchronized TaskStatus join(long timeout)
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(null);
- EasyMock.expect(kubernetesClient.deletePeonJob(
- new K8sTaskId(ID)
- )).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
stateListener.stateChanged(KubernetesPeonLifecycle.State.PENDING, ID);
EasyMock.expectLastCall().once();
@@ -211,7 +252,7 @@ protected synchronized TaskStatus join(long timeout)
Assert.assertThrows(
Exception.class,
- () -> peonLifecycle.run(job, 0L, 0L)
+ () -> peonLifecycle.run(job, 0L, 0L, false)
);
verifyAll();
@@ -245,7 +286,6 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
replayAll();
@@ -298,7 +338,6 @@ public void test_join() throws IOException
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@@ -353,7 +392,6 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@@ -408,7 +446,6 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@@ -459,7 +496,6 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@@ -512,7 +548,6 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
@@ -554,8 +589,6 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr
logWatch.close();
EasyMock.expectLastCall();
- EasyMock.expect(kubernetesClient.deletePeonJob(k8sTaskId)).andReturn(true);
-
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
replayAll();
@@ -908,8 +941,11 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown()
stateListener
);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.STOPPED);
+ EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent()).once();
+ replayAll();
Assert.assertEquals(TaskLocation.unknown(), peonLifecycle.getTaskLocation());
+ verifyAll();
}
private void setPeonLifecycleState(KubernetesPeonLifecycle peonLifecycle, KubernetesPeonLifecycle.State state)
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
index ca1fc641719e..36a7b4cfcd9c 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java
@@ -29,15 +29,14 @@
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.InputStreamResponseHandler;
+import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.KubernetesPeonClient;
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.easymock.EasyMock;
@@ -56,7 +55,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -89,7 +87,7 @@ public void setup()
.withCapacity(1)
.build();
- task = NoopTask.create(ID, 0);
+ task = K8sTestUtils.createTask(ID, 0);
runner = new KubernetesTaskRunner(
taskAdapter,
@@ -101,6 +99,89 @@ public void setup()
);
}
+ @Test
+ public void test_start_withExistingJobs() throws IOException
+ {
+ KubernetesTaskRunner runner = new KubernetesTaskRunner(
+ taskAdapter,
+ config,
+ peonClient,
+ httpClient,
+ new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+ emitter
+ )
+ {
+ @Override
+ protected ListenableFuture joinAsync(Task task)
+ {
+ return tasks.computeIfAbsent(
+ task.getId(),
+ k -> new KubernetesWorkItem(
+ task,
+ Futures.immediateFuture(TaskStatus.success(task.getId()))
+ )
+ ).getResult();
+ }
+ };
+
+ Job job = new JobBuilder()
+ .withNewMetadata()
+ .withName(ID)
+ .endMetadata()
+ .build();
+
+ EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
+ EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
+
+ replayAll();
+
+ runner.start();
+
+ verifyAll();
+
+ Assert.assertNotNull(runner.tasks);
+ Assert.assertEquals(1, runner.tasks.size());
+ }
+
+ @Test
+ public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOException
+ {
+ KubernetesTaskRunner runner = new KubernetesTaskRunner(
+ taskAdapter,
+ config,
+ peonClient,
+ httpClient,
+ new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
+ emitter
+ )
+ {
+ @Override
+ protected ListenableFuture joinAsync(Task task)
+ {
+ return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null))
+ .getResult();
+ }
+ };
+
+ Job job = new JobBuilder()
+ .withNewMetadata()
+ .withName(ID)
+ .endMetadata()
+ .build();
+
+ EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
+ EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException());
+
+ replayAll();
+
+ runner.start();
+
+ verifyAll();
+
+ Assert.assertNotNull(runner.tasks);
+ Assert.assertEquals(0, runner.tasks.size());
+ }
+
@Test
public void test_streamTaskLog_withoutExistingTask_returnsEmptyOptional()
{
@@ -140,10 +221,12 @@ public void test_run_withoutExistingTask() throws IOException, ExecutionExceptio
TaskStatus taskStatus = TaskStatus.success(task.getId());
EasyMock.expect(taskAdapter.fromTask(task)).andReturn(job);
+ EasyMock.expect(taskAdapter.shouldUseDeepStorageForTaskPayload(task)).andReturn(false);
EasyMock.expect(kubernetesPeonLifecycle.run(
EasyMock.eq(job),
EasyMock.anyLong(),
- EasyMock.anyLong()
+ EasyMock.anyLong(),
+ EasyMock.anyBoolean()
)).andReturn(taskStatus);
replayAll();
@@ -152,8 +235,6 @@ public void test_run_withoutExistingTask() throws IOException, ExecutionExceptio
Assert.assertEquals(taskStatus, future.get());
verifyAll();
-
- Assert.assertFalse(runner.tasks.containsKey(task.getId()));
}
@Test
@@ -177,10 +258,12 @@ public void test_run_whenExceptionThrown_throwsRuntimeException() throws IOExcep
.build();
EasyMock.expect(taskAdapter.fromTask(task)).andReturn(job);
+ EasyMock.expect(taskAdapter.shouldUseDeepStorageForTaskPayload(task)).andReturn(false);
EasyMock.expect(kubernetesPeonLifecycle.run(
EasyMock.eq(job),
EasyMock.anyLong(),
- EasyMock.anyLong()
+ EasyMock.anyLong(),
+ EasyMock.anyBoolean()
)).andThrow(new IllegalStateException());
replayAll();
@@ -191,8 +274,6 @@ public void test_run_whenExceptionThrown_throwsRuntimeException() throws IOExcep
Assert.assertTrue(e.getCause() instanceof RuntimeException);
verifyAll();
-
- Assert.assertFalse(runner.tasks.containsKey(task.getId()));
}
@Test
@@ -208,8 +289,6 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt
Assert.assertEquals(taskStatus, future.get());
verifyAll();
-
- Assert.assertFalse(runner.tasks.containsKey(task.getId()));
}
@Test
@@ -236,28 +315,11 @@ public void test_join_whenExceptionThrown_throwsRuntimeException()
Assert.assertTrue(e.getCause() instanceof RuntimeException);
verifyAll();
-
- Assert.assertFalse(runner.tasks.containsKey(task.getId()));
- }
-
- @Test
- public void test_doTask_withoutWorkItem_throwsRuntimeException()
- {
- Assert.assertThrows(
- "Task [id] disappeared",
- RuntimeException.class,
- () -> runner.doTask(task, true)
- );
}
@Test
public void test_doTask_whenShutdownRequested_throwsRuntimeException()
{
- KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
- workItem.shutdown();
-
- runner.tasks.put(task.getId(), workItem);
-
Assert.assertThrows(
"Task [id] has been shut down",
RuntimeException.class,
@@ -266,13 +328,7 @@ public void test_doTask_whenShutdownRequested_throwsRuntimeException()
}
@Test
- public void test_shutdown_withoutExistingTask()
- {
- runner.shutdown(task.getId(), "");
- }
-
- @Test
- public void test_shutdown_withExistingTask()
+ public void test_shutdown_withExistingTask_removesTaskFromMap()
{
KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
@Override
@@ -282,82 +338,14 @@ protected synchronized void shutdown()
};
runner.tasks.put(task.getId(), workItem);
-
runner.shutdown(task.getId(), "");
+ Assert.assertTrue(runner.tasks.isEmpty());
}
@Test
- public void test_restore_withExistingJobs() throws IOException
- {
- KubernetesTaskRunner runner = new KubernetesTaskRunner(
- taskAdapter,
- config,
- peonClient,
- httpClient,
- new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
- ) {
- @Override
- protected ListenableFuture joinAsync(Task task)
- {
- return new KubernetesWorkItem(task, null).getResult();
- }
- };
-
- Job job = new JobBuilder()
- .withNewMetadata()
- .withName(ID)
- .endMetadata()
- .build();
-
- EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
- EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
-
- replayAll();
-
- List>> tasks = runner.restore();
-
- verifyAll();
-
- Assert.assertNotNull(tasks);
- Assert.assertEquals(1, tasks.size());
- }
-
- @Test
- public void test_restore_whenDeserializationExceptionThrown_isIgnored() throws IOException
+ public void test_shutdown_withoutExistingTask()
{
- KubernetesTaskRunner runner = new KubernetesTaskRunner(
- taskAdapter,
- config,
- peonClient,
- httpClient,
- new TestPeonLifecycleFactory(kubernetesPeonLifecycle),
- emitter
- ) {
- @Override
- protected ListenableFuture joinAsync(Task task)
- {
- return new KubernetesWorkItem(task, null).getResult();
- }
- };
-
- Job job = new JobBuilder()
- .withNewMetadata()
- .withName(ID)
- .endMetadata()
- .build();
-
- EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
- EasyMock.expect(taskAdapter.toTask(job)).andThrow(new IOException());
-
- replayAll();
-
- List>> tasks = runner.restore();
-
- verifyAll();
-
- Assert.assertNotNull(tasks);
- Assert.assertEquals(0, tasks.size());
+ runner.shutdown(task.getId(), "");
}
@Test
@@ -390,7 +378,7 @@ public void test_getKnownTasks()
@Test
public void test_getRunningTasks()
{
- Task pendingTask = NoopTask.create("pending-id", 0);
+ Task pendingTask = K8sTestUtils.createTask("pending-id", 0);
KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) {
@Override
protected RunnerTaskState getRunnerTaskState()
@@ -400,7 +388,7 @@ protected RunnerTaskState getRunnerTaskState()
};
runner.tasks.put(pendingTask.getId(), pendingWorkItem);
- Task runningTask = NoopTask.create("running-id", 0);
+ Task runningTask = K8sTestUtils.createTask("running-id", 0);
KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) {
@Override
protected RunnerTaskState getRunnerTaskState()
@@ -419,7 +407,7 @@ protected RunnerTaskState getRunnerTaskState()
@Test
public void test_getPendingTasks()
{
- Task pendingTask = NoopTask.create("pending-id", 0);
+ Task pendingTask = K8sTestUtils.createTask("pending-id", 0);
KubernetesWorkItem pendingWorkItem = new KubernetesWorkItem(pendingTask, null) {
@Override
protected RunnerTaskState getRunnerTaskState()
@@ -429,7 +417,7 @@ protected RunnerTaskState getRunnerTaskState()
};
runner.tasks.put(pendingTask.getId(), pendingWorkItem);
- Task runningTask = NoopTask.create("running-id", 0);
+ Task runningTask = K8sTestUtils.createTask("running-id", 0);
KubernetesWorkItem runningWorkItem = new KubernetesWorkItem(runningTask, null) {
@Override
protected RunnerTaskState getRunnerTaskState()
@@ -629,6 +617,30 @@ public TaskLocation getLocation()
verifyAll();
}
+ @Test
+ public void test_getTaskLocation_withExistingTask()
+ {
+ KubernetesWorkItem workItem = new KubernetesWorkItem(task, null) {
+ @Override
+ public TaskLocation getLocation()
+ {
+ return TaskLocation.create("host", 0, 1, false);
+ }
+ };
+
+ runner.tasks.put(task.getId(), workItem);
+
+ TaskLocation taskLocation = runner.getTaskLocation(task.getId());
+ Assert.assertEquals(TaskLocation.create("host", 0, 1, false), taskLocation);
+ }
+
+ @Test
+ public void test_getTaskLocation_noTaskFound()
+ {
+ TaskLocation taskLocation = runner.getTaskLocation(task.getId());
+ Assert.assertEquals(TaskLocation.unknown(), taskLocation);
+ }
+
@Test
public void test_getTotalCapacity()
{
@@ -644,6 +656,5 @@ public void test_getUsedCapacity()
Assert.assertEquals(1, runner.getUsedCapacity());
runner.tasks.remove(task.getId());
Assert.assertEquals(0, runner.getUsedCapacity());
-
}
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
index 5f951770480f..7d17193b1714 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java
@@ -44,7 +44,7 @@ public class KubernetesWorkItemTest extends EasyMockSupport
@Before
public void setup()
{
- task = NoopTask.create("id", 0);
+ task = NoopTask.create();
workItem = new KubernetesWorkItem(task, null);
}
@@ -75,7 +75,6 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException()
public void test_shutdown_withoutKubernetesPeonLifecycle()
{
workItem.shutdown();
- Assert.assertTrue(workItem.isShutdownRequested());
}
@Test
@@ -91,7 +90,6 @@ public void test_shutdown_withKubernetesPeonLifecycle()
workItem.shutdown();
verifyAll();
- Assert.assertTrue(workItem.isShutdownRequested());
}
@Test
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java
index c0affe9573c3..ab5379ffa6fa 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java
@@ -29,7 +29,9 @@
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -39,6 +41,7 @@
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import java.io.File;
+import java.util.Collections;
public class K8sTestUtils
@@ -120,4 +123,9 @@ public static T fileToResource(String contents, Class type)
type
);
}
+
+ public static NoopTask createTask(String id, int priority)
+ {
+ return new NoopTask(id, null, null, 0, 0, Collections.singletonMap(Tasks.PRIORITY_KEY, priority));
+ }
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java
new file mode 100644
index 000000000000..880d5528ac7b
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import org.apache.druid.indexing.common.task.Task;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class KubernetesRunnerStrategyTest extends EasyMockSupport
+{
+ @Mock
+ Task task;
+
+ @Test
+ public void test_kubernetesRunnerStrategy_returnsCorrectRunnerType()
+ {
+ KubernetesRunnerStrategy runnerStrategy = new KubernetesRunnerStrategy();
+
+ Assert.assertEquals(RunnerStrategy.RunnerType.KUBERNETES_RUNNER_TYPE, runnerStrategy.getRunnerTypeForTask(task));
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java
new file mode 100644
index 000000000000..a32630ed6144
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerFactory;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class TaskTypeRunnerStrategyTest extends EasyMockSupport
+{
+ @Mock
+ Task task;
+
+ @Test
+ public void test_taskTypeRunnerStrategy_returnsCorrectRunnerType()
+ {
+ TaskTypeRunnerStrategy runnerStrategy = new TaskTypeRunnerStrategy("k8s", ImmutableMap.of("index_kafka", "worker"));
+ EasyMock.expect(task.getType()).andReturn("index_kafka");
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(task.getType()).andReturn("compact");
+ EasyMock.expectLastCall().once();
+ replayAll();
+ Assert.assertEquals(RunnerStrategy.WORKER_NAME, runnerStrategy.getRunnerTypeForTask(task).getType());
+ Assert.assertEquals(KubernetesTaskRunnerFactory.TYPE_NAME, runnerStrategy.getRunnerTypeForTask(task).getType());
+ verifyAll();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void test_invalidOverridesConfig_shouldThrowException()
+ {
+ new TaskTypeRunnerStrategy(
+ "k8s",
+ ImmutableMap.of(
+ "index_kafka",
+ "non_exist_runner"
+ )
+ );
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java
new file mode 100644
index 000000000000..1a3ae34fc6a7
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+
+import org.apache.druid.indexing.common.task.Task;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class WorkerRunnerStrategyTest extends EasyMockSupport
+{
+ @Mock
+ Task task;
+
+ @Test
+ public void test_workerRunnerStrategy_returnsCorrectRunnerType()
+ {
+ WorkerRunnerStrategy runnerStrategy = new WorkerRunnerStrategy();
+ Assert.assertEquals(RunnerStrategy.WORKER_NAME, runnerStrategy.getRunnerTypeForTask(task).getType());
+ }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
index 22e2311bbbae..098161685883 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java
@@ -129,7 +129,8 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception
taskConfig,
startupLoggingConfig,
druidNode,
- jsonMapper
+ jsonMapper,
+ null
);
String taskBasePath = "/home/taskDir";
PeonCommandContext context = new PeonCommandContext(Collections.singletonList(
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
index 519e7177cbe5..c1100ccc29d1 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java
@@ -37,10 +37,13 @@
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
+import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
import io.fabric8.kubernetes.api.model.batch.v1.JobList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.druid.error.DruidException;
import org.apache.druid.guice.FirehoseModule;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
@@ -52,6 +55,7 @@
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.k8s.overlord.common.KubernetesExecutor;
import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException;
@@ -59,15 +63,23 @@
import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.NoopTaskLogs;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -82,6 +94,8 @@ class K8sTaskAdapterTest
private final TaskConfig taskConfig;
private final DruidNode node;
private final ObjectMapper jsonMapper;
+ private final TaskLogs taskLogs;
+
public K8sTaskAdapterTest()
{
@@ -105,6 +119,7 @@ public K8sTaskAdapterTest()
);
startupLoggingConfig = new StartupLoggingConfig();
taskConfig = new TaskConfigBuilder().setBaseDir("src/test/resources").build();
+ taskLogs = new NoopTaskLogs();
}
@Test
@@ -139,7 +154,9 @@ public PodSpec getSpec()
taskConfig,
startupLoggingConfig,
node,
- jsonMapper
+ jsonMapper,
+ taskLogs
+
);
Task task = K8sTestUtils.getTask();
Job jobFromSpec = adapter.fromTask(task);
@@ -166,7 +183,8 @@ public void serializingAndDeserializingATask() throws IOException
taskConfig,
startupLoggingConfig,
node,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
Task task = K8sTestUtils.getTask();
Job jobFromSpec = adapter.createJobFromPodSpec(
@@ -189,6 +207,169 @@ public void serializingAndDeserializingATask() throws IOException
assertEquals(task, taskFromJob);
}
+ @Test
+ public void fromTask_dontSetTaskJSON() throws IOException
+ {
+ final PodSpec podSpec = K8sTestUtils.getDummyPodSpec();
+ TestKubernetesClient testClient = new TestKubernetesClient(client)
+ {
+ @SuppressWarnings("unchecked")
+ @Override
+ public T executeRequest(KubernetesExecutor executor) throws KubernetesResourceNotFoundException
+ {
+ return (T) new Pod()
+ {
+ @Override
+ public PodSpec getSpec()
+ {
+ return podSpec;
+ }
+ };
+ }
+ };
+
+ KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("test")
+ .build();
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
+ Task task = new NoopTask(
+ "id",
+ "id",
+ "datasource",
+ 0,
+ 0,
+ ImmutableMap.of("context", RandomStringUtils.randomAlphanumeric((int) DruidK8sConstants.MAX_ENV_VARIABLE_KBS * 20))
+ );
+ Job job = adapter.fromTask(task);
+ // TASK_JSON should not be set in env variables
+ Assertions.assertFalse(
+ job.getSpec()
+ .getTemplate()
+ .getSpec()
+ .getContainers()
+ .get(0).getEnv()
+ .stream().anyMatch(env -> env.getName().equals(DruidK8sConstants.TASK_JSON_ENV))
+ );
+
+ // --taskId should be passed to the peon command args
+ Assertions.assertTrue(
+ Arrays.stream(job.getSpec()
+ .getTemplate()
+ .getSpec()
+ .getContainers()
+ .get(0)
+ .getArgs()
+ .get(0).split(" ")).collect(Collectors.toSet())
+ .containsAll(ImmutableList.of("--taskId", task.getId()))
+ );
+ }
+
+ @Test
+ public void toTask_useTaskPayloadManager() throws IOException
+ {
+ TestKubernetesClient testClient = new TestKubernetesClient(client);
+ KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
+ .withNamespace("test")
+ .build();
+ Task taskInTaskPayloadManager = K8sTestUtils.getTask();
+ TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class);
+ Mockito.when(mockTestLogs.streamTaskPayload("ID")).thenReturn(com.google.common.base.Optional.of(
+ new ByteArrayInputStream(jsonMapper.writeValueAsString(taskInTaskPayloadManager).getBytes(Charset.defaultCharset()))
+ ));
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ mockTestLogs
+ );
+
+ Job job = new JobBuilder()
+ .editMetadata().withName("job").endMetadata()
+ .editSpec().editTemplate().editMetadata()
+ .addToAnnotations(DruidK8sConstants.TASK_ID, "ID")
+ .endMetadata().editSpec().addToContainers(new ContainerBuilder().withName("main").build()).endSpec().endTemplate().endSpec().build();
+
+ Task taskFromJob = adapter.toTask(job);
+ assertEquals(taskInTaskPayloadManager, taskFromJob);
+ }
+
+ @Test
+ public void getTaskId()
+ {
+ TestKubernetesClient testClient = new TestKubernetesClient(client);
+ KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build();
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .addToAnnotations(DruidK8sConstants.TASK_ID, "ID")
+ .endMetadata().endTemplate().endSpec().build();
+
+ assertEquals(new K8sTaskId("ID"), adapter.getTaskId(job));
+ }
+
+ @Test
+ public void getTaskId_noAnnotations()
+ {
+ TestKubernetesClient testClient = new TestKubernetesClient(client);
+ KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build();
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .endMetadata().endTemplate().endSpec()
+ .editMetadata().withName("job").endMetadata().build();
+
+ Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+ }
+
+ @Test
+ public void getTaskId_missingTaskIdAnnotation()
+ {
+ TestKubernetesClient testClient = new TestKubernetesClient(client);
+ KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder().build();
+ K8sTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .addToAnnotations(DruidK8sConstants.TASK_GROUP_ID, "ID")
+ .endMetadata().endTemplate().endSpec()
+ .editMetadata().withName("job").endMetadata().build();
+
+ Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+ }
@Test
void testGrabbingTheLastXmxValueFromACommand()
{
@@ -282,7 +463,8 @@ void testAddingMonitors() throws IOException
taskConfig,
startupLoggingConfig,
node,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
Task task = K8sTestUtils.getTask();
// no monitor in overlord, no monitor override
@@ -305,7 +487,8 @@ void testAddingMonitors() throws IOException
taskConfig,
startupLoggingConfig,
node,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
adapter.addEnvironmentVariables(container, context, task.toString());
EnvVar env = container.getEnv()
@@ -322,7 +505,8 @@ void testAddingMonitors() throws IOException
taskConfig,
startupLoggingConfig,
node,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
container.getEnv().add(new EnvVarBuilder()
.withName("druid_monitoring_monitors")
@@ -347,14 +531,17 @@ void testEphemeralStorageIsRespected() throws IOException
KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder()
.withNamespace("test")
.build();
- SingleContainerTaskAdapter adapter =
- new SingleContainerTaskAdapter(testClient,
- config, taskConfig,
- startupLoggingConfig,
- node,
- jsonMapper
- );
- NoopTask task = NoopTask.create("id", 1);
+
+ SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ node,
+ jsonMapper,
+ taskLogs
+ );
+ NoopTask task = K8sTestUtils.createTask("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
task,
@@ -426,4 +613,5 @@ void testEphemeralStorage()
);
assertEquals(1, additionalProperties.getAdditionalProperties().size());
}
+
}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
index aa93856ded86..45ea08733768 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java
@@ -41,6 +41,8 @@
import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.easymock.Mock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -58,6 +60,7 @@ class MultiContainerTaskAdapterTest
private TaskConfig taskConfig;
private DruidNode druidNode;
private ObjectMapper jsonMapper;
+ @Mock private TaskLogs taskLogs;
@BeforeEach
public void setup()
@@ -98,9 +101,10 @@ public void testMultiContainerSupport() throws IOException
taskConfig,
startupLoggingConfig,
druidNode,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
- NoopTask task = NoopTask.create("id", 1);
+ NoopTask task = K8sTestUtils.createTask("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
task,
@@ -146,9 +150,10 @@ public void testMultiContainerSupportWithNamedContainer() throws IOException
taskConfig,
startupLoggingConfig,
druidNode,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
- NoopTask task = NoopTask.create("id", 1);
+ NoopTask task = K8sTestUtils.createTask("id", 1);
PodSpec spec = pod.getSpec();
K8sTaskAdapter.massageSpec(spec, "primary");
Job actual = adapter.createJobFromPodSpec(
@@ -191,13 +196,17 @@ public void testOverridingPeonMonitors() throws IOException
.withPrimaryContainerName("primary")
.withPeonMonitors(ImmutableList.of("org.apache.druid.java.util.metrics.JvmMonitor"))
.build();
- MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(testClient,
- config,
- taskConfig,
- startupLoggingConfig,
- druidNode,
- jsonMapper);
- NoopTask task = NoopTask.create("id", 1);
+
+ MultiContainerTaskAdapter adapter = new MultiContainerTaskAdapter(
+ testClient,
+ config,
+ taskConfig,
+ startupLoggingConfig,
+ druidNode,
+ jsonMapper,
+ taskLogs
+ );
+ NoopTask task = K8sTestUtils.createTask("id", 1);
PodSpec spec = pod.getSpec();
K8sTaskAdapter.massageSpec(spec, config.getPrimaryContainerName());
Job actual = adapter.createJobFromPodSpec(
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
index 992a071e22a8..74dfacd1a327 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
@@ -20,30 +20,39 @@
package org.apache.druid.k8s.overlord.taskadapter;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import io.fabric8.kubernetes.api.model.PodTemplate;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.k8s.overlord.KubernetesTaskRunnerConfig;
import org.apache.druid.k8s.overlord.common.Base64Compression;
import org.apache.druid.k8s.overlord.common.DruidK8sConstants;
+import org.apache.druid.k8s.overlord.common.K8sTaskId;
import org.apache.druid.k8s.overlord.common.K8sTestUtils;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.tasklogs.TaskLogs;
import org.easymock.EasyMock;
+import org.easymock.Mock;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
@@ -51,6 +60,8 @@
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class PodTemplateTaskAdapterTest
{
@TempDir private Path tempDir;
@@ -59,6 +70,7 @@ public class PodTemplateTaskAdapterTest
private TaskConfig taskConfig;
private DruidNode node;
private ObjectMapper mapper;
+ @Mock private TaskLogs taskLogs;
@BeforeEach
public void setup()
@@ -89,7 +101,8 @@ public void test_fromTask_withoutBasePodTemplateInRuntimeProperites_raisesIAE()
taskConfig,
node,
mapper,
- new Properties()
+ new Properties(),
+ taskLogs
));
}
@@ -109,7 +122,8 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites_withEmptyFile_r
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
));
}
@@ -127,19 +141,11 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites() throws IOExce
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
- Task task = new NoopTask(
- "id",
- "id",
- "datasource",
- 0,
- 0,
- null,
- null,
- null
- );
+ Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
Job actual = adapter.fromTask(task);
Job expected = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
@@ -168,20 +174,11 @@ public void test_fromTask_withBasePodTemplateInRuntimeProperites_andTlsEnabled()
true
),
mapper,
- props
- );
-
- Task task = new NoopTask(
- "id",
- "id",
- "datasource",
- 0,
- 0,
- null,
- null,
- null
+ props,
+ taskLogs
);
+ Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
Job actual = adapter.fromTask(task);
Job expected = K8sTestUtils.fileToResource("expectedNoopJobTlsEnabled.yaml", Job.class);
@@ -204,7 +201,8 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperties_withEmptyFile_r
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
));
}
@@ -223,7 +221,34 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOExce
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
+ );
+
+ Task task = new NoopTask("id", "id", "datasource", 0, 0, null);
+ Job actual = adapter.fromTask(task);
+ Job expected = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
+
+ assertJobSpecsEqual(actual, expected);
+ }
+
+ @Test
+ public void test_fromTask_withNoopPodTemplateInRuntimeProperites_dontSetTaskJSON() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("noop.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.noop", templatePath.toString());
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ taskLogs
);
Task task = new NoopTask(
@@ -232,19 +257,17 @@ public void test_fromTask_withNoopPodTemplateInRuntimeProperites() throws IOExce
"datasource",
0,
0,
- null,
- null,
- null
+ ImmutableMap.of("context", RandomStringUtils.randomAlphanumeric((int) DruidK8sConstants.MAX_ENV_VARIABLE_KBS * 20))
);
Job actual = adapter.fromTask(task);
- Job expected = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
+ Job expected = K8sTestUtils.fileToResource("expectedNoopJobNoTaskJson.yaml", Job.class);
- assertJobSpecsEqual(actual, expected);
+ Assertions.assertEquals(actual, expected);
}
@Test
- public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException
+ public void test_fromTask_withoutAnnotations_throwsDruidException() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
@@ -257,17 +280,91 @@ public void test_fromTask_withoutAnnotations_throwsIOE() throws IOException
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Job job = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);
- Assert.assertThrows(IOE.class, () -> adapter.toTask(job));
+ Assert.assertThrows(DruidException.class, () -> adapter.toTask(job));
}
@Test
- public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException
+ public void test_getTaskId() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .addToAnnotations(DruidK8sConstants.TASK_ID, "ID")
+ .endMetadata().endTemplate().endSpec().build();
+
+ assertEquals(new K8sTaskId("ID"), adapter.getTaskId(job));
+ }
+
+ @Test
+ public void test_getTaskId_noAnnotations() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .endMetadata().endTemplate().endSpec()
+ .editMetadata().withName("job").endMetadata().build();
+
+ Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+ }
+
+ @Test
+ public void test_getTaskId_missingTaskIdAnnotation() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.setProperty("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ taskLogs
+ );
+ Job job = new JobBuilder()
+ .editSpec().editTemplate().editMetadata()
+ .addToAnnotations(DruidK8sConstants.TASK_GROUP_ID, "ID")
+ .endMetadata().endTemplate().endSpec()
+ .editMetadata().withName("job").endMetadata().build();
+
+ Assert.assertThrows(DruidException.class, () -> adapter.getTaskId(job));
+ }
+
+ @Test
+ public void test_toTask_withoutTaskAnnotation_throwsIOE() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
@@ -280,7 +377,8 @@ public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Job baseJob = K8sTestUtils.fileToResource("baseJobWithoutAnnotations.yaml", Job.class);
@@ -294,11 +392,11 @@ public void test_fromTask_withoutTaskAnnotation_throwsIOE() throws IOException
.endTemplate()
.endSpec()
.build();
- Assert.assertThrows(IOE.class, () -> adapter.toTask(job));
+ Assert.assertThrows(DruidException.class, () -> adapter.toTask(job));
}
@Test
- public void test_fromTask() throws IOException
+ public void test_toTask() throws IOException
{
Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
mapper.writeValue(templatePath.toFile(), podTemplateSpec);
@@ -311,13 +409,43 @@ public void test_fromTask() throws IOException
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Job job = K8sTestUtils.fileToResource("baseJob.yaml", Job.class);
Task actual = adapter.toTask(job);
- Task expected = NoopTask.create("id", 1);
+ Task expected = K8sTestUtils.createTask("id", 1);
+
+ Assertions.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void test_toTask_useTaskPayloadManager() throws IOException
+ {
+ Path templatePath = Files.createFile(tempDir.resolve("base.yaml"));
+ mapper.writeValue(templatePath.toFile(), podTemplateSpec);
+
+ Properties props = new Properties();
+ props.put("druid.indexer.runner.k8s.podTemplate.base", templatePath.toString());
+
+ Task expected = new NoopTask("id", null, "datasource", 0, 0, ImmutableMap.of());
+ TaskLogs mockTestLogs = Mockito.mock(TaskLogs.class);
+ Mockito.when(mockTestLogs.streamTaskPayload("id")).thenReturn(Optional.of(
+ new ByteArrayInputStream(mapper.writeValueAsString(expected).getBytes(Charset.defaultCharset()))
+ ));
+
+ PodTemplateTaskAdapter adapter = new PodTemplateTaskAdapter(
+ taskRunnerConfig,
+ taskConfig,
+ node,
+ mapper,
+ props,
+ mockTestLogs
+ );
+ Job job = K8sTestUtils.fileToResource("expectedNoopJob.yaml", Job.class);
+ Task actual = adapter.toTask(job);
Assertions.assertEquals(expected, actual);
}
@@ -336,7 +464,8 @@ public void test_fromTask_withRealIds() throws IOException
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Task task = new NoopTask(
@@ -345,8 +474,6 @@ public void test_fromTask_withRealIds() throws IOException
"data_source",
0,
0,
- null,
- null,
null
);
@@ -371,7 +498,8 @@ public void test_fromTask_taskSupportsQueries() throws IOException
taskConfig,
node,
mapper,
- props
+ props,
+ taskLogs
);
Task task = EasyMock.mock(Task.class);
@@ -406,7 +534,7 @@ private void assertJobSpecsEqual(Job actual, Job expected) throws IOException
expectedAnnotations.remove(DruidK8sConstants.TASK);
expected.getSpec().getTemplate().getMetadata().setAnnotations(expectedAnnotations);
- Assertions.assertEquals(actual, expected);
+ Assertions.assertEquals(expected, actual);
Assertions.assertEquals(
Base64Compression.decompressBase64(actualTaskAnnotation),
Base64Compression.decompressBase64(expectedTaskAnnotation)
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
index 3e51c5a7acbc..43a40daedc1d 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java
@@ -39,6 +39,8 @@
import org.apache.druid.k8s.overlord.common.TestKubernetesClient;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.log.StartupLoggingConfig;
+import org.apache.druid.tasklogs.TaskLogs;
+import org.easymock.Mock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -57,6 +59,8 @@ class SingleContainerTaskAdapterTest
private DruidNode druidNode;
private ObjectMapper jsonMapper;
+ @Mock private TaskLogs taskLogs;
+
@BeforeEach
public void setup()
{
@@ -96,9 +100,10 @@ public void testSingleContainerSupport() throws IOException
taskConfig,
startupLoggingConfig,
druidNode,
- jsonMapper
+ jsonMapper,
+ taskLogs
);
- NoopTask task = NoopTask.create("id", 1);
+ NoopTask task = K8sTestUtils.createTask("id", 1);
Job actual = adapter.createJobFromPodSpec(
pod.getSpec(),
task,
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
index 7c048ea6df48..2cef837f3972 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJob.yaml
@@ -26,7 +26,7 @@ spec:
druid.task.group.id: "id"
druid.task.datasource: "datasource"
annotations:
- task: "H4sIAAAAAAAAAEVOuQ4CIRD9l6kpVhObbY0xtrs2liOMSoKAHEZC+HeHrEczmXfmVUjFE4xgnfMgQCv++Qi4Bpf94QcVJpxdDrKbO4gLEBCyPeo70+vNMHBDnAhVWag/nihmkzh72s0cuuhANxfZYrMxAqSziV6s18aN9CkfK+ATtcGzNjqVfZ/0HRTokblEbdGjZBHGVWtvT9WXlc8AAAA="
+ task: "H4sIAAAAAAAAAD2MvQ4CIRCE32VqijsTG1qLi7W+wArEbHICrmC8EN7dJf40k/lmJtNQthxgEVPKMGCvXsXgKqnm4x89FTqlKm6MBzw+YCA1nvmm8W4/TQYuxRJeBbZ17cJ3ZhvoSbzShVcu2zLOf9cS7pUl+ANlclrCzr2/AQUK0FqZAAAA"
tls.enabled: "false"
task.id: "id"
task.type: "noop"
@@ -42,11 +42,11 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
+ - name: "LOAD_BROADCAST_SEGMENTS"
+ value: "false"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
- - name: "LOAD_BROADCAST_SEGMENTS"
- value: "false"
image: one
name: primary
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
index b781070892c9..cf16c49c5db1 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
@@ -26,7 +26,7 @@ spec:
druid.task.group.id: "apiissuedkillwikipedia3omjobnbc10000101t000000000z20230514t0000"
druid.task.datasource: "data_source"
annotations:
- task: "H4sIAAAAAAAAAMVQu07EMBD8F9fJae0QIblFCNHepeEay4kNLOezjR9AFOXf2XBIVNQnbbEzs6/ZhZU5WiaZDyGyhqGhXEdsMedqjTqhc+oTTxitQd2pcH4Lox8nxQGgBU4xAMif2BF1VAJE10Lf8pv/hH7gtxI6CXwnBBxp60sKNT5eZbXRRR9CTdP2hA2ofEENS9UPeCZe9AD0mry32swX6g/vba6uUPPT/YGanjHZ15CpxFfnGjYFX+wX6ctKE+3vcLkw/aHR6REdlvlh838N98m+VzrY3OmoJzqESb6u3yiWc3MUAgAA"
+ task: "H4sIAAAAAAAAAMVQPa/CMAz8L55b5KRUSFkZnt5Mpy6R20RPhtKENOFDVf87KbC+GekG352l83mG+PAWFIzOeSiATZ7Jc8nTlKzRJx4GfeMTe2uYKu3OR9eNXa8FIpYoMhpE9cImS62WKKsS61Js/zPqRuwUVgrFRkpsc+pfcMn/fiXaUKSDS6Ffn7ASPb1ZASGNDZ+zLmvEAno3RnuPoOYle/azpmagK/FAHQ8cHz9rk2/0CPaSOFizJ099PgSUWJYnqMIU2d4BAAA="
tls.enabled: "false"
task.id: "api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
task.type: "noop"
@@ -42,11 +42,11 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
+ - name: "LOAD_BROADCAST_SEGMENTS"
+ value: "false"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
- - name: "LOAD_BROADCAST_SEGMENTS"
- value: "false"
image: one
name: primary
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
new file mode 100644
index 000000000000..d72d0ef37b03
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
@@ -0,0 +1,47 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+ name: "id-3e70afe5cd823dfc7dd308eea616426b"
+ labels:
+ druid.k8s.peons: "true"
+ druid.task.id: "id"
+ druid.task.type: "noop"
+ druid.task.group.id: "id"
+ druid.task.datasource: "datasource"
+ annotations:
+ task.id: "id"
+ task.type: "noop"
+ task.group.id: "id"
+ task.datasource: "datasource"
+spec:
+ activeDeadlineSeconds: 14400
+ backoffLimit: 0
+ ttlSecondsAfterFinished: 172800
+ template:
+ metadata:
+ labels:
+ druid.k8s.peons: "true"
+ druid.task.id: "id"
+ druid.task.type: "noop"
+ druid.task.group.id: "id"
+ druid.task.datasource: "datasource"
+ annotations:
+ tls.enabled: "false"
+ task.id: "id"
+ task.type: "noop"
+ task.group.id: "id"
+ task.datasource: "datasource"
+ spec:
+ containers:
+ - command:
+ - sleep
+ - "3600"
+ env:
+ - name: "TASK_DIR"
+ value: "/tmp"
+ - name: "TASK_ID"
+ value: "id"
+ - name: "LOAD_BROADCAST_SEGMENTS"
+ value: "false"
+ image: one
+ name: primary
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
index 0eb8f8b02f08..a230ac913a60 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabled.yaml
@@ -26,7 +26,7 @@ spec:
druid.task.group.id: "id"
druid.task.datasource: "datasource"
annotations:
- task: "H4sIAAAAAAAAAEVOuQ4CIRD9l6kpVhObbY0xtrs2liOMSoKAHEZC+HeHrEczmXfmVUjFE4xgnfMgQCv++Qi4Bpf94QcVJpxdDrKbO4gLEBCyPeo70+vNMHBDnAhVWag/nihmkzh72s0cuuhANxfZYrMxAqSziV6s18aN9CkfK+ATtcGzNjqVfZ/0HRTokblEbdGjZBHGVWtvT9WXlc8AAAA="
+ task: "H4sIAAAAAAAAAD2MvQ4CIRCE32VqijsTG1qLi7W+wArEbHICrmC8EN7dJf40k/lmJtNQthxgEVPKMGCvXsXgKqnm4x89FTqlKm6MBzw+YCA1nvmm8W4/TQYuxRJeBbZ17cJ3ZhvoSbzShVcu2zLOf9cS7pUl+ANlclrCzr2/AQUK0FqZAAAA"
tls.enabled: "true"
task.id: "id"
task.type: "noop"
@@ -42,11 +42,11 @@ spec:
value: "/tmp"
- name: "TASK_ID"
value: "id"
+ - name: "LOAD_BROADCAST_SEGMENTS"
+ value: "false"
- name: "TASK_JSON"
valueFrom:
fieldRef:
fieldPath: "metadata.annotations['task']"
- - name: "LOAD_BROADCAST_SEGMENTS"
- value: "false"
image: one
name: primary
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
new file mode 100644
index 000000000000..43e7414f11f8
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
@@ -0,0 +1,4 @@
+{
+ "runnerStrategy.type": "worker",
+ "runnerStrategy.workerType": "remote"
+}
\ No newline at end of file
diff --git a/extensions-contrib/materialized-view-maintenance/pom.xml b/extensions-contrib/materialized-view-maintenance/pom.xml
index f54f1e87df78..ff35634c2a70 100644
--- a/extensions-contrib/materialized-view-maintenance/pom.xml
+++ b/extensions-contrib/materialized-view-maintenance/pom.xml
@@ -22,7 +22,7 @@
druid
org.apache.druid
- 28.0.0-SNAPSHOT
+ 29.0.0-SNAPSHOT
../../pom.xml
4.0.0
@@ -128,4 +128,15 @@