reth_stages/stages/utils.rs

//! Utils for `stages`.
use alloy_primitives::{map::AddressMap, Address, BlockNumber, TxNumber, B256};
use reth_config::config::EtlConfig;
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    models::{
        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
        AccountBeforeTx, AddressStorageKey, BlockNumberAddress, ShardedKey,
    },
    table::{Decode, Decompress, Table},
    transaction::DbTx,
    BlockNumberList,
};
use reth_etl::Collector;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
    providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
    StaticFileProviderFactory,
};
use reth_stages_api::StageError;
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;

/// Number of blocks before pushing indices from cache to [`Collector`]
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;

/// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
/// [`Collector`].
///
/// ## Process
/// The function uses a `HashMap` cache that maps a `PartialKey` (`P`) (an Address or
/// Address.StorageKey) to a `BlockNumberList`. Once the cache has accumulated indices for
/// [`DEFAULT_CACHE_THRESHOLD`] blocks, its contents are moved to a [`Collector`]. There, each
/// entry's key is a concatenation of the `PartialKey` and the highest block number in its list.
///
/// ## Example
/// 1. Initial Cache State: `{ Address1: [1,2,3], ... }`
/// 2. Cache is flushed to the `Collector`.
/// 3. Updated Cache State: `{ Address1: [100,300], ... }`
/// 4. Cache is flushed again.
///
/// As a result, the `Collector` will contain entries such as `(Address1.3, [1,2,3])` and
/// `(Address1.300, [100,300])`. The entries may be stored across one or more files.
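///
/// Illustrative sketch (not part of the original docs) of how the two factories line up for
/// account history, assuming `CS = AccountChangeSets` (keyed by block number, valued by
/// [`AccountBeforeTx`]) and `H = AccountsHistory`; the call-site shape is hypothetical:
///
/// ```ignore
/// let collector = collect_history_indices::<_, AccountChangeSets, AccountsHistory, Address>(
///     provider,
///     block_range,
///     ShardedKey::new,                                 // (address, highest block) -> H::Key
///     |(block, changeset)| (block, changeset.address), // changeset entry -> (block, partial key)
///     &etl_config,
/// )?;
/// ```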
pub(crate) fn collect_history_indices<Provider, CS, H, P>(
    provider: &Provider,
    range: impl RangeBounds<CS::Key>,
    sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
    partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
    etl_config: &EtlConfig,
) -> Result<Collector<H::Key, H::Value>, StageError>
where
    Provider: DBProvider,
    CS: Table,
    H: Table<Value = BlockNumberList>,
    P: Copy + Eq + Hash,
{
    let mut changeset_cursor = provider.tx_ref().cursor_read::<CS>()?;

    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: HashMap<P, Vec<u64>> = HashMap::default();

    let mut collect = |cache: &mut HashMap<P, Vec<u64>>| {
        for (key, indices) in cache.drain() {
            let last = *indices.last().expect("indices is non-empty");
            collector.insert(
                sharded_key_factory(key, last),
                BlockNumberList::new_pre_sorted(indices.into_iter()),
            )?;
        }
        Ok::<(), StageError>(())
    };

    // observability
    let total_changesets = provider.tx_ref().entries::<CS>()?;
    let interval = (total_changesets / 1000).max(1);

    let mut flush_counter = 0;
    let mut current_block_number = u64::MAX;
    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
        let (block_number, key) = partial_key_factory(entry?);
        cache.entry(key).or_default().push(block_number);

        if idx > 0 && idx.is_multiple_of(interval) && total_changesets > 1000 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        // Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks.
        if current_block_number != block_number {
            current_block_number = block_number;
            flush_counter += 1;
            if flush_counter > DEFAULT_CACHE_THRESHOLD {
                collect(&mut cache)?;
                flush_counter = 0;
            }
        }
    }
    collect(&mut cache)?;

    Ok(collector)
}

/// Allows collecting indices from a cache with a custom insert fn
fn collect_indices<K, F>(
    cache: impl Iterator<Item = (K, Vec<u64>)>,
    mut insert_fn: F,
) -> Result<(), StageError>
where
    F: FnMut(K, Vec<u64>) -> Result<(), StageError>,
{
    for (key, indices) in cache {
        insert_fn(key, indices)?
    }
    Ok(())
}

/// Collects account history indices using a provider that implements `ChangeSetReader`.
pub(crate) fn collect_account_history_indices<Provider>(
    provider: &Provider,
    range: impl RangeBounds<BlockNumber>,
    etl_config: &EtlConfig,
) -> Result<Collector<ShardedKey<Address>, BlockNumberList>, StageError>
where
    Provider: DBProvider + ChangeSetReader + StaticFileProviderFactory,
{
    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: AddressMap<Vec<u64>> = AddressMap::default();

    let mut insert_fn = |address: Address, indices: Vec<u64>| {
        let last = indices.last().expect("indices is non-empty");
        collector.insert(
            ShardedKey::new(address, *last),
            BlockNumberList::new_pre_sorted(indices.into_iter()),
        )?;
        Ok(())
    };

    // Convert range bounds to concrete range
    let range = to_range(range);

    // Use the new walker for lazy iteration over static file changesets
    let static_file_provider = provider.static_file_provider();

    // Get total count for progress reporting
    let total_changesets = static_file_provider.account_changeset_count()?;
    let interval = (total_changesets / 1000).max(1);

    let walker = static_file_provider.walk_account_changeset_range(range);

    let mut flush_counter = 0;
    let mut current_block_number = u64::MAX;

    for (idx, changeset_result) in walker.enumerate() {
        let (block_number, AccountBeforeTx { address, .. }) = changeset_result?;
        cache.entry(address).or_default().push(block_number);

        if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        if block_number != current_block_number {
            current_block_number = block_number;
            flush_counter += 1;
        }

        if flush_counter > DEFAULT_CACHE_THRESHOLD {
            collect_indices(cache.drain(), &mut insert_fn)?;
            flush_counter = 0;
        }
    }
    collect_indices(cache.into_iter(), insert_fn)?;

    Ok(collector)
}

/// Collects storage history indices using a provider that implements `StorageChangeSetReader`.
pub(crate) fn collect_storage_history_indices<Provider>(
    provider: &Provider,
    range: impl RangeBounds<BlockNumber>,
    etl_config: &EtlConfig,
) -> Result<Collector<StorageShardedKey, BlockNumberList>, StageError>
where
    Provider: DBProvider + StorageChangeSetReader + StaticFileProviderFactory,
{
    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
    let mut cache: HashMap<AddressStorageKey, Vec<u64>> = HashMap::default();

    let mut insert_fn = |key: AddressStorageKey, indices: Vec<u64>| {
        let last = indices.last().expect("indices is non-empty");
        collector.insert(
            StorageShardedKey::new(key.0 .0, key.0 .1, *last),
            BlockNumberList::new_pre_sorted(indices.into_iter()),
        )?;
        Ok::<(), StageError>(())
    };

    let range = to_range(range);
    let static_file_provider = provider.static_file_provider();

    let total_changesets = static_file_provider.storage_changeset_count()?;
    let interval = (total_changesets / 1000).max(1);

    let walker = static_file_provider.walk_storage_changeset_range(range);

    let mut flush_counter = 0;
    let mut current_block_number = u64::MAX;

    for (idx, changeset_result) in walker.enumerate() {
        let (BlockNumberAddress((block_number, address)), storage) = changeset_result?;
        cache
            .entry(AddressStorageKey((address, storage.key.as_b256())))
            .or_default()
            .push(block_number);

        if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
        }

        if block_number != current_block_number {
            current_block_number = block_number;
            flush_counter += 1;
        }

        if flush_counter > DEFAULT_CACHE_THRESHOLD {
            collect_indices(cache.drain(), &mut insert_fn)?;
            flush_counter = 0;
        }
    }

    collect_indices(cache.into_iter(), insert_fn)?;

    Ok(collector)
}

/// Loads account history indices into the database via `EitherWriter`.
///
/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
///
/// ## Process
/// Iterates over the collector's elements, grouping indices by address. Indices are flushed to
/// disk when a shard reaches its maximum length (`NUM_OF_INDICES_IN_SHARD`) or when the address
/// changes, which ensures the previous address's final shard is stored.
///
/// Uses `Option<Address>` instead of `Address::default()` as the sentinel to avoid
/// incorrectly treating `Address::ZERO` as "no previous address".
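///
/// Illustrative sketch (not part of the original docs) of the resulting table layout for one
/// address, with the shard size shortened to 3 for readability (the real limit is
/// `NUM_OF_INDICES_IN_SHARD`):
///
/// ```text
/// ShardedKey(addr, 30)       => [10, 20, 30]   // full shard, keyed by its highest block
/// ShardedKey(addr, 60)       => [40, 50, 60]   // full shard
/// ShardedKey(addr, u64::MAX) => [70, 80]       // trailing shard, always keyed by u64::MAX
/// ```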
pub(crate) fn load_account_history<N, CURSOR>(
    mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
    let mut current_address: Option<Address> = None;
    // Accumulator for block numbers where the current address changed.
    let mut current_list = Vec::<u64>::new();

    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let address = sharded_key.key;

        // When address changes, flush the previous address's shards and start fresh.
        if current_address != Some(address) {
            // Flush all remaining shards for the previous address (uses u64::MAX for last shard).
            if let Some(prev_addr) = current_address {
                flush_account_history_shards(prev_addr, &mut current_list, append_only, writer)?;
            }

            current_address = Some(address);
            current_list.clear();

            // On incremental sync, merge with the existing last shard from the database.
            // The last shard is stored with key (address, u64::MAX) so we can find it.
            if !append_only &&
                let Some(last_shard) = writer.get_last_account_history_shard(address)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        // Append new block numbers to the accumulator.
        current_list.extend(new_list.iter());

        // Flush complete shards, keeping the last (partial) shard buffered.
        flush_account_history_shards_partial(address, &mut current_list, append_only, writer)?;
    }

    // Flush the final address's remaining shard.
    if let Some(addr) = current_address {
        flush_account_history_shards(addr, &mut current_list, append_only, writer)?;
    }

    Ok(())
}

/// Flushes complete shards for account history, keeping the trailing partial shard buffered.
///
/// Only flushes when we have more than one shard's worth of data, keeping the last
/// (possibly partial) shard for continued accumulation. This avoids writing a shard
/// that may need to be updated when more indices arrive.
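///
/// A minimal sketch of the split arithmetic below, using a shard size of 4 instead of
/// `NUM_OF_INDICES_IN_SHARD` purely for illustration:
///
/// ```ignore
/// let shard_size = 4;
/// let mut list: Vec<u64> = (1..=10).collect(); // 2 full shards + 2 leftover indices
/// let num_full_shards = list.len() / shard_size; // 2
/// let shards_to_flush =
///     if list.len() % shard_size == 0 { num_full_shards - 1 } else { num_full_shards }; // 2
/// let remainder = list.split_off(shards_to_flush * shard_size);
/// assert_eq!(list, [1, 2, 3, 4, 5, 6, 7, 8]); // written out as two complete shards
/// assert_eq!(remainder, vec![9, 10]);         // stays buffered for the next batch
/// ```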
fn flush_account_history_shards_partial<N, CURSOR>(
    address: Address,
    list: &mut Vec<u64>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
    // Nothing to flush if we haven't filled a complete shard yet.
    if list.len() <= NUM_OF_INDICES_IN_SHARD {
        return Ok(());
    }

    let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;

    // Always keep at least one shard buffered for continued accumulation.
    // If len is exact multiple of shard size, keep the last full shard.
    let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
        num_full_shards - 1
    } else {
        num_full_shards
    };

    if shards_to_flush == 0 {
        return Ok(());
    }

    // Split: flush the first N shards, keep the remainder buffered.
    let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
    let remainder = list.split_off(flush_len);

    // Write each complete shard with its highest block number as the key.
    for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
        let highest = *chunk.last().expect("chunk is non-empty");
        let key = ShardedKey::new(address, highest);
        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());

        if append_only {
            writer.append_account_history(key, &value)?;
        } else {
            writer.upsert_account_history(key, &value)?;
        }
    }

    // Keep the remaining indices for the next iteration.
    *list = remainder;
    Ok(())
}

/// Flushes all remaining shards for account history, using `u64::MAX` for the last shard.
///
/// The `u64::MAX` key for the final shard is an invariant that allows `seek_exact(address,
/// u64::MAX)` to find the last shard during incremental sync for merging with new indices.
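///
/// Illustrative sketch (shard size shortened to 3): flushing a leftover `[10, 20, 30, 40]` for
/// `addr` writes two shards, and only the final one receives the `u64::MAX` key:
///
/// ```text
/// ShardedKey(addr, 30)       => [10, 20, 30]
/// ShardedKey(addr, u64::MAX) => [40]   // found later via seek_exact(addr, u64::MAX)
/// ```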
fn flush_account_history_shards<N, CURSOR>(
    address: Address,
    list: &mut Vec<u64>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
{
    if list.is_empty() {
        return Ok(());
    }

    let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);

    for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
        let is_last = i == num_chunks - 1;

        // Use u64::MAX for the final shard's key. This invariant allows incremental sync
        // to find the last shard via seek_exact(address, u64::MAX) for merging.
        let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };

        let key = ShardedKey::new(address, highest);
        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());

        if append_only {
            writer.append_account_history(key, &value)?;
        } else {
            writer.upsert_account_history(key, &value)?;
        }
    }

    list.clear();
    Ok(())
}

/// Called when the database is ahead of static files. Attempts to find the first block we are
/// missing transactions for.
pub(crate) fn missing_static_data_error<Provider>(
    last_tx_num: TxNumber,
    static_file_provider: &StaticFileProvider<Provider::Primitives>,
    provider: &Provider,
    segment: StaticFileSegment,
) -> Result<StageError, ProviderError>
where
    Provider: BlockReader + StaticFileProviderFactory,
{
    let mut last_block =
        static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();

    // To be extra safe, make sure `last_tx_num` is actually covered by `last_block`'s indices.
    // If not, walk back until we find the highest block whose indices are still covered.
    loop {
        if let Some(indices) = provider.block_body_indices(last_block)? &&
            indices.last_tx_num() <= last_tx_num
        {
            break
        }
        if last_block == 0 {
            break
        }
        last_block -= 1;
    }

    let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());

    Ok(StageError::MissingStaticFileData {
        block: Box::new(missing_block.block_with_parent()),
        segment,
    })
}

/// Loads storage history indices into the database via `EitherWriter`.
///
/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
///
/// ## Process
/// Iterates over the collector's elements, grouping indices by their (address, `storage_key`)
/// pair. Indices are flushed to disk when a shard reaches its maximum length
/// (`NUM_OF_INDICES_IN_SHARD`) or when the (address, `storage_key`) pair changes, which ensures
/// the previous pair's final shard is stored.
///
/// Uses `Option<(Address, B256)>` instead of default values as the sentinel to avoid
/// incorrectly treating `(Address::ZERO, B256::ZERO)` as "no previous key".
pub(crate) fn load_storage_history<N, CURSOR>(
    mut collector: Collector<StorageShardedKey, BlockNumberList>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    let mut current_key: Option<(Address, B256)> = None;
    // Accumulator for block numbers where the current (address, storage_key) changed.
    let mut current_list = Vec::<u64>::new();

    let total_entries = collector.len();
    let interval = (total_entries / 10).max(1);

    for (index, element) in collector.iter()?.enumerate() {
        let (k, v) = element?;
        let sharded_key = StorageShardedKey::decode_owned(k)?;
        let new_list = BlockNumberList::decompress_owned(v)?;

        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
        }

        let partial_key = (sharded_key.address, sharded_key.sharded_key.key);

        // When (address, storage_key) changes, flush the previous key's shards and start fresh.
        if current_key != Some(partial_key) {
            // Flush all remaining shards for the previous key (uses u64::MAX for last shard).
            if let Some((prev_addr, prev_storage_key)) = current_key {
                flush_storage_history_shards(
                    prev_addr,
                    prev_storage_key,
                    &mut current_list,
                    append_only,
                    writer,
                )?;
            }

            current_key = Some(partial_key);
            current_list.clear();

            // On incremental sync, merge with the existing last shard from the database.
            // The last shard is stored with key (address, storage_key, u64::MAX) so we can find it.
            if !append_only &&
                let Some(last_shard) =
                    writer.get_last_storage_history_shard(partial_key.0, partial_key.1)?
            {
                current_list.extend(last_shard.iter());
            }
        }

        // Append new block numbers to the accumulator.
        current_list.extend(new_list.iter());

        // Flush complete shards, keeping the last (partial) shard buffered.
        flush_storage_history_shards_partial(
            partial_key.0,
            partial_key.1,
            &mut current_list,
            append_only,
            writer,
        )?;
    }

    // Flush the final key's remaining shard.
    if let Some((addr, storage_key)) = current_key {
        flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?;
    }

    Ok(())
}

/// Flushes complete shards for storage history, keeping the trailing partial shard buffered.
///
/// Only flushes when we have more than one shard's worth of data, keeping the last
/// (possibly partial) shard for continued accumulation. This avoids writing a shard
/// that may need to be updated when more indices arrive.
fn flush_storage_history_shards_partial<N, CURSOR>(
    address: Address,
    storage_key: B256,
    list: &mut Vec<u64>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    // Nothing to flush if we haven't filled a complete shard yet.
    if list.len() <= NUM_OF_INDICES_IN_SHARD {
        return Ok(());
    }

    let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;

    // Always keep at least one shard buffered for continued accumulation.
    // If len is exact multiple of shard size, keep the last full shard.
    let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
        num_full_shards - 1
    } else {
        num_full_shards
    };

    if shards_to_flush == 0 {
        return Ok(());
    }

    // Split: flush the first N shards, keep the remainder buffered.
    let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
    let remainder = list.split_off(flush_len);

    // Write each complete shard with its highest block number as the key.
    for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
        let highest = *chunk.last().expect("chunk is non-empty");
        let key = StorageShardedKey::new(address, storage_key, highest);
        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());

        if append_only {
            writer.append_storage_history(key, &value)?;
        } else {
            writer.upsert_storage_history(key, &value)?;
        }
    }

    // Keep the remaining indices for the next iteration.
    *list = remainder;
    Ok(())
}

/// Flushes all remaining shards for storage history, using `u64::MAX` for the last shard.
///
/// The `u64::MAX` key for the final shard is an invariant that allows
/// `seek_exact(address, storage_key, u64::MAX)` to find the last shard during incremental
/// sync for merging with new indices.
fn flush_storage_history_shards<N, CURSOR>(
    address: Address,
    storage_key: B256,
    list: &mut Vec<u64>,
    append_only: bool,
    writer: &mut EitherWriter<'_, CURSOR, N>,
) -> Result<(), StageError>
where
    N: NodePrimitives,
    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
{
    if list.is_empty() {
        return Ok(());
    }

    let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);

    for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
        let is_last = i == num_chunks - 1;

        // Use u64::MAX for the final shard's key. This invariant allows incremental sync
        // to find the last shard via seek_exact(address, storage_key, u64::MAX) for merging.
        let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };

        let key = StorageShardedKey::new(address, storage_key, highest);
        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());

        if append_only {
            writer.append_storage_history(key, &value)?;
        } else {
            writer.upsert_storage_history(key, &value)?;
        }
    }

    list.clear();
    Ok(())
}