diff --git a/README.md b/README.md index 1de5ba27..30d7a4f9 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,12 @@ const stopAfter100ms = await glob('**/*.css', { signal: AbortSignal.timeout(100), }) +// cap concurrent async directory reads when traversing a large tree +const jsfilesThrottled = await glob('**/*.js', { concurrency: 8 }) + +// sync variants do not support concurrency and throw a TypeError +// globSync('**/*.js', { concurrency: 8 }) + // multiple patterns supported as well const images = await glob(['css/*.{png,jpeg}', 'public/*.{png,jpeg}']) @@ -315,6 +321,18 @@ share the previously loaded cache. This option may be either a string path or a `file://` URL object or string. +- `concurrency` Limit the number of simultaneous async directory + reads. Must be an integer greater than or equal to `8`. + + When omitted, async walks use the same unconstrained traversal + as before. When set, it only changes traversal rate, not the + result set, and it works alongside `signal`. Values below `8` + (including `0` and negative numbers) and non-integers throw a + `RangeError` because the walker's fan-out needs enough in-flight + `readdir()` work to avoid starvation. Synchronous variants + (`globSync()`, `globStreamSync()`, and `globIterateSync()`) throw + a `TypeError` if `concurrency` is provided. + - `root` A string path resolved against the `cwd` option, which is used as the starting point for absolute patterns that start with `/`, (but not drive letters or UNC paths on Windows). 
diff --git a/benchmark.sh b/benchmark.sh index 1685c2e1..387995a4 100644 --- a/benchmark.sh +++ b/benchmark.sh @@ -178,6 +178,20 @@ CJS MJS t node "$wd/bench-working-dir/async.mjs" "$p" + echo -n $'current glob async c=8 \t' + cat > "$wd/bench-working-dir/async-c8.mjs" < console.log(files.length)) +MJS + t node "$wd/bench-working-dir/async-c8.mjs" "$p" + + echo -n $'current glob async c=16 \t' + cat > "$wd/bench-working-dir/async-c16.mjs" < console.log(files.length)) +MJS + t node "$wd/bench-working-dir/async-c16.mjs" "$p" + echo -n $'current glob stream \t' cat > "$wd/bench-working-dir/stream.mjs" < "$wd/bench-working-dir/stream-c8.mjs" < c++) + .on('end', () => console.log(c)) +MJS + t node "$wd/bench-working-dir/stream-c8.mjs" "$p" + + echo -n $'current stream c=16 \t' + cat > "$wd/bench-working-dir/stream-c16.mjs" < c++) + .on('end', () => console.log(c)) +MJS + t node "$wd/bench-working-dir/stream-c16.mjs" "$p" + # echo -n $'current glob sync cjs -e \t' # t node -e ' # console.log(require(process.argv[1]).sync(process.argv[2]).length) diff --git a/changelog.md b/changelog.md index da80311b..f826da4a 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,15 @@ # changeglob +## [Unreleased] + +### Added + +- Add a `concurrency` option to async glob APIs so callers can cap + simultaneous directory reads without changing match results. + Values below `8` reject with `RangeError`, and sync variants + throw `TypeError` if `concurrency` is provided. Implements + US-006. + ## 13 - Move the CLI program out to a separate package, `glob-bin`. 
diff --git a/src/glob.ts b/src/glob.ts index ac0c654e..89666270 100644 --- a/src/glob.ts +++ b/src/glob.ts @@ -27,6 +27,46 @@ const defaultPlatform: NodeJS.Platform = process.platform : 'linux' +export const MIN_ASYNC_READDIR_CONCURRENCY = 8 + +const concurrencyMinimumMessage = () => + `concurrency must be a positive integer greater than or equal to ${MIN_ASYNC_READDIR_CONCURRENCY}` + +const concurrencyReason = + 'async glob walks need enough concurrent directory reads to avoid starvation in the walker fan-out' + +const invalidConcurrencyMessage = () => + `invalid concurrency option: ${concurrencyMinimumMessage()} because ${concurrencyReason}` + +const syncConcurrencyMessage = + 'concurrency option is not supported for synchronous glob variants' + +export const validateAsyncConcurrency = ( + concurrency: number | undefined, +): number | undefined => { + if (concurrency === undefined) { + return undefined + } + + if ( + !Number.isInteger(concurrency) || + concurrency <= 0 || + concurrency < MIN_ASYNC_READDIR_CONCURRENCY + ) { + throw new RangeError(invalidConcurrencyMessage()) + } + + return concurrency +} + +export const assertNoSyncConcurrency = ( + concurrency: number | undefined, +): void => { + if (concurrency !== undefined) { + throw new TypeError(syncConcurrencyMessage) + } +} + /** * A `GlobOptions` object may be provided to any of the exported methods, and * must be provided to the `Glob` constructor. @@ -71,6 +111,17 @@ export interface GlobOptions { */ cwd?: string | URL + /** + * Limit the number of simultaneous async directory reads. Must be an + * integer greater than or equal to `8`; other values are rejected with a + * `RangeError` because the async walker fan-out needs enough in-flight + * `readdir()` work to avoid starvation. + * + * Has no effect when omitted. Synchronous glob methods throw a + * `TypeError` if it is provided. + */ + concurrency?: number + /** * Include `.dot` files in normal matches and `globstar` * matches. 
Note that an explicit dot in a portion of the pattern @@ -408,6 +459,7 @@ export class Glob implements GlobOptions { windowsPathsNoEscape: boolean withFileTypes: FileTypes includeChildMatches: boolean + concurrency?: number /** * The options provided to the constructor. @@ -455,6 +507,7 @@ export class Glob implements GlobOptions { this.realpath = !!opts.realpath this.absolute = opts.absolute this.includeChildMatches = opts.includeChildMatches !== false + this.concurrency = opts.concurrency this.noglobstar = !!opts.noglobstar this.matchBase = !!opts.matchBase @@ -558,21 +611,28 @@ export class Glob implements GlobOptions { */ async walk(): Promise> async walk(): Promise<(string | Path)[]> { + const concurrency = validateAsyncConcurrency(this.concurrency) + const walker = new GlobWalker(this.patterns, this.scurry.cwd, { + ...this.opts, + maxDepth: + this.maxDepth !== Infinity ? + this.maxDepth + this.scurry.cwd.depth() + : Infinity, + platform: this.platform, + nocase: this.nocase, + includeChildMatches: this.includeChildMatches, + }) + // Walkers always return array of Path objects, so we just have to // coerce them into the right shape. It will have already called // realpath() if the option was set to do so, so we know that's cached. // start out knowing the cwd, at least return [ - ...(await new GlobWalker(this.patterns, this.scurry.cwd, { - ...this.opts, - maxDepth: - this.maxDepth !== Infinity ? - this.maxDepth + this.scurry.cwd.depth() - : Infinity, - platform: this.platform, - nocase: this.nocase, - includeChildMatches: this.includeChildMatches, - }).walk()), + ...(await ( + concurrency === undefined ? 
+ walker.walk() + : walker.walkWithConcurrency(concurrency) + )), ] } @@ -581,6 +641,7 @@ export class Glob implements GlobOptions { */ walkSync(): Results walkSync(): (string | Path)[] { + assertNoSyncConcurrency(this.concurrency) return [ ...new GlobWalker(this.patterns, this.scurry.cwd, { ...this.opts, @@ -600,7 +661,8 @@ export class Glob implements GlobOptions { */ stream(): Minipass, Result> stream(): Minipass { - return new GlobStream(this.patterns, this.scurry.cwd, { + const concurrency = validateAsyncConcurrency(this.concurrency) + const stream = new GlobStream(this.patterns, this.scurry.cwd, { ...this.opts, maxDepth: this.maxDepth !== Infinity ? @@ -609,7 +671,10 @@ export class Glob implements GlobOptions { platform: this.platform, nocase: this.nocase, includeChildMatches: this.includeChildMatches, - }).stream() + }) + return concurrency === undefined ? + stream.stream() + : stream.streamWithConcurrency(concurrency) } /** @@ -617,6 +682,7 @@ export class Glob implements GlobOptions { */ streamSync(): Minipass, Result> streamSync(): Minipass { + assertNoSyncConcurrency(this.concurrency) return new GlobStream(this.patterns, this.scurry.cwd, { ...this.opts, maxDepth: diff --git a/src/index.ts b/src/index.ts index a44700bf..3246de1f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -7,7 +7,7 @@ import type { GlobOptionsWithFileTypesTrue, GlobOptionsWithFileTypesUnset, } from './glob.js' -import { Glob } from './glob.js' +import { assertNoSyncConcurrency, Glob } from './glob.js' import { hasMagic } from './has-magic.js' export { escape, unescape } from 'minimatch' @@ -55,6 +55,7 @@ export function globStreamSync( pattern: string | string[], options: GlobOptions = {}, ) { + assertNoSyncConcurrency(options.concurrency) return new Glob(pattern, options).streamSync() } @@ -108,6 +109,7 @@ export function globSync( pattern: string | string[], options: GlobOptions = {}, ) { + assertNoSyncConcurrency(options.concurrency) return new Glob(pattern, options).walkSync() } 
@@ -163,6 +165,7 @@ export function globIterateSync( pattern: string | string[], options: GlobOptions = {}, ) { + assertNoSyncConcurrency(options.concurrency) return new Glob(pattern, options).iterateSync() } diff --git a/src/walker.ts b/src/walker.ts index c70e38bb..9806b81d 100644 --- a/src/walker.ts +++ b/src/walker.ts @@ -24,6 +24,7 @@ export interface GlobWalkerOpts { dot?: boolean dotRelative?: boolean follow?: boolean + concurrency?: number ignore?: string | string[] | IgnoreLike mark?: boolean matchBase?: boolean @@ -81,6 +82,68 @@ const makeIgnore = ( : Array.isArray(ignore) ? new Ignore(ignore, opts) : ignore +type PendingRead = { + run: () => void + skip: () => void +} + +class AsyncReadLimiter { + readonly concurrency: number + #inFlight: number = 0 + #pending: PendingRead[] = [] + #signal?: AbortSignal + + constructor(concurrency: number, signal?: AbortSignal) { + this.concurrency = concurrency + this.#signal = signal + /* c8 ignore start */ + this.#signal?.addEventListener('abort', () => { + const pending = this.#pending.splice(0) + for (const item of pending) { + item.skip() + } + }) + /* c8 ignore stop */ + } + + schedule(run: () => void, skip: () => void) { + /* c8 ignore start */ + if (this.#signal?.aborted) { + skip() + return + } + /* c8 ignore stop */ + + if (this.#inFlight < this.concurrency) { + this.#inFlight++ + run() + return + } + + this.#pending.push({ run, skip }) + } + + done() { + this.#inFlight-- + while (this.#inFlight < this.concurrency) { + const next = this.#pending.shift() + if (!next) { + return + } + + /* c8 ignore start */ + if (this.#signal?.aborted) { + next.skip() + continue + } + /* c8 ignore stop */ + + this.#inFlight++ + next.run() + } + } +} + /** * basic walking utilities that all the glob walker types use */ @@ -94,6 +157,7 @@ export abstract class GlobUtil { #onResume: (() => unknown)[] = [] #ignore?: IgnoreLike #sep: '\\' | '/' + #readdirLimiter?: AsyncReadLimiter signal?: AbortSignal maxDepth: number 
includeChildMatches: boolean @@ -274,6 +338,124 @@ export abstract class GlobUtil { this.walkCB2(target, patterns, new Processor(this.opts), cb) } + walkCBWithConcurrency( + target: Path, + patterns: Pattern[], + concurrency: number, + cb: () => unknown, + ) { + if (this.signal?.aborted) return cb() + this.#readdirLimiter = new AsyncReadLimiter(concurrency, this.signal) + this.walkCB2WithConcurrency( + target, + patterns, + new Processor(this.opts), + cb, + ) + } + + #queueReaddir( + target: Path, + processor: Processor, + cb: () => unknown, + ) { + const limiter = this.#readdirLimiter + /* c8 ignore start */ + if (!limiter) { + throw new Error('bounded readdir limiter not initialized') + } + /* c8 ignore stop */ + + limiter.schedule( + () => { + target.readdirCB( + (_, entries) => { + limiter.done() + this.walkCB3WithConcurrency(target, entries, processor, cb) + }, + true, + ) + }, + cb, + ) + } + + walkCB2WithConcurrency( + target: Path, + patterns: Pattern[], + processor: Processor, + cb: () => unknown, + ) { + if (this.#childrenIgnored(target)) return cb() + /* c8 ignore next */ + if (this.signal?.aborted) return cb() + /* c8 ignore start */ + if (this.paused) { + this.onResume(() => + this.walkCB2WithConcurrency(target, patterns, processor, cb), + ) + /* c8 ignore next */ + return + } + /* c8 ignore stop */ + processor.processPatterns(target, patterns) + + let tasks = 1 + const next = () => { + if (--tasks === 0) cb() + } + + for (const [m, absolute, ifDir] of processor.matches.entries()) { + /* c8 ignore next */ + if (this.#ignored(m)) continue + tasks++ + void this.match(m, absolute, ifDir).then(() => next()) + } + + for (const t of processor.subwalkTargets()) { + if (this.maxDepth !== Infinity && t.depth() >= this.maxDepth) { + /* c8 ignore next */ + continue + } + tasks++ + const childrenCached = t.readdirCached() + if (t.calledReaddir()) { + this.walkCB3WithConcurrency(t, childrenCached, processor, next) + } else { + this.#queueReaddir(t, processor, next) 
+ } + } + + next() + } + + walkCB3WithConcurrency( + target: Path, + entries: Path[], + processorx: Processor, + cb: () => unknown, + ) { + if (this.signal?.aborted) return cb() + const proc = processorx.filterEntries(target, entries) + + let tasks = 1 + const next = () => { + if (--tasks === 0) cb() + } + + for (const [m, absolute, ifDir] of proc.matches.entries()) { + if (this.#ignored(m)) continue + tasks++ + void this.match(m, absolute, ifDir).then(() => next()) + } + for (const [target, patterns] of proc.subwalks.entries()) { + tasks++ + this.walkCB2WithConcurrency(target, patterns, proc.child(), next) + } + + next() + } + walkCB2( target: Path, patterns: Pattern[], @@ -451,6 +633,30 @@ export class GlobWalker< return this.matches } + async walkWithConcurrency( + concurrency: number, + ): Promise>> { + if (this.signal?.aborted) throw this.signal.reason + if (this.path.isUnknown()) { + await this.path.lstat() + } + await new Promise((res, rej) => { + this.walkCBWithConcurrency( + this.path, + this.patterns, + concurrency, + () => { + if (this.signal?.aborted) { + rej(this.signal.reason) + } else { + res(this.matches) + } + }, + ) + }) + return this.matches + } + walkSync(): Set> { if (this.signal?.aborted) throw this.signal.reason if (this.path.isUnknown()) { @@ -496,6 +702,28 @@ export class GlobStream< return this.results } + streamWithConcurrency(concurrency: number): MatchStream { + const target = this.path + if (target.isUnknown()) { + void target.lstat().then(() => { + this.walkCBWithConcurrency( + target, + this.patterns, + concurrency, + () => this.results.end(), + ) + }) + } else { + this.walkCBWithConcurrency( + target, + this.patterns, + concurrency, + () => this.results.end(), + ) + } + return this.results + } + streamSync(): MatchStream { if (this.path.isUnknown()) { this.path.lstatSync() diff --git a/test/signal.ts b/test/signal.ts index 20dd435a..0d8657bd 100644 --- a/test/signal.ts +++ b/test/signal.ts @@ -21,6 +21,7 @@ const mocks = (ac: 
AbortController) => ({ const __dirname = fileURLToPath(new URL('.', import.meta.url)) const cwd = resolve(__dirname, 'fixtures/a') +const concurrency = 8 const yeet = new Error('yeet') @@ -73,6 +74,50 @@ t.test('mid-abort stream', t => { s.once('data', () => ac.abort(yeet)) }) +t.test('pre abort walk with concurrency', async t => { + const ac = new AbortController() + ac.abort(yeet) + await t.rejects( + glob('./**', { cwd, signal: ac.signal, concurrency }), + yeet, + ) +}) + +t.test('mid-abort walk with concurrency', async t => { + const ac = new AbortController() + const res = glob('./**', { cwd, signal: ac.signal, concurrency }) + ac.abort(yeet) + await t.rejects(res, yeet) +}) + +t.test('pre abort stream with concurrency', t => { + const ac = new AbortController() + ac.abort(yeet) + const s = globStream('./**', { + cwd, + signal: ac.signal, + concurrency, + }) + s.on('error', er => { + t.equal(er, yeet) + t.end() + }) +}) + +t.test('mid-abort stream with concurrency', t => { + const ac = new AbortController() + const s = globStream('./**', { + cwd, + signal: ac.signal, + concurrency, + }) + s.on('error', er => { + t.equal(er, yeet) + t.end() + }) + s.once('data', () => ac.abort(yeet)) +}) + t.test('pre abort sync stream', t => { const ac = new AbortController() ac.abort(yeet) diff --git a/test/stream.ts b/test/stream.ts index fb74a452..2b0ee4c1 100644 --- a/test/stream.ts +++ b/test/stream.ts @@ -199,6 +199,16 @@ t.test('iterate on main', async t => { t.equal(e.size, 0, 'saw all entries') }) +t.test('bounded iterate on main', async t => { + const s = globIterate('./**', { cwd, concurrency: 8 }) + const e = new Set(expect) + for await (const c of s) { + t.equal(e.has(c), true, JSON.stringify(c)) + e.delete(c) + } + t.equal(e.size, 0, 'saw all entries') +}) + t.test('iterateSync on main', t => { const s = globIterateSync('./**', { cwd }) const e = new Set(expect) @@ -226,6 +236,56 @@ t.test('stream on main', t => { sync = false }) +t.test('bounded stream on main', 
t => { + let sync: boolean = true + const stream = globStream('./**', { cwd, concurrency: 8 }) + const e = new Set(expect) + stream.on('data', c => { + t.equal(e.has(c), true, JSON.stringify(c)) + e.delete(c) + }) + stream.on('end', () => { + t.equal(e.size, 0, 'saw all entries') + t.equal(sync, false, 'did not finish in one tick') + t.end() + }) + sync = false +}) + +t.test('bounded stream reuses a warmed glob cache', t => { + const s = new Glob('./**', { cwd, concurrency: 8 }) + void s.walk().then(() => { + let sync: boolean = true + const stream = new Glob('./**', s).stream() + const e = new Set(expect) + stream.on('data', c => { + t.equal(e.has(c), true, JSON.stringify(c)) + e.delete(c) + }) + stream.on('end', () => { + t.equal(e.size, 0, 'saw all entries') + t.equal(sync, false, 'did not finish in one tick') + t.end() + }) + sync = false + }) +}) + +t.test('bounded stream handles paused consumers', t => { + const stream = globStream('./**', { cwd, concurrency: 8 }) + const e = new Set(expect) + stream.pause() + stream.on('data', c => { + t.equal(e.has(c), true, JSON.stringify(c)) + e.delete(c) + }) + stream.on('end', () => { + t.equal(e.size, 0, 'saw all entries') + t.end() + }) + setTimeout(() => stream.resume(), 10) +}) + t.test('streamSync on main', t => { let sync: boolean = true const stream = globStreamSync('./**', { cwd })