1#![doc(
10 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
11 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
12 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
13)]
14#![cfg_attr(not(test), warn(unused_crate_dependencies))]
15#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
16
17use std::{
18 cmp::Reverse,
19 collections::BinaryHeap,
20 io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
21 path::{Path, PathBuf},
22};
23
/// Width in bytes of each big-endian length prefix (key length, then value length) that
/// precedes every entry in an ETL file.
const KV_LEN: usize = 8;
26
27use rayon::prelude::*;
28use reth_db_api::table::{Compress, Encode, Key, Value};
29use tempfile::{NamedTempFile, TempDir};
30
31#[derive(Debug)]
41pub struct Collector<K, V>
42where
43 K: Encode + Ord,
44 V: Compress,
45{
46 parent_dir: Option<PathBuf>,
48 dir: Option<TempDir>,
50 files: Vec<EtlFile>,
52 buffer_size_bytes: usize,
54 buffer_capacity_bytes: usize,
56 buffer: Vec<(<K as Encode>::Encoded, <V as Compress>::Compressed)>,
58 len: usize,
60}
61
62impl<K, V> Collector<K, V>
63where
64 K: Key,
65 V: Value,
66{
67 pub const fn new(buffer_capacity_bytes: usize, parent_dir: Option<PathBuf>) -> Self {
71 Self {
72 parent_dir,
73 dir: None,
74 buffer_size_bytes: 0,
75 files: Vec::new(),
76 buffer_capacity_bytes,
77 buffer: Vec::new(),
78 len: 0,
79 }
80 }
81
82 pub const fn len(&self) -> usize {
84 self.len
85 }
86
87 pub const fn is_empty(&self) -> bool {
89 self.len == 0
90 }
91
92 pub fn clear(&mut self) {
94 self.dir = None;
95 self.files = Vec::new();
97 self.buffer = Vec::new();
98 self.buffer_size_bytes = 0;
99 self.len = 0;
100 }
101
102 pub fn insert(&mut self, key: K, value: V) -> io::Result<()> {
104 let key = key.encode();
105 let value = value.compress();
106 self.buffer_size_bytes += key.as_ref().len() + value.as_ref().len();
107 self.buffer.push((key, value));
108 if self.buffer_size_bytes > self.buffer_capacity_bytes {
109 self.flush()?;
110 }
111 self.len += 1;
112
113 Ok(())
114 }
115
116 fn dir(&mut self) -> io::Result<&TempDir> {
119 if self.dir.is_none() {
120 self.dir = match &self.parent_dir {
121 Some(dir) => {
122 if !dir.exists() {
123 std::fs::create_dir_all(dir)?;
124 }
125 Some(TempDir::new_in(dir)?)
126 }
127 None => Some(TempDir::new()?),
128 };
129 }
130 Ok(self.dir.as_ref().unwrap())
131 }
132
133 fn flush(&mut self) -> io::Result<()> {
134 self.buffer_size_bytes = 0;
135 self.buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
136 let mut buf = Vec::with_capacity(self.buffer.len());
137 std::mem::swap(&mut buf, &mut self.buffer);
138
139 let path = self.dir()?.path().to_path_buf();
140 self.files.push(EtlFile::new(path.as_path(), buf)?);
141
142 Ok(())
143 }
144
145 pub fn iter(&mut self) -> std::io::Result<EtlIter<'_>> {
154 if self.buffer_size_bytes > 0 {
156 self.flush()?;
157 }
158
159 let mut heap = BinaryHeap::new();
160 for (current_id, file) in self.files.iter_mut().enumerate() {
161 if let Some((current_key, current_value)) = file.read_next()? {
162 heap.push((Reverse((current_key, current_value)), current_id));
163 }
164 }
165
166 Ok(EtlIter { heap, files: &mut self.files })
167 }
168}
169
/// A pending entry in the merge heap: a `(key, value)` pair read from a file, plus the index
/// of that file in `EtlIter::files`.
///
/// The pair is wrapped in [`Reverse`] because [`BinaryHeap`] is a max-heap; reversing the
/// ordering makes it pop the smallest pair first.
type HeapItem = (Reverse<(Vec<u8>, Vec<u8>)>, usize);
177
/// Iterator over the entries of all files of a [`Collector`], returned by
/// [`Collector::iter`].
///
/// It performs a k-way merge: the heap holds the next unread entry of each file, the
/// smallest is yielded, and the heap is refilled from the file it came from.
#[derive(Debug)]
pub struct EtlIter<'a> {
    /// Next unread entry of each file that still has entries, smallest first.
    heap: BinaryHeap<HeapItem>,
    /// The files being merged, indexed by the `usize` stored alongside each heap entry.
    files: &'a mut Vec<EtlFile>,
}
192
193impl EtlIter<'_> {
194 pub fn peek(&self) -> Option<&(Vec<u8>, Vec<u8>)> {
196 self.heap.peek().map(|(Reverse(entry), _)| entry)
197 }
198}
199
200impl Iterator for EtlIter<'_> {
201 type Item = std::io::Result<(Vec<u8>, Vec<u8>)>;
202
203 fn next(&mut self) -> Option<Self::Item> {
204 let (Reverse(entry), id) = self.heap.pop()?;
206
207 match self.files[id].read_next() {
209 Ok(Some((key, value))) => {
210 self.heap.push((Reverse((key, value)), id));
211 Some(Ok(entry))
212 }
213 Ok(None) => Some(Ok(entry)),
214 err => err.transpose(),
215 }
216 }
217}
218
/// A temporary file of length-prefixed entries, read back sequentially in the order they
/// were written.
#[derive(Debug)]
struct EtlFile {
    /// Buffered reader over the temporary file.
    file: BufReader<NamedTempFile>,
    /// Number of entries not yet read.
    len: usize,
}
225
226impl EtlFile {
227 pub(crate) fn new<K, V>(dir: &Path, buffer: Vec<(K, V)>) -> std::io::Result<Self>
231 where
232 Self: Sized,
233 K: AsRef<[u8]>,
234 V: AsRef<[u8]>,
235 {
236 let file = NamedTempFile::new_in(dir)?;
237 let mut w = BufWriter::new(file);
238 for entry in &buffer {
239 let k = entry.0.as_ref();
240 let v = entry.1.as_ref();
241
242 w.write_all(&k.len().to_be_bytes())?;
243 w.write_all(&v.len().to_be_bytes())?;
244 w.write_all(k)?;
245 w.write_all(v)?;
246 }
247
248 let mut file = BufReader::new(w.into_inner()?);
249 file.seek(SeekFrom::Start(0))?;
250 let len = buffer.len();
251 Ok(Self { file, len })
252 }
253
254 pub(crate) fn read_next(&mut self) -> std::io::Result<Option<(Vec<u8>, Vec<u8>)>> {
258 if self.len == 0 {
259 return Ok(None)
260 }
261
262 let mut buffer_key_length = [0; KV_LEN];
263 let mut buffer_value_length = [0; KV_LEN];
264
265 self.file.read_exact(&mut buffer_key_length)?;
266 self.file.read_exact(&mut buffer_value_length)?;
267
268 let key_length = usize::from_be_bytes(buffer_key_length);
269 let value_length = usize::from_be_bytes(buffer_value_length);
270 let mut key = vec![0; key_length];
271 let mut value = vec![0; value_length];
272
273 self.file.read_exact(&mut key)?;
274 self.file.read_exact(&mut value)?;
275
276 self.len -= 1;
277
278 Ok(Some((key, value)))
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{TxHash, TxNumber};

    /// Inserts many random hashes through a small buffer, which forces many flushed files.
    /// Checks that iteration returns every entry exactly once and in key order, and that
    /// `clear` resets the collector and removes its temporary directory.
    #[test]
    fn etl_hashes() {
        let mut entries: Vec<_> =
            (0..10_000).map(|id| (TxHash::random(), id as TxNumber)).collect();

        let mut collector = Collector::new(1024, None);
        assert!(collector.dir.is_none());

        for (k, v) in entries.clone() {
            collector.insert(k, v).unwrap();
        }
        assert_eq!(collector.len(), entries.len());
        entries.sort_unstable_by_key(|entry| entry.0);

        // Compare the complete output instead of indexing per yielded item, so missing or
        // extra entries also fail the test.
        let actual = collector.iter().unwrap().collect::<std::io::Result<Vec<_>>>().unwrap();
        let expected: Vec<(Vec<u8>, Vec<u8>)> =
            entries.iter().map(|&(k, v)| (k.encode().to_vec(), v.compress())).collect();
        assert_eq!(actual, expected);

        let temp_dir_path = collector.dir.as_ref().unwrap().path().to_path_buf();

        collector.clear();
        assert!(collector.dir.is_none());
        assert!(collector.files.is_empty());
        assert_eq!(collector.buffer_size_bytes, 0);
        assert!(collector.buffer.is_empty());
        assert_eq!(collector.len, 0);
        assert!(collector.is_empty());
        assert!(!temp_dir_path.exists());
    }
}