reth_nippy_jar/compression/
mod.rs

1use crate::NippyJarError;
2use serde::{Deserialize, Serialize};
3
4mod zstd;
5pub use self::zstd::{DecoderDictionary, Decompressor, Zstd, ZstdState};
6mod lz4;
7pub use self::lz4::Lz4;
8
9/// Trait that will compress column values
10pub trait Compression: Serialize + for<'a> Deserialize<'a> {
11    /// Appends decompressed data to the dest buffer. Requires `dest` to have sufficient capacity.
12    fn decompress_to(&self, value: &[u8], dest: &mut Vec<u8>) -> Result<(), NippyJarError>;
13
14    /// Returns decompressed data.
15    fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError>;
16
17    /// Appends compressed data from `src` to `dest`. `dest`. Requires `dest` to have sufficient
18    /// capacity.
19    ///
20    /// Returns number of bytes written to `dest`.
21    fn compress_to(&self, src: &[u8], dest: &mut Vec<u8>) -> Result<usize, NippyJarError>;
22
23    /// Compresses data from `src`
24    fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError>;
25
26    /// Returns `true` if it's ready to compress.
27    ///
28    /// Example: it will return false, if `zstd` with dictionary is set, but wasn't generated.
29    fn is_ready(&self) -> bool {
30        true
31    }
32
33    #[cfg(test)]
34    /// If required, prepares compression algorithm with an early pass on the data.
35    fn prepare_compression(
36        &mut self,
37        _columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
38    ) -> Result<(), NippyJarError> {
39        Ok(())
40    }
41}
42
43/// Enum with different [`Compression`] types.
44#[derive(Debug, Serialize, Deserialize)]
45#[cfg_attr(test, derive(PartialEq))]
46pub enum Compressors {
47    /// Zstandard compression algorithm with custom settings.
48    Zstd(Zstd),
49    /// LZ4 compression algorithm with custom settings.
50    Lz4(Lz4),
51}
52
53impl Compression for Compressors {
54    fn decompress_to(&self, value: &[u8], dest: &mut Vec<u8>) -> Result<(), NippyJarError> {
55        match self {
56            Self::Zstd(zstd) => zstd.decompress_to(value, dest),
57            Self::Lz4(lz4) => lz4.decompress_to(value, dest),
58        }
59    }
60    fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError> {
61        match self {
62            Self::Zstd(zstd) => zstd.decompress(value),
63            Self::Lz4(lz4) => lz4.decompress(value),
64        }
65    }
66
67    fn compress_to(&self, src: &[u8], dest: &mut Vec<u8>) -> Result<usize, NippyJarError> {
68        let initial_capacity = dest.capacity();
69        loop {
70            let result = match self {
71                Self::Zstd(zstd) => zstd.compress_to(src, dest),
72                Self::Lz4(lz4) => lz4.compress_to(src, dest),
73            };
74
75            match result {
76                Ok(v) => return Ok(v),
77                Err(err) => match err {
78                    NippyJarError::OutputTooSmall => {
79                        dest.reserve(initial_capacity);
80                    }
81                    _ => return Err(err),
82                },
83            }
84        }
85    }
86
87    fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError> {
88        match self {
89            Self::Zstd(zstd) => zstd.compress(src),
90            Self::Lz4(lz4) => lz4.compress(src),
91        }
92    }
93
94    fn is_ready(&self) -> bool {
95        match self {
96            Self::Zstd(zstd) => zstd.is_ready(),
97            Self::Lz4(lz4) => lz4.is_ready(),
98        }
99    }
100
101    #[cfg(test)]
102    fn prepare_compression(
103        &mut self,
104        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
105    ) -> Result<(), NippyJarError> {
106        match self {
107            Self::Zstd(zstd) => zstd.prepare_compression(columns),
108            Self::Lz4(lz4) => lz4.prepare_compression(columns),
109        }
110    }
111}