// reth_provider/providers/static_file/manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker},
7    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
8    EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
9    TransactionVariant, TransactionsProvider, TransactionsProviderExt,
10};
11use alloy_consensus::{transaction::TransactionMeta, Header};
12use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
13use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
14
15use parking_lot::RwLock;
16use reth_chain_state::ExecutedBlock;
17use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
18use reth_db::{
19    lockfile::StorageLock,
20    static_file::{
21        iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
22        StaticFileCursor, StorageChangesetMask, TransactionMask, TransactionSenderMask,
23    },
24};
25use reth_db_api::{
26    cursor::DbCursorRO,
27    models::{AccountBeforeTx, BlockNumberAddress, StorageBeforeTx, StoredBlockBodyIndices},
28    table::{Decompress, Table, Value},
29    tables,
30    transaction::DbTx,
31};
32use reth_ethereum_primitives::{Receipt, TransactionSigned};
33use reth_nippy_jar::{NippyJar, NippyJarChecker};
34use reth_node_types::NodePrimitives;
35use reth_primitives_traits::{
36    dashmap::DashMap, AlloyBlockHeader as _, BlockBody as _, RecoveredBlock, SealedHeader,
37    SignedTransaction, StorageEntry,
38};
39use reth_prune_types::PruneSegment;
40use reth_stages_types::PipelineTarget;
41use reth_static_file_types::{
42    find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
43    StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
44};
45use reth_storage_api::{
46    BlockBodyIndicesProvider, ChangeSetReader, DBProvider, PruneCheckpointReader,
47    StorageChangeSetReader, StorageSettingsCache,
48};
49use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
50use std::{
51    collections::BTreeMap,
52    fmt::Debug,
53    ops::{Bound, Deref, Range, RangeBounds, RangeInclusive},
54    path::{Path, PathBuf},
55    sync::{atomic::AtomicU64, mpsc, Arc},
56};
57use tracing::{debug, info, info_span, instrument, trace, warn};
58
/// Alias type for a map that can be queried for block or transaction ranges. It uses `u64` to
/// represent either a block or a transaction number end of a static file range.
///
/// Keyed by the inclusive upper bound of each range so lookups can scan for the first
/// entry whose key is `>=` the queried number.
type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
62
/// Access mode on a static file provider. RO/RW.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}
72
73impl StaticFileAccess {
74    /// Returns `true` if read-only access.
75    pub const fn is_read_only(&self) -> bool {
76        matches!(self, Self::RO)
77    }
78
79    /// Returns `true` if read-write access.
80    pub const fn is_read_write(&self) -> bool {
81        matches!(self, Self::RW)
82    }
83}
84
/// Context for static file block writes.
///
/// Contains target segments and pruning configuration. Consumed by
/// `write_blocks_data` to decide which optional segments to write and whether
/// receipts for a given block should be skipped due to pruning.
#[derive(Debug, Clone, Copy, Default)]
pub struct StaticFileWriteCtx {
    /// Whether transaction senders should be written to static files.
    pub write_senders: bool,
    /// Whether receipts should be written to static files.
    pub write_receipts: bool,
    /// Whether account changesets should be written to static files.
    pub write_account_changesets: bool,
    /// Whether storage changesets should be written to static files.
    pub write_storage_changesets: bool,
    /// The current chain tip block number (for pruning).
    pub tip: BlockNumber,
    /// The prune mode for receipts, if any.
    pub receipts_prune_mode: Option<reth_prune_types::PruneMode>,
    /// Whether receipts are prunable (based on storage settings and prune distance).
    pub receipts_prunable: bool,
}
105
/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
///
/// "Static files" contain immutable chain history data, such as:
///  - transactions
///  - headers
///  - receipts
///
/// This provider type is responsible for reading and writing to static files.
///
/// Cheaply cloneable newtype around an `Arc`'d inner provider; all state lives in
/// [`StaticFileProviderInner`].
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
116
117impl<N> Clone for StaticFileProvider<N> {
118    fn clone(&self) -> Self {
119        Self(self.0.clone())
120    }
121}
122
/// Builder for [`StaticFileProvider`] that allows configuration before initialization.
#[derive(Debug)]
pub struct StaticFileProviderBuilder<P> {
    // Access mode (read-only or read-write) for the built provider.
    access: StaticFileAccess,
    // Whether to enable metrics collection on the built provider.
    use_metrics: bool,
    // Per-segment overrides for the number of blocks stored per static file.
    blocks_per_file: StaticFileMap<u64>,
    // Directory where the static files are located.
    path: P,
    // Block number of the genesis block; defaults to 0.
    genesis_block_number: u64,
}
132
133impl<P: AsRef<Path>> StaticFileProviderBuilder<P> {
134    /// Creates a new builder with read-write access.
135    pub fn read_write(path: P) -> Self {
136        Self {
137            path,
138            access: StaticFileAccess::RW,
139            blocks_per_file: Default::default(),
140            use_metrics: false,
141            genesis_block_number: 0,
142        }
143    }
144
145    /// Creates a new builder with read-only access.
146    pub fn read_only(path: P) -> Self {
147        Self {
148            path,
149            access: StaticFileAccess::RO,
150            blocks_per_file: Default::default(),
151            use_metrics: false,
152            genesis_block_number: 0,
153        }
154    }
155
156    /// Set custom blocks per file for specific segments.
157    ///
158    /// Each static file segment is stored across multiple files, and each of these files contains
159    /// up to the specified number of blocks of data. When the file gets full, a new file is
160    /// created with the new block range.
161    ///
162    /// This setting affects the size of each static file, and can be set per segment.
163    ///
164    /// If it is changed for an existing node, existing static files will not be affected and will
165    /// be finished with the old blocks per file setting, but new static files will use the new
166    /// setting.
167    pub fn with_blocks_per_file_for_segments(
168        mut self,
169        segments: &<StaticFileMap<u64> as Deref>::Target,
170    ) -> Self {
171        for (segment, &blocks_per_file) in segments {
172            self.blocks_per_file.insert(segment, blocks_per_file);
173        }
174        self
175    }
176
177    /// Set a custom number of blocks per file for all segments.
178    pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
179        for segment in StaticFileSegment::iter() {
180            self.blocks_per_file.insert(segment, blocks_per_file);
181        }
182        self
183    }
184
185    /// Set a custom number of blocks per file for a specific segment.
186    pub fn with_blocks_per_file_for_segment(
187        mut self,
188        segment: StaticFileSegment,
189        blocks_per_file: u64,
190    ) -> Self {
191        self.blocks_per_file.insert(segment, blocks_per_file);
192        self
193    }
194
195    /// Enables metrics on the [`StaticFileProvider`].
196    pub const fn with_metrics(mut self) -> Self {
197        self.use_metrics = true;
198        self
199    }
200
201    /// Sets the genesis block number for the [`StaticFileProvider`].
202    ///
203    /// This configures the genesis block number, which is used to determine the starting point
204    /// for block indexing and querying operations.
205    ///
206    /// # Arguments
207    ///
208    /// * `genesis_block_number` - The block number of the genesis block.
209    ///
210    /// # Returns
211    ///
212    /// Returns `Self` to allow method chaining.
213    pub const fn with_genesis_block_number(mut self, genesis_block_number: u64) -> Self {
214        self.genesis_block_number = genesis_block_number;
215        self
216    }
217
218    /// Builds the final [`StaticFileProvider`] and initializes the index.
219    pub fn build<N: NodePrimitives>(self) -> ProviderResult<StaticFileProvider<N>> {
220        let mut provider = StaticFileProviderInner::new(self.path, self.access)?;
221        if self.use_metrics {
222            provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
223        }
224
225        for (segment, blocks_per_file) in *self.blocks_per_file {
226            provider.blocks_per_file.insert(segment, blocks_per_file);
227        }
228        provider.genesis_block_number = self.genesis_block_number;
229
230        let provider = StaticFileProvider(Arc::new(provider));
231        provider.initialize_index()?;
232        Ok(provider)
233    }
234}
235
236impl<N: NodePrimitives> StaticFileProvider<N> {
237    /// Creates a new [`StaticFileProvider`] with the given [`StaticFileAccess`].
238    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
239        let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
240        provider.initialize_index()?;
241        Ok(provider)
242    }
243}
244
245impl<N: NodePrimitives> StaticFileProvider<N> {
246    /// Creates a new [`StaticFileProvider`] with read-only access.
247    ///
248    /// The caller is responsible for calling [`StaticFileProvider::initialize_index`] when
249    /// underlying data changes.
250    pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
251        Self::new(path, StaticFileAccess::RO)
252    }
253
254    /// Creates a new [`StaticFileProvider`] with read-write access.
255    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
256        Self::new(path, StaticFileAccess::RW)
257    }
258}
259
260impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
261    type Target = StaticFileProviderInner<N>;
262
263    fn deref(&self) -> &Self::Target {
264        &self.0
265    }
266}
267
/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
    /// segments and ranges.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Indexes per segment.
    indexes: RwLock<StaticFileMap<StaticFileSegmentIndex>>,
    /// This is an additional index that tracks the expired height, this will track the highest
    /// block number that has been expired (missing). The first, non expired block is
    /// `expired_history_height + 1`.
    ///
    /// This is effectively the transaction range that has been expired:
    /// [`StaticFileProvider::delete_segment_below_block`] and mirrors
    /// `static_files_min_block[transactions] - blocks_per_file`.
    ///
    /// This additional tracker exists for more efficient lookups because the node must be aware of
    /// the expired height.
    earliest_history_height: AtomicU64,
    /// Directory where `static_files` are located
    path: PathBuf,
    /// Maintains a writer set of [`StaticFileSegment`].
    writers: StaticFileWriters<N>,
    /// Metrics for the static files.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access rights of the provider.
    access: StaticFileAccess,
    /// Number of blocks per file, per segment.
    blocks_per_file: StaticFileMap<u64>,
    /// Write lock for when access is [`StaticFileAccess::RW`].
    ///
    /// Held for the provider's lifetime; never read, only used for its RAII release on drop.
    _lock_file: Option<StorageLock>,
    /// Genesis block number, default is 0;
    genesis_block_number: u64,
}
302
impl<N: NodePrimitives> StaticFileProviderInner<N> {
    /// Creates a new [`StaticFileProviderInner`].
    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
        // Only read-write access acquires the exclusive storage lock; read-only
        // providers can be created without it.
        let _lock_file = if access.is_read_write() {
            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
        } else {
            None
        };

        // Every segment starts at the default blocks-per-file; callers (e.g. the
        // builder) may override individual segments afterwards.
        let mut blocks_per_file = StaticFileMap::default();
        for segment in StaticFileSegment::iter() {
            blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
        }

        let provider = Self {
            map: Default::default(),
            indexes: Default::default(),
            writers: Default::default(),
            earliest_history_height: Default::default(),
            path: path.as_ref().to_path_buf(),
            metrics: None,
            access,
            blocks_per_file,
            _lock_file,
            genesis_block_number: 0,
        };

        Ok(provider)
    }

    /// Returns `true` if this provider was opened with read-only access.
    pub const fn is_read_only(&self) -> bool {
        self.access.is_read_only()
    }

    /// Each static file has a fixed number of blocks. This gives out the range where the requested
    /// block is positioned.
    ///
    /// If the specified block falls into one of the ranges of already initialized static files,
    /// this function will return that range.
    ///
    /// If no matching file exists, this function will derive a new range from the end of the last
    /// existing file, if any.
    pub fn find_fixed_range_with_block_index(
        &self,
        segment: StaticFileSegment,
        block_index: Option<&SegmentRanges>,
        block: BlockNumber,
    ) -> SegmentRangeInclusive {
        let blocks_per_file =
            self.blocks_per_file.get(segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);

        if let Some(block_index) = block_index {
            // Find first block range that contains the requested block
            if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
            {
                // Found matching range for an existing file using block index
                return *range;
            } else if let Some((_, range)) = block_index.last_key_value() {
                // Didn't find matching range for an existing file, derive a new range from the end
                // of the last existing file range.
                //
                // `block` is always higher than `range.end()` here, because we iterated over all
                // `block_index` ranges above and didn't find one that contains our block
                let blocks_after_last_range = block - range.end();
                // Whole files that would lie entirely between the last file and `block`.
                let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
                let start = range.end() + 1 + segments_to_skip * blocks_per_file;
                return SegmentRangeInclusive::new(start, start + blocks_per_file - 1);
            }
        }
        // No block index is available, derive a new range using the fixed number of blocks,
        // starting from the beginning.
        find_fixed_range(block, blocks_per_file)
    }

    /// Each static file has a fixed number of blocks. This gives out the range where the requested
    /// block is positioned.
    ///
    /// If the specified block falls into one of the ranges of already initialized static files,
    /// this function will return that range.
    ///
    /// If no matching file exists, this function will derive a new range from the end of the last
    /// existing file, if any.
    ///
    /// This function will block indefinitely if a write lock for
    /// [`Self::indexes`] is already acquired. In that case, use
    /// [`Self::find_fixed_range_with_block_index`].
    pub fn find_fixed_range(
        &self,
        segment: StaticFileSegment,
        block: BlockNumber,
    ) -> SegmentRangeInclusive {
        self.find_fixed_range_with_block_index(
            segment,
            self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block),
            block,
        )
    }

    /// Get genesis block number
    pub const fn genesis_block_number(&self) -> u64 {
        self.genesis_block_number
    }
}
406
407impl<N: NodePrimitives> StaticFileProvider<N> {
408    /// Reports metrics for the static files.
409    ///
410    /// This uses the in-memory index to get file sizes from mmap handles instead of reading
411    /// filesystem metadata.
412    pub fn report_metrics(&self) -> ProviderResult<()> {
413        let Some(metrics) = &self.metrics else { return Ok(()) };
414
415        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
416        for (segment, headers) in &*static_files {
417            let mut entries = 0;
418            let mut size = 0;
419
420            for (block_range, _) in headers {
421                let fixed_block_range = self.find_fixed_range(segment, block_range.start());
422                let jar_provider = self
423                    .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
424                    .ok_or_else(|| {
425                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
426                    })?;
427
428                entries += jar_provider.rows();
429                size += jar_provider.size() as u64;
430            }
431
432            metrics.record_segment(segment, size, headers.len(), entries);
433        }
434
435        Ok(())
436    }
437
438    /// Writes headers for all blocks to the static file segment.
439    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
440    fn write_headers(
441        w: &mut StaticFileProviderRWRefMut<'_, N>,
442        blocks: &[ExecutedBlock<N>],
443    ) -> ProviderResult<()> {
444        for block in blocks {
445            let b = block.recovered_block();
446            w.append_header(b.header(), &b.hash())?;
447        }
448        Ok(())
449    }
450
451    /// Writes transactions for all blocks to the static file segment.
452    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
453    fn write_transactions(
454        w: &mut StaticFileProviderRWRefMut<'_, N>,
455        blocks: &[ExecutedBlock<N>],
456        tx_nums: &[TxNumber],
457    ) -> ProviderResult<()> {
458        for (block, &first_tx) in blocks.iter().zip(tx_nums) {
459            let b = block.recovered_block();
460            w.increment_block(b.number())?;
461            for (i, tx) in b.body().transactions().iter().enumerate() {
462                w.append_transaction(first_tx + i as u64, tx)?;
463            }
464        }
465        Ok(())
466    }
467
468    /// Writes transaction senders for all blocks to the static file segment.
469    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
470    fn write_transaction_senders(
471        w: &mut StaticFileProviderRWRefMut<'_, N>,
472        blocks: &[ExecutedBlock<N>],
473        tx_nums: &[TxNumber],
474    ) -> ProviderResult<()> {
475        for (block, &first_tx) in blocks.iter().zip(tx_nums) {
476            let b = block.recovered_block();
477            w.increment_block(b.number())?;
478            for (i, sender) in b.senders_iter().enumerate() {
479                w.append_transaction_sender(first_tx + i as u64, sender)?;
480            }
481        }
482        Ok(())
483    }
484
485    /// Writes receipts for all blocks to the static file segment.
486    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
487    fn write_receipts(
488        w: &mut StaticFileProviderRWRefMut<'_, N>,
489        blocks: &[ExecutedBlock<N>],
490        tx_nums: &[TxNumber],
491        ctx: &StaticFileWriteCtx,
492    ) -> ProviderResult<()> {
493        for (block, &first_tx) in blocks.iter().zip(tx_nums) {
494            let block_number = block.recovered_block().number();
495            w.increment_block(block_number)?;
496
497            // skip writing receipts if pruning configuration requires us to.
498            if ctx.receipts_prunable &&
499                ctx.receipts_prune_mode
500                    .is_some_and(|mode| mode.should_prune(block_number, ctx.tip))
501            {
502                continue
503            }
504
505            for (i, receipt) in block.execution_outcome().receipts.iter().enumerate() {
506                w.append_receipt(first_tx + i as u64, receipt)?;
507            }
508        }
509        Ok(())
510    }
511
512    /// Writes account changesets for all blocks to the static file segment.
513    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
514    fn write_account_changesets(
515        w: &mut StaticFileProviderRWRefMut<'_, N>,
516        blocks: &[ExecutedBlock<N>],
517    ) -> ProviderResult<()> {
518        for block in blocks {
519            let block_number = block.recovered_block().number();
520            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
521
522            let changeset: Vec<_> = reverts
523                .accounts
524                .into_iter()
525                .flatten()
526                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
527                .collect();
528            w.append_account_changeset(changeset, block_number)?;
529        }
530        Ok(())
531    }
532
533    /// Writes storage changesets for all blocks to the static file segment.
534    #[instrument(level = "debug", target = "providers::db", skip_all)]
535    fn write_storage_changesets(
536        w: &mut StaticFileProviderRWRefMut<'_, N>,
537        blocks: &[ExecutedBlock<N>],
538    ) -> ProviderResult<()> {
539        for block in blocks {
540            let block_number = block.recovered_block().number();
541            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
542
543            let changeset: Vec<_> = reverts
544                .storage
545                .into_iter()
546                .flatten()
547                .flat_map(|revert| {
548                    revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
549                        StorageBeforeTx {
550                            address: revert.address,
551                            key: B256::from(key.to_be_bytes()),
552                            value: revert_to_slot.to_previous_value(),
553                        }
554                    })
555                })
556                .collect();
557            w.append_storage_changeset(changeset, block_number)?;
558        }
559        Ok(())
560    }
561
562    /// Writes to a static file segment using the provided closure.
563    ///
564    /// The closure receives a mutable reference to the segment writer. After the closure completes,
565    /// `sync_all()` is called to flush writes to disk.
566    #[instrument(level = "debug", target = "providers::static_file", skip_all, fields(?segment))]
567    fn write_segment<F>(
568        &self,
569        segment: StaticFileSegment,
570        first_block_number: BlockNumber,
571        f: F,
572    ) -> ProviderResult<()>
573    where
574        F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()>,
575    {
576        let mut w = self.get_writer(first_block_number, segment)?;
577        f(&mut w)?;
578        w.sync_all()
579    }
580
    /// Writes all static file data for multiple blocks in parallel per-segment.
    ///
    /// This spawns tasks on the storage thread pool for each segment type and each task calls
    /// `sync_all()` on its writer when done.
    ///
    /// Headers and transactions are always written; senders, receipts and account/storage
    /// changesets are only written when the corresponding `ctx` flag is set. `tx_nums` holds
    /// the first transaction number of each block, parallel to `blocks`.
    ///
    /// Each per-segment result stays `None` if its spawned task panicked before storing a
    /// result, which is surfaced as a `ThreadPanic` error below.
    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
    pub fn write_blocks_data(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: StaticFileWriteCtx,
        runtime: &reth_tasks::Runtime,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            return Ok(());
        }

        let first_block_number = blocks[0].recovered_block().number();

        // One result slot per segment task; `None` means the task never completed.
        let mut r_headers = None;
        let mut r_txs = None;
        let mut r_senders = None;
        let mut r_receipts = None;
        let mut r_account_changesets = None;
        let mut r_storage_changesets = None;

        // Propagate tracing context into rayon-spawned threads so that per-segment
        // write spans appear as children of write_blocks_data in traces.
        let span = tracing::Span::current();
        // `in_place_scope` blocks the current thread until all spawned tasks finish,
        // so the `r_*` borrows are valid for the whole scope.
        runtime.storage_pool().in_place_scope(|s| {
            s.spawn(|_| {
                let _guard = span.enter();
                r_headers =
                    Some(self.write_segment(StaticFileSegment::Headers, first_block_number, |w| {
                        Self::write_headers(w, blocks)
                    }));
            });

            s.spawn(|_| {
                let _guard = span.enter();
                r_txs = Some(self.write_segment(
                    StaticFileSegment::Transactions,
                    first_block_number,
                    |w| Self::write_transactions(w, blocks, tx_nums),
                ));
            });

            if ctx.write_senders {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_senders = Some(self.write_segment(
                        StaticFileSegment::TransactionSenders,
                        first_block_number,
                        |w| Self::write_transaction_senders(w, blocks, tx_nums),
                    ));
                });
            }

            if ctx.write_receipts {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_receipts = Some(self.write_segment(
                        StaticFileSegment::Receipts,
                        first_block_number,
                        |w| Self::write_receipts(w, blocks, tx_nums, &ctx),
                    ));
                });
            }

            if ctx.write_account_changesets {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_account_changesets = Some(self.write_segment(
                        StaticFileSegment::AccountChangeSets,
                        first_block_number,
                        |w| Self::write_account_changesets(w, blocks),
                    ));
                });
            }

            if ctx.write_storage_changesets {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_storage_changesets = Some(self.write_segment(
                        StaticFileSegment::StorageChangeSets,
                        first_block_number,
                        |w| Self::write_storage_changesets(w, blocks),
                    ));
                });
            }
        });

        // A `None` slot here means the corresponding task panicked; the second `?`
        // propagates the task's own write error.
        r_headers.ok_or(StaticFileWriterError::ThreadPanic("headers"))??;
        r_txs.ok_or(StaticFileWriterError::ThreadPanic("transactions"))??;
        if ctx.write_senders {
            r_senders.ok_or(StaticFileWriterError::ThreadPanic("senders"))??;
        }
        if ctx.write_receipts {
            r_receipts.ok_or(StaticFileWriterError::ThreadPanic("receipts"))??;
        }
        if ctx.write_account_changesets {
            r_account_changesets
                .ok_or(StaticFileWriterError::ThreadPanic("account_changesets"))??;
        }
        if ctx.write_storage_changesets {
            r_storage_changesets
                .ok_or(StaticFileWriterError::ThreadPanic("storage_changesets"))??;
        }
        Ok(())
    }
690
691    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
692    /// either block or transaction.
693    pub fn get_segment_provider(
694        &self,
695        segment: StaticFileSegment,
696        number: u64,
697    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
698        if segment.is_block_or_change_based() {
699            self.get_segment_provider_for_block(segment, number, None)
700        } else {
701            self.get_segment_provider_for_transaction(segment, number, None)
702        }
703    }
704
705    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
706    /// either block or transaction.
707    ///
708    /// If the segment is not found, returns [`None`].
709    pub fn get_maybe_segment_provider(
710        &self,
711        segment: StaticFileSegment,
712        number: u64,
713    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
714        let provider = if segment.is_block_or_change_based() {
715            self.get_segment_provider_for_block(segment, number, None)
716        } else {
717            self.get_segment_provider_for_transaction(segment, number, None)
718        };
719
720        match provider {
721            Ok(provider) => Ok(Some(provider)),
722            Err(
723                ProviderError::MissingStaticFileBlock(_, _) |
724                ProviderError::MissingStaticFileTx(_, _),
725            ) => Ok(None),
726            Err(err) => Err(err),
727        }
728    }
729
730    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
731    pub fn get_segment_provider_for_block(
732        &self,
733        segment: StaticFileSegment,
734        block: BlockNumber,
735        path: Option<&Path>,
736    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
737        self.get_segment_provider_for_range(
738            segment,
739            || self.get_segment_ranges_from_block(segment, block),
740            path,
741        )?
742        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
743    }
744
745    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
746    pub fn get_segment_provider_for_transaction(
747        &self,
748        segment: StaticFileSegment,
749        tx: TxNumber,
750        path: Option<&Path>,
751    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
752        self.get_segment_provider_for_range(
753            segment,
754            || self.get_segment_ranges_from_transaction(segment, tx),
755            path,
756        )?
757        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
758    }
759
760    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
761    ///
762    /// `fn_range` should make sure the range goes through `find_fixed_range`.
763    pub fn get_segment_provider_for_range(
764        &self,
765        segment: StaticFileSegment,
766        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
767        path: Option<&Path>,
768    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
769        // If we have a path, then get the block range from its name.
770        // Otherwise, check `self.available_static_files`
771        let block_range = match path {
772            Some(path) => StaticFileSegment::parse_filename(
773                &path
774                    .file_name()
775                    .ok_or_else(|| {
776                        ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
777                    })?
778                    .to_string_lossy(),
779            )
780            .and_then(|(parsed_segment, block_range)| {
781                if parsed_segment == segment {
782                    return Some(block_range);
783                }
784                None
785            }),
786            None => fn_range(),
787        };
788
789        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
790        if let Some(block_range) = block_range {
791            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?));
792        }
793
794        Ok(None)
795    }
796
797    /// Gets the [`StaticFileJarProvider`] of the requested path.
798    pub fn get_segment_provider_for_path(
799        &self,
800        path: &Path,
801    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
802        StaticFileSegment::parse_filename(
803            &path
804                .file_name()
805                .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
806                .to_string_lossy(),
807        )
808        .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
809        .transpose()
810    }
811
812    /// Given a segment and block range it removes the cached provider from the map.
813    ///
814    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
815    pub fn remove_cached_provider(
816        &self,
817        segment: StaticFileSegment,
818        fixed_block_range_end: BlockNumber,
819    ) {
820        self.map.remove(&(fixed_block_range_end, segment));
821    }
822
823    /// This handles history expiry by deleting all static files for the given segment below the
824    /// given block.
825    ///
826    /// For example if block is 1M and the blocks per file are 500K this will delete all individual
827    /// files below 1M, so 0-499K and 500K-999K.
828    ///
829    /// This will not delete the file that contains the block itself, because files can only be
830    /// removed entirely.
831    ///
832    /// # Safety
833    ///
834    /// This method will never delete the highest static file for the segment, even if the
835    /// requested block is higher than the highest block in static files. This ensures we always
836    /// maintain at least one static file if any exist.
837    ///
838    /// Returns a list of `SegmentHeader`s from the deleted jars.
839    pub fn delete_segment_below_block(
840        &self,
841        segment: StaticFileSegment,
842        block: BlockNumber,
843    ) -> ProviderResult<Vec<SegmentHeader>> {
844        // Nothing to delete if block is 0.
845        if block == 0 {
846            return Ok(Vec::new());
847        }
848
849        let highest_block = self.get_highest_static_file_block(segment);
850        let mut deleted_headers = Vec::new();
851
852        loop {
853            let Some(block_height) = self.get_lowest_range_end(segment) else {
854                return Ok(deleted_headers);
855            };
856
857            // Stop if we've reached the target block or the highest static file
858            if block_height >= block || Some(block_height) == highest_block {
859                return Ok(deleted_headers);
860            }
861
862            debug!(
863                target: "providers::static_file",
864                ?segment,
865                ?block_height,
866                "Deleting static file below block"
867            );
868
869            // now we need to wipe the static file, this will take care of updating the index and
870            // advance the lowest tracked block height for the segment.
871            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
872                warn!( target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
873            })?;
874
875            deleted_headers.push(header);
876        }
877    }
878
879    /// Given a segment and block, it deletes the jar and all files from the respective block range.
880    ///
881    /// CAUTION: destructive. Deletes files on disk.
882    ///
883    /// This will re-initialize the index after deletion, so all files are tracked.
884    ///
885    /// Returns the `SegmentHeader` of the deleted jar.
886    pub fn delete_jar(
887        &self,
888        segment: StaticFileSegment,
889        block: BlockNumber,
890    ) -> ProviderResult<SegmentHeader> {
891        let fixed_block_range = self.find_fixed_range(segment, block);
892        let key = (fixed_block_range.end(), segment);
893        let file = self.path.join(segment.filename(&fixed_block_range));
894        let jar = if let Some((_, jar)) = self.map.remove(&key) {
895            jar.jar
896        } else {
897            debug!(
898                target: "providers::static_file",
899                ?file,
900                ?fixed_block_range,
901                ?block,
902                "Loading static file jar for deletion"
903            );
904            NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
905        };
906
907        let header = jar.user_header().clone();
908
909        // Delete the sidecar file for changeset segments before deleting the main jar
910        if segment.is_change_based() {
911            let csoff_path = file.with_extension("csoff");
912            if csoff_path.exists() {
913                std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
914            }
915        }
916
917        jar.delete().map_err(ProviderError::other)?;
918
919        // SAFETY: this is currently necessary to ensure that certain indexes like
920        // `static_files_min_block` have the correct values after pruning.
921        self.initialize_index()?;
922
923        Ok(header)
924    }
925
926    /// Deletes ALL static file jars for the given segment, including the highest one.
927    ///
928    /// CAUTION: destructive. Deletes all files on disk for this segment.
929    ///
930    /// This is used for `PruneMode::Full` where all data should be removed.
931    ///
932    /// Returns a list of `SegmentHeader`s from the deleted jars.
933    pub fn delete_segment(&self, segment: StaticFileSegment) -> ProviderResult<Vec<SegmentHeader>> {
934        let mut deleted_headers = Vec::new();
935
936        while let Some(block_height) = self.get_highest_static_file_block(segment) {
937            debug!(
938                target: "providers::static_file",
939                ?segment,
940                ?block_height,
941                "Deleting static file jar"
942            );
943
944            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
945                warn!(target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file jar")
946            })?;
947
948            deleted_headers.push(header);
949        }
950
951        Ok(deleted_headers)
952    }
953
    /// Given a segment and block range it returns a cached
    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
    /// many.
    ///
    /// On a cache miss, the `NippyJar` is loaded from disk, inserted into the cache, and a
    /// provider backed by the freshly inserted entry is returned.
    fn get_or_create_jar_provider(
        &self,
        segment: StaticFileSegment,
        fixed_block_range: &SegmentRangeInclusive,
    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
        // Cache keys are `(end of fixed block range, segment)`.
        let key = (fixed_block_range.end(), segment);

        // Avoid using `entry` directly to avoid a write lock in the common case.
        trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Getting provider");
        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
            trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
            jar.into()
        } else {
            trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
            let path = self.path.join(segment.filename(fixed_block_range));
            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
            // Insert under the write entry, then downgrade the guard to a read reference before
            // building the provider from it.
            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
        };

        // Attach metrics recording when the provider was configured with metrics.
        if let Some(metrics) = &self.metrics {
            provider = provider.with_metrics(metrics.clone());
        }
        Ok(provider)
    }
981
982    /// Gets a static file segment's block range from the provider inner block
983    /// index.
984    fn get_segment_ranges_from_block(
985        &self,
986        segment: StaticFileSegment,
987        block: u64,
988    ) -> Option<SegmentRangeInclusive> {
989        let indexes = self.indexes.read();
990        let index = indexes.get(segment)?;
991
992        (index.max_block >= block).then(|| {
993            self.find_fixed_range_with_block_index(
994                segment,
995                Some(&index.expected_block_ranges_by_max_block),
996                block,
997            )
998        })
999    }
1000
    /// Gets a static file segment's fixed block range from the provider inner
    /// transaction index.
    ///
    /// Returns `None` when the segment has no transaction index, or when `tx` is above the
    /// highest transaction available in static files for this segment.
    fn get_segment_ranges_from_transaction(
        &self,
        segment: StaticFileSegment,
        tx: u64,
    ) -> Option<SegmentRangeInclusive> {
        let indexes = self.indexes.read();
        let index = indexes.get(segment)?;
        let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;

        // It's more probable that the request comes from a newer tx height, so we iterate
        // the static_files in reverse.
        let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();

        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
            if tx > *tx_end {
                // request tx is higher than highest static file tx
                return None;
            }
            // The first tx of this file is one past the next-lower file's `tx_end` (peeked, since
            // we iterate in reverse); for the lowest file it is 0.
            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
            if tx_start <= tx {
                // `tx` falls within this file's [tx_start, tx_end]; resolve its fixed range via
                // the block index, keyed by the end of the file's block range.
                return Some(self.find_fixed_range_with_block_index(
                    segment,
                    Some(&index.expected_block_ranges_by_max_block),
                    block_range.end(),
                ));
            }
        }
        None
    }
1032
    /// Updates the inner transaction and block indexes alongside the internal cached providers in
    /// `self.map`.
    ///
    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
    ///
    /// If `segment_max_block` is None it means there's no static file for this segment.
    pub fn update_index(
        &self,
        segment: StaticFileSegment,
        segment_max_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        debug!(
            target: "providers::static_file",
            ?segment,
            ?segment_max_block,
            "Updating provider index"
        );
        let mut indexes = self.indexes.write();

        match segment_max_block {
            Some(segment_max_block) => {
                // Fixed range of the file that contains the new max block.
                let fixed_range = self.find_fixed_range_with_block_index(
                    segment,
                    indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
                    segment_max_block,
                );

                let jar = NippyJar::<SegmentHeader>::load(
                    &self.path.join(segment.filename(&fixed_range)),
                )
                .map_err(ProviderError::other)?;

                let index = indexes
                    .entry(segment)
                    .and_modify(|index| {
                        // Update max block
                        index.max_block = segment_max_block;

                        // Update expected block range index

                        // Remove all expected block ranges that start at or after the new fixed
                        // range; they are superseded by the entry inserted below.
                        index
                            .expected_block_ranges_by_max_block
                            .retain(|_, block_range| block_range.start() < fixed_range.start());
                        // Insert new expected block range
                        index
                            .expected_block_ranges_by_max_block
                            .insert(fixed_range.end(), fixed_range);
                    })
                    .or_insert_with(|| StaticFileSegmentIndex {
                        min_block_range: None,
                        max_block: segment_max_block,
                        expected_block_ranges_by_max_block: BTreeMap::from([(
                            fixed_range.end(),
                            fixed_range,
                        )]),
                        available_block_ranges_by_max_tx: None,
                    });

                // Update min_block to track the lowest block range of the segment.
                // This is initially set by initialize_index() on node startup, but must be updated
                // as the file grows to prevent stale values.
                //
                // Without this update, min_block can remain at genesis (e.g. Some([0..=0]) or None)
                // even after syncing to higher blocks (e.g. [0..=100]). A stale
                // min_block causes get_lowest_static_file_block() to return the
                // wrong end value, which breaks pruning logic that relies on it for
                // safety checks.
                //
                // Example progression:
                // 1. Node starts, initialize_index() sets min_block = [0..=0]
                // 2. Sync to block 100, this update sets min_block = [0..=100]
                // 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
                //    this update, it would incorrectly return 0 (stale)
                if let Some(current_block_range) = jar.user_header().block_range() {
                    if let Some(min_block_range) = index.min_block_range.as_mut() {
                        // delete_jar WILL ALWAYS re-initialize all indexes, so we are always
                        // sure that current_min is always the lowest.
                        //
                        // Only refresh the tracked range when the updated jar IS the lowest one
                        // (same start); a higher file growing must not move min_block_range.
                        if current_block_range.start() == min_block_range.start() {
                            *min_block_range = current_block_range;
                        }
                    } else {
                        index.min_block_range = Some(current_block_range);
                    }
                }

                // Updates the tx index by first removing all entries which have a higher
                // block_start than our current static file.
                if let Some(tx_range) = jar.user_header().tx_range() {
                    // Current block range has the same block start as `fixed_range`, but block end
                    // might be different if we are still filling this static file.
                    if let Some(current_block_range) = jar.user_header().block_range() {
                        let tx_end = tx_range.end();

                        // Considering that `update_index` is called when we either append/truncate,
                        // we are sure that we are handling the latest data
                        // points.
                        //
                        // Here we remove every entry of the index that has a block start higher or
                        // equal than our current one. This is important in the case
                        // that we prune a lot of rows resulting in a file (and thus
                        // a higher block range) deletion.
                        if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                            index
                                .retain(|_, block_range| block_range.start() < fixed_range.start());
                            index.insert(tx_end, current_block_range);
                        } else {
                            index.available_block_ranges_by_max_tx =
                                Some(BTreeMap::from([(tx_end, current_block_range)]));
                        }
                    }
                } else if segment.is_tx_based() {
                    // The unwinded file has no more transactions/receipts. However, the highest
                    // block is within this files' block range. We only retain
                    // entries with block ranges before the current one.
                    if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
                    }

                    // If the index is empty, just remove it.
                    index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
                }

                // Update the cached provider.
                debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

                // Delete any cached provider that no longer has an associated jar.
                debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
            }
            None => {
                // No static file left for this segment: drop its index entry entirely.
                debug!(target: "providers::static_file", ?segment, "Removing segment from index");
                indexes.remove(segment);
            }
        };

        debug!(target: "providers::static_file", ?segment, "Updated provider index");
        Ok(())
    }
1173
1174    /// Initializes the inner transaction and block index
1175    pub fn initialize_index(&self) -> ProviderResult<()> {
1176        let mut indexes = self.indexes.write();
1177        indexes.clear();
1178
1179        for (segment, headers) in &*iter_static_files(&self.path).map_err(ProviderError::other)? {
1180            // Update first and last block for each segment
1181            //
1182            // It's safe to call `expect` here, because every segment has at least one header
1183            // associated with it.
1184            let min_block_range = Some(headers.first().expect("headers are not empty").0);
1185            let max_block = headers.last().expect("headers are not empty").0.end();
1186
1187            let mut expected_block_ranges_by_max_block = BTreeMap::default();
1188            let mut available_block_ranges_by_max_tx = None;
1189
1190            for (block_range, header) in headers {
1191                // Update max expected block -> expected_block_range index
1192                expected_block_ranges_by_max_block
1193                    .insert(header.expected_block_end(), header.expected_block_range());
1194
1195                // Update max tx -> block_range index
1196                if let Some(tx_range) = header.tx_range() {
1197                    let tx_end = tx_range.end();
1198
1199                    available_block_ranges_by_max_tx
1200                        .get_or_insert_with(BTreeMap::default)
1201                        .insert(tx_end, *block_range);
1202                }
1203            }
1204
1205            indexes.insert(
1206                segment,
1207                StaticFileSegmentIndex {
1208                    min_block_range,
1209                    max_block,
1210                    expected_block_ranges_by_max_block,
1211                    available_block_ranges_by_max_tx,
1212                },
1213            );
1214        }
1215
1216        // If this is a re-initialization, we need to clear this as well
1217        self.map.clear();
1218
1219        // initialize the expired history height to the lowest static file block
1220        if let Some(lowest_range) =
1221            indexes.get(StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
1222        {
1223            // the earliest height is the lowest available block number
1224            self.earliest_history_height
1225                .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
1226        }
1227
1228        Ok(())
1229    }
1230
    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
    /// target to unwind to.
    ///
    /// Two types of consistency checks are done for:
    ///
    /// 1) When a static file fails to commit but the underlying data was changed.
    /// 2) When a static file was committed, but the required database transaction was not.
    ///
    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
    /// will return an error.
    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
    /// to heal.
    ///
    /// For each static file segment:
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///
    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
    ///
    /// WARNING: No static file writer should be held before calling this function, otherwise it
    /// will deadlock.
    #[instrument(skip(self, provider), fields(read_only = self.is_read_only()))]
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<PipelineTarget>>
    where
        Provider: DBProvider
            + BlockReader
            + StageCheckpointReader
            + PruneCheckpointReader
            + ChainSpecProvider
            + StorageSettingsCache,
        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
    {
        // OVM historical import is broken and does not work with this check. It's importing
        // duplicated receipts resulting in having more receipts than the expected transaction
        // range.
        //
        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
        if provider.chain_spec().is_optimism() &&
            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
        {
            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
            const OVM_HEADER_1_HASH: B256 =
                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
                info!(target: "reth::cli",
                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
                );
                return Ok(None);
            }
        }

        info!(target: "reth::cli", "Verifying storage consistency.");

        let mut unwind_target: Option<BlockNumber> = None;

        // Keep the lowest (earliest) requested unwind target seen across all segments.
        let mut update_unwind_target = |new_target| {
            unwind_target =
                unwind_target.map(|current| current.min(new_target)).or(Some(new_target));
        };

        for segment in self.segments_to_check(provider) {
            let span = info_span!(
                "Checking consistency for segment",
                ?segment,
                initial_highest_block = tracing::field::Empty,
                highest_block = tracing::field::Empty,
                highest_tx = tracing::field::Empty,
            );
            let _guard = span.enter();

            debug!(target: "reth::providers::static_file", "Checking consistency for segment");

            // Heal file-level inconsistencies and get before/after highest block
            let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?;
            span.record("initial_highest_block", initial_highest_block);
            span.record("highest_block", highest_block);

            // Only applies to block-based static files. (Headers)
            //
            // The updated `highest_block` may have decreased if we healed from a pruning
            // interruption.
            if initial_highest_block != highest_block {
                info!(
                    target: "reth::providers::static_file",
                    unwind_target = highest_block,
                    "Setting unwind target."
                );
                update_unwind_target(highest_block.unwrap_or_default());
            }

            // Only applies to transaction-based static files. (Receipts & Transactions)
            //
            // Make sure the last transaction matches the last block from its indices, since a heal
            // from a pruning interruption might have decreased the number of transactions without
            // being able to update the last block of the static file segment.
            let highest_tx = self.get_highest_static_file_tx(segment);
            span.record("highest_tx", highest_tx);
            debug!(target: "reth::providers::static_file", "Checking tx index segment");

            if let Some(highest_tx) = highest_tx {
                let mut last_block = highest_block.unwrap_or_default();
                debug!(target: "reth::providers::static_file", last_block, highest_tx, "Verifying last transaction matches last block indices");
                // Walk blocks downwards until we find one whose last tx number fits within the
                // static file's highest tx, or we run out of blocks to inspect.
                loop {
                    let Some(indices) = provider.block_body_indices(last_block)? else {
                        debug!(target: "reth::providers::static_file", last_block, "Block body indices not found, static files ahead of database");
                        // If the block body indices can not be found, then it means that static
                        // files is ahead of database, and the `ensure_invariants` check will fix
                        // it by comparing with stage checkpoints.
                        break
                    };

                    debug!(target: "reth::providers::static_file", last_block, last_tx_num = indices.last_tx_num(), "Found block body indices");

                    if indices.last_tx_num() <= highest_tx {
                        break
                    }

                    if last_block == 0 {
                        debug!(target: "reth::providers::static_file", "Reached block 0 in verification loop");
                        break
                    }

                    last_block -= 1;

                    info!(
                        target: "reth::providers::static_file",
                        highest_block = self.get_highest_static_file_block(segment),
                        unwind_target = last_block,
                        "Setting unwind target."
                    );
                    span.record("highest_block", last_block);
                    highest_block = Some(last_block);
                    update_unwind_target(last_block);
                }
            }

            debug!(target: "reth::providers::static_file", "Ensuring invariants for segment");

            // Cross-check static file heights against database checkpoints; may itself request a
            // (possibly lower) unwind target.
            match self.ensure_invariants_for(provider, segment, highest_tx, highest_block)? {
                Some(unwind) => {
                    debug!(target: "reth::providers::static_file", unwind_target=unwind, "Invariants check returned unwind target");
                    update_unwind_target(unwind);
                }
                None => {
                    debug!(target: "reth::providers::static_file", "Invariants check completed, no unwind needed")
                }
            }
        }

        Ok(unwind_target.map(PipelineTarget::Unwind))
    }
1388
1389    /// Heals file-level (`NippyJar`) inconsistencies for eligible static file
1390    /// segments.
1391    ///
1392    /// Call before [`Self::check_consistency`] so files are internally
1393    /// consistent.
1394    ///
1395    /// Uses the same segment-skip logic as [`Self::check_consistency`], but
1396    /// does not compare with database checkpoints or prune against them.
1397    pub fn check_file_consistency<Provider>(&self, provider: &Provider) -> ProviderResult<()>
1398    where
1399        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1400    {
1401        info!(target: "reth::cli", "Healing static file inconsistencies.");
1402
1403        for segment in self.segments_to_check(provider) {
1404            let _guard = info_span!("healing_static_file_segment", ?segment).entered();
1405            let _ = self.maybe_heal_segment(segment)?;
1406        }
1407
1408        Ok(())
1409    }
1410
1411    /// Returns the static file segments that should be checked/healed for this provider.
1412    fn segments_to_check<'a, Provider>(
1413        &'a self,
1414        provider: &'a Provider,
1415    ) -> impl Iterator<Item = StaticFileSegment> + 'a
1416    where
1417        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1418    {
1419        StaticFileSegment::iter()
1420            .filter(move |segment| self.should_check_segment(provider, *segment))
1421    }
1422
1423    /// True if the given segment should be checked/healed for this provider.
1424    fn should_check_segment<Provider>(
1425        &self,
1426        provider: &Provider,
1427        segment: StaticFileSegment,
1428    ) -> bool
1429    where
1430        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1431    {
1432        match segment {
1433            StaticFileSegment::Headers | StaticFileSegment::Transactions => true,
1434            StaticFileSegment::Receipts => {
1435                if EitherWriter::receipts_destination(provider).is_database() {
1436                    // Old pruned nodes (including full node) do not store receipts as static
1437                    // files.
1438                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database");
1439                    return false;
1440                }
1441
1442                if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
1443                    NamedChain::Chiado == provider.chain_spec().chain_id()
1444                {
1445                    // Gnosis and Chiado's historical import is broken and does not work with
1446                    // this check. They are importing receipts along
1447                    // with importing headers/bodies.
1448                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: broken historical import for gnosis/chiado");
1449                    return false;
1450                }
1451
1452                true
1453            }
1454            StaticFileSegment::TransactionSenders => {
1455                if EitherWriterDestination::senders(provider).is_database() {
1456                    debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: senders stored in database");
1457                    return false;
1458                }
1459
1460                if Self::is_segment_fully_pruned(provider, PruneSegment::SenderRecovery) {
1461                    debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: fully pruned");
1462                    return false;
1463                }
1464
1465                true
1466            }
1467            StaticFileSegment::AccountChangeSets => {
1468                if EitherWriter::account_changesets_destination(provider).is_database() {
1469                    debug!(target: "reth::providers::static_file", ?segment, "Skipping account changesets segment: changesets stored in database");
1470                    return false;
1471                }
1472                true
1473            }
1474            StaticFileSegment::StorageChangeSets => {
1475                if EitherWriter::storage_changesets_destination(provider).is_database() {
1476                    debug!(target: "reth::providers::static_file", ?segment, "Skipping storage changesets segment: changesets stored in database");
1477                    return false
1478                }
1479                true
1480            }
1481        }
1482    }
1483
1484    /// Returns `true` if the given prune segment has a checkpoint with
1485    /// [`reth_prune_types::PruneMode::Full`], indicating all data for this segment has been
1486    /// intentionally deleted.
1487    fn is_segment_fully_pruned<Provider>(provider: &Provider, segment: PruneSegment) -> bool
1488    where
1489        Provider: PruneCheckpointReader,
1490    {
1491        provider
1492            .get_prune_checkpoint(segment)
1493            .ok()
1494            .flatten()
1495            .is_some_and(|checkpoint| checkpoint.prune_mode.is_full())
1496    }
1497
1498    /// Checks consistency of the latest static file segment and throws an
1499    /// error if at fault.
1500    ///
1501    /// Read-only.
1502    fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1503        debug!(target: "reth::providers::static_file", "Checking segment consistency");
1504        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1505            let file_path = self
1506                .directory()
1507                .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1508            debug!(target: "reth::providers::static_file", ?file_path, latest_block, "Loading NippyJar for consistency check");
1509
1510            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1511            debug!(target: "reth::providers::static_file", "NippyJar loaded, checking consistency");
1512
1513            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1514            debug!(target: "reth::providers::static_file", "NippyJar consistency check passed");
1515        } else {
1516            debug!(target: "reth::providers::static_file", "No static file block found, skipping consistency check");
1517        }
1518        Ok(())
1519    }
1520
1521    /// Attempts to heal file-level (`NippyJar`) inconsistencies for a single static file segment.
1522    ///
1523    /// Returns the highest block before and after healing, which can be used to detect
1524    /// if healing from a pruning interruption decreased the highest block.
1525    ///
1526    /// File consistency is broken if:
1527    ///
1528    /// * appending data was interrupted before a config commit, then data file will be truncated
1529    ///   according to the config.
1530    ///
1531    /// * pruning data was interrupted before a config commit, then we have deleted data that we are
1532    ///   expected to still have. We need to check the Database and unwind everything accordingly.
1533    ///
1534    /// **Note:** In read-only mode, this will return an error if a consistency issue is detected,
1535    /// since healing requires write access.
1536    fn maybe_heal_segment(
1537        &self,
1538        segment: StaticFileSegment,
1539    ) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
1540        let initial_highest_block = self.get_highest_static_file_block(segment);
1541        debug!(target: "reth::providers::static_file", ?initial_highest_block, "Initial highest block for segment");
1542
1543        if self.access.is_read_only() {
1544            // Read-only mode: cannot modify files, so just validate consistency and error if
1545            // broken.
1546            debug!(target: "reth::providers::static_file", "Checking segment consistency (read-only)");
1547            self.check_segment_consistency(segment)?;
1548        } else {
1549            // Writable mode: fetching the writer will automatically heal any file-level
1550            // inconsistency by truncating data to match the last committed config.
1551            debug!(target: "reth::providers::static_file", "Fetching latest writer which might heal any potential inconsistency");
1552            self.latest_writer(segment)?;
1553        }
1554
1555        // The updated `highest_block` may have decreased if we healed from a
1556        // pruning interruption.
1557        let highest_block = self.get_highest_static_file_block(segment);
1558
1559        Ok((initial_highest_block, highest_block))
1560    }
1561
    /// Ensure invariants for each corresponding table and static file segment.
    ///
    /// Pure dispatch: selects the database table type matching `segment` and forwards to
    /// [`Self::ensure_invariants`]. Block-indexed headers pass `highest_block` as both the
    /// entry and block bound; transaction-indexed segments pass `highest_tx` as the entry
    /// bound. Storage changesets have a compound key (not a plain `u64`), so they go through
    /// [`Self::ensure_changeset_invariants_by_block`] with a key-to-block extractor instead.
    ///
    /// Returns `Some(block)` when a pipeline unwind to `block` is required, `None` otherwise.
    fn ensure_invariants_for<Provider>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_tx: Option<u64>,
        highest_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
    {
        match segment {
            StaticFileSegment::Headers => self
                .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                    provider,
                    segment,
                    highest_block,
                    highest_block,
                ),
            StaticFileSegment::Transactions => self
                .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                ),
            StaticFileSegment::Receipts => self
                .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                ),
            StaticFileSegment::TransactionSenders => self
                .ensure_invariants::<_, tables::TransactionSenders>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                ),
            StaticFileSegment::AccountChangeSets => self
                .ensure_invariants::<_, tables::AccountChangeSets>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                ),
            StaticFileSegment::StorageChangeSets => self
                .ensure_changeset_invariants_by_block::<_, tables::StorageChangeSets, _>(
                    provider,
                    segment,
                    highest_block,
                    |key| key.block_number(),
                ),
        }
    }
1619
    /// Check invariants for each corresponding table and static file segment:
    ///
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
    ///     target.
    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
    ///     this case, the rows will be removed and [`None`] will be returned.
    ///
    /// * If the database tables overlap with static files and have contiguous keys, or the
    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
    #[instrument(skip(self, provider, segment), fields(table = T::NAME))]
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        debug!(target: "reth::providers::static_file", "Ensuring invariants");
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_entry, _)) = db_cursor.first()? {
            debug!(target: "reth::providers::static_file", db_first_entry, "Found first database entry");
            if let (Some(highest_entry), Some(highest_block)) =
                (highest_static_file_entry, highest_static_file_block)
            {
                // If there is a gap between the entry found in static file and
                // database, then we have most likely lost static file data and need to unwind so we
                // can load it again
                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                    info!(
                        target: "reth::providers::static_file",
                        ?db_first_entry,
                        ?highest_entry,
                        unwind_target = highest_block,
                        "Setting unwind target."
                    );
                    return Ok(Some(highest_block));
                }
            }

            // If the database already holds entries past the static file tip, the database is
            // ahead and there is nothing to heal on the static file side.
            if let Some((db_last_entry, _)) = db_cursor.last()? &&
                highest_static_file_entry
                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
            {
                debug!(target: "reth::providers::static_file", db_last_entry, "Database has entries beyond static files, no unwind needed");
                return Ok(None)
            }
        } else {
            debug!(target: "reth::providers::static_file", "No database entries found");
        }

        // From here on, treat "no static files" as entry/block 0.
        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // If static file entry is ahead of the database entries, then ensure the checkpoint block
        // number matches.
        let stage_id = segment.to_stage_id();
        let checkpoint_block_number =
            provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
        debug!(target: "reth::providers::static_file", ?stage_id, checkpoint_block_number, "Retrieved stage checkpoint");

        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block));
        }

        // If the checkpoint matches (the strictly-ahead case returned above), nothing to do.
        if checkpoint_block_number >= highest_static_file_block {
            debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
            return Ok(None);
        }

        // If the checkpoint is behind, then we failed to do a database commit
        // **but committed** to static files on executing a stage, or the
        // reverse on unwinding a stage.
        //
        // All we need to do is to prune the extra static file rows.
        info!(
            target: "reth::providers",
            from = highest_static_file_block,
            to = checkpoint_block_number,
            "Unwinding static file segment."
        );
        let mut writer = self.latest_writer(segment)?;

        match segment {
            StaticFileSegment::Headers => {
                // Headers are one row per block, so the row count to drop is the block delta.
                let prune_count = highest_static_file_block - checkpoint_block_number;
                debug!(target: "reth::providers::static_file", prune_count, "Pruning headers");
                // TODO(joshie): is_block_meta
                writer.prune_headers(prune_count)?;
            }
            StaticFileSegment::Transactions |
            StaticFileSegment::Receipts |
            StaticFileSegment::TransactionSenders => {
                if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                    // NOTE(review): assumes the static file tip entry is >= the checkpoint
                    // block's last tx number (it would underflow otherwise) — this should hold
                    // because the static file block is ahead of the checkpoint here; confirm.
                    let number = highest_static_file_entry - block.last_tx_num();
                    debug!(target: "reth::providers::static_file", prune_count = number, checkpoint_block_number, "Pruning transaction based segment");

                    match segment {
                        StaticFileSegment::Transactions => {
                            writer.prune_transactions(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::Receipts => {
                            writer.prune_receipts(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::TransactionSenders => {
                            writer.prune_transaction_senders(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::Headers |
                        StaticFileSegment::AccountChangeSets |
                        StaticFileSegment::StorageChangeSets => {
                            // The outer match arm only admits the three tx-based segments.
                            unreachable!()
                        }
                    }
                } else {
                    debug!(target: "reth::providers::static_file", checkpoint_block_number, "No block body indices found for checkpoint block");
                }
            }
            StaticFileSegment::AccountChangeSets => {
                writer.prune_account_changesets(checkpoint_block_number)?;
            }
            StaticFileSegment::StorageChangeSets => {
                writer.prune_storage_changesets(checkpoint_block_number)?;
            }
        }

        debug!(target: "reth::providers::static_file", "Committing writer after pruning");
        writer.commit()?;
        debug!(target: "reth::providers::static_file", "Writer committed successfully");

        debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
        Ok(None)
    }
1768
    /// Changeset variant of [`Self::ensure_invariants`] for tables whose key is not a plain
    /// `u64`: `block_from_key` extracts the block number from a table key so the same
    /// gap/checkpoint checks can run in block-number terms.
    ///
    /// Returns `Some(block)` to request a pipeline unwind to `block`; otherwise heals by
    /// pruning static file rows above the stage checkpoint and returns `None`.
    fn ensure_changeset_invariants_by_block<Provider, T, F>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_block: Option<BlockNumber>,
        block_from_key: F,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
        T: Table,
        F: Fn(&T::Key) -> BlockNumber,
    {
        debug!(
            target: "reth::providers::static_file",
            ?segment,
            ?highest_static_file_block,
            "Ensuring changeset invariants"
        );
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_key, _)) = db_cursor.first()? {
            let db_first_block = block_from_key(&db_first_key);
            // A gap between the static file tip and the first database block means static
            // file data was lost; request an unwind so it can be re-derived.
            if let Some(highest_block) = highest_static_file_block &&
                !(db_first_block <= highest_block || highest_block + 1 == db_first_block)
            {
                info!(
                    target: "reth::providers::static_file",
                    ?db_first_block,
                    ?highest_block,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                return Ok(Some(highest_block))
            }

            // Database entries past the static file tip mean the database is ahead; nothing
            // to heal on the static file side.
            if let Some((db_last_key, _)) = db_cursor.last()? &&
                highest_static_file_block
                    .is_none_or(|highest_block| block_from_key(&db_last_key) > highest_block)
            {
                debug!(
                    target: "reth::providers::static_file",
                    ?segment,
                    "Database has entries beyond static files, no unwind needed"
                );
                return Ok(None)
            }
        } else {
            debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
        }

        // From here on, treat "no static files" as block 0.
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        let stage_id = segment.to_stage_id();
        let checkpoint_block_number =
            provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;

        // Checkpoint ahead of static files: data loss, request an unwind to the static tip.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // Checkpoint behind static files: the static file commit outran the database commit;
        // heal by pruning the extra static file rows back to the checkpoint.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            match segment {
                StaticFileSegment::AccountChangeSets => {
                    writer.prune_account_changesets(checkpoint_block_number)?;
                }
                StaticFileSegment::StorageChangeSets => {
                    writer.prune_storage_changesets(checkpoint_block_number)?;
                }
                _ => unreachable!("invalid segment for changeset invariants"),
            }
            writer.commit()?;
        }

        Ok(None)
    }
1860
1861    /// Returns the earliest available block number that has not been expired and is still
1862    /// available.
1863    ///
1864    /// This means that the highest expired block (or expired block height) is
1865    /// `earliest_history_height.saturating_sub(1)`.
1866    ///
1867    /// Returns `0` if no history has been expired.
1868    pub fn earliest_history_height(&self) -> BlockNumber {
1869        self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1870    }
1871
1872    /// Gets the lowest static file's block range if it exists for a static file segment.
1873    ///
1874    /// If there is nothing on disk for the given segment, this will return [`None`].
1875    pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1876        self.indexes.read().get(segment).and_then(|index| index.min_block_range)
1877    }
1878
1879    /// Gets the lowest static file's block range start if it exists for a static file segment.
1880    ///
1881    /// For example if the lowest static file has blocks 0-499, this will return 0.
1882    ///
1883    /// If there is nothing on disk for the given segment, this will return [`None`].
1884    pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1885        self.get_lowest_range(segment).map(|range| range.start())
1886    }
1887
1888    /// Gets the lowest static file's block range end if it exists for a static file segment.
1889    ///
1890    /// For example if the static file has blocks 0-499, this will return 499.
1891    ///
1892    /// If there is nothing on disk for the given segment, this will return [`None`].
1893    pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1894        self.get_lowest_range(segment).map(|range| range.end())
1895    }
1896
1897    /// Gets the highest static file's block height if it exists for a static file segment.
1898    ///
1899    /// If there is nothing on disk for the given segment, this will return [`None`].
1900    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1901        self.indexes.read().get(segment).map(|index| index.max_block)
1902    }
1903
1904    /// Converts a range to a bounded `RangeInclusive` capped to the highest static file block.
1905    ///
1906    /// This is necessary because static file iteration beyond the tip would loop forever:
1907    /// blocks beyond the static file tip return `Ok(empty)` which is indistinguishable from
1908    /// blocks with no changes. We cap the end to the highest available block regardless of
1909    /// whether the input was unbounded or an explicit large value like `BlockNumber::MAX`.
1910    fn bound_range(
1911        &self,
1912        range: impl RangeBounds<BlockNumber>,
1913        segment: StaticFileSegment,
1914    ) -> RangeInclusive<BlockNumber> {
1915        let highest_block = self.get_highest_static_file_block(segment).unwrap_or(0);
1916
1917        let start = match range.start_bound() {
1918            Bound::Included(&n) => n,
1919            Bound::Excluded(&n) => n.saturating_add(1),
1920            Bound::Unbounded => 0,
1921        };
1922        let end = match range.end_bound() {
1923            Bound::Included(&n) => n.min(highest_block),
1924            Bound::Excluded(&n) => n.saturating_sub(1).min(highest_block),
1925            Bound::Unbounded => highest_block,
1926        };
1927
1928        start..=end
1929    }
1930
1931    /// Gets the highest static file transaction.
1932    ///
1933    /// If there is nothing on disk for the given segment, this will return [`None`].
1934    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1935        self.indexes
1936            .read()
1937            .get(segment)
1938            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1939            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1940    }
1941
1942    /// Gets the highest static file block for all segments.
1943    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1944        HighestStaticFiles {
1945            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1946        }
1947    }
1948
    /// Iterates through segment `static_files` in reverse order, executing a function until it
    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
    ///
    /// Returns `Ok(None)` when no static file yields a result (or the segment has no index).
    pub fn find_static_file<T>(
        &self,
        segment: StaticFileSegment,
        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
    ) -> ProviderResult<Option<T>> {
        // NOTE: the `indexes.read()` guard is a temporary whose lifetime is extended over the
        // whole `if let` body, so the read lock stays held for the duration of the loop.
        if let Some(ranges) =
            self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block)
        {
            // Iterate through all ranges in reverse order (highest to lowest)
            for range in ranges.values().rev() {
                // Stop at the first static file where `func` finds a match.
                if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
                    return Ok(Some(res));
                }
            }
        }

        Ok(None)
    }
1969
    /// Fetches data within a specified range across multiple static files.
    ///
    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
    /// It continues fetching until the end of the range is reached or the provided `predicate`
    /// returns false.
    ///
    /// If a static file for some number in the range is missing, the results collected so far
    /// are returned (not an error).
    pub fn fetch_range_with_predicate<T, F, P>(
        &self,
        segment: StaticFileSegment,
        range: Range<u64>,
        mut get_fn: F,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
        P: FnMut(&T) -> bool,
    {
        // Cap the up-front allocation at 100 entries; `shrink_to_fit` trims any excess later.
        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);

        /// Resolves to the provider for the given block or transaction number.
        ///
        /// If the static file is missing, the `result` is returned.
        macro_rules! get_provider {
            ($number:expr) => {{
                match self.get_segment_provider(segment, $number) {
                    Ok(provider) => provider,
                    Err(
                        ProviderError::MissingStaticFileBlock(_, _) |
                        ProviderError::MissingStaticFileTx(_, _),
                    ) => return Ok(result),
                    Err(err) => return Err(err),
                }
            }};
        }

        let mut provider = get_provider!(range.start);
        let mut cursor = provider.cursor()?;

        // advances number in range
        'outer: for number in range {
            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
            // access data in two different static files, it halts further attempts by returning
            // an error, effectively preventing infinite retry loops.
            let mut retrying = false;

            // advances static files if `get_fn` returns None
            'inner: loop {
                match get_fn(&mut cursor, number)? {
                    Some(res) => {
                        // Predicate rejection ends the whole fetch, not just this number.
                        if !predicate(&res) {
                            break 'outer;
                        }
                        result.push(res);
                        break 'inner;
                    }
                    None => {
                        // Second miss for the same number: give up and return what we have.
                        if retrying {
                            return Ok(result);
                        }
                        // There is a very small chance of hitting a deadlock if two consecutive
                        // static files share the same bucket in the
                        // internal dashmap and we don't drop the current provider
                        // before requesting the next one.
                        drop(cursor);
                        drop(provider);
                        provider = get_provider!(number);
                        cursor = provider.cursor()?;
                        retrying = true;
                    }
                }
            }
        }

        result.shrink_to_fit();

        Ok(result)
    }
2046
    /// Fetches data within a specified range across multiple static files.
    ///
    /// Returns an iterator over the data. Yields [`None`] if the data for the specified number is
    /// not found.
    ///
    /// The iterator lazily re-resolves the jar provider whenever the current one cannot serve
    /// the requested number (i.e. the range spans multiple static files).
    pub fn fetch_range_iter<'a, T, F>(
        &'a self,
        segment: StaticFileSegment,
        range: Range<u64>,
        get_fn: F,
    ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
    where
        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
        T: std::fmt::Debug,
    {
        // `provider` is `None` when no static file covers the current position.
        let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
        Ok(range.map(move |number| {
            // `map` runs `get_fn` against the current provider's cursor; `and_then(transpose)`
            // collapses "no provider" and "provider found nothing" into a single `None`.
            match provider
                .as_ref()
                .map(|provider| get_fn(&mut provider.cursor()?, number))
                .and_then(|result| result.transpose())
            {
                Some(result) => result.map(Some),
                None => {
                    // There is a very small chance of hitting a deadlock if two consecutive
                    // static files share the same bucket in the internal dashmap and we don't drop
                    // the current provider before requesting the next one.
                    provider.take();
                    provider = self.get_maybe_segment_provider(segment, number)?;
                    // Retry once against the freshly resolved provider; the final `transpose`
                    // turns `Option<Result<T>>` back into the `Result<Option<T>>` item shape.
                    provider
                        .as_ref()
                        .map(|provider| get_fn(&mut provider.cursor()?, number))
                        .and_then(|result| result.transpose())
                        .transpose()
                }
            }
        }))
    }
2084
2085    /// Returns directory where `static_files` are located.
2086    pub fn directory(&self) -> &Path {
2087        &self.path
2088    }
2089
2090    /// Retrieves data from the database or static file, wherever it's available.
2091    ///
2092    /// # Arguments
2093    /// * `segment` - The segment of the static file to check against.
2094    /// * `index_key` - Requested index key, usually a block or transaction number.
2095    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
2096    ///   file provider.
2097    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
2098    ///   when the static file doesn't contain the required data or is not available.
2099    pub fn get_with_static_file_or_database<T, FS, FD>(
2100        &self,
2101        segment: StaticFileSegment,
2102        number: u64,
2103        fetch_from_static_file: FS,
2104        fetch_from_database: FD,
2105    ) -> ProviderResult<Option<T>>
2106    where
2107        FS: Fn(&Self) -> ProviderResult<Option<T>>,
2108        FD: Fn() -> ProviderResult<Option<T>>,
2109    {
2110        // If there is, check the maximum block or transaction number of the segment.
2111        let static_file_upper_bound = if segment.is_block_or_change_based() {
2112            self.get_highest_static_file_block(segment)
2113        } else {
2114            self.get_highest_static_file_tx(segment)
2115        };
2116
2117        if static_file_upper_bound
2118            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
2119        {
2120            return fetch_from_static_file(self);
2121        }
2122        fetch_from_database()
2123    }
2124
2125    /// Gets data within a specified range, potentially spanning different `static_files` and
2126    /// database.
2127    ///
2128    /// # Arguments
2129    /// * `segment` - The segment of the static file to query.
2130    /// * `block_or_tx_range` - The range of data to fetch.
2131    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
2132    /// * `fetch_from_database` - A function to fetch data from the database.
2133    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
2134    ///   terminated when this function returns false, thereby filtering the data based on the
2135    ///   provided condition.
2136    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
2137        &self,
2138        segment: StaticFileSegment,
2139        mut block_or_tx_range: Range<u64>,
2140        fetch_from_static_file: FS,
2141        mut fetch_from_database: FD,
2142        mut predicate: P,
2143    ) -> ProviderResult<Vec<T>>
2144    where
2145        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
2146        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
2147        P: FnMut(&T) -> bool,
2148    {
2149        let mut data = Vec::new();
2150
2151        // If there is, check the maximum block or transaction number of the segment.
2152        if let Some(static_file_upper_bound) = if segment.is_block_or_change_based() {
2153            self.get_highest_static_file_block(segment)
2154        } else {
2155            self.get_highest_static_file_tx(segment)
2156        } && block_or_tx_range.start <= static_file_upper_bound
2157        {
2158            let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
2159            data.extend(fetch_from_static_file(
2160                self,
2161                block_or_tx_range.start..end,
2162                &mut predicate,
2163            )?);
2164            block_or_tx_range.start = end;
2165        }
2166
2167        if block_or_tx_range.end > block_or_tx_range.start {
2168            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
2169        }
2170
2171        Ok(data)
2172    }
2173
    /// Returns the static files directory (test-only accessor).
    #[cfg(any(test, feature = "test-utils"))]
    pub fn path(&self) -> &Path {
        &self.path
    }
2179
2180    /// Returns transaction index
2181    #[cfg(any(test, feature = "test-utils"))]
2182    pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2183        self.indexes
2184            .read()
2185            .get(segment)
2186            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
2187            .cloned()
2188    }
2189
2190    /// Returns expected block index
2191    #[cfg(any(test, feature = "test-utils"))]
2192    pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2193        self.indexes
2194            .read()
2195            .get(segment)
2196            .map(|index| &index.expected_block_ranges_by_max_block)
2197            .cloned()
2198    }
2199}
2200
/// Per-segment bookkeeping for the on-disk static files of one [`StaticFileSegment`].
#[derive(Debug)]
struct StaticFileSegmentIndex {
    /// Min static file block range.
    ///
    /// This index is initialized on launch to keep track of the lowest, non-expired static file
    /// per segment and gets updated on [`StaticFileProvider::update_index`].
    ///
    /// This tracks the lowest static file per segment together with the block range in that
    /// file. E.g. static file is batched in 500k block intervals then the lowest static file
    /// is [0..499K], and the block range is start = 0, end = 499K.
    ///
    /// This index is mainly used for history expiry, which targets transactions, e.g. pre-merge
    /// history expiry would lead to removing all static files below the merge height.
    min_block_range: Option<SegmentRangeInclusive>,
    /// Max static file block, i.e. the highest block covered by this segment's static files.
    max_block: u64,
    /// Expected static file block ranges indexed by max expected blocks.
    ///
    /// For example, a static file for expected block range `0..=499_000` may have only block range
    /// `0..=1000` contained in it, as it's not fully filled yet. This index maps the max expected
    /// block to the expected range, i.e. block `499_000` to block range `0..=499_000`.
    expected_block_ranges_by_max_block: SegmentRanges,
    /// Available on disk static file block ranges indexed by max transactions.
    ///
    /// For example, a static file for block range `0..=499_000` may only have block range
    /// `0..=1000` and transaction range `0..=2000` contained in it. This index maps the max
    /// available transaction to the available block range, i.e. transaction `2000` to block range
    /// `0..=1000`.
    ///
    /// `None` for segments that don't track transactions.
    available_block_ranges_by_max_tx: Option<SegmentRanges>,
}
2231
/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider>`.
pub trait StaticFileWriter {
    /// The primitives type used by the static file provider.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`],
    /// positioned for writes starting at `block`.
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
    /// [`StaticFileSegment`].
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
    fn commit(&self) -> ProviderResult<()>;

    /// Returns `true` if the static file provider has unwind queued.
    fn has_unwind_queued(&self) -> bool;

    /// Finalizes all static file writers by committing their configuration to disk.
    ///
    /// Returns an error if prune is queued (use [`Self::commit`] instead).
    fn finalize(&self) -> ProviderResult<()>;
}
2262
2263impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
2264    type Primitives = N;
2265
2266    fn get_writer(
2267        &self,
2268        block: BlockNumber,
2269        segment: StaticFileSegment,
2270    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2271        if self.access.is_read_only() {
2272            return Err(ProviderError::ReadOnlyStaticFileAccess);
2273        }
2274
2275        trace!(target: "providers::static_file", ?block, ?segment, "Getting static file writer.");
2276        self.writers.get_or_create(segment, || {
2277            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
2278        })
2279    }
2280
2281    fn latest_writer(
2282        &self,
2283        segment: StaticFileSegment,
2284    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2285        let genesis_number = self.0.as_ref().genesis_block_number();
2286        self.get_writer(
2287            self.get_highest_static_file_block(segment).unwrap_or(genesis_number),
2288            segment,
2289        )
2290    }
2291
2292    fn commit(&self) -> ProviderResult<()> {
2293        self.writers.commit()
2294    }
2295
2296    fn has_unwind_queued(&self) -> bool {
2297        self.writers.has_unwind_queued()
2298    }
2299
2300    fn finalize(&self) -> ProviderResult<()> {
2301        self.writers.finalize()
2302    }
2303}
2304
impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
    /// Returns all account changes recorded for `block_number`.
    ///
    /// A missing static file for the block is treated as "no changes stored" and yields an
    /// empty `Vec` rather than an error.
    fn account_block_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<reth_db::models::AccountBeforeTx>> {
        let provider = match self.get_segment_provider_for_block(
            StaticFileSegment::AccountChangeSets,
            block_number,
            None,
        ) {
            Ok(provider) => provider,
            // No static file covers this block: nothing stored, not an error for this query.
            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        // The offset maps the block to its contiguous slice of changeset rows inside the jar.
        if let Some(offset) = provider.read_changeset_offset(block_number)? {
            let mut cursor = provider.cursor()?;
            let mut changeset = Vec::with_capacity(offset.num_changes() as usize);

            for i in offset.changeset_range() {
                if let Some(change) =
                    cursor.get_one::<reth_db::static_file::AccountChangesetMask>(i.into())?
                {
                    changeset.push(change)
                }
            }
            Ok(changeset)
        } else {
            Ok(Vec::new())
        }
    }

    /// Returns the state of `address` before `block_number`, if that block's changeset contains
    /// an entry for it.
    ///
    /// Performs a lower-bound binary search over the block's changeset rows; correctness relies
    /// on rows within a block being sorted by address (the invariant this search assumes).
    fn get_account_before_block(
        &self,
        block_number: BlockNumber,
        address: Address,
    ) -> ProviderResult<Option<reth_db::models::AccountBeforeTx>> {
        let provider = match self.get_segment_provider_for_block(
            StaticFileSegment::AccountChangeSets,
            block_number,
            None,
        ) {
            Ok(provider) => provider,
            // No static file covers this block: nothing stored for it.
            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
            Err(err) => return Err(err),
        };

        let Some(offset) = provider.read_changeset_offset(block_number)? else {
            return Ok(None);
        };

        let mut cursor = provider.cursor()?;
        let range = offset.changeset_range();
        // Lower-bound search: narrow [low, high) to the leftmost row whose address >= `address`.
        let mut low = range.start;
        let mut high = range.end;

        while low < high {
            let mid = low + (high - low) / 2;
            if let Some(change) =
                cursor.get_one::<reth_db::static_file::AccountChangesetMask>(mid.into())?
            {
                if change.address < address {
                    low = mid + 1;
                } else {
                    high = mid;
                }
            } else {
                // This is not expected but means we are out of the range / file somehow, and can't
                // continue
                debug!(
                    target: "providers::static_file",
                    ?low,
                    ?mid,
                    ?high,
                    ?range,
                    ?block_number,
                    ?address,
                    "Cannot continue binary search for account changeset fetch"
                );
                // Force the post-loop check below to report "not found".
                low = range.end;
                break;
            }
        }

        // `low` now points at the candidate lower bound; it's a hit only if it's still inside
        // the block's range and its address matches exactly.
        if low < range.end &&
            let Some(change) = cursor
                .get_one::<reth_db::static_file::AccountChangesetMask>(low.into())?
                .filter(|change| change.address == address)
        {
            return Ok(Some(change));
        }

        Ok(None)
    }

    /// Returns `(block_number, change)` pairs for every account changeset in `range`, after
    /// bounding the range via [`Self::bound_range`] and collecting the lazy walker.
    fn account_changesets_range(
        &self,
        range: impl core::ops::RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<(BlockNumber, reth_db::models::AccountBeforeTx)>> {
        let range = self.bound_range(range, StaticFileSegment::AccountChangeSets);
        self.walk_account_changeset_range(range).collect()
    }
}
2408
impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
    /// Returns all storage changes recorded for `block_number` as `(block/address, entry)`
    /// pairs.
    ///
    /// A missing static file for the block is treated as "no changes stored" and yields an
    /// empty `Vec` rather than an error.
    fn storage_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
        let provider = match self.get_segment_provider_for_block(
            StaticFileSegment::StorageChangeSets,
            block_number,
            None,
        ) {
            Ok(provider) => provider,
            // No static file covers this block: nothing stored, not an error for this query.
            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        // The offset maps the block to its contiguous slice of changeset rows inside the jar.
        if let Some(offset) = provider.read_changeset_offset(block_number)? {
            let mut cursor = provider.cursor()?;
            let mut changeset = Vec::with_capacity(offset.num_changes() as usize);

            for i in offset.changeset_range() {
                if let Some(change) = cursor.get_one::<StorageChangesetMask>(i.into())? {
                    // Re-shape the raw row into the key types callers expect.
                    let block_address = BlockNumberAddress((block_number, change.address));
                    let entry = StorageEntry { key: change.key, value: change.value };
                    changeset.push((block_address, entry));
                }
            }
            Ok(changeset)
        } else {
            Ok(Vec::new())
        }
    }

    /// Returns the value of `storage_key` at `address` before `block_number`, if that block's
    /// changeset contains a matching entry.
    ///
    /// Performs a lower-bound binary search over the block's changeset rows on the composite
    /// `(address, key)`; correctness relies on rows within a block being sorted by that pair
    /// (the invariant this search assumes).
    fn get_storage_before_block(
        &self,
        block_number: BlockNumber,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<Option<StorageEntry>> {
        let provider = match self.get_segment_provider_for_block(
            StaticFileSegment::StorageChangeSets,
            block_number,
            None,
        ) {
            Ok(provider) => provider,
            // No static file covers this block: nothing stored for it.
            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
            Err(err) => return Err(err),
        };

        let Some(offset) = provider.read_changeset_offset(block_number)? else {
            return Ok(None);
        };

        let mut cursor = provider.cursor()?;
        let range = offset.changeset_range();
        // Lower-bound search: narrow [low, high) to the leftmost row with
        // (address, key) >= (address, storage_key).
        let mut low = range.start;
        let mut high = range.end;

        while low < high {
            let mid = low + (high - low) / 2;
            if let Some(change) = cursor.get_one::<StorageChangesetMask>(mid.into())? {
                match (change.address, change.key).cmp(&(address, storage_key)) {
                    std::cmp::Ordering::Less => low = mid + 1,
                    _ => high = mid,
                }
            } else {
                // Out of the range / file somehow — not expected; bail out of the search.
                debug!(
                    target: "providers::static_file",
                    ?low,
                    ?mid,
                    ?high,
                    ?range,
                    ?block_number,
                    ?address,
                    ?storage_key,
                    "Cannot continue binary search for storage changeset fetch"
                );
                // Force the post-loop check below to report "not found".
                low = range.end;
                break;
            }
        }

        // `low` now points at the candidate lower bound; it's a hit only if it's still inside
        // the block's range and both address and key match exactly.
        if low < range.end &&
            let Some(change) = cursor
                .get_one::<StorageChangesetMask>(low.into())?
                .filter(|change| change.address == address && change.key == storage_key)
        {
            return Ok(Some(StorageEntry { key: change.key, value: change.value }));
        }

        Ok(None)
    }

    /// Returns `(block/address, entry)` pairs for every storage changeset in `range`, after
    /// bounding the range via [`Self::bound_range`] and collecting the lazy walker.
    fn storage_changesets_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
        let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
        self.walk_storage_changeset_range(range).collect()
    }
}
2509
2510impl<N: NodePrimitives> StaticFileProvider<N> {
2511    /// Creates an iterator for walking through account changesets in the specified block range.
2512    ///
2513    /// This returns a lazy iterator that fetches changesets block by block to avoid loading
2514    /// everything into memory at once.
2515    ///
2516    /// Accepts any range type that implements `RangeBounds<BlockNumber>`, including:
2517    /// - `Range<BlockNumber>` (e.g., `0..100`)
2518    /// - `RangeInclusive<BlockNumber>` (e.g., `0..=99`)
2519    /// - `RangeFrom<BlockNumber>` (e.g., `0..`) - iterates until exhausted
2520    pub fn walk_account_changeset_range(
2521        &self,
2522        range: impl RangeBounds<BlockNumber>,
2523    ) -> StaticFileAccountChangesetWalker<Self> {
2524        StaticFileAccountChangesetWalker::new(self.clone(), range)
2525    }
2526
2527    /// Creates an iterator for walking through storage changesets in the specified block range.
2528    pub fn walk_storage_changeset_range(
2529        &self,
2530        range: impl RangeBounds<BlockNumber>,
2531    ) -> StaticFileStorageChangesetWalker<Self> {
2532        StaticFileStorageChangesetWalker::new(self.clone(), range)
2533    }
2534}
2535
2536impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
2537    type Header = N::BlockHeader;
2538
2539    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
2540        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
2541            Ok(jar_provider
2542                .cursor()?
2543                .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
2544                .and_then(|(header, hash)| {
2545                    if hash == block_hash {
2546                        return Some(header);
2547                    }
2548                    None
2549                }))
2550        })
2551    }
2552
2553    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
2554        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2555            .and_then(|provider| provider.header_by_number(num))
2556            .or_else(|err| {
2557                if let ProviderError::MissingStaticFileBlock(_, _) = err {
2558                    Ok(None)
2559                } else {
2560                    Err(err)
2561                }
2562            })
2563    }
2564
2565    fn headers_range(
2566        &self,
2567        range: impl RangeBounds<BlockNumber>,
2568    ) -> ProviderResult<Vec<Self::Header>> {
2569        self.fetch_range_with_predicate(
2570            StaticFileSegment::Headers,
2571            to_range(range),
2572            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
2573            |_| true,
2574        )
2575    }
2576
2577    fn sealed_header(
2578        &self,
2579        num: BlockNumber,
2580    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
2581        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2582            .and_then(|provider| provider.sealed_header(num))
2583            .or_else(|err| {
2584                if let ProviderError::MissingStaticFileBlock(_, _) = err {
2585                    Ok(None)
2586                } else {
2587                    Err(err)
2588                }
2589            })
2590    }
2591
2592    fn sealed_headers_while(
2593        &self,
2594        range: impl RangeBounds<BlockNumber>,
2595        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
2596    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
2597        self.fetch_range_with_predicate(
2598            StaticFileSegment::Headers,
2599            to_range(range),
2600            |cursor, number| {
2601                Ok(cursor
2602                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
2603                    .map(|(header, hash)| SealedHeader::new(header, hash)))
2604            },
2605            predicate,
2606        )
2607    }
2608}
2609
2610impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
2611    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
2612        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2613            .and_then(|provider| provider.block_hash(num))
2614            .or_else(|err| {
2615                if let ProviderError::MissingStaticFileBlock(_, _) = err {
2616                    Ok(None)
2617                } else {
2618                    Err(err)
2619                }
2620            })
2621    }
2622
2623    fn canonical_hashes_range(
2624        &self,
2625        start: BlockNumber,
2626        end: BlockNumber,
2627    ) -> ProviderResult<Vec<B256>> {
2628        self.fetch_range_with_predicate(
2629            StaticFileSegment::Headers,
2630            start..end,
2631            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
2632            |_| true,
2633        )
2634    }
2635}
2636
2637impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
2638    for StaticFileProvider<N>
2639{
2640    type Receipt = N::Receipt;
2641
2642    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2643        self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
2644            .and_then(|provider| provider.receipt(num))
2645            .or_else(|err| {
2646                if let ProviderError::MissingStaticFileTx(_, _) = err {
2647                    Ok(None)
2648                } else {
2649                    Err(err)
2650                }
2651            })
2652    }
2653
2654    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2655        if let Some(num) = self.transaction_id(hash)? {
2656            return self.receipt(num);
2657        }
2658        Ok(None)
2659    }
2660
2661    fn receipts_by_block(
2662        &self,
2663        _block: BlockHashOrNumber,
2664    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2665        unreachable!()
2666    }
2667
2668    fn receipts_by_tx_range(
2669        &self,
2670        range: impl RangeBounds<TxNumber>,
2671    ) -> ProviderResult<Vec<Self::Receipt>> {
2672        self.fetch_range_with_predicate(
2673            StaticFileSegment::Receipts,
2674            to_range(range),
2675            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
2676            |_| true,
2677        )
2678    }
2679
2680    fn receipts_by_block_range(
2681        &self,
2682        _block_range: RangeInclusive<BlockNumber>,
2683    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2684        Err(ProviderError::UnsupportedProvider)
2685    }
2686}
2687
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
    for StaticFileProvider<N>
{
    /// Computes `(hash, tx_number)` pairs for every transaction in `tx_range` by re-hashing
    /// the raw transactions, parallelized over the global rayon pool.
    ///
    /// The returned list is unsorted.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        let tx_range_size = (tx_range.end - tx_range.start) as usize;

        // Transactions are different size, so chunks will not all take the same processing time. If
        // chunks are too big, there will be idle threads waiting for work. Choosing an
        // arbitrary smaller value to make sure it doesn't happen.
        let chunk_size = 100;

        // iterator over the chunks
        let chunks = tx_range
            .clone()
            .step_by(chunk_size)
            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
        // One receiver per chunk; div_ceil accounts for a possibly-partial final chunk.
        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));

        for chunk_range in chunks {
            let (channel_tx, channel_rx) = mpsc::channel();
            channels.push(channel_rx);

            let manager = self.clone();

            // Spawn the task onto the global rayon pool
            // This task will send the results through the channel after it has calculated
            // the hash.
            rayon::spawn(move || {
                // Reused across transactions to avoid an allocation per item.
                let mut rlp_buf = Vec::with_capacity(128);
                // The fetch result itself is discarded (`let _`): the hashes — and any hashing
                // errors — travel through the channel as `calculate_hash` results instead.
                let _ = manager.fetch_range_with_predicate(
                    StaticFileSegment::Transactions,
                    chunk_range,
                    |cursor, number| {
                        Ok(cursor
                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
                            .map(|transaction| {
                                rlp_buf.clear();
                                let _ = channel_tx
                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
                            }))
                    },
                    |_| true,
                );
            });
        }

        let mut tx_list = Vec::with_capacity(tx_range_size);

        // Iterate over channels and append the tx hashes unsorted
        for channel in channels {
            // `recv` returns `Err` once the sender is dropped, i.e. when its rayon task is done.
            while let Ok(tx) = channel.recv() {
                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
                tx_list.push((tx_hash, tx_id));
            }
        }

        Ok(tx_list)
    }
}
2750
2751impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
2752    for StaticFileProvider<N>
2753{
2754    type Transaction = N::SignedTx;
2755
2756    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
2757        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2758            let mut cursor = jar_provider.cursor()?;
2759            if cursor
2760                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
2761                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
2762                .is_some()
2763            {
2764                Ok(cursor.number())
2765            } else {
2766                Ok(None)
2767            }
2768        })
2769    }
2770
2771    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2772        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2773            .and_then(|provider| provider.transaction_by_id(num))
2774            .or_else(|err| {
2775                if let ProviderError::MissingStaticFileTx(_, _) = err {
2776                    Ok(None)
2777                } else {
2778                    Err(err)
2779                }
2780            })
2781    }
2782
2783    fn transaction_by_id_unhashed(
2784        &self,
2785        num: TxNumber,
2786    ) -> ProviderResult<Option<Self::Transaction>> {
2787        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2788            .and_then(|provider| provider.transaction_by_id_unhashed(num))
2789            .or_else(|err| {
2790                if let ProviderError::MissingStaticFileTx(_, _) = err {
2791                    Ok(None)
2792                } else {
2793                    Err(err)
2794                }
2795            })
2796    }
2797
2798    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2799        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2800            Ok(jar_provider
2801                .cursor()?
2802                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2803                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2804        })
2805    }
2806
2807    fn transaction_by_hash_with_meta(
2808        &self,
2809        _hash: TxHash,
2810    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2811        // Required data not present in static_files
2812        Err(ProviderError::UnsupportedProvider)
2813    }
2814
2815    fn transactions_by_block(
2816        &self,
2817        _block_id: BlockHashOrNumber,
2818    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2819        // Required data not present in static_files
2820        Err(ProviderError::UnsupportedProvider)
2821    }
2822
2823    fn transactions_by_block_range(
2824        &self,
2825        _range: impl RangeBounds<BlockNumber>,
2826    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2827        // Required data not present in static_files
2828        Err(ProviderError::UnsupportedProvider)
2829    }
2830
2831    fn transactions_by_tx_range(
2832        &self,
2833        range: impl RangeBounds<TxNumber>,
2834    ) -> ProviderResult<Vec<Self::Transaction>> {
2835        self.fetch_range_with_predicate(
2836            StaticFileSegment::Transactions,
2837            to_range(range),
2838            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2839            |_| true,
2840        )
2841    }
2842
2843    fn senders_by_tx_range(
2844        &self,
2845        range: impl RangeBounds<TxNumber>,
2846    ) -> ProviderResult<Vec<Address>> {
2847        self.fetch_range_with_predicate(
2848            StaticFileSegment::TransactionSenders,
2849            to_range(range),
2850            |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2851            |_| true,
2852        )
2853    }
2854
2855    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2856        self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2857            .and_then(|provider| provider.transaction_sender(id))
2858            .or_else(|err| {
2859                if let ProviderError::MissingStaticFileTx(_, _) = err {
2860                    Ok(None)
2861                } else {
2862                    Err(err)
2863                }
2864            })
2865    }
2866}
2867
2868impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
2869    fn chain_info(&self) -> ProviderResult<ChainInfo> {
2870        // Required data not present in static_files
2871        Err(ProviderError::UnsupportedProvider)
2872    }
2873
2874    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
2875        // Required data not present in static_files
2876        Err(ProviderError::UnsupportedProvider)
2877    }
2878
2879    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
2880        Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
2881    }
2882
2883    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
2884        // Required data not present in static_files
2885        Err(ProviderError::UnsupportedProvider)
2886    }
2887}
2888
/* The following impls cannot be meaningfully implemented for static files, but must exist to satisfy trait requirements */
2890
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
2963
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_body_indices_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
2976
2977impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2978    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2979        match T::NAME {
2980            tables::CanonicalHeaders::NAME |
2981            tables::Headers::<Header>::NAME |
2982            tables::HeaderTerminalDifficulties::NAME => Ok(self
2983                .get_highest_static_file_block(StaticFileSegment::Headers)
2984                .map(|block| block + 1)
2985                .unwrap_or_default()
2986                as usize),
2987            tables::Receipts::<Receipt>::NAME => Ok(self
2988                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2989                .map(|receipts| receipts + 1)
2990                .unwrap_or_default() as usize),
2991            tables::Transactions::<TransactionSigned>::NAME => Ok(self
2992                .get_highest_static_file_tx(StaticFileSegment::Transactions)
2993                .map(|txs| txs + 1)
2994                .unwrap_or_default()
2995                as usize),
2996            tables::TransactionSenders::NAME => Ok(self
2997                .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
2998                .map(|txs| txs + 1)
2999                .unwrap_or_default() as usize),
3000            _ => Err(ProviderError::UnsupportedProvider),
3001        }
3002    }
3003}
3004
3005/// Calculates the tx hash for the given transaction and its id.
3006#[inline]
3007fn calculate_hash<T>(
3008    entry: (TxNumber, T),
3009    rlp_buf: &mut Vec<u8>,
3010) -> Result<(B256, TxNumber), Box<ProviderError>>
3011where
3012    T: Encodable2718,
3013{
3014    let (tx_id, tx) = entry;
3015    tx.encode_2718(rlp_buf);
3016    Ok((keccak256(rlp_buf), tx_id))
3017}
3018
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use reth_chain_state::EthPrimitives;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

    use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};

    #[test]
    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
        let (static_dir, _) = create_test_static_files_dir();
        let sf_rw: StaticFileProvider<EthPrimitives> =
            StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(100).build()?;

        let segment = StaticFileSegment::Headers;

        // Without an index the default fixed-range computation applies.
        for (block, (start, end)) in [(0, (0, 99)), (250, (200, 299))] {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, None, block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // An empty index falls back to the same default behavior.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
            SegmentRangeInclusive::new(100, 199)
        );

        // Index describing three contiguous 100-block files.
        let block_index = BTreeMap::from_iter([
            (99, SegmentRangeInclusive::new(0, 99)),
            (199, SegmentRangeInclusive::new(100, 199)),
            (299, SegmentRangeInclusive::new(200, 299)),
        ]);

        // Blocks covered by the index resolve to their recorded range; blocks
        // past the index derive fresh 100-block ranges beyond the last entry.
        let indexed_cases = [
            (0, (0, 99)),
            (50, (0, 99)),
            (99, (0, 99)),
            (100, (100, 199)),
            (150, (100, 199)),
            (199, (100, 199)),
            // exactly one segment after the last recorded range
            (300, (300, 399)),
            (350, (300, 399)),
            // skips the 300-399 segment entirely
            (500, (500, 599)),
            // skips many segments
            (1000, (1000, 1099)),
        ];
        for (block, (start, end)) in indexed_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // Index whose recorded ranges use different sizes than the current
        // blocks_per_file setting — simulates the setting changing between runs.
        let mixed_size_index = BTreeMap::from_iter([
            (49, SegmentRangeInclusive::new(0, 49)),     // 50 blocks
            (149, SegmentRangeInclusive::new(50, 149)),  // 100 blocks
            (349, SegmentRangeInclusive::new(150, 349)), // 200 blocks
        ]);

        // Recorded ranges win regardless of their size; past the last one,
        // new ranges are derived using the current blocks_per_file (100).
        let mixed_cases = [
            (25, (0, 49)),
            (100, (50, 149)),
            (200, (150, 349)),
            (350, (350, 449)),
            (450, (450, 549)),
            (550, (550, 649)),
        ];
        for (block, (start, end)) in mixed_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        Ok(())
    }
}