Add indexing supervisor #194

Open: kfaraz wants to merge 23 commits into master from add_indexing_supervisor

Conversation

kfaraz (Owner) commented Aug 4, 2025

Description

  • Add a BatchIndexingSupervisor as an analog to the SeekableStreamSupervisor
  • CompactionSupervisor implements BatchIndexingSupervisor
  • ScheduledBatchSupervisor will also be updated to extend this supervisor in follow-up PRs
  • A BatchIndexingSupervisor uses BatchIndexingJobTemplate to create batch jobs (Task or MSQ SQL)
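
For readers new to the design, the relationship in the bullets above can be sketched roughly as below. This is a simplified illustration and not the code in this PR: only the names BatchIndexingSupervisor, BatchIndexingJobTemplate, BatchIndexingJob and shouldCreateJobs come from this page. The generic parameters, the `createJob` and `createJobs` methods, and the `SketchCompactionJob`, `SketchCompactionSupervisor` and `plan` helpers are assumptions made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SupervisorSketch {
    /** A unit of batch work; in the PR this may wrap a Task or an MSQ SQL query. */
    interface BatchIndexingJob {
        String describe();
    }

    /** Hypothetical template contract: turns an interval into a concrete job. */
    interface BatchIndexingJobTemplate<J extends BatchIndexingJob> {
        J createJob(String interval);
    }

    /** Hypothetical supervisor contract, generic in the job type and run parameters. */
    interface BatchIndexingSupervisor<J extends BatchIndexingJob, P> {
        boolean shouldCreateJobs(P jobParams);

        List<J> createJobs(P jobParams);
    }

    /** Illustrative job that only remembers the interval it should compact. */
    static final class SketchCompactionJob implements BatchIndexingJob {
        private final String interval;

        SketchCompactionJob(String interval) {
            this.interval = interval;
        }

        @Override
        public String describe() {
            return "compact " + interval;
        }
    }

    /** Illustrative supervisor: delegates job creation to its template. */
    static final class SketchCompactionSupervisor
            implements BatchIndexingSupervisor<SketchCompactionJob, List<String>> {
        private final BatchIndexingJobTemplate<SketchCompactionJob> template;

        SketchCompactionSupervisor(BatchIndexingJobTemplate<SketchCompactionJob> template) {
            this.template = template;
        }

        @Override
        public boolean shouldCreateJobs(List<String> intervals) {
            return !intervals.isEmpty();
        }

        @Override
        public List<SketchCompactionJob> createJobs(List<String> intervals) {
            List<SketchCompactionJob> jobs = new ArrayList<>();
            for (String interval : intervals) {
                jobs.add(template.createJob(interval));
            }
            return jobs;
        }
    }

    /** One scheduler run: ask the supervisor whether to create jobs, then create them. */
    public static List<String> plan(List<String> intervals) {
        SketchCompactionSupervisor supervisor =
                new SketchCompactionSupervisor(SketchCompactionJob::new);
        List<String> descriptions = new ArrayList<>();
        if (supervisor.shouldCreateJobs(intervals)) {
            for (SketchCompactionJob job : supervisor.createJobs(intervals)) {
                descriptions.add(job.describe());
            }
        }
        return descriptions;
    }

    public static void main(String[] args) {
        System.out.println(plan(Arrays.asList("2025-08-01/2025-08-02")));
    }
}
```

The point of the split is that the supervisor decides when and for which intervals to run, while the template decides what kind of job (Task or MSQ SQL) gets built.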

Changes

  • Add BatchIndexingJob
  • Add BatchIndexingSupervisor

Pending items

  • fix code coverage
  • fix up compaction stats collection
  • store compaction state in MSQ jobs
  • fill timeline gaps, support realigning intervals
  • cancel mismatching task
  • pass in the engine to the template
  • Basic MSQ template
  • invoke onTimelineUpdated: the timeline will now get updated very frequently and we don't want to recompact intervals, so try to find the right thing to do
  • we might have to do it via the policy
  • maybe use searchInterval instead of skipIntervals
  • how does this whole thing affect queuedIntervals
  • for duty, it doesn't matter
  • for supervisors, intervals will always be mutually exclusive

@kfaraz kfaraz changed the base branch from master to fix_checkstyle August 11, 2025 06:44
@kfaraz kfaraz changed the base branch from fix_checkstyle to master August 11, 2025 06:44
*
* @param jobParams Parameters for the current run of the scheduler.
*/
boolean shouldCreateJobs(P jobParams);

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'jobParams' is never used.

Copilot Autofix

AI 11 months ago

To fix the problem, remove the unused parameter P jobParams from the shouldCreateJobs method in the BatchIndexingSupervisor interface. This involves editing the method signature on line 38 to remove the parameter, so it becomes boolean shouldCreateJobs();. No other changes are needed in this file. However, all implementations of this interface and any code that calls this method will also need to be updated to remove the parameter, but since we are only allowed to edit the code shown, we will only update the interface here.
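
As a rough illustration of the ripple effect mentioned above, here is a minimal sketch of what implementations and call sites look like once the parameter is gone. The `Supervisor` interface, `AlwaysReadySupervisor` and `countReady` are hypothetical stand-ins invented for this example, not types from the PR.

```java
public class SignatureChangeSketch {
    /** Stand-in for the interface after the suggested change: no run parameters. */
    interface Supervisor {
        boolean shouldCreateJobs();
    }

    /** Every implementation has to drop the parameter from its override as well. */
    static final class AlwaysReadySupervisor implements Supervisor {
        @Override
        public boolean shouldCreateJobs() {
            return true;
        }
    }

    /** Call sites change from shouldCreateJobs(params) to shouldCreateJobs(). */
    public static int countReady(Supervisor... supervisors) {
        int ready = 0;
        for (Supervisor supervisor : supervisors) {
            if (supervisor.shouldCreateJobs()) {
                ready++;
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        System.out.println(countReady(new AlwaysReadySupervisor(), () -> false));
    }
}
```

If any implementation is expected to need the run parameters later, keeping the parameter on the interface and dismissing the notice is an equally valid outcome.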


Suggested changeset 1
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/BatchIndexingSupervisor.java

Autofix patch

Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/BatchIndexingSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/BatchIndexingSupervisor.java
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/BatchIndexingSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/BatchIndexingSupervisor.java
@@ -35,7 +35,7 @@
    *
    * @param jobParams Parameters for the current run of the scheduler.
    */
-  boolean shouldCreateJobs(P jobParams);
+  boolean shouldCreateJobs();
 
   /**
    * Creates jobs to be launched in the current run of the scheduler.
EOF
Copilot is powered by AI and may make mistakes. Always verify output.
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(
config.getTuningConfig(),
config.getMaxRowsPerSegment(),

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation Note

Invoking DataSourceCompactionConfig.getMaxRowsPerSegment should be avoided because it has been deprecated.

Copilot Autofix

AI 11 months ago

To fix the problem, we should replace the usage of the deprecated getMaxRowsPerSegment() method with its recommended alternative. Typically, when a method is deprecated, the Javadoc will suggest a replacement. In the context of Druid's compaction configuration, the replacement is likely getMaxRowsPerSegmentPerPartition() or a similar method, which provides more granular control. The fix should be applied only to the invocation at line 363 in server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java. No changes to imports are needed, as the replacement method should be available on the same class. The rest of the arguments to ClientCompactionTaskQueryTuningConfig.from() should remain unchanged to preserve existing functionality.

Suggested changeset 1
server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java

Autofix patch

Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -360,7 +360,7 @@
         config.getTaskPriority(),
         ClientCompactionTaskQueryTuningConfig.from(
             config.getTuningConfig(),
-            config.getMaxRowsPerSegment(),
+            config.getMaxRowsPerSegmentPerPartition(),
             config.getMetricsSpec() != null
         ),
         granularitySpec,
EOF

// Check if the job is already running, completed or skipped
final CompactionStatus compactionStatus = getCurrentStatusForJob(job, policy);
switch (compactionStatus.getState()) {

Check warning

Code scanning / CodeQL

Missing enum case in switch Warning

Switch statement does not have a case for PENDING.

Copilot Autofix

AI 11 months ago

To fix the problem, we should add a case for the missing PENDING enum value in the switch statement on compactionStatus.getState() within the startJobIfPendingAndReady method. The best way to do this is to handle PENDING in a manner consistent with the method's logic. Since the method is about starting jobs that are pending and ready, and there is already logic for adding jobs to pending if not enough slots are available, it makes sense to treat PENDING similarly to SKIPPED (i.e., add to pending and return false), or to explicitly document why it should be handled differently. If the correct action is unclear, the safest approach is to add a case for PENDING that either handles it appropriately or throws an exception to make the code's intent clear.

The change should be made in the file indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java, specifically in the switch statement starting at line 213 in the startJobIfPendingAndReady method. No new imports or method definitions are required.
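
To make the two options above concrete, here is a minimal sketch of a switch that names every state and fails loudly on anything unexpected. It is not the code in CompactionJobQueue: the `State` enum values and the `shouldProceed` helper are assumptions for illustration, and letting PENDING proceed to the slot check is only one possible reading of the method's intent, which differs from the patch suggested below.

```java
public class SwitchSketch {
    /** Assumed set of states; the real enum may differ. */
    public enum State { RUNNING, COMPLETE, SKIPPED, PENDING }

    /** Returns true only if the job should move on to the task-slot check. */
    public static boolean shouldProceed(State state) {
        switch (state) {
            case RUNNING:
            case COMPLETE:
            case SKIPPED:
                // Nothing to start for jobs that are running, done or skipped
                return false;
            case PENDING:
                // Explicit case documents that pending jobs are the ones to start
                return true;
            default:
                // Guards against enum values added later without updating this switch
                throw new IllegalStateException("Unhandled state: " + state);
        }
    }

    public static void main(String[] args) {
        for (State state : State.values()) {
            System.out.println(state + " -> " + shouldProceed(state));
        }
    }
}
```

Either way, spelling out the PENDING case removes the warning and makes the intended behavior visible to the next reader.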

Suggested changeset 1
indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java

Autofix patch

Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
--- a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CompactionJobQueue.java
@@ -218,6 +218,9 @@
       case SKIPPED:
         snapshotBuilder.addToSkipped(candidate);
         return false;
+      case PENDING:
+        snapshotBuilder.addToPending(candidate);
+        return false;
     }
 
     // Check if enough compaction task slots are available
EOF