Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ target
# Added by cargo

/target

# A local cache for pre-training data.
corpus
4 changes: 2 additions & 2 deletions crates/wubbie/src/config/tokenizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::tokenizer::{DEFAULT_MIN_FREQUENCY, VOCAB_SIZE};
/// `wubbie tokenizer`: train the byte-level BPE tokenizer on a corpus slice.
///
/// The corpus comes from exactly one source: a local `--input` path (a file, or
/// a directory swept for corpus files) or a pinned Hugging Face dataset
/// a directory walked recursively for corpus files) or a pinned Hugging Face dataset
/// (`--hf-repo` at `--hf-revision`). The filtered CommonPile slice lives on HF
/// (MULTI-1378) and is pulled on demand; the local path covers tiny local runs.
/// Shards are JSONL/`.gz` (text under `--text-field`) or plain text.
Expand All @@ -24,7 +24,7 @@ use crate::tokenizer::{DEFAULT_MIN_FREQUENCY, VOCAB_SIZE};
ArgGroup::new("corpus_source").required(true).args(["input", "hf_repo"])
))]
pub struct TokenizerSubcommand {
/// Local corpus: a single file, or a directory swept (non-recursively) for
/// Local corpus: a single file, or a directory walked recursively for
/// `.jsonl`/`.jsonl.gz`/`.txt` files.
#[arg(long, value_name = "PATH")]
input: Option<PathBuf>,
Expand Down
89 changes: 76 additions & 13 deletions crates/wubbie/src/corpus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct HfSource {
/// Where the corpus comes from.
#[derive(Debug, Clone)]
pub enum CorpusSource {
/// A local file, or a directory swept (non-recursively) for corpus files.
/// A local file, or a directory walked recursively for corpus files.
Local(PathBuf),
/// A pinned Hugging Face dataset, pulled via [`hf_hub`].
Hf(HfSource),
Expand Down Expand Up @@ -192,6 +192,12 @@ pub fn read_sample(files: &[PathBuf], text_field: &str, max_chars: usize) -> Res
}

/// Resolve a local `--input` path to its corpus files.
///
/// A file path resolves to itself; a directory is walked **recursively** for
/// corpus shards at any depth. The CommonPile slice is laid out one
/// subdirectory per source (`<source>/<source>.chunk.NN.jsonl.gz`), so a
/// flat, single-level sweep would miss every shard — the walk descends into
/// subdirectories to find them.
fn collect_local_files(input: &Path) -> Result<Vec<PathBuf>> {
let metadata = fs::metadata(input)
.with_context(|| format!("cannot read corpus input: {}", input.display()))?;
Expand All @@ -200,22 +206,46 @@ fn collect_local_files(input: &Path) -> Result<Vec<PathBuf>> {
return Ok(vec![input.to_path_buf()]);
}

let mut files: Vec<PathBuf> = fs::read_dir(input)
.with_context(|| format!("cannot read corpus directory: {}", input.display()))?
.filter_map(|entry| entry.ok().map(|entry| entry.path()))
.filter(|path| {
path.is_file()
&& path
.file_name()
.and_then(|n| n.to_str())
.is_some_and(is_corpus_file)
})
.collect();
// Iterative depth-first walk over a stack of directories. Sorting the full
// result at the end makes the shard order deterministic regardless of the
// order the filesystem hands back entries — the testing policy requires it.
let mut files: Vec<PathBuf> = Vec::new();
let mut dirs = vec![input.to_path_buf()];
while let Some(dir) = dirs.pop() {
let entries = fs::read_dir(&dir)
.with_context(|| format!("cannot read corpus directory: {}", dir.display()))?;
for entry in entries {
let entry = entry.with_context(|| {
format!(
"cannot read an entry in corpus directory: {}",
dir.display()
)
})?;
// `file_type()` does not follow symlinks, so we only descend into
// real subdirectories — a symlinked directory is skipped, which
// keeps the walk free of symlink cycles.
let file_type = entry
.file_type()
.with_context(|| format!("cannot stat corpus entry: {}", entry.path().display()))?;
let path = entry.path();
if file_type.is_dir() {
dirs.push(path);
} else if path
.file_name()
.and_then(|n| n.to_str())
.is_some_and(is_corpus_file)
// `is_file()` follows symlinks, so a symlinked shard still counts.
&& path.is_file()
{
files.push(path);
}
}
}
files.sort();

ensure!(
!files.is_empty(),
"no corpus files (.jsonl/.jsonl.gz/.txt) found in directory: {}",
"no corpus files (.jsonl/.jsonl.gz/.txt) found under directory: {}",
input.display(),
);
Ok(files)
Expand Down Expand Up @@ -502,6 +532,39 @@ mod tests {
fs::remove_dir_all(&dir).expect("cleanup");
}

#[test]
fn collects_corpus_files_recursively_and_sorted() {
    // Build a small tree with shards at two different depths plus a couple of
    // non-corpus files, then check that the directory walk returns exactly the
    // shards, in sorted order. A sweep limited to the top level would return
    // nothing here, because every shard sits inside a subdirectory.
    let root = std::env::temp_dir().join(format!("wubbie-recursive-{}", std::process::id()));
    // Best-effort removal of leftovers from an earlier run; the result is
    // deliberately ignored because the directory usually does not exist yet.
    let _ = fs::remove_dir_all(&root);

    // One source directory directly under the root, one nested a level deeper.
    let shallow = root.join("source_a");
    let deep = root.join("source_b").join("nested");
    fs::create_dir_all(&shallow).expect("create source_a");
    fs::create_dir_all(&deep).expect("create nested source_b");

    // The two shard paths double as the expected result further down.
    let shard_a = shallow.join("a.chunk.00.jsonl");
    let shard_b = deep.join("b.chunk.00.jsonl");

    // Files without a corpus extension, at the root and next to a shard; the
    // assertion below expects neither of them in the result.
    fs::write(root.join("README.md"), "ignore me").expect("write readme");
    fs::write(shallow.join("manifest.json"), "{}").expect("write manifest");
    // The corpus shards themselves.
    fs::write(&shard_a, "{\"text\": \"alpha\"}\n").expect("write a");
    fs::write(&shard_b, "{\"text\": \"beta\"}\n").expect("write b");

    let found = collect_local_files(&root).expect("collect files");
    let expected = vec![shard_a, shard_b];
    assert_eq!(
        found, expected,
        "recurses into subdirs, ignores non-corpus files, and returns sorted paths",
    );

    fs::remove_dir_all(&root).expect("cleanup");
}

#[test]
fn empty_directory_is_rejected() {
let dir = std::env::temp_dir().join(format!("wubbie-empty-{}", std::process::id()));
Expand Down