Add datu split command #58
Merged
Adds `datu split`, the inverse of `concat`: splits a single input file into multiple output files of at most `--split` rows each (default 100000), with an optional `--limit` on total rows processed. Streams the input in a single pass via the existing record-batch reader/writer layer rather than materializing the whole file, so it scales to large inputs. Partition files are named by inserting a zero-padded `.partNNNNN` segment before the extension of the (optional) output path, which defaults to the input path.

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
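The partition naming rule described above can be sketched as follows. This is a minimal illustration and not the code from this PR: the function name `partition_path` is hypothetical, and the handling of paths without an extension is an assumption, since the description only covers the case with one.

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper (not taken from the PR): builds the path of the
/// 1-based `index`-th partition by inserting a zero-padded `.partNNNNN`
/// segment before the extension of `base`.
fn partition_path(base: &Path, index: usize) -> PathBuf {
    // Five digits, zero-padded: 1 -> "part00001".
    let segment = format!("part{:05}", index);
    let file_name = match (base.file_stem(), base.extension()) {
        // "large-file.avro" -> "large-file.part00001.avro"
        (Some(stem), Some(ext)) => format!(
            "{}.{}.{}",
            stem.to_string_lossy(),
            segment,
            ext.to_string_lossy()
        ),
        // Assumption: with no extension, the segment is simply appended.
        (Some(stem), None) => format!("{}.{}", stem.to_string_lossy(), segment),
        // Assumption: with no file name at all, the segment is the name.
        (None, _) => segment,
    };
    // Keeps the parent directory and replaces only the final component.
    base.with_file_name(file_name)
}
```

Under these assumptions, `partition_path(Path::new("large-file.avro"), 1)` yields `large-file.part00001.avro`, matching the example given in the PR description.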
--split now also takes kb/mb/gb/tb (decimal) or kib/mib/gib/tib (binary), case-insensitive, to size partitions by approximate output size instead of row count. Byte sizes are estimated from each RecordBatch's in-memory Arrow size, since exact on-disk/compressed size can't be known before writing.

Also refactors --limit to be applied once via the existing apply_offset_limit helper instead of being threaded through the per-partition loop, since that composition doesn't generalize to byte-sized partitions.

Fixes a bug found while testing byte-based splitting: a partition boundary landing exactly at a batch's end could stash a zero-row leftover in the reader's pending slot, which falsely signaled more data was available and produced a phantom empty/corrupt output file.

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
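The `--split` grammar described above (a plain integer means rows, a number followed by a unit means bytes) might be parsed roughly like this. It is a sketch under stated assumptions, not the PR's implementation: the variant names `Rows(usize)` and `Bytes(u64)` come from the PR description, while the function name `parse_split_size`, the error messages, the rounding of fractional byte sizes, and the rejection of a zero row count are assumptions made for illustration.

```rust
#[derive(Debug, PartialEq)]
enum SplitSize {
    Rows(usize),
    Bytes(u64),
}

/// Hypothetical parser (name and error text are assumptions).
fn parse_split_size(input: &str) -> Result<SplitSize, String> {
    // Units are case-insensitive, so normalize once up front.
    let s = input.trim().to_ascii_lowercase();
    // Split at the first character that is neither a digit nor a dot.
    let unit_start = s
        .find(|c: char| !(c.is_ascii_digit() || c == '.'))
        .unwrap_or(s.len());
    let (number, unit) = s.split_at(unit_start);
    let unit = unit.trim();

    // No unit: a plain integer is a row count.
    if unit.is_empty() {
        let rows: usize = number
            .parse()
            .map_err(|_| format!("invalid row count: {}", input))?;
        // Assumption: zero rows per partition is rejected.
        if rows == 0 {
            return Err("row count must be greater than zero".to_string());
        }
        return Ok(SplitSize::Rows(rows));
    }

    // Decimal units are powers of 1000, binary units powers of 1024.
    let multiplier: u64 = match unit {
        "kb" => 1_000,
        "mb" => 1_000_000,
        "gb" => 1_000_000_000,
        "tb" => 1_000_000_000_000,
        "kib" => 1 << 10,
        "mib" => 1 << 20,
        "gib" => 1 << 30,
        "tib" => 1 << 40,
        other => return Err(format!("unknown size unit: {}", other)),
    };
    // Fractions such as "1.5GiB" are allowed; rounding is an assumption.
    let value: f64 = number
        .parse()
        .map_err(|_| format!("invalid size: {}", input))?;
    let bytes = (value * multiplier as f64).round() as u64;
    if bytes == 0 {
        return Err("byte size must be greater than zero".to_string());
    }
    Ok(SplitSize::Bytes(bytes))
}
```

With this sketch, `300` parses as `Rows(300)` and `64mb` as `Bytes(64_000_000)`, while an unknown unit or a zero byte size returns an error, mirroring the error scenarios listed for `features/cli/split.feature`.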
Summary
- Adds `datu split`, the inverse of `concat`: splits a single input file into multiple output files of at most `--split` rows each (default `100000`), with an optional `--limit` on total rows processed (default `0` = unlimited).
- Streams the input in a single pass via the existing record-batch reader/writer layer (the one `concat`, `diff`, and the ORC pipeline already use), so it doesn't require a second read pass or materializing the whole file in memory.
- `--split` also accepts a byte size instead of a row count: `kb`/`mb`/`gb`/`tb` (decimal) or `kib`/`mib`/`gib`/`tib` (binary), case-insensitive (e.g. `64mb`, `1.5GiB`), for sizing partitions by approximate output size.

Changes
- `src/pipeline/split.rs` (new): core `split_file()`. Validates input/output formats, opens a lazy row-by-row reader (JSON is materialized via DataFusion, matching `diff`'s existing approach), and slices `RecordBatch`es across partition boundaries via a small `PartitionReader` wrapper.
- `src/bin/datu/commands/split.rs` (new): CLI wrapper, `datu split <INPUT> [OUTPUT] [OPTIONS]`, with `-I`/`-O`, `--split`, `--limit`, `--sparse`, `--json-pretty`, and `--input-headers`, following the same conventions as `concat`/`convert`.
- `OUTPUT` is an optional second positional (like `head`/`tail`/`sample`) that defaults to the input path.
- Partition files are named by inserting a zero-padded `.partNNNNN` segment (1-based) before the extension, e.g. `large-file.avro` → `large-file.part00001.avro`, `large-file.part00002.avro`, ...
- Wiring changes in `src/pipeline.rs`, `src/bin/datu/commands/mod.rs`, and `src/bin/datu/main.rs`.
- `README.md`: new `### split` section plus updated "Supported Formats" bullets.
- `features/cli/cli.feature`: updated the hardcoded `--help`/`help` output docstrings to include the new `split` subcommand.

`--split` byte-size support
- `SplitSize` type (`Rows(usize)` / `Bytes(u64)`) parses `--split`: a plain integer means rows (unchanged default behavior); a number followed by a unit means bytes.
- Byte sizes are estimated from each `RecordBatch`'s in-memory Arrow size (`get_array_memory_size()`), since the exact on-disk/compressed size can't be known before writing; this is documented as approximate in `--help` and the README.
- `PartitionReader`'s row-count budget is generalized to a `RemainingBudget` enum (`Rows`/`Bytes`) so the same slicing logic drives both modes.
- `--limit` (total row cap) is now applied once up front via the existing `apply_offset_limit` helper (`src/pipeline/record_batch.rs`) instead of being threaded through the per-partition loop. That per-partition composition doesn't generalize to byte-sized partitions, and this is a net simplification of the original row-only logic too.
- Fixes a bug found while testing byte-based splitting: a partition boundary landing exactly at a batch's end could stash a zero-row leftover in the reader's `pending` slot, which falsely signaled "more data available" and produced a phantom empty/corrupt trailing output file (reproduced with small byte budgets against a 1000-row Avro fixture, 69 partitions, last one corrupt). Fixed by only stashing a leftover when it actually contains rows.
- `features/cli/split.feature`: 5 new scenarios (decimal unit, case-insensitivity, binary unit, unknown-unit error, zero-byte-size error) added to the 6 from the original PR.

Testing
- `cargo build` / `cargo clippy --all-targets`: clean
- `cargo fmt --check`: clean
- `cargo test --lib`: 186 passed, including 19 `pipeline::split` tests (`SplitSize` parsing + row/byte-based `split_file`)
- `cargo test --bins`: 38 passed, including 4 `commands::split` tests
- `cargo test --test cli`: 134/134 Cucumber scenarios passed (11 `split` scenarios total)
- `cargo test --test repl`: 80/80 Cucumber scenarios passed (regression check)
- `datu split fixtures/userdata5.avro u.avro --split 300` on a 1000-row Avro file produced 4 partitions (300/300/300/100 rows), row counts verified to sum back to 1000.
- `datu split fixtures/userdata5.avro u.avro --split 20kb` produced 63 partitions; summed per-partition row counts equal 1000.

Checklist
🤖 Generated with Claude Code