reth_provider/providers/static_file/manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider,
7    ReceiptProvider, StageCheckpointReader, StatsReader, TransactionVariant, TransactionsProvider,
8    TransactionsProviderExt,
9};
10use alloy_consensus::{
11    transaction::{SignerRecoverable, TransactionMeta},
12    Header,
13};
14use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
15use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
16use dashmap::DashMap;
17use notify::{RecommendedWatcher, RecursiveMode, Watcher};
18use parking_lot::RwLock;
19use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
20use reth_db::{
21    lockfile::StorageLock,
22    static_file::{
23        iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
24        StaticFileCursor, TransactionMask,
25    },
26};
27use reth_db_api::{
28    cursor::DbCursorRO,
29    models::StoredBlockBodyIndices,
30    table::{Decompress, Table, Value},
31    tables,
32    transaction::DbTx,
33};
34use reth_ethereum_primitives::{Receipt, TransactionSigned};
35use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
36use reth_node_types::NodePrimitives;
37use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
38use reth_stages_types::{PipelineTarget, StageId};
39use reth_static_file_types::{
40    find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
41    DEFAULT_BLOCKS_PER_STATIC_FILE,
42};
43use reth_storage_api::{BlockBodyIndicesProvider, DBProvider};
44use reth_storage_errors::provider::{ProviderError, ProviderResult};
45use std::{
46    collections::{hash_map::Entry, BTreeMap, HashMap},
47    fmt::Debug,
48    marker::PhantomData,
49    ops::{Deref, Range, RangeBounds, RangeInclusive},
50    path::{Path, PathBuf},
51    sync::{atomic::AtomicU64, mpsc, Arc},
52};
53use tracing::{debug, info, trace, warn};
54
55/// Alias type for a map that can be queried for block ranges from a transaction
56/// segment respectively. It uses `TxNumber` to represent the transaction end of a static file
57/// range.
58type SegmentRanges = HashMap<StaticFileSegment, BTreeMap<TxNumber, SegmentRangeInclusive>>;
59
/// Whether a static file provider may only read static files ([`StaticFileAccess::RO`]) or may
/// also write to them ([`StaticFileAccess::RW`]). Defaults to read-only.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` for [`StaticFileAccess::RO`].
    pub const fn is_read_only(&self) -> bool {
        match self {
            Self::RO => true,
            Self::RW => false,
        }
    }

    /// Returns `true` for [`StaticFileAccess::RW`].
    pub const fn is_read_write(&self) -> bool {
        match self {
            Self::RO => false,
            Self::RW => true,
        }
    }
}
81
82/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
83///
84/// "Static files" contain immutable chain history data, such as:
85///  - transactions
86///  - headers
87///  - receipts
88///
89/// This provider type is responsible for reading and writing to static files.
90#[derive(Debug)]
91pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
92
93impl<N> Clone for StaticFileProvider<N> {
94    fn clone(&self) -> Self {
95        Self(self.0.clone())
96    }
97}
98
99impl<N: NodePrimitives> StaticFileProvider<N> {
100    /// Creates a new [`StaticFileProvider`] with the given [`StaticFileAccess`].
101    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
102        let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
103        provider.initialize_index()?;
104        Ok(provider)
105    }
106
107    /// Creates a new [`StaticFileProvider`] with read-only access.
108    ///
109    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
110    /// new data won't be detected or queryable.
111    ///
112    /// Watching is recommended if the read-only provider is used on a directory that an active node
113    /// instance is modifying.
114    ///
115    /// See also [`StaticFileProvider::watch_directory`].
116    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
117        let provider = Self::new(path, StaticFileAccess::RO)?;
118
119        if watch_directory {
120            provider.watch_directory();
121        }
122
123        Ok(provider)
124    }
125
126    /// Creates a new [`StaticFileProvider`] with read-write access.
127    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
128        Self::new(path, StaticFileAccess::RW)
129    }
130
131    /// Watches the directory for changes and updates the in-memory index when modifications
132    /// are detected.
133    ///
134    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
135    /// receive `update_index` notifications from a node that appends/truncates data.
136    pub fn watch_directory(&self) {
137        let provider = self.clone();
138        std::thread::spawn(move || {
139            let (tx, rx) = std::sync::mpsc::channel();
140            let mut watcher = RecommendedWatcher::new(
141                move |res| tx.send(res).unwrap(),
142                notify::Config::default(),
143            )
144            .expect("failed to create watcher");
145
146            watcher
147                .watch(&provider.path, RecursiveMode::NonRecursive)
148                .expect("failed to watch path");
149
150            // Some backends send repeated modified events
151            let mut last_event_timestamp = None;
152
153            while let Ok(res) = rx.recv() {
154                match res {
155                    Ok(event) => {
156                        // We only care about modified data events
157                        if !matches!(
158                            event.kind,
159                            notify::EventKind::Modify(_) |
160                                notify::EventKind::Create(_) |
161                                notify::EventKind::Remove(_)
162                        ) {
163                            continue
164                        }
165
166                        // We only trigger a re-initialization if a configuration file was
167                        // modified. This means that a
168                        // static_file_provider.commit() was called on the node after
169                        // appending/truncating rows
170                        for segment in event.paths {
171                            // Ensure it's a file with the .conf extension
172                            if segment
173                                .extension()
174                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
175                            {
176                                continue
177                            }
178
179                            // Ensure it's well formatted static file name
180                            if StaticFileSegment::parse_filename(
181                                &segment.file_stem().expect("qed").to_string_lossy(),
182                            )
183                            .is_none()
184                            {
185                                continue
186                            }
187
188                            // If we can read the metadata and modified timestamp, ensure this is
189                            // not an old or repeated event.
190                            if let Ok(current_modified_timestamp) =
191                                std::fs::metadata(&segment).and_then(|m| m.modified())
192                            {
193                                if last_event_timestamp.is_some_and(|last_timestamp| {
194                                    last_timestamp >= current_modified_timestamp
195                                }) {
196                                    continue
197                                }
198                                last_event_timestamp = Some(current_modified_timestamp);
199                            }
200
201                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
202                            if let Err(err) = provider.initialize_index() {
203                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
204                            }
205                            break
206                        }
207                    }
208
209                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
210                }
211            }
212        });
213    }
214}
215
216impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
217    type Target = StaticFileProviderInner<N>;
218
219    fn deref(&self) -> &Self::Target {
220        &self.0
221    }
222}
223
224/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
225#[derive(Debug)]
226pub struct StaticFileProviderInner<N> {
227    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
228    /// segments and ranges.
229    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
230    /// Min static file range for each segment.
231    /// This index is initialized on launch to keep track of the lowest, non-expired static file
232    /// per segment.
233    ///
234    /// This tracks the lowest static file per segment together with the block range in that
235    /// file. E.g. static file is batched in 500k block intervals then the lowest static file
236    /// is [0..499K], and the block range is start = 0, end = 499K.
237    /// This index is mainly used to History expiry, which targets transactions, e.g. pre-merge
238    /// history expiry would lead to removing all static files below the merge height.
239    static_files_min_block: RwLock<HashMap<StaticFileSegment, SegmentRangeInclusive>>,
240    /// This is an additional index that tracks the expired height, this will track the highest
241    /// block number that has been expired (missing). The first, non expired block is
242    /// `expired_history_height + 1`.
243    ///
244    /// This is effectively the transaction range that has been expired:
245    /// [`StaticFileProvider::delete_segment_below_block`] and mirrors
246    /// `static_files_min_block[transactions] - blocks_per_file`.
247    ///
248    /// This additional tracker exists for more efficient lookups because the node must be aware of
249    /// the expired height.
250    earliest_history_height: AtomicU64,
251    /// Max static file block for each segment
252    static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
253    /// Available static file block ranges on disk indexed by max transactions.
254    static_files_tx_index: RwLock<SegmentRanges>,
255    /// Directory where `static_files` are located
256    path: PathBuf,
257    /// Maintains a writer set of [`StaticFileSegment`].
258    writers: StaticFileWriters<N>,
259    /// Metrics for the static files.
260    metrics: Option<Arc<StaticFileProviderMetrics>>,
261    /// Access rights of the provider.
262    access: StaticFileAccess,
263    /// Number of blocks per file.
264    blocks_per_file: u64,
265    /// Write lock for when access is [`StaticFileAccess::RW`].
266    _lock_file: Option<StorageLock>,
267    /// Node primitives
268    _pd: PhantomData<N>,
269}
270
271impl<N: NodePrimitives> StaticFileProviderInner<N> {
272    /// Creates a new [`StaticFileProviderInner`].
273    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
274        let _lock_file = if access.is_read_write() {
275            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
276        } else {
277            None
278        };
279
280        let provider = Self {
281            map: Default::default(),
282            writers: Default::default(),
283            static_files_min_block: Default::default(),
284            earliest_history_height: Default::default(),
285            static_files_max_block: Default::default(),
286            static_files_tx_index: Default::default(),
287            path: path.as_ref().to_path_buf(),
288            metrics: None,
289            access,
290            blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE,
291            _lock_file,
292            _pd: Default::default(),
293        };
294
295        Ok(provider)
296    }
297
298    pub const fn is_read_only(&self) -> bool {
299        self.access.is_read_only()
300    }
301
302    /// Each static file has a fixed number of blocks. This gives out the range where the requested
303    /// block is positioned.
304    pub const fn find_fixed_range(&self, block: BlockNumber) -> SegmentRangeInclusive {
305        find_fixed_range(block, self.blocks_per_file)
306    }
307}
308
309impl<N: NodePrimitives> StaticFileProvider<N> {
310    /// Set a custom number of blocks per file.
311    #[cfg(any(test, feature = "test-utils"))]
312    pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self {
313        let mut provider =
314            Arc::try_unwrap(self.0).expect("should be called when initializing only");
315        provider.blocks_per_file = blocks_per_file;
316        Self(Arc::new(provider))
317    }
318
319    /// Enables metrics on the [`StaticFileProvider`].
320    pub fn with_metrics(self) -> Self {
321        let mut provider =
322            Arc::try_unwrap(self.0).expect("should be called when initializing only");
323        provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
324        Self(Arc::new(provider))
325    }
326
327    /// Reports metrics for the static files.
328    pub fn report_metrics(&self) -> ProviderResult<()> {
329        let Some(metrics) = &self.metrics else { return Ok(()) };
330
331        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
332        for (segment, ranges) in static_files {
333            let mut entries = 0;
334            let mut size = 0;
335
336            for (block_range, _) in &ranges {
337                let fixed_block_range = self.find_fixed_range(block_range.start());
338                let jar_provider = self
339                    .get_segment_provider(segment, || Some(fixed_block_range), None)?
340                    .ok_or_else(|| {
341                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
342                    })?;
343
344                entries += jar_provider.rows();
345
346                let data_size = reth_fs_util::metadata(jar_provider.data_path())
347                    .map(|metadata| metadata.len())
348                    .unwrap_or_default();
349                let index_size = reth_fs_util::metadata(jar_provider.index_path())
350                    .map(|metadata| metadata.len())
351                    .unwrap_or_default();
352                let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
353                    .map(|metadata| metadata.len())
354                    .unwrap_or_default();
355                let config_size = reth_fs_util::metadata(jar_provider.config_path())
356                    .map(|metadata| metadata.len())
357                    .unwrap_or_default();
358
359                size += data_size + index_size + offsets_size + config_size;
360            }
361
362            metrics.record_segment(segment, size, ranges.len(), entries);
363        }
364
365        Ok(())
366    }
367
368    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
369    pub fn get_segment_provider_from_block(
370        &self,
371        segment: StaticFileSegment,
372        block: BlockNumber,
373        path: Option<&Path>,
374    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
375        self.get_segment_provider(
376            segment,
377            || self.get_segment_ranges_from_block(segment, block),
378            path,
379        )?
380        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
381    }
382
383    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
384    pub fn get_segment_provider_from_transaction(
385        &self,
386        segment: StaticFileSegment,
387        tx: TxNumber,
388        path: Option<&Path>,
389    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
390        self.get_segment_provider(
391            segment,
392            || self.get_segment_ranges_from_transaction(segment, tx),
393            path,
394        )?
395        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
396    }
397
398    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
399    ///
400    /// `fn_range` should make sure the range goes through `find_fixed_range`.
401    pub fn get_segment_provider(
402        &self,
403        segment: StaticFileSegment,
404        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
405        path: Option<&Path>,
406    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
407        // If we have a path, then get the block range from its name.
408        // Otherwise, check `self.available_static_files`
409        let block_range = match path {
410            Some(path) => StaticFileSegment::parse_filename(
411                &path
412                    .file_name()
413                    .ok_or_else(|| {
414                        ProviderError::MissingStaticFilePath(segment, path.to_path_buf())
415                    })?
416                    .to_string_lossy(),
417            )
418            .and_then(|(parsed_segment, block_range)| {
419                if parsed_segment == segment {
420                    return Some(block_range)
421                }
422                None
423            }),
424            None => fn_range(),
425        };
426
427        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
428        if let Some(block_range) = block_range {
429            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
430        }
431
432        Ok(None)
433    }
434
435    /// Given a segment and block range it removes the cached provider from the map.
436    ///
437    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
438    pub fn remove_cached_provider(
439        &self,
440        segment: StaticFileSegment,
441        fixed_block_range_end: BlockNumber,
442    ) {
443        self.map.remove(&(fixed_block_range_end, segment));
444    }
445
446    /// This handles history expiry by deleting all static files for the given segment below the
447    /// given block.
448    ///
449    /// For example if block is 1M and the blocks per file are 500K this will delete all individual
450    /// files below 1M, so 0-499K and 500K-999K.
451    ///
452    /// This will not delete the file that contains the block itself, because files can only be
453    /// removed entirely.
454    ///
455    /// # Safety
456    ///
457    /// This method will never delete the highest static file for the segment, even if the
458    /// requested block is higher than the highest block in static files. This ensures we always
459    /// maintain at least one static file if any exist.
460    ///
461    /// Returns a list of `SegmentHeader`s from the deleted jars.
462    pub fn delete_segment_below_block(
463        &self,
464        segment: StaticFileSegment,
465        block: BlockNumber,
466    ) -> ProviderResult<Vec<SegmentHeader>> {
467        // Nothing to delete if block is 0.
468        if block == 0 {
469            return Ok(Vec::new())
470        }
471
472        let highest_block = self.get_highest_static_file_block(segment);
473        let mut deleted_headers = Vec::new();
474
475        loop {
476            let Some(block_height) = self.get_lowest_static_file_block(segment) else {
477                return Ok(deleted_headers)
478            };
479
480            // Stop if we've reached the target block or the highest static file
481            if block_height >= block || Some(block_height) == highest_block {
482                return Ok(deleted_headers)
483            }
484
485            debug!(
486                target: "provider::static_file",
487                ?segment,
488                ?block_height,
489                "Deleting static file below block"
490            );
491
492            // now we need to wipe the static file, this will take care of updating the index and
493            // advance the lowest tracked block height for the segment.
494            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
495                warn!( target: "provider::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
496            })?;
497
498            deleted_headers.push(header);
499        }
500    }
501
502    /// Given a segment and block, it deletes the jar and all files from the respective block range.
503    ///
504    /// CAUTION: destructive. Deletes files on disk.
505    ///
506    /// This will re-initialize the index after deletion, so all files are tracked.
507    ///
508    /// Returns the `SegmentHeader` of the deleted jar.
509    pub fn delete_jar(
510        &self,
511        segment: StaticFileSegment,
512        block: BlockNumber,
513    ) -> ProviderResult<SegmentHeader> {
514        let fixed_block_range = self.find_fixed_range(block);
515        let key = (fixed_block_range.end(), segment);
516        let jar = if let Some((_, jar)) = self.map.remove(&key) {
517            jar.jar
518        } else {
519            let file = self.path.join(segment.filename(&fixed_block_range));
520            debug!(
521                target: "provider::static_file",
522                ?file,
523                ?fixed_block_range,
524                ?block,
525                "Loading static file jar for deletion"
526            );
527            NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
528        };
529
530        let header = jar.user_header().clone();
531        jar.delete().map_err(ProviderError::other)?;
532
533        self.initialize_index()?;
534
535        Ok(header)
536    }
537
538    /// Given a segment and block range it returns a cached
539    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
540    /// many.
541    fn get_or_create_jar_provider(
542        &self,
543        segment: StaticFileSegment,
544        fixed_block_range: &SegmentRangeInclusive,
545    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
546        let key = (fixed_block_range.end(), segment);
547
548        // Avoid using `entry` directly to avoid a write lock in the common case.
549        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
550        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
551            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
552            jar.into()
553        } else {
554            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
555            let path = self.path.join(segment.filename(fixed_block_range));
556            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
557            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
558        };
559
560        if let Some(metrics) = &self.metrics {
561            provider = provider.with_metrics(metrics.clone());
562        }
563        Ok(provider)
564    }
565
566    /// Gets a static file segment's block range from the provider inner block
567    /// index.
568    fn get_segment_ranges_from_block(
569        &self,
570        segment: StaticFileSegment,
571        block: u64,
572    ) -> Option<SegmentRangeInclusive> {
573        self.static_files_max_block
574            .read()
575            .get(&segment)
576            .filter(|max| **max >= block)
577            .map(|_| self.find_fixed_range(block))
578    }
579
580    /// Gets a static file segment's fixed block range from the provider inner
581    /// transaction index.
582    fn get_segment_ranges_from_transaction(
583        &self,
584        segment: StaticFileSegment,
585        tx: u64,
586    ) -> Option<SegmentRangeInclusive> {
587        let static_files = self.static_files_tx_index.read();
588        let segment_static_files = static_files.get(&segment)?;
589
590        // It's more probable that the request comes from a newer tx height, so we iterate
591        // the static_files in reverse.
592        let mut static_files_rev_iter = segment_static_files.iter().rev().peekable();
593
594        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
595            if tx > *tx_end {
596                // request tx is higher than highest static file tx
597                return None
598            }
599            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
600            if tx_start <= tx {
601                return Some(self.find_fixed_range(block_range.end()))
602            }
603        }
604        None
605    }
606
607    /// Updates the inner transaction and block indexes alongside the internal cached providers in
608    /// `self.map`.
609    ///
610    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
611    ///
612    /// If `segment_max_block` is None it means there's no static file for this segment.
613    pub fn update_index(
614        &self,
615        segment: StaticFileSegment,
616        segment_max_block: Option<BlockNumber>,
617    ) -> ProviderResult<()> {
618        let mut max_block = self.static_files_max_block.write();
619        let mut tx_index = self.static_files_tx_index.write();
620
621        match segment_max_block {
622            Some(segment_max_block) => {
623                // Update the max block for the segment
624                max_block.insert(segment, segment_max_block);
625                let fixed_range = self.find_fixed_range(segment_max_block);
626
627                let jar = NippyJar::<SegmentHeader>::load(
628                    &self.path.join(segment.filename(&fixed_range)),
629                )
630                .map_err(ProviderError::other)?;
631
632                // Updates the tx index by first removing all entries which have a higher
633                // block_start than our current static file.
634                if let Some(tx_range) = jar.user_header().tx_range() {
635                    let tx_end = tx_range.end();
636
637                    // Current block range has the same block start as `fixed_range``, but block end
638                    // might be different if we are still filling this static file.
639                    if let Some(current_block_range) = jar.user_header().block_range().copied() {
640                        // Considering that `update_index` is called when we either append/truncate,
641                        // we are sure that we are handling the latest data
642                        // points.
643                        //
644                        // Here we remove every entry of the index that has a block start higher or
645                        // equal than our current one. This is important in the case
646                        // that we prune a lot of rows resulting in a file (and thus
647                        // a higher block range) deletion.
648                        tx_index
649                            .entry(segment)
650                            .and_modify(|index| {
651                                index.retain(|_, block_range| {
652                                    block_range.start() < fixed_range.start()
653                                });
654                                index.insert(tx_end, current_block_range);
655                            })
656                            .or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
657                    }
658                } else if segment.is_tx_based() {
659                    // The unwinded file has no more transactions/receipts. However, the highest
660                    // block is within this files' block range. We only retain
661                    // entries with block ranges before the current one.
662                    tx_index.entry(segment).and_modify(|index| {
663                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
664                    });
665
666                    // If the index is empty, just remove it.
667                    if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
668                        tx_index.remove(&segment);
669                    }
670                }
671
672                // Update the cached provider.
673                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
674
675                // Delete any cached provider that no longer has an associated jar.
676                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
677            }
678            None => {
679                tx_index.remove(&segment);
680                max_block.remove(&segment);
681            }
682        };
683
684        Ok(())
685    }
686
687    /// Initializes the inner transaction and block index
688    pub fn initialize_index(&self) -> ProviderResult<()> {
689        let mut min_block = self.static_files_min_block.write();
690        let mut max_block = self.static_files_max_block.write();
691        let mut tx_index = self.static_files_tx_index.write();
692
693        min_block.clear();
694        max_block.clear();
695        tx_index.clear();
696
697        for (segment, ranges) in iter_static_files(&self.path).map_err(ProviderError::other)? {
698            // Update first and last block for each segment
699            if let Some((first_block_range, _)) = ranges.first() {
700                min_block.insert(segment, *first_block_range);
701            }
702            if let Some((last_block_range, _)) = ranges.last() {
703                max_block.insert(segment, last_block_range.end());
704            }
705
706            // Update tx -> block_range index
707            for (block_range, tx_range) in ranges {
708                if let Some(tx_range) = tx_range {
709                    let tx_end = tx_range.end();
710
711                    match tx_index.entry(segment) {
712                        Entry::Occupied(mut index) => {
713                            index.get_mut().insert(tx_end, block_range);
714                        }
715                        Entry::Vacant(index) => {
716                            index.insert(BTreeMap::from([(tx_end, block_range)]));
717                        }
718                    };
719                }
720            }
721        }
722
723        // If this is a re-initialization, we need to clear this as well
724        self.map.clear();
725
726        // initialize the expired history height to the lowest static file block
727        if let Some(lowest_range) = min_block.get(&StaticFileSegment::Transactions) {
728            // the earliest height is the lowest available block number
729            self.earliest_history_height
730                .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
731        }
732
733        Ok(())
734    }
735
    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
    /// target to unwind to.
    ///
    /// Two types of consistency checks are done for:
    ///
    /// 1) When a static file fails to commit but the underlying data was changed.
    /// 2) When a static file was committed, but the required database transaction was not.
    ///
    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
    /// will return an error.
    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
    /// to heal.
    ///
    /// For each static file segment:
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///
    /// The receipts segment is skipped when `has_receipt_pruning` is set, and on Gnosis/Chiado.
    /// The whole check is skipped on OP mainnet when the OVM block #1 is present in the database.
    ///
    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
    /// When several segments request an unwind, the lowest requested block is returned.
    ///
    /// WARNING: No static file writer should be held before calling this function, otherwise it
    /// will deadlock.
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
        has_receipt_pruning: bool,
    ) -> ProviderResult<Option<PipelineTarget>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader + ChainSpecProvider,
        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
    {
        // OVM historical import is broken and does not work with this check. It's importing
        // duplicated receipts resulting in having more receipts than the expected transaction
        // range.
        //
        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
        if provider.chain_spec().is_optimism() &&
            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
        {
            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
            const OVM_HEADER_1_HASH: B256 =
                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
                info!(target: "reth::cli",
                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
                );
                return Ok(None)
            }
        }

        info!(target: "reth::cli", "Verifying storage consistency.");

        // Lowest unwind target requested by any segment so far; `None` means no unwind needed.
        let mut unwind_target: Option<BlockNumber> = None;
        let mut update_unwind_target = |new_target: BlockNumber| {
            if let Some(target) = unwind_target.as_mut() {
                *target = (*target).min(new_target);
            } else {
                unwind_target = Some(new_target);
            }
        };

        for segment in StaticFileSegment::iter() {
            if has_receipt_pruning && segment.is_receipts() {
                // Pruned nodes (including full node) do not store receipts as static files.
                continue
            }

            if segment.is_receipts() &&
                (NamedChain::Gnosis == provider.chain_spec().chain_id() ||
                    NamedChain::Chiado == provider.chain_spec().chain_id())
            {
                // Gnosis and Chiado's historical import is broken and does not work with this
                // check. They are importing receipts along with importing
                // headers/bodies.
                continue;
            }

            // Remember the highest block before any file-level healing so a change can be
            // detected afterwards.
            let initial_highest_block = self.get_highest_static_file_block(segment);

            //  File consistency is broken if:
            //
            // * appending data was interrupted before a config commit, then data file will be
            //   truncated according to the config.
            //
            // * pruning data was interrupted before a config commit, then we have deleted data that
            //   we are expected to still have. We need to check the Database and unwind everything
            //   accordingly.
            if self.access.is_read_only() {
                self.check_segment_consistency(segment)?;
            } else {
                // Fetching the writer will attempt to heal any file level inconsistency.
                self.latest_writer(segment)?;
            }

            // If healing changed the segment's highest block (it may have decreased after healing
            // a pruning interruption), request an unwind to the new highest block, or to block 0
            // if the segment no longer has any block.
            let mut highest_block = self.get_highest_static_file_block(segment);
            if initial_highest_block != highest_block {
                info!(
                    target: "reth::providers::static_file",
                    ?initial_highest_block,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                update_unwind_target(highest_block.unwrap_or_default());
            }

            // Only applies to transaction-based static files. (Receipts & Transactions)
            //
            // Make sure the last transaction matches the last block from its indices, since a heal
            // from a pruning interruption might have decreased the number of transactions without
            // being able to update the last block of the static file segment.
            let highest_tx = self.get_highest_static_file_tx(segment);
            if let Some(highest_tx) = highest_tx {
                // Walk backwards from the highest block until a block whose last transaction is
                // covered by the static file is found (or block 0 is reached).
                let mut last_block = highest_block.unwrap_or_default();
                loop {
                    if let Some(indices) = provider.block_body_indices(last_block)? {
                        if indices.last_tx_num() <= highest_tx {
                            break
                        }
                    } else {
                        // If the block body indices can not be found, then it means that static
                        // files is ahead of database, and the `ensure_invariants` check will fix
                        // it by comparing with stage checkpoints.
                        break
                    }
                    if last_block == 0 {
                        break
                    }
                    last_block -= 1;

                    info!(
                        target: "reth::providers::static_file",
                        highest_block = self.get_highest_static_file_block(segment),
                        unwind_target = last_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    highest_block = Some(last_block);
                    update_unwind_target(last_block);
                }
            }

            // Compare the (possibly adjusted) static file tip against the matching database
            // table and stage checkpoint.
            if let Some(unwind) = match segment {
                StaticFileSegment::Headers => self
                    .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                        provider,
                        segment,
                        highest_block,
                        highest_block,
                    )?,
                StaticFileSegment::Transactions => self
                    .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
                StaticFileSegment::Receipts => self
                    .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
            } {
                update_unwind_target(unwind);
            }
        }

        Ok(unwind_target.map(PipelineTarget::Unwind))
    }
913
914    /// Checks consistency of the latest static file segment and throws an error if at fault.
915    /// Read-only.
916    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
917        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
918            let file_path =
919                self.directory().join(segment.filename(&self.find_fixed_range(latest_block)));
920
921            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
922
923            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
924        }
925        Ok(())
926    }
927
928    /// Check invariants for each corresponding table and static file segment:
929    ///
930    /// * the corresponding database table should overlap or have continuity in their keys
931    ///   ([`TxNumber`] or [`BlockNumber`]).
932    /// * its highest block should match the stage checkpoint block number if it's equal or higher
933    ///   than the corresponding database table last entry.
934    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
935    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
936    ///     target.
937    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
938    ///     this case, the rows will be removed and [`None`] will be returned.
939    ///
940    /// * If the database tables overlap with static files and have contiguous keys, or the
941    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
942    fn ensure_invariants<Provider, T: Table<Key = u64>>(
943        &self,
944        provider: &Provider,
945        segment: StaticFileSegment,
946        highest_static_file_entry: Option<u64>,
947        highest_static_file_block: Option<BlockNumber>,
948    ) -> ProviderResult<Option<BlockNumber>>
949    where
950        Provider: DBProvider + BlockReader + StageCheckpointReader,
951    {
952        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
953
954        if let Some((db_first_entry, _)) = db_cursor.first()? {
955            if let (Some(highest_entry), Some(highest_block)) =
956                (highest_static_file_entry, highest_static_file_block)
957            {
958                // If there is a gap between the entry found in static file and
959                // database, then we have most likely lost static file data and need to unwind so we
960                // can load it again
961                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
962                    info!(
963                        target: "reth::providers::static_file",
964                        ?db_first_entry,
965                        ?highest_entry,
966                        unwind_target = highest_block,
967                        ?segment,
968                        "Setting unwind target."
969                    );
970                    return Ok(Some(highest_block))
971                }
972            }
973
974            if let Some((db_last_entry, _)) = db_cursor.last()? &&
975                highest_static_file_entry
976                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
977            {
978                return Ok(None)
979            }
980        }
981
982        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
983        let highest_static_file_block = highest_static_file_block.unwrap_or_default();
984
985        // If static file entry is ahead of the database entries, then ensure the checkpoint block
986        // number matches.
987        let checkpoint_block_number = provider
988            .get_stage_checkpoint(match segment {
989                StaticFileSegment::Headers => StageId::Headers,
990                StaticFileSegment::Transactions => StageId::Bodies,
991                StaticFileSegment::Receipts => StageId::Execution,
992            })?
993            .unwrap_or_default()
994            .block_number;
995
996        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
997        if checkpoint_block_number > highest_static_file_block {
998            info!(
999                target: "reth::providers::static_file",
1000                checkpoint_block_number,
1001                unwind_target = highest_static_file_block,
1002                ?segment,
1003                "Setting unwind target."
1004            );
1005            return Ok(Some(highest_static_file_block))
1006        }
1007
1008        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
1009        // static files on executing a stage, or the reverse on unwinding a stage.
1010        // All we need to do is to prune the extra static file rows.
1011        if checkpoint_block_number < highest_static_file_block {
1012            info!(
1013                target: "reth::providers",
1014                ?segment,
1015                from = highest_static_file_block,
1016                to = checkpoint_block_number,
1017                "Unwinding static file segment."
1018            );
1019            let mut writer = self.latest_writer(segment)?;
1020            if segment.is_headers() {
1021                // TODO(joshie): is_block_meta
1022                writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
1023            } else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
1024                // todo joshie: is querying block_body_indices a potential issue once bbi is moved
1025                // to sf as well
1026                let number = highest_static_file_entry - block.last_tx_num();
1027                if segment.is_receipts() {
1028                    writer.prune_receipts(number, checkpoint_block_number)?;
1029                } else {
1030                    writer.prune_transactions(number, checkpoint_block_number)?;
1031                }
1032            }
1033            writer.commit()?;
1034        }
1035
1036        Ok(None)
1037    }
1038
1039    /// Returns the earliest available block number that has not been expired and is still
1040    /// available.
1041    ///
1042    /// This means that the highest expired block (or expired block height) is
1043    /// `earliest_history_height.saturating_sub(1)`.
1044    ///
1045    /// Returns `0` if no history has been expired.
1046    pub fn earliest_history_height(&self) -> BlockNumber {
1047        self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1048    }
1049
1050    /// Gets the lowest transaction static file block if it exists.
1051    ///
1052    /// For example if the transactions static file has blocks 0-499, this will return 499..
1053    ///
1054    /// If there is nothing on disk for the given segment, this will return [`None`].
1055    pub fn get_lowest_transaction_static_file_block(&self) -> Option<BlockNumber> {
1056        self.get_lowest_static_file_block(StaticFileSegment::Transactions)
1057    }
1058
1059    /// Gets the lowest static file's block height if it exists for a static file segment.
1060    ///
1061    /// For example if the static file has blocks 0-499, this will return 499..
1062    ///
1063    /// If there is nothing on disk for the given segment, this will return [`None`].
1064    pub fn get_lowest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1065        self.static_files_min_block.read().get(&segment).map(|range| range.end())
1066    }
1067
1068    /// Gets the lowest static file's block range if it exists for a static file segment.
1069    ///
1070    /// If there is nothing on disk for the given segment, this will return [`None`].
1071    pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1072        self.static_files_min_block.read().get(&segment).copied()
1073    }
1074
1075    /// Gets the highest static file's block height if it exists for a static file segment.
1076    ///
1077    /// If there is nothing on disk for the given segment, this will return [`None`].
1078    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1079        self.static_files_max_block.read().get(&segment).copied()
1080    }
1081
1082    /// Gets the highest static file transaction.
1083    ///
1084    /// If there is nothing on disk for the given segment, this will return [`None`].
1085    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1086        self.static_files_tx_index
1087            .read()
1088            .get(&segment)
1089            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1090    }
1091
1092    /// Gets the highest static file block for all segments.
1093    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1094        HighestStaticFiles {
1095            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1096        }
1097    }
1098
1099    /// Iterates through segment `static_files` in reverse order, executing a function until it
1100    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
1101    pub fn find_static_file<T>(
1102        &self,
1103        segment: StaticFileSegment,
1104        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
1105    ) -> ProviderResult<Option<T>> {
1106        if let Some(highest_block) = self.get_highest_static_file_block(segment) {
1107            let mut range = self.find_fixed_range(highest_block);
1108            while range.end() > 0 {
1109                if let Some(res) = func(self.get_or_create_jar_provider(segment, &range)?)? {
1110                    return Ok(Some(res))
1111                }
1112                range = SegmentRangeInclusive::new(
1113                    range.start().saturating_sub(self.blocks_per_file),
1114                    range.end().saturating_sub(self.blocks_per_file),
1115                );
1116            }
1117        }
1118
1119        Ok(None)
1120    }
1121
1122    /// Fetches data within a specified range across multiple static files.
1123    ///
1124    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
1125    /// It continues fetching until the end of the range is reached or the provided `predicate`
1126    /// returns false.
1127    pub fn fetch_range_with_predicate<T, F, P>(
1128        &self,
1129        segment: StaticFileSegment,
1130        range: Range<u64>,
1131        mut get_fn: F,
1132        mut predicate: P,
1133    ) -> ProviderResult<Vec<T>>
1134    where
1135        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1136        P: FnMut(&T) -> bool,
1137    {
1138        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1139
1140        /// Resolves to the provider for the given block or transaction number.
1141        ///
1142        /// If the static file is missing, the `result` is returned.
1143        macro_rules! get_provider {
1144            ($number:expr) => {{
1145                let provider = if segment.is_block_based() {
1146                    self.get_segment_provider_from_block(segment, $number, None)
1147                } else {
1148                    self.get_segment_provider_from_transaction(segment, $number, None)
1149                };
1150
1151                match provider {
1152                    Ok(provider) => provider,
1153                    Err(
1154                        ProviderError::MissingStaticFileBlock(_, _) |
1155                        ProviderError::MissingStaticFileTx(_, _),
1156                    ) => return Ok(result),
1157                    Err(err) => return Err(err),
1158                }
1159            }};
1160        }
1161
1162        let mut provider = get_provider!(range.start);
1163        let mut cursor = provider.cursor()?;
1164
1165        // advances number in range
1166        'outer: for number in range {
1167            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
1168            // access data in two different static files, it halts further attempts by returning
1169            // an error, effectively preventing infinite retry loops.
1170            let mut retrying = false;
1171
1172            // advances static files if `get_fn` returns None
1173            'inner: loop {
1174                match get_fn(&mut cursor, number)? {
1175                    Some(res) => {
1176                        if !predicate(&res) {
1177                            break 'outer
1178                        }
1179                        result.push(res);
1180                        break 'inner
1181                    }
1182                    None => {
1183                        if retrying {
1184                            return Ok(result)
1185                        }
1186                        // There is a very small chance of hitting a deadlock if two consecutive
1187                        // static files share the same bucket in the
1188                        // internal dashmap and we don't drop the current provider
1189                        // before requesting the next one.
1190                        drop(cursor);
1191                        drop(provider);
1192                        provider = get_provider!(number);
1193                        cursor = provider.cursor()?;
1194                        retrying = true;
1195                    }
1196                }
1197            }
1198        }
1199
1200        Ok(result)
1201    }
1202
1203    /// Fetches data within a specified range across multiple static files.
1204    ///
1205    /// Returns an iterator over the data
1206    pub fn fetch_range_iter<'a, T, F>(
1207        &'a self,
1208        segment: StaticFileSegment,
1209        range: Range<u64>,
1210        get_fn: F,
1211    ) -> ProviderResult<impl Iterator<Item = ProviderResult<T>> + 'a>
1212    where
1213        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1214        T: std::fmt::Debug,
1215    {
1216        let get_provider = move |start: u64| {
1217            if segment.is_block_based() {
1218                self.get_segment_provider_from_block(segment, start, None)
1219            } else {
1220                self.get_segment_provider_from_transaction(segment, start, None)
1221            }
1222        };
1223
1224        let mut provider = Some(get_provider(range.start)?);
1225        Ok(range.filter_map(move |number| {
1226            match get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose() {
1227                Some(result) => Some(result),
1228                None => {
1229                    // There is a very small chance of hitting a deadlock if two consecutive static
1230                    // files share the same bucket in the internal dashmap and
1231                    // we don't drop the current provider before requesting the
1232                    // next one.
1233                    provider.take();
1234                    provider = Some(get_provider(number).ok()?);
1235                    get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose()
1236                }
1237            }
1238        }))
1239    }
1240
1241    /// Returns directory where `static_files` are located.
1242    pub fn directory(&self) -> &Path {
1243        &self.path
1244    }
1245
1246    /// Retrieves data from the database or static file, wherever it's available.
1247    ///
1248    /// # Arguments
1249    /// * `segment` - The segment of the static file to check against.
1250    /// * `index_key` - Requested index key, usually a block or transaction number.
1251    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
1252    ///   file provider.
1253    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
1254    ///   when the static file doesn't contain the required data or is not available.
1255    pub fn get_with_static_file_or_database<T, FS, FD>(
1256        &self,
1257        segment: StaticFileSegment,
1258        number: u64,
1259        fetch_from_static_file: FS,
1260        fetch_from_database: FD,
1261    ) -> ProviderResult<Option<T>>
1262    where
1263        FS: Fn(&Self) -> ProviderResult<Option<T>>,
1264        FD: Fn() -> ProviderResult<Option<T>>,
1265    {
1266        // If there is, check the maximum block or transaction number of the segment.
1267        let static_file_upper_bound = if segment.is_block_based() {
1268            self.get_highest_static_file_block(segment)
1269        } else {
1270            self.get_highest_static_file_tx(segment)
1271        };
1272
1273        if static_file_upper_bound
1274            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1275        {
1276            return fetch_from_static_file(self)
1277        }
1278        fetch_from_database()
1279    }
1280
1281    /// Gets data within a specified range, potentially spanning different `static_files` and
1282    /// database.
1283    ///
1284    /// # Arguments
1285    /// * `segment` - The segment of the static file to query.
1286    /// * `block_range` - The range of data to fetch.
1287    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
1288    /// * `fetch_from_database` - A function to fetch data from the database.
1289    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
1290    ///   terminated when this function returns false, thereby filtering the data based on the
1291    ///   provided condition.
1292    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1293        &self,
1294        segment: StaticFileSegment,
1295        mut block_or_tx_range: Range<u64>,
1296        fetch_from_static_file: FS,
1297        mut fetch_from_database: FD,
1298        mut predicate: P,
1299    ) -> ProviderResult<Vec<T>>
1300    where
1301        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1302        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1303        P: FnMut(&T) -> bool,
1304    {
1305        let mut data = Vec::new();
1306
1307        // If there is, check the maximum block or transaction number of the segment.
1308        if let Some(static_file_upper_bound) = if segment.is_block_based() {
1309            self.get_highest_static_file_block(segment)
1310        } else {
1311            self.get_highest_static_file_tx(segment)
1312        } && block_or_tx_range.start <= static_file_upper_bound
1313        {
1314            let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1315            data.extend(fetch_from_static_file(
1316                self,
1317                block_or_tx_range.start..end,
1318                &mut predicate,
1319            )?);
1320            block_or_tx_range.start = end;
1321        }
1322
1323        if block_or_tx_range.end > block_or_tx_range.start {
1324            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1325        }
1326
1327        Ok(data)
1328    }
1329
1330    /// Returns `static_files` directory
1331    #[cfg(any(test, feature = "test-utils"))]
1332    pub fn path(&self) -> &Path {
1333        &self.path
1334    }
1335
    /// Returns `static_files` transaction index
    ///
    /// For each segment, the index is a [`BTreeMap`] keyed by the last transaction number of a
    /// static file, with that file's block range as the value.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
        &self.static_files_tx_index
    }
1341}
1342
1343/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider`
1344pub trait StaticFileWriter {
1345    /// The primitives type used by the static file provider.
1346    type Primitives: Send + Sync + 'static;
1347
1348    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
1349    fn get_writer(
1350        &self,
1351        block: BlockNumber,
1352        segment: StaticFileSegment,
1353    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1354
1355    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
1356    /// [`StaticFileSegment`].
1357    fn latest_writer(
1358        &self,
1359        segment: StaticFileSegment,
1360    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1361
1362    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
1363    fn commit(&self) -> ProviderResult<()>;
1364
1365    /// Returns `true` if the static file provider has unwind queued.
1366    fn has_unwind_queued(&self) -> bool;
1367}
1368
1369impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1370    type Primitives = N;
1371
1372    fn get_writer(
1373        &self,
1374        block: BlockNumber,
1375        segment: StaticFileSegment,
1376    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1377        if self.access.is_read_only() {
1378            return Err(ProviderError::ReadOnlyStaticFileAccess)
1379        }
1380
1381        trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1382        self.writers.get_or_create(segment, || {
1383            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1384        })
1385    }
1386
1387    fn latest_writer(
1388        &self,
1389        segment: StaticFileSegment,
1390    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1391        self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1392    }
1393
1394    fn commit(&self) -> ProviderResult<()> {
1395        self.writers.commit()
1396    }
1397
1398    fn has_unwind_queued(&self) -> bool {
1399        self.writers.has_unwind_queued()
1400    }
1401}
1402
1403impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1404    type Header = N::BlockHeader;
1405
1406    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1407        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1408            Ok(jar_provider
1409                .cursor()?
1410                .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
1411                .and_then(|(header, hash)| {
1412                    if hash == block_hash {
1413                        return Some(header)
1414                    }
1415                    None
1416                }))
1417        })
1418    }
1419
1420    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1421        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1422            .and_then(|provider| provider.header_by_number(num))
1423            .or_else(|err| {
1424                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1425                    Ok(None)
1426                } else {
1427                    Err(err)
1428                }
1429            })
1430    }
1431
1432    fn headers_range(
1433        &self,
1434        range: impl RangeBounds<BlockNumber>,
1435    ) -> ProviderResult<Vec<Self::Header>> {
1436        self.fetch_range_with_predicate(
1437            StaticFileSegment::Headers,
1438            to_range(range),
1439            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1440            |_| true,
1441        )
1442    }
1443
1444    fn sealed_header(
1445        &self,
1446        num: BlockNumber,
1447    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1448        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1449            .and_then(|provider| provider.sealed_header(num))
1450            .or_else(|err| {
1451                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1452                    Ok(None)
1453                } else {
1454                    Err(err)
1455                }
1456            })
1457    }
1458
1459    fn sealed_headers_while(
1460        &self,
1461        range: impl RangeBounds<BlockNumber>,
1462        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1463    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1464        self.fetch_range_with_predicate(
1465            StaticFileSegment::Headers,
1466            to_range(range),
1467            |cursor, number| {
1468                Ok(cursor
1469                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1470                    .map(|(header, hash)| SealedHeader::new(header, hash)))
1471            },
1472            predicate,
1473        )
1474    }
1475}
1476
1477impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1478    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1479        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1480            .and_then(|provider| provider.block_hash(num))
1481            .or_else(|err| {
1482                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1483                    Ok(None)
1484                } else {
1485                    Err(err)
1486                }
1487            })
1488    }
1489
1490    fn canonical_hashes_range(
1491        &self,
1492        start: BlockNumber,
1493        end: BlockNumber,
1494    ) -> ProviderResult<Vec<B256>> {
1495        self.fetch_range_with_predicate(
1496            StaticFileSegment::Headers,
1497            start..end,
1498            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1499            |_| true,
1500        )
1501    }
1502}
1503
1504impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1505    for StaticFileProvider<N>
1506{
1507    type Receipt = N::Receipt;
1508
1509    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1510        self.get_segment_provider_from_transaction(StaticFileSegment::Receipts, num, None)
1511            .and_then(|provider| provider.receipt(num))
1512            .or_else(|err| {
1513                if let ProviderError::MissingStaticFileTx(_, _) = err {
1514                    Ok(None)
1515                } else {
1516                    Err(err)
1517                }
1518            })
1519    }
1520
1521    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1522        if let Some(num) = self.transaction_id(hash)? {
1523            return self.receipt(num)
1524        }
1525        Ok(None)
1526    }
1527
1528    fn receipts_by_block(
1529        &self,
1530        _block: BlockHashOrNumber,
1531    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1532        unreachable!()
1533    }
1534
1535    fn receipts_by_tx_range(
1536        &self,
1537        range: impl RangeBounds<TxNumber>,
1538    ) -> ProviderResult<Vec<Self::Receipt>> {
1539        self.fetch_range_with_predicate(
1540            StaticFileSegment::Receipts,
1541            to_range(range),
1542            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1543            |_| true,
1544        )
1545    }
1546
1547    fn receipts_by_block_range(
1548        &self,
1549        _block_range: RangeInclusive<BlockNumber>,
1550    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1551        Err(ProviderError::UnsupportedProvider)
1552    }
1553}
1554
1555impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
1556    for StaticFileProvider<N>
1557{
1558    fn transaction_hashes_by_range(
1559        &self,
1560        tx_range: Range<TxNumber>,
1561    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1562        let tx_range_size = (tx_range.end - tx_range.start) as usize;
1563
1564        // Transactions are different size, so chunks will not all take the same processing time. If
1565        // chunks are too big, there will be idle threads waiting for work. Choosing an
1566        // arbitrary smaller value to make sure it doesn't happen.
1567        let chunk_size = 100;
1568
1569        // iterator over the chunks
1570        let chunks = tx_range
1571            .clone()
1572            .step_by(chunk_size)
1573            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1574        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1575
1576        for chunk_range in chunks {
1577            let (channel_tx, channel_rx) = mpsc::channel();
1578            channels.push(channel_rx);
1579
1580            let manager = self.clone();
1581
1582            // Spawn the task onto the global rayon pool
1583            // This task will send the results through the channel after it has calculated
1584            // the hash.
1585            rayon::spawn(move || {
1586                let mut rlp_buf = Vec::with_capacity(128);
1587                let _ = manager.fetch_range_with_predicate(
1588                    StaticFileSegment::Transactions,
1589                    chunk_range,
1590                    |cursor, number| {
1591                        Ok(cursor
1592                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1593                            .map(|transaction| {
1594                                rlp_buf.clear();
1595                                let _ = channel_tx
1596                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
1597                            }))
1598                    },
1599                    |_| true,
1600                );
1601            });
1602        }
1603
1604        let mut tx_list = Vec::with_capacity(tx_range_size);
1605
1606        // Iterate over channels and append the tx hashes unsorted
1607        for channel in channels {
1608            while let Ok(tx) = channel.recv() {
1609                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1610                tx_list.push((tx_hash, tx_id));
1611            }
1612        }
1613
1614        Ok(tx_list)
1615    }
1616}
1617
1618impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1619    for StaticFileProvider<N>
1620{
1621    type Transaction = N::SignedTx;
1622
1623    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1624        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1625            let mut cursor = jar_provider.cursor()?;
1626            if cursor
1627                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1628                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1629                .is_some()
1630            {
1631                Ok(cursor.number())
1632            } else {
1633                Ok(None)
1634            }
1635        })
1636    }
1637
1638    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1639        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1640            .and_then(|provider| provider.transaction_by_id(num))
1641            .or_else(|err| {
1642                if let ProviderError::MissingStaticFileTx(_, _) = err {
1643                    Ok(None)
1644                } else {
1645                    Err(err)
1646                }
1647            })
1648    }
1649
1650    fn transaction_by_id_unhashed(
1651        &self,
1652        num: TxNumber,
1653    ) -> ProviderResult<Option<Self::Transaction>> {
1654        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1655            .and_then(|provider| provider.transaction_by_id_unhashed(num))
1656            .or_else(|err| {
1657                if let ProviderError::MissingStaticFileTx(_, _) = err {
1658                    Ok(None)
1659                } else {
1660                    Err(err)
1661                }
1662            })
1663    }
1664
1665    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1666        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1667            Ok(jar_provider
1668                .cursor()?
1669                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
1670                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
1671        })
1672    }
1673
1674    fn transaction_by_hash_with_meta(
1675        &self,
1676        _hash: TxHash,
1677    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1678        // Required data not present in static_files
1679        Err(ProviderError::UnsupportedProvider)
1680    }
1681
1682    fn transaction_block(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1683        // Required data not present in static_files
1684        Err(ProviderError::UnsupportedProvider)
1685    }
1686
1687    fn transactions_by_block(
1688        &self,
1689        _block_id: BlockHashOrNumber,
1690    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1691        // Required data not present in static_files
1692        Err(ProviderError::UnsupportedProvider)
1693    }
1694
1695    fn transactions_by_block_range(
1696        &self,
1697        _range: impl RangeBounds<BlockNumber>,
1698    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1699        // Required data not present in static_files
1700        Err(ProviderError::UnsupportedProvider)
1701    }
1702
1703    fn transactions_by_tx_range(
1704        &self,
1705        range: impl RangeBounds<TxNumber>,
1706    ) -> ProviderResult<Vec<Self::Transaction>> {
1707        self.fetch_range_with_predicate(
1708            StaticFileSegment::Transactions,
1709            to_range(range),
1710            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
1711            |_| true,
1712        )
1713    }
1714
1715    fn senders_by_tx_range(
1716        &self,
1717        range: impl RangeBounds<TxNumber>,
1718    ) -> ProviderResult<Vec<Address>> {
1719        let txes = self.transactions_by_tx_range(range)?;
1720        Ok(reth_primitives_traits::transaction::recover::recover_signers(&txes)?)
1721    }
1722
1723    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1724        match self.transaction_by_id_unhashed(id)? {
1725            Some(tx) => Ok(tx.recover_signer().ok()),
1726            None => Ok(None),
1727        }
1728    }
1729}
1730
1731impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
1732    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1733        // Required data not present in static_files
1734        Err(ProviderError::UnsupportedProvider)
1735    }
1736
1737    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1738        // Required data not present in static_files
1739        Err(ProviderError::UnsupportedProvider)
1740    }
1741
1742    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1743        Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
1744    }
1745
1746    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1747        // Required data not present in static_files
1748        Err(ProviderError::UnsupportedProvider)
1749    }
1750}
1751
1752/* Cannot be successfully implemented but must exist for trait requirements */
1753
/// `BlockReader` implementation for static files. Every method returns
/// [`ProviderError::UnsupportedProvider`], because the data needed to assemble blocks is not
/// present in the static files.
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
1826
/// Block body indices are not available from static files. Both methods always return
/// [`ProviderError::UnsupportedProvider`].
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_body_indices_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
1839
1840impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
1841    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
1842        match T::NAME {
1843            tables::CanonicalHeaders::NAME |
1844            tables::Headers::<Header>::NAME |
1845            tables::HeaderTerminalDifficulties::NAME => Ok(self
1846                .get_highest_static_file_block(StaticFileSegment::Headers)
1847                .map(|block| block + 1)
1848                .unwrap_or_default()
1849                as usize),
1850            tables::Receipts::<Receipt>::NAME => Ok(self
1851                .get_highest_static_file_tx(StaticFileSegment::Receipts)
1852                .map(|receipts| receipts + 1)
1853                .unwrap_or_default() as usize),
1854            tables::Transactions::<TransactionSigned>::NAME => Ok(self
1855                .get_highest_static_file_tx(StaticFileSegment::Transactions)
1856                .map(|txs| txs + 1)
1857                .unwrap_or_default()
1858                as usize),
1859            _ => Err(ProviderError::UnsupportedProvider),
1860        }
1861    }
1862}
1863
1864/// Calculates the tx hash for the given transaction and its id.
1865#[inline]
1866fn calculate_hash<T>(
1867    entry: (TxNumber, T),
1868    rlp_buf: &mut Vec<u8>,
1869) -> Result<(B256, TxNumber), Box<ProviderError>>
1870where
1871    T: Encodable2718,
1872{
1873    let (tx_id, tx) = entry;
1874    tx.encode_2718(rlp_buf);
1875    Ok((keccak256(rlp_buf), tx_id))
1876}