reth_zstd_compressors/
lib.rs
1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10#![cfg_attr(not(feature = "std"), no_std)]
11
12extern crate alloc;
13
14use crate::alloc::string::ToString;
15use alloc::vec::Vec;
16use zstd::bulk::{Compressor, Decompressor};
17
18pub static RECEIPT_DICTIONARY: &[u8] = include_bytes!("../receipt_dictionary.bin");
20pub static TRANSACTION_DICTIONARY: &[u8] = include_bytes!("../transaction_dictionary.bin");
22
23#[cfg(feature = "std")]
24pub use locals::*;
25#[cfg(feature = "std")]
26mod locals {
27 use super::*;
28 use core::cell::RefCell;
29
30 std::thread_local! {
33 pub static TRANSACTION_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
35 Compressor::with_dictionary(0, TRANSACTION_DICTIONARY)
36 .expect("failed to initialize transaction compressor"),
37 );
38
39 pub static TRANSACTION_DECOMPRESSOR: RefCell<ReusableDecompressor> =
41 RefCell::new(ReusableDecompressor::new(
42 Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
43 .expect("failed to initialize transaction decompressor"),
44 ));
45
46 pub static RECEIPT_COMPRESSOR: RefCell<Compressor<'static>> = RefCell::new(
48 Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
49 .expect("failed to initialize receipt compressor"),
50 );
51
52 pub static RECEIPT_DECOMPRESSOR: RefCell<ReusableDecompressor> =
54 RefCell::new(ReusableDecompressor::new(
55 Decompressor::with_dictionary(RECEIPT_DICTIONARY)
56 .expect("failed to initialize receipt decompressor"),
57 ));
58 }
59}
60
61pub fn create_tx_compressor() -> Compressor<'static> {
63 Compressor::with_dictionary(0, RECEIPT_DICTIONARY).expect("Failed to instantiate tx compressor")
64}
65
66pub fn create_tx_decompressor() -> ReusableDecompressor {
68 ReusableDecompressor::new(
69 Decompressor::with_dictionary(TRANSACTION_DICTIONARY)
70 .expect("Failed to instantiate tx decompressor"),
71 )
72}
73
74pub fn create_receipt_compressor() -> Compressor<'static> {
76 Compressor::with_dictionary(0, RECEIPT_DICTIONARY)
77 .expect("Failed to instantiate receipt compressor")
78}
79
80pub fn create_receipt_decompressor() -> ReusableDecompressor {
82 ReusableDecompressor::new(
83 Decompressor::with_dictionary(RECEIPT_DICTIONARY)
84 .expect("Failed to instantiate receipt decompressor"),
85 )
86}
87
88#[allow(missing_debug_implementations)]
90pub struct ReusableDecompressor {
91 decompressor: Decompressor<'static>,
93 buf: Vec<u8>,
95}
96
97impl ReusableDecompressor {
98 fn new(decompressor: Decompressor<'static>) -> Self {
99 Self { decompressor, buf: Vec::with_capacity(4096) }
100 }
101
102 pub fn decompress(&mut self, src: &[u8]) -> &[u8] {
104 let mut reserved_upper_bound = false;
107 while let Err(err) = self.decompressor.decompress_to_buffer(src, &mut self.buf) {
108 let err = err.to_string();
109 assert!(
110 err.contains("Destination buffer is too small"),
111 "Failed to decompress {} bytes: {err}",
112 src.len()
113 );
114
115 let additional = 'b: {
116 if !reserved_upper_bound {
120 reserved_upper_bound = true;
121 if let Some(upper_bound) = Decompressor::upper_bound(src) {
122 if let Some(additional) = upper_bound.checked_sub(self.buf.capacity()) {
123 break 'b additional
124 }
125 }
126 }
127
128 self.buf.capacity() + 24_000
131 };
132 self.reserve(additional, src.len());
133 }
134
135 &self.buf
138 }
139
140 #[track_caller]
141 fn reserve(&mut self, additional: usize, src_len: usize) {
142 if let Err(e) = self.buf.try_reserve(additional) {
143 panic!(
144 "failed to allocate to {existing} + {additional} bytes \
145 for the decompression of {src_len} bytes: {e}",
146 existing = self.buf.capacity(),
147 );
148 }
149 }
150}