// reth_stages/stages/utils.rs
//! Utils for `stages`.
use alloy_primitives::{map::AddressMap, Address, BlockNumber, TxNumber, B256};
use reth_config::config::EtlConfig;
use reth_db_api::{
    cursor::{DbCursorRO, DbCursorRW},
    models::{
        sharded_key::NUM_OF_INDICES_IN_SHARD, storage_sharded_key::StorageShardedKey,
        AccountBeforeTx, AddressStorageKey, BlockNumberAddress, ShardedKey,
    },
    table::{Decode, Decompress, Table},
    transaction::DbTx,
    BlockNumberList,
};
use reth_etl::Collector;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
    providers::StaticFileProvider, to_range, BlockReader, DBProvider, EitherWriter, ProviderError,
    StaticFileProviderFactory,
};
use reth_stages_api::StageError;
use reth_static_file_types::StaticFileSegment;
use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
use std::{collections::HashMap, hash::Hash, ops::RangeBounds};
use tracing::info;

/// Number of blocks before pushing indices from cache to [`Collector`]
const DEFAULT_CACHE_THRESHOLD: u64 = 100_000;

29/// Collects all history (`H`) indices for a range of changesets (`CS`) and stores them in a
30/// [`Collector`].
31///
32/// ## Process
33/// The function utilizes a `HashMap` cache with a structure of `PartialKey` (`P`) (Address or
34/// Address.StorageKey) to `BlockNumberList`. When the cache exceeds its capacity, its contents are
35/// moved to a [`Collector`]. Here, each entry's key is a concatenation of `PartialKey` and the
36/// highest block number in its list.
37///
38/// ## Example
39/// 1. Initial Cache State: `{ Address1: [1,2,3], ... }`
40/// 2. Cache is flushed to the `Collector`.
41/// 3. Updated Cache State: `{ Address1: [100,300], ... }`
42/// 4. Cache is flushed again.
43///
44/// As a result, the `Collector` will contain entries such as `(Address1.3, [1,2,3])` and
45/// `(Address1.300, [100,300])`. The entries may be stored across one or more files.
46pub(crate) fn collect_history_indices<Provider, CS, H, P>(
47    provider: &Provider,
48    range: impl RangeBounds<CS::Key>,
49    sharded_key_factory: impl Fn(P, BlockNumber) -> H::Key,
50    partial_key_factory: impl Fn((CS::Key, CS::Value)) -> (u64, P),
51    etl_config: &EtlConfig,
52) -> Result<Collector<H::Key, H::Value>, StageError>
53where
54    Provider: DBProvider,
55    CS: Table,
56    H: Table<Value = BlockNumberList>,
57    P: Copy + Eq + Hash,
58{
59    let mut changeset_cursor = provider.tx_ref().cursor_read::<CS>()?;
60
61    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
62    let mut cache: HashMap<P, Vec<u64>> = HashMap::default();
63
64    let mut collect = |cache: &mut HashMap<P, Vec<u64>>| {
65        for (key, indices) in cache.drain() {
66            let last = *indices.last().expect("qed");
67            collector
68                .insert(sharded_key_factory(key, last), BlockNumberList::new_pre_sorted(indices))?;
69        }
70        Ok::<(), StageError>(())
71    };
72
73    // observability
74    let total_changesets = provider.tx_ref().entries::<CS>()?;
75    let interval = (total_changesets / 1000).max(1);
76
77    let mut flush_counter = 0;
78    let mut current_block_number = u64::MAX;
79    for (idx, entry) in changeset_cursor.walk_range(range)?.enumerate() {
80        let (block_number, key) = partial_key_factory(entry?);
81        cache.entry(key).or_default().push(block_number);
82
83        if idx > 0 && idx.is_multiple_of(interval) && total_changesets > 1000 {
84            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
85        }
86
87        // Make sure we only flush the cache every DEFAULT_CACHE_THRESHOLD blocks.
88        if current_block_number != block_number {
89            current_block_number = block_number;
90            flush_counter += 1;
91            if flush_counter > DEFAULT_CACHE_THRESHOLD {
92                collect(&mut cache)?;
93                flush_counter = 0;
94            }
95        }
96    }
97    collect(&mut cache)?;
98
99    Ok(collector)
100}
101
102/// Allows collecting indices from a cache with a custom insert fn
103fn collect_indices<K, F>(
104    cache: impl Iterator<Item = (K, Vec<u64>)>,
105    mut insert_fn: F,
106) -> Result<(), StageError>
107where
108    F: FnMut(K, Vec<u64>) -> Result<(), StageError>,
109{
110    for (key, indices) in cache {
111        insert_fn(key, indices)?
112    }
113    Ok(())
114}
115
116/// Collects account history indices using a provider that implements `ChangeSetReader`.
117pub(crate) fn collect_account_history_indices<Provider>(
118    provider: &Provider,
119    range: impl RangeBounds<BlockNumber>,
120    etl_config: &EtlConfig,
121) -> Result<Collector<ShardedKey<Address>, BlockNumberList>, StageError>
122where
123    Provider: DBProvider + ChangeSetReader + StaticFileProviderFactory,
124{
125    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
126    let mut cache: AddressMap<Vec<u64>> = AddressMap::default();
127
128    let mut insert_fn = |address: Address, indices: Vec<u64>| {
129        let last = indices.last().expect("indices is non-empty");
130        collector
131            .insert(ShardedKey::new(address, *last), BlockNumberList::new_pre_sorted(indices))?;
132        Ok(())
133    };
134
135    // Convert range bounds to concrete range
136    let range = to_range(range);
137
138    // Use the new walker for lazy iteration over static file changesets
139    let static_file_provider = provider.static_file_provider();
140
141    // Get total count for progress reporting
142    let total_changesets = static_file_provider.account_changeset_count()?;
143    let interval = (total_changesets / 1000).max(1);
144
145    let walker = static_file_provider.walk_account_changeset_range(range);
146
147    let mut flush_counter = 0;
148    let mut current_block_number = u64::MAX;
149
150    for (idx, changeset_result) in walker.enumerate() {
151        let (block_number, AccountBeforeTx { address, .. }) = changeset_result?;
152        cache.entry(address).or_default().push(block_number);
153
154        if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
155            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
156        }
157
158        if block_number != current_block_number {
159            current_block_number = block_number;
160            flush_counter += 1;
161        }
162
163        if flush_counter > DEFAULT_CACHE_THRESHOLD {
164            collect_indices(cache.drain(), &mut insert_fn)?;
165            flush_counter = 0;
166        }
167    }
168    collect_indices(cache.into_iter(), insert_fn)?;
169
170    Ok(collector)
171}
172
173/// Collects storage history indices using a provider that implements `StorageChangeSetReader`.
174pub(crate) fn collect_storage_history_indices<Provider>(
175    provider: &Provider,
176    range: impl RangeBounds<BlockNumber>,
177    etl_config: &EtlConfig,
178) -> Result<Collector<StorageShardedKey, BlockNumberList>, StageError>
179where
180    Provider: DBProvider + StorageChangeSetReader + StaticFileProviderFactory,
181{
182    let mut collector = Collector::new(etl_config.file_size, etl_config.dir.clone());
183    let mut cache: HashMap<AddressStorageKey, Vec<u64>> = HashMap::default();
184
185    let mut insert_fn = |key: AddressStorageKey, indices: Vec<u64>| {
186        let last = indices.last().expect("qed");
187        collector.insert(
188            StorageShardedKey::new(key.0 .0, key.0 .1, *last),
189            BlockNumberList::new_pre_sorted(indices),
190        )?;
191        Ok::<(), StageError>(())
192    };
193
194    let range = to_range(range);
195    let static_file_provider = provider.static_file_provider();
196
197    let total_changesets = static_file_provider.storage_changeset_count()?;
198    let interval = (total_changesets / 1000).max(1);
199
200    let walker = static_file_provider.walk_storage_changeset_range(range);
201
202    let mut flush_counter = 0;
203    let mut current_block_number = u64::MAX;
204
205    for (idx, changeset_result) in walker.enumerate() {
206        let (BlockNumberAddress((block_number, address)), storage) = changeset_result?;
207        cache.entry(AddressStorageKey((address, storage.key))).or_default().push(block_number);
208
209        if idx > 0 && idx % interval == 0 && total_changesets > 1000 {
210            info!(target: "sync::stages::index_history", progress = %format!("{:.4}%", (idx as f64 / total_changesets as f64) * 100.0), "Collecting indices");
211        }
212
213        if block_number != current_block_number {
214            current_block_number = block_number;
215            flush_counter += 1;
216        }
217
218        if flush_counter > DEFAULT_CACHE_THRESHOLD {
219            collect_indices(cache.drain(), &mut insert_fn)?;
220            flush_counter = 0;
221        }
222    }
223
224    collect_indices(cache.into_iter(), insert_fn)?;
225
226    Ok(collector)
227}
228
229/// Loads account history indices into the database via `EitherWriter`.
230///
231/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
232///
233/// ## Process
234/// Iterates over elements, grouping indices by their address. It flushes indices to disk
235/// when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the address changes,
236/// ensuring the last previous address shard is stored.
237///
238/// Uses `Option<Address>` instead of `Address::default()` as the sentinel to avoid
239/// incorrectly treating `Address::ZERO` as "no previous address".
240pub(crate) fn load_account_history<N, CURSOR>(
241    mut collector: Collector<ShardedKey<Address>, BlockNumberList>,
242    append_only: bool,
243    writer: &mut EitherWriter<'_, CURSOR, N>,
244) -> Result<(), StageError>
245where
246    N: NodePrimitives,
247    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
248        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
249{
250    let mut current_address: Option<Address> = None;
251    // Accumulator for block numbers where the current address changed.
252    let mut current_list = Vec::<u64>::new();
253
254    let total_entries = collector.len();
255    let interval = (total_entries / 10).max(1);
256
257    for (index, element) in collector.iter()?.enumerate() {
258        let (k, v) = element?;
259        let sharded_key = ShardedKey::<Address>::decode_owned(k)?;
260        let new_list = BlockNumberList::decompress_owned(v)?;
261
262        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
263            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
264        }
265
266        let address = sharded_key.key;
267
268        // When address changes, flush the previous address's shards and start fresh.
269        if current_address != Some(address) {
270            // Flush all remaining shards for the previous address (uses u64::MAX for last shard).
271            if let Some(prev_addr) = current_address {
272                flush_account_history_shards(prev_addr, &mut current_list, append_only, writer)?;
273            }
274
275            current_address = Some(address);
276            current_list.clear();
277
278            // On incremental sync, merge with the existing last shard from the database.
279            // The last shard is stored with key (address, u64::MAX) so we can find it.
280            if !append_only &&
281                let Some(last_shard) = writer.get_last_account_history_shard(address)?
282            {
283                current_list.extend(last_shard.iter());
284            }
285        }
286
287        // Append new block numbers to the accumulator.
288        current_list.extend(new_list.iter());
289
290        // Flush complete shards, keeping the last (partial) shard buffered.
291        flush_account_history_shards_partial(address, &mut current_list, append_only, writer)?;
292    }
293
294    // Flush the final address's remaining shard.
295    if let Some(addr) = current_address {
296        flush_account_history_shards(addr, &mut current_list, append_only, writer)?;
297    }
298
299    Ok(())
300}
301
302/// Flushes complete shards for account history, keeping the trailing partial shard buffered.
303///
304/// Only flushes when we have more than one shard's worth of data, keeping the last
305/// (possibly partial) shard for continued accumulation. This avoids writing a shard
306/// that may need to be updated when more indices arrive.
307fn flush_account_history_shards_partial<N, CURSOR>(
308    address: Address,
309    list: &mut Vec<u64>,
310    append_only: bool,
311    writer: &mut EitherWriter<'_, CURSOR, N>,
312) -> Result<(), StageError>
313where
314    N: NodePrimitives,
315    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
316        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
317{
318    // Nothing to flush if we haven't filled a complete shard yet.
319    if list.len() <= NUM_OF_INDICES_IN_SHARD {
320        return Ok(());
321    }
322
323    let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
324
325    // Always keep at least one shard buffered for continued accumulation.
326    // If len is exact multiple of shard size, keep the last full shard.
327    let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
328        num_full_shards - 1
329    } else {
330        num_full_shards
331    };
332
333    if shards_to_flush == 0 {
334        return Ok(());
335    }
336
337    // Split: flush the first N shards, keep the remainder buffered.
338    let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
339    let remainder = list.split_off(flush_len);
340
341    // Write each complete shard with its highest block number as the key.
342    for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
343        let highest = *chunk.last().expect("chunk is non-empty");
344        let key = ShardedKey::new(address, highest);
345        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
346
347        if append_only {
348            writer.append_account_history(key, &value)?;
349        } else {
350            writer.upsert_account_history(key, &value)?;
351        }
352    }
353
354    // Keep the remaining indices for the next iteration.
355    *list = remainder;
356    Ok(())
357}
358
359/// Flushes all remaining shards for account history, using `u64::MAX` for the last shard.
360///
361/// The `u64::MAX` key for the final shard is an invariant that allows `seek_exact(address,
362/// u64::MAX)` to find the last shard during incremental sync for merging with new indices.
363fn flush_account_history_shards<N, CURSOR>(
364    address: Address,
365    list: &mut Vec<u64>,
366    append_only: bool,
367    writer: &mut EitherWriter<'_, CURSOR, N>,
368) -> Result<(), StageError>
369where
370    N: NodePrimitives,
371    CURSOR: DbCursorRW<reth_db_api::tables::AccountsHistory>
372        + DbCursorRO<reth_db_api::tables::AccountsHistory>,
373{
374    if list.is_empty() {
375        return Ok(());
376    }
377
378    let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
379
380    for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
381        let is_last = i == num_chunks - 1;
382
383        // Use u64::MAX for the final shard's key. This invariant allows incremental sync
384        // to find the last shard via seek_exact(address, u64::MAX) for merging.
385        let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
386
387        let key = ShardedKey::new(address, highest);
388        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
389
390        if append_only {
391            writer.append_account_history(key, &value)?;
392        } else {
393            writer.upsert_account_history(key, &value)?;
394        }
395    }
396
397    list.clear();
398    Ok(())
399}
400
401/// Called when database is ahead of static files. Attempts to find the first block we are missing
402/// transactions for.
403pub(crate) fn missing_static_data_error<Provider>(
404    last_tx_num: TxNumber,
405    static_file_provider: &StaticFileProvider<Provider::Primitives>,
406    provider: &Provider,
407    segment: StaticFileSegment,
408) -> Result<StageError, ProviderError>
409where
410    Provider: BlockReader + StaticFileProviderFactory,
411{
412    let mut last_block =
413        static_file_provider.get_highest_static_file_block(segment).unwrap_or_default();
414
415    // To be extra safe, we make sure that the last tx num matches the last block from its indices.
416    // If not, get it.
417    loop {
418        if let Some(indices) = provider.block_body_indices(last_block)? &&
419            indices.last_tx_num() <= last_tx_num
420        {
421            break
422        }
423        if last_block == 0 {
424            break
425        }
426        last_block -= 1;
427    }
428
429    let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default());
430
431    Ok(StageError::MissingStaticFileData {
432        block: Box::new(missing_block.block_with_parent()),
433        segment,
434    })
435}
436
437/// Loads storage history indices into the database via `EitherWriter`.
438///
439/// Works with [`EitherWriter`] to support both MDBX and `RocksDB` backends.
440///
441/// ## Process
442/// Iterates over elements, grouping indices by their (address, `storage_key`) pairs. It flushes
443/// indices to disk when reaching a shard's max length (`NUM_OF_INDICES_IN_SHARD`) or when the
444/// (address, `storage_key`) pair changes, ensuring the last previous shard is stored.
445///
446/// Uses `Option<(Address, B256)>` instead of default values as the sentinel to avoid
447/// incorrectly treating `(Address::ZERO, B256::ZERO)` as "no previous key".
448pub(crate) fn load_storage_history<N, CURSOR>(
449    mut collector: Collector<StorageShardedKey, BlockNumberList>,
450    append_only: bool,
451    writer: &mut EitherWriter<'_, CURSOR, N>,
452) -> Result<(), StageError>
453where
454    N: NodePrimitives,
455    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
456        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
457{
458    let mut current_key: Option<(Address, B256)> = None;
459    // Accumulator for block numbers where the current (address, storage_key) changed.
460    let mut current_list = Vec::<u64>::new();
461
462    let total_entries = collector.len();
463    let interval = (total_entries / 10).max(1);
464
465    for (index, element) in collector.iter()?.enumerate() {
466        let (k, v) = element?;
467        let sharded_key = StorageShardedKey::decode_owned(k)?;
468        let new_list = BlockNumberList::decompress_owned(v)?;
469
470        if index > 0 && index.is_multiple_of(interval) && total_entries > 10 {
471            info!(target: "sync::stages::index_history", progress = %format!("{:.2}%", (index as f64 / total_entries as f64) * 100.0), "Writing indices");
472        }
473
474        let partial_key = (sharded_key.address, sharded_key.sharded_key.key);
475
476        // When (address, storage_key) changes, flush the previous key's shards and start fresh.
477        if current_key != Some(partial_key) {
478            // Flush all remaining shards for the previous key (uses u64::MAX for last shard).
479            if let Some((prev_addr, prev_storage_key)) = current_key {
480                flush_storage_history_shards(
481                    prev_addr,
482                    prev_storage_key,
483                    &mut current_list,
484                    append_only,
485                    writer,
486                )?;
487            }
488
489            current_key = Some(partial_key);
490            current_list.clear();
491
492            // On incremental sync, merge with the existing last shard from the database.
493            // The last shard is stored with key (address, storage_key, u64::MAX) so we can find it.
494            if !append_only &&
495                let Some(last_shard) =
496                    writer.get_last_storage_history_shard(partial_key.0, partial_key.1)?
497            {
498                current_list.extend(last_shard.iter());
499            }
500        }
501
502        // Append new block numbers to the accumulator.
503        current_list.extend(new_list.iter());
504
505        // Flush complete shards, keeping the last (partial) shard buffered.
506        flush_storage_history_shards_partial(
507            partial_key.0,
508            partial_key.1,
509            &mut current_list,
510            append_only,
511            writer,
512        )?;
513    }
514
515    // Flush the final key's remaining shard.
516    if let Some((addr, storage_key)) = current_key {
517        flush_storage_history_shards(addr, storage_key, &mut current_list, append_only, writer)?;
518    }
519
520    Ok(())
521}
522
523/// Flushes complete shards for storage history, keeping the trailing partial shard buffered.
524///
525/// Only flushes when we have more than one shard's worth of data, keeping the last
526/// (possibly partial) shard for continued accumulation. This avoids writing a shard
527/// that may need to be updated when more indices arrive.
528fn flush_storage_history_shards_partial<N, CURSOR>(
529    address: Address,
530    storage_key: B256,
531    list: &mut Vec<u64>,
532    append_only: bool,
533    writer: &mut EitherWriter<'_, CURSOR, N>,
534) -> Result<(), StageError>
535where
536    N: NodePrimitives,
537    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
538        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
539{
540    // Nothing to flush if we haven't filled a complete shard yet.
541    if list.len() <= NUM_OF_INDICES_IN_SHARD {
542        return Ok(());
543    }
544
545    let num_full_shards = list.len() / NUM_OF_INDICES_IN_SHARD;
546
547    // Always keep at least one shard buffered for continued accumulation.
548    // If len is exact multiple of shard size, keep the last full shard.
549    let shards_to_flush = if list.len().is_multiple_of(NUM_OF_INDICES_IN_SHARD) {
550        num_full_shards - 1
551    } else {
552        num_full_shards
553    };
554
555    if shards_to_flush == 0 {
556        return Ok(());
557    }
558
559    // Split: flush the first N shards, keep the remainder buffered.
560    let flush_len = shards_to_flush * NUM_OF_INDICES_IN_SHARD;
561    let remainder = list.split_off(flush_len);
562
563    // Write each complete shard with its highest block number as the key.
564    for chunk in list.chunks(NUM_OF_INDICES_IN_SHARD) {
565        let highest = *chunk.last().expect("chunk is non-empty");
566        let key = StorageShardedKey::new(address, storage_key, highest);
567        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
568
569        if append_only {
570            writer.append_storage_history(key, &value)?;
571        } else {
572            writer.upsert_storage_history(key, &value)?;
573        }
574    }
575
576    // Keep the remaining indices for the next iteration.
577    *list = remainder;
578    Ok(())
579}
580
581/// Flushes all remaining shards for storage history, using `u64::MAX` for the last shard.
582///
583/// The `u64::MAX` key for the final shard is an invariant that allows
584/// `seek_exact(address, storage_key, u64::MAX)` to find the last shard during incremental
585/// sync for merging with new indices.
586fn flush_storage_history_shards<N, CURSOR>(
587    address: Address,
588    storage_key: B256,
589    list: &mut Vec<u64>,
590    append_only: bool,
591    writer: &mut EitherWriter<'_, CURSOR, N>,
592) -> Result<(), StageError>
593where
594    N: NodePrimitives,
595    CURSOR: DbCursorRW<reth_db_api::tables::StoragesHistory>
596        + DbCursorRO<reth_db_api::tables::StoragesHistory>,
597{
598    if list.is_empty() {
599        return Ok(());
600    }
601
602    let num_chunks = list.len().div_ceil(NUM_OF_INDICES_IN_SHARD);
603
604    for (i, chunk) in list.chunks(NUM_OF_INDICES_IN_SHARD).enumerate() {
605        let is_last = i == num_chunks - 1;
606
607        // Use u64::MAX for the final shard's key. This invariant allows incremental sync
608        // to find the last shard via seek_exact(address, storage_key, u64::MAX) for merging.
609        let highest = if is_last { u64::MAX } else { *chunk.last().expect("chunk is non-empty") };
610
611        let key = StorageShardedKey::new(address, storage_key, highest);
612        let value = BlockNumberList::new_pre_sorted(chunk.iter().copied());
613
614        if append_only {
615            writer.append_storage_history(key, &value)?;
616        } else {
617            writer.upsert_storage_history(key, &value)?;
618        }
619    }
620
621    list.clear();
622    Ok(())
623}