diff --git a/cli/src/args.rs b/cli/src/args.rs index 718c527..fb07aec 100644 --- a/cli/src/args.rs +++ b/cli/src/args.rs @@ -8,7 +8,7 @@ use zeekstd::{CompressionLevel, SeekTable, seek_table}; // 128 MiB const MMAP_THRESHOLD: u64 = 0x0010_0000; -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone)] pub struct ByteValue(u32); impl ByteValue { @@ -178,6 +178,9 @@ pub struct CompressArgs { #[arg(long, default_value = "uncompressed")] pub frame_size_policy: FrameSizePolicy, + #[arg(long)] + pub align: Option, + /// Provide a reference point for Zstandard's diff engine. #[arg(long)] pub patch_from: Option, @@ -295,6 +298,9 @@ pub struct ListArgs { #[arg(short, long, action)] pub detail: bool, + #[arg(long)] + pub show_padding: bool, + /// The format of the seek table. #[arg(long, default_value = "foot")] pub seek_table_format: SeekTableFormat, diff --git a/cli/src/command.rs b/cli/src/command.rs index 4794ddf..60eadd8 100644 --- a/cli/src/command.rs +++ b/cli/src/command.rs @@ -248,6 +248,7 @@ impl Command { start_frame: args.from_frame, end_frame, detail: args.detail, + show_padding: args.show_padding, }; Executor { @@ -284,6 +285,7 @@ enum ExecMode<'a> { start_frame: Option, end_frame: Option, detail: bool, + show_padding: bool, }, } @@ -346,11 +348,18 @@ impl Executor<'_> { start_frame, end_frame, detail, + show_padding, } => { - if start_frame.is_none() && end_frame.is_none() && !detail { + if start_frame.is_none() && end_frame.is_none() && !detail && !show_padding { list_summarize(&seek_table, &self.in_path, self.byte_fmt); } else { - list_frames(&seek_table, start_frame, end_frame, self.byte_fmt)?; + list_frames( + &seek_table, + start_frame, + end_frame, + self.byte_fmt, + show_padding, + )?; } } } @@ -423,6 +432,7 @@ fn list_frames( start_frame: Option, end_frame: Option, byte_fmt: fn(u64) -> String, + show_padding: bool, ) -> Result<()> { use std::fmt::Write as _; @@ -440,15 +450,19 @@ fn list_frames( let mut cnt = 0; for n in start..=end { - let comp = (byte_fmt)(st.frame_size_comp(n)?); - let uncomp = (byte_fmt)(st.frame_size_decomp(n)?); - let comp_off = (byte_fmt)(st.frame_start_comp(n)?); - let uncomp_off = (byte_fmt)(st.frame_start_decomp(n)?); - - writeln!( - &mut buf, - "{n: <15} {comp: <15} {uncomp: <15} {comp_off: <20} {uncomp_off: <20}", - )?; + let n_uncomp = st.frame_size_decomp(n)?; + + if n_uncomp > 0 || show_padding { + let comp = (byte_fmt)(st.frame_size_comp(n)?); + let uncomp = (byte_fmt)(n_uncomp); + let comp_off = (byte_fmt)(st.frame_start_comp(n)?); + let uncomp_off = (byte_fmt)(st.frame_start_decomp(n)?); + + writeln!( + &mut buf, + "{n: <15} {comp: <15} {uncomp: <15} {comp_off: <20} {uncomp_off: <20}", + )?; + } cnt += 1; if cnt == 100 { diff --git a/cli/src/compress.rs b/cli/src/compress.rs index 7172d65..0a201cb 100644 --- a/cli/src/compress.rs +++ b/cli/src/compress.rs @@ -35,6 +35,7 @@ impl Compressor<'_, W> { let encoder = EncodeOptions::with_cctx(cctx) .frame_size_policy(args.to_frame_size_policy()) + .align(args.align.map(|x| x.as_u32())) .checksum_flag(!args.no_checksum) .compression_level(args.compression_level) .into_encoder(writer) diff --git a/lib/src/encode.rs b/lib/src/encode.rs index 13c7df2..3f5dc8c 100644 --- a/lib/src/encode.rs +++ b/lib/src/encode.rs @@ -7,7 +7,9 @@ use zstd_safe::{ #[cfg(feature = "std")] use crate::seek_table::Format; -use crate::{SEEKABLE_MAX_FRAME_SIZE, SeekTable, error::Result}; +use crate::{ + SEEKABLE_MAX_FRAME_SIZE, SeekTable, error::Result, seek_table::SKIPPABLE_MAGIC_NUMBER, +}; // Constant value always can be casted const MAX_FRAME_SIZE: u32 = SEEKABLE_MAX_FRAME_SIZE as u32; @@ -110,6 +112,7 @@ impl EpilogueProgress { pub struct EncodeOptions<'a> { cctx: CCtx<'a>, frame_policy: FrameSizePolicy, + alignment: Option, checksum_flag: bool, compression_level: CompressionLevel, } @@ -143,6 +146,7 @@ impl<'a> EncodeOptions<'a> { Self { cctx, frame_policy: FrameSizePolicy::default(), + alignment: None, checksum_flag: false, compression_level: CompressionLevel::default(), } @@ -160,6 +164,12 @@ impl<'a> EncodeOptions<'a> { self } + /// Sets the frame alignment + pub fn align(mut self, alignment: Option) -> Self { + self.alignment = alignment; + self + } + /// Whether to write 32 bit checksums at the end of frames. pub fn checksum_flag(mut self, flag: bool) -> Self { self.checksum_flag = flag; @@ -266,6 +276,9 @@ impl<'a> EncodeOptions<'a> { pub struct RawEncoder<'a> { cctx: CCtx<'a>, frame_policy: FrameSizePolicy, + alignment: u32, + total_bytes_written: usize, + fill_zero: usize, frame_c_size: u32, frame_d_size: u32, seek_table: SeekTable, @@ -286,6 +299,9 @@ impl<'a> RawEncoder<'a> { Ok(Self { cctx: opts.cctx, frame_policy: opts.frame_policy, + alignment: opts.alignment.unwrap_or(1), + total_bytes_written: 0, + fill_zero: 0, frame_c_size: 0, frame_d_size: 0, seek_table: SeekTable::new(), @@ -313,6 +329,17 @@ impl<'a> RawEncoder<'a> { input: &[u8], output: &mut [u8], prefix: Option<&'b [u8]>, + ) -> Result { + let progress = self.compress_with_prefix_(input, output, prefix)?; + self.total_bytes_written += progress.out_progress; + Ok(progress) + } + + fn compress_with_prefix_<'b: 'a>( + &mut self, + input: &[u8], + output: &mut [u8], + prefix: Option<&'b [u8]>, ) -> Result { if self.is_frame_complete() { let mut out_progress = 0; @@ -323,34 +350,53 @@ impl<'a> RawEncoder<'a> { break; } } + return Ok(CompressionProgress::new(0, out_progress)); + } + + if self.fill_zero > 0 { + let out_progress = self.fill_zero.min(output.len()); + self.fill_zero -= out_progress; + return Ok(CompressionProgress::new(0, out_progress)); + } - Ok(CompressionProgress::new(0, out_progress)) - } else { - let limit = input.len().min(self.remaining_frame_size()); - let mut in_buf = InBuffer::around(&input[..limit]); - let mut out_buf = OutBuffer::around(output); - // Reference prefix at the beginning of a frame - // TODO: chain when stable - if let Some(pref) = prefix { - if self.frame_d_size == 0 { - self.cctx.ref_prefix(pref)?; + if self.frame_c_size == 0 { + match self.total_bytes_written % self.alignment as usize { + 0 => (), + r => { + let mut padding = self.alignment as usize - r; + if padding < 8 { + padding += self.alignment as usize + } + let out_progress = self.add_padding(output, padding)?; + return Ok(CompressionProgress::new(0, out_progress)); } } + } - while in_buf.pos() < limit && out_buf.pos() < out_buf.capacity() { - self.cctx.compress_stream2( - &mut out_buf, - &mut in_buf, - ZSTD_EndDirective::ZSTD_e_continue, - )?; + let limit = input.len().min(self.remaining_frame_size()); + let mut in_buf = InBuffer::around(&input[..limit]); + let mut out_buf = OutBuffer::around(output); + // Reference prefix at the beginning of a frame + // TODO: chain when stable + if let Some(pref) = prefix { + if self.frame_d_size == 0 { + self.cctx.ref_prefix(pref)?; } + } - // Casting should always be fine - self.frame_c_size += out_buf.pos() as u32; - self.frame_d_size += in_buf.pos() as u32; - - Ok(CompressionProgress::new(in_buf.pos(), out_buf.pos())) + while in_buf.pos() < limit && out_buf.pos() < out_buf.capacity() { + self.cctx.compress_stream2( + &mut out_buf, + &mut in_buf, + ZSTD_EndDirective::ZSTD_e_continue, + )?; } + + // Casting should always be fine + self.frame_c_size += out_buf.pos() as u32; + self.frame_d_size += in_buf.pos() as u32; + + Ok(CompressionProgress::new(in_buf.pos(), out_buf.pos())) } } @@ -471,6 +517,19 @@ impl RawEncoder<'_> { Ok(EpilogueProgress::new(out_buf.pos(), 0)) } + pub fn add_padding(&mut self, output: &mut [u8], padding: usize) -> Result { + assert!(padding >= 8); + if output.len() < 8 { + return Ok(0); + } + let zeroes = padding as u32 - 8; + output[0..4].copy_from_slice(&SKIPPABLE_MAGIC_NUMBER.to_le_bytes()); + output[4..8].copy_from_slice(&zeroes.to_le_bytes()); + self.fill_zero = zeroes as usize; + self.seek_table.log_frame(padding as u32, 0)?; + Ok(8) + } + /// Returns a reference to the internal [`SeekTable`]. /// /// # Examples diff --git a/lib/src/seek_table.rs b/lib/src/seek_table.rs index 3385062..7ac00f4 100644 --- a/lib/src/seek_table.rs +++ b/lib/src/seek_table.rs @@ -86,7 +86,7 @@ macro_rules! write_integrity { /// The size of each frame entry in the seek table. const SIZE_PER_FRAME: usize = 8; /// The skippable magic number of the skippable frame containing the seek table. -const SKIPPABLE_MAGIC_NUMBER: u32 = zstd_safe::zstd_sys::ZSTD_MAGIC_SKIPPABLE_START | 0xE; +pub const SKIPPABLE_MAGIC_NUMBER: u32 = zstd_safe::zstd_sys::ZSTD_MAGIC_SKIPPABLE_START | 0xE; struct Frame { c_size: u32,