reth_zstd_compressors/
lib.rs1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10#![cfg_attr(not(feature = "std"), no_std)]
11
12extern crate alloc;
13
14use crate::alloc::string::ToString;
15use alloc::vec::Vec;
16use zstd::bulk::{Compressor, Decompressor};
17
18pub static RECEIPT_DICTIONARY: &[u8] = include_bytes!("../receipt_dictionary.bin");
20pub static TRANSACTION_DICTIONARY: &[u8] = include_bytes!("../transaction_dictionary.bin");
22
23#[cfg(feature = "std")]
24pub use locals::*;
25#[cfg(feature = "std")]
26mod locals {
27 use super::*;
28 use core::cell::RefCell;
29
30 std::thread_local! {
33 pub static TRANSACTION_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
35 Compressor::with_dictionary(0, TRANSACTION_DICTIONARY)
36 .expect("failed to initialize transaction compressor"),
37 );
38
39 pub static TRANSACTION_DECOMPRESSOR: RefCell<ReusableDecompressor> =
41 RefCell::new(ReusableDecompressor::new(
42 Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
43 .expect("failed to initialize transaction decompressor"),
44 ));
45
46 pub static RECEIPT_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
48 Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
49 .expect("failed to initialize receipt compressor"),
50 );
51
52 pub static RECEIPT_DECOMPRESSOR: RefCell<ReusableDecompressor> =
54 RefCell::new(ReusableDecompressor::new(
55 Decompressor::with_dictionary(RECEIPT_DICTIONARY)
56 .expect("failed to initialize receipt decompressor"),
57 ));
58 }
59}
60
61pub fn create_tx_compressor() -> Compressor<'static> {
63 Compressor::with_dictionary(0, TRANSACTION_DICTIONARY)
64 .expect("Failed to instantiate tx compressor")
65}
66
67pub fn create_tx_decompressor() -> ReusableDecompressor {
69 ReusableDecompressor::new(
70 Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
71 .expect("Failed to instantiate tx decompressor"),
72 )
73}
74
75pub fn create_receipt_compressor() -> Compressor<'static> {
77 Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
78 .expect("Failed to instantiate receipt compressor")
79}
80
81pub fn create_receipt_decompressor() -> ReusableDecompressor {
83 ReusableDecompressor::new(
84 Decompressor::with_dictionary(RECEIPT_DICTIONARY)
85 .expect("Failed to instantiate receipt decompressor"),
86 )
87}
88
89#[inline]
91pub fn with_tx_compressor<R>(f: impl FnOnce(&mut Compressor<'_>) -> R) -> R {
92 #[cfg(feature = "std")]
93 {
94 TRANSACTION_COMPRESSOR.with_borrow_mut(f)
95 }
96 #[cfg(not(feature = "std"))]
97 {
98 f(&mut create_tx_compressor())
99 }
100}
101
102#[inline]
105pub fn with_tx_decompressor<R>(f: impl FnOnce(&mut ReusableDecompressor) -> R) -> R {
106 #[cfg(feature = "std")]
107 {
108 TRANSACTION_DECOMPRESSOR.with_borrow_mut(f)
109 }
110 #[cfg(not(feature = "std"))]
111 {
112 f(&mut create_tx_decompressor())
113 }
114}
115
116#[inline]
118pub fn with_receipt_compressor<R>(f: impl FnOnce(&mut Compressor<'_>) -> R) -> R {
119 #[cfg(feature = "std")]
120 {
121 RECEIPT_COMPRESSOR.with_borrow_mut(f)
122 }
123 #[cfg(not(feature = "std"))]
124 {
125 f(&mut create_receipt_compressor())
126 }
127}
128
129#[inline]
131pub fn with_receipt_decompressor<R>(f: impl FnOnce(&mut ReusableDecompressor) -> R) -> R {
132 #[cfg(feature = "std")]
133 {
134 RECEIPT_DECOMPRESSOR.with_borrow_mut(f)
135 }
136 #[cfg(not(feature = "std"))]
137 {
138 f(&mut create_receipt_decompressor())
139 }
140}
141
142#[expect(missing_debug_implementations)]
144pub struct ReusableDecompressor {
145 decompressor: Decompressor<'static>,
147 buf: Vec<u8>,
149}
150
151impl ReusableDecompressor {
152 fn new(decompressor: Decompressor<'static>) -> Self {
153 Self { decompressor, buf: Vec::with_capacity(4096) }
154 }
155
156 pub fn decompress(&mut self, src: &[u8]) -> &[u8] {
158 let mut reserved_upper_bound = false;
161 while let Err(err) = self.decompressor.decompress_to_buffer(src, &mut self.buf) {
162 let err = err.to_string();
163 assert!(
164 err.contains("Destination buffer is too small"),
165 "Failed to decompress {} bytes: {err}",
166 src.len()
167 );
168
169 let additional = 'b: {
170 if !reserved_upper_bound {
174 reserved_upper_bound = true;
175 if let Some(upper_bound) = Decompressor::upper_bound(src) &&
176 let Some(additional) = upper_bound.checked_sub(self.buf.capacity())
177 {
178 break 'b additional
179 }
180 }
181
182 self.buf.capacity() + 24_000
185 };
186 self.reserve(additional, src.len());
187 }
188
189 &self.buf
192 }
193
194 #[track_caller]
195 fn reserve(&mut self, additional: usize, src_len: usize) {
196 if let Err(e) = self.buf.try_reserve(additional) {
197 panic!(
198 "failed to allocate to {existing} + {additional} bytes \
199 for the decompression of {src_len} bytes: {e}",
200 existing = self.buf.capacity(),
201 );
202 }
203 }
204}
205
206#[cfg(test)]
207mod tests {
208 use super::*;
209
210 #[test]
211 fn tx_roundtrip_compression() {
212 let data: Vec<u8> = (0u8..=255).collect();
213
214 let mut compressor = create_tx_compressor();
215 let compressed = compressor.compress(&data).expect("compress tx");
216
217 let mut decompressor = create_tx_decompressor();
218 let decompressed = decompressor.decompress(&compressed);
219
220 assert_eq!(decompressed, &*data);
221 }
222
223 #[test]
224 fn receipt_roundtrip_compression() {
225 let data: Vec<u8> = (0u8..=255).rev().collect();
226
227 let mut compressor = create_receipt_compressor();
228 let compressed = compressor.compress(&data).expect("compress receipt");
229
230 let mut decompressor = create_receipt_decompressor();
231 let decompressed = decompressor.decompress(&compressed);
232
233 assert_eq!(decompressed, &*data);
234 }
235}