Skip to main content

reth_provider/providers/static_file/
manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker},
7    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
8    EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
9    TransactionVariant, TransactionsProvider, TransactionsProviderExt,
10};
11use alloy_consensus::{transaction::TransactionMeta, Header};
12use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
13use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
14use notify::{RecommendedWatcher, RecursiveMode, Watcher};
15use parking_lot::RwLock;
16use reth_chain_state::ExecutedBlock;
17use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
18use reth_db::{
19    lockfile::StorageLock,
20    static_file::{
21        iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
22        StaticFileCursor, StorageChangesetMask, TransactionMask, TransactionSenderMask,
23    },
24};
25use reth_db_api::{
26    cursor::DbCursorRO,
27    models::{AccountBeforeTx, BlockNumberAddress, StorageBeforeTx, StoredBlockBodyIndices},
28    table::{Decompress, Table, Value},
29    tables,
30    transaction::DbTx,
31};
32use reth_ethereum_primitives::{Receipt, TransactionSigned};
33use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
34use reth_node_types::NodePrimitives;
35use reth_primitives_traits::{
36    dashmap::DashMap, AlloyBlockHeader as _, BlockBody as _, RecoveredBlock, SealedHeader,
37    SignedTransaction, StorageEntry,
38};
39use reth_prune_types::PruneSegment;
40use reth_stages_types::PipelineTarget;
41use reth_static_file_types::{
42    find_fixed_range, ChangesetOffsetReader, HighestStaticFiles, SegmentHeader,
43    SegmentRangeInclusive, StaticFileMap, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
44};
45use reth_storage_api::{
46    BlockBodyIndicesProvider, ChangeSetReader, DBProvider, PruneCheckpointReader,
47    StorageChangeSetReader, StorageSettingsCache,
48};
49use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
50use std::{
51    collections::BTreeMap,
52    fmt::Debug,
53    ops::{Bound, Deref, Range, RangeBounds, RangeInclusive},
54    path::{Path, PathBuf},
55    sync::{atomic::AtomicU64, mpsc, Arc},
56};
57use tracing::{debug, info, info_span, instrument, trace, warn};
58
/// Alias type for a map that can be queried for block or transaction ranges. It uses `u64` to
/// represent either a block or a transaction number end of a static file range.
///
/// Keys are the inclusive upper bound (max block/tx number) of each file's range, so an ordered
/// scan can locate the file containing a given number (see
/// `find_fixed_range_with_block_index`).
type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
62
/// Access mode on a static file provider. RO/RW.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` if read-only access.
    pub const fn is_read_only(&self) -> bool {
        match self {
            Self::RO => true,
            Self::RW => false,
        }
    }

    /// Returns `true` if read-write access.
    pub const fn is_read_write(&self) -> bool {
        !self.is_read_only()
    }
}
84
/// Context for static file block writes.
///
/// Contains target segments and pruning configuration.
#[derive(Debug, Clone, Copy, Default)]
pub struct StaticFileWriteCtx {
    /// Whether transaction senders should be written to static files.
    pub write_senders: bool,
    /// Whether receipts should be written to static files.
    pub write_receipts: bool,
    /// Whether account changesets should be written to static files.
    pub write_account_changesets: bool,
    /// Whether storage changesets should be written to static files.
    pub write_storage_changesets: bool,
    /// The current chain tip block number (for pruning).
    pub tip: BlockNumber,
    /// The prune mode for receipts, if any.
    ///
    /// Consulted together with [`Self::receipts_prunable`] when writing receipts: blocks the mode
    /// marks as prunable relative to [`Self::tip`] are skipped.
    pub receipts_prune_mode: Option<reth_prune_types::PruneMode>,
    /// Whether receipts are prunable (based on storage settings and prune distance).
    pub receipts_prunable: bool,
}
105
/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
///
/// "Static files" contain immutable chain history data, such as:
///  - transactions
///  - headers
///  - receipts
///
/// This provider type is responsible for reading and writing to static files.
///
/// This is a cheaply-cloneable newtype over an [`Arc`]ed [`StaticFileProviderInner`];
/// all clones share the same underlying state.
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
116
117impl<N> Clone for StaticFileProvider<N> {
118    fn clone(&self) -> Self {
119        Self(self.0.clone())
120    }
121}
122
/// Builder for [`StaticFileProvider`] that allows configuration before initialization.
#[derive(Debug)]
pub struct StaticFileProviderBuilder<P> {
    // Access mode (read-only or read-write) for the provider being built.
    access: StaticFileAccess,
    // Whether to attach a `StaticFileProviderMetrics` instance on build.
    use_metrics: bool,
    // Per-segment overrides for the number of blocks stored per file.
    blocks_per_file: StaticFileMap<u64>,
    // Directory where the static files live.
    path: P,
    // Block number of the genesis block; defaults to 0.
    genesis_block_number: u64,
}
132
133impl<P: AsRef<Path>> StaticFileProviderBuilder<P> {
134    /// Creates a new builder with read-write access.
135    pub fn read_write(path: P) -> Self {
136        Self {
137            path,
138            access: StaticFileAccess::RW,
139            blocks_per_file: Default::default(),
140            use_metrics: false,
141            genesis_block_number: 0,
142        }
143    }
144
145    /// Creates a new builder with read-only access.
146    pub fn read_only(path: P) -> Self {
147        Self {
148            path,
149            access: StaticFileAccess::RO,
150            blocks_per_file: Default::default(),
151            use_metrics: false,
152            genesis_block_number: 0,
153        }
154    }
155
156    /// Set custom blocks per file for specific segments.
157    ///
158    /// Each static file segment is stored across multiple files, and each of these files contains
159    /// up to the specified number of blocks of data. When the file gets full, a new file is
160    /// created with the new block range.
161    ///
162    /// This setting affects the size of each static file, and can be set per segment.
163    ///
164    /// If it is changed for an existing node, existing static files will not be affected and will
165    /// be finished with the old blocks per file setting, but new static files will use the new
166    /// setting.
167    pub fn with_blocks_per_file_for_segments(
168        mut self,
169        segments: &<StaticFileMap<u64> as Deref>::Target,
170    ) -> Self {
171        for (segment, &blocks_per_file) in segments {
172            self.blocks_per_file.insert(segment, blocks_per_file);
173        }
174        self
175    }
176
177    /// Set a custom number of blocks per file for all segments.
178    pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
179        for segment in StaticFileSegment::iter() {
180            self.blocks_per_file.insert(segment, blocks_per_file);
181        }
182        self
183    }
184
185    /// Set a custom number of blocks per file for a specific segment.
186    pub fn with_blocks_per_file_for_segment(
187        mut self,
188        segment: StaticFileSegment,
189        blocks_per_file: u64,
190    ) -> Self {
191        self.blocks_per_file.insert(segment, blocks_per_file);
192        self
193    }
194
195    /// Enables metrics on the [`StaticFileProvider`].
196    pub const fn with_metrics(mut self) -> Self {
197        self.use_metrics = true;
198        self
199    }
200
201    /// Sets the genesis block number for the [`StaticFileProvider`].
202    ///
203    /// This configures the genesis block number, which is used to determine the starting point
204    /// for block indexing and querying operations.
205    ///
206    /// # Arguments
207    ///
208    /// * `genesis_block_number` - The block number of the genesis block.
209    ///
210    /// # Returns
211    ///
212    /// Returns `Self` to allow method chaining.
213    pub const fn with_genesis_block_number(mut self, genesis_block_number: u64) -> Self {
214        self.genesis_block_number = genesis_block_number;
215        self
216    }
217
218    /// Builds the final [`StaticFileProvider`] and initializes the index.
219    pub fn build<N: NodePrimitives>(self) -> ProviderResult<StaticFileProvider<N>> {
220        let mut provider = StaticFileProviderInner::new(self.path, self.access)?;
221        if self.use_metrics {
222            provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
223        }
224
225        for (segment, blocks_per_file) in *self.blocks_per_file {
226            provider.blocks_per_file.insert(segment, blocks_per_file);
227        }
228        provider.genesis_block_number = self.genesis_block_number;
229
230        let provider = StaticFileProvider(Arc::new(provider));
231        provider.initialize_index()?;
232        Ok(provider)
233    }
234}
235
236impl<N: NodePrimitives> StaticFileProvider<N> {
237    /// Creates a new [`StaticFileProvider`] with the given [`StaticFileAccess`].
238    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
239        let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
240        provider.initialize_index()?;
241        Ok(provider)
242    }
243}
244
impl<N: NodePrimitives> StaticFileProvider<N> {
    /// Creates a new [`StaticFileProvider`] with read-only access.
    ///
    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
    /// new data won't be detected or queryable.
    ///
    /// Watching is recommended if the read-only provider is used on a directory that an active node
    /// instance is modifying.
    ///
    /// See also [`StaticFileProvider::watch_directory`].
    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
        let provider = Self::new(path, StaticFileAccess::RO)?;

        if watch_directory {
            provider.watch_directory();
        }

        Ok(provider)
    }

    /// Creates a new [`StaticFileProvider`] with read-write access.
    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
        Self::new(path, StaticFileAccess::RW)
    }

    /// Watches the directory for changes and updates the in-memory index when modifications
    /// are detected.
    ///
    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
    /// receive `update_index` notifications from a node that appends/truncates data.
    pub fn watch_directory(&self) {
        let provider = self.clone();
        // The watch loop runs on a dedicated OS thread and owns a clone of the provider
        // (cheap: shared `Arc`) for its whole lifetime.
        reth_tasks::spawn_os_thread("sf-watch", move || {
            let (tx, rx) = std::sync::mpsc::channel();
            let mut watcher = RecommendedWatcher::new(
                move |res| tx.send(res).unwrap(),
                notify::Config::default(),
            )
            .expect("failed to create watcher");

            watcher
                .watch(&provider.path, RecursiveMode::NonRecursive)
                .expect("failed to watch path");

            // Some backends send repeated modified events
            let mut last_event_timestamp = None;

            while let Ok(res) = rx.recv() {
                match res {
                    Ok(event) => {
                        // We only care about modified data events
                        if !matches!(
                            event.kind,
                            notify::EventKind::Modify(_) |
                                notify::EventKind::Create(_) |
                                notify::EventKind::Remove(_)
                        ) {
                            continue;
                        }

                        // We only trigger a re-initialization if a configuration file was
                        // modified. This means that a
                        // static_file_provider.commit() was called on the node after
                        // appending/truncating rows
                        for segment in event.paths {
                            // Ensure it's a file with the .conf extension
                            if segment
                                .extension()
                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
                            {
                                continue;
                            }

                            // Ensure it's well formatted static file name
                            if StaticFileSegment::parse_filename(
                                &segment.file_stem().expect("qed").to_string_lossy(),
                            )
                            .is_none()
                            {
                                continue;
                            }

                            // If we can read the metadata and modified timestamp, ensure this is
                            // not an old or repeated event.
                            if let Ok(current_modified_timestamp) =
                                std::fs::metadata(&segment).and_then(|m| m.modified())
                            {
                                if last_event_timestamp.is_some_and(|last_timestamp| {
                                    last_timestamp >= current_modified_timestamp
                                }) {
                                    continue;
                                }
                                last_event_timestamp = Some(current_modified_timestamp);
                            }

                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
                            // One relevant `.conf` change is enough to rebuild the whole index,
                            // so stop scanning the remaining event paths.
                            if let Err(err) = provider.initialize_index() {
                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
                            }
                            break;
                        }
                    }

                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
                }
            }
        });
    }
}
354
355impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
356    type Target = StaticFileProviderInner<N>;
357
358    fn deref(&self) -> &Self::Target {
359        &self.0
360    }
361}
362
/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
    /// segments and ranges.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Indexes per segment.
    indexes: RwLock<StaticFileMap<StaticFileSegmentIndex>>,
    /// This is an additional index that tracks the expired height, this will track the highest
    /// block number that has been expired (missing). The first, non expired block is
    /// `expired_history_height + 1`.
    ///
    /// This is effectively the transaction range that has been expired:
    /// [`StaticFileProvider::delete_segment_below_block`] and mirrors
    /// `static_files_min_block[transactions] - blocks_per_file`.
    ///
    /// This additional tracker exists for more efficient lookups because the node must be aware of
    /// the expired height.
    earliest_history_height: AtomicU64,
    /// Directory where `static_files` are located
    path: PathBuf,
    /// Maintains a writer set of [`StaticFileSegment`].
    writers: StaticFileWriters<N>,
    /// Metrics for the static files. `None` unless enabled via the builder.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access rights of the provider.
    access: StaticFileAccess,
    /// Number of blocks per file, per segment.
    blocks_per_file: StaticFileMap<u64>,
    /// Write lock for when access is [`StaticFileAccess::RW`]. Held for the provider's lifetime;
    /// `None` for read-only access.
    _lock_file: Option<StorageLock>,
    /// Genesis block number, default is 0.
    genesis_block_number: u64,
}
397
impl<N: NodePrimitives> StaticFileProviderInner<N> {
    /// Creates a new [`StaticFileProviderInner`].
    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
        // Only read-write access acquires the exclusive storage lock; read-only providers may
        // share the directory with a writer.
        let _lock_file = if access.is_read_write() {
            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
        } else {
            None
        };

        // Start every segment at the default file size; the builder may override these later.
        let mut blocks_per_file = StaticFileMap::default();
        for segment in StaticFileSegment::iter() {
            blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
        }

        let provider = Self {
            map: Default::default(),
            indexes: Default::default(),
            writers: Default::default(),
            earliest_history_height: Default::default(),
            path: path.as_ref().to_path_buf(),
            metrics: None,
            access,
            blocks_per_file,
            _lock_file,
            genesis_block_number: 0,
        };

        Ok(provider)
    }

    /// Returns `true` if this provider was opened with read-only access.
    pub const fn is_read_only(&self) -> bool {
        self.access.is_read_only()
    }

    /// Each static file has a fixed number of blocks. This gives out the range where the requested
    /// block is positioned.
    ///
    /// If the specified block falls into one of the ranges of already initialized static files,
    /// this function will return that range.
    ///
    /// If no matching file exists, this function will derive a new range from the end of the last
    /// existing file, if any.
    pub fn find_fixed_range_with_block_index(
        &self,
        segment: StaticFileSegment,
        block_index: Option<&SegmentRanges>,
        block: BlockNumber,
    ) -> SegmentRangeInclusive {
        // Fall back to the default file size if the segment has no configured value.
        let blocks_per_file =
            self.blocks_per_file.get(segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);

        if let Some(block_index) = block_index {
            // Find first block range that contains the requested block. The index is keyed by
            // each range's max block, so the first key `>= block` is the candidate file.
            if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
            {
                // Found matching range for an existing file using block index
                return *range;
            } else if let Some((_, range)) = block_index.last_key_value() {
                // Didn't find matching range for an existing file, derive a new range from the end
                // of the last existing file range.
                //
                // `block` is always higher than `range.end()` here, because we iterated over all
                // `block_index` ranges above and didn't find one that contains our block
                let blocks_after_last_range = block - range.end();
                // Number of whole (empty) files between the last range and the requested block.
                let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
                let start = range.end() + 1 + segments_to_skip * blocks_per_file;
                return SegmentRangeInclusive::new(start, start + blocks_per_file - 1);
            }
        }
        // No block index is available, derive a new range using the fixed number of blocks,
        // starting from the beginning.
        find_fixed_range(block, blocks_per_file)
    }

    /// Each static file has a fixed number of blocks. This gives out the range where the requested
    /// block is positioned.
    ///
    /// If the specified block falls into one of the ranges of already initialized static files,
    /// this function will return that range.
    ///
    /// If no matching file exists, this function will derive a new range from the end of the last
    /// existing file, if any.
    ///
    /// This function will block indefinitely if a write lock for
    /// [`Self::indexes`] is already acquired. In that case, use
    /// [`Self::find_fixed_range_with_block_index`].
    pub fn find_fixed_range(
        &self,
        segment: StaticFileSegment,
        block: BlockNumber,
    ) -> SegmentRangeInclusive {
        // The read guard temporary lives until the end of this call expression, which keeps the
        // borrowed block index valid for the duration of the lookup.
        self.find_fixed_range_with_block_index(
            segment,
            self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block),
            block,
        )
    }

    /// Get genesis block number
    pub const fn genesis_block_number(&self) -> u64 {
        self.genesis_block_number
    }
}
501
502impl<N: NodePrimitives> StaticFileProvider<N> {
503    /// Reports metrics for the static files.
504    ///
505    /// This uses the in-memory index to get file sizes from mmap handles instead of reading
506    /// filesystem metadata.
507    pub fn report_metrics(&self) -> ProviderResult<()> {
508        let Some(metrics) = &self.metrics else { return Ok(()) };
509
510        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
511        for (segment, headers) in &*static_files {
512            let mut entries = 0;
513            let mut size = 0;
514
515            for (block_range, _) in headers {
516                let fixed_block_range = self.find_fixed_range(segment, block_range.start());
517                let jar_provider = self
518                    .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
519                    .ok_or_else(|| {
520                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
521                    })?;
522
523                entries += jar_provider.rows();
524                size += jar_provider.size() as u64;
525            }
526
527            metrics.record_segment(segment, size, headers.len(), entries);
528        }
529
530        Ok(())
531    }
532
533    /// Writes headers for all blocks to the static file segment.
534    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
535    fn write_headers(
536        w: &mut StaticFileProviderRWRefMut<'_, N>,
537        blocks: &[ExecutedBlock<N>],
538    ) -> ProviderResult<()> {
539        for block in blocks {
540            let b = block.recovered_block();
541            w.append_header(b.header(), &b.hash())?;
542        }
543        Ok(())
544    }
545
546    /// Writes transactions for all blocks to the static file segment.
547    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
548    fn write_transactions(
549        w: &mut StaticFileProviderRWRefMut<'_, N>,
550        blocks: &[ExecutedBlock<N>],
551        tx_nums: &[TxNumber],
552    ) -> ProviderResult<()> {
553        for (block, &first_tx) in blocks.iter().zip(tx_nums) {
554            let b = block.recovered_block();
555            w.increment_block(b.number())?;
556            for (i, tx) in b.body().transactions().iter().enumerate() {
557                w.append_transaction(first_tx + i as u64, tx)?;
558            }
559        }
560        Ok(())
561    }
562
563    /// Writes transaction senders for all blocks to the static file segment.
564    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
565    fn write_transaction_senders(
566        w: &mut StaticFileProviderRWRefMut<'_, N>,
567        blocks: &[ExecutedBlock<N>],
568        tx_nums: &[TxNumber],
569    ) -> ProviderResult<()> {
570        for (block, &first_tx) in blocks.iter().zip(tx_nums) {
571            let b = block.recovered_block();
572            w.increment_block(b.number())?;
573            for (i, sender) in b.senders_iter().enumerate() {
574                w.append_transaction_sender(first_tx + i as u64, sender)?;
575            }
576        }
577        Ok(())
578    }
579
580    /// Writes receipts for all blocks to the static file segment.
581    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
582    fn write_receipts(
583        w: &mut StaticFileProviderRWRefMut<'_, N>,
584        blocks: &[ExecutedBlock<N>],
585        tx_nums: &[TxNumber],
586        ctx: &StaticFileWriteCtx,
587    ) -> ProviderResult<()> {
588        for (block, &first_tx) in blocks.iter().zip(tx_nums) {
589            let block_number = block.recovered_block().number();
590            w.increment_block(block_number)?;
591
592            // skip writing receipts if pruning configuration requires us to.
593            if ctx.receipts_prunable &&
594                ctx.receipts_prune_mode
595                    .is_some_and(|mode| mode.should_prune(block_number, ctx.tip))
596            {
597                continue
598            }
599
600            for (i, receipt) in block.execution_outcome().receipts.iter().enumerate() {
601                w.append_receipt(first_tx + i as u64, receipt)?;
602            }
603        }
604        Ok(())
605    }
606
607    /// Writes account changesets for all blocks to the static file segment.
608    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
609    fn write_account_changesets(
610        w: &mut StaticFileProviderRWRefMut<'_, N>,
611        blocks: &[ExecutedBlock<N>],
612    ) -> ProviderResult<()> {
613        for block in blocks {
614            let block_number = block.recovered_block().number();
615            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
616
617            let changeset: Vec<_> = reverts
618                .accounts
619                .into_iter()
620                .flatten()
621                .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
622                .collect();
623            w.append_account_changeset(changeset, block_number)?;
624        }
625        Ok(())
626    }
627
628    /// Writes storage changesets for all blocks to the static file segment.
629    #[instrument(level = "debug", target = "providers::db", skip_all)]
630    fn write_storage_changesets(
631        w: &mut StaticFileProviderRWRefMut<'_, N>,
632        blocks: &[ExecutedBlock<N>],
633    ) -> ProviderResult<()> {
634        for block in blocks {
635            let block_number = block.recovered_block().number();
636            let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
637
638            let changeset: Vec<_> = reverts
639                .storage
640                .into_iter()
641                .flatten()
642                .flat_map(|revert| {
643                    revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
644                        StorageBeforeTx {
645                            address: revert.address,
646                            key: B256::from(key.to_be_bytes()),
647                            value: revert_to_slot.to_previous_value(),
648                        }
649                    })
650                })
651                .collect();
652            w.append_storage_changeset(changeset, block_number)?;
653        }
654        Ok(())
655    }
656
657    /// Writes to a static file segment using the provided closure.
658    ///
659    /// The closure receives a mutable reference to the segment writer. After the closure completes,
660    /// `sync_all()` is called to flush writes to disk.
661    #[instrument(level = "debug", target = "providers::static_file", skip_all, fields(?segment))]
662    fn write_segment<F>(
663        &self,
664        segment: StaticFileSegment,
665        first_block_number: BlockNumber,
666        f: F,
667    ) -> ProviderResult<()>
668    where
669        F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()>,
670    {
671        let mut w = self.get_writer(first_block_number, segment)?;
672        f(&mut w)?;
673        w.sync_all()
674    }
675
    /// Writes all static file data for multiple blocks in parallel per-segment.
    ///
    /// This spawns tasks on the storage thread pool for each segment type and each task calls
    /// `sync_all()` on its writer when done.
    #[instrument(level = "debug", target = "providers::static_file", skip_all)]
    pub fn write_blocks_data(
        &self,
        blocks: &[ExecutedBlock<N>],
        tx_nums: &[TxNumber],
        ctx: StaticFileWriteCtx,
        runtime: &reth_tasks::Runtime,
    ) -> ProviderResult<()> {
        if blocks.is_empty() {
            return Ok(());
        }

        let first_block_number = blocks[0].recovered_block().number();

        // One result slot per segment task. A slot that is still `None` after the scope ends
        // means the corresponding task never stored a result (i.e. it panicked).
        let mut r_headers = None;
        let mut r_txs = None;
        let mut r_senders = None;
        let mut r_receipts = None;
        let mut r_account_changesets = None;
        let mut r_storage_changesets = None;

        // Propagate tracing context into rayon-spawned threads so that per-segment
        // write spans appear as children of write_blocks_data in traces.
        let span = tracing::Span::current();
        // `in_place_scope` blocks until every spawned task completes, which makes the `&mut`
        // captures of the result slots above sound.
        runtime.storage_pool().in_place_scope(|s| {
            s.spawn(|_| {
                let _guard = span.enter();
                r_headers =
                    Some(self.write_segment(StaticFileSegment::Headers, first_block_number, |w| {
                        Self::write_headers(w, blocks)
                    }));
            });

            s.spawn(|_| {
                let _guard = span.enter();
                r_txs = Some(self.write_segment(
                    StaticFileSegment::Transactions,
                    first_block_number,
                    |w| Self::write_transactions(w, blocks, tx_nums),
                ));
            });

            if ctx.write_senders {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_senders = Some(self.write_segment(
                        StaticFileSegment::TransactionSenders,
                        first_block_number,
                        |w| Self::write_transaction_senders(w, blocks, tx_nums),
                    ));
                });
            }

            if ctx.write_receipts {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_receipts = Some(self.write_segment(
                        StaticFileSegment::Receipts,
                        first_block_number,
                        |w| Self::write_receipts(w, blocks, tx_nums, &ctx),
                    ));
                });
            }

            if ctx.write_account_changesets {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_account_changesets = Some(self.write_segment(
                        StaticFileSegment::AccountChangeSets,
                        first_block_number,
                        |w| Self::write_account_changesets(w, blocks),
                    ));
                });
            }

            if ctx.write_storage_changesets {
                s.spawn(|_| {
                    let _guard = span.enter();
                    r_storage_changesets = Some(self.write_segment(
                        StaticFileSegment::StorageChangeSets,
                        first_block_number,
                        |w| Self::write_storage_changesets(w, blocks),
                    ));
                });
            }
        });

        // For each slot: the first `?` surfaces a panicked task (empty slot), the second
        // propagates the task's own write error.
        r_headers.ok_or(StaticFileWriterError::ThreadPanic("headers"))??;
        r_txs.ok_or(StaticFileWriterError::ThreadPanic("transactions"))??;
        if ctx.write_senders {
            r_senders.ok_or(StaticFileWriterError::ThreadPanic("senders"))??;
        }
        if ctx.write_receipts {
            r_receipts.ok_or(StaticFileWriterError::ThreadPanic("receipts"))??;
        }
        if ctx.write_account_changesets {
            r_account_changesets
                .ok_or(StaticFileWriterError::ThreadPanic("account_changesets"))??;
        }
        if ctx.write_storage_changesets {
            r_storage_changesets
                .ok_or(StaticFileWriterError::ThreadPanic("storage_changesets"))??;
        }
        Ok(())
    }
785
786    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
787    /// either block or transaction.
788    pub fn get_segment_provider(
789        &self,
790        segment: StaticFileSegment,
791        number: u64,
792    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
793        if segment.is_block_or_change_based() {
794            self.get_segment_provider_for_block(segment, number, None)
795        } else {
796            self.get_segment_provider_for_transaction(segment, number, None)
797        }
798    }
799
800    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
801    /// either block or transaction.
802    ///
803    /// If the segment is not found, returns [`None`].
804    pub fn get_maybe_segment_provider(
805        &self,
806        segment: StaticFileSegment,
807        number: u64,
808    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
809        let provider = if segment.is_block_or_change_based() {
810            self.get_segment_provider_for_block(segment, number, None)
811        } else {
812            self.get_segment_provider_for_transaction(segment, number, None)
813        };
814
815        match provider {
816            Ok(provider) => Ok(Some(provider)),
817            Err(
818                ProviderError::MissingStaticFileBlock(_, _) |
819                ProviderError::MissingStaticFileTx(_, _),
820            ) => Ok(None),
821            Err(err) => Err(err),
822        }
823    }
824
825    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
826    pub fn get_segment_provider_for_block(
827        &self,
828        segment: StaticFileSegment,
829        block: BlockNumber,
830        path: Option<&Path>,
831    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
832        self.get_segment_provider_for_range(
833            segment,
834            || self.get_segment_ranges_from_block(segment, block),
835            path,
836        )?
837        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
838    }
839
840    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
841    pub fn get_segment_provider_for_transaction(
842        &self,
843        segment: StaticFileSegment,
844        tx: TxNumber,
845        path: Option<&Path>,
846    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
847        self.get_segment_provider_for_range(
848            segment,
849            || self.get_segment_ranges_from_transaction(segment, tx),
850            path,
851        )?
852        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
853    }
854
855    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
856    ///
857    /// `fn_range` should make sure the range goes through `find_fixed_range`.
858    pub fn get_segment_provider_for_range(
859        &self,
860        segment: StaticFileSegment,
861        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
862        path: Option<&Path>,
863    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
864        // If we have a path, then get the block range from its name.
865        // Otherwise, check `self.available_static_files`
866        let block_range = match path {
867            Some(path) => StaticFileSegment::parse_filename(
868                &path
869                    .file_name()
870                    .ok_or_else(|| {
871                        ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
872                    })?
873                    .to_string_lossy(),
874            )
875            .and_then(|(parsed_segment, block_range)| {
876                if parsed_segment == segment {
877                    return Some(block_range);
878                }
879                None
880            }),
881            None => fn_range(),
882        };
883
884        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
885        if let Some(block_range) = block_range {
886            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?));
887        }
888
889        Ok(None)
890    }
891
892    /// Gets the [`StaticFileJarProvider`] of the requested path.
893    pub fn get_segment_provider_for_path(
894        &self,
895        path: &Path,
896    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
897        StaticFileSegment::parse_filename(
898            &path
899                .file_name()
900                .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
901                .to_string_lossy(),
902        )
903        .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
904        .transpose()
905    }
906
907    /// Given a segment and block range it removes the cached provider from the map.
908    ///
909    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
910    pub fn remove_cached_provider(
911        &self,
912        segment: StaticFileSegment,
913        fixed_block_range_end: BlockNumber,
914    ) {
915        self.map.remove(&(fixed_block_range_end, segment));
916    }
917
918    /// This handles history expiry by deleting all static files for the given segment below the
919    /// given block.
920    ///
921    /// For example if block is 1M and the blocks per file are 500K this will delete all individual
922    /// files below 1M, so 0-499K and 500K-999K.
923    ///
924    /// This will not delete the file that contains the block itself, because files can only be
925    /// removed entirely.
926    ///
927    /// # Safety
928    ///
929    /// This method will never delete the highest static file for the segment, even if the
930    /// requested block is higher than the highest block in static files. This ensures we always
931    /// maintain at least one static file if any exist.
932    ///
933    /// Returns a list of `SegmentHeader`s from the deleted jars.
934    pub fn delete_segment_below_block(
935        &self,
936        segment: StaticFileSegment,
937        block: BlockNumber,
938    ) -> ProviderResult<Vec<SegmentHeader>> {
939        // Nothing to delete if block is 0.
940        if block == 0 {
941            return Ok(Vec::new());
942        }
943
944        let highest_block = self.get_highest_static_file_block(segment);
945        let mut deleted_headers = Vec::new();
946
947        loop {
948            let Some(block_height) = self.get_lowest_range_end(segment) else {
949                return Ok(deleted_headers);
950            };
951
952            // Stop if we've reached the target block or the highest static file
953            if block_height >= block || Some(block_height) == highest_block {
954                return Ok(deleted_headers);
955            }
956
957            debug!(
958                target: "providers::static_file",
959                ?segment,
960                ?block_height,
961                "Deleting static file below block"
962            );
963
964            // now we need to wipe the static file, this will take care of updating the index and
965            // advance the lowest tracked block height for the segment.
966            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
967                warn!( target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
968            })?;
969
970            deleted_headers.push(header);
971        }
972    }
973
974    /// Given a segment and block, it deletes the jar and all files from the respective block range.
975    ///
976    /// CAUTION: destructive. Deletes files on disk.
977    ///
978    /// This will re-initialize the index after deletion, so all files are tracked.
979    ///
980    /// Returns the `SegmentHeader` of the deleted jar.
981    pub fn delete_jar(
982        &self,
983        segment: StaticFileSegment,
984        block: BlockNumber,
985    ) -> ProviderResult<SegmentHeader> {
986        let fixed_block_range = self.find_fixed_range(segment, block);
987        let key = (fixed_block_range.end(), segment);
988        let file = self.path.join(segment.filename(&fixed_block_range));
989        let jar = if let Some((_, jar)) = self.map.remove(&key) {
990            jar.jar
991        } else {
992            debug!(
993                target: "providers::static_file",
994                ?file,
995                ?fixed_block_range,
996                ?block,
997                "Loading static file jar for deletion"
998            );
999            NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
1000        };
1001
1002        let header = jar.user_header().clone();
1003
1004        // Delete the sidecar file for changeset segments before deleting the main jar
1005        if segment.is_change_based() {
1006            let csoff_path = file.with_extension("csoff");
1007            if csoff_path.exists() {
1008                std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
1009            }
1010        }
1011
1012        jar.delete().map_err(ProviderError::other)?;
1013
1014        // SAFETY: this is currently necessary to ensure that certain indexes like
1015        // `static_files_min_block` have the correct values after pruning.
1016        self.initialize_index()?;
1017
1018        Ok(header)
1019    }
1020
1021    /// Deletes ALL static file jars for the given segment, including the highest one.
1022    ///
1023    /// CAUTION: destructive. Deletes all files on disk for this segment.
1024    ///
1025    /// This is used for `PruneMode::Full` where all data should be removed.
1026    ///
1027    /// Returns a list of `SegmentHeader`s from the deleted jars.
1028    pub fn delete_segment(&self, segment: StaticFileSegment) -> ProviderResult<Vec<SegmentHeader>> {
1029        let mut deleted_headers = Vec::new();
1030
1031        while let Some(block_height) = self.get_highest_static_file_block(segment) {
1032            debug!(
1033                target: "providers::static_file",
1034                ?segment,
1035                ?block_height,
1036                "Deleting static file jar"
1037            );
1038
1039            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
1040                warn!(target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file jar")
1041            })?;
1042
1043            deleted_headers.push(header);
1044        }
1045
1046        Ok(deleted_headers)
1047    }
1048
1049    /// Given a segment and block range it returns a cached
1050    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
1051    /// many.
1052    fn get_or_create_jar_provider(
1053        &self,
1054        segment: StaticFileSegment,
1055        fixed_block_range: &SegmentRangeInclusive,
1056    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
1057        let key = (fixed_block_range.end(), segment);
1058
1059        // Avoid using `entry` directly to avoid a write lock in the common case.
1060        trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Getting provider");
1061        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
1062            trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
1063            jar.into()
1064        } else {
1065            trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
1066            let path = self.path.join(segment.filename(fixed_block_range));
1067            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
1068            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
1069        };
1070
1071        if let Some(metrics) = &self.metrics {
1072            provider = provider.with_metrics(metrics.clone());
1073        }
1074        Ok(provider)
1075    }
1076
1077    /// Gets a static file segment's block range from the provider inner block
1078    /// index.
1079    fn get_segment_ranges_from_block(
1080        &self,
1081        segment: StaticFileSegment,
1082        block: u64,
1083    ) -> Option<SegmentRangeInclusive> {
1084        let indexes = self.indexes.read();
1085        let index = indexes.get(segment)?;
1086
1087        (index.max_block >= block).then(|| {
1088            self.find_fixed_range_with_block_index(
1089                segment,
1090                Some(&index.expected_block_ranges_by_max_block),
1091                block,
1092            )
1093        })
1094    }
1095
    /// Gets a static file segment's fixed block range from the provider inner
    /// transaction index.
    ///
    /// Returns `None` if the segment has no transaction index or if `tx` is above the highest
    /// transaction stored in the segment's static files.
    fn get_segment_ranges_from_transaction(
        &self,
        segment: StaticFileSegment,
        tx: u64,
    ) -> Option<SegmentRangeInclusive> {
        let indexes = self.indexes.read();
        let index = indexes.get(segment)?;
        let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;

        // It's more probable that the request comes from a newer tx height, so we iterate
        // the static_files in reverse.
        let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();

        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
            if tx > *tx_end {
                // request tx is higher than highest static file tx
                return None;
            }
            // The first tx of this file is one past the next-lower file's max tx, or 0 for the
            // lowest file (no lower entry left to peek).
            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
            if tx_start <= tx {
                // `tx` falls within [tx_start, tx_end] of this file; resolve its fixed range via
                // the block index using the file's block range end.
                return Some(self.find_fixed_range_with_block_index(
                    segment,
                    Some(&index.expected_block_ranges_by_max_block),
                    block_range.end(),
                ));
            }
        }
        None
    }
1127
    /// Updates the inner transaction and block indexes alongside the internal cached providers in
    /// `self.map`.
    ///
    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
    ///
    /// If `segment_max_block` is None it means there's no static file for this segment.
    pub fn update_index(
        &self,
        segment: StaticFileSegment,
        segment_max_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        debug!(
            target: "providers::static_file",
            ?segment,
            ?segment_max_block,
            "Updating provider index"
        );
        let mut indexes = self.indexes.write();

        match segment_max_block {
            Some(segment_max_block) => {
                // Resolve the fixed (expected) block range that contains `segment_max_block`.
                let fixed_range = self.find_fixed_range_with_block_index(
                    segment,
                    indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
                    segment_max_block,
                );

                // Load the jar backing that range so its header can drive the index updates.
                let jar = NippyJar::<SegmentHeader>::load(
                    &self.path.join(segment.filename(&fixed_range)),
                )
                .map_err(ProviderError::other)?;

                let index = indexes
                    .entry(segment)
                    .and_modify(|index| {
                        // Update max block
                        index.max_block = segment_max_block;

                        // Update expected block range index

                        // Remove all expected block ranges that are less than the new max block
                        index
                            .expected_block_ranges_by_max_block
                            .retain(|_, block_range| block_range.start() < fixed_range.start());
                        // Insert new expected block range
                        index
                            .expected_block_ranges_by_max_block
                            .insert(fixed_range.end(), fixed_range);
                    })
                    .or_insert_with(|| StaticFileSegmentIndex {
                        min_block_range: None,
                        max_block: segment_max_block,
                        expected_block_ranges_by_max_block: BTreeMap::from([(
                            fixed_range.end(),
                            fixed_range,
                        )]),
                        available_block_ranges_by_max_tx: None,
                    });

                // Update min_block to track the lowest block range of the segment.
                // This is initially set by initialize_index() on node startup, but must be updated
                // as the file grows to prevent stale values.
                //
                // Without this update, min_block can remain at genesis (e.g. Some([0..=0]) or None)
                // even after syncing to higher blocks (e.g. [0..=100]). A stale
                // min_block causes get_lowest_static_file_block() to return the
                // wrong end value, which breaks pruning logic that relies on it for
                // safety checks.
                //
                // Example progression:
                // 1. Node starts, initialize_index() sets min_block = [0..=0]
                // 2. Sync to block 100, this update sets min_block = [0..=100]
                // 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
                //    this update, it would incorrectly return 0 (stale)
                if let Some(current_block_range) = jar.user_header().block_range() {
                    if let Some(min_block_range) = index.min_block_range.as_mut() {
                        // delete_jar WILL ALWAYS re-initialize all indexes, so we are always
                        // sure that current_min is always the lowest.
                        if current_block_range.start() == min_block_range.start() {
                            *min_block_range = current_block_range;
                        }
                    } else {
                        index.min_block_range = Some(current_block_range);
                    }
                }

                // Updates the tx index by first removing all entries which have a higher
                // block_start than our current static file.
                if let Some(tx_range) = jar.user_header().tx_range() {
                    // Current block range has the same block start as `fixed_range`, but block end
                    // might be different if we are still filling this static file.
                    if let Some(current_block_range) = jar.user_header().block_range() {
                        let tx_end = tx_range.end();

                        // Considering that `update_index` is called when we either append/truncate,
                        // we are sure that we are handling the latest data
                        // points.
                        //
                        // Here we remove every entry of the index that has a block start higher or
                        // equal than our current one. This is important in the case
                        // that we prune a lot of rows resulting in a file (and thus
                        // a higher block range) deletion.
                        if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                            index
                                .retain(|_, block_range| block_range.start() < fixed_range.start());
                            index.insert(tx_end, current_block_range);
                        } else {
                            index.available_block_ranges_by_max_tx =
                                Some(BTreeMap::from([(tx_end, current_block_range)]));
                        }
                    }
                } else if segment.is_tx_based() {
                    // The unwinded file has no more transactions/receipts. However, the highest
                    // block is within this files' block range. We only retain
                    // entries with block ranges before the current one.
                    if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
                    }

                    // If the index is empty, just remove it.
                    index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
                }

                // Update the cached provider.
                debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

                // Delete any cached provider that no longer has an associated jar.
                debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
            }
            None => {
                // No static file exists for the segment anymore: drop its whole index entry.
                debug!(target: "providers::static_file", ?segment, "Removing segment from index");
                indexes.remove(segment);
            }
        };

        debug!(target: "providers::static_file", ?segment, "Updated provider index");
        Ok(())
    }
1268
    /// Initializes the inner transaction and block index
    ///
    /// Rebuilds `self.indexes` from scratch by scanning all static files on disk, clears the
    /// cached jar providers in `self.map`, and seeds `earliest_history_height` from the lowest
    /// available transaction block range.
    pub fn initialize_index(&self) -> ProviderResult<()> {
        let mut indexes = self.indexes.write();
        indexes.clear();

        for (segment, headers) in &*iter_static_files(&self.path).map_err(ProviderError::other)? {
            // Update first and last block for each segment
            //
            // It's safe to call `expect` here, because every segment has at least one header
            // associated with it.
            let min_block_range = Some(headers.first().expect("headers are not empty").0);
            let max_block = headers.last().expect("headers are not empty").0.end();

            let mut expected_block_ranges_by_max_block = BTreeMap::default();
            let mut available_block_ranges_by_max_tx = None;

            for (block_range, header) in headers {
                // Update max expected block -> expected_block_range index
                expected_block_ranges_by_max_block
                    .insert(header.expected_block_end(), header.expected_block_range());

                // Update max tx -> block_range index
                if let Some(tx_range) = header.tx_range() {
                    let tx_end = tx_range.end();

                    // Only created lazily: segments without tx data leave this as `None`.
                    available_block_ranges_by_max_tx
                        .get_or_insert_with(BTreeMap::default)
                        .insert(tx_end, *block_range);
                }
            }

            indexes.insert(
                segment,
                StaticFileSegmentIndex {
                    min_block_range,
                    max_block,
                    expected_block_ranges_by_max_block,
                    available_block_ranges_by_max_tx,
                },
            );
        }

        // If this is a re-initialization, we need to clear this as well
        self.map.clear();

        // initialize the expired history height to the lowest static file block
        if let Some(lowest_range) =
            indexes.get(StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
        {
            // the earliest height is the lowest available block number
            self.earliest_history_height
                .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
        }

        Ok(())
    }
1325
    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
    /// target to unwind to.
    ///
    /// Two types of consistency checks are done for:
    ///
    /// 1) When a static file fails to commit but the underlying data was changed.
    /// 2) When a static file was committed, but the required database transaction was not.
    ///
    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
    /// will return an error.
    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
    /// to heal.
    ///
    /// For each static file segment:
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///
    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
    ///
    /// WARNING: No static file writer should be held before calling this function, otherwise it
    /// will deadlock.
    #[instrument(skip(self, provider), fields(read_only = self.is_read_only()))]
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<PipelineTarget>>
    where
        Provider: DBProvider
            + BlockReader
            + StageCheckpointReader
            + PruneCheckpointReader
            + ChainSpecProvider
            + StorageSettingsCache,
        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
    {
        // OVM historical import is broken and does not work with this check. It's importing
        // duplicated receipts resulting in having more receipts than the expected transaction
        // range.
        //
        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
        if provider.chain_spec().is_optimism() &&
            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
        {
            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
            const OVM_HEADER_1_HASH: B256 =
                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
                info!(target: "reth::cli",
                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
                );
                return Ok(None);
            }
        }

        info!(target: "reth::cli", "Verifying storage consistency.");

        // The final unwind target is the minimum over all per-segment targets found below.
        let mut unwind_target: Option<BlockNumber> = None;

        let mut update_unwind_target = |new_target| {
            unwind_target =
                unwind_target.map(|current| current.min(new_target)).or(Some(new_target));
        };

        for segment in self.segments_to_check(provider) {
            let span = info_span!(
                "Checking consistency for segment",
                ?segment,
                initial_highest_block = tracing::field::Empty,
                highest_block = tracing::field::Empty,
                highest_tx = tracing::field::Empty,
            );
            let _guard = span.enter();

            debug!(target: "reth::providers::static_file", "Checking consistency for segment");

            // Heal file-level inconsistencies and get before/after highest block
            let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?;
            span.record("initial_highest_block", initial_highest_block);
            span.record("highest_block", highest_block);

            // Only applies to block-based static files. (Headers)
            //
            // The updated `highest_block` may have decreased if we healed from a pruning
            // interruption.
            if initial_highest_block != highest_block {
                info!(
                    target: "reth::providers::static_file",
                    unwind_target = highest_block,
                    "Setting unwind target."
                );
                update_unwind_target(highest_block.unwrap_or_default());
            }

            // Only applies to transaction-based static files. (Receipts & Transactions)
            //
            // Make sure the last transaction matches the last block from its indices, since a heal
            // from a pruning interruption might have decreased the number of transactions without
            // being able to update the last block of the static file segment.
            let highest_tx = self.get_highest_static_file_tx(segment);
            span.record("highest_tx", highest_tx);
            debug!(target: "reth::providers::static_file", "Checking tx index segment");

            if let Some(highest_tx) = highest_tx {
                let mut last_block = highest_block.unwrap_or_default();
                debug!(target: "reth::providers::static_file", last_block, highest_tx, "Verifying last transaction matches last block indices");
                // Walk block numbers downwards until a block whose body indices cover
                // `highest_tx`, lowering the unwind target as we go.
                loop {
                    let Some(indices) = provider.block_body_indices(last_block)? else {
                        debug!(target: "reth::providers::static_file", last_block, "Block body indices not found, static files ahead of database");
                        // If the block body indices can not be found, then it means that static
                        // files is ahead of database, and the `ensure_invariants` check will fix
                        // it by comparing with stage checkpoints.
                        break
                    };

                    debug!(target: "reth::providers::static_file", last_block, last_tx_num = indices.last_tx_num(), "Found block body indices");

                    if indices.last_tx_num() <= highest_tx {
                        break
                    }

                    if last_block == 0 {
                        debug!(target: "reth::providers::static_file", "Reached block 0 in verification loop");
                        break
                    }

                    last_block -= 1;

                    info!(
                        target: "reth::providers::static_file",
                        highest_block = self.get_highest_static_file_block(segment),
                        unwind_target = last_block,
                        "Setting unwind target."
                    );
                    span.record("highest_block", last_block);
                    highest_block = Some(last_block);
                    update_unwind_target(last_block);
                }
            }

            debug!(target: "reth::providers::static_file", "Ensuring invariants for segment");

            match self.ensure_invariants_for(provider, segment, highest_tx, highest_block)? {
                Some(unwind) => {
                    debug!(target: "reth::providers::static_file", unwind_target=unwind, "Invariants check returned unwind target");
                    update_unwind_target(unwind);
                }
                None => {
                    debug!(target: "reth::providers::static_file", "Invariants check completed, no unwind needed")
                }
            }
        }

        Ok(unwind_target.map(PipelineTarget::Unwind))
    }
1483
1484    /// Heals file-level (`NippyJar`) inconsistencies for eligible static file
1485    /// segments.
1486    ///
1487    /// Call before [`Self::check_consistency`] so files are internally
1488    /// consistent.
1489    ///
1490    /// Uses the same segment-skip logic as [`Self::check_consistency`], but
1491    /// does not compare with database checkpoints or prune against them.
1492    pub fn check_file_consistency<Provider>(&self, provider: &Provider) -> ProviderResult<()>
1493    where
1494        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1495    {
1496        info!(target: "reth::cli", "Healing static file inconsistencies.");
1497
1498        for segment in self.segments_to_check(provider) {
1499            let _guard = info_span!("healing_static_file_segment", ?segment).entered();
1500            let _ = self.maybe_heal_segment(segment)?;
1501        }
1502
1503        Ok(())
1504    }
1505
1506    /// Returns the static file segments that should be checked/healed for this provider.
1507    fn segments_to_check<'a, Provider>(
1508        &'a self,
1509        provider: &'a Provider,
1510    ) -> impl Iterator<Item = StaticFileSegment> + 'a
1511    where
1512        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1513    {
1514        StaticFileSegment::iter()
1515            .filter(move |segment| self.should_check_segment(provider, *segment))
1516    }
1517
1518    /// True if the given segment should be checked/healed for this provider.
1519    fn should_check_segment<Provider>(
1520        &self,
1521        provider: &Provider,
1522        segment: StaticFileSegment,
1523    ) -> bool
1524    where
1525        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1526    {
1527        match segment {
1528            StaticFileSegment::Headers | StaticFileSegment::Transactions => true,
1529            StaticFileSegment::Receipts => {
1530                if EitherWriter::receipts_destination(provider).is_database() {
1531                    // Old pruned nodes (including full node) do not store receipts as static
1532                    // files.
1533                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database");
1534                    return false;
1535                }
1536
1537                if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
1538                    NamedChain::Chiado == provider.chain_spec().chain_id()
1539                {
1540                    // Gnosis and Chiado's historical import is broken and does not work with
1541                    // this check. They are importing receipts along
1542                    // with importing headers/bodies.
1543                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: broken historical import for gnosis/chiado");
1544                    return false;
1545                }
1546
1547                true
1548            }
1549            StaticFileSegment::TransactionSenders => {
1550                if EitherWriterDestination::senders(provider).is_database() {
1551                    debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: senders stored in database");
1552                    return false;
1553                }
1554
1555                if Self::is_segment_fully_pruned(provider, PruneSegment::SenderRecovery) {
1556                    debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: fully pruned");
1557                    return false;
1558                }
1559
1560                true
1561            }
1562            StaticFileSegment::AccountChangeSets => {
1563                if EitherWriter::account_changesets_destination(provider).is_database() {
1564                    debug!(target: "reth::providers::static_file", ?segment, "Skipping account changesets segment: changesets stored in database");
1565                    return false;
1566                }
1567                true
1568            }
1569            StaticFileSegment::StorageChangeSets => {
1570                if EitherWriter::storage_changesets_destination(provider).is_database() {
1571                    debug!(target: "reth::providers::static_file", ?segment, "Skipping storage changesets segment: changesets stored in database");
1572                    return false
1573                }
1574                true
1575            }
1576        }
1577    }
1578
1579    /// Returns `true` if the given prune segment has a checkpoint with
1580    /// [`reth_prune_types::PruneMode::Full`], indicating all data for this segment has been
1581    /// intentionally deleted.
1582    fn is_segment_fully_pruned<Provider>(provider: &Provider, segment: PruneSegment) -> bool
1583    where
1584        Provider: PruneCheckpointReader,
1585    {
1586        provider
1587            .get_prune_checkpoint(segment)
1588            .ok()
1589            .flatten()
1590            .is_some_and(|checkpoint| checkpoint.prune_mode.is_full())
1591    }
1592
1593    /// Checks consistency of the latest static file segment and throws an
1594    /// error if at fault.
1595    ///
1596    /// Read-only.
1597    fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1598        debug!(target: "reth::providers::static_file", "Checking segment consistency");
1599        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1600            let file_path = self
1601                .directory()
1602                .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1603            debug!(target: "reth::providers::static_file", ?file_path, latest_block, "Loading NippyJar for consistency check");
1604
1605            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1606            debug!(target: "reth::providers::static_file", "NippyJar loaded, checking consistency");
1607
1608            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1609            debug!(target: "reth::providers::static_file", "NippyJar consistency check passed");
1610        } else {
1611            debug!(target: "reth::providers::static_file", "No static file block found, skipping consistency check");
1612        }
1613        Ok(())
1614    }
1615
1616    /// Attempts to heal file-level (`NippyJar`) inconsistencies for a single static file segment.
1617    ///
1618    /// Returns the highest block before and after healing, which can be used to detect
1619    /// if healing from a pruning interruption decreased the highest block.
1620    ///
1621    /// File consistency is broken if:
1622    ///
1623    /// * appending data was interrupted before a config commit, then data file will be truncated
1624    ///   according to the config.
1625    ///
1626    /// * pruning data was interrupted before a config commit, then we have deleted data that we are
1627    ///   expected to still have. We need to check the Database and unwind everything accordingly.
1628    ///
1629    /// **Note:** In read-only mode, this will return an error if a consistency issue is detected,
1630    /// since healing requires write access.
1631    fn maybe_heal_segment(
1632        &self,
1633        segment: StaticFileSegment,
1634    ) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
1635        let initial_highest_block = self.get_highest_static_file_block(segment);
1636        debug!(target: "reth::providers::static_file", ?initial_highest_block, "Initial highest block for segment");
1637
1638        if self.access.is_read_only() {
1639            // Read-only mode: cannot modify files, so just validate consistency and error if
1640            // broken.
1641            debug!(target: "reth::providers::static_file", "Checking segment consistency (read-only)");
1642            self.check_segment_consistency(segment)?;
1643        } else {
1644            // Writable mode: fetching the writer will automatically heal any file-level
1645            // inconsistency by truncating data to match the last committed config.
1646            debug!(target: "reth::providers::static_file", "Fetching latest writer which might heal any potential inconsistency");
1647            self.latest_writer(segment)?;
1648        }
1649
1650        // The updated `highest_block` may have decreased if we healed from a
1651        // pruning interruption.
1652        let highest_block = self.get_highest_static_file_block(segment);
1653
1654        Ok((initial_highest_block, highest_block))
1655    }
1656
1657    /// Ensure invariants for each corresponding table and static file segment.
1658    fn ensure_invariants_for<Provider>(
1659        &self,
1660        provider: &Provider,
1661        segment: StaticFileSegment,
1662        highest_tx: Option<u64>,
1663        highest_block: Option<BlockNumber>,
1664    ) -> ProviderResult<Option<BlockNumber>>
1665    where
1666        Provider: DBProvider + BlockReader + StageCheckpointReader,
1667        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
1668    {
1669        match segment {
1670            StaticFileSegment::Headers => self
1671                .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
1672                    provider,
1673                    segment,
1674                    highest_block,
1675                    highest_block,
1676                ),
1677            StaticFileSegment::Transactions => self
1678                .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
1679                    provider,
1680                    segment,
1681                    highest_tx,
1682                    highest_block,
1683                ),
1684            StaticFileSegment::Receipts => self
1685                .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
1686                    provider,
1687                    segment,
1688                    highest_tx,
1689                    highest_block,
1690                ),
1691            StaticFileSegment::TransactionSenders => self
1692                .ensure_invariants::<_, tables::TransactionSenders>(
1693                    provider,
1694                    segment,
1695                    highest_tx,
1696                    highest_block,
1697                ),
1698            StaticFileSegment::AccountChangeSets => self
1699                .ensure_invariants::<_, tables::AccountChangeSets>(
1700                    provider,
1701                    segment,
1702                    highest_tx,
1703                    highest_block,
1704                ),
1705            StaticFileSegment::StorageChangeSets => self
1706                .ensure_changeset_invariants_by_block::<_, tables::StorageChangeSets, _>(
1707                    provider,
1708                    segment,
1709                    highest_block,
1710                    |key| key.block_number(),
1711                ),
1712        }
1713    }
1714
    /// Check invariants for each corresponding table and static file segment:
    ///
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
    ///     target.
    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
    ///     this case, the rows will be removed and [`None`] will be returned.
    ///
    /// * If the database tables overlap with static files and have contiguous keys, or the
    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
    #[instrument(skip(self, provider, segment), fields(table = T::NAME))]
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        debug!(target: "reth::providers::static_file", "Ensuring invariants");
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_entry, _)) = db_cursor.first()? {
            debug!(target: "reth::providers::static_file", db_first_entry, "Found first database entry");
            if let (Some(highest_entry), Some(highest_block)) =
                (highest_static_file_entry, highest_static_file_block)
            {
                // A gap between the highest static file entry and the first database entry means
                // we have most likely lost static file data and need to unwind so we can load it
                // again. Overlap (`db_first_entry <= highest_entry`) and exact continuity
                // (`highest_entry + 1 == db_first_entry`) are both acceptable.
                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                    info!(
                        target: "reth::providers::static_file",
                        ?db_first_entry,
                        ?highest_entry,
                        unwind_target = highest_block,
                        "Setting unwind target."
                    );
                    return Ok(Some(highest_block));
                }
            }

            // If the database already holds entries past the static file tip, the database is
            // ahead: there is nothing for static files to heal or unwind here.
            if let Some((db_last_entry, _)) = db_cursor.last()? &&
                highest_static_file_entry
                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
            {
                debug!(target: "reth::providers::static_file", db_last_entry, "Database has entries beyond static files, no unwind needed");
                return Ok(None)
            }
        } else {
            debug!(target: "reth::providers::static_file", "No database entries found");
        }

        // From here on, static files are at or ahead of the database table; fall back to
        // comparing against the stage checkpoint.
        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // If static file entry is ahead of the database entries, then ensure the checkpoint block
        // number matches.
        let stage_id = segment.to_stage_id();
        let checkpoint_block_number =
            provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
        debug!(target: "reth::providers::static_file", ?stage_id, checkpoint_block_number, "Retrieved stage checkpoint");

        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block));
        }

        // If the checkpoint matches (the strictly-greater case returned above), nothing to do.
        if checkpoint_block_number >= highest_static_file_block {
            debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
            return Ok(None);
        }

        // If the checkpoint is behind, then we failed to do a database commit
        // **but committed** to static files on executing a stage, or the
        // reverse on unwinding a stage.
        //
        // All we need to do is to prune the extra static file rows.
        info!(
            target: "reth::providers",
            from = highest_static_file_block,
            to = checkpoint_block_number,
            "Unwinding static file segment."
        );
        let mut writer = self.latest_writer(segment)?;

        match segment {
            StaticFileSegment::Headers => {
                // Headers are block-indexed: prune one row per block past the checkpoint.
                let prune_count = highest_static_file_block - checkpoint_block_number;
                debug!(target: "reth::providers::static_file", prune_count, "Pruning headers");
                // TODO(joshie): is_block_meta
                writer.prune_headers(prune_count)?;
            }
            StaticFileSegment::Transactions |
            StaticFileSegment::Receipts |
            StaticFileSegment::TransactionSenders => {
                // Transaction-indexed segments: translate the checkpoint block into its last
                // transaction number via the block body indices to compute the prune count.
                if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                    let number = highest_static_file_entry - block.last_tx_num();
                    debug!(target: "reth::providers::static_file", prune_count = number, checkpoint_block_number, "Pruning transaction based segment");

                    match segment {
                        StaticFileSegment::Transactions => {
                            writer.prune_transactions(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::Receipts => {
                            writer.prune_receipts(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::TransactionSenders => {
                            writer.prune_transaction_senders(number, checkpoint_block_number)?
                        }
                        // Excluded by the outer match arm.
                        StaticFileSegment::Headers |
                        StaticFileSegment::AccountChangeSets |
                        StaticFileSegment::StorageChangeSets => {
                            unreachable!()
                        }
                    }
                } else {
                    debug!(target: "reth::providers::static_file", checkpoint_block_number, "No block body indices found for checkpoint block");
                }
            }
            StaticFileSegment::AccountChangeSets => {
                writer.prune_account_changesets(checkpoint_block_number)?;
            }
            StaticFileSegment::StorageChangeSets => {
                writer.prune_storage_changesets(checkpoint_block_number)?;
            }
        }

        debug!(target: "reth::providers::static_file", "Committing writer after pruning");
        writer.commit()?;
        debug!(target: "reth::providers::static_file", "Writer committed successfully");

        debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
        Ok(None)
    }
1863
    /// Block-keyed variant of [`Self::ensure_invariants`] for changeset segments.
    ///
    /// `block_from_key` extracts the [`BlockNumber`] from the table key, so the same
    /// continuity/gap and stage-checkpoint checks can be applied to tables whose keys are not
    /// plain `u64`s.
    ///
    /// Returns [`Some`] with a pipeline unwind target when static file data appears lost, or
    /// heals by pruning extra static file rows (returning [`None`]) when the checkpoint is
    /// behind the static file tip.
    fn ensure_changeset_invariants_by_block<Provider, T, F>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_block: Option<BlockNumber>,
        block_from_key: F,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
        T: Table,
        F: Fn(&T::Key) -> BlockNumber,
    {
        debug!(
            target: "reth::providers::static_file",
            ?segment,
            ?highest_static_file_block,
            "Ensuring changeset invariants"
        );
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_key, _)) = db_cursor.first()? {
            let db_first_block = block_from_key(&db_first_key);
            // A gap between the static file tip and the first database block means static file
            // data was most likely lost; unwind so it can be regenerated. Overlap and exact
            // continuity are both acceptable.
            if let Some(highest_block) = highest_static_file_block &&
                !(db_first_block <= highest_block || highest_block + 1 == db_first_block)
            {
                info!(
                    target: "reth::providers::static_file",
                    ?db_first_block,
                    ?highest_block,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                return Ok(Some(highest_block))
            }

            // Database entries past the static file tip mean the database is ahead; nothing to
            // heal or unwind here.
            if let Some((db_last_key, _)) = db_cursor.last()? &&
                highest_static_file_block
                    .is_none_or(|highest_block| block_from_key(&db_last_key) > highest_block)
            {
                debug!(
                    target: "reth::providers::static_file",
                    ?segment,
                    "Database has entries beyond static files, no unwind needed"
                );
                return Ok(None)
            }
        } else {
            debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
        }

        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // Compare the static file tip against the segment's stage checkpoint.
        let stage_id = segment.to_stage_id();
        let checkpoint_block_number =
            provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;

        // Checkpoint ahead of static files: static file data was lost, request an unwind.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // Checkpoint behind static files: a stage committed to static files but not to the
        // database (or the reverse on unwind). Heal by pruning the extra static file rows.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            match segment {
                StaticFileSegment::AccountChangeSets => {
                    writer.prune_account_changesets(checkpoint_block_number)?;
                }
                StaticFileSegment::StorageChangeSets => {
                    writer.prune_storage_changesets(checkpoint_block_number)?;
                }
                _ => unreachable!("invalid segment for changeset invariants"),
            }
            writer.commit()?;
        }

        Ok(None)
    }
1955
1956    /// Returns the earliest available block number that has not been expired and is still
1957    /// available.
1958    ///
1959    /// This means that the highest expired block (or expired block height) is
1960    /// `earliest_history_height.saturating_sub(1)`.
1961    ///
1962    /// Returns `0` if no history has been expired.
1963    pub fn earliest_history_height(&self) -> BlockNumber {
1964        self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1965    }
1966
1967    /// Gets the lowest static file's block range if it exists for a static file segment.
1968    ///
1969    /// If there is nothing on disk for the given segment, this will return [`None`].
1970    pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1971        self.indexes.read().get(segment).and_then(|index| index.min_block_range)
1972    }
1973
1974    /// Gets the lowest static file's block range start if it exists for a static file segment.
1975    ///
1976    /// For example if the lowest static file has blocks 0-499, this will return 0.
1977    ///
1978    /// If there is nothing on disk for the given segment, this will return [`None`].
1979    pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1980        self.get_lowest_range(segment).map(|range| range.start())
1981    }
1982
1983    /// Gets the lowest static file's block range end if it exists for a static file segment.
1984    ///
1985    /// For example if the static file has blocks 0-499, this will return 499.
1986    ///
1987    /// If there is nothing on disk for the given segment, this will return [`None`].
1988    pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1989        self.get_lowest_range(segment).map(|range| range.end())
1990    }
1991
1992    /// Gets the highest static file's block height if it exists for a static file segment.
1993    ///
1994    /// If there is nothing on disk for the given segment, this will return [`None`].
1995    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1996        self.indexes.read().get(segment).map(|index| index.max_block)
1997    }
1998
1999    /// Converts a range to a bounded `RangeInclusive` capped to the highest static file block.
2000    ///
2001    /// This is necessary because static file iteration beyond the tip would loop forever:
2002    /// blocks beyond the static file tip return `Ok(empty)` which is indistinguishable from
2003    /// blocks with no changes. We cap the end to the highest available block regardless of
2004    /// whether the input was unbounded or an explicit large value like `BlockNumber::MAX`.
2005    fn bound_range(
2006        &self,
2007        range: impl RangeBounds<BlockNumber>,
2008        segment: StaticFileSegment,
2009    ) -> RangeInclusive<BlockNumber> {
2010        let highest_block = self.get_highest_static_file_block(segment).unwrap_or(0);
2011
2012        let start = match range.start_bound() {
2013            Bound::Included(&n) => n,
2014            Bound::Excluded(&n) => n.saturating_add(1),
2015            Bound::Unbounded => 0,
2016        };
2017        let end = match range.end_bound() {
2018            Bound::Included(&n) => n.min(highest_block),
2019            Bound::Excluded(&n) => n.saturating_sub(1).min(highest_block),
2020            Bound::Unbounded => highest_block,
2021        };
2022
2023        start..=end
2024    }
2025
2026    /// Gets the highest static file transaction.
2027    ///
2028    /// If there is nothing on disk for the given segment, this will return [`None`].
2029    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
2030        self.indexes
2031            .read()
2032            .get(segment)
2033            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
2034            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
2035    }
2036
2037    /// Gets the highest static file block for all segments.
2038    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
2039        HighestStaticFiles {
2040            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
2041        }
2042    }
2043
    /// Iterates through segment `static_files` in reverse order, executing a function until it
    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
    ///
    /// Returns `Ok(None)` when no static files exist for the segment or `func` never yields a
    /// value.
    pub fn find_static_file<T>(
        &self,
        segment: StaticFileSegment,
        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
    ) -> ProviderResult<Option<T>> {
        // NOTE(review): the `indexes` read guard stays alive for the whole loop below (the
        // `if let` scrutinee borrows from it) — confirm `func` and
        // `get_or_create_jar_provider` never take a write lock on `indexes`.
        if let Some(ranges) =
            self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block)
        {
            // Iterate through all ranges in reverse order (highest to lowest)
            for range in ranges.values().rev() {
                // First range for which `func` produces a value wins.
                if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
                    return Ok(Some(res));
                }
            }
        }

        Ok(None)
    }
2064
    /// Fetches data within a specified range across multiple static files.
    ///
    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
    /// It continues fetching until the end of the range is reached or the provided `predicate`
    /// returns false.
    ///
    /// If a static file for a requested number is missing, the results gathered so far are
    /// returned without an error (the range is simply cut short).
    pub fn fetch_range_with_predicate<T, F, P>(
        &self,
        segment: StaticFileSegment,
        range: Range<u64>,
        mut get_fn: F,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
        P: FnMut(&T) -> bool,
    {
        // Cap the preallocation at 100 entries so huge ranges don't over-reserve.
        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);

        /// Resolves to the provider for the given block or transaction number.
        ///
        /// If the static file is missing, the `result` is returned.
        macro_rules! get_provider {
            ($number:expr) => {{
                match self.get_segment_provider(segment, $number) {
                    Ok(provider) => provider,
                    Err(
                        ProviderError::MissingStaticFileBlock(_, _) |
                        ProviderError::MissingStaticFileTx(_, _),
                    ) => return Ok(result),
                    Err(err) => return Err(err),
                }
            }};
        }

        let mut provider = get_provider!(range.start);
        let mut cursor = provider.cursor()?;

        // advances number in range
        'outer: for number in range {
            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
            // access data in two different static files, it halts further attempts by returning
            // an error, effectively preventing infinite retry loops.
            let mut retrying = false;

            // advances static files if `get_fn` returns None
            'inner: loop {
                match get_fn(&mut cursor, number)? {
                    Some(res) => {
                        // Predicate failure ends the whole fetch, keeping what we have so far.
                        if !predicate(&res) {
                            break 'outer;
                        }
                        result.push(res);
                        break 'inner;
                    }
                    None => {
                        // Second miss on the same number: give up and return partial results.
                        if retrying {
                            return Ok(result);
                        }
                        // There is a very small chance of hitting a deadlock if two consecutive
                        // static files share the same bucket in the
                        // internal dashmap and we don't drop the current provider
                        // before requesting the next one.
                        drop(cursor);
                        drop(provider);
                        provider = get_provider!(number);
                        cursor = provider.cursor()?;
                        retrying = true;
                    }
                }
            }
        }

        // Release any over-allocation from the capped preallocation above.
        result.shrink_to_fit();

        Ok(result)
    }
2141
    /// Fetches data within a specified range across multiple static files.
    ///
    /// Returns an iterator over the data. Yields [`None`] if the data for the specified number is
    /// not found.
    pub fn fetch_range_iter<'a, T, F>(
        &'a self,
        segment: StaticFileSegment,
        range: Range<u64>,
        get_fn: F,
    ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
    where
        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
        T: std::fmt::Debug,
    {
        // Provider for the static file currently being read; `None` when no file exists for
        // the current number. It is swapped out lazily as the range crosses file boundaries.
        let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
        Ok(range.map(move |number| {
            match provider
                .as_ref()
                .map(|provider| get_fn(&mut provider.cursor()?, number))
                .and_then(|result| result.transpose())
            {
                // Found in the current static file.
                Some(result) => result.map(Some),
                None => {
                    // There is a very small chance of hitting a deadlock if two consecutive
                    // static files share the same bucket in the internal dashmap and we don't drop
                    // the current provider before requesting the next one.
                    provider.take();
                    provider = self.get_maybe_segment_provider(segment, number)?;
                    provider
                        .as_ref()
                        .map(|provider| get_fn(&mut provider.cursor()?, number))
                        .and_then(|result| result.transpose())
                        .transpose()
                }
            }
        }))
    }
2179
2180    /// Returns directory where `static_files` are located.
2181    pub fn directory(&self) -> &Path {
2182        &self.path
2183    }
2184
2185    /// Retrieves data from the database or static file, wherever it's available.
2186    ///
2187    /// # Arguments
2188    /// * `segment` - The segment of the static file to check against.
2189    /// * `index_key` - Requested index key, usually a block or transaction number.
2190    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
2191    ///   file provider.
2192    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
2193    ///   when the static file doesn't contain the required data or is not available.
2194    pub fn get_with_static_file_or_database<T, FS, FD>(
2195        &self,
2196        segment: StaticFileSegment,
2197        number: u64,
2198        fetch_from_static_file: FS,
2199        fetch_from_database: FD,
2200    ) -> ProviderResult<Option<T>>
2201    where
2202        FS: Fn(&Self) -> ProviderResult<Option<T>>,
2203        FD: Fn() -> ProviderResult<Option<T>>,
2204    {
2205        // If there is, check the maximum block or transaction number of the segment.
2206        let static_file_upper_bound = if segment.is_block_or_change_based() {
2207            self.get_highest_static_file_block(segment)
2208        } else {
2209            self.get_highest_static_file_tx(segment)
2210        };
2211
2212        if static_file_upper_bound
2213            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
2214        {
2215            return fetch_from_static_file(self);
2216        }
2217        fetch_from_database()
2218    }
2219
2220    /// Gets data within a specified range, potentially spanning different `static_files` and
2221    /// database.
2222    ///
2223    /// # Arguments
2224    /// * `segment` - The segment of the static file to query.
2225    /// * `block_or_tx_range` - The range of data to fetch.
2226    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
2227    /// * `fetch_from_database` - A function to fetch data from the database.
2228    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
2229    ///   terminated when this function returns false, thereby filtering the data based on the
2230    ///   provided condition.
2231    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
2232        &self,
2233        segment: StaticFileSegment,
2234        mut block_or_tx_range: Range<u64>,
2235        fetch_from_static_file: FS,
2236        mut fetch_from_database: FD,
2237        mut predicate: P,
2238    ) -> ProviderResult<Vec<T>>
2239    where
2240        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
2241        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
2242        P: FnMut(&T) -> bool,
2243    {
2244        let mut data = Vec::new();
2245
2246        // If there is, check the maximum block or transaction number of the segment.
2247        if let Some(static_file_upper_bound) = if segment.is_block_or_change_based() {
2248            self.get_highest_static_file_block(segment)
2249        } else {
2250            self.get_highest_static_file_tx(segment)
2251        } && block_or_tx_range.start <= static_file_upper_bound
2252        {
2253            let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
2254            data.extend(fetch_from_static_file(
2255                self,
2256                block_or_tx_range.start..end,
2257                &mut predicate,
2258            )?);
2259            block_or_tx_range.start = end;
2260        }
2261
2262        if block_or_tx_range.end > block_or_tx_range.start {
2263            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
2264        }
2265
2266        Ok(data)
2267    }
2268
2269    /// Returns static files directory
2270    #[cfg(any(test, feature = "test-utils"))]
2271    pub fn path(&self) -> &Path {
2272        &self.path
2273    }
2274
2275    /// Returns transaction index
2276    #[cfg(any(test, feature = "test-utils"))]
2277    pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2278        self.indexes
2279            .read()
2280            .get(segment)
2281            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
2282            .cloned()
2283    }
2284
2285    /// Returns expected block index
2286    #[cfg(any(test, feature = "test-utils"))]
2287    pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2288        self.indexes
2289            .read()
2290            .get(segment)
2291            .map(|index| &index.expected_block_ranges_by_max_block)
2292            .cloned()
2293    }
2294}
2295
/// Per-segment index over the static files available on disk, kept behind the provider's
/// `indexes` map (one entry per [`StaticFileSegment`]).
#[derive(Debug)]
struct StaticFileSegmentIndex {
    /// Min static file block range.
    ///
    /// This index is initialized on launch to keep track of the lowest, non-expired static file
    /// per segment and gets updated on [`StaticFileProvider::update_index`].
    ///
    /// This tracks the lowest static file per segment together with the block range in that
    /// file. E.g. static file is batched in 500k block intervals then the lowest static file
    /// is [0..499K], and the block range is start = 0, end = 499K.
    ///
    /// This index is mainly used for history expiry, which targets transactions, e.g. pre-merge
    /// history expiry would lead to removing all static files below the merge height.
    min_block_range: Option<SegmentRangeInclusive>,
    /// Max static file block.
    max_block: u64,
    /// Expected static file block ranges indexed by max expected blocks.
    ///
    /// For example, a static file for expected block range `0..=499_000` may have only block range
    /// `0..=1000` contained in it, as it's not fully filled yet. This index maps the max expected
    /// block to the expected range, i.e. block `499_000` to block range `0..=499_000`.
    expected_block_ranges_by_max_block: SegmentRanges,
    /// Available on disk static file block ranges indexed by max transactions.
    ///
    /// For example, a static file for block range `0..=499_000` may only have block range
    /// `0..=1000` and transaction range `0..=2000` contained in it. This index maps the max
    /// available transaction to the available block range, i.e. transaction `2000` to block range
    /// `0..=1000`.
    ///
    /// `None` for segments that have no transaction-based index.
    available_block_ranges_by_max_tx: Option<SegmentRanges>,
}
2326
/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider>`
pub trait StaticFileWriter {
    /// The primitives type used by the static file provider.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
    /// [`StaticFileSegment`].
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
    fn commit(&self) -> ProviderResult<()>;

    /// Returns `true` if the static file provider has unwind queued.
    fn has_unwind_queued(&self) -> bool;

    /// Finalizes all static file writers by committing their configuration to disk.
    ///
    /// Returns an error if prune is queued (use [`Self::commit`] instead).
    fn finalize(&self) -> ProviderResult<()>;
}
2357
2358impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
2359    type Primitives = N;
2360
2361    fn get_writer(
2362        &self,
2363        block: BlockNumber,
2364        segment: StaticFileSegment,
2365    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2366        if self.access.is_read_only() {
2367            return Err(ProviderError::ReadOnlyStaticFileAccess);
2368        }
2369
2370        trace!(target: "providers::static_file", ?block, ?segment, "Getting static file writer.");
2371        self.writers.get_or_create(segment, || {
2372            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
2373        })
2374    }
2375
2376    fn latest_writer(
2377        &self,
2378        segment: StaticFileSegment,
2379    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2380        let genesis_number = self.0.as_ref().genesis_block_number();
2381        self.get_writer(
2382            self.get_highest_static_file_block(segment).unwrap_or(genesis_number),
2383            segment,
2384        )
2385    }
2386
2387    fn commit(&self) -> ProviderResult<()> {
2388        self.writers.commit()
2389    }
2390
2391    fn has_unwind_queued(&self) -> bool {
2392        self.writers.has_unwind_queued()
2393    }
2394
2395    fn finalize(&self) -> ProviderResult<()> {
2396        self.writers.finalize()
2397    }
2398}
2399
impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
    fn account_block_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<reth_db::models::AccountBeforeTx>> {
        // Resolve the static file containing this block; a missing file simply means there
        // are no account changesets available for it.
        let provider = match self.get_segment_provider_for_block(
            StaticFileSegment::AccountChangeSets,
            block_number,
            None,
        ) {
            Ok(provider) => provider,
            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if let Some(offset) = provider.read_changeset_offset(block_number)? {
            let mut cursor = provider.cursor()?;
            let mut changeset = Vec::with_capacity(offset.num_changes() as usize);

            // Collect every change row within the block's offset range.
            for i in offset.changeset_range() {
                if let Some(change) =
                    cursor.get_one::<reth_db::static_file::AccountChangesetMask>(i.into())?
                {
                    changeset.push(change)
                }
            }
            Ok(changeset)
        } else {
            Ok(Vec::new())
        }
    }

    fn get_account_before_block(
        &self,
        block_number: BlockNumber,
        address: Address,
    ) -> ProviderResult<Option<reth_db::models::AccountBeforeTx>> {
        let provider = match self.get_segment_provider_for_block(
            StaticFileSegment::AccountChangeSets,
            block_number,
            None,
        ) {
            Ok(provider) => provider,
            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
            Err(err) => return Err(err),
        };

        let Some(offset) = provider.read_changeset_offset(block_number)? else {
            return Ok(None);
        };

        // Binary search for the first entry with `change.address >= address`; this relies on
        // the block's changeset entries being ordered by address.
        let mut cursor = provider.cursor()?;
        let range = offset.changeset_range();
        let mut low = range.start;
        let mut high = range.end;

        while low < high {
            let mid = low + (high - low) / 2;
            if let Some(change) =
                cursor.get_one::<reth_db::static_file::AccountChangesetMask>(mid.into())?
            {
                if change.address < address {
                    low = mid + 1;
                } else {
                    high = mid;
                }
            } else {
                // This is not expected but means we are out of the range / file somehow, and can't
                // continue
                debug!(
                    target: "providers::static_file",
                    ?low,
                    ?mid,
                    ?high,
                    ?range,
                    ?block_number,
                    ?address,
                    "Cannot continue binary search for account changeset fetch"
                );
                // Force the final check below to report "not found".
                low = range.end;
                break;
            }
        }

        // `low` now points at the first candidate (or past the end); confirm the address is
        // an exact match before returning it.
        if low < range.end &&
            let Some(change) = cursor
                .get_one::<reth_db::static_file::AccountChangesetMask>(low.into())?
                .filter(|change| change.address == address)
        {
            return Ok(Some(change));
        }

        Ok(None)
    }

    fn account_changesets_range(
        &self,
        range: impl core::ops::RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<(BlockNumber, reth_db::models::AccountBeforeTx)>> {
        // Clamp the request to what is actually available on disk, then walk lazily.
        let range = self.bound_range(range, StaticFileSegment::AccountChangeSets);
        self.walk_account_changeset_range(range).collect()
    }

    fn account_changeset_count(&self) -> ProviderResult<usize> {
        let mut count = 0;

        // Sum per-block change counts from each segment's changeset-offset (`.csoff`)
        // sidecar file, without reading the changeset rows themselves.
        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
        if let Some(changeset_segments) = static_files.get(StaticFileSegment::AccountChangeSets) {
            for (block_range, header) in changeset_segments {
                let csoff_path = self
                    .path
                    .join(StaticFileSegment::AccountChangeSets.filename(block_range))
                    .with_extension("csoff");
                if csoff_path.exists() {
                    let len = header.changeset_offsets_len();
                    let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
                        .map_err(ProviderError::other)?;
                    let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
                    for offset in offsets {
                        count += offset.num_changes() as usize;
                    }
                }
            }
        }

        Ok(count)
    }
}
2528
impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
    fn storage_changeset(
        &self,
        block_number: BlockNumber,
    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
        // Resolve the static file containing this block; a missing file simply means there
        // are no storage changesets available for it.
        let provider = match self.get_segment_provider_for_block(
            StaticFileSegment::StorageChangeSets,
            block_number,
            None,
        ) {
            Ok(provider) => provider,
            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
            Err(err) => return Err(err),
        };

        if let Some(offset) = provider.read_changeset_offset(block_number)? {
            let mut cursor = provider.cursor()?;
            let mut changeset = Vec::with_capacity(offset.num_changes() as usize);

            // Collect every change row within the block's offset range, reshaping each into
            // the `(BlockNumberAddress, StorageEntry)` form callers expect.
            for i in offset.changeset_range() {
                if let Some(change) = cursor.get_one::<StorageChangesetMask>(i.into())? {
                    let block_address = BlockNumberAddress((block_number, change.address));
                    let entry = StorageEntry { key: change.key, value: change.value };
                    changeset.push((block_address, entry));
                }
            }
            Ok(changeset)
        } else {
            Ok(Vec::new())
        }
    }

    fn get_storage_before_block(
        &self,
        block_number: BlockNumber,
        address: Address,
        storage_key: B256,
    ) -> ProviderResult<Option<StorageEntry>> {
        let provider = match self.get_segment_provider_for_block(
            StaticFileSegment::StorageChangeSets,
            block_number,
            None,
        ) {
            Ok(provider) => provider,
            Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
            Err(err) => return Err(err),
        };

        let Some(offset) = provider.read_changeset_offset(block_number)? else {
            return Ok(None);
        };

        // Binary search for the first entry with `(address, key) >= (address, storage_key)`;
        // this relies on the block's entries being ordered by that pair.
        let mut cursor = provider.cursor()?;
        let range = offset.changeset_range();
        let mut low = range.start;
        let mut high = range.end;

        while low < high {
            let mid = low + (high - low) / 2;
            if let Some(change) = cursor.get_one::<StorageChangesetMask>(mid.into())? {
                match (change.address, change.key).cmp(&(address, storage_key)) {
                    std::cmp::Ordering::Less => low = mid + 1,
                    _ => high = mid,
                }
            } else {
                // This is not expected but means we are out of the range / file somehow, and
                // can't continue.
                debug!(
                    target: "providers::static_file",
                    ?low,
                    ?mid,
                    ?high,
                    ?range,
                    ?block_number,
                    ?address,
                    ?storage_key,
                    "Cannot continue binary search for storage changeset fetch"
                );
                // Force the final check below to report "not found".
                low = range.end;
                break;
            }
        }

        // `low` now points at the first candidate (or past the end); confirm both address and
        // key match exactly before returning the entry.
        if low < range.end &&
            let Some(change) = cursor
                .get_one::<StorageChangesetMask>(low.into())?
                .filter(|change| change.address == address && change.key == storage_key)
        {
            return Ok(Some(StorageEntry { key: change.key, value: change.value }));
        }

        Ok(None)
    }

    fn storage_changesets_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
        // Clamp the request to what is actually available on disk, then walk lazily.
        let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
        self.walk_storage_changeset_range(range).collect()
    }

    fn storage_changeset_count(&self) -> ProviderResult<usize> {
        let mut count = 0;

        // Sum per-block change counts from each segment's changeset-offset (`.csoff`)
        // sidecar file, without reading the changeset rows themselves.
        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
        if let Some(changeset_segments) = static_files.get(StaticFileSegment::StorageChangeSets) {
            for (block_range, header) in changeset_segments {
                let csoff_path = self
                    .path
                    .join(StaticFileSegment::StorageChangeSets.filename(block_range))
                    .with_extension("csoff");
                if csoff_path.exists() {
                    let len = header.changeset_offsets_len();
                    let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
                        .map_err(ProviderError::other)?;
                    let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
                    for offset in offsets {
                        count += offset.num_changes() as usize;
                    }
                }
            }
        }

        Ok(count)
    }
}
2654
impl<N: NodePrimitives> StaticFileProvider<N> {
    /// Creates an iterator for walking through account changesets in the specified block range.
    ///
    /// This returns a lazy iterator that fetches changesets block by block to avoid loading
    /// everything into memory at once.
    ///
    /// Accepts any range type that implements `RangeBounds<BlockNumber>`, including:
    /// - `Range<BlockNumber>` (e.g., `0..100`)
    /// - `RangeInclusive<BlockNumber>` (e.g., `0..=99`)
    /// - `RangeFrom<BlockNumber>` (e.g., `0..`) - iterates until exhausted
    pub fn walk_account_changeset_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> StaticFileAccountChangesetWalker<Self> {
        StaticFileAccountChangesetWalker::new(self.clone(), range)
    }

    /// Creates an iterator for walking through storage changesets in the specified block range.
    ///
    /// The storage counterpart of [`Self::walk_account_changeset_range`]; accepts any range
    /// type that implements `RangeBounds<BlockNumber>`.
    pub fn walk_storage_changeset_range(
        &self,
        range: impl RangeBounds<BlockNumber>,
    ) -> StaticFileStorageChangesetWalker<Self> {
        StaticFileStorageChangesetWalker::new(self.clone(), range)
    }
}
2680
2681impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
2682    type Header = N::BlockHeader;
2683
2684    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
2685        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
2686            Ok(jar_provider
2687                .cursor()?
2688                .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
2689                .and_then(|(header, hash)| {
2690                    if hash == block_hash {
2691                        return Some(header);
2692                    }
2693                    None
2694                }))
2695        })
2696    }
2697
2698    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
2699        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2700            .and_then(|provider| provider.header_by_number(num))
2701            .or_else(|err| {
2702                if let ProviderError::MissingStaticFileBlock(_, _) = err {
2703                    Ok(None)
2704                } else {
2705                    Err(err)
2706                }
2707            })
2708    }
2709
2710    fn headers_range(
2711        &self,
2712        range: impl RangeBounds<BlockNumber>,
2713    ) -> ProviderResult<Vec<Self::Header>> {
2714        self.fetch_range_with_predicate(
2715            StaticFileSegment::Headers,
2716            to_range(range),
2717            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
2718            |_| true,
2719        )
2720    }
2721
2722    fn sealed_header(
2723        &self,
2724        num: BlockNumber,
2725    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
2726        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2727            .and_then(|provider| provider.sealed_header(num))
2728            .or_else(|err| {
2729                if let ProviderError::MissingStaticFileBlock(_, _) = err {
2730                    Ok(None)
2731                } else {
2732                    Err(err)
2733                }
2734            })
2735    }
2736
2737    fn sealed_headers_while(
2738        &self,
2739        range: impl RangeBounds<BlockNumber>,
2740        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
2741    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
2742        self.fetch_range_with_predicate(
2743            StaticFileSegment::Headers,
2744            to_range(range),
2745            |cursor, number| {
2746                Ok(cursor
2747                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
2748                    .map(|(header, hash)| SealedHeader::new(header, hash)))
2749            },
2750            predicate,
2751        )
2752    }
2753}
2754
2755impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
2756    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
2757        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2758            .and_then(|provider| provider.block_hash(num))
2759            .or_else(|err| {
2760                if let ProviderError::MissingStaticFileBlock(_, _) = err {
2761                    Ok(None)
2762                } else {
2763                    Err(err)
2764                }
2765            })
2766    }
2767
2768    fn canonical_hashes_range(
2769        &self,
2770        start: BlockNumber,
2771        end: BlockNumber,
2772    ) -> ProviderResult<Vec<B256>> {
2773        self.fetch_range_with_predicate(
2774            StaticFileSegment::Headers,
2775            start..end,
2776            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
2777            |_| true,
2778        )
2779    }
2780}
2781
2782impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
2783    for StaticFileProvider<N>
2784{
2785    type Receipt = N::Receipt;
2786
2787    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2788        self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
2789            .and_then(|provider| provider.receipt(num))
2790            .or_else(|err| {
2791                if let ProviderError::MissingStaticFileTx(_, _) = err {
2792                    Ok(None)
2793                } else {
2794                    Err(err)
2795                }
2796            })
2797    }
2798
2799    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2800        if let Some(num) = self.transaction_id(hash)? {
2801            return self.receipt(num);
2802        }
2803        Ok(None)
2804    }
2805
2806    fn receipts_by_block(
2807        &self,
2808        _block: BlockHashOrNumber,
2809    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2810        unreachable!()
2811    }
2812
2813    fn receipts_by_tx_range(
2814        &self,
2815        range: impl RangeBounds<TxNumber>,
2816    ) -> ProviderResult<Vec<Self::Receipt>> {
2817        self.fetch_range_with_predicate(
2818            StaticFileSegment::Receipts,
2819            to_range(range),
2820            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
2821            |_| true,
2822        )
2823    }
2824
2825    fn receipts_by_block_range(
2826        &self,
2827        _block_range: RangeInclusive<BlockNumber>,
2828    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2829        Err(ProviderError::UnsupportedProvider)
2830    }
2831}
2832
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
    for StaticFileProvider<N>
{
    /// Computes `(hash, tx_number)` pairs for the given range by reading transactions from
    /// static files and hashing them in parallel on the global rayon pool.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        let tx_range_size = (tx_range.end - tx_range.start) as usize;

        // Transactions are different size, so chunks will not all take the same processing time. If
        // chunks are too big, there will be idle threads waiting for work. Choosing an
        // arbitrary smaller value to make sure it doesn't happen.
        let chunk_size = 100;

        // iterator over the chunks
        let chunks = tx_range
            .clone()
            .step_by(chunk_size)
            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));

        for chunk_range in chunks {
            // One channel per chunk, so results can be drained in chunk order below.
            let (channel_tx, channel_rx) = mpsc::channel();
            channels.push(channel_rx);

            let manager = self.clone();

            // Spawn the task onto the global rayon pool
            // This task will send the results through the channel after it has calculated
            // the hash.
            rayon::spawn(move || {
                // Encoding buffer reused across transactions within the chunk.
                let mut rlp_buf = Vec::with_capacity(128);
                let _ = manager.fetch_range_with_predicate(
                    StaticFileSegment::Transactions,
                    chunk_range,
                    |cursor, number| {
                        Ok(cursor
                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
                            .map(|transaction| {
                                rlp_buf.clear();
                                let _ = channel_tx
                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
                            }))
                    },
                    |_| true,
                );
            });
        }

        let mut tx_list = Vec::with_capacity(tx_range_size);

        // Iterate over channels and append the tx hashes unsorted
        for channel in channels {
            // `recv` returns `Err` once the chunk's sender is dropped, ending this loop.
            while let Ok(tx) = channel.recv() {
                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
                tx_list.push((tx_hash, tx_id));
            }
        }

        Ok(tx_list)
    }
}
2895
2896impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
2897    for StaticFileProvider<N>
2898{
2899    type Transaction = N::SignedTx;
2900
2901    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
2902        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2903            let mut cursor = jar_provider.cursor()?;
2904            if cursor
2905                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
2906                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
2907                .is_some()
2908            {
2909                Ok(cursor.number())
2910            } else {
2911                Ok(None)
2912            }
2913        })
2914    }
2915
2916    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2917        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2918            .and_then(|provider| provider.transaction_by_id(num))
2919            .or_else(|err| {
2920                if let ProviderError::MissingStaticFileTx(_, _) = err {
2921                    Ok(None)
2922                } else {
2923                    Err(err)
2924                }
2925            })
2926    }
2927
2928    fn transaction_by_id_unhashed(
2929        &self,
2930        num: TxNumber,
2931    ) -> ProviderResult<Option<Self::Transaction>> {
2932        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2933            .and_then(|provider| provider.transaction_by_id_unhashed(num))
2934            .or_else(|err| {
2935                if let ProviderError::MissingStaticFileTx(_, _) = err {
2936                    Ok(None)
2937                } else {
2938                    Err(err)
2939                }
2940            })
2941    }
2942
2943    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2944        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2945            Ok(jar_provider
2946                .cursor()?
2947                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2948                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2949        })
2950    }
2951
2952    fn transaction_by_hash_with_meta(
2953        &self,
2954        _hash: TxHash,
2955    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2956        // Required data not present in static_files
2957        Err(ProviderError::UnsupportedProvider)
2958    }
2959
2960    fn transactions_by_block(
2961        &self,
2962        _block_id: BlockHashOrNumber,
2963    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2964        // Required data not present in static_files
2965        Err(ProviderError::UnsupportedProvider)
2966    }
2967
2968    fn transactions_by_block_range(
2969        &self,
2970        _range: impl RangeBounds<BlockNumber>,
2971    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2972        // Required data not present in static_files
2973        Err(ProviderError::UnsupportedProvider)
2974    }
2975
2976    fn transactions_by_tx_range(
2977        &self,
2978        range: impl RangeBounds<TxNumber>,
2979    ) -> ProviderResult<Vec<Self::Transaction>> {
2980        self.fetch_range_with_predicate(
2981            StaticFileSegment::Transactions,
2982            to_range(range),
2983            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2984            |_| true,
2985        )
2986    }
2987
2988    fn senders_by_tx_range(
2989        &self,
2990        range: impl RangeBounds<TxNumber>,
2991    ) -> ProviderResult<Vec<Address>> {
2992        self.fetch_range_with_predicate(
2993            StaticFileSegment::TransactionSenders,
2994            to_range(range),
2995            |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2996            |_| true,
2997        )
2998    }
2999
3000    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
3001        self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
3002            .and_then(|provider| provider.transaction_sender(id))
3003            .or_else(|err| {
3004                if let ProviderError::MissingStaticFileTx(_, _) = err {
3005                    Ok(None)
3006                } else {
3007                    Err(err)
3008                }
3009            })
3010    }
3011}
3012
3013impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
3014    fn chain_info(&self) -> ProviderResult<ChainInfo> {
3015        // Required data not present in static_files
3016        Err(ProviderError::UnsupportedProvider)
3017    }
3018
3019    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
3020        // Required data not present in static_files
3021        Err(ProviderError::UnsupportedProvider)
3022    }
3023
3024    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
3025        Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
3026    }
3027
3028    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
3029        // Required data not present in static_files
3030        Err(ProviderError::UnsupportedProvider)
3031    }
3032}
3033
/* The following trait implementations cannot be meaningfully supported by static files,
but must exist to satisfy trait requirements. */
3035
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
3108
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_body_indices_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
3121
3122impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
3123    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3124        match T::NAME {
3125            tables::CanonicalHeaders::NAME |
3126            tables::Headers::<Header>::NAME |
3127            tables::HeaderTerminalDifficulties::NAME => Ok(self
3128                .get_highest_static_file_block(StaticFileSegment::Headers)
3129                .map(|block| block + 1)
3130                .unwrap_or_default()
3131                as usize),
3132            tables::Receipts::<Receipt>::NAME => Ok(self
3133                .get_highest_static_file_tx(StaticFileSegment::Receipts)
3134                .map(|receipts| receipts + 1)
3135                .unwrap_or_default() as usize),
3136            tables::Transactions::<TransactionSigned>::NAME => Ok(self
3137                .get_highest_static_file_tx(StaticFileSegment::Transactions)
3138                .map(|txs| txs + 1)
3139                .unwrap_or_default()
3140                as usize),
3141            tables::TransactionSenders::NAME => Ok(self
3142                .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
3143                .map(|txs| txs + 1)
3144                .unwrap_or_default() as usize),
3145            _ => Err(ProviderError::UnsupportedProvider),
3146        }
3147    }
3148}
3149
3150/// Calculates the tx hash for the given transaction and its id.
3151#[inline]
3152fn calculate_hash<T>(
3153    entry: (TxNumber, T),
3154    rlp_buf: &mut Vec<u8>,
3155) -> Result<(B256, TxNumber), Box<ProviderError>>
3156where
3157    T: Encodable2718,
3158{
3159    let (tx_id, tx) = entry;
3160    tx.encode_2718(rlp_buf);
3161    Ok((keccak256(rlp_buf), tx_id))
3162}
3163
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use reth_chain_state::EthPrimitives;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

    use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};

    #[test]
    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
        let (static_dir, _) = create_test_static_files_dir();
        let sf_rw: StaticFileProvider<EthPrimitives> =
            StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(100).build()?;

        let segment = StaticFileSegment::Headers;

        // With no index at all, the default fixed-range computation applies.
        for (block, (start, end)) in [(0, (0, 99)), (250, (200, 299))] {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, None, block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // An empty index falls back to the default behavior as well.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
            SegmentRangeInclusive::new(100, 199)
        );

        // Index containing three contiguous 100-block ranges.
        let block_index = BTreeMap::from_iter([
            (99, SegmentRangeInclusive::new(0, 99)),
            (199, SegmentRangeInclusive::new(100, 199)),
            (299, SegmentRangeInclusive::new(200, 299)),
        ]);

        let cases = [
            // Blocks inside known ranges resolve to the containing range.
            (0, (0, 99)),
            (50, (0, 99)),
            (99, (0, 99)),
            (100, (100, 199)),
            (150, (100, 199)),
            (199, (100, 199)),
            // Blocks past the index derive new ranges from the last known one —
            // including when one (500) or many (1000) whole segments are skipped.
            (300, (300, 399)),
            (350, (300, 399)),
            (500, (500, 599)),
            (1000, (1000, 1099)),
        ];
        for (block, (start, end)) in cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // Ranges recorded with different sizes (blocks_per_file changed between runs)
        // are honored as-is; blocks past the last one derive from its end (349) using
        // the current blocks_per_file setting (100).
        let mixed_size_index = BTreeMap::from_iter([
            (49, SegmentRangeInclusive::new(0, 49)),     // 50 blocks
            (149, SegmentRangeInclusive::new(50, 149)),  // 100 blocks
            (349, SegmentRangeInclusive::new(150, 349)), // 200 blocks
        ]);
        let mixed_cases = [
            (25, (0, 49)),
            (100, (50, 149)),
            (200, (150, 349)),
            (350, (350, 449)),
            (450, (450, 549)),
            (550, (550, 649)),
        ];
        for (block, (start, end)) in mixed_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        Ok(())
    }
}