Add Task.block_filter for eager block pruning before workers spawn #69
Open
mzouink wants to merge 2 commits into
A new optional `block_filter` parameter on `daisy.Task` lets callers drop blocks from the dependency graph at construction time, before any worker process is spawned and before the scheduler begins handing blocks out.

This is distinct from `check_function`, which runs lazily per block as a worker tries to acquire one and marks the block as already-completed. `block_filter` runs once, in the master, and the filtered blocks never count toward `total_block_count` and are never offered to a worker.

Motivation: large blockwise inference jobs over sparse volumes (e.g. restricted to a coarse inference mask) often have tens of millions of candidate blocks but only a small fraction of real work. Today `num_workers` workers are bsub-launched up-front regardless, then sit idle while the master walks the block grid; with `block_filter` the graph collapses to just the live blocks before the worker pool is brought up.

Wiring:
- `Task.__init__` accepts `block_filter: Optional[Callable[[Block], bool]]`
- `DependencyGraph.__add_task_dependency_graph` forwards it to the inner `BlockwiseDependencyGraph`
- When set, `BlockwiseDependencyGraph` materializes the surviving blocks per level once in `__init__`. `num_blocks`, `num_roots`, and `level_blocks` then read from the cached filtered set. The original lazy enumeration path is preserved as `_unfiltered_level_blocks` and used when no filter is supplied, so there is no behavior change for existing callers.

Tests in `tests/test_scheduler.py` cover the typical case (filter half the blocks, scheduler only ever returns the kept ones) and the zero-blocks-after-filter degenerate case.
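To make the sparse-mask motivation concrete, here is a minimal sketch of what a filter predicate could look like. It does not import daisy: `Block` below is a hypothetical 2D stand-in for `daisy.Block`, and `make_mask_filter`, `mask`, and `cell` are illustrative names that do not appear in this PR.

```python
from dataclasses import dataclass
from typing import Callable, Sequence, Tuple


@dataclass(frozen=True)
class Block:
    """Hypothetical stand-in for daisy.Block: a 2D write region in voxels."""
    offset: Tuple[int, int]
    shape: Tuple[int, int]


def make_mask_filter(mask: Sequence[Sequence[int]], cell: int) -> Callable[[Block], bool]:
    """Build a predicate that keeps a block if it overlaps any nonzero mask cell.

    `mask` is a coarse 2D grid in which each cell covers `cell` x `cell` voxels.
    Blocks are assumed to lie entirely inside the mask's extent.
    """
    def keep(block: Block) -> bool:
        (y0, x0), (h, w) = block.offset, block.shape
        # Coarse-mask rows and columns touched by this block (inclusive ends).
        rows = range(y0 // cell, (y0 + h - 1) // cell + 1)
        cols = range(x0 // cell, (x0 + w - 1) // cell + 1)
        return any(mask[r][c] for r in rows for c in cols)
    return keep


# Only the top-left quadrant of an 8x8 volume contains real work.
mask = [[1, 0],
        [0, 0]]
keep = make_mask_filter(mask, cell=4)
candidates = [Block((y, x), (4, 4)) for y in (0, 4) for x in (0, 4)]
live = [b for b in candidates if keep(b)]
```

With this PR applied, a predicate of this kind would presumably be passed as `block_filter=keep` when constructing the `daisy.Task`; a real predicate would receive a `daisy.Block` and would need to read the region from that object rather than from the stand-in fields used here.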
When `block_filter` is set, `_apply_block_filter` can take many seconds to minutes on large block grids (e.g. ~14M candidate blocks for a sparse-mask volumetric inference). Emit an INFO log at start, drive a tqdm progress bar across all levels, and log the surviving block count at the end so callers can tell whether the master is making progress or stuck. Per-level totals are computed analytically up-front so tqdm reports a real total without exhausting the underlying generator.
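The idea of computing per-level totals analytically can be sketched as follows. This is an assumption-laden illustration, not the PR's code: levels are modelled as a simple (offset, stride) pair per axis over a grid of block positions, and `level_block_count`, `iter_level_blocks`, and `apply_filter_with_progress` are hypothetical names. Plain logging stands in for the progress bar; in a real run the computed `total` is what would be handed to `tqdm(total=...)`.

```python
import logging
import math
from itertools import product

logger = logging.getLogger(__name__)


def level_block_count(grid_shape, offset, stride):
    """Count positions offset + k * stride inside the grid, without iterating.

    len(range(...)) is computed in constant time, so this never walks blocks.
    """
    return math.prod(len(range(o, n, s)) for n, o, s in zip(grid_shape, offset, stride))


def iter_level_blocks(grid_shape, offset, stride):
    """Lazily enumerate the block positions of one level."""
    return product(*(range(o, n, s) for n, o, s in zip(grid_shape, offset, stride)))


def apply_filter_with_progress(grid_shape, levels, keep):
    """Filter every level once, logging at the start and at the end."""
    total = sum(level_block_count(grid_shape, o, s) for o, s in levels)
    logger.info("Applying block filter to %d candidate blocks", total)
    survivors, seen = [], 0
    for offset, stride in levels:
        kept = []
        for pos in iter_level_blocks(grid_shape, offset, stride):
            seen += 1  # a progress bar would be advanced by one here
            if keep(pos):
                kept.append(pos)
        survivors.append(kept)
    logger.info("%d of %d blocks survived the filter", sum(map(len, survivors)), total)
    return survivors, total, seen


# Four interleaved levels over a 4x4 grid of block positions.
levels = [((0, 0), (2, 2)), ((0, 1), (2, 2)), ((1, 0), (2, 2)), ((1, 1), (2, 2))]
survivors, total, seen = apply_filter_with_progress((4, 4), levels, lambda p: p[0] == 0)
```

Because the total is known before the first block is visited, the generator for each level is consumed exactly once, by the filtering loop itself.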
Contributor
Author
The main problem I am solving here is the idle time for the workers when there are a lot of blocks to skip.
Pull request overview
This PR introduces an optional `Task.block_filter` callback intended to eagerly prune blocks in the master process at dependency-graph construction time, so filtered blocks are excluded from scheduling and from `total_block_count`. This targets workloads with extremely sparse “real work” over large candidate block grids.
Changes:
- Add `block_filter` parameter to `daisy.Task` and forward it into dependency-graph construction.
- Implement eager filtering/caching of per-level blocks in `BlockwiseDependencyGraph`, updating `num_blocks`, `num_roots`, and `level_blocks` to reflect the filtered set.
- Add scheduler tests verifying that filtered blocks are never scheduled and that the “all blocks dropped” case yields no work.
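The eager filtering and caching described in the list above can be illustrated with a small, self-contained sketch. `FilteredLevels` is a hypothetical stand-in, not the actual `BlockwiseDependencyGraph`; only the attribute name `_filtered_blocks_by_level` and the three accessor names are taken from the PR. The unfiltered counts are obtained by iteration here for brevity, whereas the PR's code calls `_num_level_blocks` for that path.

```python
class FilteredLevels:
    """Hypothetical stand-in for the filtered path of a blockwise graph."""

    def __init__(self, level_factories, block_filter=None):
        # Each factory returns a fresh lazy iterator over one level's blocks.
        self._level_factories = level_factories
        self._filtered_blocks_by_level = None
        if block_filter is not None:
            # Materialize the survivors once, up front, level by level.
            self._filtered_blocks_by_level = [
                [b for b in make_level() if block_filter(b)]
                for make_level in level_factories
            ]

    def level_blocks(self, level):
        if self._filtered_blocks_by_level is not None:
            return iter(self._filtered_blocks_by_level[level])
        return self._level_factories[level]()  # original lazy path

    def _num_level_blocks(self, level):
        if self._filtered_blocks_by_level is not None:
            return len(self._filtered_blocks_by_level[level])
        return sum(1 for _ in self._level_factories[level]())

    @property
    def num_roots(self):
        return self._num_level_blocks(0)

    @property
    def num_blocks(self):
        return sum(self._num_level_blocks(i) for i in range(len(self._level_factories)))


# Two levels of integer "blocks"; keep only the even ones.
factories = [lambda: iter(range(0, 4)), lambda: iter(range(4, 10))]
unfiltered = FilteredLevels(factories)
filtered = FilteredLevels(factories, block_filter=lambda b: b % 2 == 0)
empty = FilteredLevels(factories, block_filter=lambda b: False)
```

The last instance mirrors the degenerate case covered by the new tests: when every block is dropped, both counts are zero and each level yields nothing.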
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `tests/test_scheduler.py` | Adds tests for block pruning behavior and the zero-block degenerate case. |
| `daisy/task.py` | Exposes the new `block_filter` parameter on the public `Task` API and documents it. |
| `daisy/dependency_graph.py` | Forwards `block_filter` into `BlockwiseDependencyGraph` and materializes filtered blocks eagerly for counts/enumeration. |
Comment on lines 152 to 156

    write_roi,
    process_function,
    check_function=None,
    block_filter=None,
    init_callback_fn=None,
Comment on lines +160 to +164

    """Materialize every level's blocks once and keep only those passing
    ``block_filter``. After this, ``num_blocks``, ``num_roots``, and
    ``level_blocks`` all reflect the surviving set.

    Walks all blocks across all levels with a tqdm progress bar so callers
Comment on lines 238 to 241

    def num_roots(self):
        if self._filtered_blocks_by_level is not None:
            return len(self._filtered_blocks_by_level[0])
        return self._num_level_blocks(0)
Contributor
Would be good to have: