// reth_stages/stages/utils.rs
1//! Utils for `stages`.
2use alloy_primitives::{map::AddressMap, Address, BlockNumber, TxNumber, B256};
3use reth_config::config::EtlConfig;
4use reth_db_api::{
5    cursor::{DbCursorRO, DbCursorRW},
6    models::{
7        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
8        AccountBeforeTx, AddressStorageKey, BlockNumberAddress, ShardedKey,
9    },
10    table::{Decode, Decompress, Table},
11    transaction::DbTx,
12    BlockNumberList,
13};
14use reth_etl::Collector;
15use reth_primitives_traits::NodePrimitives;
16use reth_provider::{
17    providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
18    StaticFileProviderFactory,
19};
20use reth_stages_api::StageError;
21use reth_static_file_types::StaticFileSegment;
22use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
23use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
24use tracing::info;
25
/// Number of distinct blocks to process before flushing cached indices to the [`Collector`].
/// Bounds peak memory usage of the in-memory cache while still batching ETL writes.
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;
28
29/// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
30/// [`Collector`].
31///
32/// ## Process
33/// The function utilizes a `HashMap` cache with a structure of `PartialKey` (`P`) (Address or
34/// Address.StorageKey) to `BlockNumberList`. When the cache exceeds its capacity, its contents are
35/// moved to a [`Collector`]. Here, each entry's key is a concatenation of `PartialKey` and the
36/// highest block number in its list.
37///
38/// ## Example
39/// 1. Initial Cache State: `{ Address1: [1,2,3], ... }`
40/// 2. Cache is flushed to the `Collector`.
41/// 3. Updated Cache State: `{ Address1: [100,300], ... }`
42/// 4. Cache is flushed again.
43///
44/// As a result, the `Collector` will contain entries such as `(Address1.3, [1,2,3])` and
45/// `(Address1.300, [100,300])`. The entries may be stored across one or more files.
46pub(crate) fn collect_history_indices<Provider, CS, H, P>(
47    provider: &Provider,
48    range: impl RangeBounds<CS::Key>,
49    sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
50    partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
51    etl_config: &EtlConfig,
52) -> Result<Collector<H::Key, H::Value>, StageError>
53where
54    Provider: DBProvider,
55    CS: Table,
56    H: Table<Value = BlockNumberList>,
57    P: Copy + Eq + Hash,
58{
59    let mut changeset_cursor = provider.tx_ref().cursor_read::<CS>()?;
60
61    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
62    let mut cache: HashMap<P, Vec<u64>> = HashMap::default();
63
64    let mut collect = |cache: &mut HashMap<P, Vec<u64>>| {
65        for (key, indices) in cache.drain() {
66            let last = *indices.last().expect("qed");
67            collector
68                .insert(sharded_key_factory(key, last), BlockNumberList::new_pre_sorted(indices))?;
69        }
70        Ok::<(), StageError>(())
71    };
72
73    // observability
74    let total_changesets = provider.tx_ref().entries::<CS>()?;
75    let interval = (total_changesets / 1000).max(1);
76
77    let mut flush_counter = 0;
78    let mut current_block_number = u64::MAX;
79    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
80        let (block_number, key) = partial_key_factory(entry?);
81        cache.entry(key).or_default().push(block_number);
82
83        if idx > 0 && idx.is_multiple_of(interval) && total_changesets > 1000 {
84            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
85        }
86
87        // Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks.
88        if current_block_number != block_number {
89            current_block_number = block_number;
90            flush_counter += 1;
91            if flush_counter > DEFAULT_CACHE_THRESHOLD {
92                collect(&mut cache)?;
93                flush_counter = 0;
94            }
95        }
96    }
97    collect(&mut cache)?;
98
99    Ok(collector)
100}
101
102/// Allows collecting indices from a cache with a custom insert fn
103fn collect_indices<K, F>(
104    cache: impl Iterator<Item = (K, Vec<u64>)>,
105    mut insert_fn: F,
106) -> Result<(), StageError>
107where
108    F: FnMut(K, Vec<u64>) -> Result<(), StageError>,
109{
110    for (key, indices) in cache {
111        insert_fn(key, indices)?
112    }
113    Ok(())
114}
115
116/// Collects account history indices using a provider that implements `ChangeSetReader`.
117pub(crate) fn collect_account_history_indices<Provider>(
118    provider: &Provider,
119    range: impl RangeBounds<BlockNumber>,
120    etl_config: &EtlConfig,
121) -> Result<Collector<ShardedKey<Address>, BlockNumberList>, StageError>
122where
123    Provider: DBProvider + ChangeSetReader + StaticFileProviderFactory,
124{
125    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
126    let mut cache: AddressMap<Vec<u64>> = AddressMap::default();
127
128    let mut insert_fn = |address: Address, indices: Vec<u64>| {
129        let last = indices.last().expect("indices is non-empty");
130        collector
131            .insert(ShardedKey::new(address, *last), BlockNumberList::new_pre_sorted(indices))?;
132        Ok(())
133    };
134
135    // Convert range bounds to concrete range
136    let range = to_range(range);
137    let start_block = range.start;
138
139    // Use the new walker for lazy iteration over static file changesets
140    let static_file_provider = provider.static_file_provider();
141
142    let walker = static_file_provider.walk_account_changeset_range(range);
143
144    let mut flush_counter = 0;
145    let mut current_block_number = u64::MAX;
146
147    for changeset_result in walker {
148        let (block_number, AccountBeforeTx { address, .. }) = changeset_result?;
149        cache.entry(address).or_default().push(block_number);
150
151        if block_number != current_block_number {
152            current_block_number = block_number;
153            flush_counter += 1;
154        }
155
156        if flush_counter > DEFAULT_CACHE_THRESHOLD {
157            info!(
158                target: "sync::stages::index_history",
159                processed_blocks = current_block_number.saturating_sub(start_block) + 1,
160                current_block = current_block_number,
161                "Collecting indices"
162            );
163            collect_indices(cache.drain(), &mut insert_fn)?;
164            flush_counter = 0;
165        }
166    }
167    collect_indices(cache.into_iter(), insert_fn)?;
168
169    Ok(collector)
170}
171
172/// Collects storage history indices using a provider that implements `StorageChangeSetReader`.
173pub(crate) fn collect_storage_history_indices<Provider>(
174    provider: &Provider,
175    range: impl RangeBounds<BlockNumber>,
176    etl_config: &EtlConfig,
177) -> Result<Collector<StorageShardedKey, BlockNumberList>, StageError>
178where
179    Provider: DBProvider + StorageChangeSetReader + StaticFileProviderFactory,
180{
181    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
182    let mut cache: HashMap<AddressStorageKey, Vec<u64>> = HashMap::default();
183
184    let mut insert_fn = |key: AddressStorageKey, indices: Vec<u64>| {
185        let last = indices.last().expect("qed");
186        collector.insert(
187            StorageShardedKey::new(key.0 .0, key.0 .1, *last),
188            BlockNumberList::new_pre_sorted(indices),
189        )?;
190        Ok::<(), StageError>(())
191    };
192
193    let range = to_range(range);
194    let start_block = range.start;
195    let static_file_provider = provider.static_file_provider();
196
197    let walker = static_file_provider.walk_storage_changeset_range(range);
198
199    let mut flush_counter = 0;
200    let mut current_block_number = u64::MAX;
201
202    for changeset_result in walker {
203        let (BlockNumberAddress((block_number, address)), storage) = changeset_result?;
204        cache.entry(AddressStorageKey((address, storage.key))).or_default().push(block_number);
205
206        if block_number != current_block_number {
207            current_block_number = block_number;
208            flush_counter += 1;
209        }
210
211        if flush_counter > DEFAULT_CACHE_THRESHOLD {
212            info!(
213                target: "sync::stages::index_history",
214                processed_blocks = current_block_number.saturating_sub(start_block) + 1,
215                current_block = current_block_number,
216                "Collecting indices"
217            );
218            collect_indices(cache.drain(), &mut insert_fn)?;
219            flush_counter = 0;
220        }
221    }
222
223    collect_indices(cache.into_iter(), insert_fn)?;
224
225    Ok(collector)
226}
227
/// Loads account history indices into the database via `EitherWriter`.
///
/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
///
/// ## Process
/// Iterates over elements, grouping indices by their address. It flushes indices to disk
/// when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the address changes,
/// ensuring the last previous address shard is stored.
///
/// NOTE(review): this grouping assumes the collector yields entries with all shards of one
/// address contiguous (i.e. iteration in sorted key order) — confirm against the ETL
/// collector's iteration contract.
///
/// Uses `Option<Address>` instead of `Address::default()` as the sentinel to avoid
/// incorrectly treating `Address::ZERO` as "no previous address".
pub(crate) fn load_account_history<N, CURSOR>(
    mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
    // `None` until the first entry is seen; tracks which address is being accumulated.
    let mut current_address: Option<Address> = None;
    // Accumulator for block numbers where the current address changed.
    let mut current_list = Vec::<u64>::new();

    // Progress reporting: log roughly every 10% of the collector's entries.
    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        // Keys/values come back as raw bytes from the ETL files; decode them.
        let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let address = sharded_key.key;

        // When address changes, flush the previous address's shards and start fresh.
        if current_address != Some(address) {
            // Flush all remaining shards for the previous address (uses u64::MAX for last shard).
            if let Some(prev_addr) = current_address {
                flush_account_history_shards(prev_addr, &mut current_list, append_only, writer)?;
            }

            current_address = Some(address);
            current_list.clear();

            // On incremental sync, merge with the existing last shard from the database.
            // The last shard is stored with key (address, u64::MAX) so we can find it.
            if !append_only &&
                let Some(last_shard) = writer.get_last_account_history_shard(address)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        // Append new block numbers to the accumulator.
        current_list.extend(new_list.iter());

        // Flush complete shards, keeping the last (partial) shard buffered.
        flush_account_history_shards_partial(address, &mut current_list, append_only, writer)?;
    }

    // Flush the final address's remaining shard.
    if let Some(addr) = current_address {
        flush_account_history_shards(addr, &mut current_list, append_only, writer)?;
    }

    Ok(())
}
300
301/// Flushes complete shards for account history, keeping the trailing partial shard buffered.
302///
303/// Only flushes when we have more than one shard's worth of data, keeping the last
304/// (possibly partial) shard for continued accumulation. This avoids writing a shard
305/// that may need to be updated when more indices arrive.
306fn flush_account_history_shards_partial<N, CURSOR>(
307    address: Address,
308    list: &mut Vec<u64>,
309    append_only: bool,
310    writer: &mut EitherWriter<'_, CURSOR, N>,
311) -> Result<(), StageError>
312where
313    N: NodePrimitives,
314    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
315        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
316{
317    // Nothing to flush if we haven't filled a complete shard yet.
318    if list.len() <= NUM_OF_INDICES_IN_SHARD {
319        return Ok(());
320    }
321
322    let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
323
324    // Always keep at least one shard buffered for continued accumulation.
325    // If len is exact multiple of shard size, keep the last full shard.
326    let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
327        num_full_shards - 1
328    } else {
329        num_full_shards
330    };
331
332    if shards_to_flush == 0 {
333        return Ok(());
334    }
335
336    // Split: flush the first N shards, keep the remainder buffered.
337    let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
338    let remainder = list.split_off(flush_len);
339
340    // Write each complete shard with its highest block number as the key.
341    for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
342        let highest = *chunk.last().expect("chunk is non-empty");
343        let key = ShardedKey::new(address, highest);
344        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
345
346        if append_only {
347            writer.append_account_history(key, &value)?;
348        } else {
349            writer.upsert_account_history(key, &value)?;
350        }
351    }
352
353    // Keep the remaining indices for the next iteration.
354    *list = remainder;
355    Ok(())
356}
357
358/// Flushes all remaining shards for account history, using `u64::MAX` for the last shard.
359///
360/// The `u64::MAX` key for the final shard is an invariant that allows `seek_exact(address,
361/// u64::MAX)` to find the last shard during incremental sync for merging with new indices.
362fn flush_account_history_shards<N, CURSOR>(
363    address: Address,
364    list: &mut Vec<u64>,
365    append_only: bool,
366    writer: &mut EitherWriter<'_, CURSOR, N>,
367) -> Result<(), StageError>
368where
369    N: NodePrimitives,
370    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
371        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
372{
373    if list.is_empty() {
374        return Ok(());
375    }
376
377    let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
378
379    for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
380        let is_last = i == num_chunks - 1;
381
382        // Use u64::MAX for the final shard's key. This invariant allows incremental sync
383        // to find the last shard via seek_exact(address, u64::MAX) for merging.
384        let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
385
386        let key = ShardedKey::new(address, highest);
387        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
388
389        if append_only {
390            writer.append_account_history(key, &value)?;
391        } else {
392            writer.upsert_account_history(key, &value)?;
393        }
394    }
395
396    list.clear();
397    Ok(())
398}
399
400/// Called when database is ahead of static files. Attempts to find the first block we are missing
401/// transactions for.
402pub(crate) fn missing_static_data_error<Provider>(
403    last_tx_num: TxNumber,
404    static_file_provider: &StaticFileProvider<Provider::Primitives>,
405    provider: &Provider,
406    segment: StaticFileSegment,
407) -> Result<StageError, ProviderError>
408where
409    Provider: BlockReader + StaticFileProviderFactory,
410{
411    let mut last_block =
412        static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();
413
414    // To be extra safe, we make sure that the last tx num matches the last block from its indices.
415    // If not, get it.
416    loop {
417        if let Some(indices) = provider.block_body_indices(last_block)? &&
418            indices.last_tx_num() <= last_tx_num
419        {
420            break
421        }
422        if last_block == 0 {
423            break
424        }
425        last_block -= 1;
426    }
427
428    let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());
429
430    Ok(StageError::MissingStaticFileData {
431        block: Box::new(missing_block.block_with_parent()),
432        segment,
433    })
434}
435
/// Loads storage history indices into the database via `EitherWriter`.
///
/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
///
/// ## Process
/// Iterates over elements, grouping indices by their (address, `storage_key`) pairs. It flushes
/// indices to disk when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the
/// (address, `storage_key`) pair changes, ensuring the last previous shard is stored.
///
/// NOTE(review): this grouping assumes the collector yields entries with all shards of one
/// (address, storage key) pair contiguous (i.e. iteration in sorted key order) — confirm
/// against the ETL collector's iteration contract.
///
/// Uses `Option<(Address, B256)>` instead of default values as the sentinel to avoid
/// incorrectly treating `(Address::ZERO, B256::ZERO)` as "no previous key".
pub(crate) fn load_storage_history<N, CURSOR>(
    mut collector: Collector<StorageShardedKey, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    // `None` until the first entry is seen; tracks which (address, storage key) pair is
    // being accumulated.
    let mut current_key: Option<(Address, B256)> = None;
    // Accumulator for block numbers where the current (address, storage_key) changed.
    let mut current_list = Vec::<u64>::new();

    // Progress reporting: log roughly every 10% of the collector's entries.
    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        // Keys/values come back as raw bytes from the ETL files; decode them.
        let sharded_key = StorageShardedKey::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let partial_key = (sharded_key.address, sharded_key.sharded_key.key);

        // When (address, storage_key) changes, flush the previous key's shards and start fresh.
        if current_key != Some(partial_key) {
            // Flush all remaining shards for the previous key (uses u64::MAX for last shard).
            if let Some((prev_addr, prev_storage_key)) = current_key {
                flush_storage_history_shards(
                    prev_addr,
                    prev_storage_key,
                    &mut current_list,
                    append_only,
                    writer,
                )?;
            }

            current_key = Some(partial_key);
            current_list.clear();

            // On incremental sync, merge with the existing last shard from the database.
            // The last shard is stored with key (address, storage_key, u64::MAX) so we can find it.
            if !append_only &&
                let Some(last_shard) =
                    writer.get_last_storage_history_shard(partial_key.0, partial_key.1)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        // Append new block numbers to the accumulator.
        current_list.extend(new_list.iter());

        // Flush complete shards, keeping the last (partial) shard buffered.
        flush_storage_history_shards_partial(
            partial_key.0,
            partial_key.1,
            &mut current_list,
            append_only,
            writer,
        )?;
    }

    // Flush the final key's remaining shard.
    if let Some((addr, storage_key)) = current_key {
        flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?;
    }

    Ok(())
}
521
522/// Flushes complete shards for storage history, keeping the trailing partial shard buffered.
523///
524/// Only flushes when we have more than one shard's worth of data, keeping the last
525/// (possibly partial) shard for continued accumulation. This avoids writing a shard
526/// that may need to be updated when more indices arrive.
527fn flush_storage_history_shards_partial<N, CURSOR>(
528    address: Address,
529    storage_key: B256,
530    list: &mut Vec<u64>,
531    append_only: bool,
532    writer: &mut EitherWriter<'_, CURSOR, N>,
533) -> Result<(), StageError>
534where
535    N: NodePrimitives,
536    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
537        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
538{
539    // Nothing to flush if we haven't filled a complete shard yet.
540    if list.len() <= NUM_OF_INDICES_IN_SHARD {
541        return Ok(());
542    }
543
544    let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
545
546    // Always keep at least one shard buffered for continued accumulation.
547    // If len is exact multiple of shard size, keep the last full shard.
548    let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
549        num_full_shards - 1
550    } else {
551        num_full_shards
552    };
553
554    if shards_to_flush == 0 {
555        return Ok(());
556    }
557
558    // Split: flush the first N shards, keep the remainder buffered.
559    let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
560    let remainder = list.split_off(flush_len);
561
562    // Write each complete shard with its highest block number as the key.
563    for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
564        let highest = *chunk.last().expect("chunk is non-empty");
565        let key = StorageShardedKey::new(address, storage_key, highest);
566        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
567
568        if append_only {
569            writer.append_storage_history(key, &value)?;
570        } else {
571            writer.upsert_storage_history(key, &value)?;
572        }
573    }
574
575    // Keep the remaining indices for the next iteration.
576    *list = remainder;
577    Ok(())
578}
579
580/// Flushes all remaining shards for storage history, using `u64::MAX` for the last shard.
581///
582/// The `u64::MAX` key for the final shard is an invariant that allows
583/// `seek_exact(address, storage_key, u64::MAX)` to find the last shard during incremental
584/// sync for merging with new indices.
585fn flush_storage_history_shards<N, CURSOR>(
586    address: Address,
587    storage_key: B256,
588    list: &mut Vec<u64>,
589    append_only: bool,
590    writer: &mut EitherWriter<'_, CURSOR, N>,
591) -> Result<(), StageError>
592where
593    N: NodePrimitives,
594    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
595        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
596{
597    if list.is_empty() {
598        return Ok(());
599    }
600
601    let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
602
603    for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
604        let is_last = i == num_chunks - 1;
605
606        // Use u64::MAX for the final shard's key. This invariant allows incremental sync
607        // to find the last shard via seek_exact(address, storage_key, u64::MAX) for merging.
608        let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
609
610        let key = StorageShardedKey::new(address, storage_key, highest);
611        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
612
613        if append_only {
614            writer.append_storage_history(key, &value)?;
615        } else {
616            writer.upsert_storage_history(key, &value)?;
617        }
618    }
619
620    list.clear();
621    Ok(())
622}