// reth_provider/providers/static_file/manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
7    EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
8    TransactionVariant, TransactionsProvider, TransactionsProviderExt,
9};
10use alloy_consensus::{transaction::TransactionMeta, Header};
11use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
12use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
13use dashmap::DashMap;
14use notify::{RecommendedWatcher, RecursiveMode, Watcher};
15use parking_lot::RwLock;
16use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
17use reth_db::{
18    lockfile::StorageLock,
19    static_file::{
20        iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
21        StaticFileCursor, TransactionMask, TransactionSenderMask,
22    },
23};
24use reth_db_api::{
25    cursor::DbCursorRO,
26    models::StoredBlockBodyIndices,
27    table::{Decompress, Table, Value},
28    tables,
29    transaction::DbTx,
30};
31use reth_ethereum_primitives::{Receipt, TransactionSigned};
32use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
33use reth_node_types::NodePrimitives;
34use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
35use reth_stages_types::{PipelineTarget, StageId};
36use reth_static_file_types::{
37    find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
38    DEFAULT_BLOCKS_PER_STATIC_FILE,
39};
40use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, StorageSettingsCache};
41use reth_storage_errors::provider::{ProviderError, ProviderResult};
42use std::{
43    collections::{BTreeMap, HashMap},
44    fmt::Debug,
45    ops::{Deref, Range, RangeBounds, RangeInclusive},
46    path::{Path, PathBuf},
47    sync::{atomic::AtomicU64, mpsc, Arc},
48};
49use tracing::{debug, info, trace, warn};
50
/// Alias type for a map that can be queried for block or transaction ranges. It uses `u64` to
/// represent either a block or a transaction number end of a static file range.
///
/// Keys are the *inclusive end* (max block or max tx) of each static file range, so an ordered
/// lookup finds the file containing a given number.
type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
54
/// Access mode on a static file provider. RO/RW.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` if read-only access.
    pub const fn is_read_only(&self) -> bool {
        match self {
            Self::RO => true,
            Self::RW => false,
        }
    }

    /// Returns `true` if read-write access.
    pub const fn is_read_write(&self) -> bool {
        match self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
76
/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
///
/// "Static files" contain immutable chain history data, such as:
///  - transactions
///  - headers
///  - receipts
///
/// This provider type is responsible for reading and writing to static files.
///
/// Cheap to clone: it is a newtype over an [`Arc`]'d [`StaticFileProviderInner`], so clones share
/// the same jar cache, indexes and writers.
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
87
88impl<N> Clone for StaticFileProvider<N> {
89    fn clone(&self) -> Self {
90        Self(self.0.clone())
91    }
92}
93
/// Builder for [`StaticFileProvider`] that allows configuration before initialization.
#[derive(Debug)]
pub struct StaticFileProviderBuilder<N> {
    // Fully-configured inner provider; wrapped in an `Arc` and index-initialized by `build`.
    inner: StaticFileProviderInner<N>,
}
99
100impl<N: NodePrimitives> StaticFileProviderBuilder<N> {
101    /// Creates a new builder with read-write access.
102    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
103        StaticFileProviderInner::new(path, StaticFileAccess::RW).map(|inner| Self { inner })
104    }
105
106    /// Creates a new builder with read-only access.
107    pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
108        StaticFileProviderInner::new(path, StaticFileAccess::RO).map(|inner| Self { inner })
109    }
110
111    /// Set custom blocks per file for specific segments.
112    ///
113    /// Each static file segment is stored across multiple files, and each of these files contains
114    /// up to the specified number of blocks of data. When the file gets full, a new file is
115    /// created with the new block range.
116    ///
117    /// This setting affects the size of each static file, and can be set per segment.
118    ///
119    /// If it is changed for an existing node, existing static files will not be affected and will
120    /// be finished with the old blocks per file setting, but new static files will use the new
121    /// setting.
122    pub fn with_blocks_per_file_for_segments(
123        mut self,
124        segments: HashMap<StaticFileSegment, u64>,
125    ) -> Self {
126        self.inner.blocks_per_file.extend(segments);
127        self
128    }
129
130    /// Set a custom number of blocks per file for all segments.
131    pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
132        for segment in StaticFileSegment::iter() {
133            self.inner.blocks_per_file.insert(segment, blocks_per_file);
134        }
135        self
136    }
137
138    /// Set a custom number of blocks per file for a specific segment.
139    pub fn with_blocks_per_file_for_segment(
140        mut self,
141        segment: StaticFileSegment,
142        blocks_per_file: u64,
143    ) -> Self {
144        self.inner.blocks_per_file.insert(segment, blocks_per_file);
145        self
146    }
147
148    /// Enables metrics on the [`StaticFileProvider`].
149    pub fn with_metrics(mut self) -> Self {
150        self.inner.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
151        self
152    }
153
154    /// Builds the final [`StaticFileProvider`] and initializes the index.
155    pub fn build(self) -> ProviderResult<StaticFileProvider<N>> {
156        let provider = StaticFileProvider(Arc::new(self.inner));
157        provider.initialize_index()?;
158        Ok(provider)
159    }
160}
161
162impl<N: NodePrimitives> StaticFileProvider<N> {
163    /// Creates a new [`StaticFileProvider`] with read-only access.
164    ///
165    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
166    /// new data won't be detected or queryable.
167    ///
168    /// Watching is recommended if the read-only provider is used on a directory that an active node
169    /// instance is modifying.
170    ///
171    /// See also [`StaticFileProvider::watch_directory`].
172    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
173        let provider = StaticFileProviderBuilder::read_only(path)?.build()?;
174
175        if watch_directory {
176            provider.watch_directory();
177        }
178
179        Ok(provider)
180    }
181
182    /// Creates a new [`StaticFileProvider`] with read-write access.
183    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
184        StaticFileProviderBuilder::read_write(path)?.build()
185    }
186
    /// Watches the directory for changes and updates the in-memory index when modifications
    /// are detected.
    ///
    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
    /// receive `update_index` notifications from a node that appends/truncates data.
    pub fn watch_directory(&self) {
        let provider = self.clone();
        // The watcher runs on a detached background thread for the lifetime of the process; it is
        // never joined or shut down explicitly.
        std::thread::spawn(move || {
            let (tx, rx) = std::sync::mpsc::channel();
            let mut watcher = RecommendedWatcher::new(
                move |res| tx.send(res).unwrap(),
                notify::Config::default(),
            )
            .expect("failed to create watcher");

            // Non-recursive: only the static-files directory itself is watched, not subdirs.
            watcher
                .watch(&provider.path, RecursiveMode::NonRecursive)
                .expect("failed to watch path");

            // Some backends send repeated modified events
            let mut last_event_timestamp = None;

            while let Ok(res) = rx.recv() {
                match res {
                    Ok(event) => {
                        // We only care about modified data events
                        if !matches!(
                            event.kind,
                            notify::EventKind::Modify(_) |
                                notify::EventKind::Create(_) |
                                notify::EventKind::Remove(_)
                        ) {
                            continue
                        }

                        // We only trigger a re-initialization if a configuration file was
                        // modified. This means that a
                        // static_file_provider.commit() was called on the node after
                        // appending/truncating rows
                        for segment in event.paths {
                            // Ensure it's a file with the .conf extension
                            if segment
                                .extension()
                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
                            {
                                continue
                            }

                            // Ensure it's well formatted static file name
                            if StaticFileSegment::parse_filename(
                                &segment.file_stem().expect("qed").to_string_lossy(),
                            )
                            .is_none()
                            {
                                continue
                            }

                            // If we can read the metadata and modified timestamp, ensure this is
                            // not an old or repeated event.
                            if let Ok(current_modified_timestamp) =
                                std::fs::metadata(&segment).and_then(|m| m.modified())
                            {
                                if last_event_timestamp.is_some_and(|last_timestamp| {
                                    last_timestamp >= current_modified_timestamp
                                }) {
                                    continue
                                }
                                last_event_timestamp = Some(current_modified_timestamp);
                            }

                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
                            // One relevant `.conf` change is enough: re-read the whole index and
                            // stop scanning the remaining paths of this event.
                            if let Err(err) = provider.initialize_index() {
                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
                            }
                            break
                        }
                    }

                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
                }
            }
        });
    }
270}
271
272impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
273    type Target = StaticFileProviderInner<N>;
274
275    fn deref(&self) -> &Self::Target {
276        &self.0
277    }
278}
279
/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
    /// segments and ranges. Keyed by the fixed range end of the file and its segment.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Indexes per segment.
    indexes: RwLock<HashMap<StaticFileSegment, StaticFileSegmentIndex>>,
    /// This is an additional index that tracks the expired height, i.e. the highest
    /// block number that has been expired (missing). The first, non expired block is
    /// `expired_history_height + 1`.
    ///
    /// This is effectively the transaction range that has been expired:
    /// [`StaticFileProvider::delete_segment_below_block`] and mirrors
    /// `static_files_min_block[transactions] - blocks_per_file`.
    ///
    /// This additional tracker exists for more efficient lookups because the node must be aware of
    /// the expired height.
    earliest_history_height: AtomicU64,
    /// Directory where `static_files` are located
    path: PathBuf,
    /// Maintains a writer set of [`StaticFileSegment`].
    writers: StaticFileWriters<N>,
    /// Metrics for the static files. `None` unless enabled via the builder.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access rights of the provider.
    access: StaticFileAccess,
    /// Number of blocks per file, per segment.
    blocks_per_file: HashMap<StaticFileSegment, u64>,
    /// Write lock for when access is [`StaticFileAccess::RW`]; held for the provider's lifetime.
    _lock_file: Option<StorageLock>,
}
312
313impl<N: NodePrimitives> StaticFileProviderInner<N> {
314    /// Creates a new [`StaticFileProviderInner`].
315    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
316        let _lock_file = if access.is_read_write() {
317            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
318        } else {
319            None
320        };
321
322        let mut blocks_per_file = HashMap::new();
323        for segment in StaticFileSegment::iter() {
324            blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
325        }
326
327        let provider = Self {
328            map: Default::default(),
329            indexes: Default::default(),
330            writers: Default::default(),
331            earliest_history_height: Default::default(),
332            path: path.as_ref().to_path_buf(),
333            metrics: None,
334            access,
335            blocks_per_file,
336            _lock_file,
337        };
338
339        Ok(provider)
340    }
341
    /// Returns `true` if the provider was opened with read-only access.
    pub const fn is_read_only(&self) -> bool {
        self.access.is_read_only()
    }
345
346    /// Each static file has a fixed number of blocks. This gives out the range where the requested
347    /// block is positioned.
348    ///
349    /// If the specified block falls into one of the ranges of already initialized static files,
350    /// this function will return that range.
351    ///
352    /// If no matching file exists, this function will derive a new range from the end of the last
353    /// existing file, if any.
354    pub fn find_fixed_range_with_block_index(
355        &self,
356        segment: StaticFileSegment,
357        block_index: Option<&SegmentRanges>,
358        block: BlockNumber,
359    ) -> SegmentRangeInclusive {
360        let blocks_per_file =
361            self.blocks_per_file.get(&segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
362
363        if let Some(block_index) = block_index {
364            // Find first block range that contains the requested block
365            if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
366            {
367                // Found matching range for an existing file using block index
368                return *range
369            } else if let Some((_, range)) = block_index.last_key_value() {
370                // Didn't find matching range for an existing file, derive a new range from the end
371                // of the last existing file range.
372                //
373                // `block` is always higher than `range.end()` here, because we iterated over all
374                // `block_index` ranges above and didn't find one that contains our block
375                let blocks_after_last_range = block - range.end();
376                let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
377                let start = range.end() + 1 + segments_to_skip * blocks_per_file;
378                return SegmentRangeInclusive::new(start, start + blocks_per_file - 1)
379            }
380        }
381        // No block index is available, derive a new range using the fixed number of blocks,
382        // starting from the beginning.
383        find_fixed_range(block, blocks_per_file)
384    }
385
386    /// Each static file has a fixed number of blocks. This gives out the range where the requested
387    /// block is positioned.
388    ///
389    /// If the specified block falls into one of the ranges of already initialized static files,
390    /// this function will return that range.
391    ///
392    /// If no matching file exists, this function will derive a new range from the end of the last
393    /// existing file, if any.
394    ///
395    /// This function will block indefinitely if a write lock for
396    /// [`Self::indexes`] is already acquired. In that case, use
397    /// [`Self::find_fixed_range_with_block_index`].
398    pub fn find_fixed_range(
399        &self,
400        segment: StaticFileSegment,
401        block: BlockNumber,
402    ) -> SegmentRangeInclusive {
403        self.find_fixed_range_with_block_index(
404            segment,
405            self.indexes
406                .read()
407                .get(&segment)
408                .map(|index| &index.expected_block_ranges_by_max_block),
409            block,
410        )
411    }
412}
413
414impl<N: NodePrimitives> StaticFileProvider<N> {
415    /// Reports metrics for the static files.
416    pub fn report_metrics(&self) -> ProviderResult<()> {
417        let Some(metrics) = &self.metrics else { return Ok(()) };
418
419        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
420        for (segment, headers) in static_files {
421            let mut entries = 0;
422            let mut size = 0;
423
424            for (block_range, _) in &headers {
425                let fixed_block_range = self.find_fixed_range(segment, block_range.start());
426                let jar_provider = self
427                    .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
428                    .ok_or_else(|| {
429                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
430                    })?;
431
432                entries += jar_provider.rows();
433
434                let data_size = reth_fs_util::metadata(jar_provider.data_path())
435                    .map(|metadata| metadata.len())
436                    .unwrap_or_default();
437                let index_size = reth_fs_util::metadata(jar_provider.index_path())
438                    .map(|metadata| metadata.len())
439                    .unwrap_or_default();
440                let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
441                    .map(|metadata| metadata.len())
442                    .unwrap_or_default();
443                let config_size = reth_fs_util::metadata(jar_provider.config_path())
444                    .map(|metadata| metadata.len())
445                    .unwrap_or_default();
446
447                size += data_size + index_size + offsets_size + config_size;
448            }
449
450            metrics.record_segment(segment, size, headers.len(), entries);
451        }
452
453        Ok(())
454    }
455
456    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
457    /// either block or transaction.
458    pub fn get_segment_provider(
459        &self,
460        segment: StaticFileSegment,
461        number: u64,
462    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
463        if segment.is_block_based() {
464            self.get_segment_provider_for_block(segment, number, None)
465        } else {
466            self.get_segment_provider_for_transaction(segment, number, None)
467        }
468    }
469
470    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
471    /// either block or transaction.
472    ///
473    /// If the segment is not found, returns [`None`].
474    pub fn get_maybe_segment_provider(
475        &self,
476        segment: StaticFileSegment,
477        number: u64,
478    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
479        let provider = if segment.is_block_based() {
480            self.get_segment_provider_for_block(segment, number, None)
481        } else {
482            self.get_segment_provider_for_transaction(segment, number, None)
483        };
484
485        match provider {
486            Ok(provider) => Ok(Some(provider)),
487            Err(
488                ProviderError::MissingStaticFileBlock(_, _) |
489                ProviderError::MissingStaticFileTx(_, _),
490            ) => Ok(None),
491            Err(err) => Err(err),
492        }
493    }
494
495    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
496    pub fn get_segment_provider_for_block(
497        &self,
498        segment: StaticFileSegment,
499        block: BlockNumber,
500        path: Option<&Path>,
501    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
502        self.get_segment_provider_for_range(
503            segment,
504            || self.get_segment_ranges_from_block(segment, block),
505            path,
506        )?
507        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
508    }
509
510    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
511    pub fn get_segment_provider_for_transaction(
512        &self,
513        segment: StaticFileSegment,
514        tx: TxNumber,
515        path: Option<&Path>,
516    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
517        self.get_segment_provider_for_range(
518            segment,
519            || self.get_segment_ranges_from_transaction(segment, tx),
520            path,
521        )?
522        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
523    }
524
525    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
526    ///
527    /// `fn_range` should make sure the range goes through `find_fixed_range`.
528    pub fn get_segment_provider_for_range(
529        &self,
530        segment: StaticFileSegment,
531        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
532        path: Option<&Path>,
533    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
534        // If we have a path, then get the block range from its name.
535        // Otherwise, check `self.available_static_files`
536        let block_range = match path {
537            Some(path) => StaticFileSegment::parse_filename(
538                &path
539                    .file_name()
540                    .ok_or_else(|| {
541                        ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
542                    })?
543                    .to_string_lossy(),
544            )
545            .and_then(|(parsed_segment, block_range)| {
546                if parsed_segment == segment {
547                    return Some(block_range)
548                }
549                None
550            }),
551            None => fn_range(),
552        };
553
554        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
555        if let Some(block_range) = block_range {
556            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
557        }
558
559        Ok(None)
560    }
561
562    /// Gets the [`StaticFileJarProvider`] of the requested path.
563    pub fn get_segment_provider_for_path(
564        &self,
565        path: &Path,
566    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
567        StaticFileSegment::parse_filename(
568            &path
569                .file_name()
570                .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
571                .to_string_lossy(),
572        )
573        .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
574        .transpose()
575    }
576
    /// Given a segment and block range it removes the cached provider from the map.
    ///
    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
    pub fn remove_cached_provider(
        &self,
        segment: StaticFileSegment,
        fixed_block_range_end: BlockNumber,
    ) {
        // Cache key mirrors the one used by `get_or_create_jar_provider`.
        self.map.remove(&(fixed_block_range_end, segment));
    }
587
    /// This handles history expiry by deleting all static files for the given segment below the
    /// given block.
    ///
    /// For example if block is 1M and the blocks per file are 500K this will delete all individual
    /// files below 1M, so 0-499K and 500K-999K.
    ///
    /// This will not delete the file that contains the block itself, because files can only be
    /// removed entirely.
    ///
    /// # Safety
    ///
    /// This method will never delete the highest static file for the segment, even if the
    /// requested block is higher than the highest block in static files. This ensures we always
    /// maintain at least one static file if any exist.
    ///
    /// Returns a list of `SegmentHeader`s from the deleted jars.
    pub fn delete_segment_below_block(
        &self,
        segment: StaticFileSegment,
        block: BlockNumber,
    ) -> ProviderResult<Vec<SegmentHeader>> {
        // Nothing to delete if block is 0.
        if block == 0 {
            return Ok(Vec::new())
        }

        let highest_block = self.get_highest_static_file_block(segment);
        let mut deleted_headers = Vec::new();

        // Deletes one file per iteration, re-reading the lowest range end each time because
        // `delete_jar` re-initializes the index after every deletion.
        loop {
            let Some(block_height) = self.get_lowest_range_end(segment) else {
                return Ok(deleted_headers)
            };

            // Stop if we've reached the target block or the highest static file
            if block_height >= block || Some(block_height) == highest_block {
                return Ok(deleted_headers)
            }

            debug!(
                target: "provider::static_file",
                ?segment,
                ?block_height,
                "Deleting static file below block"
            );

            // now we need to wipe the static file, this will take care of updating the index and
            // advance the lowest tracked block height for the segment.
            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
                warn!( target: "provider::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
            })?;

            deleted_headers.push(header);
        }
    }
643
644    /// Given a segment and block, it deletes the jar and all files from the respective block range.
645    ///
646    /// CAUTION: destructive. Deletes files on disk.
647    ///
648    /// This will re-initialize the index after deletion, so all files are tracked.
649    ///
650    /// Returns the `SegmentHeader` of the deleted jar.
651    pub fn delete_jar(
652        &self,
653        segment: StaticFileSegment,
654        block: BlockNumber,
655    ) -> ProviderResult<SegmentHeader> {
656        let fixed_block_range = self.find_fixed_range(segment, block);
657        let key = (fixed_block_range.end(), segment);
658        let jar = if let Some((_, jar)) = self.map.remove(&key) {
659            jar.jar
660        } else {
661            let file = self.path.join(segment.filename(&fixed_block_range));
662            debug!(
663                target: "provider::static_file",
664                ?file,
665                ?fixed_block_range,
666                ?block,
667                "Loading static file jar for deletion"
668            );
669            NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
670        };
671
672        let header = *jar.user_header();
673        jar.delete().map_err(ProviderError::other)?;
674
675        // SAFETY: this is currently necessary to ensure that certain indexes like
676        // `static_files_min_block` have the correct values after pruning.
677        self.initialize_index()?;
678
679        Ok(header)
680    }
681
    /// Given a segment and block range it returns a cached
    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
    /// many.
    fn get_or_create_jar_provider(
        &self,
        segment: StaticFileSegment,
        fixed_block_range: &SegmentRangeInclusive,
    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
        let key = (fixed_block_range.end(), segment);

        // Avoid using `entry` directly to avoid a write lock in the common case.
        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
            jar.into()
        } else {
            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
            let path = self.path.join(segment.filename(fixed_block_range));
            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
            // NOTE(review): `entry(key).insert(..)` replaces any jar a concurrent caller may have
            // inserted between the `get` above and here — presumably both loads are equivalent for
            // the same key; confirm. `downgrade` releases the write guard to a read guard.
            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
        };

        if let Some(metrics) = &self.metrics {
            provider = provider.with_metrics(metrics.clone());
        }
        Ok(provider)
    }
709
710    /// Gets a static file segment's block range from the provider inner block
711    /// index.
712    fn get_segment_ranges_from_block(
713        &self,
714        segment: StaticFileSegment,
715        block: u64,
716    ) -> Option<SegmentRangeInclusive> {
717        let indexes = self.indexes.read();
718        let index = indexes.get(&segment)?;
719
720        (index.max_block >= block).then(|| {
721            self.find_fixed_range_with_block_index(
722                segment,
723                Some(&index.expected_block_ranges_by_max_block),
724                block,
725            )
726        })
727    }
728
    /// Gets a static file segment's fixed block range from the provider inner
    /// transaction index.
    fn get_segment_ranges_from_transaction(
        &self,
        segment: StaticFileSegment,
        tx: u64,
    ) -> Option<SegmentRangeInclusive> {
        let indexes = self.indexes.read();
        let index = indexes.get(&segment)?;
        // Segments without a transaction index cannot resolve a tx number.
        let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;

        // It's more probable that the request comes from a newer tx height, so we iterate
        // the static_files in reverse.
        let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();

        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
            if tx > *tx_end {
                // request tx is higher than highest static file tx
                return None
            }
            // The file's first tx is one past the previous (older) file's max tx, or 0 for the
            // oldest file.
            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
            if tx_start <= tx {
                // `tx` falls inside this file; map its block range end to the fixed range.
                return Some(self.find_fixed_range_with_block_index(
                    segment,
                    Some(&index.expected_block_ranges_by_max_block),
                    block_range.end(),
                ))
            }
        }
        None
    }
760
    /// Updates the inner transaction and block indexes alongside the internal cached providers in
    /// `self.map`.
    ///
    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
    ///
    /// If `segment_max_block` is None it means there's no static file for this segment.
    ///
    /// Holds a write lock on `self.indexes` for the duration of the call.
    pub fn update_index(
        &self,
        segment: StaticFileSegment,
        segment_max_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        debug!(
            target: "provider::static_file",
            ?segment,
            ?segment_max_block,
            "Updating provider index"
        );
        let mut indexes = self.indexes.write();

        match segment_max_block {
            Some(segment_max_block) => {
                // Resolve the expected (fixed) block range of the file that contains
                // `segment_max_block`.
                let fixed_range = self.find_fixed_range_with_block_index(
                    segment,
                    indexes.get(&segment).map(|index| &index.expected_block_ranges_by_max_block),
                    segment_max_block,
                );

                // Load the jar backing that range; its segment header drives every index update
                // below.
                let jar = NippyJar::<SegmentHeader>::load(
                    &self.path.join(segment.filename(&fixed_range)),
                )
                .map_err(ProviderError::other)?;

                let index = indexes
                    .entry(segment)
                    .and_modify(|index| {
                        // Update max block
                        index.max_block = segment_max_block;

                        // Update expected block range index

                        // Remove all expected block ranges that start at or after the new fixed
                        // range (stale entries, e.g. after pruning deleted higher files)
                        index
                            .expected_block_ranges_by_max_block
                            .retain(|_, block_range| block_range.start() < fixed_range.start());
                        // Insert new expected block range
                        index
                            .expected_block_ranges_by_max_block
                            .insert(fixed_range.end(), fixed_range);
                    })
                    .or_insert_with(|| StaticFileSegmentIndex {
                        min_block_range: None,
                        max_block: segment_max_block,
                        expected_block_ranges_by_max_block: BTreeMap::from([(
                            fixed_range.end(),
                            fixed_range,
                        )]),
                        available_block_ranges_by_max_tx: None,
                    });

                // Update min_block to track the lowest block range of the segment.
                // This is initially set by initialize_index() on node startup, but must be updated
                // as the file grows to prevent stale values.
                //
                // Without this update, min_block can remain at genesis (e.g. Some([0..=0]) or None)
                // even after syncing to higher blocks (e.g. [0..=100]). A stale
                // min_block causes get_lowest_static_file_block() to return the
                // wrong end value, which breaks pruning logic that relies on it for
                // safety checks.
                //
                // Example progression:
                // 1. Node starts, initialize_index() sets min_block = [0..=0]
                // 2. Sync to block 100, this update sets min_block = [0..=100]
                // 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
                //    this update, it would incorrectly return 0 (stale)
                if let Some(current_block_range) = jar.user_header().block_range() {
                    if let Some(min_block_range) = index.min_block_range.as_mut() {
                        // delete_jar WILL ALWAYS re-initialize all indexes, so we are always
                        // sure that current_min is always the lowest.
                        if current_block_range.start() == min_block_range.start() {
                            *min_block_range = current_block_range;
                        }
                    } else {
                        // No minimum tracked yet: this jar's range becomes the baseline.
                        index.min_block_range = Some(current_block_range);
                    }
                }

                // Updates the tx index by first removing all entries which have a higher
                // block_start than our current static file.
                if let Some(tx_range) = jar.user_header().tx_range() {
                    // Current block range has the same block start as `fixed_range`, but block end
                    // might be different if we are still filling this static file.
                    if let Some(current_block_range) = jar.user_header().block_range() {
                        let tx_end = tx_range.end();

                        // Considering that `update_index` is called when we either append/truncate,
                        // we are sure that we are handling the latest data
                        // points.
                        //
                        // Here we remove every entry of the index that has a block start higher or
                        // equal than our current one. This is important in the case
                        // that we prune a lot of rows resulting in a file (and thus
                        // a higher block range) deletion.
                        if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                            index
                                .retain(|_, block_range| block_range.start() < fixed_range.start());
                            index.insert(tx_end, current_block_range);
                        } else {
                            // First tx-bearing file for this segment: create the index.
                            index.available_block_ranges_by_max_tx =
                                Some(BTreeMap::from([(tx_end, current_block_range)]));
                        }
                    }
                } else if segment.is_tx_based() {
                    // The unwinded file has no more transactions/receipts. However, the highest
                    // block is within this files' block range. We only retain
                    // entries with block ranges before the current one.
                    if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
                    }

                    // If the index is empty, just remove it.
                    index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
                }

                // Update the cached provider.
                debug!(target: "provider::static_file", ?segment, "Inserting updated jar into cache");
                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

                // Delete any cached provider that no longer has an associated jar.
                debug!(target: "provider::static_file", ?segment, "Cleaning up jar map");
                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
            }
            None => {
                // No static file exists for this segment anymore: drop its index entirely.
                debug!(target: "provider::static_file", ?segment, "Removing segment from index");
                indexes.remove(&segment);
            }
        };

        debug!(target: "provider::static_file", ?segment, "Updated provider index");
        Ok(())
    }
901
902    /// Initializes the inner transaction and block index
903    pub fn initialize_index(&self) -> ProviderResult<()> {
904        let mut indexes = self.indexes.write();
905        indexes.clear();
906
907        for (segment, headers) in iter_static_files(&self.path).map_err(ProviderError::other)? {
908            // Update first and last block for each segment
909            //
910            // It's safe to call `expect` here, because every segment has at least one header
911            // associated with it.
912            let min_block_range = Some(headers.first().expect("headers are not empty").0);
913            let max_block = headers.last().expect("headers are not empty").0.end();
914
915            let mut expected_block_ranges_by_max_block = BTreeMap::default();
916            let mut available_block_ranges_by_max_tx = None;
917
918            for (block_range, header) in headers {
919                // Update max expected block -> expected_block_range index
920                expected_block_ranges_by_max_block
921                    .insert(header.expected_block_end(), header.expected_block_range());
922
923                // Update max tx -> block_range index
924                if let Some(tx_range) = header.tx_range() {
925                    let tx_end = tx_range.end();
926
927                    available_block_ranges_by_max_tx
928                        .get_or_insert_with(BTreeMap::default)
929                        .insert(tx_end, block_range);
930                }
931            }
932
933            indexes.insert(
934                segment,
935                StaticFileSegmentIndex {
936                    min_block_range,
937                    max_block,
938                    expected_block_ranges_by_max_block,
939                    available_block_ranges_by_max_tx,
940                },
941            );
942        }
943
944        // If this is a re-initialization, we need to clear this as well
945        self.map.clear();
946
947        // initialize the expired history height to the lowest static file block
948        if let Some(lowest_range) =
949            indexes.get(&StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
950        {
951            // the earliest height is the lowest available block number
952            self.earliest_history_height
953                .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
954        }
955
956        Ok(())
957    }
958
    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
    /// target to unwind to.
    ///
    /// Two types of consistency checks are done for:
    ///
    /// 1) When a static file fails to commit but the underlying data was changed.
    /// 2) When a static file was committed, but the required database transaction was not.
    ///
    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
    /// will return an error.
    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
    /// to heal.
    ///
    /// For each static file segment:
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///
    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
    ///
    /// WARNING: No static file writer should be held before calling this function, otherwise it
    /// will deadlock.
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<PipelineTarget>>
    where
        Provider: DBProvider
            + BlockReader
            + StageCheckpointReader
            + ChainSpecProvider
            + StorageSettingsCache,
        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
    {
        // OVM historical import is broken and does not work with this check. It's importing
        // duplicated receipts resulting in having more receipts than the expected transaction
        // range.
        //
        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
        if provider.chain_spec().is_optimism() &&
            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
        {
            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
            const OVM_HEADER_1_HASH: B256 =
                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
                info!(target: "reth::cli",
                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
                );
                return Ok(None)
            }
        }

        info!(target: "reth::cli", "Verifying storage consistency.");

        // Tracks the lowest unwind target found across all segments: the final unwind must
        // satisfy the most-behind segment.
        let mut unwind_target: Option<BlockNumber> = None;
        let mut update_unwind_target = |new_target: BlockNumber| {
            if let Some(target) = unwind_target.as_mut() {
                *target = (*target).min(new_target);
            } else {
                unwind_target = Some(new_target);
            }
        };

        for segment in StaticFileSegment::iter() {
            debug!(target: "reth::providers::static_file", ?segment, "Checking consistency for segment");
            // Some segments are skipped entirely depending on the node's storage configuration.
            match segment {
                StaticFileSegment::Headers | StaticFileSegment::Transactions => {}
                StaticFileSegment::Receipts => {
                    if EitherWriter::receipts_destination(provider).is_database() {
                        // Old pruned nodes (including full node) do not store receipts as static
                        // files.
                        debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts consistency check: receipts stored in database");
                        continue
                    }

                    if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
                        NamedChain::Chiado == provider.chain_spec().chain_id()
                    {
                        // Gnosis and Chiado's historical import is broken and does not work with
                        // this check. They are importing receipts along
                        // with importing headers/bodies.
                        debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts consistency check: broken historical import for gnosis/chiado");
                        continue;
                    }
                }
                StaticFileSegment::TransactionSenders => {
                    // Senders kept in the database are checked via regular DB consistency paths.
                    if EitherWriterDestination::senders(provider).is_database() {
                        continue
                    }
                }
            }

            let initial_highest_block = self.get_highest_static_file_block(segment);
            debug!(target: "reth::providers::static_file", ?segment, ?initial_highest_block, "Initial highest block for segment");

            //  File consistency is broken if:
            //
            // * appending data was interrupted before a config commit, then data file will be
            //   truncated according to the config.
            //
            // * pruning data was interrupted before a config commit, then we have deleted data that
            //   we are expected to still have. We need to check the Database and unwind everything
            //   accordingly.
            if self.access.is_read_only() {
                debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency (read-only)");
                self.check_segment_consistency(segment)?;
            } else {
                debug!(target: "reth::providers::static_file", ?segment, "Fetching latest writer which might heal any potential inconsistency");
                // Fetching the writer will attempt to heal any file level inconsistency.
                self.latest_writer(segment)?;
            }

            // Only applies to block-based static files. (Headers)
            //
            // The updated `highest_block` may have decreased if we healed from a pruning
            // interruption.
            let mut highest_block = self.get_highest_static_file_block(segment);
            if initial_highest_block != highest_block {
                info!(
                    target: "reth::providers::static_file",
                    ?initial_highest_block,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                update_unwind_target(highest_block.unwrap_or_default());
            }

            // Only applies to transaction-based static files. (Receipts & Transactions)
            //
            // Make sure the last transaction matches the last block from its indices, since a heal
            // from a pruning interruption might have decreased the number of transactions without
            // being able to update the last block of the static file segment.
            let highest_tx = self.get_highest_static_file_tx(segment);
            debug!(target: "reth::providers::static_file", ?segment, ?highest_tx, ?highest_block, "Highest transaction for segment");
            if let Some(highest_tx) = highest_tx {
                let mut last_block = highest_block.unwrap_or_default();
                debug!(target: "reth::providers::static_file", ?segment, last_block, highest_tx, "Verifying last transaction matches last block indices");
                // Walk blocks downward until one whose last tx number fits inside the static
                // file's tx range; each step down becomes a candidate unwind target.
                loop {
                    if let Some(indices) = provider.block_body_indices(last_block)? {
                        debug!(target: "reth::providers::static_file", ?segment, last_block, last_tx_num = indices.last_tx_num(), highest_tx, "Found block body indices");
                        if indices.last_tx_num() <= highest_tx {
                            break
                        }
                    } else {
                        debug!(target: "reth::providers::static_file", ?segment, last_block, "Block body indices not found, static files ahead of database");
                        // If the block body indices can not be found, then it means that static
                        // files is ahead of database, and the `ensure_invariants` check will fix
                        // it by comparing with stage checkpoints.
                        break
                    }
                    if last_block == 0 {
                        debug!(target: "reth::providers::static_file", ?segment, "Reached block 0 in verification loop");
                        break
                    }
                    last_block -= 1;

                    info!(
                        target: "reth::providers::static_file",
                        highest_block = self.get_highest_static_file_block(segment),
                        unwind_target = last_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    highest_block = Some(last_block);
                    update_unwind_target(last_block);
                }
            }

            debug!(target: "reth::providers::static_file", ?segment, "Ensuring invariants for segment");
            if let Some(unwind) = match segment {
                StaticFileSegment::Headers => self
                    .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                        provider,
                        segment,
                        highest_block,
                        highest_block,
                    )?,
                StaticFileSegment::Transactions => self
                    .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
                StaticFileSegment::Receipts => self
                    .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
                StaticFileSegment::TransactionSenders => self
                    .ensure_invariants::<_, tables::TransactionSenders>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
            } {
                debug!(target: "reth::providers::static_file", ?segment, unwind_target=unwind, "Invariants check returned unwind target");
                update_unwind_target(unwind);
            } else {
                debug!(target: "reth::providers::static_file", ?segment, "Invariants check completed, no unwind needed");
            }
        }

        Ok(unwind_target.map(PipelineTarget::Unwind))
    }
1171
1172    /// Checks consistency of the latest static file segment and throws an error if at fault.
1173    /// Read-only.
1174    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1175        debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency");
1176        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1177            let file_path = self
1178                .directory()
1179                .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1180            debug!(target: "reth::providers::static_file", ?segment, ?file_path, latest_block, "Loading NippyJar for consistency check");
1181
1182            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1183            debug!(target: "reth::providers::static_file", ?segment, "NippyJar loaded, checking consistency");
1184
1185            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1186            debug!(target: "reth::providers::static_file", ?segment, "NippyJar consistency check passed");
1187        } else {
1188            debug!(target: "reth::providers::static_file", ?segment, "No static file block found, skipping consistency check");
1189        }
1190        Ok(())
1191    }
1192
    /// Check invariants for each corresponding table and static file segment:
    ///
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
    ///     target.
    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
    ///     this case, the rows will be removed and [`None`] will be returned.
    ///
    /// * If the database tables overlap with static files and have contiguous keys, or the
    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        debug!(target: "reth::providers::static_file", ?segment, ?highest_static_file_entry, ?highest_static_file_block, "Ensuring invariants");
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_entry, _)) = db_cursor.first()? {
            debug!(target: "reth::providers::static_file", ?segment, db_first_entry, "Found first database entry");
            if let (Some(highest_entry), Some(highest_block)) =
                (highest_static_file_entry, highest_static_file_block)
            {
                // If there is a gap between the entry found in static file and
                // database, then we have most likely lost static file data and need to unwind so we
                // can load it again
                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                    info!(
                        target: "reth::providers::static_file",
                        ?db_first_entry,
                        ?highest_entry,
                        unwind_target = highest_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    return Ok(Some(highest_block))
                }
            }

            // If the database already has entries past the static files (or there are no static
            // files at all), the static files are not ahead and need no healing here.
            if let Some((db_last_entry, _)) = db_cursor.last()? &&
                highest_static_file_entry
                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
            {
                debug!(target: "reth::providers::static_file", ?segment, db_last_entry, ?highest_static_file_entry, "Database has entries beyond static files, no unwind needed");
                return Ok(None)
            }
        } else {
            debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
        }

        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // If static file entry is ahead of the database entries, then ensure the checkpoint block
        // number matches.
        let stage_id = match segment {
            StaticFileSegment::Headers => StageId::Headers,
            StaticFileSegment::Transactions => StageId::Bodies,
            StaticFileSegment::Receipts => StageId::Execution,
            StaticFileSegment::TransactionSenders => StageId::SenderRecovery,
        };
        let checkpoint_block_number =
            provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
        debug!(target: "reth::providers::static_file", ?segment, ?stage_id, checkpoint_block_number, highest_static_file_block, "Retrieved stage checkpoint");

        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
        // static files on executing a stage, or the reverse on unwinding a stage.
        // All we need to do is to prune the extra static file rows.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            match segment {
                StaticFileSegment::Headers => {
                    // Headers are block-based: prune one row per extra block.
                    let prune_count = highest_static_file_block - checkpoint_block_number;
                    debug!(target: "reth::providers::static_file", ?segment, prune_count, "Pruning headers");
                    // TODO(joshie): is_block_meta
                    writer.prune_headers(prune_count)?;
                }
                StaticFileSegment::Transactions |
                StaticFileSegment::Receipts |
                StaticFileSegment::TransactionSenders => {
                    // Tx-based segments: derive the row count from the checkpoint block's last
                    // tx number.
                    if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                        let number = highest_static_file_entry - block.last_tx_num();
                        debug!(target: "reth::providers::static_file", ?segment, prune_count = number, checkpoint_block_number, "Pruning transaction based segment");

                        match segment {
                            StaticFileSegment::Transactions => {
                                writer.prune_transactions(number, checkpoint_block_number)?
                            }
                            StaticFileSegment::Receipts => {
                                writer.prune_receipts(number, checkpoint_block_number)?
                            }
                            StaticFileSegment::TransactionSenders => {
                                writer.prune_transaction_senders(number, checkpoint_block_number)?
                            }
                            StaticFileSegment::Headers => unreachable!(),
                        }
                    } else {
                        debug!(target: "reth::providers::static_file", ?segment, checkpoint_block_number, "No block body indices found for checkpoint block");
                    }
                }
            }
            debug!(target: "reth::providers::static_file", ?segment, "Committing writer after pruning");
            writer.commit()?;
            debug!(target: "reth::providers::static_file", ?segment, "Writer committed successfully");
        }

        debug!(target: "reth::providers::static_file", ?segment, "Invariants ensured, returning None");
        Ok(None)
    }
1330
1331    /// Returns the earliest available block number that has not been expired and is still
1332    /// available.
1333    ///
1334    /// This means that the highest expired block (or expired block height) is
1335    /// `earliest_history_height.saturating_sub(1)`.
1336    ///
1337    /// Returns `0` if no history has been expired.
1338    pub fn earliest_history_height(&self) -> BlockNumber {
1339        self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1340    }
1341
1342    /// Gets the lowest static file's block range if it exists for a static file segment.
1343    ///
1344    /// If there is nothing on disk for the given segment, this will return [`None`].
1345    pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1346        self.indexes.read().get(&segment).and_then(|index| index.min_block_range)
1347    }
1348
1349    /// Gets the lowest static file's block range start if it exists for a static file segment.
1350    ///
1351    /// For example if the lowest static file has blocks 0-499, this will return 0.
1352    ///
1353    /// If there is nothing on disk for the given segment, this will return [`None`].
1354    pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1355        self.get_lowest_range(segment).map(|range| range.start())
1356    }
1357
1358    /// Gets the lowest static file's block range end if it exists for a static file segment.
1359    ///
1360    /// For example if the static file has blocks 0-499, this will return 499.
1361    ///
1362    /// If there is nothing on disk for the given segment, this will return [`None`].
1363    pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1364        self.get_lowest_range(segment).map(|range| range.end())
1365    }
1366
1367    /// Gets the highest static file's block height if it exists for a static file segment.
1368    ///
1369    /// If there is nothing on disk for the given segment, this will return [`None`].
1370    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1371        self.indexes.read().get(&segment).map(|index| index.max_block)
1372    }
1373
1374    /// Gets the highest static file transaction.
1375    ///
1376    /// If there is nothing on disk for the given segment, this will return [`None`].
1377    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1378        self.indexes
1379            .read()
1380            .get(&segment)
1381            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1382            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1383    }
1384
1385    /// Gets the highest static file block for all segments.
1386    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1387        HighestStaticFiles {
1388            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1389        }
1390    }
1391
    /// Iterates through segment `static_files` in reverse order, executing a function until it
    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
    ///
    /// Returns `Ok(None)` when no static file produced a result (or none exist for the
    /// segment).
    pub fn find_static_file<T>(
        &self,
        segment: StaticFileSegment,
        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
    ) -> ProviderResult<Option<T>> {
        // `ranges` borrows from the `indexes` read guard, so the read lock is held for the
        // duration of the scan below.
        if let Some(ranges) =
            self.indexes.read().get(&segment).map(|index| &index.expected_block_ranges_by_max_block)
        {
            // Iterate through all ranges in reverse order (highest to lowest), stopping at the
            // first static file for which `func` yields a result.
            for range in ranges.values().rev() {
                if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
                    return Ok(Some(res))
                }
            }
        }

        Ok(None)
    }
1412
    /// Fetches data within a specified range across multiple static files.
    ///
    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
    /// It continues fetching until the end of the range is reached or the provided `predicate`
    /// returns false.
    ///
    /// If a static file for part of the range is missing, the items collected so far are
    /// returned instead of an error (see the `get_provider!` macro below).
    pub fn fetch_range_with_predicate<T, F, P>(
        &self,
        segment: StaticFileSegment,
        range: Range<u64>,
        mut get_fn: F,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
        P: FnMut(&T) -> bool,
    {
        // Cap the initial allocation at 100 elements; the requested range may be huge.
        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);

        /// Resolves to the provider for the given block or transaction number.
        ///
        /// If the static file is missing, the `result` is returned.
        macro_rules! get_provider {
            ($number:expr) => {{
                match self.get_segment_provider(segment, $number) {
                    Ok(provider) => provider,
                    Err(
                        ProviderError::MissingStaticFileBlock(_, _) |
                        ProviderError::MissingStaticFileTx(_, _),
                    ) => return Ok(result),
                    Err(err) => return Err(err),
                }
            }};
        }

        let mut provider = get_provider!(range.start);
        let mut cursor = provider.cursor()?;

        // advances number in range
        'outer: for number in range {
            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
            // access data in two different static files, it halts further attempts by returning
            // an error, effectively preventing infinite retry loops.
            let mut retrying = false;

            // advances static files if `get_fn` returns None
            'inner: loop {
                match get_fn(&mut cursor, number)? {
                    Some(res) => {
                        // Predicate failure terminates the whole fetch, not just this number.
                        if !predicate(&res) {
                            break 'outer
                        }
                        result.push(res);
                        break 'inner
                    }
                    None => {
                        if retrying {
                            return Ok(result)
                        }
                        // There is a very small chance of hitting a deadlock if two consecutive
                        // static files share the same bucket in the
                        // internal dashmap and we don't drop the current provider
                        // before requesting the next one.
                        drop(cursor);
                        drop(provider);
                        provider = get_provider!(number);
                        cursor = provider.cursor()?;
                        retrying = true;
                    }
                }
            }
        }

        Ok(result)
    }
1487
    /// Fetches data within a specified range across multiple static files.
    ///
    /// Returns an iterator over the data. Yields [`None`] if the data for the specified number is
    /// not found.
    ///
    /// A fresh cursor is created per yielded item; when the current static file does not
    /// contain an item, the provider is re-resolved once for that number.
    pub fn fetch_range_iter<'a, T, F>(
        &'a self,
        segment: StaticFileSegment,
        range: Range<u64>,
        get_fn: F,
    ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
    where
        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
        T: std::fmt::Debug,
    {
        // `None` means no static file exists for `range.start` at all.
        let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
        Ok(range.map(move |number| {
            match provider
                .as_ref()
                .map(|provider| get_fn(&mut provider.cursor()?, number))
                .and_then(|result| result.transpose())
            {
                Some(result) => result.map(Some),
                None => {
                    // There is a very small chance of hitting a deadlock if two consecutive
                    // static files share the same bucket in the internal dashmap and we don't drop
                    // the current provider before requesting the next one.
                    provider.take();
                    // Re-resolve the provider for `number`: the item may live in a different
                    // static file of the segment than the one we currently hold.
                    provider = self.get_maybe_segment_provider(segment, number)?;
                    provider
                        .as_ref()
                        .map(|provider| get_fn(&mut provider.cursor()?, number))
                        .and_then(|result| result.transpose())
                        .transpose()
                }
            }
        }))
    }
1525
    /// Returns directory where `static_files` are located.
    pub fn directory(&self) -> &Path {
        &self.path
    }
1530
1531    /// Retrieves data from the database or static file, wherever it's available.
1532    ///
1533    /// # Arguments
1534    /// * `segment` - The segment of the static file to check against.
1535    /// * `index_key` - Requested index key, usually a block or transaction number.
1536    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
1537    ///   file provider.
1538    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
1539    ///   when the static file doesn't contain the required data or is not available.
1540    pub fn get_with_static_file_or_database<T, FS, FD>(
1541        &self,
1542        segment: StaticFileSegment,
1543        number: u64,
1544        fetch_from_static_file: FS,
1545        fetch_from_database: FD,
1546    ) -> ProviderResult<Option<T>>
1547    where
1548        FS: Fn(&Self) -> ProviderResult<Option<T>>,
1549        FD: Fn() -> ProviderResult<Option<T>>,
1550    {
1551        // If there is, check the maximum block or transaction number of the segment.
1552        let static_file_upper_bound = if segment.is_block_based() {
1553            self.get_highest_static_file_block(segment)
1554        } else {
1555            self.get_highest_static_file_tx(segment)
1556        };
1557
1558        if static_file_upper_bound
1559            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1560        {
1561            return fetch_from_static_file(self)
1562        }
1563        fetch_from_database()
1564    }
1565
    /// Gets data within a specified range, potentially spanning different `static_files` and
    /// database.
    ///
    /// # Arguments
    /// * `segment` - The segment of the static file to query.
    /// * `block_or_tx_range` - The range of data to fetch.
    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
    /// * `fetch_from_database` - A function to fetch data from the database.
    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
    ///   terminated when this function returns false, thereby filtering the data based on the
    ///   provided condition.
    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
        &self,
        segment: StaticFileSegment,
        mut block_or_tx_range: Range<u64>,
        fetch_from_static_file: FS,
        mut fetch_from_database: FD,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
        P: FnMut(&T) -> bool,
    {
        let mut data = Vec::new();

        // If there is, check the maximum block or transaction number of the segment.
        if let Some(static_file_upper_bound) = if segment.is_block_based() {
            self.get_highest_static_file_block(segment)
        } else {
            self.get_highest_static_file_tx(segment)
        } && block_or_tx_range.start <= static_file_upper_bound
        {
            // Serve the prefix of the range covered by static files; `end` is exclusive, hence
            // the `+ 1` on the inclusive upper bound.
            let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
            data.extend(fetch_from_static_file(
                self,
                block_or_tx_range.start..end,
                &mut predicate,
            )?);
            // Shrink the remaining range to what static files did not cover.
            block_or_tx_range.start = end;
        }

        // Serve the remainder (if any) from the database.
        if block_or_tx_range.end > block_or_tx_range.start {
            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
        }

        Ok(data)
    }
1614
    /// Returns static files directory (test-only accessor; see also [`Self::directory`]).
    #[cfg(any(test, feature = "test-utils"))]
    pub fn path(&self) -> &Path {
        &self.path
    }
1620
1621    /// Returns transaction index
1622    #[cfg(any(test, feature = "test-utils"))]
1623    pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
1624        self.indexes
1625            .read()
1626            .get(&segment)
1627            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1628            .cloned()
1629    }
1630
1631    /// Returns expected block index
1632    #[cfg(any(test, feature = "test-utils"))]
1633    pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
1634        self.indexes
1635            .read()
1636            .get(&segment)
1637            .map(|index| &index.expected_block_ranges_by_max_block)
1638            .cloned()
1639    }
1640}
1641
/// Per-segment index tracking the block/transaction ranges of static files on disk.
#[derive(Debug)]
struct StaticFileSegmentIndex {
    /// Min static file block range.
    ///
    /// This index is initialized on launch to keep track of the lowest, non-expired static file
    /// per segment and gets updated on [`StaticFileProvider::update_index`].
    ///
    /// This tracks the lowest static file per segment together with the block range in that
    /// file. E.g. if static files are batched in 500k block intervals then the lowest static
    /// file is [0..499K], and the block range is start = 0, end = 499K.
    ///
    /// This index is mainly used for history expiry, which targets transactions, e.g. pre-merge
    /// history expiry would lead to removing all static files below the merge height.
    min_block_range: Option<SegmentRangeInclusive>,
    /// Max static file block, i.e. the highest block covered by this segment's static files.
    max_block: u64,
    /// Expected static file block ranges indexed by max expected blocks.
    ///
    /// For example, a static file for expected block range `0..=499_000` may have only block range
    /// `0..=1000` contained in it, as it's not fully filled yet. This index maps the max expected
    /// block to the expected range, i.e. block `499_000` to block range `0..=499_000`.
    expected_block_ranges_by_max_block: SegmentRanges,
    /// Available on disk static file block ranges indexed by max transactions.
    ///
    /// For example, a static file for block range `0..=499_000` may only have block range
    /// `0..=1000` and transaction range `0..=2000` contained in it. This index maps the max
    /// available transaction to the available block range, i.e. transaction `2000` to block range
    /// `0..=1000`.
    ///
    /// `None` for segments that carry no transaction data.
    available_block_ranges_by_max_tx: Option<SegmentRanges>,
}
1672
/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider>`.
pub trait StaticFileWriter {
    /// The primitives type used by the static file provider.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
    /// [`StaticFileSegment`].
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
    fn commit(&self) -> ProviderResult<()>;

    /// Returns `true` if the static file provider has unwind queued.
    fn has_unwind_queued(&self) -> bool;
}
1698
1699impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1700    type Primitives = N;
1701
1702    fn get_writer(
1703        &self,
1704        block: BlockNumber,
1705        segment: StaticFileSegment,
1706    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1707        if self.access.is_read_only() {
1708            return Err(ProviderError::ReadOnlyStaticFileAccess)
1709        }
1710
1711        trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1712        self.writers.get_or_create(segment, || {
1713            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1714        })
1715    }
1716
1717    fn latest_writer(
1718        &self,
1719        segment: StaticFileSegment,
1720    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1721        self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1722    }
1723
1724    fn commit(&self) -> ProviderResult<()> {
1725        self.writers.commit()
1726    }
1727
1728    fn has_unwind_queued(&self) -> bool {
1729        self.writers.has_unwind_queued()
1730    }
1731}
1732
1733impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1734    type Header = N::BlockHeader;
1735
1736    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1737        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1738            Ok(jar_provider
1739                .cursor()?
1740                .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
1741                .and_then(|(header, hash)| {
1742                    if hash == block_hash {
1743                        return Some(header)
1744                    }
1745                    None
1746                }))
1747        })
1748    }
1749
1750    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1751        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1752            .and_then(|provider| provider.header_by_number(num))
1753            .or_else(|err| {
1754                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1755                    Ok(None)
1756                } else {
1757                    Err(err)
1758                }
1759            })
1760    }
1761
1762    fn headers_range(
1763        &self,
1764        range: impl RangeBounds<BlockNumber>,
1765    ) -> ProviderResult<Vec<Self::Header>> {
1766        self.fetch_range_with_predicate(
1767            StaticFileSegment::Headers,
1768            to_range(range),
1769            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1770            |_| true,
1771        )
1772    }
1773
1774    fn sealed_header(
1775        &self,
1776        num: BlockNumber,
1777    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1778        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1779            .and_then(|provider| provider.sealed_header(num))
1780            .or_else(|err| {
1781                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1782                    Ok(None)
1783                } else {
1784                    Err(err)
1785                }
1786            })
1787    }
1788
1789    fn sealed_headers_while(
1790        &self,
1791        range: impl RangeBounds<BlockNumber>,
1792        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1793    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1794        self.fetch_range_with_predicate(
1795            StaticFileSegment::Headers,
1796            to_range(range),
1797            |cursor, number| {
1798                Ok(cursor
1799                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1800                    .map(|(header, hash)| SealedHeader::new(header, hash)))
1801            },
1802            predicate,
1803        )
1804    }
1805}
1806
1807impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1808    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1809        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1810            .and_then(|provider| provider.block_hash(num))
1811            .or_else(|err| {
1812                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1813                    Ok(None)
1814                } else {
1815                    Err(err)
1816                }
1817            })
1818    }
1819
1820    fn canonical_hashes_range(
1821        &self,
1822        start: BlockNumber,
1823        end: BlockNumber,
1824    ) -> ProviderResult<Vec<B256>> {
1825        self.fetch_range_with_predicate(
1826            StaticFileSegment::Headers,
1827            start..end,
1828            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1829            |_| true,
1830        )
1831    }
1832}
1833
1834impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1835    for StaticFileProvider<N>
1836{
1837    type Receipt = N::Receipt;
1838
1839    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1840        self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
1841            .and_then(|provider| provider.receipt(num))
1842            .or_else(|err| {
1843                if let ProviderError::MissingStaticFileTx(_, _) = err {
1844                    Ok(None)
1845                } else {
1846                    Err(err)
1847                }
1848            })
1849    }
1850
1851    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1852        if let Some(num) = self.transaction_id(hash)? {
1853            return self.receipt(num)
1854        }
1855        Ok(None)
1856    }
1857
1858    fn receipts_by_block(
1859        &self,
1860        _block: BlockHashOrNumber,
1861    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1862        unreachable!()
1863    }
1864
1865    fn receipts_by_tx_range(
1866        &self,
1867        range: impl RangeBounds<TxNumber>,
1868    ) -> ProviderResult<Vec<Self::Receipt>> {
1869        self.fetch_range_with_predicate(
1870            StaticFileSegment::Receipts,
1871            to_range(range),
1872            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1873            |_| true,
1874        )
1875    }
1876
1877    fn receipts_by_block_range(
1878        &self,
1879        _block_range: RangeInclusive<BlockNumber>,
1880    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1881        Err(ProviderError::UnsupportedProvider)
1882    }
1883}
1884
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
    for StaticFileProvider<N>
{
    /// Computes `(hash, tx_number)` pairs for all transactions in `tx_range`, hashing chunks
    /// in parallel on the global rayon pool and collecting results over mpsc channels.
    ///
    /// NOTE(review): the returned list is unsorted; callers presumably sort if they need
    /// ordering — confirm against call sites.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        let tx_range_size = (tx_range.end - tx_range.start) as usize;

        // Transactions are different size, so chunks will not all take the same processing time. If
        // chunks are too big, there will be idle threads waiting for work. Choosing an
        // arbitrary smaller value to make sure it doesn't happen.
        let chunk_size = 100;

        // iterator over the chunks
        let chunks = tx_range
            .clone()
            .step_by(chunk_size)
            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
        // One receiving channel per chunk, so results can be drained in chunk order below.
        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));

        for chunk_range in chunks {
            let (channel_tx, channel_rx) = mpsc::channel();
            channels.push(channel_rx);

            let manager = self.clone();

            // Spawn the task onto the global rayon pool
            // This task will send the results through the channel after it has calculated
            // the hash.
            rayon::spawn(move || {
                // Reused RLP encoding buffer to avoid a fresh allocation per transaction.
                let mut rlp_buf = Vec::with_capacity(128);
                let _ = manager.fetch_range_with_predicate(
                    StaticFileSegment::Transactions,
                    chunk_range,
                    |cursor, number| {
                        Ok(cursor
                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
                            .map(|transaction| {
                                rlp_buf.clear();
                                // Send errors are ignored: if the receiver is gone, the
                                // result is no longer needed.
                                let _ = channel_tx
                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
                            }))
                    },
                    |_| true,
                );
            });
        }

        let mut tx_list = Vec::with_capacity(tx_range_size);

        // Iterate over channels and append the tx hashes unsorted
        for channel in channels {
            // `recv` returns Err once the sender is dropped, i.e. the chunk task finished.
            while let Ok(tx) = channel.recv() {
                // Unbox the hashing error before propagating it.
                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
                tx_list.push((tx_hash, tx_id));
            }
        }

        Ok(tx_list)
    }
}
1947
1948impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1949    for StaticFileProvider<N>
1950{
1951    type Transaction = N::SignedTx;
1952
1953    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1954        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1955            let mut cursor = jar_provider.cursor()?;
1956            if cursor
1957                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1958                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1959                .is_some()
1960            {
1961                Ok(cursor.number())
1962            } else {
1963                Ok(None)
1964            }
1965        })
1966    }
1967
1968    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1969        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1970            .and_then(|provider| provider.transaction_by_id(num))
1971            .or_else(|err| {
1972                if let ProviderError::MissingStaticFileTx(_, _) = err {
1973                    Ok(None)
1974                } else {
1975                    Err(err)
1976                }
1977            })
1978    }
1979
1980    fn transaction_by_id_unhashed(
1981        &self,
1982        num: TxNumber,
1983    ) -> ProviderResult<Option<Self::Transaction>> {
1984        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1985            .and_then(|provider| provider.transaction_by_id_unhashed(num))
1986            .or_else(|err| {
1987                if let ProviderError::MissingStaticFileTx(_, _) = err {
1988                    Ok(None)
1989                } else {
1990                    Err(err)
1991                }
1992            })
1993    }
1994
1995    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1996        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1997            Ok(jar_provider
1998                .cursor()?
1999                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2000                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2001        })
2002    }
2003
2004    fn transaction_by_hash_with_meta(
2005        &self,
2006        _hash: TxHash,
2007    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2008        // Required data not present in static_files
2009        Err(ProviderError::UnsupportedProvider)
2010    }
2011
2012    fn transactions_by_block(
2013        &self,
2014        _block_id: BlockHashOrNumber,
2015    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2016        // Required data not present in static_files
2017        Err(ProviderError::UnsupportedProvider)
2018    }
2019
2020    fn transactions_by_block_range(
2021        &self,
2022        _range: impl RangeBounds<BlockNumber>,
2023    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2024        // Required data not present in static_files
2025        Err(ProviderError::UnsupportedProvider)
2026    }
2027
2028    fn transactions_by_tx_range(
2029        &self,
2030        range: impl RangeBounds<TxNumber>,
2031    ) -> ProviderResult<Vec<Self::Transaction>> {
2032        self.fetch_range_with_predicate(
2033            StaticFileSegment::Transactions,
2034            to_range(range),
2035            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2036            |_| true,
2037        )
2038    }
2039
2040    fn senders_by_tx_range(
2041        &self,
2042        range: impl RangeBounds<TxNumber>,
2043    ) -> ProviderResult<Vec<Address>> {
2044        self.fetch_range_with_predicate(
2045            StaticFileSegment::TransactionSenders,
2046            to_range(range),
2047            |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2048            |_| true,
2049        )
2050    }
2051
2052    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2053        self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2054            .and_then(|provider| provider.transaction_sender(id))
2055            .or_else(|err| {
2056                if let ProviderError::MissingStaticFileTx(_, _) = err {
2057                    Ok(None)
2058                } else {
2059                    Err(err)
2060                }
2061            })
2062    }
2063}
2064
2065impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
2066    fn chain_info(&self) -> ProviderResult<ChainInfo> {
2067        // Required data not present in static_files
2068        Err(ProviderError::UnsupportedProvider)
2069    }
2070
2071    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
2072        // Required data not present in static_files
2073        Err(ProviderError::UnsupportedProvider)
2074    }
2075
2076    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
2077        Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
2078    }
2079
2080    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
2081        // Required data not present in static_files
2082        Err(ProviderError::UnsupportedProvider)
2083    }
2084}
2085
2086/* Cannot be successfully implemented but must exist for trait requirements */
2087
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
2160
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_body_indices_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
2173
2174impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2175    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2176        match T::NAME {
2177            tables::CanonicalHeaders::NAME |
2178            tables::Headers::<Header>::NAME |
2179            tables::HeaderTerminalDifficulties::NAME => Ok(self
2180                .get_highest_static_file_block(StaticFileSegment::Headers)
2181                .map(|block| block + 1)
2182                .unwrap_or_default()
2183                as usize),
2184            tables::Receipts::<Receipt>::NAME => Ok(self
2185                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2186                .map(|receipts| receipts + 1)
2187                .unwrap_or_default() as usize),
2188            tables::Transactions::<TransactionSigned>::NAME => Ok(self
2189                .get_highest_static_file_tx(StaticFileSegment::Transactions)
2190                .map(|txs| txs + 1)
2191                .unwrap_or_default()
2192                as usize),
2193            tables::TransactionSenders::NAME => Ok(self
2194                .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
2195                .map(|txs| txs + 1)
2196                .unwrap_or_default() as usize),
2197            _ => Err(ProviderError::UnsupportedProvider),
2198        }
2199    }
2200}
2201
2202/// Calculates the tx hash for the given transaction and its id.
2203#[inline]
2204fn calculate_hash<T>(
2205    entry: (TxNumber, T),
2206    rlp_buf: &mut Vec<u8>,
2207) -> Result<(B256, TxNumber), Box<ProviderError>>
2208where
2209    T: Encodable2718,
2210{
2211    let (tx_id, tx) = entry;
2212    tx.encode_2718(rlp_buf);
2213    Ok((keccak256(rlp_buf), tx_id))
2214}
2215
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use reth_chain_state::EthPrimitives;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

    use crate::StaticFileProviderBuilder;

    #[test]
    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
        let (static_dir, _) = create_test_static_files_dir();
        let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
            .with_blocks_per_file(100)
            .build()?;

        let segment = StaticFileSegment::Headers;

        // With no index the provider falls back to default fixed-size math.
        for (block, start, end) in [(0, 0, 99), (250, 200, 299)] {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, None, block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // An empty index behaves the same as no index at all.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
            SegmentRangeInclusive::new(100, 199)
        );

        // Index holding three uniform 100-block ranges.
        let block_index = BTreeMap::from_iter([
            (99, SegmentRangeInclusive::new(0, 99)),
            (199, SegmentRangeInclusive::new(100, 199)),
            (299, SegmentRangeInclusive::new(200, 299)),
        ]);

        // Blocks covered by the index resolve to their containing range;
        // blocks beyond it derive new ranges from the end of the last range.
        let cases = [
            (0, 0, 99),
            (50, 0, 99),
            (99, 0, 99),
            (100, 100, 199),
            (150, 100, 199),
            (199, 100, 199),
            // Exactly one segment after the last indexed range.
            (300, 300, 399),
            (350, 300, 399),
            // Skips the 300-399 segment entirely.
            (500, 500, 599),
            // Skips many segments.
            (1000, 1000, 1099),
        ];
        for (block, start, end) in cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // Index whose ranges differ in size, simulating a blocks_per_file
        // setting that changed between runs.
        let mixed_size_index = BTreeMap::from_iter([
            (49, SegmentRangeInclusive::new(0, 49)),     // 50 blocks
            (149, SegmentRangeInclusive::new(50, 149)),  // 100 blocks
            (349, SegmentRangeInclusive::new(150, 349)), // 200 blocks
        ]);

        // In-range blocks keep whatever size the stored range has; blocks past
        // the last range (ending at 349) derive new ranges using the current
        // blocks_per_file setting (100).
        let mixed_cases = [
            (25, 0, 49),
            (100, 50, 149),
            (200, 150, 349),
            (350, 350, 449),
            (450, 450, 549),
            (550, 550, 649),
        ];
        for (block, start, end) in mixed_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        Ok(())
    }
}