1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use crate::NippyJarError;
use serde::{Deserialize, Serialize};

mod zstd;
pub use self::zstd::{DecoderDictionary, Decompressor, Zstd, ZstdState};
mod lz4;
pub use self::lz4::Lz4;

/// Trait for compressing and decompressing column values.
///
/// Implementors must also be serializable and deserializable with `serde`, as required by the
/// supertrait bounds.
pub trait Compression: Serialize + for<'a> Deserialize<'a> {
    /// Appends decompressed data to the dest buffer. Requires `dest` to have sufficient capacity.
    ///
    /// `value` holds the compressed bytes. Failures are reported as a [`NippyJarError`].
    fn decompress_to(&self, value: &[u8], dest: &mut Vec<u8>) -> Result<(), NippyJarError>;

    /// Returns decompressed data.
    ///
    /// `value` holds the compressed bytes; the result is returned in a newly owned buffer.
    fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError>;

    /// Appends compressed data from `src` to `dest`. Requires `dest` to have sufficient
    /// capacity.
    ///
    /// Returns number of bytes written to `dest`.
    fn compress_to(&self, src: &[u8], dest: &mut Vec<u8>) -> Result<usize, NippyJarError>;

    /// Compresses data from `src` and returns it in a newly owned buffer.
    fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError>;

    /// Returns `true` if it's ready to compress.
    ///
    /// Example: it will return false, if `zstd` with dictionary is set, but wasn't generated.
    ///
    /// The default implementation always returns `true`.
    fn is_ready(&self) -> bool {
        true
    }

    #[cfg(test)]
    /// If required, prepares compression algorithm with an early pass on the data.
    ///
    /// Only compiled for tests. The default implementation ignores `_columns` and returns
    /// `Ok(())`.
    fn prepare_compression(
        &mut self,
        _columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        Ok(())
    }
}

/// Enum with different [`Compression`] types.
///
/// Each variant wraps one concrete compressor. The enum derives `Serialize` and `Deserialize`;
/// `PartialEq` is only derived in test builds.
#[derive(Debug, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub enum Compressors {
    /// Compressor backed by the [`Zstd`] type from the `zstd` submodule.
    Zstd(Zstd),
    /// Compressor backed by the [`Lz4`] type from the `lz4` submodule.
    Lz4(Lz4),
}

/// Dispatches every [`Compression`] operation to the compressor held by the active variant.
impl Compression for Compressors {
    /// Appends the decompressed form of `value` to `dest` using the wrapped compressor.
    fn decompress_to(&self, value: &[u8], dest: &mut Vec<u8>) -> Result<(), NippyJarError> {
        match self {
            Self::Zstd(zstd) => zstd.decompress_to(value, dest),
            Self::Lz4(lz4) => lz4.decompress_to(value, dest),
        }
    }

    /// Returns the decompressed form of `value` using the wrapped compressor.
    fn decompress(&self, value: &[u8]) -> Result<Vec<u8>, NippyJarError> {
        match self {
            Self::Zstd(zstd) => zstd.decompress(value),
            Self::Lz4(lz4) => lz4.decompress(value),
        }
    }

    /// Appends compressed data from `src` to `dest`, growing `dest` as needed.
    ///
    /// Whenever the wrapped compressor reports [`NippyJarError::OutputTooSmall`], the capacity of
    /// `dest` is at least doubled and the compression is retried. Any other error is returned
    /// unchanged.
    ///
    /// Returns number of bytes written to `dest`.
    fn compress_to(&self, src: &[u8], dest: &mut Vec<u8>) -> Result<usize, NippyJarError> {
        loop {
            let outcome = match self {
                Self::Zstd(zstd) => zstd.compress_to(src, dest),
                Self::Lz4(lz4) => lz4.compress_to(src, dest),
            };

            match outcome {
                Err(NippyJarError::OutputTooSmall) => {
                    // `Vec::reserve` counts from `len`, not from `capacity`. Requesting only a
                    // fixed amount can therefore be a no-op (e.g. for an empty buffer), which
                    // would retry forever. Ask for the unused space plus a growth step so the
                    // capacity strictly increases on every retry.
                    let spare = dest.capacity() - dest.len();
                    let step = dest.capacity().max(1);
                    dest.reserve(spare.saturating_add(step));
                }
                other => return other,
            }
        }
    }

    /// Returns the compressed form of `src` using the wrapped compressor.
    fn compress(&self, src: &[u8]) -> Result<Vec<u8>, NippyJarError> {
        match self {
            Self::Zstd(zstd) => zstd.compress(src),
            Self::Lz4(lz4) => lz4.compress(src),
        }
    }

    /// Returns whether the wrapped compressor reports itself ready to compress.
    fn is_ready(&self) -> bool {
        match self {
            Self::Zstd(zstd) => zstd.is_ready(),
            Self::Lz4(lz4) => lz4.is_ready(),
        }
    }

    /// Forwards the preparation pass over `columns` to the wrapped compressor (test builds only).
    #[cfg(test)]
    fn prepare_compression(
        &mut self,
        columns: Vec<impl IntoIterator<Item = Vec<u8>>>,
    ) -> Result<(), NippyJarError> {
        match self {
            Self::Zstd(zstd) => zstd.prepare_compression(columns),
            Self::Lz4(lz4) => lz4.prepare_compression(columns),
        }
    }
}