Skip to main content

reth_provider/providers/static_file/
manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker},
7    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
8    EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
9    TransactionVariant, TransactionsProvider, TransactionsProviderExt,
10};
11use alloy_consensus::{
12    transaction::{TransactionMeta, TxHashRef},
13    Header,
14};
15use alloy_eips::BlockHashOrNumber;
16use alloy_primitives::{b256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
17
18use parking_lot::RwLock;
19use reth_chain_state::ExecutedBlock;
20use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
21use reth_db::{
22    lockfile::StorageLock,
23    static_file::{
24        iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
25        StaticFileCursor, StorageChangesetMask, TransactionMask, TransactionSenderMask,
26    },
27};
28use reth_db_api::{
29    cursor::DbCursorRO,
30    models::{AccountBeforeTx, BlockNumberAddress, StorageBeforeTx, StoredBlockBodyIndices},
31    table::{Decompress, Table, Value},
32    tables,
33    transaction::DbTx,
34};
35use reth_ethereum_primitives::{Receipt, TransactionSigned};
36use reth_nippy_jar::{NippyJar, NippyJarChecker};
37use reth_node_types::NodePrimitives;
38use reth_primitives_traits::{
39    dashmap::DashMap, AlloyBlockHeader as _, BlockBody as _, RecoveredBlock, SealedHeader,
40    SignedTransaction, StorageEntry,
41};
42use reth_prune_types::PruneSegment;
43use reth_stages_types::PipelineTarget;
44use reth_static_file_types::{
45    find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
46    StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
47};
48use reth_storage_api::{
49    BlockBodyIndicesProvider, ChangeSetReader, DBProvider, PruneCheckpointReader,
50    StorageChangeSetReader, StorageSettingsCache,
51};
52use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
53use std::{
54    collections::BTreeMap,
55    fmt::Debug,
56    ops::{Bound, Deref, Range, RangeBounds, RangeInclusive},
57    path::{Path, PathBuf},
58    sync::{atomic::AtomicU64, mpsc, Arc},
59};
60use tracing::{debug, info, info_span, instrument, trace, warn};
61
62/// Alias type for a map that can be queried for block or transaction ranges. It uses `u64` to
63/// represent either a block or a transaction number end of a static file range.
64type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
65
/// Access mode of a static file provider: read-only (`RO`) or read-write (`RW`).
///
/// This is a plain fieldless flag, so besides the comparison traits it also derives `Clone` and
/// `Copy`. Callers can then pass and store it by value.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access. This is the default mode.
    #[default]
    RO,
    /// Read-write access. A provider opened in this mode acquires a storage lock on its
    /// directory.
    RW,
}
75
76impl StaticFileAccess {
77    /// Returns `true` if read-only access.
78    pub const fn is_read_only(&self) -> bool {
79        matches!(self, Self::RO)
80    }
81
82    /// Returns `true` if read-write access.
83    pub const fn is_read_write(&self) -> bool {
84        matches!(self, Self::RW)
85    }
86}
87
88/// Context for static file block writes.
89///
90/// Contains target segments and pruning configuration.
91#[derive(Debug, Clone, Copy, Default)]
92pub struct StaticFileWriteCtx {
93    /// Whether transaction senders should be written to static files.
94    pub write_senders: bool,
95    /// Whether receipts should be written to static files.
96    pub write_receipts: bool,
97    /// Whether account changesets should be written to static files.
98    pub write_account_changesets: bool,
99    /// Whether storage changesets should be written to static files.
100    pub write_storage_changesets: bool,
101    /// The current chain tip block number (for pruning).
102    pub tip: BlockNumber,
103    /// The prune mode for receipts, if any.
104    pub receipts_prune_mode: Option<reth_prune_types::PruneMode>,
105    /// Whether receipts are prunable (based on storage settings and prune distance).
106    pub receipts_prunable: bool,
107}
108
109/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
110///
111/// "Static files" contain immutable chain history data, such as:
112///  - transactions
113///  - headers
114///  - receipts
115///
116/// This provider type is responsible for reading and writing to static files.
117#[derive(Debug)]
118pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
119
120impl<N> Clone for StaticFileProvider<N> {
121    fn clone(&self) -> Self {
122        Self(self.0.clone())
123    }
124}
125
126/// Builder for [`StaticFileProvider`] that allows configuration before initialization.
127#[derive(Debug)]
128pub struct StaticFileProviderBuilder<P> {
129    access: StaticFileAccess,
130    use_metrics: bool,
131    blocks_per_file: StaticFileMap<u64>,
132    path: P,
133    genesis_block_number: u64,
134}
135
136impl<P: AsRef<Path>> StaticFileProviderBuilder<P> {
137    /// Creates a new builder with read-write access.
138    pub fn read_write(path: P) -> Self {
139        Self {
140            path,
141            access: StaticFileAccess::RW,
142            blocks_per_file: Default::default(),
143            use_metrics: false,
144            genesis_block_number: 0,
145        }
146    }
147
148    /// Creates a new builder with read-only access.
149    pub fn read_only(path: P) -> Self {
150        Self {
151            path,
152            access: StaticFileAccess::RO,
153            blocks_per_file: Default::default(),
154            use_metrics: false,
155            genesis_block_number: 0,
156        }
157    }
158
159    /// Set custom blocks per file for specific segments.
160    ///
161    /// Each static file segment is stored across multiple files, and each of these files contains
162    /// up to the specified number of blocks of data. When the file gets full, a new file is
163    /// created with the new block range.
164    ///
165    /// This setting affects the size of each static file, and can be set per segment.
166    ///
167    /// If it is changed for an existing node, existing static files will not be affected and will
168    /// be finished with the old blocks per file setting, but new static files will use the new
169    /// setting.
170    pub fn with_blocks_per_file_for_segments(
171        mut self,
172        segments: &<StaticFileMap<u64> as Deref>::Target,
173    ) -> Self {
174        for (segment, &blocks_per_file) in segments {
175            self.blocks_per_file.insert(segment, blocks_per_file);
176        }
177        self
178    }
179
180    /// Set a custom number of blocks per file for all segments.
181    pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
182        for segment in StaticFileSegment::iter() {
183            self.blocks_per_file.insert(segment, blocks_per_file);
184        }
185        self
186    }
187
188    /// Set a custom number of blocks per file for a specific segment.
189    pub fn with_blocks_per_file_for_segment(
190        mut self,
191        segment: StaticFileSegment,
192        blocks_per_file: u64,
193    ) -> Self {
194        self.blocks_per_file.insert(segment, blocks_per_file);
195        self
196    }
197
198    /// Enables metrics on the [`StaticFileProvider`].
199    pub const fn with_metrics(mut self) -> Self {
200        self.use_metrics = true;
201        self
202    }
203
204    /// Sets the genesis block number for the [`StaticFileProvider`].
205    ///
206    /// This configures the genesis block number, which is used to determine the starting point
207    /// for block indexing and querying operations.
208    ///
209    /// # Arguments
210    ///
211    /// * `genesis_block_number` - The block number of the genesis block.
212    ///
213    /// # Returns
214    ///
215    /// Returns `Self` to allow method chaining.
216    pub const fn with_genesis_block_number(mut self, genesis_block_number: u64) -> Self {
217        self.genesis_block_number = genesis_block_number;
218        self
219    }
220
221    /// Builds the final [`StaticFileProvider`] and initializes the index.
222    pub fn build<N: NodePrimitives>(self) -> ProviderResult<StaticFileProvider<N>> {
223        let mut provider = StaticFileProviderInner::new(self.path, self.access)?;
224        if self.use_metrics {
225            provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
226        }
227
228        for (segment, blocks_per_file) in *self.blocks_per_file {
229            provider.blocks_per_file.insert(segment, blocks_per_file);
230        }
231        provider.genesis_block_number = self.genesis_block_number;
232
233        let provider = StaticFileProvider(Arc::new(provider));
234        provider.initialize_index()?;
235        Ok(provider)
236    }
237}
238
239impl<N: NodePrimitives> StaticFileProvider<N> {
240    /// Creates a new [`StaticFileProvider`] with the given [`StaticFileAccess`].
241    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
242        let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
243        provider.initialize_index()?;
244        Ok(provider)
245    }
246}
247
248impl<N: NodePrimitives> StaticFileProvider<N> {
249    /// Creates a new [`StaticFileProvider`] with read-only access.
250    ///
251    /// The caller is responsible for calling [`StaticFileProvider::initialize_index`] when
252    /// underlying data changes.
253    pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
254        Self::new(path, StaticFileAccess::RO)
255    }
256
257    /// Creates a new [`StaticFileProvider`] with read-write access.
258    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
259        Self::new(path, StaticFileAccess::RW)
260    }
261}
262
263impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
264    type Target = StaticFileProviderInner<N>;
265
266    fn deref(&self) -> &Self::Target {
267        &self.0
268    }
269}
270
271/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
272#[derive(Debug)]
273pub struct StaticFileProviderInner<N> {
274    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
275    /// segments and ranges.
276    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
277    /// Indexes per segment.
278    indexes: RwLock<StaticFileMap<StaticFileSegmentIndex>>,
279    /// This is an additional index that tracks the expired height, this will track the highest
280    /// block number that has been expired (missing). The first, non expired block is
281    /// `expired_history_height + 1`.
282    ///
283    /// This is effectively the transaction range that has been expired:
284    /// [`StaticFileProvider::delete_segment_below_block`] and mirrors
285    /// `static_files_min_block[transactions] - blocks_per_file`.
286    ///
287    /// This additional tracker exists for more efficient lookups because the node must be aware of
288    /// the expired height.
289    earliest_history_height: AtomicU64,
290    /// Directory where `static_files` are located
291    path: PathBuf,
292    /// Maintains a writer set of [`StaticFileSegment`].
293    writers: StaticFileWriters<N>,
294    /// Metrics for the static files.
295    metrics: Option<Arc<StaticFileProviderMetrics>>,
296    /// Access rights of the provider.
297    access: StaticFileAccess,
298    /// Number of blocks per file, per segment.
299    blocks_per_file: StaticFileMap<u64>,
300    /// Write lock for when access is [`StaticFileAccess::RW`].
301    _lock_file: Option<StorageLock>,
302    /// Genesis block number, default is 0;
303    genesis_block_number: u64,
304}
305
306impl<N: NodePrimitives> StaticFileProviderInner<N> {
307    /// Creates a new [`StaticFileProviderInner`].
308    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
309        let _lock_file = if access.is_read_write() {
310            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
311        } else {
312            None
313        };
314
315        let mut blocks_per_file = StaticFileMap::default();
316        for segment in StaticFileSegment::iter() {
317            blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
318        }
319
320        let provider = Self {
321            map: Default::default(),
322            indexes: Default::default(),
323            writers: Default::default(),
324            earliest_history_height: Default::default(),
325            path: path.as_ref().to_path_buf(),
326            metrics: None,
327            access,
328            blocks_per_file,
329            _lock_file,
330            genesis_block_number: 0,
331        };
332
333        Ok(provider)
334    }
335
336    pub const fn is_read_only(&self) -> bool {
337        self.access.is_read_only()
338    }
339
340    /// Each static file has a fixed number of blocks. This gives out the range where the requested
341    /// block is positioned.
342    ///
343    /// If the specified block falls into one of the ranges of already initialized static files,
344    /// this function will return that range.
345    ///
346    /// If no matching file exists, this function will derive a new range from the end of the last
347    /// existing file, if any.
348    pub fn find_fixed_range_with_block_index(
349        &self,
350        segment: StaticFileSegment,
351        block_index: Option<&SegmentRanges>,
352        block: BlockNumber,
353    ) -> SegmentRangeInclusive {
354        let blocks_per_file =
355            self.blocks_per_file.get(segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
356
357        if let Some(block_index) = block_index {
358            // Find first block range that contains the requested block
359            if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
360            {
361                // Found matching range for an existing file using block index
362                return *range;
363            } else if let Some((_, range)) = block_index.last_key_value() {
364                // Didn't find matching range for an existing file, derive a new range from the end
365                // of the last existing file range.
366                //
367                // `block` is always higher than `range.end()` here, because we iterated over all
368                // `block_index` ranges above and didn't find one that contains our block
369                let blocks_after_last_range = block - range.end();
370                let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
371                let start = range.end() + 1 + segments_to_skip * blocks_per_file;
372                return SegmentRangeInclusive::new(start, start + blocks_per_file - 1);
373            }
374        }
375        // No block index is available, derive a new range using the fixed number of blocks,
376        // starting from the beginning.
377        find_fixed_range(block, blocks_per_file)
378    }
379
380    /// Each static file has a fixed number of blocks. This gives out the range where the requested
381    /// block is positioned.
382    ///
383    /// If the specified block falls into one of the ranges of already initialized static files,
384    /// this function will return that range.
385    ///
386    /// If no matching file exists, this function will derive a new range from the end of the last
387    /// existing file, if any.
388    ///
389    /// This function will block indefinitely if a write lock for
390    /// [`Self::indexes`] is already acquired. In that case, use
391    /// [`Self::find_fixed_range_with_block_index`].
392    pub fn find_fixed_range(
393        &self,
394        segment: StaticFileSegment,
395        block: BlockNumber,
396    ) -> SegmentRangeInclusive {
397        self.find_fixed_range_with_block_index(
398            segment,
399            self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block),
400            block,
401        )
402    }
403
404    /// Get genesis block number
405    pub const fn genesis_block_number(&self) -> u64 {
406        self.genesis_block_number
407    }
408}
409
410impl<N: NodePrimitives> StaticFileProvider<N> {
411    /// Reports metrics for the static files.
412    ///
413    /// This uses the in-memory index to get file sizes from mmap handles instead of reading
414    /// filesystem metadata.
415    pub fn report_metrics(&self) -> ProviderResult<()> {
416        let Some(metrics) = &self.metrics else { return Ok(()) };
417
418        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
419        for (segment, headers) in &*static_files {
420            let mut entries = 0;
421            let mut size = 0;
422
423            for (block_range, _) in headers {
424                let fixed_block_range = self.find_fixed_range(segment, block_range.start());
425                let jar_provider = self
426                    .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
427                    .ok_or_else(|| {
428                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
429                    })?;
430
431                entries += jar_provider.rows();
432                size += jar_provider.size() as u64;
433            }
434
435            metrics.record_segment(segment, size, headers.len(), entries);
436        }
437
438        Ok(())
439    }
440
441    /// Writes headers for all blocks to the static file segment.
442    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
443    fn write_headers(
444        w: &mut StaticFileProviderRWRefMut<'_, N>,
445        blocks: &[ExecutedBlock<N>],
446    ) -> ProviderResult<()> {
447        for block in blocks {
448            let b = block.recovered_block();
449            w.append_header(b.header(), &b.hash())?;
450        }
451        Ok(())
452    }
453
454    /// Writes transactions for all blocks to the static file segment.
455    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
456    fn write_transactions(
457        w: &mut StaticFileProviderRWRefMut<'_, N>,
458        blocks: &[ExecutedBlock<N>],
459        tx_nums: &[TxNumber],
460    ) -> ProviderResult<()> {
461        for (block, &first_tx) in blocks.iter().zip(tx_nums) {
462            let b = block.recovered_block();
463            w.increment_block(b.number())?;
464            for (i, tx) in b.body().transactions().iter().enumerate() {
465                w.append_transaction(first_tx + i as u64, tx)?;
466            }
467        }
468        Ok(())
469    }
470
471    /// Writes transaction senders for all blocks to the static file segment.
472    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
473    fn write_transaction_senders(
474        w: &mut StaticFileProviderRWRefMut<'_, N>,
475        blocks: &[ExecutedBlock<N>],
476        tx_nums: &[TxNumber],
477    ) -> ProviderResult<()> {
478        for (block, &first_tx) in blocks.iter().zip(tx_nums) {
479            let b = block.recovered_block();
480            w.increment_block(b.number())?;
481            for (i, sender) in b.senders_iter().enumerate() {
482                w.append_transaction_sender(first_tx + i as u64, sender)?;
483            }
484        }
485        Ok(())
486    }
487
488    /// Writes receipts for all blocks to the static file segment.
489    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
490    fn write_receipts(
491        w: &mut StaticFileProviderRWRefMut<'_, N>,
492        blocks: &[ExecutedBlock<N>],
493        tx_nums: &[TxNumber],
494        ctx: &StaticFileWriteCtx,
495    ) -> ProviderResult<()> {
496        for (block, &first_tx) in blocks.iter().zip(tx_nums) {
497            let block_number = block.recovered_block().number();
498            w.increment_block(block_number)?;
499
500            // skip writing receipts if pruning configuration requires us to.
501            if ctx.receipts_prunable &&
502                ctx.receipts_prune_mode
503                    .is_some_and(|mode| mode.should_prune(block_number, ctx.tip))
504            {
505                continue
506            }
507
508            for (i, receipt) in block.execution_outcome().receipts.iter().enumerate() {
509                w.append_receipt(first_tx + i as u64, receipt)?;
510            }
511        }
512        Ok(())
513    }
514
515    /// Writes account changesets for all blocks to the static file segment.
516    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
517    fn write_account_changesets(
518        w: &mut StaticFileProviderRWRefMut<'_, N>,
519        blocks: &[ExecutedBlock<N>],
520    ) -> ProviderResult<()> {
521        for block in blocks {
522            let block_number = block.recovered_block().number();
523            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
524
525            let changeset: Vec<_> = reverts
526                .accounts
527                .into_iter()
528                .flatten()
529                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
530                .collect();
531            w.append_account_changeset(changeset, block_number)?;
532        }
533        Ok(())
534    }
535
536    /// Writes storage changesets for all blocks to the static file segment.
537    #[instrument(level = "debug", target = "providers::db", skip_all)]
538    fn write_storage_changesets(
539        w: &mut StaticFileProviderRWRefMut<'_, N>,
540        blocks: &[ExecutedBlock<N>],
541    ) -> ProviderResult<()> {
542        for block in blocks {
543            let block_number = block.recovered_block().number();
544            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
545
546            let changeset: Vec<_> = reverts
547                .storage
548                .into_iter()
549                .flatten()
550                .flat_map(|revert| {
551                    revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
552                        StorageBeforeTx {
553                            address: revert.address,
554                            key: B256::from(key.to_be_bytes()),
555                            value: revert_to_slot.to_previous_value(),
556                        }
557                    })
558                })
559                .collect();
560            w.append_storage_changeset(changeset, block_number)?;
561        }
562        Ok(())
563    }
564
565    /// Writes to a static file segment using the provided closure.
566    ///
567    /// The closure receives a mutable reference to the segment writer. After the closure completes,
568    /// `sync_all()` is called to flush writes to disk.
569    #[instrument(level = "debug", target = "providers::static_file", skip_all, fields(?segment))]
570    fn write_segment<F>(
571        &self,
572        segment: StaticFileSegment,
573        first_block_number: BlockNumber,
574        f: F,
575    ) -> ProviderResult<()>
576    where
577        F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()>,
578    {
579        let mut w = self.get_writer(first_block_number, segment)?;
580        f(&mut w)?;
581        w.sync_all()
582    }
583
584    /// Writes all static file data for multiple blocks in parallel per-segment.
585    ///
586    /// This spawns tasks on the storage thread pool for each segment type and each task calls
587    /// `sync_all()` on its writer when done.
588    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
589    pub fn write_blocks_data(
590        &self,
591        blocks: &[ExecutedBlock<N>],
592        tx_nums: &[TxNumber],
593        ctx: StaticFileWriteCtx,
594        runtime: &reth_tasks::Runtime,
595    ) -> ProviderResult<()> {
596        if blocks.is_empty() {
597            return Ok(());
598        }
599
600        let first_block_number = blocks[0].recovered_block().number();
601
602        let mut r_headers = None;
603        let mut r_txs = None;
604        let mut r_senders = None;
605        let mut r_receipts = None;
606        let mut r_account_changesets = None;
607        let mut r_storage_changesets = None;
608
609        // Propagate tracing context into rayon-spawned threads so that per-segment
610        // write spans appear as children of write_blocks_data in traces.
611        let span = tracing::Span::current();
612        runtime.storage_pool().in_place_scope(|s| {
613            s.spawn(|_| {
614                let _guard = span.enter();
615                r_headers =
616                    Some(self.write_segment(StaticFileSegment::Headers, first_block_number, |w| {
617                        Self::write_headers(w, blocks)
618                    }));
619            });
620
621            s.spawn(|_| {
622                let _guard = span.enter();
623                r_txs = Some(self.write_segment(
624                    StaticFileSegment::Transactions,
625                    first_block_number,
626                    |w| Self::write_transactions(w, blocks, tx_nums),
627                ));
628            });
629
630            if ctx.write_senders {
631                s.spawn(|_| {
632                    let _guard = span.enter();
633                    r_senders = Some(self.write_segment(
634                        StaticFileSegment::TransactionSenders,
635                        first_block_number,
636                        |w| Self::write_transaction_senders(w, blocks, tx_nums),
637                    ));
638                });
639            }
640
641            if ctx.write_receipts {
642                s.spawn(|_| {
643                    let _guard = span.enter();
644                    r_receipts = Some(self.write_segment(
645                        StaticFileSegment::Receipts,
646                        first_block_number,
647                        |w| Self::write_receipts(w, blocks, tx_nums, &ctx),
648                    ));
649                });
650            }
651
652            if ctx.write_account_changesets {
653                s.spawn(|_| {
654                    let _guard = span.enter();
655                    r_account_changesets = Some(self.write_segment(
656                        StaticFileSegment::AccountChangeSets,
657                        first_block_number,
658                        |w| Self::write_account_changesets(w, blocks),
659                    ));
660                });
661            }
662
663            if ctx.write_storage_changesets {
664                s.spawn(|_| {
665                    let _guard = span.enter();
666                    r_storage_changesets = Some(self.write_segment(
667                        StaticFileSegment::StorageChangeSets,
668                        first_block_number,
669                        |w| Self::write_storage_changesets(w, blocks),
670                    ));
671                });
672            }
673        });
674
675        r_headers.ok_or(StaticFileWriterError::ThreadPanic("headers"))??;
676        r_txs.ok_or(StaticFileWriterError::ThreadPanic("transactions"))??;
677        if ctx.write_senders {
678            r_senders.ok_or(StaticFileWriterError::ThreadPanic("senders"))??;
679        }
680        if ctx.write_receipts {
681            r_receipts.ok_or(StaticFileWriterError::ThreadPanic("receipts"))??;
682        }
683        if ctx.write_account_changesets {
684            r_account_changesets
685                .ok_or(StaticFileWriterError::ThreadPanic("account_changesets"))??;
686        }
687        if ctx.write_storage_changesets {
688            r_storage_changesets
689                .ok_or(StaticFileWriterError::ThreadPanic("storage_changesets"))??;
690        }
691        Ok(())
692    }
693
694    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
695    /// either block or transaction.
696    pub fn get_segment_provider(
697        &self,
698        segment: StaticFileSegment,
699        number: u64,
700    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
701        if segment.is_block_or_change_based() {
702            self.get_segment_provider_for_block(segment, number, None)
703        } else {
704            self.get_segment_provider_for_transaction(segment, number, None)
705        }
706    }
707
708    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
709    /// either block or transaction.
710    ///
711    /// If the segment is not found, returns [`None`].
712    pub fn get_maybe_segment_provider(
713        &self,
714        segment: StaticFileSegment,
715        number: u64,
716    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
717        let provider = if segment.is_block_or_change_based() {
718            self.get_segment_provider_for_block(segment, number, None)
719        } else {
720            self.get_segment_provider_for_transaction(segment, number, None)
721        };
722
723        match provider {
724            Ok(provider) => Ok(Some(provider)),
725            Err(
726                ProviderError::MissingStaticFileBlock(_, _) |
727                ProviderError::MissingStaticFileTx(_, _),
728            ) => Ok(None),
729            Err(err) => Err(err),
730        }
731    }
732
733    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
734    pub fn get_segment_provider_for_block(
735        &self,
736        segment: StaticFileSegment,
737        block: BlockNumber,
738        path: Option<&Path>,
739    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
740        self.get_segment_provider_for_range(
741            segment,
742            || self.get_segment_ranges_from_block(segment, block),
743            path,
744        )?
745        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
746    }
747
748    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
749    pub fn get_segment_provider_for_transaction(
750        &self,
751        segment: StaticFileSegment,
752        tx: TxNumber,
753        path: Option<&Path>,
754    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
755        self.get_segment_provider_for_range(
756            segment,
757            || self.get_segment_ranges_from_transaction(segment, tx),
758            path,
759        )?
760        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
761    }
762
763    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
764    ///
765    /// `fn_range` should make sure the range goes through `find_fixed_range`.
766    pub fn get_segment_provider_for_range(
767        &self,
768        segment: StaticFileSegment,
769        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
770        path: Option<&Path>,
771    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
772        // If we have a path, then get the block range from its name.
773        // Otherwise, check `self.available_static_files`
774        let block_range = match path {
775            Some(path) => StaticFileSegment::parse_filename(
776                &path
777                    .file_name()
778                    .ok_or_else(|| {
779                        ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
780                    })?
781                    .to_string_lossy(),
782            )
783            .and_then(|(parsed_segment, block_range)| {
784                if parsed_segment == segment {
785                    return Some(block_range);
786                }
787                None
788            }),
789            None => fn_range(),
790        };
791
792        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
793        if let Some(block_range) = block_range {
794            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?));
795        }
796
797        Ok(None)
798    }
799
800    /// Gets the [`StaticFileJarProvider`] of the requested path.
801    pub fn get_segment_provider_for_path(
802        &self,
803        path: &Path,
804    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
805        StaticFileSegment::parse_filename(
806            &path
807                .file_name()
808                .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
809                .to_string_lossy(),
810        )
811        .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
812        .transpose()
813    }
814
815    /// Given a segment and block range it removes the cached provider from the map.
816    ///
817    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
818    pub fn remove_cached_provider(
819        &self,
820        segment: StaticFileSegment,
821        fixed_block_range_end: BlockNumber,
822    ) {
823        self.map.remove(&(fixed_block_range_end, segment));
824    }
825
826    /// This handles history expiry by deleting all static files for the given segment below the
827    /// given block.
828    ///
829    /// For example if block is 1M and the blocks per file are 500K this will delete all individual
830    /// files below 1M, so 0-499K and 500K-999K.
831    ///
832    /// This will not delete the file that contains the block itself, because files can only be
833    /// removed entirely.
834    ///
835    /// # Safety
836    ///
837    /// This method will never delete the highest static file for the segment, even if the
838    /// requested block is higher than the highest block in static files. This ensures we always
839    /// maintain at least one static file if any exist.
840    ///
841    /// Returns a list of `SegmentHeader`s from the deleted jars.
842    pub fn delete_segment_below_block(
843        &self,
844        segment: StaticFileSegment,
845        block: BlockNumber,
846    ) -> ProviderResult<Vec<SegmentHeader>> {
847        // Nothing to delete if block is 0.
848        if block == 0 {
849            return Ok(Vec::new());
850        }
851
852        let highest_block = self.get_highest_static_file_block(segment);
853        let mut deleted_headers = Vec::new();
854
855        loop {
856            let Some(block_height) = self.get_lowest_range_end(segment) else {
857                return Ok(deleted_headers);
858            };
859
860            // Stop if we've reached the target block or the highest static file
861            if block_height >= block || Some(block_height) == highest_block {
862                return Ok(deleted_headers);
863            }
864
865            debug!(
866                target: "providers::static_file",
867                ?segment,
868                ?block_height,
869                "Deleting static file below block"
870            );
871
872            // now we need to wipe the static file, this will take care of updating the index and
873            // advance the lowest tracked block height for the segment.
874            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
875                warn!( target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
876            })?;
877
878            deleted_headers.push(header);
879        }
880    }
881
882    /// Given a segment and block, it deletes the jar and all files from the respective block range.
883    ///
884    /// CAUTION: destructive. Deletes files on disk.
885    ///
886    /// This will re-initialize the index after deletion, so all files are tracked.
887    ///
888    /// Returns the `SegmentHeader` of the deleted jar.
889    pub fn delete_jar(
890        &self,
891        segment: StaticFileSegment,
892        block: BlockNumber,
893    ) -> ProviderResult<SegmentHeader> {
894        let fixed_block_range = self.find_fixed_range(segment, block);
895        let key = (fixed_block_range.end(), segment);
896        let file = self.path.join(segment.filename(&fixed_block_range));
897        let jar = if let Some((_, jar)) = self.map.remove(&key) {
898            jar.jar
899        } else {
900            debug!(
901                target: "providers::static_file",
902                ?file,
903                ?fixed_block_range,
904                ?block,
905                "Loading static file jar for deletion"
906            );
907            NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
908        };
909
910        let header = jar.user_header().clone();
911
912        // Delete the sidecar file for changeset segments before deleting the main jar
913        if segment.is_change_based() {
914            let csoff_path = file.with_extension("csoff");
915            if csoff_path.exists() {
916                std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
917            }
918        }
919
920        jar.delete().map_err(ProviderError::other)?;
921
922        // SAFETY: this is currently necessary to ensure that certain indexes like
923        // `static_files_min_block` have the correct values after pruning.
924        self.initialize_index()?;
925
926        Ok(header)
927    }
928
929    /// Deletes ALL static file jars for the given segment, including the highest one.
930    ///
931    /// CAUTION: destructive. Deletes all files on disk for this segment.
932    ///
933    /// This is used for `PruneMode::Full` where all data should be removed.
934    ///
935    /// Returns a list of `SegmentHeader`s from the deleted jars.
936    pub fn delete_segment(&self, segment: StaticFileSegment) -> ProviderResult<Vec<SegmentHeader>> {
937        let mut deleted_headers = Vec::new();
938
939        self.writers.remove(segment);
940
941        while let Some(block_height) = self.get_highest_static_file_block(segment) {
942            debug!(
943                target: "providers::static_file",
944                ?segment,
945                ?block_height,
946                "Deleting static file jar"
947            );
948
949            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
950                warn!(target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file jar")
951            })?;
952
953            deleted_headers.push(header);
954        }
955
956        Ok(deleted_headers)
957    }
958
959    /// Given a segment and block range it returns a cached
960    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
961    /// many.
962    fn get_or_create_jar_provider(
963        &self,
964        segment: StaticFileSegment,
965        fixed_block_range: &SegmentRangeInclusive,
966    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
967        let key = (fixed_block_range.end(), segment);
968
969        // Avoid using `entry` directly to avoid a write lock in the common case.
970        trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Getting provider");
971        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
972            trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
973            jar.into()
974        } else {
975            trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
976            let path = self.path.join(segment.filename(fixed_block_range));
977            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
978            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
979        };
980
981        if let Some(metrics) = &self.metrics {
982            provider = provider.with_metrics(metrics.clone());
983        }
984        Ok(provider)
985    }
986
987    /// Gets a static file segment's block range from the provider inner block
988    /// index.
989    fn get_segment_ranges_from_block(
990        &self,
991        segment: StaticFileSegment,
992        block: u64,
993    ) -> Option<SegmentRangeInclusive> {
994        let indexes = self.indexes.read();
995        let index = indexes.get(segment)?;
996
997        (index.max_block >= block).then(|| {
998            self.find_fixed_range_with_block_index(
999                segment,
1000                Some(&index.expected_block_ranges_by_max_block),
1001                block,
1002            )
1003        })
1004    }
1005
1006    /// Gets a static file segment's fixed block range from the provider inner
1007    /// transaction index.
1008    fn get_segment_ranges_from_transaction(
1009        &self,
1010        segment: StaticFileSegment,
1011        tx: u64,
1012    ) -> Option<SegmentRangeInclusive> {
1013        let indexes = self.indexes.read();
1014        let index = indexes.get(segment)?;
1015        let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;
1016
1017        // It's more probable that the request comes from a newer tx height, so we iterate
1018        // the static_files in reverse.
1019        let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();
1020
1021        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
1022            if tx > *tx_end {
1023                // request tx is higher than highest static file tx
1024                return None;
1025            }
1026            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
1027            if tx_start <= tx {
1028                return Some(self.find_fixed_range_with_block_index(
1029                    segment,
1030                    Some(&index.expected_block_ranges_by_max_block),
1031                    block_range.end(),
1032                ));
1033            }
1034        }
1035        None
1036    }
1037
    /// Updates the inner transaction and block indexes alongside the internal cached providers in
    /// `self.map`.
    ///
    /// When `segment_max_block` is `Some`, the jar owning that block (located through its fixed
    /// range) is loaded from disk, and then:
    /// * `max_block` is set to `segment_max_block`;
    /// * expected block ranges and tx-index entries whose block range starts at or after that
    ///   fixed range are dropped, and the jar's expected range (plus its tx range, if it has
    ///   one) is (re-)inserted;
    /// * `min_block_range` is replaced when the jar starts where the current lowest range
    ///   starts, or set when it was absent;
    /// * the cached provider for the fixed range is replaced, and cached providers of the
    ///   segment ending past it are evicted from `self.map`.
    ///
    /// If `segment_max_block` is `None` it means there's no static file for this segment, and
    /// the segment is removed from the index. NOTE(review): that branch leaves any cached
    /// providers of the segment in `self.map` untouched — confirm this is intended.
    ///
    /// # Errors
    ///
    /// Fails if the jar for the fixed range cannot be loaded or wrapped into a `LoadedJar`.
    pub fn update_index(
        &self,
        segment: StaticFileSegment,
        segment_max_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        debug!(
            target: "providers::static_file",
            ?segment,
            ?segment_max_block,
            "Updating provider index"
        );
        let mut indexes = self.indexes.write();

        match segment_max_block {
            Some(segment_max_block) => {
                let fixed_range = self.find_fixed_range_with_block_index(
                    segment,
                    indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
                    segment_max_block,
                );

                let jar = NippyJar::<SegmentHeader>::load(
                    &self.path.join(segment.filename(&fixed_range)),
                )
                .map_err(ProviderError::other)?;

                let index = indexes
                    .entry(segment)
                    .and_modify(|index| {
                        // Update max block
                        index.max_block = segment_max_block;

                        // Update expected block range index

                        // Keep only expected block ranges that start strictly before the fixed
                        // range of the new max block; ranges starting at or after it are dropped
                        // and the fixed range is re-inserted right below.
                        index
                            .expected_block_ranges_by_max_block
                            .retain(|_, block_range| block_range.start() < fixed_range.start());
                        // Insert new expected block range
                        index
                            .expected_block_ranges_by_max_block
                            .insert(fixed_range.end(), fixed_range);
                    })
                    .or_insert_with(|| StaticFileSegmentIndex {
                        min_block_range: None,
                        max_block: segment_max_block,
                        expected_block_ranges_by_max_block: BTreeMap::from([(
                            fixed_range.end(),
                            fixed_range,
                        )]),
                        available_block_ranges_by_max_tx: None,
                    });

                // Update min_block to track the lowest block range of the segment.
                // This is initially set by initialize_index() on node startup, but must be updated
                // as the file grows to prevent stale values.
                //
                // Without this update, min_block can remain at genesis (e.g. Some([0..=0]) or None)
                // even after syncing to higher blocks (e.g. [0..=100]). A stale
                // min_block causes get_lowest_static_file_block() to return the
                // wrong end value, which breaks pruning logic that relies on it for
                // safety checks.
                //
                // Example progression:
                // 1. Node starts, initialize_index() sets min_block = [0..=0]
                // 2. Sync to block 100, this update sets min_block = [0..=100]
                // 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
                //    this update, it would incorrectly return 0 (stale)
                if let Some(current_block_range) = jar.user_header().block_range() {
                    if let Some(min_block_range) = index.min_block_range.as_mut() {
                        // delete_jar WILL ALWAYS re-initialize all indexes, so we are always
                        // sure that current_min is always the lowest.
                        if current_block_range.start() == min_block_range.start() {
                            *min_block_range = current_block_range;
                        }
                    } else {
                        index.min_block_range = Some(current_block_range);
                    }
                }

                // Updates the tx index by first removing all entries which have a higher
                // block_start than our current static file.
                if let Some(tx_range) = jar.user_header().tx_range() {
                    // Current block range has the same block start as `fixed_range`, but block end
                    // might be different if we are still filling this static file.
                    if let Some(current_block_range) = jar.user_header().block_range() {
                        let tx_end = tx_range.end();

                        // Considering that `update_index` is called when we either append/truncate,
                        // we are sure that we are handling the latest data
                        // points.
                        //
                        // Here we remove every entry of the index that has a block start higher or
                        // equal than our current one. This is important in the case
                        // that we prune a lot of rows resulting in a file (and thus
                        // a higher block range) deletion.
                        //
                        // Note: inside this `if let`, `index` shadows the segment index and names
                        // the tx -> block range map; in the `else` branch it is the segment index.
                        if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                            index
                                .retain(|_, block_range| block_range.start() < fixed_range.start());
                            index.insert(tx_end, current_block_range);
                        } else {
                            index.available_block_ranges_by_max_tx =
                                Some(BTreeMap::from([(tx_end, current_block_range)]));
                        }
                    }
                } else if segment.is_tx_based() {
                    // The unwound file has no more transactions/receipts. However, the highest
                    // block is within this file's block range. We only retain
                    // entries with block ranges before the current one.
                    if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
                    }

                    // If the index is empty, just remove it.
                    index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
                }

                // Update the cached provider.
                debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

                // Delete any cached provider that no longer has an associated jar, i.e. every
                // cached provider of this segment whose range ends past the current fixed range.
                debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
            }
            None => {
                debug!(target: "providers::static_file", ?segment, "Removing segment from index");
                indexes.remove(segment);
            }
        };

        debug!(target: "providers::static_file", ?segment, "Updated provider index");
        Ok(())
    }
1178
1179    /// Initializes the inner transaction and block index
1180    pub fn initialize_index(&self) -> ProviderResult<()> {
1181        let mut indexes = self.indexes.write();
1182        indexes.clear();
1183
1184        for (segment, headers) in &*iter_static_files(&self.path).map_err(ProviderError::other)? {
1185            // Update first and last block for each segment
1186            //
1187            // It's safe to call `expect` here, because every segment has at least one header
1188            // associated with it.
1189            let min_block_range = Some(headers.first().expect("headers are not empty").0);
1190            let max_block = headers.last().expect("headers are not empty").0.end();
1191
1192            let mut expected_block_ranges_by_max_block = BTreeMap::default();
1193            let mut available_block_ranges_by_max_tx = None;
1194
1195            for (block_range, header) in headers {
1196                // Update max expected block -> expected_block_range index
1197                expected_block_ranges_by_max_block
1198                    .insert(header.expected_block_end(), header.expected_block_range());
1199
1200                // Update max tx -> block_range index
1201                if let Some(tx_range) = header.tx_range() {
1202                    let tx_end = tx_range.end();
1203
1204                    available_block_ranges_by_max_tx
1205                        .get_or_insert_with(BTreeMap::default)
1206                        .insert(tx_end, *block_range);
1207                }
1208            }
1209
1210            indexes.insert(
1211                segment,
1212                StaticFileSegmentIndex {
1213                    min_block_range,
1214                    max_block,
1215                    expected_block_ranges_by_max_block,
1216                    available_block_ranges_by_max_tx,
1217                },
1218            );
1219        }
1220
1221        // If this is a re-initialization, we need to clear this as well
1222        self.map.clear();
1223
1224        // initialize the expired history height to the lowest static file block
1225        if let Some(lowest_range) =
1226            indexes.get(StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
1227        {
1228            // the earliest height is the lowest available block number
1229            self.earliest_history_height
1230                .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
1231        }
1232
1233        Ok(())
1234    }
1235
    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
    /// target to unwind to.
    ///
    /// Two types of consistency checks are done for:
    ///
    /// 1) When a static file fails to commit but the underlying data was changed.
    /// 2) When a static file was committed, but the required database transaction was not.
    ///
    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
    /// will return an error.
    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
    /// to heal.
    ///
    /// For each static file segment:
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///
    /// Only segments selected by `segments_to_check` are inspected. When several segments
    /// request an unwind, the lowest requested block wins. On OP mainnet databases that contain
    /// the OVM block #1, all checks are skipped and `Ok(None)` is returned.
    ///
    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
    ///
    /// # Errors
    ///
    /// Propagates errors from healing segments, reading block numbers / body indices from
    /// `provider`, and from `ensure_invariants_for`.
    ///
    /// WARNING: No static file writer should be held before calling this function, otherwise it
    /// will deadlock.
    #[instrument(skip(self, provider), fields(read_only = self.is_read_only()))]
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<PipelineTarget>>
    where
        Provider: DBProvider
            + BlockReader
            + StageCheckpointReader
            + PruneCheckpointReader
            + ChainSpecProvider
            + StorageSettingsCache,
        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
    {
        // OVM historical import is broken and does not work with this check. It's importing
        // duplicated receipts resulting in having more receipts than the expected transaction
        // range.
        //
        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
        if provider.chain_spec().is_optimism() &&
            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
        {
            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
            const OVM_HEADER_1_HASH: B256 =
                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
                info!(target: "reth::cli",
                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
                );
                return Ok(None);
            }
        }

        info!(target: "reth::cli", "Verifying storage consistency.");

        let mut unwind_target: Option<BlockNumber> = None;

        // Records a requested unwind, always keeping the lowest target seen so far.
        let mut update_unwind_target = |new_target| {
            unwind_target =
                unwind_target.map(|current| current.min(new_target)).or(Some(new_target));
        };

        for segment in self.segments_to_check(provider) {
            let span = info_span!(
                "Checking consistency for segment",
                ?segment,
                initial_highest_block = tracing::field::Empty,
                highest_block = tracing::field::Empty,
                highest_tx = tracing::field::Empty,
            );
            let _guard = span.enter();

            debug!(target: "reth::providers::static_file", "Checking consistency for segment");

            // Heal file-level inconsistencies and get before/after highest block
            let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?;
            span.record("initial_highest_block", initial_highest_block);
            span.record("highest_block", highest_block);

            // Only applies to block-based static files. (Headers)
            //
            // The updated `highest_block` may have decreased if we healed from a pruning
            // interruption.
            if initial_highest_block != highest_block {
                info!(
                    target: "reth::providers::static_file",
                    unwind_target = highest_block,
                    "Setting unwind target."
                );
                update_unwind_target(highest_block.unwrap_or_default());
            }

            // Only applies to transaction-based static files. (Receipts & Transactions)
            //
            // Make sure the last transaction matches the last block from its indices, since a heal
            // from a pruning interruption might have decreased the number of transactions without
            // being able to update the last block of the static file segment.
            let highest_tx = self.get_highest_static_file_tx(segment);
            span.record("highest_tx", highest_tx);
            debug!(target: "reth::providers::static_file", "Checking tx index segment");

            if let Some(highest_tx) = highest_tx {
                // Walk back from the highest block until a block's last tx number is covered by
                // the static file's highest tx. Every block stepped back to becomes a new
                // (lower) unwind target and the working `highest_block`.
                let mut last_block = highest_block.unwrap_or_default();
                debug!(target: "reth::providers::static_file", last_block, highest_tx, "Verifying last transaction matches last block indices");
                loop {
                    let Some(indices) = provider.block_body_indices(last_block)? else {
                        debug!(target: "reth::providers::static_file", last_block, "Block body indices not found, static files ahead of database");
                        // If the block body indices can not be found, then it means that static
                        // files is ahead of database, and the `ensure_invariants` check will fix
                        // it by comparing with stage checkpoints.
                        break
                    };

                    debug!(target: "reth::providers::static_file", last_block, last_tx_num = indices.last_tx_num(), "Found block body indices");

                    if indices.last_tx_num() <= highest_tx {
                        break
                    }

                    if last_block == 0 {
                        debug!(target: "reth::providers::static_file", "Reached block 0 in verification loop");
                        break
                    }

                    last_block -= 1;

                    info!(
                        target: "reth::providers::static_file",
                        highest_block = self.get_highest_static_file_block(segment),
                        unwind_target = last_block,
                        "Setting unwind target."
                    );
                    span.record("highest_block", last_block);
                    highest_block = Some(last_block);
                    update_unwind_target(last_block);
                }
            }

            debug!(target: "reth::providers::static_file", "Ensuring invariants for segment");

            // Compare the (possibly lowered) static file heights against the database.
            match self.ensure_invariants_for(provider, segment, highest_tx, highest_block)? {
                Some(unwind) => {
                    debug!(target: "reth::providers::static_file", unwind_target=unwind, "Invariants check returned unwind target");
                    update_unwind_target(unwind);
                }
                None => {
                    debug!(target: "reth::providers::static_file", "Invariants check completed, no unwind needed")
                }
            }
        }

        Ok(unwind_target.map(PipelineTarget::Unwind))
    }
1393
1394    /// Heals file-level (`NippyJar`) inconsistencies for eligible static file
1395    /// segments.
1396    ///
1397    /// Call before [`Self::check_consistency`] so files are internally
1398    /// consistent.
1399    ///
1400    /// Uses the same segment-skip logic as [`Self::check_consistency`], but
1401    /// does not compare with database checkpoints or prune against them.
1402    pub fn check_file_consistency<Provider>(&self, provider: &Provider) -> ProviderResult<()>
1403    where
1404        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1405    {
1406        info!(target: "reth::cli", "Healing static file inconsistencies.");
1407
1408        for segment in self.segments_to_check(provider) {
1409            let _guard = info_span!("healing_static_file_segment", ?segment).entered();
1410            let _ = self.maybe_heal_segment(segment)?;
1411        }
1412
1413        Ok(())
1414    }
1415
1416    /// Returns the static file segments that should be checked/healed for this provider.
1417    fn segments_to_check<'a, Provider>(
1418        &'a self,
1419        provider: &'a Provider,
1420    ) -> impl Iterator<Item = StaticFileSegment> + 'a
1421    where
1422        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1423    {
1424        StaticFileSegment::iter()
1425            .filter(move |segment| self.should_check_segment(provider, *segment))
1426    }
1427
1428    /// True if the given segment should be checked/healed for this provider.
1429    fn should_check_segment<Provider>(
1430        &self,
1431        provider: &Provider,
1432        segment: StaticFileSegment,
1433    ) -> bool
1434    where
1435        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1436    {
1437        match segment {
1438            StaticFileSegment::Headers | StaticFileSegment::Transactions => true,
1439            StaticFileSegment::Receipts => {
1440                if EitherWriter::receipts_destination(provider).is_database() {
1441                    // Old pruned nodes (including full node) do not store receipts as static
1442                    // files.
1443                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database");
1444                    return false;
1445                }
1446
1447                if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
1448                    NamedChain::Chiado == provider.chain_spec().chain_id()
1449                {
1450                    // Gnosis and Chiado's historical import is broken and does not work with
1451                    // this check. They are importing receipts along
1452                    // with importing headers/bodies.
1453                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: broken historical import for gnosis/chiado");
1454                    return false;
1455                }
1456
1457                true
1458            }
1459            StaticFileSegment::TransactionSenders => {
1460                if EitherWriterDestination::senders(provider).is_database() {
1461                    debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: senders stored in database");
1462                    return false;
1463                }
1464
1465                if Self::is_segment_fully_pruned(provider, PruneSegment::SenderRecovery) {
1466                    debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: fully pruned");
1467                    return false;
1468                }
1469
1470                true
1471            }
1472            StaticFileSegment::AccountChangeSets => {
1473                if EitherWriter::account_changesets_destination(provider).is_database() {
1474                    debug!(target: "reth::providers::static_file", ?segment, "Skipping account changesets segment: changesets stored in database");
1475                    return false;
1476                }
1477                true
1478            }
1479            StaticFileSegment::StorageChangeSets => {
1480                if EitherWriter::storage_changesets_destination(provider).is_database() {
1481                    debug!(target: "reth::providers::static_file", ?segment, "Skipping storage changesets segment: changesets stored in database");
1482                    return false
1483                }
1484                true
1485            }
1486        }
1487    }
1488
1489    /// Returns `true` if the given prune segment has a checkpoint with
1490    /// [`reth_prune_types::PruneMode::Full`], indicating all data for this segment has been
1491    /// intentionally deleted.
1492    fn is_segment_fully_pruned<Provider>(provider: &Provider, segment: PruneSegment) -> bool
1493    where
1494        Provider: PruneCheckpointReader,
1495    {
1496        provider
1497            .get_prune_checkpoint(segment)
1498            .ok()
1499            .flatten()
1500            .is_some_and(|checkpoint| checkpoint.prune_mode.is_full())
1501    }
1502
1503    /// Checks consistency of the latest static file segment and throws an
1504    /// error if at fault.
1505    ///
1506    /// Read-only.
1507    fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1508        debug!(target: "reth::providers::static_file", "Checking segment consistency");
1509        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1510            let file_path = self
1511                .directory()
1512                .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1513            debug!(target: "reth::providers::static_file", ?file_path, latest_block, "Loading NippyJar for consistency check");
1514
1515            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1516            debug!(target: "reth::providers::static_file", "NippyJar loaded, checking consistency");
1517
1518            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1519            debug!(target: "reth::providers::static_file", "NippyJar consistency check passed");
1520        } else {
1521            debug!(target: "reth::providers::static_file", "No static file block found, skipping consistency check");
1522        }
1523        Ok(())
1524    }
1525
1526    /// Attempts to heal file-level (`NippyJar`) inconsistencies for a single static file segment.
1527    ///
1528    /// Returns the highest block before and after healing, which can be used to detect
1529    /// if healing from a pruning interruption decreased the highest block.
1530    ///
1531    /// File consistency is broken if:
1532    ///
1533    /// * appending data was interrupted before a config commit, then data file will be truncated
1534    ///   according to the config.
1535    ///
1536    /// * pruning data was interrupted before a config commit, then we have deleted data that we are
1537    ///   expected to still have. We need to check the Database and unwind everything accordingly.
1538    ///
1539    /// **Note:** In read-only mode, this will return an error if a consistency issue is detected,
1540    /// since healing requires write access.
1541    fn maybe_heal_segment(
1542        &self,
1543        segment: StaticFileSegment,
1544    ) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
1545        let initial_highest_block = self.get_highest_static_file_block(segment);
1546        debug!(target: "reth::providers::static_file", ?initial_highest_block, "Initial highest block for segment");
1547
1548        if self.access.is_read_only() {
1549            // Read-only mode: cannot modify files, so just validate consistency and error if
1550            // broken.
1551            debug!(target: "reth::providers::static_file", "Checking segment consistency (read-only)");
1552            self.check_segment_consistency(segment)?;
1553        } else {
1554            // Writable mode: fetching the writer will automatically heal any file-level
1555            // inconsistency by truncating data to match the last committed config.
1556            debug!(target: "reth::providers::static_file", "Fetching latest writer which might heal any potential inconsistency");
1557            self.latest_writer(segment)?;
1558        }
1559
1560        // The updated `highest_block` may have decreased if we healed from a
1561        // pruning interruption.
1562        let highest_block = self.get_highest_static_file_block(segment);
1563
1564        Ok((initial_highest_block, highest_block))
1565    }
1566
1567    /// Ensure invariants for each corresponding table and static file segment.
1568    fn ensure_invariants_for<Provider>(
1569        &self,
1570        provider: &Provider,
1571        segment: StaticFileSegment,
1572        highest_tx: Option<u64>,
1573        highest_block: Option<BlockNumber>,
1574    ) -> ProviderResult<Option<BlockNumber>>
1575    where
1576        Provider: DBProvider + BlockReader + StageCheckpointReader,
1577        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
1578    {
1579        match segment {
1580            StaticFileSegment::Headers => self
1581                .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
1582                    provider,
1583                    segment,
1584                    highest_block,
1585                    highest_block,
1586                ),
1587            StaticFileSegment::Transactions => self
1588                .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
1589                    provider,
1590                    segment,
1591                    highest_tx,
1592                    highest_block,
1593                ),
1594            StaticFileSegment::Receipts => self
1595                .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
1596                    provider,
1597                    segment,
1598                    highest_tx,
1599                    highest_block,
1600                ),
1601            StaticFileSegment::TransactionSenders => self
1602                .ensure_invariants::<_, tables::TransactionSenders>(
1603                    provider,
1604                    segment,
1605                    highest_tx,
1606                    highest_block,
1607                ),
1608            StaticFileSegment::AccountChangeSets => self
1609                .ensure_invariants::<_, tables::AccountChangeSets>(
1610                    provider,
1611                    segment,
1612                    highest_tx,
1613                    highest_block,
1614                ),
1615            StaticFileSegment::StorageChangeSets => self
1616                .ensure_changeset_invariants_by_block::<_, tables::StorageChangeSets, _>(
1617                    provider,
1618                    segment,
1619                    highest_block,
1620                    |key| key.block_number(),
1621                ),
1622        }
1623    }
1624
    /// Check invariants for each corresponding table and static file segment:
    ///
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
    ///     target.
    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
    ///     this case, the rows will be removed and [`None`] will be returned.
    ///
    /// * If the database tables overlap with static files and have contiguous keys, or the
    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
    ///
    /// `highest_static_file_entry` is compared directly against keys of table `T`, so it must be
    /// expressed in the same key space as `T`. `highest_static_file_block` is the highest block
    /// held by the segment. Once the database checks are done, missing values are treated as `0`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the database cursor, the stage checkpoint lookup, block body
    /// index lookups, and the static file writer (fetch, prune, commit).
    #[instrument(skip(self, provider, segment), fields(table = T::NAME))]
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        debug!(target: "reth::providers::static_file", "Ensuring invariants");
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_entry, _)) = db_cursor.first()? {
            debug!(target: "reth::providers::static_file", db_first_entry, "Found first database entry");
            if let (Some(highest_entry), Some(highest_block)) =
                (highest_static_file_entry, highest_static_file_block)
            {
                // If there is a gap between the entry found in static file and
                // database, then we have most likely lost static file data and need to unwind so we
                // can load it again
                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                    info!(
                        target: "reth::providers::static_file",
                        ?db_first_entry,
                        ?highest_entry,
                        unwind_target = highest_block,
                        "Setting unwind target."
                    );
                    return Ok(Some(highest_block));
                }
            }

            // The database extends past the static files (or there are no static file entries),
            // so the database is the source of truth for the tip and nothing needs healing.
            if let Some((db_last_entry, _)) = db_cursor.last()? &&
                highest_static_file_entry
                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
            {
                debug!(target: "reth::providers::static_file", db_last_entry, "Database has entries beyond static files, no unwind needed");
                return Ok(None)
            }
        } else {
            debug!(target: "reth::providers::static_file", "No database entries found");
        }

        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // If static file entry is ahead of the database entries, then ensure the checkpoint block
        // number matches.
        let stage_id = segment.to_stage_id();
        let checkpoint_block_number =
            provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
        debug!(target: "reth::providers::static_file", ?stage_id, checkpoint_block_number, "Retrieved stage checkpoint");

        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block));
        }

        // The "checkpoint ahead" case returned above, so this only matches when the checkpoint
        // equals the highest static file block: nothing to do.
        if checkpoint_block_number >= highest_static_file_block {
            debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
            return Ok(None);
        }

        // If the checkpoint is behind, then we failed to do a database commit
        // **but committed** to static files on executing a stage, or the
        // reverse on unwinding a stage.
        //
        // All we need to do is to prune the extra static file rows.
        // NOTE(review): this log uses target "reth::providers", unlike the other logs in this
        // function ("reth::providers::static_file").
        info!(
            target: "reth::providers",
            from = highest_static_file_block,
            to = checkpoint_block_number,
            "Unwinding static file segment."
        );
        let mut writer = self.latest_writer(segment)?;

        match segment {
            StaticFileSegment::Headers => {
                // Headers hold one row per block, so the row count equals the block difference.
                let prune_count = highest_static_file_block - checkpoint_block_number;
                debug!(target: "reth::providers::static_file", prune_count, "Pruning headers");
                // TODO(joshie): is_block_meta
                writer.prune_headers(prune_count)?;
            }
            StaticFileSegment::Transactions |
            StaticFileSegment::Receipts |
            StaticFileSegment::TransactionSenders => {
                if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                    // Number of tx-keyed rows above the checkpoint block's last transaction.
                    // NOTE(review): plain subtraction — assumes the highest static file entry is
                    // >= the checkpoint block's last tx number; otherwise it underflows (panic in
                    // debug builds). Confirm this always holds.
                    let number = highest_static_file_entry - block.last_tx_num();
                    debug!(target: "reth::providers::static_file", prune_count = number, checkpoint_block_number, "Pruning transaction based segment");

                    match segment {
                        StaticFileSegment::Transactions => {
                            writer.prune_transactions(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::Receipts => {
                            writer.prune_receipts(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::TransactionSenders => {
                            writer.prune_transaction_senders(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::Headers |
                        StaticFileSegment::AccountChangeSets |
                        StaticFileSegment::StorageChangeSets => {
                            unreachable!()
                        }
                    }
                } else {
                    // Without body indices the prune count cannot be computed; nothing is pruned
                    // but the writer is still committed below.
                    debug!(target: "reth::providers::static_file", checkpoint_block_number, "No block body indices found for checkpoint block");
                }
            }
            StaticFileSegment::AccountChangeSets => {
                writer.prune_account_changesets(checkpoint_block_number)?;
            }
            StaticFileSegment::StorageChangeSets => {
                writer.prune_storage_changesets(checkpoint_block_number)?;
            }
        }

        debug!(target: "reth::providers::static_file", "Committing writer after pruning");
        writer.commit()?;
        debug!(target: "reth::providers::static_file", "Writer committed successfully");

        debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
        Ok(None)
    }
1773
    /// Checks invariants between a changeset static file segment and its database table `T`.
    ///
    /// Comparisons are done in block numbers: `block_from_key` extracts the block number from a
    /// table key.
    ///
    /// * If the first database entry's block is neither covered by nor directly after
    ///   `highest_static_file_block`, static file data is considered lost. `Some` with that
    ///   highest block is returned as the pipeline unwind target.
    /// * If the last database entry's block lies beyond the static files (or the segment has no
    ///   static file block), `None` is returned with no further checks.
    /// * Otherwise the stage checkpoint of `segment` is compared with the highest static file
    ///   block (treated as `0` when absent):
    ///   * a higher checkpoint returns `Some(highest static file block)` as the unwind target;
    ///   * a lower checkpoint prunes the static file back to the checkpoint block, commits the
    ///     writer, and returns `None`;
    ///   * an equal checkpoint returns `None`.
    ///
    /// # Panics
    ///
    /// Panics (via `unreachable!`) if pruning is required for a segment other than
    /// `AccountChangeSets` or `StorageChangeSets`.
    fn ensure_changeset_invariants_by_block<Provider, T, F>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_block: Option<BlockNumber>,
        block_from_key: F,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
        T: Table,
        F: Fn(&T::Key) -> BlockNumber,
    {
        debug!(
            target: "reth::providers::static_file",
            ?segment,
            ?highest_static_file_block,
            "Ensuring changeset invariants"
        );
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_key, _)) = db_cursor.first()? {
            let db_first_block = block_from_key(&db_first_key);
            // A gap between the static file tip and the first database block means static file
            // data was lost; unwind so it can be regenerated.
            if let Some(highest_block) = highest_static_file_block &&
                !(db_first_block <= highest_block || highest_block + 1 == db_first_block)
            {
                info!(
                    target: "reth::providers::static_file",
                    ?db_first_block,
                    ?highest_block,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                return Ok(Some(highest_block))
            }

            // The database extends past the static files, so there is nothing to heal here.
            if let Some((db_last_key, _)) = db_cursor.last()? &&
                highest_static_file_block
                    .is_none_or(|highest_block| block_from_key(&db_last_key) > highest_block)
            {
                debug!(
                    target: "reth::providers::static_file",
                    ?segment,
                    "Database has entries beyond static files, no unwind needed"
                );
                return Ok(None)
            }
        } else {
            debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
        }

        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        let stage_id = segment.to_stage_id();
        let checkpoint_block_number =
            provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;

        // Checkpoint ahead of the static files: data was lost, request an unwind.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // Checkpoint behind the static files: drop the extra static file rows.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            match segment {
                StaticFileSegment::AccountChangeSets => {
                    writer.prune_account_changesets(checkpoint_block_number)?;
                }
                StaticFileSegment::StorageChangeSets => {
                    writer.prune_storage_changesets(checkpoint_block_number)?;
                }
                _ => unreachable!("invalid segment for changeset invariants"),
            }
            writer.commit()?;
        }

        Ok(None)
    }
1865
1866    /// Returns the earliest available block number that has not been expired and is still
1867    /// available.
1868    ///
1869    /// This means that the highest expired block (or expired block height) is
1870    /// `earliest_history_height.saturating_sub(1)`.
1871    ///
1872    /// Returns `0` if no history has been expired.
1873    pub fn earliest_history_height(&self) -> BlockNumber {
1874        self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1875    }
1876
1877    /// Gets the lowest static file's block range if it exists for a static file segment.
1878    ///
1879    /// If there is nothing on disk for the given segment, this will return [`None`].
1880    pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1881        self.indexes.read().get(segment).and_then(|index| index.min_block_range)
1882    }
1883
1884    /// Gets the lowest static file's block range start if it exists for a static file segment.
1885    ///
1886    /// For example if the lowest static file has blocks 0-499, this will return 0.
1887    ///
1888    /// If there is nothing on disk for the given segment, this will return [`None`].
1889    pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1890        self.get_lowest_range(segment).map(|range| range.start())
1891    }
1892
1893    /// Gets the lowest static file's block range end if it exists for a static file segment.
1894    ///
1895    /// For example if the static file has blocks 0-499, this will return 499.
1896    ///
1897    /// If there is nothing on disk for the given segment, this will return [`None`].
1898    pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1899        self.get_lowest_range(segment).map(|range| range.end())
1900    }
1901
1902    /// Gets the highest static file's block height if it exists for a static file segment.
1903    ///
1904    /// If there is nothing on disk for the given segment, this will return [`None`].
1905    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1906        self.indexes.read().get(segment).map(|index| index.max_block)
1907    }
1908
1909    /// Converts a range to a bounded `RangeInclusive` capped to the highest static file block.
1910    ///
1911    /// This is necessary because static file iteration beyond the tip would loop forever:
1912    /// blocks beyond the static file tip return `Ok(empty)` which is indistinguishable from
1913    /// blocks with no changes. We cap the end to the highest available block regardless of
1914    /// whether the input was unbounded or an explicit large value like `BlockNumber::MAX`.
1915    fn bound_range(
1916        &self,
1917        range: impl RangeBounds<BlockNumber>,
1918        segment: StaticFileSegment,
1919    ) -> RangeInclusive<BlockNumber> {
1920        let highest_block = self.get_highest_static_file_block(segment).unwrap_or(0);
1921
1922        let start = match range.start_bound() {
1923            Bound::Included(&n) => n,
1924            Bound::Excluded(&n) => n.saturating_add(1),
1925            Bound::Unbounded => 0,
1926        };
1927        let end = match range.end_bound() {
1928            Bound::Included(&n) => n.min(highest_block),
1929            Bound::Excluded(&n) => n.saturating_sub(1).min(highest_block),
1930            Bound::Unbounded => highest_block,
1931        };
1932
1933        start..=end
1934    }
1935
1936    /// Gets the highest static file transaction.
1937    ///
1938    /// If there is nothing on disk for the given segment, this will return [`None`].
1939    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1940        self.indexes
1941            .read()
1942            .get(segment)
1943            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1944            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1945    }
1946
1947    /// Gets the highest static file block for all segments.
1948    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1949        HighestStaticFiles {
1950            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1951        }
1952    }
1953
    /// Iterates through segment `static_files` in reverse order, executing a function until it
    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
    ///
    /// Static files are visited from the highest block range to the lowest, and the first
    /// `Some` returned by `func` wins. Returns `Ok(None)` if the segment has no static files or
    /// `func` returns `None` for every one of them. Errors from `func` or from obtaining a jar
    /// provider are propagated immediately.
    pub fn find_static_file<T>(
        &self,
        segment: StaticFileSegment,
        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
    ) -> ProviderResult<Option<T>> {
        // `ranges` borrows from the `self.indexes.read()` guard, so the read lock stays held for
        // the whole loop below, including every `func` call.
        // NOTE(review): parking_lot read locks are not guaranteed to be recursive. If `func` or
        // `get_or_create_jar_provider` locks `self.indexes` again while a writer is queued, this
        // could deadlock. Confirm no such path exists.
        if let Some(ranges) =
            self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block)
        {
            // Iterate through all ranges in reverse order (highest to lowest)
            for range in ranges.values().rev() {
                if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
                    return Ok(Some(res));
                }
            }
        }

        Ok(None)
    }
1974
1975    /// Fetches data within a specified range across multiple static files.
1976    ///
1977    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
1978    /// It continues fetching until the end of the range is reached or the provided `predicate`
1979    /// returns false.
1980    pub fn fetch_range_with_predicate<T, F, P>(
1981        &self,
1982        segment: StaticFileSegment,
1983        range: Range<u64>,
1984        mut get_fn: F,
1985        mut predicate: P,
1986    ) -> ProviderResult<Vec<T>>
1987    where
1988        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1989        P: FnMut(&T) -> bool,
1990    {
1991        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1992
1993        /// Resolves to the provider for the given block or transaction number.
1994        ///
1995        /// If the static file is missing, the `result` is returned.
1996        macro_rules! get_provider {
1997            ($number:expr) => {{
1998                match self.get_segment_provider(segment, $number) {
1999                    Ok(provider) => provider,
2000                    Err(
2001                        ProviderError::MissingStaticFileBlock(_, _) |
2002                        ProviderError::MissingStaticFileTx(_, _),
2003                    ) => return Ok(result),
2004                    Err(err) => return Err(err),
2005                }
2006            }};
2007        }
2008
2009        let mut provider = get_provider!(range.start);
2010        let mut cursor = provider.cursor()?;
2011
2012        // advances number in range
2013        'outer: for number in range {
2014            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
2015            // access data in two different static files, it halts further attempts by returning
2016            // an error, effectively preventing infinite retry loops.
2017            let mut retrying = false;
2018
2019            // advances static files if `get_fn` returns None
2020            'inner: loop {
2021                match get_fn(&mut cursor, number)? {
2022                    Some(res) => {
2023                        if !predicate(&res) {
2024                            break 'outer;
2025                        }
2026                        result.push(res);
2027                        break 'inner;
2028                    }
2029                    None => {
2030                        if retrying {
2031                            return Ok(result);
2032                        }
2033                        // There is a very small chance of hitting a deadlock if two consecutive
2034                        // static files share the same bucket in the
2035                        // internal dashmap and we don't drop the current provider
2036                        // before requesting the next one.
2037                        drop(cursor);
2038                        drop(provider);
2039                        provider = get_provider!(number);
2040                        cursor = provider.cursor()?;
2041                        retrying = true;
2042                    }
2043                }
2044            }
2045        }
2046
2047        result.shrink_to_fit();
2048
2049        Ok(result)
2050    }
2051
    /// Fetches data within a specified range across multiple static files.
    ///
    /// Returns an iterator over the data. Yields [`None`] if the data for the specified number is
    /// not found.
    ///
    /// The provider for `range.start` is resolved eagerly, and an error there is returned
    /// immediately. For each number a fresh cursor is opened on the current provider. If there is
    /// no current provider, or `get_fn` returns `None`, the provider is re-resolved for that
    /// number and the lookup is retried once.
    ///
    /// Errors hit while iterating are yielded as `Err` items; the iterator itself keeps going.
    pub fn fetch_range_iter<'a, T, F>(
        &'a self,
        segment: StaticFileSegment,
        range: Range<u64>,
        get_fn: F,
    ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
    where
        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
        T: std::fmt::Debug,
    {
        let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
        Ok(range.map(move |number| {
            // `Some(Ok(value))` / `Some(Err(_))` when the current provider produced something;
            // `None` when there is no provider or `get_fn` returned `Ok(None)`.
            match provider
                .as_ref()
                .map(|provider| get_fn(&mut provider.cursor()?, number))
                .and_then(|result| result.transpose())
            {
                Some(result) => result.map(Some),
                None => {
                    // There is a very small chance of hitting a deadlock if two consecutive
                    // static files share the same bucket in the internal dashmap and we don't drop
                    // the current provider before requesting the next one.
                    provider.take();
                    provider = self.get_maybe_segment_provider(segment, number)?;
                    // Single retry against the provider resolved for `number`; a second miss
                    // yields `Ok(None)`.
                    provider
                        .as_ref()
                        .map(|provider| get_fn(&mut provider.cursor()?, number))
                        .and_then(|result| result.transpose())
                        .transpose()
                }
            }
        }))
    }
2089
    /// Returns directory where `static_files` are located.
    ///
    /// This is the directory that static file names are joined onto (see
    /// `check_segment_consistency`).
    pub fn directory(&self) -> &Path {
        &self.path
    }
2094
2095    /// Retrieves data from the database or static file, wherever it's available.
2096    ///
2097    /// # Arguments
2098    /// * `segment` - The segment of the static file to check against.
2099    /// * `index_key` - Requested index key, usually a block or transaction number.
2100    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
2101    ///   file provider.
2102    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
2103    ///   when the static file doesn't contain the required data or is not available.
2104    pub fn get_with_static_file_or_database<T, FS, FD>(
2105        &self,
2106        segment: StaticFileSegment,
2107        number: u64,
2108        fetch_from_static_file: FS,
2109        fetch_from_database: FD,
2110    ) -> ProviderResult<Option<T>>
2111    where
2112        FS: Fn(&Self) -> ProviderResult<Option<T>>,
2113        FD: Fn() -> ProviderResult<Option<T>>,
2114    {
2115        // If there is, check the maximum block or transaction number of the segment.
2116        let static_file_upper_bound = if segment.is_block_or_change_based() {
2117            self.get_highest_static_file_block(segment)
2118        } else {
2119            self.get_highest_static_file_tx(segment)
2120        };
2121
2122        if static_file_upper_bound
2123            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
2124        {
2125            return fetch_from_static_file(self);
2126        }
2127        fetch_from_database()
2128    }
2129
2130    /// Gets data within a specified range, potentially spanning different `static_files` and
2131    /// database.
2132    ///
2133    /// # Arguments
2134    /// * `segment` - The segment of the static file to query.
2135    /// * `block_or_tx_range` - The range of data to fetch.
2136    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
2137    /// * `fetch_from_database` - A function to fetch data from the database.
2138    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
2139    ///   terminated when this function returns false, thereby filtering the data based on the
2140    ///   provided condition.
2141    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
2142        &self,
2143        segment: StaticFileSegment,
2144        mut block_or_tx_range: Range<u64>,
2145        fetch_from_static_file: FS,
2146        mut fetch_from_database: FD,
2147        mut predicate: P,
2148    ) -> ProviderResult<Vec<T>>
2149    where
2150        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
2151        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
2152        P: FnMut(&T) -> bool,
2153    {
2154        let mut data = Vec::new();
2155
2156        // If there is, check the maximum block or transaction number of the segment.
2157        if let Some(static_file_upper_bound) = if segment.is_block_or_change_based() {
2158            self.get_highest_static_file_block(segment)
2159        } else {
2160            self.get_highest_static_file_tx(segment)
2161        } && block_or_tx_range.start <= static_file_upper_bound
2162        {
2163            let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
2164            data.extend(fetch_from_static_file(
2165                self,
2166                block_or_tx_range.start..end,
2167                &mut predicate,
2168            )?);
2169            block_or_tx_range.start = end;
2170        }
2171
2172        if block_or_tx_range.end > block_or_tx_range.start {
2173            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
2174        }
2175
2176        Ok(data)
2177    }
2178
2179    /// Returns static files directory
2180    #[cfg(any(test, feature = "test-utils"))]
2181    pub fn path(&self) -> &Path {
2182        &self.path
2183    }
2184
2185    /// Returns transaction index
2186    #[cfg(any(test, feature = "test-utils"))]
2187    pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2188        self.indexes
2189            .read()
2190            .get(segment)
2191            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
2192            .cloned()
2193    }
2194
2195    /// Returns expected block index
2196    #[cfg(any(test, feature = "test-utils"))]
2197    pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2198        self.indexes
2199            .read()
2200            .get(segment)
2201            .map(|index| &index.expected_block_ranges_by_max_block)
2202            .cloned()
2203    }
2204}
2205
2206#[derive(Debug)]
2207struct StaticFileSegmentIndex {
2208    /// Min static file block range.
2209    ///
2210    /// This index is initialized on launch to keep track of the lowest, non-expired static file
2211    /// per segment and gets updated on [`StaticFileProvider::update_index`].
2212    ///
2213    /// This tracks the lowest static file per segment together with the block range in that
2214    /// file. E.g. static file is batched in 500k block intervals then the lowest static file
2215    /// is [0..499K], and the block range is start = 0, end = 499K.
2216    ///
2217    /// This index is mainly used for history expiry, which targets transactions, e.g. pre-merge
2218    /// history expiry would lead to removing all static files below the merge height.
2219    min_block_range: Option<SegmentRangeInclusive>,
2220    /// Max static file block.
2221    max_block: u64,
2222    /// Expected static file block ranges indexed by max expected blocks.
2223    ///
2224    /// For example, a static file for expected block range `0..=499_000` may have only block range
2225    /// `0..=1000` contained in it, as it's not fully filled yet. This index maps the max expected
2226    /// block to the expected range, i.e. block `499_000` to block range `0..=499_000`.
2227    expected_block_ranges_by_max_block: SegmentRanges,
2228    /// Available on disk static file block ranges indexed by max transactions.
2229    ///
2230    /// For example, a static file for block range `0..=499_000` may only have block range
2231    /// `0..=1000` and transaction range `0..=2000` contained in it. This index maps the max
2232    /// available transaction to the available block range, i.e. transaction `2000` to block range
2233    /// `0..=1000`.
2234    available_block_ranges_by_max_tx: Option<SegmentRanges>,
2235}
2236
2237/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider`
2238pub trait StaticFileWriter {
2239    /// The primitives type used by the static file provider.
2240    type Primitives: Send + Sync + 'static;
2241
2242    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
2243    fn get_writer(
2244        &self,
2245        block: BlockNumber,
2246        segment: StaticFileSegment,
2247    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
2248
2249    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
2250    /// [`StaticFileSegment`].
2251    fn latest_writer(
2252        &self,
2253        segment: StaticFileSegment,
2254    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
2255
2256    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
2257    fn commit(&self) -> ProviderResult<()>;
2258
2259    /// Returns `true` if the static file provider has unwind queued.
2260    fn has_unwind_queued(&self) -> bool;
2261
2262    /// Finalizes all static file writers by committing their configuration to disk.
2263    ///
2264    /// Returns an error if prune is queued (use [`Self::commit`] instead).
2265    fn finalize(&self) -> ProviderResult<()>;
2266}
2267
2268impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
2269    type Primitives = N;
2270
2271    fn get_writer(
2272        &self,
2273        block: BlockNumber,
2274        segment: StaticFileSegment,
2275    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2276        if self.access.is_read_only() {
2277            return Err(ProviderError::ReadOnlyStaticFileAccess);
2278        }
2279
2280        trace!(target: "providers::static_file", ?block, ?segment, "Getting static file writer.");
2281        self.writers.get_or_create(segment, || {
2282            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
2283        })
2284    }
2285
2286    fn latest_writer(
2287        &self,
2288        segment: StaticFileSegment,
2289    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2290        let genesis_number = self.0.as_ref().genesis_block_number();
2291        self.get_writer(
2292            self.get_highest_static_file_block(segment).unwrap_or(genesis_number),
2293            segment,
2294        )
2295    }
2296
2297    fn commit(&self) -> ProviderResult<()> {
2298        self.writers.commit()
2299    }
2300
2301    fn has_unwind_queued(&self) -> bool {
2302        self.writers.has_unwind_queued()
2303    }
2304
2305    fn finalize(&self) -> ProviderResult<()> {
2306        self.writers.finalize()
2307    }
2308}
2309
2310impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
2311    fn account_block_changeset(
2312        &self,
2313        block_number: BlockNumber,
2314    ) -> ProviderResult<Vec<reth_db::models::AccountBeforeTx>> {
2315        let provider = match self.get_segment_provider_for_block(
2316            StaticFileSegment::AccountChangeSets,
2317            block_number,
2318            None,
2319        ) {
2320            Ok(provider) => provider,
2321            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
2322            Err(err) => return Err(err),
2323        };
2324
2325        if let Some(offset) = provider.read_changeset_offset(block_number)? {
2326            let mut cursor = provider.cursor()?;
2327            let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
2328
2329            for i in offset.changeset_range() {
2330                if let Some(change) =
2331                    cursor.get_one::<reth_db::static_file::AccountChangesetMask>(i.into())?
2332                {
2333                    changeset.push(change)
2334                }
2335            }
2336            Ok(changeset)
2337        } else {
2338            Ok(Vec::new())
2339        }
2340    }
2341
2342    fn get_account_before_block(
2343        &self,
2344        block_number: BlockNumber,
2345        address: Address,
2346    ) -> ProviderResult<Option<reth_db::models::AccountBeforeTx>> {
2347        let provider = match self.get_segment_provider_for_block(
2348            StaticFileSegment::AccountChangeSets,
2349            block_number,
2350            None,
2351        ) {
2352            Ok(provider) => provider,
2353            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
2354            Err(err) => return Err(err),
2355        };
2356
2357        let Some(offset) = provider.read_changeset_offset(block_number)? else {
2358            return Ok(None);
2359        };
2360
2361        let mut cursor = provider.cursor()?;
2362        let range = offset.changeset_range();
2363        let mut low = range.start;
2364        let mut high = range.end;
2365
2366        while low < high {
2367            let mid = low + (high - low) / 2;
2368            if let Some(change) =
2369                cursor.get_one::<reth_db::static_file::AccountChangesetMask>(mid.into())?
2370            {
2371                if change.address < address {
2372                    low = mid + 1;
2373                } else {
2374                    high = mid;
2375                }
2376            } else {
2377                // This is not expected but means we are out of the range / file somehow, and can't
2378                // continue
2379                debug!(
2380                    target: "providers::static_file",
2381                    ?low,
2382                    ?mid,
2383                    ?high,
2384                    ?range,
2385                    ?block_number,
2386                    ?address,
2387                    "Cannot continue binary search for account changeset fetch"
2388                );
2389                low = range.end;
2390                break;
2391            }
2392        }
2393
2394        if low < range.end &&
2395            let Some(change) = cursor
2396                .get_one::<reth_db::static_file::AccountChangesetMask>(low.into())?
2397                .filter(|change| change.address == address)
2398        {
2399            return Ok(Some(change));
2400        }
2401
2402        Ok(None)
2403    }
2404
2405    fn account_changesets_range(
2406        &self,
2407        range: impl core::ops::RangeBounds<BlockNumber>,
2408    ) -> ProviderResult<Vec<(BlockNumber, reth_db::models::AccountBeforeTx)>> {
2409        let range = self.bound_range(range, StaticFileSegment::AccountChangeSets);
2410        self.walk_account_changeset_range(range).collect()
2411    }
2412}
2413
2414impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
2415    fn storage_changeset(
2416        &self,
2417        block_number: BlockNumber,
2418    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
2419        let provider = match self.get_segment_provider_for_block(
2420            StaticFileSegment::StorageChangeSets,
2421            block_number,
2422            None,
2423        ) {
2424            Ok(provider) => provider,
2425            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
2426            Err(err) => return Err(err),
2427        };
2428
2429        if let Some(offset) = provider.read_changeset_offset(block_number)? {
2430            let mut cursor = provider.cursor()?;
2431            let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
2432
2433            for i in offset.changeset_range() {
2434                if let Some(change) = cursor.get_one::<StorageChangesetMask>(i.into())? {
2435                    let block_address = BlockNumberAddress((block_number, change.address));
2436                    let entry = StorageEntry { key: change.key, value: change.value };
2437                    changeset.push((block_address, entry));
2438                }
2439            }
2440            Ok(changeset)
2441        } else {
2442            Ok(Vec::new())
2443        }
2444    }
2445
2446    fn get_storage_before_block(
2447        &self,
2448        block_number: BlockNumber,
2449        address: Address,
2450        storage_key: B256,
2451    ) -> ProviderResult<Option<StorageEntry>> {
2452        let provider = match self.get_segment_provider_for_block(
2453            StaticFileSegment::StorageChangeSets,
2454            block_number,
2455            None,
2456        ) {
2457            Ok(provider) => provider,
2458            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
2459            Err(err) => return Err(err),
2460        };
2461
2462        let Some(offset) = provider.read_changeset_offset(block_number)? else {
2463            return Ok(None);
2464        };
2465
2466        let mut cursor = provider.cursor()?;
2467        let range = offset.changeset_range();
2468        let mut low = range.start;
2469        let mut high = range.end;
2470
2471        while low < high {
2472            let mid = low + (high - low) / 2;
2473            if let Some(change) = cursor.get_one::<StorageChangesetMask>(mid.into())? {
2474                match (change.address, change.key).cmp(&(address, storage_key)) {
2475                    std::cmp::Ordering::Less => low = mid + 1,
2476                    _ => high = mid,
2477                }
2478            } else {
2479                debug!(
2480                    target: "providers::static_file",
2481                    ?low,
2482                    ?mid,
2483                    ?high,
2484                    ?range,
2485                    ?block_number,
2486                    ?address,
2487                    ?storage_key,
2488                    "Cannot continue binary search for storage changeset fetch"
2489                );
2490                low = range.end;
2491                break;
2492            }
2493        }
2494
2495        if low < range.end &&
2496            let Some(change) = cursor
2497                .get_one::<StorageChangesetMask>(low.into())?
2498                .filter(|change| change.address == address && change.key == storage_key)
2499        {
2500            return Ok(Some(StorageEntry { key: change.key, value: change.value }));
2501        }
2502
2503        Ok(None)
2504    }
2505
2506    fn storage_changesets_range(
2507        &self,
2508        range: impl RangeBounds<BlockNumber>,
2509    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
2510        let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
2511        self.walk_storage_changeset_range(range).collect()
2512    }
2513}
2514
2515impl<N: NodePrimitives> StaticFileProvider<N> {
2516    /// Creates an iterator for walking through account changesets in the specified block range.
2517    ///
2518    /// This returns a lazy iterator that fetches changesets block by block to avoid loading
2519    /// everything into memory at once.
2520    ///
2521    /// Accepts any range type that implements `RangeBounds<BlockNumber>`, including:
2522    /// - `Range<BlockNumber>` (e.g., `0..100`)
2523    /// - `RangeInclusive<BlockNumber>` (e.g., `0..=99`)
2524    /// - `RangeFrom<BlockNumber>` (e.g., `0..`) - iterates until exhausted
2525    pub fn walk_account_changeset_range(
2526        &self,
2527        range: impl RangeBounds<BlockNumber>,
2528    ) -> StaticFileAccountChangesetWalker<Self> {
2529        StaticFileAccountChangesetWalker::new(self.clone(), range)
2530    }
2531
2532    /// Creates an iterator for walking through storage changesets in the specified block range.
2533    pub fn walk_storage_changeset_range(
2534        &self,
2535        range: impl RangeBounds<BlockNumber>,
2536    ) -> StaticFileStorageChangesetWalker<Self> {
2537        StaticFileStorageChangesetWalker::new(self.clone(), range)
2538    }
2539}
2540
2541impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
2542    type Header = N::BlockHeader;
2543
2544    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
2545        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
2546            Ok(jar_provider
2547                .cursor()?
2548                .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
2549                .and_then(|(header, hash)| {
2550                    if hash == block_hash {
2551                        return Some(header);
2552                    }
2553                    None
2554                }))
2555        })
2556    }
2557
2558    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
2559        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2560            .and_then(|provider| provider.header_by_number(num))
2561            .or_else(|err| {
2562                if let ProviderError::MissingStaticFileBlock(_, _) = err {
2563                    Ok(None)
2564                } else {
2565                    Err(err)
2566                }
2567            })
2568    }
2569
2570    fn headers_range(
2571        &self,
2572        range: impl RangeBounds<BlockNumber>,
2573    ) -> ProviderResult<Vec<Self::Header>> {
2574        self.fetch_range_with_predicate(
2575            StaticFileSegment::Headers,
2576            to_range(range),
2577            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
2578            |_| true,
2579        )
2580    }
2581
2582    fn sealed_header(
2583        &self,
2584        num: BlockNumber,
2585    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
2586        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2587            .and_then(|provider| provider.sealed_header(num))
2588            .or_else(|err| {
2589                if let ProviderError::MissingStaticFileBlock(_, _) = err {
2590                    Ok(None)
2591                } else {
2592                    Err(err)
2593                }
2594            })
2595    }
2596
2597    fn sealed_headers_while(
2598        &self,
2599        range: impl RangeBounds<BlockNumber>,
2600        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
2601    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
2602        self.fetch_range_with_predicate(
2603            StaticFileSegment::Headers,
2604            to_range(range),
2605            |cursor, number| {
2606                Ok(cursor
2607                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
2608                    .map(|(header, hash)| SealedHeader::new(header, hash)))
2609            },
2610            predicate,
2611        )
2612    }
2613}
2614
2615impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
2616    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
2617        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2618            .and_then(|provider| provider.block_hash(num))
2619            .or_else(|err| {
2620                if let ProviderError::MissingStaticFileBlock(_, _) = err {
2621                    Ok(None)
2622                } else {
2623                    Err(err)
2624                }
2625            })
2626    }
2627
2628    fn canonical_hashes_range(
2629        &self,
2630        start: BlockNumber,
2631        end: BlockNumber,
2632    ) -> ProviderResult<Vec<B256>> {
2633        self.fetch_range_with_predicate(
2634            StaticFileSegment::Headers,
2635            start..end,
2636            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
2637            |_| true,
2638        )
2639    }
2640}
2641
2642impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
2643    for StaticFileProvider<N>
2644{
2645    type Receipt = N::Receipt;
2646
2647    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2648        self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
2649            .and_then(|provider| provider.receipt(num))
2650            .or_else(|err| {
2651                if let ProviderError::MissingStaticFileTx(_, _) = err {
2652                    Ok(None)
2653                } else {
2654                    Err(err)
2655                }
2656            })
2657    }
2658
2659    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2660        if let Some(num) = self.transaction_id(hash)? {
2661            return self.receipt(num);
2662        }
2663        Ok(None)
2664    }
2665
2666    fn receipts_by_block(
2667        &self,
2668        _block: BlockHashOrNumber,
2669    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2670        unreachable!()
2671    }
2672
2673    fn receipts_by_tx_range(
2674        &self,
2675        range: impl RangeBounds<TxNumber>,
2676    ) -> ProviderResult<Vec<Self::Receipt>> {
2677        self.fetch_range_with_predicate(
2678            StaticFileSegment::Receipts,
2679            to_range(range),
2680            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
2681            |_| true,
2682        )
2683    }
2684
2685    fn receipts_by_block_range(
2686        &self,
2687        _block_range: RangeInclusive<BlockNumber>,
2688    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2689        Err(ProviderError::UnsupportedProvider)
2690    }
2691}
2692
2693impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
2694    for StaticFileProvider<N>
2695{
2696    fn transaction_hashes_by_range(
2697        &self,
2698        tx_range: Range<TxNumber>,
2699    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
2700        let tx_range_size = (tx_range.end - tx_range.start) as usize;
2701
2702        // Transactions are different size, so chunks will not all take the same processing time. If
2703        // chunks are too big, there will be idle threads waiting for work. Choosing an
2704        // arbitrary smaller value to make sure it doesn't happen.
2705        let chunk_size = 100;
2706
2707        // iterator over the chunks
2708        let chunks = tx_range
2709            .clone()
2710            .step_by(chunk_size)
2711            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
2712        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
2713
2714        for chunk_range in chunks {
2715            let (channel_tx, channel_rx) = mpsc::channel();
2716            channels.push(channel_rx);
2717
2718            let manager = self.clone();
2719
2720            // Spawn the task onto the global rayon pool
2721            // This task will send the cached transaction hash through the channel.
2722            rayon::spawn(move || {
2723                let _ = manager.fetch_range_with_predicate(
2724                    StaticFileSegment::Transactions,
2725                    chunk_range,
2726                    |cursor, number| {
2727                        Ok(cursor
2728                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
2729                            .map(|transaction| {
2730                                let _ = channel_tx.send(transaction_hash((number, transaction)));
2731                            }))
2732                    },
2733                    |_| true,
2734                );
2735            });
2736        }
2737
2738        let mut tx_list = Vec::with_capacity(tx_range_size);
2739
2740        // Iterate over channels and append the tx hashes unsorted
2741        for channel in channels {
2742            while let Ok(tx) = channel.recv() {
2743                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
2744                tx_list.push((tx_hash, tx_id));
2745            }
2746        }
2747
2748        Ok(tx_list)
2749    }
2750}
2751
2752impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
2753    for StaticFileProvider<N>
2754{
2755    type Transaction = N::SignedTx;
2756
2757    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
2758        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2759            let mut cursor = jar_provider.cursor()?;
2760            if cursor
2761                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
2762                .and_then(|tx| (*tx.tx_hash() == tx_hash).then_some(tx))
2763                .is_some()
2764            {
2765                Ok(cursor.number())
2766            } else {
2767                Ok(None)
2768            }
2769        })
2770    }
2771
2772    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2773        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2774            .and_then(|provider| provider.transaction_by_id(num))
2775            .or_else(|err| {
2776                if let ProviderError::MissingStaticFileTx(_, _) = err {
2777                    Ok(None)
2778                } else {
2779                    Err(err)
2780                }
2781            })
2782    }
2783
2784    fn transaction_by_id_unhashed(
2785        &self,
2786        num: TxNumber,
2787    ) -> ProviderResult<Option<Self::Transaction>> {
2788        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2789            .and_then(|provider| provider.transaction_by_id_unhashed(num))
2790            .or_else(|err| {
2791                if let ProviderError::MissingStaticFileTx(_, _) = err {
2792                    Ok(None)
2793                } else {
2794                    Err(err)
2795                }
2796            })
2797    }
2798
2799    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2800        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2801            Ok(jar_provider
2802                .cursor()?
2803                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2804                .and_then(|tx| (*tx.tx_hash() == hash).then_some(tx)))
2805        })
2806    }
2807
2808    fn transaction_by_hash_with_meta(
2809        &self,
2810        _hash: TxHash,
2811    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2812        // Required data not present in static_files
2813        Err(ProviderError::UnsupportedProvider)
2814    }
2815
2816    fn transactions_by_block(
2817        &self,
2818        _block_id: BlockHashOrNumber,
2819    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2820        // Required data not present in static_files
2821        Err(ProviderError::UnsupportedProvider)
2822    }
2823
2824    fn transactions_by_block_range(
2825        &self,
2826        _range: impl RangeBounds<BlockNumber>,
2827    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2828        // Required data not present in static_files
2829        Err(ProviderError::UnsupportedProvider)
2830    }
2831
2832    fn transactions_by_tx_range(
2833        &self,
2834        range: impl RangeBounds<TxNumber>,
2835    ) -> ProviderResult<Vec<Self::Transaction>> {
2836        self.fetch_range_with_predicate(
2837            StaticFileSegment::Transactions,
2838            to_range(range),
2839            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2840            |_| true,
2841        )
2842    }
2843
2844    fn senders_by_tx_range(
2845        &self,
2846        range: impl RangeBounds<TxNumber>,
2847    ) -> ProviderResult<Vec<Address>> {
2848        self.fetch_range_with_predicate(
2849            StaticFileSegment::TransactionSenders,
2850            to_range(range),
2851            |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2852            |_| true,
2853        )
2854    }
2855
2856    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2857        self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2858            .and_then(|provider| provider.transaction_sender(id))
2859            .or_else(|err| {
2860                if let ProviderError::MissingStaticFileTx(_, _) = err {
2861                    Ok(None)
2862                } else {
2863                    Err(err)
2864                }
2865            })
2866    }
2867}
2868
2869impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
2870    fn chain_info(&self) -> ProviderResult<ChainInfo> {
2871        // Required data not present in static_files
2872        Err(ProviderError::UnsupportedProvider)
2873    }
2874
2875    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
2876        // Required data not present in static_files
2877        Err(ProviderError::UnsupportedProvider)
2878    }
2879
2880    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
2881        Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
2882    }
2883
2884    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
2885        // Required data not present in static_files
2886        Err(ProviderError::UnsupportedProvider)
2887    }
2888}
2889
/* The impls below cannot be meaningfully implemented for static files, but must exist for trait requirements */
2891
2892impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
2893    for StaticFileProvider<N>
2894{
2895    type Block = N::Block;
2896
2897    fn find_block_by_hash(
2898        &self,
2899        _hash: B256,
2900        _source: BlockSource,
2901    ) -> ProviderResult<Option<Self::Block>> {
2902        // Required data not present in static_files
2903        Err(ProviderError::UnsupportedProvider)
2904    }
2905
2906    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
2907        // Required data not present in static_files
2908        Err(ProviderError::UnsupportedProvider)
2909    }
2910
2911    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2912        // Required data not present in static_files
2913        Err(ProviderError::UnsupportedProvider)
2914    }
2915
2916    fn pending_block_and_receipts(
2917        &self,
2918    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
2919        // Required data not present in static_files
2920        Err(ProviderError::UnsupportedProvider)
2921    }
2922
2923    fn recovered_block(
2924        &self,
2925        _id: BlockHashOrNumber,
2926        _transaction_kind: TransactionVariant,
2927    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2928        // Required data not present in static_files
2929        Err(ProviderError::UnsupportedProvider)
2930    }
2931
2932    fn sealed_block_with_senders(
2933        &self,
2934        _id: BlockHashOrNumber,
2935        _transaction_kind: TransactionVariant,
2936    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2937        // Required data not present in static_files
2938        Err(ProviderError::UnsupportedProvider)
2939    }
2940
2941    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
2942        // Required data not present in static_files
2943        Err(ProviderError::UnsupportedProvider)
2944    }
2945
2946    fn block_with_senders_range(
2947        &self,
2948        _range: RangeInclusive<BlockNumber>,
2949    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
2950        Err(ProviderError::UnsupportedProvider)
2951    }
2952
2953    fn recovered_block_range(
2954        &self,
2955        _range: RangeInclusive<BlockNumber>,
2956    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
2957        Err(ProviderError::UnsupportedProvider)
2958    }
2959
2960    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
2961        Err(ProviderError::UnsupportedProvider)
2962    }
2963}
2964
2965impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
2966    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2967        Err(ProviderError::UnsupportedProvider)
2968    }
2969
2970    fn block_body_indices_range(
2971        &self,
2972        _range: RangeInclusive<BlockNumber>,
2973    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2974        Err(ProviderError::UnsupportedProvider)
2975    }
2976}
2977
2978impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2979    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2980        match T::NAME {
2981            tables::CanonicalHeaders::NAME |
2982            tables::Headers::<Header>::NAME |
2983            tables::HeaderTerminalDifficulties::NAME => Ok(self
2984                .get_highest_static_file_block(StaticFileSegment::Headers)
2985                .map(|block| block + 1)
2986                .unwrap_or_default()
2987                as usize),
2988            tables::Receipts::<Receipt>::NAME => Ok(self
2989                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2990                .map(|receipts| receipts + 1)
2991                .unwrap_or_default() as usize),
2992            tables::Transactions::<TransactionSigned>::NAME => Ok(self
2993                .get_highest_static_file_tx(StaticFileSegment::Transactions)
2994                .map(|txs| txs + 1)
2995                .unwrap_or_default()
2996                as usize),
2997            tables::TransactionSenders::NAME => Ok(self
2998                .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
2999                .map(|txs| txs + 1)
3000                .unwrap_or_default() as usize),
3001            _ => Err(ProviderError::UnsupportedProvider),
3002        }
3003    }
3004}
3005
3006/// Returns the tx hash for the given transaction and its id.
3007#[inline]
3008fn transaction_hash<T>(entry: (TxNumber, T)) -> Result<(B256, TxNumber), Box<ProviderError>>
3009where
3010    T: TxHashRef,
3011{
3012    let (tx_id, tx) = entry;
3013    Ok((*tx.tx_hash(), tx_id))
3014}
3015
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use reth_chain_state::EthPrimitives;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

    use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};

    /// Asserts that every `(block, start, end)` case resolves to `start..=end` through
    /// `find_fixed_range_with_block_index` with the given `block_index`.
    ///
    /// The failure message names the block so a table-driven failure points at its case.
    fn assert_ranges(
        provider: &StaticFileProvider<EthPrimitives>,
        segment: StaticFileSegment,
        block_index: Option<&BTreeMap<u64, SegmentRangeInclusive>>,
        cases: &[(u64, u64, u64)],
    ) {
        for &(block, start, end) in cases {
            assert_eq!(
                provider.find_fixed_range_with_block_index(segment, block_index, block),
                SegmentRangeInclusive::new(start, end),
                "unexpected range for block {block}"
            );
        }
    }

    #[test]
    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
        let (static_dir, _) = create_test_static_files_dir();
        let sf_rw: StaticFileProvider<EthPrimitives> =
            StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(100).build()?;

        let segment = StaticFileSegment::Headers;

        // Without an index, ranges follow the default behavior (`blocks_per_file` = 100).
        assert_ranges(&sf_rw, segment, None, &[(0, 0, 99), (250, 200, 299)]);

        // An empty index falls back to the same default behavior.
        assert_ranges(&sf_rw, segment, Some(&BTreeMap::new()), &[(150, 100, 199)]);

        // Index with three existing 100-block ranges, the last one ending at 299.
        let block_index = BTreeMap::from_iter([
            (99, SegmentRangeInclusive::new(0, 99)),
            (199, SegmentRangeInclusive::new(100, 199)),
            (299, SegmentRangeInclusive::new(200, 299)),
        ]);

        // Blocks within existing ranges return the matching range, including both boundaries.
        assert_ranges(
            &sf_rw,
            segment,
            Some(&block_index),
            &[
                (0, 0, 99),
                (50, 0, 99),
                (99, 0, 99),
                (100, 100, 199),
                (150, 100, 199),
                (199, 100, 199),
            ],
        );

        // Blocks beyond the existing ranges derive new ranges from the last one (ending at 299):
        // - 300 and 350 fall in the range right after it (300-399);
        // - 500 skips two ranges (300-399 and 400-499);
        // - 1000 skips many ranges.
        assert_ranges(
            &sf_rw,
            segment,
            Some(&block_index),
            &[(300, 300, 399), (350, 300, 399), (500, 500, 599), (1000, 1000, 1099)],
        );

        // Index whose range sizes differ from the current `blocks_per_file`. This simulates
        // `blocks_per_file` having changed between runs.
        let mixed_size_index = BTreeMap::from_iter([
            (49, SegmentRangeInclusive::new(0, 49)),     // 50 blocks
            (149, SegmentRangeInclusive::new(50, 149)),  // 100 blocks
            (349, SegmentRangeInclusive::new(150, 349)), // 200 blocks
        ]);

        // Blocks within existing ranges return those ranges regardless of their size.
        assert_ranges(
            &sf_rw,
            segment,
            Some(&mixed_size_index),
            &[(25, 0, 49), (100, 50, 149), (200, 150, 349)],
        );

        // Blocks after the last range are derived with the current `blocks_per_file` (100),
        // starting right after the end of the last range (349).
        assert_ranges(
            &sf_rw,
            segment,
            Some(&mixed_size_index),
            &[(350, 350, 449), (450, 450, 549), (550, 550, 649)],
        );

        Ok(())
    }
}