reth_provider/providers/static_file/manager.rs

use super::{
    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
};
use crate::{
    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
    EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
    TransactionVariant, TransactionsProvider, TransactionsProviderExt,
};
use alloy_consensus::{transaction::TransactionMeta, Header};
use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
use dashmap::DashMap;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use parking_lot::RwLock;
use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
use reth_db::{
    lockfile::StorageLock,
    static_file::{
        iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
        StaticFileCursor, TransactionMask, TransactionSenderMask,
    },
};
use reth_db_api::{
    cursor::DbCursorRO,
    models::StoredBlockBodyIndices,
    table::{Decompress, Table, Value},
    tables,
    transaction::DbTx,
};
use reth_ethereum_primitives::{Receipt, TransactionSigned};
use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
use reth_node_types::NodePrimitives;
use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
use reth_stages_types::{PipelineTarget, StageId};
use reth_static_file_types::{
    find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
    DEFAULT_BLOCKS_PER_STATIC_FILE,
};
use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, StorageSettingsCache};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use std::{
    collections::{BTreeMap, HashMap},
    fmt::Debug,
    ops::{Deref, Range, RangeBounds, RangeInclusive},
    path::{Path, PathBuf},
    sync::{atomic::AtomicU64, mpsc, Arc},
};
use tracing::{debug, info, trace, warn};

/// Alias type for a map that can be queried for block or transaction ranges. It uses `u64` to
/// represent either a block or transaction number at the end of a static file range.
type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;

/// Access mode on a static file provider. RO/RW.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` if read-only access.
    pub const fn is_read_only(&self) -> bool {
        matches!(self, Self::RO)
    }

    /// Returns `true` if read-write access.
    pub const fn is_read_write(&self) -> bool {
        matches!(self, Self::RW)
    }
}

/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
///
/// "Static files" contain immutable chain history data, such as:
///  - transactions
///  - headers
///  - receipts
///
/// This provider type is responsible for reading and writing to static files.
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);

impl<N> Clone for StaticFileProvider<N> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

/// Builder for [`StaticFileProvider`] that allows configuration before initialization.
#[derive(Debug)]
pub struct StaticFileProviderBuilder<N> {
    inner: StaticFileProviderInner<N>,
}

impl<N: NodePrimitives> StaticFileProviderBuilder<N> {
    /// Creates a new builder with read-write access.
    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
        StaticFileProviderInner::new(path, StaticFileAccess::RW).map(|inner| Self { inner })
    }

    /// Creates a new builder with read-only access.
    pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
        StaticFileProviderInner::new(path, StaticFileAccess::RO).map(|inner| Self { inner })
    }

    /// Set custom blocks per file for specific segments.
    ///
    /// Each static file segment is stored across multiple files, and each of these files contains
    /// up to the specified number of blocks of data. When the file gets full, a new file is
    /// created with the new block range.
    ///
    /// This setting affects the size of each static file, and can be set per segment.
    ///
    /// If it is changed for an existing node, existing static files will not be affected and will
    /// be finished with the old blocks per file setting, but new static files will use the new
    /// setting.
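    ///
    /// # Example
    ///
    /// A minimal sketch of configuring custom blocks-per-file values through the builder. The
    /// `EthPrimitives` type parameter, the path, and the `500_000` value are illustrative
    /// assumptions, not defaults mandated by this module:
    ///
    /// ```ignore
    /// let provider = StaticFileProviderBuilder::<EthPrimitives>::read_write("static_files")?
    ///     .with_blocks_per_file_for_segments(HashMap::from([
    ///         (StaticFileSegment::Headers, 500_000),
    ///     ]))
    ///     .with_metrics()
    ///     .build()?;
    /// ```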
    pub fn with_blocks_per_file_for_segments(
        mut self,
        segments: HashMap<StaticFileSegment, u64>,
    ) -> Self {
        self.inner.blocks_per_file.extend(segments);
        self
    }

    /// Set a custom number of blocks per file for all segments.
    pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
        for segment in StaticFileSegment::iter() {
            self.inner.blocks_per_file.insert(segment, blocks_per_file);
        }
        self
    }

    /// Set a custom number of blocks per file for a specific segment.
    pub fn with_blocks_per_file_for_segment(
        mut self,
        segment: StaticFileSegment,
        blocks_per_file: u64,
    ) -> Self {
        self.inner.blocks_per_file.insert(segment, blocks_per_file);
        self
    }

    /// Enables metrics on the [`StaticFileProvider`].
    pub fn with_metrics(mut self) -> Self {
        self.inner.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
        self
    }

    /// Builds the final [`StaticFileProvider`] and initializes the index.
    pub fn build(self) -> ProviderResult<StaticFileProvider<N>> {
        let provider = StaticFileProvider(Arc::new(self.inner));
        provider.initialize_index()?;
        Ok(provider)
    }
}

impl<N: NodePrimitives> StaticFileProvider<N> {
    /// Creates a new [`StaticFileProvider`] with read-only access.
    ///
    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
    /// new data won't be detected or queryable.
    ///
    /// Watching is recommended if the read-only provider is used on a directory that an active node
    /// instance is modifying.
    ///
    /// See also [`StaticFileProvider::watch_directory`].
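    ///
    /// # Example
    ///
    /// A sketch of opening an existing directory read-only while another process may still be
    /// writing to it (the path and the `EthPrimitives` type parameter are illustrative
    /// assumptions):
    ///
    /// ```ignore
    /// let provider =
    ///     StaticFileProvider::<EthPrimitives>::read_only("datadir/static_files", true)?;
    /// ```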
    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
        let provider = StaticFileProviderBuilder::read_only(path)?.build()?;

        if watch_directory {
            provider.watch_directory();
        }

        Ok(provider)
    }

    /// Creates a new [`StaticFileProvider`] with read-write access.
    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
        StaticFileProviderBuilder::read_write(path)?.build()
    }

    /// Watches the directory for changes and updates the in-memory index when modifications
    /// are detected.
    ///
    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
    /// receive `update_index` notifications from a node that appends/truncates data.
    pub fn watch_directory(&self) {
        let provider = self.clone();
        std::thread::spawn(move || {
            let (tx, rx) = std::sync::mpsc::channel();
            let mut watcher = RecommendedWatcher::new(
                move |res| tx.send(res).unwrap(),
                notify::Config::default(),
            )
            .expect("failed to create watcher");

            watcher
                .watch(&provider.path, RecursiveMode::NonRecursive)
                .expect("failed to watch path");

            // Some backends send repeated modified events
            let mut last_event_timestamp = None;

            while let Ok(res) = rx.recv() {
                match res {
                    Ok(event) => {
                        // We only care about events that change data (modify/create/remove)
                        if !matches!(
                            event.kind,
                            notify::EventKind::Modify(_) |
                                notify::EventKind::Create(_) |
                                notify::EventKind::Remove(_)
                        ) {
                            continue
                        }

                        // We only trigger a re-initialization if a configuration file was
                        // modified. This means that a static_file_provider.commit() was called
                        // on the node after appending/truncating rows
                        for segment in event.paths {
                            // Ensure it's a file with the .conf extension
                            if segment
                                .extension()
                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
                            {
                                continue
                            }

                            // Ensure it's a well-formed static file name
                            if StaticFileSegment::parse_filename(
                                &segment.file_stem().expect("qed").to_string_lossy(),
                            )
                            .is_none()
                            {
                                continue
                            }

                            // If we can read the metadata and modified timestamp, ensure this is
                            // not an old or repeated event.
                            if let Ok(current_modified_timestamp) =
                                std::fs::metadata(&segment).and_then(|m| m.modified())
                            {
                                if last_event_timestamp.is_some_and(|last_timestamp| {
                                    last_timestamp >= current_modified_timestamp
                                }) {
                                    continue
                                }
                                last_event_timestamp = Some(current_modified_timestamp);
                            }

                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
                            if let Err(err) = provider.initialize_index() {
                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
                            }
                            break
                        }
                    }

                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
                }
            }
        });
    }
}

impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
    type Target = StaticFileProviderInner<N>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
    /// segments and ranges.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Indexes per segment.
    indexes: RwLock<HashMap<StaticFileSegment, StaticFileSegmentIndex>>,
    /// An additional index that tracks the expired history height, i.e. the highest block number
    /// that has been expired (is missing). The first non-expired block is
    /// `expired_history_height + 1`.
    ///
    /// This is effectively the transaction range that has been expired via
    /// [`StaticFileProvider::delete_segment_below_block`], and mirrors
    /// `static_files_min_block[transactions] - blocks_per_file`.
    ///
    /// This additional tracker exists for more efficient lookups, because the node must be aware
    /// of the expired height.
    earliest_history_height: AtomicU64,
    /// Directory where `static_files` are located
    path: PathBuf,
    /// Maintains a set of writers, one per [`StaticFileSegment`].
    writers: StaticFileWriters<N>,
    /// Metrics for the static files.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access rights of the provider.
    access: StaticFileAccess,
    /// Number of blocks per file, per segment.
    blocks_per_file: HashMap<StaticFileSegment, u64>,
    /// Write lock for when access is [`StaticFileAccess::RW`].
    _lock_file: Option<StorageLock>,
}

impl<N: NodePrimitives> StaticFileProviderInner<N> {
    /// Creates a new [`StaticFileProviderInner`].
    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
        let _lock_file = if access.is_read_write() {
            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
        } else {
            None
        };

        let mut blocks_per_file = HashMap::new();
        for segment in StaticFileSegment::iter() {
            blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
        }

        let provider = Self {
            map: Default::default(),
            indexes: Default::default(),
            writers: Default::default(),
            earliest_history_height: Default::default(),
            path: path.as_ref().to_path_buf(),
            metrics: None,
            access,
            blocks_per_file,
            _lock_file,
        };

        Ok(provider)
    }

    /// Returns `true` if the provider has read-only access.
    pub const fn is_read_only(&self) -> bool {
        self.access.is_read_only()
    }

    /// Each static file has a fixed number of blocks. This returns the range in which the
    /// requested block is positioned.
    ///
    /// If the specified block falls into one of the ranges of already initialized static files,
    /// this function will return that range.
    ///
    /// If no matching file exists, this function will derive a new range from the end of the last
    /// existing file, if any.
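    ///
    /// # Example
    ///
    /// A sketch of how a range is derived when no block index is available, assuming the segment
    /// is configured with 500_000 blocks per file (the numbers are illustrative):
    ///
    /// ```ignore
    /// // Block 600_000 falls into the second fixed file, i.e. 500_000..=999_999.
    /// let range = provider.find_fixed_range_with_block_index(
    ///     StaticFileSegment::Headers,
    ///     None,
    ///     600_000,
    /// );
    /// assert_eq!(range, SegmentRangeInclusive::new(500_000, 999_999));
    /// ```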
    pub fn find_fixed_range_with_block_index(
        &self,
        segment: StaticFileSegment,
        block_index: Option<&SegmentRanges>,
        block: BlockNumber,
    ) -> SegmentRangeInclusive {
        let blocks_per_file =
            self.blocks_per_file.get(&segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);

        if let Some(block_index) = block_index {
            // Find the first block range that contains the requested block
            if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
            {
                // Found a matching range for an existing file using the block index
                return *range
            } else if let Some((_, range)) = block_index.last_key_value() {
                // Didn't find a matching range for an existing file, derive a new range from the
                // end of the last existing file range.
                //
                // `block` is always higher than `range.end()` here, because we iterated over all
                // `block_index` ranges above and didn't find one that contains our block
                let blocks_after_last_range = block - range.end();
                let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
                let start = range.end() + 1 + segments_to_skip * blocks_per_file;
                return SegmentRangeInclusive::new(start, start + blocks_per_file - 1)
            }
        }
        // No block index is available, derive a new range using the fixed number of blocks,
        // starting from the beginning.
        find_fixed_range(block, blocks_per_file)
    }

    /// Each static file has a fixed number of blocks. This returns the range in which the
    /// requested block is positioned.
    ///
    /// If the specified block falls into one of the ranges of already initialized static files,
    /// this function will return that range.
    ///
    /// If no matching file exists, this function will derive a new range from the end of the last
    /// existing file, if any.
    ///
    /// This function will block indefinitely if a write lock for
    /// [`Self::indexes`] is already acquired. In that case, use
    /// [`Self::find_fixed_range_with_block_index`].
    pub fn find_fixed_range(
        &self,
        segment: StaticFileSegment,
        block: BlockNumber,
    ) -> SegmentRangeInclusive {
        self.find_fixed_range_with_block_index(
            segment,
            self.indexes
                .read()
                .get(&segment)
                .map(|index| &index.expected_block_ranges_by_max_block),
            block,
        )
    }
}

impl<N: NodePrimitives> StaticFileProvider<N> {
    /// Reports metrics for the static files.
    pub fn report_metrics(&self) -> ProviderResult<()> {
        let Some(metrics) = &self.metrics else { return Ok(()) };

        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
        for (segment, headers) in static_files {
            let mut entries = 0;
            let mut size = 0;

            for (block_range, _) in &headers {
                let fixed_block_range = self.find_fixed_range(segment, block_range.start());
                let jar_provider = self
                    .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
                    .ok_or_else(|| {
                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
                    })?;

                entries += jar_provider.rows();

                let data_path = jar_provider.data_path().to_path_buf();
                let index_path = jar_provider.index_path();
                let offsets_path = jar_provider.offsets_path();
                let config_path = jar_provider.config_path();

                // can release jar early
                drop(jar_provider);

                let data_size = reth_fs_util::metadata(data_path)
                    .map(|metadata| metadata.len())
                    .unwrap_or_default();
                let index_size = reth_fs_util::metadata(index_path)
                    .map(|metadata| metadata.len())
                    .unwrap_or_default();
                let offsets_size = reth_fs_util::metadata(offsets_path)
                    .map(|metadata| metadata.len())
                    .unwrap_or_default();
                let config_size = reth_fs_util::metadata(config_path)
                    .map(|metadata| metadata.len())
                    .unwrap_or_default();

                size += data_size + index_size + offsets_size + config_size;
            }

            metrics.record_segment(segment, size, headers.len(), entries);
        }

        Ok(())
    }

    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index, which can be
    /// either a block or a transaction number.
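    ///
    /// # Example
    ///
    /// A sketch of fetching the jar that contains a given block header (segment and number are
    /// illustrative):
    ///
    /// ```ignore
    /// let jar = provider.get_segment_provider(StaticFileSegment::Headers, 1_000)?;
    /// ```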
    pub fn get_segment_provider(
        &self,
        segment: StaticFileSegment,
        number: u64,
    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
        if segment.is_block_based() {
            self.get_segment_provider_for_block(segment, number, None)
        } else {
            self.get_segment_provider_for_transaction(segment, number, None)
        }
    }

    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index, which can be
    /// either a block or a transaction number.
    ///
    /// If the segment is not found, returns [`None`].
    pub fn get_maybe_segment_provider(
        &self,
        segment: StaticFileSegment,
        number: u64,
    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
        let provider = if segment.is_block_based() {
            self.get_segment_provider_for_block(segment, number, None)
        } else {
            self.get_segment_provider_for_transaction(segment, number, None)
        };

        match provider {
            Ok(provider) => Ok(Some(provider)),
            Err(
                ProviderError::MissingStaticFileBlock(_, _) |
                ProviderError::MissingStaticFileTx(_, _),
            ) => Ok(None),
            Err(err) => Err(err),
        }
    }

    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
    pub fn get_segment_provider_for_block(
        &self,
        segment: StaticFileSegment,
        block: BlockNumber,
        path: Option<&Path>,
    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
        self.get_segment_provider_for_range(
            segment,
            || self.get_segment_ranges_from_block(segment, block),
            path,
        )?
        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
    }

    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
    pub fn get_segment_provider_for_transaction(
        &self,
        segment: StaticFileSegment,
        tx: TxNumber,
        path: Option<&Path>,
    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
        self.get_segment_provider_for_range(
            segment,
            || self.get_segment_ranges_from_transaction(segment, tx),
            path,
        )?
        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
    }

    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
    ///
    /// `fn_range` should make sure the range goes through `find_fixed_range`.
    pub fn get_segment_provider_for_range(
        &self,
        segment: StaticFileSegment,
        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
        path: Option<&Path>,
    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
        // If we have a path, then get the block range from its name.
        // Otherwise, derive it via `fn_range()` from the in-memory segment index.
        let block_range = match path {
            Some(path) => StaticFileSegment::parse_filename(
                &path
                    .file_name()
                    .ok_or_else(|| {
                        ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
                    })?
                    .to_string_lossy(),
            )
            .and_then(|(parsed_segment, block_range)| {
                if parsed_segment == segment {
                    return Some(block_range)
                }
                None
            }),
            None => fn_range(),
        };

        // Return the cached `LoadedJar`, or insert it for the first time and then return it.
        if let Some(block_range) = block_range {
            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
        }

        Ok(None)
    }

    /// Gets the [`StaticFileJarProvider`] of the requested path.
    pub fn get_segment_provider_for_path(
        &self,
        path: &Path,
    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
        StaticFileSegment::parse_filename(
            &path
                .file_name()
                .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
                .to_string_lossy(),
        )
        .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
        .transpose()
    }

    /// Given a segment and block range, it removes the cached provider from the map.
    ///
    /// CAUTION: the cached provider must be dropped before calling this, or it WILL deadlock.
    pub fn remove_cached_provider(
        &self,
        segment: StaticFileSegment,
        fixed_block_range_end: BlockNumber,
    ) {
        self.map.remove(&(fixed_block_range_end, segment));
    }

    /// This handles history expiry by deleting all static files for the given segment below the
    /// given block.
    ///
    /// For example, if the block is 1M and there are 500K blocks per file, this will delete all
    /// individual files below 1M, i.e. 0-499K and 500K-999K.
    ///
    /// This will not delete the file that contains the block itself, because files can only be
    /// removed entirely.
    ///
    /// # Safety
    ///
    /// This method will never delete the highest static file for the segment, even if the
    /// requested block is higher than the highest block in static files. This ensures we always
    /// maintain at least one static file if any exist.
    ///
    /// Returns a list of `SegmentHeader`s from the deleted jars.
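    ///
    /// # Example
    ///
    /// A sketch assuming 500_000 blocks per file and files on disk up to (and beyond) block
    /// 1_000_000:
    ///
    /// ```ignore
    /// // Deletes the files 0..=499_999 and 500_000..=999_999; the file containing
    /// // block 1_000_000 itself, and the highest file of the segment, are kept.
    /// let deleted =
    ///     provider.delete_segment_below_block(StaticFileSegment::Transactions, 1_000_000)?;
    /// ```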
    pub fn delete_segment_below_block(
        &self,
        segment: StaticFileSegment,
        block: BlockNumber,
    ) -> ProviderResult<Vec<SegmentHeader>> {
        // Nothing to delete if block is 0.
        if block == 0 {
            return Ok(Vec::new())
        }

        let highest_block = self.get_highest_static_file_block(segment);
        let mut deleted_headers = Vec::new();

        loop {
            let Some(block_height) = self.get_lowest_range_end(segment) else {
                return Ok(deleted_headers)
            };

            // Stop if we've reached the target block or the highest static file
            if block_height >= block || Some(block_height) == highest_block {
                return Ok(deleted_headers)
            }

            debug!(
                target: "provider::static_file",
                ?segment,
                ?block_height,
                "Deleting static file below block"
            );

            // Now we need to wipe the static file; this takes care of updating the index and
            // advancing the lowest tracked block height for the segment.
            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
                warn!( target: "provider::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
            })?;

            deleted_headers.push(header);
        }
    }

    /// Given a segment and block, it deletes the jar and all files from the respective block range.
    ///
    /// CAUTION: destructive. Deletes files on disk.
    ///
    /// This will re-initialize the index after deletion, so all files are tracked.
    ///
    /// Returns the `SegmentHeader` of the deleted jar.
    pub fn delete_jar(
        &self,
        segment: StaticFileSegment,
        block: BlockNumber,
    ) -> ProviderResult<SegmentHeader> {
        let fixed_block_range = self.find_fixed_range(segment, block);
        let key = (fixed_block_range.end(), segment);
        let jar = if let Some((_, jar)) = self.map.remove(&key) {
            jar.jar
        } else {
            let file = self.path.join(segment.filename(&fixed_block_range));
            debug!(
                target: "provider::static_file",
                ?file,
                ?fixed_block_range,
                ?block,
                "Loading static file jar for deletion"
            );
            NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
        };

        let header = *jar.user_header();
        jar.delete().map_err(ProviderError::other)?;

        // SAFETY: this is currently necessary to ensure that certain indexes like
        // `static_files_min_block` have the correct values after pruning.
        self.initialize_index()?;

        Ok(header)
    }

    /// Given a segment and block range, it returns a cached
    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
    /// many.
    fn get_or_create_jar_provider(
        &self,
        segment: StaticFileSegment,
        fixed_block_range: &SegmentRangeInclusive,
    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
        let key = (fixed_block_range.end(), segment);

        // Avoid using `entry` directly to avoid a write lock in the common case.
        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
            jar.into()
        } else {
            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
            let path = self.path.join(segment.filename(fixed_block_range));
            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
        };

        if let Some(metrics) = &self.metrics {
            provider = provider.with_metrics(metrics.clone());
        }
        Ok(provider)
    }

    /// Gets a static file segment's block range from the provider inner block
    /// index.
    fn get_segment_ranges_from_block(
        &self,
        segment: StaticFileSegment,
        block: u64,
    ) -> Option<SegmentRangeInclusive> {
        let indexes = self.indexes.read();
        let index = indexes.get(&segment)?;

        (index.max_block >= block).then(|| {
            self.find_fixed_range_with_block_index(
                segment,
                Some(&index.expected_block_ranges_by_max_block),
                block,
            )
        })
    }

    /// Gets a static file segment's fixed block range from the provider inner
    /// transaction index.
    fn get_segment_ranges_from_transaction(
        &self,
        segment: StaticFileSegment,
        tx: u64,
    ) -> Option<SegmentRangeInclusive> {
        let indexes = self.indexes.read();
        let index = indexes.get(&segment)?;
        let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;

        // It's more probable that the request comes from a newer tx height, so we iterate
        // the static_files in reverse.
        let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();

        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
            if tx > *tx_end {
                // The requested tx is higher than the highest static file tx
                return None
            }
            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
            if tx_start <= tx {
                return Some(self.find_fixed_range_with_block_index(
                    segment,
                    Some(&index.expected_block_ranges_by_max_block),
                    block_range.end(),
                ))
            }
        }
        None
    }

    /// Updates the inner transaction and block indexes alongside the internal cached providers in
    /// `self.map`.
    ///
    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
    ///
    /// If `segment_max_block` is [`None`], it means there's no static file for this segment.
    pub fn update_index(
        &self,
        segment: StaticFileSegment,
        segment_max_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        debug!(
            target: "provider::static_file",
            ?segment,
            ?segment_max_block,
            "Updating provider index"
        );
        let mut indexes = self.indexes.write();

        match segment_max_block {
            Some(segment_max_block) => {
                let fixed_range = self.find_fixed_range_with_block_index(
                    segment,
                    indexes.get(&segment).map(|index| &index.expected_block_ranges_by_max_block),
                    segment_max_block,
                );

                let jar = NippyJar::<SegmentHeader>::load(
                    &self.path.join(segment.filename(&fixed_range)),
                )
                .map_err(ProviderError::other)?;

                let index = indexes
                    .entry(segment)
                    .and_modify(|index| {
                        // Update max block
                        index.max_block = segment_max_block;

                        // Update expected block range index

                        // Remove all expected block ranges that start at or above the new fixed
                        // range
                        index
                            .expected_block_ranges_by_max_block
                            .retain(|_, block_range| block_range.start() < fixed_range.start());
                        // Insert new expected block range
                        index
                            .expected_block_ranges_by_max_block
                            .insert(fixed_range.end(), fixed_range);
                    })
                    .or_insert_with(|| StaticFileSegmentIndex {
                        min_block_range: None,
                        max_block: segment_max_block,
                        expected_block_ranges_by_max_block: BTreeMap::from([(
                            fixed_range.end(),
                            fixed_range,
                        )]),
                        available_block_ranges_by_max_tx: None,
                    });

                // Update min_block to track the lowest block range of the segment.
                // This is initially set by initialize_index() on node startup, but must be updated
                // as the file grows to prevent stale values.
                //
                // Without this update, min_block can remain at genesis (e.g. Some([0..=0]) or None)
                // even after syncing to higher blocks (e.g. [0..=100]). A stale min_block causes
                // get_lowest_static_file_block() to return the wrong end value, which breaks
                // pruning logic that relies on it for safety checks.
                //
                // Example progression:
                // 1. Node starts, initialize_index() sets min_block = [0..=0]
                // 2. Sync to block 100, this update sets min_block = [0..=100]
                // 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
                //    this update, it would incorrectly return 0 (stale)
                if let Some(current_block_range) = jar.user_header().block_range() {
                    if let Some(min_block_range) = index.min_block_range.as_mut() {
                        // delete_jar WILL ALWAYS re-initialize all indexes, so we are always
                        // sure that current_min is always the lowest.
                        if current_block_range.start() == min_block_range.start() {
                            *min_block_range = current_block_range;
                        }
                    } else {
                        index.min_block_range = Some(current_block_range);
                    }
                }

                // Updates the tx index by first removing all entries which have a higher
                // block_start than our current static file.
                if let Some(tx_range) = jar.user_header().tx_range() {
                    // The current block range has the same block start as `fixed_range`, but the
                    // block end might be different if we are still filling this static file.
                    if let Some(current_block_range) = jar.user_header().block_range() {
                        let tx_end = tx_range.end();

                        // Considering that `update_index` is called when we either append or
                        // truncate, we are sure that we are handling the latest data points.
                        //
                        // Here we remove every entry of the index that has a block start higher
                        // than or equal to our current one. This is important in the case that we
                        // prune a lot of rows, resulting in the deletion of a file (and thus of a
                        // higher block range).
                        if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                            index
                                .retain(|_, block_range| block_range.start() < fixed_range.start());
                            index.insert(tx_end, current_block_range);
                        } else {
                            index.available_block_ranges_by_max_tx =
                                Some(BTreeMap::from([(tx_end, current_block_range)]));
                        }
                    }
                } else if segment.is_tx_based() {
                    // The unwound file has no more transactions/receipts. However, the highest
                    // block is within this file's block range. We only retain entries with block
                    // ranges before the current one.
                    if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
                    }

                    // If the index is empty, just remove it.
                    index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
                }

                // Update the cached provider.
                debug!(target: "provider::static_file", ?segment, "Inserting updated jar into cache");
                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

                // Delete any cached provider that no longer has an associated jar.
                debug!(target: "provider::static_file", ?segment, "Cleaning up jar map");
                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
            }
            None => {
                debug!(target: "provider::static_file", ?segment, "Removing segment from index");
                indexes.remove(&segment);
            }
        };

        debug!(target: "provider::static_file", ?segment, "Updated provider index");
        Ok(())
    }

    /// Initializes the inner transaction and block index
    pub fn initialize_index(&self) -> ProviderResult<()> {
        let mut indexes = self.indexes.write();
        indexes.clear();

        for (segment, headers) in iter_static_files(&self.path).map_err(ProviderError::other)? {
            // Update first and last block for each segment
            //
            // It's safe to call `expect` here, because every segment has at least one header
            // associated with it.
            let min_block_range = Some(headers.first().expect("headers are not empty").0);
            let max_block = headers.last().expect("headers are not empty").0.end();

            let mut expected_block_ranges_by_max_block = BTreeMap::default();
            let mut available_block_ranges_by_max_tx = None;

            for (block_range, header) in headers {
                // Update max expected block -> expected_block_range index
                expected_block_ranges_by_max_block
                    .insert(header.expected_block_end(), header.expected_block_range());

                // Update max tx -> block_range index
                if let Some(tx_range) = header.tx_range() {
                    let tx_end = tx_range.end();

                    available_block_ranges_by_max_tx
                        .get_or_insert_with(BTreeMap::default)
                        .insert(tx_end, block_range);
                }
            }

            indexes.insert(
                segment,
                StaticFileSegmentIndex {
                    min_block_range,
                    max_block,
                    expected_block_ranges_by_max_block,
                    available_block_ranges_by_max_tx,
                },
            );
        }

        // If this is a re-initialization, we need to clear this as well
        self.map.clear();

        // Initialize the expired history height to the lowest static file block
        if let Some(lowest_range) =
            indexes.get(&StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
        {
            // the earliest height is the lowest available block number
            self.earliest_history_height
                .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
        }

        Ok(())
    }

    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
    /// target to unwind to.
    ///
    /// Two types of consistency checks are done:
    ///
    /// 1) When a static file fails to commit but the underlying data was changed.
    /// 2) When a static file was committed, but the required database transaction was not.
    ///
    /// For 1) it can self-heal if `self.access.is_read_only()` returns `false`. Otherwise, it
    /// will return an error.
    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
    /// to heal.
    ///
    /// For each static file segment:
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///
    /// Returns an [`Option`] of [`PipelineTarget::Unwind`] if any further healing is required.
    ///
    /// WARNING: No static file writer should be held before calling this function, otherwise it
    /// will deadlock.
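    ///
    /// # Example
    ///
    /// A sketch of acting on the result at startup, assuming `db_provider` satisfies the listed
    /// trait bounds:
    ///
    /// ```ignore
    /// if let Some(PipelineTarget::Unwind(target)) = static_files.check_consistency(&db_provider)? {
    ///     // A pipeline unwind down to `target` is required before resuming sync.
    /// }
    /// ```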
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<PipelineTarget>>
    where
        Provider: DBProvider
            + BlockReader
            + StageCheckpointReader
            + ChainSpecProvider
            + StorageSettingsCache,
        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
    {
        // OVM historical import is broken and does not work with this check. It's importing
        // duplicated receipts, resulting in more receipts than the expected transaction
        // range.
        //
        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
        if provider.chain_spec().is_optimism() &&
            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
        {
            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
            const OVM_HEADER_1_HASH: B256 =
                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
                info!(target: "reth::cli",
                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
                );
                return Ok(None)
            }
        }

        info!(target: "reth::cli", "Verifying storage consistency.");

        let mut unwind_target: Option<BlockNumber> = None;
        let mut update_unwind_target = |new_target: BlockNumber| {
            if let Some(target) = unwind_target.as_mut() {
                *target = (*target).min(new_target);
            } else {
                unwind_target = Some(new_target);
            }
        };

        for segment in StaticFileSegment::iter() {
            debug!(target: "reth::providers::static_file", ?segment, "Checking consistency for segment");
            match segment {
                StaticFileSegment::Headers | StaticFileSegment::Transactions => {}
                StaticFileSegment::Receipts => {
                    if EitherWriter::receipts_destination(provider).is_database() {
                        // Old pruned nodes (including full node) do not store receipts as static
                        // files.
                        debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts consistency check: receipts stored in database");
                        continue
                    }

                    if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
                        NamedChain::Chiado == provider.chain_spec().chain_id()
                    {
                        // Gnosis and Chiado's historical import is broken and does not work with
                        // this check. They are importing receipts along with importing
                        // headers/bodies.
                        debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts consistency check: broken historical import for gnosis/chiado");
                        continue;
                    }
                }
                StaticFileSegment::TransactionSenders => {
                    if EitherWriterDestination::senders(provider).is_database() {
                        continue
                    }
                }
            }

            let initial_highest_block = self.get_highest_static_file_block(segment);
            debug!(target: "reth::providers::static_file", ?segment, ?initial_highest_block, "Initial highest block for segment");

            // File consistency is broken if:
            //
            // * appending data was interrupted before a config commit, then the data file will be
            //   truncated according to the config.
            //
            // * pruning data was interrupted before a config commit, then we have deleted data that
            //   we are expected to still have. We need to check the database and unwind everything
            //   accordingly.
            if self.access.is_read_only() {
                debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency (read-only)");
                self.check_segment_consistency(segment)?;
            } else {
                debug!(target: "reth::providers::static_file", ?segment, "Fetching latest writer which might heal any potential inconsistency");
                // Fetching the writer will attempt to heal any file-level inconsistency.
                self.latest_writer(segment)?;
            }

            // Only applies to block-based static files. (Headers)
            //
            // The updated `highest_block` may have decreased if we healed from a pruning
            // interruption.
            let mut highest_block = self.get_highest_static_file_block(segment);
            if initial_highest_block != highest_block {
                info!(
                    target: "reth::providers::static_file",
                    ?initial_highest_block,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                update_unwind_target(highest_block.unwrap_or_default());
            }

            // Only applies to transaction-based static files. (Receipts & Transactions)
            //
            // Make sure the last transaction matches the last block from its indices, since a heal
            // from a pruning interruption might have decreased the number of transactions without
            // being able to update the last block of the static file segment.
            let highest_tx = self.get_highest_static_file_tx(segment);
            debug!(target: "reth::providers::static_file", ?segment, ?highest_tx, ?highest_block, "Highest transaction for segment");
            if let Some(highest_tx) = highest_tx {
                let mut last_block = highest_block.unwrap_or_default();
                debug!(target: "reth::providers::static_file", ?segment, last_block, highest_tx, "Verifying last transaction matches last block indices");
                loop {
                    if let Some(indices) = provider.block_body_indices(last_block)? {
                        debug!(target: "reth::providers::static_file", ?segment, last_block, last_tx_num = indices.last_tx_num(), highest_tx, "Found block body indices");
                        if indices.last_tx_num() <= highest_tx {
                            break
                        }
                    } else {
                        debug!(target: "reth::providers::static_file", ?segment, last_block, "Block body indices not found, static files ahead of database");
                        // If the block body indices cannot be found, it means that static files
                        // are ahead of the database, and the `ensure_invariants` check will fix
                        // it by comparing with stage checkpoints.
                        break
                    }
                    if last_block == 0 {
                        debug!(target: "reth::providers::static_file", ?segment, "Reached block 0 in verification loop");
                        break
                    }
                    last_block -= 1;

                    info!(
                        target: "reth::providers::static_file",
                        highest_block = self.get_highest_static_file_block(segment),
                        unwind_target = last_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    highest_block = Some(last_block);
                    update_unwind_target(last_block);
                }
            }

            debug!(target: "reth::providers::static_file", ?segment, "Ensuring invariants for segment");
            if let Some(unwind) = match segment {
                StaticFileSegment::Headers => self
                    .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                        provider,
                        segment,
                        highest_block,
                        highest_block,
                    )?,
                StaticFileSegment::Transactions => self
                    .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
                StaticFileSegment::Receipts => self
                    .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
                StaticFileSegment::TransactionSenders => self
                    .ensure_invariants::<_, tables::TransactionSenders>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
            } {
                debug!(target: "reth::providers::static_file", ?segment, unwind_target=unwind, "Invariants check returned unwind target");
                update_unwind_target(unwind);
            } else {
                debug!(target: "reth::providers::static_file", ?segment, "Invariants check completed, no unwind needed");
            }
        }

        Ok(unwind_target.map(PipelineTarget::Unwind))
    }

    /// Checks the consistency of the latest static file segment and returns an error if at fault.
1181    /// Read-only.
1182    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1183        debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency");
1184        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1185            let file_path = self
1186                .directory()
1187                .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1188            debug!(target: "reth::providers::static_file", ?segment, ?file_path, latest_block, "Loading NippyJar for consistency check");
1189
1190            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1191            debug!(target: "reth::providers::static_file", ?segment, "NippyJar loaded, checking consistency");
1192
1193            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1194            debug!(target: "reth::providers::static_file", ?segment, "NippyJar consistency check passed");
1195        } else {
1196            debug!(target: "reth::providers::static_file", ?segment, "No static file block found, skipping consistency check");
1197        }
1198        Ok(())
1199    }
1200
1201    /// Check invariants for each corresponding table and static file segment:
1202    ///
1203    /// * the corresponding database table should overlap or have continuity in their keys
1204    ///   ([`TxNumber`] or [`BlockNumber`]).
1205    /// * its highest block should match the stage checkpoint block number if it's equal or higher
1206    ///   than the corresponding database table last entry.
1207    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
1208    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
1209    ///     target.
1210    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
1211    ///     this case, the rows will be removed and [`None`] will be returned.
1212    ///
1213    /// * If the database tables overlap with static files and have contiguous keys, or the
1214    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
1215    fn ensure_invariants<Provider, T: Table<Key = u64>>(
1216        &self,
1217        provider: &Provider,
1218        segment: StaticFileSegment,
1219        highest_static_file_entry: Option<u64>,
1220        highest_static_file_block: Option<BlockNumber>,
1221    ) -> ProviderResult<Option<BlockNumber>>
1222    where
1223        Provider: DBProvider + BlockReader + StageCheckpointReader,
1224    {
1225        debug!(target: "reth::providers::static_file", ?segment, ?highest_static_file_entry, ?highest_static_file_block, "Ensuring invariants");
1226        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
1227
1228        if let Some((db_first_entry, _)) = db_cursor.first()? {
1229            debug!(target: "reth::providers::static_file", ?segment, db_first_entry, "Found first database entry");
1230            if let (Some(highest_entry), Some(highest_block)) =
1231                (highest_static_file_entry, highest_static_file_block)
1232            {
1233                // If there is a gap between the highest entry in the static file and the first
1234                // entry in the database, then we have most likely lost static file data and need
1235                // to unwind so we can load it again.
1236                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
1237                    info!(
1238                        target: "reth::providers::static_file",
1239                        ?db_first_entry,
1240                        ?highest_entry,
1241                        unwind_target = highest_block,
1242                        ?segment,
1243                        "Setting unwind target."
1244                    );
1245                    return Ok(Some(highest_block))
1246                }
1247            }
1248
1249            if let Some((db_last_entry, _)) = db_cursor.last()? &&
1250                highest_static_file_entry
1251                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
1252            {
1253                debug!(target: "reth::providers::static_file", ?segment, db_last_entry, ?highest_static_file_entry, "Database has entries beyond static files, no unwind needed");
1254                return Ok(None)
1255            }
1256        } else {
1257            debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
1258        }
1259
1260        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
1261        let highest_static_file_block = highest_static_file_block.unwrap_or_default();
1262
1263        // If static file entry is ahead of the database entries, then ensure the checkpoint block
1264        // number matches.
1265        let stage_id = match segment {
1266            StaticFileSegment::Headers => StageId::Headers,
1267            StaticFileSegment::Transactions => StageId::Bodies,
1268            StaticFileSegment::Receipts => StageId::Execution,
1269            StaticFileSegment::TransactionSenders => StageId::SenderRecovery,
1270        };
1271        let checkpoint_block_number =
1272            provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
1273        debug!(target: "reth::providers::static_file", ?segment, ?stage_id, checkpoint_block_number, highest_static_file_block, "Retrieved stage checkpoint");
1274
1275        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
1276        if checkpoint_block_number > highest_static_file_block {
1277            info!(
1278                target: "reth::providers::static_file",
1279                checkpoint_block_number,
1280                unwind_target = highest_static_file_block,
1281                ?segment,
1282                "Setting unwind target."
1283            );
1284            return Ok(Some(highest_static_file_block))
1285        }
1286
1287        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
1288        // static files on executing a stage, or the reverse on unwinding a stage.
1289        // All we need to do is to prune the extra static file rows.
1290        if checkpoint_block_number < highest_static_file_block {
1291            info!(
1292                target: "reth::providers",
1293                ?segment,
1294                from = highest_static_file_block,
1295                to = checkpoint_block_number,
1296                "Unwinding static file segment."
1297            );
1298            let mut writer = self.latest_writer(segment)?;
1299            match segment {
1300                StaticFileSegment::Headers => {
1301                    let prune_count = highest_static_file_block - checkpoint_block_number;
1302                    debug!(target: "reth::providers::static_file", ?segment, prune_count, "Pruning headers");
1303                    // TODO(joshie): is_block_meta
1304                    writer.prune_headers(prune_count)?;
1305                }
1306                StaticFileSegment::Transactions |
1307                StaticFileSegment::Receipts |
1308                StaticFileSegment::TransactionSenders => {
1309                    if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
1310                        let number = highest_static_file_entry - block.last_tx_num();
1311                        debug!(target: "reth::providers::static_file", ?segment, prune_count = number, checkpoint_block_number, "Pruning transaction based segment");
1312
1313                        match segment {
1314                            StaticFileSegment::Transactions => {
1315                                writer.prune_transactions(number, checkpoint_block_number)?
1316                            }
1317                            StaticFileSegment::Receipts => {
1318                                writer.prune_receipts(number, checkpoint_block_number)?
1319                            }
1320                            StaticFileSegment::TransactionSenders => {
1321                                writer.prune_transaction_senders(number, checkpoint_block_number)?
1322                            }
1323                            StaticFileSegment::Headers => unreachable!(),
1324                        }
1325                    } else {
1326                        debug!(target: "reth::providers::static_file", ?segment, checkpoint_block_number, "No block body indices found for checkpoint block");
1327                    }
1328                }
1329            }
1330            debug!(target: "reth::providers::static_file", ?segment, "Committing writer after pruning");
1331            writer.commit()?;
1332            debug!(target: "reth::providers::static_file", ?segment, "Writer committed successfully");
1333        }
1334
1335        debug!(target: "reth::providers::static_file", ?segment, "Invariants ensured, returning None");
1336        Ok(None)
1337    }
1338
1339    /// Returns the earliest block number that has not been expired and is therefore still
1340    /// available.
1341    ///
1342    /// This means that the highest expired block (or expired block height) is
1343    /// `earliest_history_height.saturating_sub(1)`.
1344    ///
1345    /// Returns `0` if no history has been expired.
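    ///
    /// For example (illustrative numbers only): if blocks `0..=114_999` have been expired, this
    /// returns `115_000`, and the highest expired block is `115_000.saturating_sub(1) = 114_999`.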
1346    pub fn earliest_history_height(&self) -> BlockNumber {
1347        self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1348    }
1349
1350    /// Gets the lowest static file's block range if it exists for a static file segment.
1351    ///
1352    /// If there is nothing on disk for the given segment, this will return [`None`].
1353    pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1354        self.indexes.read().get(&segment).and_then(|index| index.min_block_range)
1355    }
1356
1357    /// Gets the lowest static file's block range start if it exists for a static file segment.
1358    ///
1359    /// For example if the lowest static file has blocks 0-499, this will return 0.
1360    ///
1361    /// If there is nothing on disk for the given segment, this will return [`None`].
1362    pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1363        self.get_lowest_range(segment).map(|range| range.start())
1364    }
1365
1366    /// Gets the lowest static file's block range end if it exists for a static file segment.
1367    ///
1368    /// For example if the lowest static file has blocks 0-499, this will return 499.
1369    ///
1370    /// If there is nothing on disk for the given segment, this will return [`None`].
1371    pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1372        self.get_lowest_range(segment).map(|range| range.end())
1373    }
1374
1375    /// Gets the highest static file's block height if it exists for a static file segment.
1376    ///
1377    /// If there is nothing on disk for the given segment, this will return [`None`].
1378    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1379        self.indexes.read().get(&segment).map(|index| index.max_block)
1380    }
1381
1382    /// Gets the highest static file transaction.
1383    ///
1384    /// If there is nothing on disk for the given segment, this will return [`None`].
1385    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1386        self.indexes
1387            .read()
1388            .get(&segment)
1389            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1390            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1391    }
1392
1393    /// Gets the highest static file block for all segments.
1394    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1395        HighestStaticFiles {
1396            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1397        }
1398    }
1399
1400    /// Iterates through segment `static_files` in reverse order, executing a function until it
1401    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
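    ///
    /// A usage sketch (not compiled; assumes a `provider: StaticFileProvider<N>` and a
    /// `block_hash: BlockHash` are in scope), mirroring how `header` is implemented further below:
    ///
    /// ```ignore
    /// let header = provider.find_static_file(StaticFileSegment::Headers, |jar_provider| {
    ///     Ok(jar_provider
    ///         .cursor()?
    ///         .get_two::<HeaderWithHashMask<Header>>((&block_hash).into())?
    ///         .and_then(|(header, hash)| (hash == block_hash).then_some(header)))
    /// })?;
    /// ```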
1402    pub fn find_static_file<T>(
1403        &self,
1404        segment: StaticFileSegment,
1405        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
1406    ) -> ProviderResult<Option<T>> {
1407        if let Some(ranges) =
1408            self.indexes.read().get(&segment).map(|index| &index.expected_block_ranges_by_max_block)
1409        {
1410            // Iterate through all ranges in reverse order (highest to lowest)
1411            for range in ranges.values().rev() {
1412                if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
1413                    return Ok(Some(res))
1414                }
1415            }
1416        }
1417
1418        Ok(None)
1419    }
1420
1421    /// Fetches data within a specified range across multiple static files.
1422    ///
1423    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
1424    /// It continues fetching until the end of the range is reached or the provided `predicate`
1425    /// returns false.
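    ///
    /// A usage sketch (not compiled; assumes a `provider: StaticFileProvider<N>` is in scope),
    /// mirroring how `headers_range` is implemented further below:
    ///
    /// ```ignore
    /// let headers = provider.fetch_range_with_predicate(
    ///     StaticFileSegment::Headers,
    ///     0..100,
    ///     |cursor, number| cursor.get_one::<HeaderMask<Header>>(number.into()),
    ///     |_| true, // no early termination, fetch the whole range
    /// )?;
    /// ```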
1426    pub fn fetch_range_with_predicate<T, F, P>(
1427        &self,
1428        segment: StaticFileSegment,
1429        range: Range<u64>,
1430        mut get_fn: F,
1431        mut predicate: P,
1432    ) -> ProviderResult<Vec<T>>
1433    where
1434        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1435        P: FnMut(&T) -> bool,
1436    {
1437        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1438
1439        /// Resolves to the provider for the given block or transaction number.
1440        ///
1441        /// If the static file is missing, the `result` is returned.
1442        macro_rules! get_provider {
1443            ($number:expr) => {{
1444                match self.get_segment_provider(segment, $number) {
1445                    Ok(provider) => provider,
1446                    Err(
1447                        ProviderError::MissingStaticFileBlock(_, _) |
1448                        ProviderError::MissingStaticFileTx(_, _),
1449                    ) => return Ok(result),
1450                    Err(err) => return Err(err),
1451                }
1452            }};
1453        }
1454
1455        let mut provider = get_provider!(range.start);
1456        let mut cursor = provider.cursor()?;
1457
1458        // advances number in range
1459        'outer: for number in range {
1460            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
1461            // access data in two different static files, it halts further attempts by returning
1462            // the results collected so far, effectively preventing infinite retry loops.
1463            let mut retrying = false;
1464
1465            // advances static files if `get_fn` returns None
1466            'inner: loop {
1467                match get_fn(&mut cursor, number)? {
1468                    Some(res) => {
1469                        if !predicate(&res) {
1470                            break 'outer
1471                        }
1472                        result.push(res);
1473                        break 'inner
1474                    }
1475                    None => {
1476                        if retrying {
1477                            return Ok(result)
1478                        }
1479                        // There is a very small chance of hitting a deadlock if two
1480                        // consecutive static files share the same bucket in the internal
1481                        // dashmap and we don't drop the current provider before requesting
1482                        // the next one.
1483                        drop(cursor);
1484                        drop(provider);
1485                        provider = get_provider!(number);
1486                        cursor = provider.cursor()?;
1487                        retrying = true;
1488                    }
1489                }
1490            }
1491        }
1492
1493        Ok(result)
1494    }
1495
1496    /// Fetches data within a specified range across multiple static files.
1497    ///
1498    /// Returns an iterator over the data. Yields [`None`] if the data for the specified number is
1499    /// not found.
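    ///
    /// A usage sketch (not compiled; assumes a `provider: StaticFileProvider<N>` is in scope):
    ///
    /// ```ignore
    /// for result in provider.fetch_range_iter(
    ///     StaticFileSegment::Headers,
    ///     0..100,
    ///     |cursor, number| cursor.get_one::<HeaderMask<Header>>(number.into()),
    /// )? {
    ///     let maybe_header = result?; // `None` if the entry is missing from the static files
    /// }
    /// ```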
1500    pub fn fetch_range_iter<'a, T, F>(
1501        &'a self,
1502        segment: StaticFileSegment,
1503        range: Range<u64>,
1504        get_fn: F,
1505    ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
1506    where
1507        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1508        T: std::fmt::Debug,
1509    {
1510        let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
1511        Ok(range.map(move |number| {
1512            match provider
1513                .as_ref()
1514                .map(|provider| get_fn(&mut provider.cursor()?, number))
1515                .and_then(|result| result.transpose())
1516            {
1517                Some(result) => result.map(Some),
1518                None => {
1519                    // There is a very small chance of hitting a deadlock if two consecutive
1520                    // static files share the same bucket in the internal dashmap and we don't drop
1521                    // the current provider before requesting the next one.
1522                    provider.take();
1523                    provider = self.get_maybe_segment_provider(segment, number)?;
1524                    provider
1525                        .as_ref()
1526                        .map(|provider| get_fn(&mut provider.cursor()?, number))
1527                        .and_then(|result| result.transpose())
1528                        .transpose()
1529                }
1530            }
1531        }))
1532    }
1533
1534    /// Returns directory where `static_files` are located.
1535    pub fn directory(&self) -> &Path {
1536        &self.path
1537    }
1538
1539    /// Retrieves data from the database or static file, wherever it's available.
1540    ///
1541    /// # Arguments
1542    /// * `segment` - The segment of the static file to check against.
1543    /// * `number` - Requested index key, usually a block or transaction number.
1544    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
1545    ///   file provider.
1546    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
1547    ///   when the static file doesn't contain the required data or is not available.
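    ///
    /// A usage sketch (not compiled; `db_provider` is a hypothetical database-backed provider and
    /// `num` a block number already in scope):
    ///
    /// ```ignore
    /// let header = static_file_provider.get_with_static_file_or_database(
    ///     StaticFileSegment::Headers,
    ///     num,
    ///     |sf_provider| sf_provider.header_by_number(num),
    ///     || db_provider.header_by_number(num),
    /// )?;
    /// ```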
1548    pub fn get_with_static_file_or_database<T, FS, FD>(
1549        &self,
1550        segment: StaticFileSegment,
1551        number: u64,
1552        fetch_from_static_file: FS,
1553        fetch_from_database: FD,
1554    ) -> ProviderResult<Option<T>>
1555    where
1556        FS: Fn(&Self) -> ProviderResult<Option<T>>,
1557        FD: Fn() -> ProviderResult<Option<T>>,
1558    {
1559        // If the segment has static file data, check its maximum block or transaction number.
1560        let static_file_upper_bound = if segment.is_block_based() {
1561            self.get_highest_static_file_block(segment)
1562        } else {
1563            self.get_highest_static_file_tx(segment)
1564        };
1565
1566        if static_file_upper_bound
1567            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1568        {
1569            return fetch_from_static_file(self)
1570        }
1571        fetch_from_database()
1572    }
1573
1574    /// Gets data within a specified range, potentially spanning different `static_files` and
1575    /// database.
1576    ///
1577    /// # Arguments
1578    /// * `segment` - The segment of the static file to query.
1579    /// * `block_or_tx_range` - The range of data to fetch.
1580    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
1581    /// * `fetch_from_database` - A function to fetch data from the database.
1582    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
1583    ///   terminated when this function returns false, thereby filtering the data based on the
1584    ///   provided condition.
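    ///
    /// A usage sketch (not compiled; `db_provider` is a hypothetical database-backed provider):
    ///
    /// ```ignore
    /// let headers = static_file_provider.get_range_with_static_file_or_database(
    ///     StaticFileSegment::Headers,
    ///     0..100,
    ///     |sf_provider, range, predicate| {
    ///         sf_provider.fetch_range_with_predicate(
    ///             StaticFileSegment::Headers,
    ///             range,
    ///             |cursor, number| cursor.get_one::<HeaderMask<Header>>(number.into()),
    ///             predicate,
    ///         )
    ///     },
    ///     |range, _predicate| db_provider.headers_range(range),
    ///     |_| true,
    /// )?;
    /// ```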
1585    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1586        &self,
1587        segment: StaticFileSegment,
1588        mut block_or_tx_range: Range<u64>,
1589        fetch_from_static_file: FS,
1590        mut fetch_from_database: FD,
1591        mut predicate: P,
1592    ) -> ProviderResult<Vec<T>>
1593    where
1594        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1595        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1596        P: FnMut(&T) -> bool,
1597    {
1598        let mut data = Vec::new();
1599
1600        // If the segment has static file data, check its maximum block or transaction number.
1601        if let Some(static_file_upper_bound) = if segment.is_block_based() {
1602            self.get_highest_static_file_block(segment)
1603        } else {
1604            self.get_highest_static_file_tx(segment)
1605        } && block_or_tx_range.start <= static_file_upper_bound
1606        {
1607            let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1608            data.extend(fetch_from_static_file(
1609                self,
1610                block_or_tx_range.start..end,
1611                &mut predicate,
1612            )?);
1613            block_or_tx_range.start = end;
1614        }
1615
1616        if block_or_tx_range.end > block_or_tx_range.start {
1617            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1618        }
1619
1620        Ok(data)
1621    }
1622
1623    /// Returns static files directory
1624    #[cfg(any(test, feature = "test-utils"))]
1625    pub fn path(&self) -> &Path {
1626        &self.path
1627    }
1628
1629    /// Returns transaction index
1630    #[cfg(any(test, feature = "test-utils"))]
1631    pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
1632        self.indexes
1633            .read()
1634            .get(&segment)
1635            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1636            .cloned()
1637    }
1638
1639    /// Returns expected block index
1640    #[cfg(any(test, feature = "test-utils"))]
1641    pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
1642        self.indexes
1643            .read()
1644            .get(&segment)
1645            .map(|index| &index.expected_block_ranges_by_max_block)
1646            .cloned()
1647    }
1648}
1649
1650#[derive(Debug)]
1651struct StaticFileSegmentIndex {
1652    /// Min static file block range.
1653    ///
1654    /// This index is initialized on launch to keep track of the lowest, non-expired static file
1655    /// per segment and gets updated on [`StaticFileProvider::update_index`].
1656    ///
1657    /// This tracks the lowest static file per segment together with the block range in that
1658    /// file. E.g. if static files are batched in 500K block intervals, then the lowest static
1659    /// file is [0..499K], and the block range is start = 0, end = 499K.
1660    ///
1661    /// This index is mainly used for history expiry, which targets transactions, e.g. pre-merge
1662    /// history expiry would lead to removing all static files below the merge height.
1663    min_block_range: Option<SegmentRangeInclusive>,
1664    /// Max static file block.
1665    max_block: u64,
1666    /// Expected static file block ranges indexed by max expected blocks.
1667    ///
1668    /// For example, a static file for expected block range `0..=499_000` may have only block range
1669    /// `0..=1000` contained in it, as it's not fully filled yet. This index maps the max expected
1670    /// block to the expected range, i.e. block `499_000` to block range `0..=499_000`.
1671    expected_block_ranges_by_max_block: SegmentRanges,
1672    /// Available on disk static file block ranges indexed by max transactions.
1673    ///
1674    /// For example, a static file for block range `0..=499_000` may only have block range
1675    /// `0..=1000` and transaction range `0..=2000` contained in it. This index maps the max
1676    /// available transaction to the available block range, i.e. transaction `2000` to block range
1677    /// `0..=1000`.
1678    available_block_ranges_by_max_tx: Option<SegmentRanges>,
1679}
1680
1681/// Helper trait to manage the different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider>`.
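///
/// A usage sketch of the write path (not compiled; assumes a read-write
/// `provider: StaticFileProvider<N>` is in scope), mirroring how `ensure_invariants` prunes and
/// commits above:
///
/// ```ignore
/// let mut writer = provider.latest_writer(StaticFileSegment::Headers)?;
/// writer.prune_headers(10)?; // drop the 10 highest header rows
/// writer.commit()?;
/// ```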
1682pub trait StaticFileWriter {
1683    /// The primitives type used by the static file provider.
1684    type Primitives: Send + Sync + 'static;
1685
1686    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
1687    fn get_writer(
1688        &self,
1689        block: BlockNumber,
1690        segment: StaticFileSegment,
1691    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1692
1693    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
1694    /// [`StaticFileSegment`].
1695    fn latest_writer(
1696        &self,
1697        segment: StaticFileSegment,
1698    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1699
1700    /// Commits all pending changes of every [`StaticFileProviderRW`] across all [`StaticFileSegment`]s.
1701    fn commit(&self) -> ProviderResult<()>;
1702
1703    /// Returns `true` if the static file provider has unwind queued.
1704    fn has_unwind_queued(&self) -> bool;
1705}
1706
1707impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1708    type Primitives = N;
1709
1710    fn get_writer(
1711        &self,
1712        block: BlockNumber,
1713        segment: StaticFileSegment,
1714    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1715        if self.access.is_read_only() {
1716            return Err(ProviderError::ReadOnlyStaticFileAccess)
1717        }
1718
1719        trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1720        self.writers.get_or_create(segment, || {
1721            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1722        })
1723    }
1724
1725    fn latest_writer(
1726        &self,
1727        segment: StaticFileSegment,
1728    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1729        self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1730    }
1731
1732    fn commit(&self) -> ProviderResult<()> {
1733        self.writers.commit()
1734    }
1735
1736    fn has_unwind_queued(&self) -> bool {
1737        self.writers.has_unwind_queued()
1738    }
1739}
1740
1741impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1742    type Header = N::BlockHeader;
1743
1744    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1745        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1746            Ok(jar_provider
1747                .cursor()?
1748                .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
1749                .and_then(|(header, hash)| {
1750                    if hash == block_hash {
1751                        return Some(header)
1752                    }
1753                    None
1754                }))
1755        })
1756    }
1757
1758    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1759        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1760            .and_then(|provider| provider.header_by_number(num))
1761            .or_else(|err| {
1762                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1763                    Ok(None)
1764                } else {
1765                    Err(err)
1766                }
1767            })
1768    }
1769
1770    fn headers_range(
1771        &self,
1772        range: impl RangeBounds<BlockNumber>,
1773    ) -> ProviderResult<Vec<Self::Header>> {
1774        self.fetch_range_with_predicate(
1775            StaticFileSegment::Headers,
1776            to_range(range),
1777            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1778            |_| true,
1779        )
1780    }
1781
1782    fn sealed_header(
1783        &self,
1784        num: BlockNumber,
1785    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1786        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1787            .and_then(|provider| provider.sealed_header(num))
1788            .or_else(|err| {
1789                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1790                    Ok(None)
1791                } else {
1792                    Err(err)
1793                }
1794            })
1795    }
1796
1797    fn sealed_headers_while(
1798        &self,
1799        range: impl RangeBounds<BlockNumber>,
1800        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1801    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1802        self.fetch_range_with_predicate(
1803            StaticFileSegment::Headers,
1804            to_range(range),
1805            |cursor, number| {
1806                Ok(cursor
1807                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1808                    .map(|(header, hash)| SealedHeader::new(header, hash)))
1809            },
1810            predicate,
1811        )
1812    }
1813}
1814
1815impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1816    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1817        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1818            .and_then(|provider| provider.block_hash(num))
1819            .or_else(|err| {
1820                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1821                    Ok(None)
1822                } else {
1823                    Err(err)
1824                }
1825            })
1826    }
1827
1828    fn canonical_hashes_range(
1829        &self,
1830        start: BlockNumber,
1831        end: BlockNumber,
1832    ) -> ProviderResult<Vec<B256>> {
1833        self.fetch_range_with_predicate(
1834            StaticFileSegment::Headers,
1835            start..end,
1836            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1837            |_| true,
1838        )
1839    }
1840}
1841
1842impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1843    for StaticFileProvider<N>
1844{
1845    type Receipt = N::Receipt;
1846
1847    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1848        self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
1849            .and_then(|provider| provider.receipt(num))
1850            .or_else(|err| {
1851                if let ProviderError::MissingStaticFileTx(_, _) = err {
1852                    Ok(None)
1853                } else {
1854                    Err(err)
1855                }
1856            })
1857    }
1858
1859    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1860        if let Some(num) = self.transaction_id(hash)? {
1861            return self.receipt(num)
1862        }
1863        Ok(None)
1864    }
1865
1866    fn receipts_by_block(
1867        &self,
1868        _block: BlockHashOrNumber,
1869    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1870        unreachable!()
1871    }
1872
1873    fn receipts_by_tx_range(
1874        &self,
1875        range: impl RangeBounds<TxNumber>,
1876    ) -> ProviderResult<Vec<Self::Receipt>> {
1877        self.fetch_range_with_predicate(
1878            StaticFileSegment::Receipts,
1879            to_range(range),
1880            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1881            |_| true,
1882        )
1883    }
1884
1885    fn receipts_by_block_range(
1886        &self,
1887        _block_range: RangeInclusive<BlockNumber>,
1888    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1889        Err(ProviderError::UnsupportedProvider)
1890    }
1891}
1892
1893impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
1894    for StaticFileProvider<N>
1895{
1896    fn transaction_hashes_by_range(
1897        &self,
1898        tx_range: Range<TxNumber>,
1899    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1900        let tx_range_size = (tx_range.end - tx_range.start) as usize;
1901
1902        // Transactions are different sizes, so chunks will not all take the same processing time.
1903        // If chunks are too big, there will be idle threads waiting for work. Choose an arbitrary
1904        // smaller value to make sure that doesn't happen.
1905        let chunk_size = 100;
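        // For example, a range of 1_000 transactions with `chunk_size = 100` yields 10 chunks,
        // each hashed on the rayon pool below and streamed back through its own channel.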
1906
1907        // iterator over the chunks
1908        let chunks = tx_range
1909            .clone()
1910            .step_by(chunk_size)
1911            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1912        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1913
1914        for chunk_range in chunks {
1915            let (channel_tx, channel_rx) = mpsc::channel();
1916            channels.push(channel_rx);
1917
1918            let manager = self.clone();
1919
1920            // Spawn the task onto the global rayon pool
1921            // This task will send the results through the channel after it has calculated
1922            // the hash.
1923            rayon::spawn(move || {
1924                let mut rlp_buf = Vec::with_capacity(128);
1925                let _ = manager.fetch_range_with_predicate(
1926                    StaticFileSegment::Transactions,
1927                    chunk_range,
1928                    |cursor, number| {
1929                        Ok(cursor
1930                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1931                            .map(|transaction| {
1932                                rlp_buf.clear();
1933                                let _ = channel_tx
1934                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
1935                            }))
1936                    },
1937                    |_| true,
1938                );
1939            });
1940        }
1941
1942        let mut tx_list = Vec::with_capacity(tx_range_size);
1943
1944        // Iterate over channels and append the tx hashes unsorted
1945        for channel in channels {
1946            while let Ok(tx) = channel.recv() {
1947                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1948                tx_list.push((tx_hash, tx_id));
1949            }
1950        }
1951
1952        Ok(tx_list)
1953    }
1954}
1955
1956impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1957    for StaticFileProvider<N>
1958{
1959    type Transaction = N::SignedTx;
1960
1961    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1962        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1963            let mut cursor = jar_provider.cursor()?;
1964            if cursor
1965                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1966                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1967                .is_some()
1968            {
1969                Ok(cursor.number())
1970            } else {
1971                Ok(None)
1972            }
1973        })
1974    }
1975
1976    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1977        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1978            .and_then(|provider| provider.transaction_by_id(num))
1979            .or_else(|err| {
1980                if let ProviderError::MissingStaticFileTx(_, _) = err {
1981                    Ok(None)
1982                } else {
1983                    Err(err)
1984                }
1985            })
1986    }
1987
1988    fn transaction_by_id_unhashed(
1989        &self,
1990        num: TxNumber,
1991    ) -> ProviderResult<Option<Self::Transaction>> {
1992        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1993            .and_then(|provider| provider.transaction_by_id_unhashed(num))
1994            .or_else(|err| {
1995                if let ProviderError::MissingStaticFileTx(_, _) = err {
1996                    Ok(None)
1997                } else {
1998                    Err(err)
1999                }
2000            })
2001    }
2002
2003    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2004        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2005            Ok(jar_provider
2006                .cursor()?
2007                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2008                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2009        })
2010    }
2011
2012    fn transaction_by_hash_with_meta(
2013        &self,
2014        _hash: TxHash,
2015    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2016        // Required data not present in static_files
2017        Err(ProviderError::UnsupportedProvider)
2018    }
2019
2020    fn transactions_by_block(
2021        &self,
2022        _block_id: BlockHashOrNumber,
2023    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2024        // Required data not present in static_files
2025        Err(ProviderError::UnsupportedProvider)
2026    }
2027
2028    fn transactions_by_block_range(
2029        &self,
2030        _range: impl RangeBounds<BlockNumber>,
2031    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2032        // Required data not present in static_files
2033        Err(ProviderError::UnsupportedProvider)
2034    }
2035
2036    fn transactions_by_tx_range(
2037        &self,
2038        range: impl RangeBounds<TxNumber>,
2039    ) -> ProviderResult<Vec<Self::Transaction>> {
2040        self.fetch_range_with_predicate(
2041            StaticFileSegment::Transactions,
2042            to_range(range),
2043            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2044            |_| true,
2045        )
2046    }
2047
2048    fn senders_by_tx_range(
2049        &self,
2050        range: impl RangeBounds<TxNumber>,
2051    ) -> ProviderResult<Vec<Address>> {
2052        self.fetch_range_with_predicate(
2053            StaticFileSegment::TransactionSenders,
2054            to_range(range),
2055            |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2056            |_| true,
2057        )
2058    }
2059
2060    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2061        self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2062            .and_then(|provider| provider.transaction_sender(id))
2063            .or_else(|err| {
2064                if let ProviderError::MissingStaticFileTx(_, _) = err {
2065                    Ok(None)
2066                } else {
2067                    Err(err)
2068                }
2069            })
2070    }
2071}
2072
2073impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
2074    fn chain_info(&self) -> ProviderResult<ChainInfo> {
2075        // Required data not present in static_files
2076        Err(ProviderError::UnsupportedProvider)
2077    }
2078
2079    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
2080        // Required data not present in static_files
2081        Err(ProviderError::UnsupportedProvider)
2082    }
2083
2084    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
2085        Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
2086    }
2087
2088    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
2089        // Required data not present in static_files
2090        Err(ProviderError::UnsupportedProvider)
2091    }
2092}
2093
2094/* Cannot be successfully implemented but must exist for trait requirements */
2095
2096impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
2097    for StaticFileProvider<N>
2098{
2099    type Block = N::Block;
2100
2101    fn find_block_by_hash(
2102        &self,
2103        _hash: B256,
2104        _source: BlockSource,
2105    ) -> ProviderResult<Option<Self::Block>> {
2106        // Required data not present in static_files
2107        Err(ProviderError::UnsupportedProvider)
2108    }
2109
2110    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
2111        // Required data not present in static_files
2112        Err(ProviderError::UnsupportedProvider)
2113    }
2114
2115    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2116        // Required data not present in static_files
2117        Err(ProviderError::UnsupportedProvider)
2118    }
2119
2120    fn pending_block_and_receipts(
2121        &self,
2122    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
2123        // Required data not present in static_files
2124        Err(ProviderError::UnsupportedProvider)
2125    }
2126
2127    fn recovered_block(
2128        &self,
2129        _id: BlockHashOrNumber,
2130        _transaction_kind: TransactionVariant,
2131    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2132        // Required data not present in static_files
2133        Err(ProviderError::UnsupportedProvider)
2134    }
2135
2136    fn sealed_block_with_senders(
2137        &self,
2138        _id: BlockHashOrNumber,
2139        _transaction_kind: TransactionVariant,
2140    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2141        // Required data not present in static_files
2142        Err(ProviderError::UnsupportedProvider)
2143    }
2144
2145    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
2146        // Required data not present in static_files
2147        Err(ProviderError::UnsupportedProvider)
2148    }
2149
2150    fn block_with_senders_range(
2151        &self,
2152        _range: RangeInclusive<BlockNumber>,
2153    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
2154        Err(ProviderError::UnsupportedProvider)
2155    }
2156
2157    fn recovered_block_range(
2158        &self,
2159        _range: RangeInclusive<BlockNumber>,
2160    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
2161        Err(ProviderError::UnsupportedProvider)
2162    }
2163
2164    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
2165        Err(ProviderError::UnsupportedProvider)
2166    }
2167}
2168
2169impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
2170    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2171        Err(ProviderError::UnsupportedProvider)
2172    }
2173
2174    fn block_body_indices_range(
2175        &self,
2176        _range: RangeInclusive<BlockNumber>,
2177    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2178        Err(ProviderError::UnsupportedProvider)
2179    }
2180}
2181
2182impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2183    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2184        match T::NAME {
2185            tables::CanonicalHeaders::NAME |
2186            tables::Headers::<Header>::NAME |
2187            tables::HeaderTerminalDifficulties::NAME => Ok(self
2188                .get_highest_static_file_block(StaticFileSegment::Headers)
2189                .map(|block| block + 1)
2190                .unwrap_or_default()
2191                as usize),
2192            tables::Receipts::<Receipt>::NAME => Ok(self
2193                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2194                .map(|receipts| receipts + 1)
2195                .unwrap_or_default() as usize),
2196            tables::Transactions::<TransactionSigned>::NAME => Ok(self
2197                .get_highest_static_file_tx(StaticFileSegment::Transactions)
2198                .map(|txs| txs + 1)
2199                .unwrap_or_default()
2200                as usize),
2201            tables::TransactionSenders::NAME => Ok(self
2202                .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
2203                .map(|txs| txs + 1)
2204                .unwrap_or_default() as usize),
2205            _ => Err(ProviderError::UnsupportedProvider),
2206        }
2207    }
2208}
2209
2210/// Calculates the tx hash for the given transaction and its id.
2211#[inline]
2212fn calculate_hash<T>(
2213    entry: (TxNumber, T),
2214    rlp_buf: &mut Vec<u8>,
2215) -> Result<(B256, TxNumber), Box<ProviderError>>
2216where
2217    T: Encodable2718,
2218{
2219    let (tx_id, tx) = entry;
2220    tx.encode_2718(rlp_buf);
2221    Ok((keccak256(rlp_buf), tx_id))
2222}
2223
2224#[cfg(test)]
2225mod tests {
2226    use std::collections::BTreeMap;
2227
2228    use reth_chain_state::EthPrimitives;
2229    use reth_db::test_utils::create_test_static_files_dir;
2230    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};
2231
2232    use crate::StaticFileProviderBuilder;
2233
2234    #[test]
2235    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
2236        let (static_dir, _) = create_test_static_files_dir();
2237        let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
2238            .with_blocks_per_file(100)
2239            .build()?;
2240
2241        let segment = StaticFileSegment::Headers;
2242
2243        // Test with None - should use default behavior
2244        assert_eq!(
2245            sf_rw.find_fixed_range_with_block_index(segment, None, 0),
2246            SegmentRangeInclusive::new(0, 99)
2247        );
2248        assert_eq!(
2249            sf_rw.find_fixed_range_with_block_index(segment, None, 250),
2250            SegmentRangeInclusive::new(200, 299)
2251        );
2252
2253        // Test with empty index - should fall back to default behavior
2254        assert_eq!(
2255            sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
2256            SegmentRangeInclusive::new(100, 199)
2257        );
2258
2259        // Create block index with existing ranges
2260        let block_index = BTreeMap::from_iter([
2261            (99, SegmentRangeInclusive::new(0, 99)),
2262            (199, SegmentRangeInclusive::new(100, 199)),
2263            (299, SegmentRangeInclusive::new(200, 299)),
2264        ]);
2265
2266        // Test blocks within existing ranges - should return the matching range
2267        assert_eq!(
2268            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 0),
2269            SegmentRangeInclusive::new(0, 99)
2270        );
2271        assert_eq!(
2272            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 50),
2273            SegmentRangeInclusive::new(0, 99)
2274        );
2275        assert_eq!(
2276            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 99),
2277            SegmentRangeInclusive::new(0, 99)
2278        );
2279        assert_eq!(
2280            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 100),
2281            SegmentRangeInclusive::new(100, 199)
2282        );
2283        assert_eq!(
2284            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 150),
2285            SegmentRangeInclusive::new(100, 199)
2286        );
2287        assert_eq!(
2288            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 199),
2289            SegmentRangeInclusive::new(100, 199)
2290        );
2291
2292        // Test blocks beyond existing ranges - should derive new ranges from the last range
2293        // Block 300 is exactly one segment after the last range
2294        assert_eq!(
2295            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 300),
2296            SegmentRangeInclusive::new(300, 399)
2297        );
2298        assert_eq!(
2299            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 350),
2300            SegmentRangeInclusive::new(300, 399)
2301        );
2302
2303        // Block 500 skips the segments 300-399 and 400-499
2304        assert_eq!(
2305            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 500),
2306            SegmentRangeInclusive::new(500, 599)
2307        );
2308
2309        // Block 1000 skips many segments
2310        assert_eq!(
2311            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 1000),
2312            SegmentRangeInclusive::new(1000, 1099)
2313        );
2314
2315        // Test with block index having different sizes than blocks_per_file setting
2316        // This simulates the scenario where blocks_per_file was changed between runs
2317        let mixed_size_index = BTreeMap::from_iter([
2318            (49, SegmentRangeInclusive::new(0, 49)),     // 50 blocks
2319            (149, SegmentRangeInclusive::new(50, 149)),  // 100 blocks
2320            (349, SegmentRangeInclusive::new(150, 349)), // 200 blocks
2321        ]);
2322
2323        // Blocks within existing ranges should return those ranges regardless of size
2324        assert_eq!(
2325            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 25),
2326            SegmentRangeInclusive::new(0, 49)
2327        );
2328        assert_eq!(
2329            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 100),
2330            SegmentRangeInclusive::new(50, 149)
2331        );
2332        assert_eq!(
2333            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 200),
2334            SegmentRangeInclusive::new(150, 349)
2335        );
2336
2337        // Block after the last range should derive using current blocks_per_file (100)
2338        // from the end of the last range (349)
2339        assert_eq!(
2340            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 350),
2341            SegmentRangeInclusive::new(350, 449)
2342        );
2343        assert_eq!(
2344            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 450),
2345            SegmentRangeInclusive::new(450, 549)
2346        );
2347        assert_eq!(
2348            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 550),
2349            SegmentRangeInclusive::new(550, 649)
2350        );
2351
2352        Ok(())
2353    }
2354}