reth_zstd_compressors/
lib.rs

1//! Commonly used zstd [`Compressor`] and [`Decompressor`] for reth types.
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10#![cfg_attr(not(feature = "std"), no_std)]
11
12extern crate alloc;
13
14use crate::alloc::string::ToString;
15use alloc::vec::Vec;
16use zstd::bulk::{Compressor, Decompressor};
17
18/// Compression/Decompression dictionary for `Receipt`.
19pub static RECEIPT_DICTIONARY: &[u8] = include_bytes!("../receipt_dictionary.bin");
20/// Compression/Decompression dictionary for `Transaction`.
21pub static TRANSACTION_DICTIONARY: &[u8] = include_bytes!("../transaction_dictionary.bin");
22
#[cfg(feature = "std")]
pub use locals::*;

/// Per-thread compressors and decompressors for transactions and receipts.
#[cfg(feature = "std")]
mod locals {
    use super::*;
    use core::cell::RefCell;

    // Dictionaries can be quite big and zstd-rs recommends one context/compressor per thread,
    // so each thread lazily builds its own instance of every (de)compressor below on first use.
    std::thread_local! {
        /// Per-thread transaction [`Compressor`] using [`TRANSACTION_DICTIONARY`].
        pub static TRANSACTION_COMPRESSOR: RefCell<Compressor<'static>> = {
            let compressor = Compressor::with_dictionary(0, TRANSACTION_DICTIONARY)
                .expect("failed to initialize transaction compressor");
            RefCell::new(compressor)
        };

        /// Per-thread transaction [`ReusableDecompressor`] using [`TRANSACTION_DICTIONARY`].
        pub static TRANSACTION_DECOMPRESSOR: RefCell<ReusableDecompressor> = {
            let decompressor = Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
                .expect("failed to initialize transaction decompressor");
            RefCell::new(ReusableDecompressor::new(decompressor))
        };

        /// Per-thread receipt [`Compressor`] using [`RECEIPT_DICTIONARY`].
        pub static RECEIPT_COMPRESSOR: RefCell<Compressor<'static>> = {
            let compressor = Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
                .expect("failed to initialize receipt compressor");
            RefCell::new(compressor)
        };

        /// Per-thread receipt [`ReusableDecompressor`] using [`RECEIPT_DICTIONARY`].
        pub static RECEIPT_DECOMPRESSOR: RefCell<ReusableDecompressor> = {
            let decompressor = Decompressor::with_dictionary(RECEIPT_DICTIONARY)
                .expect("failed to initialize receipt decompressor");
            RefCell::new(ReusableDecompressor::new(decompressor))
        };
    }
}
60
61/// Fn creates tx [`Compressor`]
62pub fn create_tx_compressor() -> Compressor<'static> {
63    Compressor::with_dictionary(0, RECEIPT_DICTIONARY).expect("Failed to instantiate tx compressor")
64}
65
66/// Fn creates tx [`Decompressor`]
67pub fn create_tx_decompressor() -> ReusableDecompressor {
68    ReusableDecompressor::new(
69        Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
70            .expect("Failed to instantiate tx decompressor"),
71    )
72}
73
74/// Fn creates receipt [`Compressor`]
75pub fn create_receipt_compressor() -> Compressor<'static> {
76    Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
77        .expect("Failed to instantiate receipt compressor")
78}
79
80/// Fn creates receipt [`Decompressor`]
81pub fn create_receipt_decompressor() -> ReusableDecompressor {
82    ReusableDecompressor::new(
83        Decompressor::with_dictionary(RECEIPT_DICTIONARY)
84            .expect("Failed to instantiate receipt decompressor"),
85    )
86}
87
88/// Reusable decompressor that uses its own internal buffer.
89#[allow(missing_debug_implementations)]
90pub struct ReusableDecompressor {
91    /// The `zstd` decompressor.
92    decompressor: Decompressor<'static>,
93    /// The buffer to decompress to.
94    buf: Vec<u8>,
95}
96
97impl ReusableDecompressor {
98    fn new(decompressor: Decompressor<'static>) -> Self {
99        Self { decompressor, buf: Vec::with_capacity(4096) }
100    }
101
102    /// Decompresses `src` reusing the decompressor and its internal buffer.
103    pub fn decompress(&mut self, src: &[u8]) -> &[u8] {
104        // If the decompression fails because the buffer is too small, we try to reserve more space
105        // by getting the upper bound and retry the decompression.
106        let mut reserved_upper_bound = false;
107        while let Err(err) = self.decompressor.decompress_to_buffer(src, &mut self.buf) {
108            let err = err.to_string();
109            assert!(
110                err.contains("Destination buffer is too small"),
111                "Failed to decompress {} bytes: {err}",
112                src.len()
113            );
114
115            let additional = 'b: {
116                // Try to get the upper bound of the decompression for the given source.
117                // Do this only once as it might be expensive and will be the same for the same
118                // source.
119                if !reserved_upper_bound {
120                    reserved_upper_bound = true;
121                    if let Some(upper_bound) = Decompressor::upper_bound(src) {
122                        if let Some(additional) = upper_bound.checked_sub(self.buf.capacity()) {
123                            break 'b additional
124                        }
125                    }
126                }
127
128                // Otherwise, double the capacity of the buffer.
129                // This should normally not be reached as the upper bound should be enough.
130                self.buf.capacity() + 24_000
131            };
132            self.reserve(additional, src.len());
133        }
134
135        // `decompress_to_buffer` sets the length of the vector to the number of bytes written, so
136        // we can safely return it as a slice.
137        &self.buf
138    }
139
140    #[track_caller]
141    fn reserve(&mut self, additional: usize, src_len: usize) {
142        if let Err(e) = self.buf.try_reserve(additional) {
143            panic!(
144                "failed to allocate to {existing} + {additional} bytes \
145                 for the decompression of {src_len} bytes: {e}",
146                existing = self.buf.capacity(),
147            );
148        }
149    }
150}