Skip to main content

reth_zstd_compressors/
lib.rs

1//! Commonly used zstd [`Compressor`] and [`Decompressor`] for reth types.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10#![cfg_attr(not(feature = "std"), no_std)]
11
12extern crate alloc;
13
14use crate::alloc::string::ToString;
15use alloc::vec::Vec;
16use zstd::bulk::{Compressor, Decompressor};
17
/// Compression/Decompression dictionary for `Receipt`.
///
/// The bytes of `receipt_dictionary.bin` (one directory above this source file) are embedded
/// into the binary at compile time. The same dictionary is handed to both the receipt
/// compressors and the receipt decompressors created in this crate.
pub static RECEIPT_DICTIONARY: &[u8] = include_bytes!("../receipt_dictionary.bin");
/// Compression/Decompression dictionary for `Transaction`.
///
/// The bytes of `transaction_dictionary.bin` (one directory above this source file) are embedded
/// into the binary at compile time. The same dictionary is handed to both the transaction
/// compressors and the transaction decompressors created in this crate.
pub static TRANSACTION_DICTIONARY: &[u8] = include_bytes!("../transaction_dictionary.bin");
22
23#[cfg(feature = "std")]
24pub use locals::*;
#[cfg(feature = "std")]
mod locals {
    use super::{
        Compressor, Decompressor, ReusableDecompressor, RECEIPT_DICTIONARY,
        TRANSACTION_DICTIONARY,
    };
    use core::cell::RefCell;

    // Dictionaries can be quite big and zstd-rs recommends one context/compressor per thread, so
    // every thread keeps its own set of compressors and decompressors. Each one is wrapped in a
    // `RefCell` so callers can obtain the `&mut` access that (de)compression requires.
    std::thread_local! {
        /// Per-thread compressor primed with the transaction dictionary.
        pub static TRANSACTION_COMPRESSOR: RefCell<Compressor<'static>> = {
            let compressor = Compressor::with_dictionary(0, TRANSACTION_DICTIONARY)
                .expect("failed to initialize transaction compressor");
            RefCell::new(compressor)
        };

        /// Per-thread compressor primed with the receipt dictionary.
        pub static RECEIPT_COMPRESSOR: RefCell<Compressor<'static>> = {
            let compressor = Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
                .expect("failed to initialize receipt compressor");
            RefCell::new(compressor)
        };

        /// Per-thread reusable decompressor primed with the transaction dictionary.
        pub static TRANSACTION_DECOMPRESSOR: RefCell<ReusableDecompressor> = {
            let decompressor = Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
                .expect("failed to initialize transaction decompressor");
            RefCell::new(ReusableDecompressor::new(decompressor))
        };

        /// Per-thread reusable decompressor primed with the receipt dictionary.
        pub static RECEIPT_DECOMPRESSOR: RefCell<ReusableDecompressor> = {
            let decompressor = Decompressor::with_dictionary(RECEIPT_DICTIONARY)
                .expect("failed to initialize receipt decompressor");
            RefCell::new(ReusableDecompressor::new(decompressor))
        };
    }
}
60
61/// Fn creates tx [`Compressor`]
62pub fn create_tx_compressor() -> Compressor<'static> {
63    Compressor::with_dictionary(0, TRANSACTION_DICTIONARY)
64        .expect("Failed to instantiate tx compressor")
65}
66
67/// Fn creates tx [`Decompressor`]
68pub fn create_tx_decompressor() -> ReusableDecompressor {
69    ReusableDecompressor::new(
70        Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
71            .expect("Failed to instantiate tx decompressor"),
72    )
73}
74
75/// Fn creates receipt [`Compressor`]
76pub fn create_receipt_compressor() -> Compressor<'static> {
77    Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
78        .expect("Failed to instantiate receipt compressor")
79}
80
81/// Fn creates receipt [`Decompressor`]
82pub fn create_receipt_decompressor() -> ReusableDecompressor {
83    ReusableDecompressor::new(
84        Decompressor::with_dictionary(RECEIPT_DICTIONARY)
85            .expect("Failed to instantiate receipt decompressor"),
86    )
87}
88
89/// Executes `f` with the thread-local transaction compressor on `std`, otherwise creates a new one.
90#[inline]
91pub fn with_tx_compressor<R>(f: impl FnOnce(&mut Compressor<'_>) -> R) -> R {
92    #[cfg(feature = "std")]
93    {
94        TRANSACTION_COMPRESSOR.with_borrow_mut(f)
95    }
96    #[cfg(not(feature = "std"))]
97    {
98        f(&mut create_tx_compressor())
99    }
100}
101
102/// Executes `f` with the thread-local transaction decompressor on `std`, otherwise creates a new
103/// one.
104#[inline]
105pub fn with_tx_decompressor<R>(f: impl FnOnce(&mut ReusableDecompressor) -> R) -> R {
106    #[cfg(feature = "std")]
107    {
108        TRANSACTION_DECOMPRESSOR.with_borrow_mut(f)
109    }
110    #[cfg(not(feature = "std"))]
111    {
112        f(&mut create_tx_decompressor())
113    }
114}
115
116/// Executes `f` with the thread-local receipt compressor on `std`, otherwise creates a new one.
117#[inline]
118pub fn with_receipt_compressor<R>(f: impl FnOnce(&mut Compressor<'_>) -> R) -> R {
119    #[cfg(feature = "std")]
120    {
121        RECEIPT_COMPRESSOR.with_borrow_mut(f)
122    }
123    #[cfg(not(feature = "std"))]
124    {
125        f(&mut create_receipt_compressor())
126    }
127}
128
129/// Executes `f` with the thread-local receipt decompressor on `std`, otherwise creates a new one.
130#[inline]
131pub fn with_receipt_decompressor<R>(f: impl FnOnce(&mut ReusableDecompressor) -> R) -> R {
132    #[cfg(feature = "std")]
133    {
134        RECEIPT_DECOMPRESSOR.with_borrow_mut(f)
135    }
136    #[cfg(not(feature = "std"))]
137    {
138        f(&mut create_receipt_decompressor())
139    }
140}
141
142/// Reusable decompressor that uses its own internal buffer.
143#[expect(missing_debug_implementations)]
144pub struct ReusableDecompressor {
145    /// The `zstd` decompressor.
146    decompressor: Decompressor<'static>,
147    /// The buffer to decompress to.
148    buf: Vec<u8>,
149}
150
151impl ReusableDecompressor {
152    fn new(decompressor: Decompressor<'static>) -> Self {
153        Self { decompressor, buf: Vec::with_capacity(4096) }
154    }
155
156    /// Decompresses `src` reusing the decompressor and its internal buffer.
157    pub fn decompress(&mut self, src: &[u8]) -> &[u8] {
158        // If the decompression fails because the buffer is too small, we try to reserve more space
159        // by getting the upper bound and retry the decompression.
160        let mut reserved_upper_bound = false;
161        while let Err(err) = self.decompressor.decompress_to_buffer(src, &mut self.buf) {
162            let err = err.to_string();
163            assert!(
164                err.contains("Destination buffer is too small"),
165                "Failed to decompress {} bytes: {err}",
166                src.len()
167            );
168
169            let additional = 'b: {
170                // Try to get the upper bound of the decompression for the given source.
171                // Do this only once as it might be expensive and will be the same for the same
172                // source.
173                if !reserved_upper_bound {
174                    reserved_upper_bound = true;
175                    if let Some(upper_bound) = Decompressor::upper_bound(src) &&
176                        let Some(additional) = upper_bound.checked_sub(self.buf.capacity())
177                    {
178                        break 'b additional
179                    }
180                }
181
182                // Otherwise, double the capacity of the buffer.
183                // This should normally not be reached as the upper bound should be enough.
184                self.buf.capacity() + 24_000
185            };
186            self.reserve(additional, src.len());
187        }
188
189        // `decompress_to_buffer` sets the length of the vector to the number of bytes written, so
190        // we can safely return it as a slice.
191        &self.buf
192    }
193
194    #[track_caller]
195    fn reserve(&mut self, additional: usize, src_len: usize) {
196        if let Err(e) = self.buf.try_reserve(additional) {
197            panic!(
198                "failed to allocate to {existing} + {additional} bytes \
199                 for the decompression of {src_len} bytes: {e}",
200                existing = self.buf.capacity(),
201            );
202        }
203    }
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// A small transaction-dictionary round trip that fits in the initial output buffer.
    #[test]
    fn tx_roundtrip_compression() {
        let data: Vec<u8> = (0u8..=255).collect();

        let mut compressor = create_tx_compressor();
        let compressed = compressor.compress(&data).expect("compress tx");

        let mut decompressor = create_tx_decompressor();
        let decompressed = decompressor.decompress(&compressed);

        assert_eq!(decompressed, &*data);
    }

    /// A small receipt-dictionary round trip that fits in the initial output buffer.
    #[test]
    fn receipt_roundtrip_compression() {
        let data: Vec<u8> = (0u8..=255).rev().collect();

        let mut compressor = create_receipt_compressor();
        let compressed = compressor.compress(&data).expect("compress receipt");

        let mut decompressor = create_receipt_decompressor();
        let decompressed = decompressor.decompress(&compressed);

        assert_eq!(decompressed, &*data);
    }

    /// The output is far larger than the decompressor's initial buffer, so the grow-and-retry
    /// path in `ReusableDecompressor::decompress` has to run.
    #[test]
    fn tx_roundtrip_grows_buffer() {
        let data: Vec<u8> = (0u8..=255).cycle().take(100_000).collect();

        let mut compressor = create_tx_compressor();
        let compressed = compressor.compress(&data).expect("compress tx");

        let mut decompressor = create_tx_decompressor();
        let decompressed = decompressor.decompress(&compressed);

        assert_eq!(decompressed, &*data);
    }

    /// Reusing one decompressor must return only the bytes of the latest call, regardless of
    /// what a previous (larger or smaller) call left in the buffer.
    #[test]
    fn receipt_decompressor_reuse() {
        let large: Vec<u8> = (0u8..=255).cycle().take(100_000).collect();
        let small: Vec<u8> = (0u8..=255).rev().collect();

        let mut compressor = create_receipt_compressor();
        let large_compressed = compressor.compress(&large).expect("compress large receipt");
        let small_compressed = compressor.compress(&small).expect("compress small receipt");

        let mut decompressor = create_receipt_decompressor();
        assert_eq!(decompressor.decompress(&large_compressed), &*large);
        assert_eq!(decompressor.decompress(&small_compressed), &*small);
        assert_eq!(decompressor.decompress(&large_compressed), &*large);
    }
}
235}