reth_etl/
lib.rs

//! ETL data collector.
//!
//! This crate is useful for dumping unsorted data into temporary files and iterating over their
//! sorted representation later on.
//!
//! This has multiple uses, such as optimizing database inserts (for B-tree-based databases) and
//! memory management (as it moves the buffer to disk instead of keeping it in memory).
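//!
//! # Example
//!
//! A minimal usage sketch, mirroring the test at the bottom of this file; it assumes the
//! `alloy_primitives` types used there are available:
//!
//! ```ignore
//! use alloy_primitives::{TxHash, TxNumber};
//! use reth_etl::Collector;
//!
//! // Flush sorted chunks to temporary files once roughly 1 KiB of encoded data has accumulated.
//! let mut collector = Collector::new(1024, None);
//! for number in 0..100u64 {
//!     collector.insert(TxHash::random(), number as TxNumber).unwrap();
//! }
//!
//! // Entries come back sorted by encoded key, merged across every temporary file.
//! for entry in collector.iter().unwrap() {
//!     let (encoded_key, compressed_value) = entry.unwrap();
//!     // The bytes are already encoded/compressed; write them to the database as-is.
//! }
//! ```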

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

use std::{
    cmp::Reverse,
    collections::BinaryHeap,
    io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
    path::{Path, PathBuf},
};

use rayon::prelude::*;
use reth_db_api::table::{Compress, Encode, Key, Value};
use tempfile::{NamedTempFile, TempDir};

/// Key and value lengths are encoded with [`usize::to_be_bytes()`], so each length prefix is 8
/// bytes long.
const KV_LEN: usize = 8;

/// An ETL (extract, transform, load) data collector.
///
/// Data is pushed (extract) to the collector, which internally flushes the data in a sorted
/// (transform) manner to files of some specified capacity. The data can later be iterated over
/// (load) in a sorted manner.
///
/// Used mainly to insert data into `MDBX` in a sorted manner. This is important because
/// performance and storage space degrade greatly if the data is inserted unsorted (e.g. tables
/// with hashes as keys) as opposed to append & sorted insert. Some benchmarks can be found
/// [here](https://github.com/paradigmxyz/reth/pull/1130#issuecomment-1418642755).
#[derive(Debug)]
pub struct Collector<K, V>
where
    K: Encode + Ord,
    V: Compress,
{
    /// Parent directory where to create ETL files
    parent_dir: Option<PathBuf>,
    /// Directory for temporary file storage
    dir: Option<TempDir>,
    /// Collection of temporary ETL files
    files: Vec<EtlFile>,
    /// Current buffer size in bytes
    buffer_size_bytes: usize,
    /// Maximum buffer capacity in bytes, triggers flush when reached
    buffer_capacity_bytes: usize,
    /// In-memory buffer storing encoded and compressed key-value pairs
    buffer: Vec<(<K as Encode>::Encoded, <V as Compress>::Compressed)>,
    /// Total number of elements in the collector, including all files
    len: usize,
}

impl<K, V> Collector<K, V>
where
    K: Key,
    V: Value,
{
    /// Create a new collector with some capacity.
    ///
    /// Once the capacity (in bytes) is reached, the data is sorted and flushed to disk.
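    ///
    /// A small sketch (the directory path here is purely hypothetical):
    ///
    /// ```ignore
    /// // Buffer up to 100 MiB in memory; spill sorted chunks into a user-chosen directory.
    /// let collector: Collector<TxHash, TxNumber> =
    ///     Collector::new(100 * 1024 * 1024, Some(std::path::PathBuf::from("/tmp/reth-etl")));
    /// ```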
    pub const fn new(buffer_capacity_bytes: usize, parent_dir: Option<PathBuf>) -> Self {
        Self {
            parent_dir,
            dir: None,
            buffer_size_bytes: 0,
            files: Vec::new(),
            buffer_capacity_bytes,
            buffer: Vec::new(),
            len: 0,
        }
    }

    /// Returns the number of elements currently in the collector.
    pub const fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if there are currently no elements in the collector.
    pub const fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Clears the collector, removing all data, including the temporary directory.
    pub fn clear(&mut self) {
        // Dropping the `TempDir` removes the temporary directory and its files from disk.
        self.dir = None;
        // Clear vectors and free the allocated memory
        self.files = Vec::new();
        self.buffer = Vec::new();
        self.buffer_size_bytes = 0;
        self.len = 0;
    }

    /// Insert an entry into the collector.
    pub fn insert(&mut self, key: K, value: V) -> io::Result<()> {
        let key = key.encode();
        let value = value.compress();
        self.buffer_size_bytes += key.as_ref().len() + value.as_ref().len();
        self.buffer.push((key, value));
        if self.buffer_size_bytes > self.buffer_capacity_bytes {
            self.flush()?;
        }
        self.len += 1;

        Ok(())
    }

    /// Returns a reference to the temporary directory used by the collector. If the directory
    /// doesn't exist, it will be created.
    fn dir(&mut self) -> io::Result<&TempDir> {
        if self.dir.is_none() {
            self.dir = match &self.parent_dir {
                Some(dir) => {
                    if !dir.exists() {
                        std::fs::create_dir_all(dir)?;
                    }
                    Some(TempDir::new_in(dir)?)
                }
                None => Some(TempDir::new()?),
            };
        }
        Ok(self.dir.as_ref().unwrap())
    }

    fn flush(&mut self) -> io::Result<()> {
        self.buffer_size_bytes = 0;
        // Sort the in-memory buffer by encoded key in parallel, then move it out so it can be
        // written to a new temporary file.
        self.buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
        let mut buf = Vec::with_capacity(self.buffer.len());
        std::mem::swap(&mut buf, &mut self.buffer);

        let path = self.dir()?.path().to_path_buf();
        self.files.push(EtlFile::new(path.as_path(), buf)?);

        Ok(())
    }

    /// Returns an iterator over the collector data.
    ///
    /// The items of the iterator are sorted across all underlying files.
    ///
    /// # Note
    ///
    /// The keys and values have been pre-encoded, meaning they *SHOULD NOT* be encoded or
    /// compressed again.
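    ///
    /// A sketch of consuming the iterator (with a hypothetical `collector`), checking that keys
    /// come back in ascending order:
    ///
    /// ```ignore
    /// let mut last_key: Option<Vec<u8>> = None;
    /// for entry in collector.iter()? {
    ///     let (key, value) = entry?;
    ///     debug_assert!(last_key.as_deref().map_or(true, |last| last <= key.as_slice()));
    ///     last_key = Some(key);
    ///     // `key` and `value` are already encoded/compressed; write them to the database as-is.
    /// }
    /// ```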
    pub fn iter(&mut self) -> std::io::Result<EtlIter<'_>> {
        // Flush the remaining items to disk
        if self.buffer_size_bytes > 0 {
            self.flush()?;
        }

        // Seed the heap with the first entry of every file; the iterator then performs a k-way
        // merge across them.
        let mut heap = BinaryHeap::new();
        for (current_id, file) in self.files.iter_mut().enumerate() {
            if let Some((current_key, current_value)) = file.read_next()? {
                heap.push((Reverse((current_key, current_value)), current_id));
            }
        }

        Ok(EtlIter { heap, files: &mut self.files })
    }
}

/// Type alias for the items stored in the heap of [`EtlIter`].
///
/// Each item in the heap is a tuple containing:
/// - A `Reverse` tuple of a key-value pair (`Vec<u8>, Vec<u8>`), used to maintain the heap in
///   ascending order of keys.
/// - An index (`usize`) representing the source file from which the key-value pair was read.
type HeapItem = (Reverse<(Vec<u8>, Vec<u8>)>, usize);

/// `EtlIter` is an iterator for traversing through sorted key-value pairs in a collection of ETL
/// files. These files are created using the [`Collector`] and contain data where keys are encoded
/// and values are compressed.
///
/// This iterator returns each key-value pair in ascending order based on the key.
/// It is particularly designed to efficiently handle large datasets by employing a binary heap for
/// managing the iteration order.
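///
/// The ordering relies on the standard min-heap trick: [`BinaryHeap`](std::collections::BinaryHeap)
/// is a max-heap, so wrapping items in [`Reverse`](std::cmp::Reverse) makes the smallest key pop
/// first. A tiny illustration using only the standard library:
///
/// ```
/// use std::{cmp::Reverse, collections::BinaryHeap};
///
/// let mut heap = BinaryHeap::new();
/// heap.push(Reverse((vec![3u8], 0usize)));
/// heap.push(Reverse((vec![1u8], 1usize)));
/// // The entry with the smallest key comes out first.
/// assert_eq!(heap.pop(), Some(Reverse((vec![1u8], 1usize))));
/// ```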
#[derive(Debug)]
pub struct EtlIter<'a> {
    /// Heap managing the next items to be iterated.
    heap: BinaryHeap<HeapItem>,
    /// Reference to the vector of ETL files being iterated over.
    files: &'a mut Vec<EtlFile>,
}

impl EtlIter<'_> {
    /// Peeks into the next element
    pub fn peek(&self) -> Option<&(Vec<u8>, Vec<u8>)> {
        self.heap.peek().map(|(Reverse(entry), _)| entry)
    }
}

impl Iterator for EtlIter<'_> {
    type Item = std::io::Result<(Vec<u8>, Vec<u8>)>;

    fn next(&mut self) -> Option<Self::Item> {
        // Get the next sorted entry from the heap
        let (Reverse(entry), id) = self.heap.pop()?;

        // Populate the heap with the next entry from the same file
        match self.files[id].read_next() {
            Ok(Some((key, value))) => {
                self.heap.push((Reverse((key, value)), id));
                Some(Ok(entry))
            }
            Ok(None) => Some(Ok(entry)),
            err => err.transpose(),
        }
    }
}

/// A temporary ETL file.
#[derive(Debug)]
struct EtlFile {
    file: BufReader<NamedTempFile>,
    len: usize,
}

impl EtlFile {
    /// Create a new file with the given data (which should be pre-sorted) at the given path.
    ///
    /// The file will be a temporary file.
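    ///
    /// Each entry is written as a length-prefixed record:
    /// `[key length: 8 bytes BE][value length: 8 bytes BE][key bytes][value bytes]`.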
    pub(crate) fn new<K, V>(dir: &Path, buffer: Vec<(K, V)>) -> std::io::Result<Self>
    where
        Self: Sized,
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        let file = NamedTempFile::new_in(dir)?;
        let mut w = BufWriter::new(file);
        for entry in &buffer {
            let k = entry.0.as_ref();
            let v = entry.1.as_ref();

            w.write_all(&k.len().to_be_bytes())?;
            w.write_all(&v.len().to_be_bytes())?;
            w.write_all(k)?;
            w.write_all(v)?;
        }

        // Flush the writer and rewind so the entries can be read back from the start.
        let mut file = BufReader::new(w.into_inner()?);
        file.seek(SeekFrom::Start(0))?;
        let len = buffer.len();
        Ok(Self { file, len })
    }

    /// Read the next entry in the file.
    ///
    /// Can return an error if EOF is reached before the internal buffers are filled.
    pub(crate) fn read_next(&mut self) -> std::io::Result<Option<(Vec<u8>, Vec<u8>)>> {
        if self.len == 0 {
            return Ok(None)
        }

        let mut buffer_key_length = [0; KV_LEN];
        let mut buffer_value_length = [0; KV_LEN];

        self.file.read_exact(&mut buffer_key_length)?;
        self.file.read_exact(&mut buffer_value_length)?;

        let key_length = usize::from_be_bytes(buffer_key_length);
        let value_length = usize::from_be_bytes(buffer_value_length);
        let mut key = vec![0; key_length];
        let mut value = vec![0; value_length];

        self.file.read_exact(&mut key)?;
        self.file.read_exact(&mut value)?;

        self.len -= 1;

        Ok(Some((key, value)))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{TxHash, TxNumber};

    #[test]
    fn etl_hashes() {
        let mut entries: Vec<_> =
            (0..10_000).map(|id| (TxHash::random(), id as TxNumber)).collect();

        let mut collector = Collector::new(1024, None);
        assert!(collector.dir.is_none());

        for (k, v) in entries.clone() {
            collector.insert(k, v).unwrap();
        }
        entries.sort_unstable_by_key(|entry| entry.0);

        for (id, entry) in collector.iter().unwrap().enumerate() {
            let expected = entries[id];
            assert_eq!(
                entry.unwrap(),
                (expected.0.encode().to_vec(), expected.1.compress().clone())
            );
        }

        let temp_dir_path = collector.dir.as_ref().unwrap().path().to_path_buf();

        collector.clear();
        assert!(collector.dir.is_none());
        assert!(collector.files.is_empty());
        assert_eq!(collector.buffer_size_bytes, 0);
        assert!(collector.buffer.is_empty());
        assert_eq!(collector.len, 0);
        assert!(collector.is_empty());
        assert!(!temp_dir_path.exists());
    }
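
    // An additional, minimal sketch exercising the length-prefixed on-disk format directly:
    // entries written via `EtlFile::new` should come back unchanged, in order, from `read_next`.
    // (Illustrative addition; the name and data here are arbitrary.)
    #[test]
    fn etl_file_round_trip() {
        let dir = TempDir::new().unwrap();
        let entries: Vec<(Vec<u8>, Vec<u8>)> =
            vec![(vec![1], vec![10, 11]), (vec![2, 2], vec![]), (vec![3], vec![30])];

        let mut file = EtlFile::new(dir.path(), entries.clone()).unwrap();
        for expected in &entries {
            assert_eq!(file.read_next().unwrap().as_ref(), Some(expected));
        }
        assert_eq!(file.read_next().unwrap(), None);
    }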
}