reth_provider/providers/static_file/manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
7    HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader, TransactionVariant,
8    TransactionsProvider, TransactionsProviderExt,
9};
10use alloy_consensus::{
11    transaction::{SignerRecoverable, TransactionMeta},
12    Header,
13};
14use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
15use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
16use dashmap::DashMap;
17use notify::{RecommendedWatcher, RecursiveMode, Watcher};
18use parking_lot::RwLock;
19use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
20use reth_db::{
21    lockfile::StorageLock,
22    static_file::{
23        iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
24        StaticFileCursor, TransactionMask,
25    },
26};
27use reth_db_api::{
28    cursor::DbCursorRO,
29    models::StoredBlockBodyIndices,
30    table::{Decompress, Table, Value},
31    tables,
32    transaction::DbTx,
33};
34use reth_ethereum_primitives::{Receipt, TransactionSigned};
35use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
36use reth_node_types::NodePrimitives;
37use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
38use reth_stages_types::{PipelineTarget, StageId};
39use reth_static_file_types::{
40    find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
41    DEFAULT_BLOCKS_PER_STATIC_FILE,
42};
43use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, StorageSettingsCache};
44use reth_storage_errors::provider::{ProviderError, ProviderResult};
45use std::{
46    collections::{BTreeMap, HashMap},
47    fmt::Debug,
48    ops::{Deref, Range, RangeBounds, RangeInclusive},
49    path::{Path, PathBuf},
50    sync::{atomic::AtomicU64, mpsc, Arc},
51};
52use tracing::{debug, info, trace, warn};
53
54/// Alias type for a map that can be queried for block or transaction ranges. It uses `u64` to
55/// represent either the block or transaction number at the end of a static file range.
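///
/// An illustrative sketch of the shape of this map (mirroring the tx index example used below):
///
/// ```text
/// // Transactions segment: max tx number in a file -> block range available in that file
/// // { 2_000 => blocks 0..=1_000 }
/// ```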
56type SegmentRanges = HashMap<StaticFileSegment, BTreeMap<u64, SegmentRangeInclusive>>;
57
58/// Access mode on a static file provider. RO/RW.
59#[derive(Debug, Default, PartialEq, Eq)]
60pub enum StaticFileAccess {
61    /// Read-only access.
62    #[default]
63    RO,
64    /// Read-write access.
65    RW,
66}
67
68impl StaticFileAccess {
69    /// Returns `true` if read-only access.
70    pub const fn is_read_only(&self) -> bool {
71        matches!(self, Self::RO)
72    }
73
74    /// Returns `true` if read-write access.
75    pub const fn is_read_write(&self) -> bool {
76        matches!(self, Self::RW)
77    }
78}
79
80/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
81///
82/// "Static files" contain immutable chain history data, such as:
83///  - transactions
84///  - headers
85///  - receipts
86///
87/// This provider type is responsible for reading and writing to static files.
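///
/// A minimal usage sketch (the data directory path and the primitives type `N` are placeholders):
///
/// ```ignore
/// // Open the node's `static_files` directory with read-write access:
/// let provider = StaticFileProvider::<N>::read_write("<datadir>/static_files")?;
/// // Data can then be read through the usual provider traits, e.g. `HeaderProvider`:
/// let genesis_header = provider.header_by_number(0)?;
/// ```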
88#[derive(Debug)]
89pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
90
91impl<N> Clone for StaticFileProvider<N> {
92    fn clone(&self) -> Self {
93        Self(self.0.clone())
94    }
95}
96
97/// Builder for [`StaticFileProvider`] that allows configuration before initialization.
98#[derive(Debug)]
99pub struct StaticFileProviderBuilder<N> {
100    inner: StaticFileProviderInner<N>,
101}
102
103impl<N: NodePrimitives> StaticFileProviderBuilder<N> {
104    /// Creates a new builder with read-write access.
105    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
106        StaticFileProviderInner::new(path, StaticFileAccess::RW).map(|inner| Self { inner })
107    }
108
109    /// Creates a new builder with read-only access.
110    pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
111        StaticFileProviderInner::new(path, StaticFileAccess::RO).map(|inner| Self { inner })
112    }
113
114    /// Set custom blocks per file for specific segments.
115    ///
116    /// Each static file segment is stored across multiple files, and each of these files contains
117    /// up to the specified number of blocks of data. When the file gets full, a new file is
118    /// created with the new block range.
119    ///
120    /// This setting affects the size of each static file, and can be set per segment.
121    ///
122    /// If it is changed for an existing node, existing static files will not be affected and will
123    /// be finished with the old blocks per file setting, but new static files will use the new
124    /// setting.
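    ///
    /// A minimal sketch of configuring this per segment (the path is a placeholder):
    ///
    /// ```ignore
    /// let provider = StaticFileProviderBuilder::read_write("<datadir>/static_files")?
    ///     .with_blocks_per_file_for_segments(HashMap::from([
    ///         (StaticFileSegment::Headers, 250_000),
    ///         (StaticFileSegment::Receipts, 100_000),
    ///     ]))
    ///     .build()?;
    /// ```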
125    pub fn with_blocks_per_file_for_segments(
126        mut self,
127        segments: HashMap<StaticFileSegment, u64>,
128    ) -> Self {
129        self.inner.blocks_per_file.extend(segments);
130        self
131    }
132
133    /// Set a custom number of blocks per file for all segments.
134    pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
135        for segment in StaticFileSegment::iter() {
136            self.inner.blocks_per_file.insert(segment, blocks_per_file);
137        }
138        self
139    }
140
141    /// Set a custom number of blocks per file for a specific segment.
142    pub fn with_blocks_per_file_for_segment(
143        mut self,
144        segment: StaticFileSegment,
145        blocks_per_file: u64,
146    ) -> Self {
147        self.inner.blocks_per_file.insert(segment, blocks_per_file);
148        self
149    }
150
151    /// Enables metrics on the [`StaticFileProvider`].
152    pub fn with_metrics(mut self) -> Self {
153        self.inner.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
154        self
155    }
156
157    /// Builds the final [`StaticFileProvider`] and initializes the index.
158    pub fn build(self) -> ProviderResult<StaticFileProvider<N>> {
159        let provider = StaticFileProvider(Arc::new(self.inner));
160        provider.initialize_index()?;
161        Ok(provider)
162    }
163}
164
165impl<N: NodePrimitives> StaticFileProvider<N> {
166    /// Creates a new [`StaticFileProvider`] with read-only access.
167    ///
168    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
169    /// new data won't be detected or queryable.
170    ///
171    /// Watching is recommended if the read-only provider is used on a directory that an active node
172    /// instance is modifying.
173    ///
174    /// See also [`StaticFileProvider::watch_directory`].
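    ///
    /// A minimal sketch (the path is a placeholder):
    ///
    /// ```ignore
    /// // Open an existing `static_files` directory read-only and watch for changes
    /// // made by a running node:
    /// let provider = StaticFileProvider::<N>::read_only("<datadir>/static_files", true)?;
    /// ```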
175    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
176        let provider = StaticFileProviderBuilder::read_only(path)?.build()?;
177
178        if watch_directory {
179            provider.watch_directory();
180        }
181
182        Ok(provider)
183    }
184
185    /// Creates a new [`StaticFileProvider`] with read-write access.
186    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
187        StaticFileProviderBuilder::read_write(path)?.build()
188    }
189
190    /// Watches the directory for changes and updates the in-memory index when modifications
191    /// are detected.
192    ///
193    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
194    /// receive `update_index` notifications from a node that appends/truncates data.
195    pub fn watch_directory(&self) {
196        let provider = self.clone();
197        std::thread::spawn(move || {
198            let (tx, rx) = std::sync::mpsc::channel();
199            let mut watcher = RecommendedWatcher::new(
200                move |res| tx.send(res).unwrap(),
201                notify::Config::default(),
202            )
203            .expect("failed to create watcher");
204
205            watcher
206                .watch(&provider.path, RecursiveMode::NonRecursive)
207                .expect("failed to watch path");
208
209            // Some backends send repeated modified events
210            let mut last_event_timestamp = None;
211
212            while let Ok(res) = rx.recv() {
213                match res {
214                    Ok(event) => {
215                        // We only care about modified data events
216                        if !matches!(
217                            event.kind,
218                            notify::EventKind::Modify(_) |
219                                notify::EventKind::Create(_) |
220                                notify::EventKind::Remove(_)
221                        ) {
222                            continue
223                        }
224
225                        // We only trigger a re-initialization if a configuration file was
226                        // modified. This means that a
227                        // static_file_provider.commit() was called on the node after
228                        // appending/truncating rows
229                        for segment in event.paths {
230                            // Ensure it's a file with the .conf extension
231                            if segment
232                                .extension()
233                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
234                            {
235                                continue
236                            }
237
238                            // Ensure it's a well-formatted static file name
239                            if StaticFileSegment::parse_filename(
240                                &segment.file_stem().expect("qed").to_string_lossy(),
241                            )
242                            .is_none()
243                            {
244                                continue
245                            }
246
247                            // If we can read the metadata and modified timestamp, ensure this is
248                            // not an old or repeated event.
249                            if let Ok(current_modified_timestamp) =
250                                std::fs::metadata(&segment).and_then(|m| m.modified())
251                            {
252                                if last_event_timestamp.is_some_and(|last_timestamp| {
253                                    last_timestamp >= current_modified_timestamp
254                                }) {
255                                    continue
256                                }
257                                last_event_timestamp = Some(current_modified_timestamp);
258                            }
259
260                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
261                            if let Err(err) = provider.initialize_index() {
262                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
263                            }
264                            break
265                        }
266                    }
267
268                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
269                }
270            }
271        });
272    }
273}
274
275impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
276    type Target = StaticFileProviderInner<N>;
277
278    fn deref(&self) -> &Self::Target {
279        &self.0
280    }
281}
282
283/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
284#[derive(Debug)]
285pub struct StaticFileProviderInner<N> {
286    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
287    /// segments and ranges.
288    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
289    /// Min static file range for each segment.
290    /// This index is initialized on launch to keep track of the lowest, non-expired static file
291    /// per segment and gets updated on `Self::update_index()`.
292    ///
293    /// This tracks the lowest static file per segment together with the block range in that
294    /// file. E.g. if static files are batched in 500K block intervals, the lowest static file
295    /// covers [0..=499K], i.e. the block range with start = 0 and end = 499K.
296    /// This index is mainly used for history expiry, which targets transactions; e.g. pre-merge
297    /// history expiry would lead to removing all static files below the merge height.
298    static_files_min_block: RwLock<HashMap<StaticFileSegment, SegmentRangeInclusive>>,
299    /// An additional index that tracks the expired height: the highest block number that has
300    /// been expired (i.e. is missing). The first non-expired block is
301    /// `expired_history_height + 1`.
302    ///
303    /// This is effectively the range of history that has been expired via
304    /// [`StaticFileProvider::delete_segment_below_block`] and mirrors
305    /// `static_files_min_block[transactions] - blocks_per_file`.
306    ///
307    /// This additional tracker exists for more efficient lookups because the node must be aware of
308    /// the expired height.
309    earliest_history_height: AtomicU64,
310    /// Max static file block for each segment
311    static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
312    /// Expected on disk static file block ranges indexed by max expected blocks.
313    ///
314    /// For example, a static file for expected block range `0..=499_000` may have only block range
315    /// `0..=1000` contained in it, as it wasn't fully filled yet. This index maps the max expected
316    /// block to the expected range, i.e. block `499_000` to block range `0..=499_000`.
317    static_files_expected_block_index: RwLock<SegmentRanges>,
318    /// Available on disk static file block ranges indexed by max transactions.
319    ///
320    /// For example, a static file for block range `0..=499_000` may only have block range
321    /// `0..=1000` and transaction range `0..=2000` contained in it. This index maps the max
322    /// available transaction to the available block range, i.e. transaction `2000` to block range
323    /// `0..=1000`.
324    static_files_tx_index: RwLock<SegmentRanges>,
325    /// Directory where `static_files` are located
326    path: PathBuf,
327    /// Maintains a writer set of [`StaticFileSegment`].
328    writers: StaticFileWriters<N>,
329    /// Metrics for the static files.
330    metrics: Option<Arc<StaticFileProviderMetrics>>,
331    /// Access rights of the provider.
332    access: StaticFileAccess,
333    /// Number of blocks per file, per segment.
334    blocks_per_file: HashMap<StaticFileSegment, u64>,
335    /// Write lock for when access is [`StaticFileAccess::RW`].
336    _lock_file: Option<StorageLock>,
337}
338
339impl<N: NodePrimitives> StaticFileProviderInner<N> {
340    /// Creates a new [`StaticFileProviderInner`].
341    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
342        let _lock_file = if access.is_read_write() {
343            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
344        } else {
345            None
346        };
347
348        let mut blocks_per_file = HashMap::new();
349        for segment in StaticFileSegment::iter() {
350            blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
351        }
352
353        let provider = Self {
354            map: Default::default(),
355            writers: Default::default(),
356            static_files_min_block: Default::default(),
357            earliest_history_height: Default::default(),
358            static_files_max_block: Default::default(),
359            static_files_expected_block_index: Default::default(),
360            static_files_tx_index: Default::default(),
361            path: path.as_ref().to_path_buf(),
362            metrics: None,
363            access,
364            blocks_per_file,
365            _lock_file,
366        };
367
368        Ok(provider)
369    }
370
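    /// Returns `true` if the provider has read-only access.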
371    pub const fn is_read_only(&self) -> bool {
372        self.access.is_read_only()
373    }
374
375    /// Each static file has a fixed number of blocks. This returns the range in which the
376    /// requested block is positioned.
377    ///
378    /// If the specified block falls into one of the ranges of already initialized static files,
379    /// this function will return that range.
380    ///
381    /// If no matching file exists, this function will derive a new range from the end of the last
382    /// existing file, if any.
383    pub fn find_fixed_range_with_block_index(
384        &self,
385        segment: StaticFileSegment,
386        block_index: Option<&BTreeMap<u64, SegmentRangeInclusive>>,
387        block: BlockNumber,
388    ) -> SegmentRangeInclusive {
389        let blocks_per_file =
390            self.blocks_per_file.get(&segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
391
392        if let Some(block_index) = block_index {
393            // Find first block range that contains the requested block
394            if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
395            {
396                // Found matching range for an existing file using block index
397                return *range
398            } else if let Some((_, range)) = block_index.last_key_value() {
399                // Didn't find matching range for an existing file, derive a new range from the end
400                // of the last existing file range.
401                //
402                // `block` is always higher than `range.end()` here, because we iterated over all
403                // `block_index` ranges above and didn't find one that contains our block
404                let blocks_after_last_range = block - range.end();
405                let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
406                let start = range.end() + 1 + segments_to_skip * blocks_per_file;
407                return SegmentRangeInclusive::new(start, start + blocks_per_file - 1)
408            }
409        }
410        // No block index is available, derive a new range using the fixed number of blocks,
411        // starting from the beginning.
412        find_fixed_range(block, blocks_per_file)
413    }
414
415    /// Each static file has a fixed number of blocks. This returns the range in which the
416    /// requested block is positioned.
417    ///
418    /// If the specified block falls into one of the ranges of already initialized static files,
419    /// this function will return that range.
420    ///
421    /// If no matching file exists, this function will derive a new range from the end of the last
422    /// existing file, if any.
423    ///
424    /// This function will block indefinitely if a write lock for
425    /// [`Self::static_files_expected_block_index`] is already held. In that case, use
426    /// [`Self::find_fixed_range_with_block_index`].
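    ///
    /// An illustrative sketch, assuming the default 500_000 blocks per file and no pre-existing
    /// files for the segment:
    ///
    /// ```ignore
    /// // Block 600_000 falls into the second fixed range:
    /// let range = provider.find_fixed_range(StaticFileSegment::Headers, 600_000);
    /// assert_eq!((range.start(), range.end()), (500_000, 999_999));
    /// ```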
427    pub fn find_fixed_range(
428        &self,
429        segment: StaticFileSegment,
430        block: BlockNumber,
431    ) -> SegmentRangeInclusive {
432        self.find_fixed_range_with_block_index(
433            segment,
434            self.static_files_expected_block_index.read().get(&segment),
435            block,
436        )
437    }
438}
439
440impl<N: NodePrimitives> StaticFileProvider<N> {
441    /// Reports metrics for the static files.
442    pub fn report_metrics(&self) -> ProviderResult<()> {
443        let Some(metrics) = &self.metrics else { return Ok(()) };
444
445        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
446        for (segment, headers) in static_files {
447            let mut entries = 0;
448            let mut size = 0;
449
450            for (block_range, _) in &headers {
451                let fixed_block_range = self.find_fixed_range(segment, block_range.start());
452                let jar_provider = self
453                    .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
454                    .ok_or_else(|| {
455                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
456                    })?;
457
458                entries += jar_provider.rows();
459
460                let data_size = reth_fs_util::metadata(jar_provider.data_path())
461                    .map(|metadata| metadata.len())
462                    .unwrap_or_default();
463                let index_size = reth_fs_util::metadata(jar_provider.index_path())
464                    .map(|metadata| metadata.len())
465                    .unwrap_or_default();
466                let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
467                    .map(|metadata| metadata.len())
468                    .unwrap_or_default();
469                let config_size = reth_fs_util::metadata(jar_provider.config_path())
470                    .map(|metadata| metadata.len())
471                    .unwrap_or_default();
472
473                size += data_size + index_size + offsets_size + config_size;
474            }
475
476            metrics.record_segment(segment, size, headers.len(), entries);
477        }
478
479        Ok(())
480    }
481
482    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index, which can be
483    /// either a block or a transaction number.
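    ///
    /// A minimal sketch (the start indices are placeholders):
    ///
    /// ```ignore
    /// // Block-based segments take a block number, tx-based segments a transaction number:
    /// let headers = provider.get_segment_provider(StaticFileSegment::Headers, 42)?;
    /// let transactions = provider.get_segment_provider(StaticFileSegment::Transactions, 1_000)?;
    /// ```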
484    pub fn get_segment_provider(
485        &self,
486        segment: StaticFileSegment,
487        start: u64,
488    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
489        if segment.is_block_based() {
490            self.get_segment_provider_for_block(segment, start, None)
491        } else {
492            self.get_segment_provider_for_transaction(segment, start, None)
493        }
494    }
495
496    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
497    pub fn get_segment_provider_for_block(
498        &self,
499        segment: StaticFileSegment,
500        block: BlockNumber,
501        path: Option<&Path>,
502    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
503        self.get_segment_provider_for_range(
504            segment,
505            || self.get_segment_ranges_from_block(segment, block),
506            path,
507        )?
508        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
509    }
510
511    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
512    pub fn get_segment_provider_for_transaction(
513        &self,
514        segment: StaticFileSegment,
515        tx: TxNumber,
516        path: Option<&Path>,
517    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
518        self.get_segment_provider_for_range(
519            segment,
520            || self.get_segment_ranges_from_transaction(segment, tx),
521            path,
522        )?
523        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
524    }
525
526    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
527    ///
528    /// `fn_range` should make sure the range goes through `find_fixed_range`.
529    pub fn get_segment_provider_for_range(
530        &self,
531        segment: StaticFileSegment,
532        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
533        path: Option<&Path>,
534    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
535        // If we have a path, then get the block range from its name.
536        // Otherwise, check `self.available_static_files`
537        let block_range = match path {
538            Some(path) => StaticFileSegment::parse_filename(
539                &path
540                    .file_name()
541                    .ok_or_else(|| {
542                        ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
543                    })?
544                    .to_string_lossy(),
545            )
546            .and_then(|(parsed_segment, block_range)| {
547                if parsed_segment == segment {
548                    return Some(block_range)
549                }
550                None
551            }),
552            None => fn_range(),
553        };
554
555        // Return the cached `LoadedJar`, or insert it for the first time and then return it.
556        if let Some(block_range) = block_range {
557            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
558        }
559
560        Ok(None)
561    }
562
563    /// Gets the [`StaticFileJarProvider`] of the requested path.
564    pub fn get_segment_provider_for_path(
565        &self,
566        path: &Path,
567    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
568        StaticFileSegment::parse_filename(
569            &path
570                .file_name()
571                .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
572                .to_string_lossy(),
573        )
574        .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
575        .transpose()
576    }
577
578    /// Given a segment and block range it removes the cached provider from the map.
579    ///
580    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
581    pub fn remove_cached_provider(
582        &self,
583        segment: StaticFileSegment,
584        fixed_block_range_end: BlockNumber,
585    ) {
586        self.map.remove(&(fixed_block_range_end, segment));
587    }
588
589    /// This handles history expiry by deleting all static files for the given segment below the
590    /// given block.
591    ///
592    /// For example, if the block is 1M and the blocks per file is 500K, this will delete all
593    /// individual files below 1M, i.e. 0-499K and 500K-999K.
594    ///
595    /// This will not delete the file that contains the block itself, because files can only be
596    /// removed entirely.
597    ///
598    /// # Safety
599    ///
600    /// This method will never delete the highest static file for the segment, even if the
601    /// requested block is higher than the highest block in static files. This ensures we always
602    /// maintain at least one static file if any exist.
603    ///
604    /// Returns a list of `SegmentHeader`s from the deleted jars.
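    ///
    /// A minimal sketch, assuming 500K blocks per file and two complete files below the target:
    ///
    /// ```ignore
    /// // Expire transaction history below block 1_000_000; this deletes the files covering
    /// // 0..=499_999 and 500_000..=999_999, but keeps the file containing block 1_000_000.
    /// let deleted = provider.delete_segment_below_block(StaticFileSegment::Transactions, 1_000_000)?;
    /// assert_eq!(deleted.len(), 2);
    /// ```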
605    pub fn delete_segment_below_block(
606        &self,
607        segment: StaticFileSegment,
608        block: BlockNumber,
609    ) -> ProviderResult<Vec<SegmentHeader>> {
610        // Nothing to delete if block is 0.
611        if block == 0 {
612            return Ok(Vec::new())
613        }
614
615        let highest_block = self.get_highest_static_file_block(segment);
616        let mut deleted_headers = Vec::new();
617
618        loop {
619            let Some(block_height) = self.get_lowest_range_end(segment) else {
620                return Ok(deleted_headers)
621            };
622
623            // Stop if we've reached the target block or the highest static file
624            if block_height >= block || Some(block_height) == highest_block {
625                return Ok(deleted_headers)
626            }
627
628            debug!(
629                target: "provider::static_file",
630                ?segment,
631                ?block_height,
632                "Deleting static file below block"
633            );
634
635            // now we need to wipe the static file, this will take care of updating the index and
636            // advance the lowest tracked block height for the segment.
637            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
638                warn!( target: "provider::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
639            })?;
640
641            deleted_headers.push(header);
642        }
643    }
644
645    /// Given a segment and block, it deletes the jar and all files from the respective block range.
646    ///
647    /// CAUTION: destructive. Deletes files on disk.
648    ///
649    /// This will re-initialize the index after deletion, so all files are tracked.
650    ///
651    /// Returns the `SegmentHeader` of the deleted jar.
652    pub fn delete_jar(
653        &self,
654        segment: StaticFileSegment,
655        block: BlockNumber,
656    ) -> ProviderResult<SegmentHeader> {
657        let fixed_block_range = self.find_fixed_range(segment, block);
658        let key = (fixed_block_range.end(), segment);
659        let jar = if let Some((_, jar)) = self.map.remove(&key) {
660            jar.jar
661        } else {
662            let file = self.path.join(segment.filename(&fixed_block_range));
663            debug!(
664                target: "provider::static_file",
665                ?file,
666                ?fixed_block_range,
667                ?block,
668                "Loading static file jar for deletion"
669            );
670            NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
671        };
672
673        let header = *jar.user_header();
674        jar.delete().map_err(ProviderError::other)?;
675
676        // SAFETY: this is currently necessary to ensure that certain indexes like
677        // `static_files_min_block` have the correct values after pruning.
678        self.initialize_index()?;
679
680        Ok(header)
681    }
682
683    /// Given a segment and block range it returns a cached
684    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there are
685    /// too many.
686    fn get_or_create_jar_provider(
687        &self,
688        segment: StaticFileSegment,
689        fixed_block_range: &SegmentRangeInclusive,
690    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
691        let key = (fixed_block_range.end(), segment);
692
693        // Avoid using `entry` directly to avoid a write lock in the common case.
694        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
695        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
696            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
697            jar.into()
698        } else {
699            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
700            let path = self.path.join(segment.filename(fixed_block_range));
701            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
702            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
703        };
704
705        if let Some(metrics) = &self.metrics {
706            provider = provider.with_metrics(metrics.clone());
707        }
708        Ok(provider)
709    }
710
711    /// Gets a static file segment's block range from the provider inner block
712    /// index.
713    fn get_segment_ranges_from_block(
714        &self,
715        segment: StaticFileSegment,
716        block: u64,
717    ) -> Option<SegmentRangeInclusive> {
718        self.static_files_max_block
719            .read()
720            .get(&segment)
721            .filter(|max| **max >= block)
722            .map(|_| self.find_fixed_range(segment, block))
723    }
724
725    /// Gets a static file segment's fixed block range from the provider inner
726    /// transaction index.
727    fn get_segment_ranges_from_transaction(
728        &self,
729        segment: StaticFileSegment,
730        tx: u64,
731    ) -> Option<SegmentRangeInclusive> {
732        let static_files = self.static_files_tx_index.read();
733        let segment_static_files = static_files.get(&segment)?;
734
735        // It's more probable that the request comes from a newer tx height, so we iterate
736        // the static_files in reverse.
737        let mut static_files_rev_iter = segment_static_files.iter().rev().peekable();
738
739        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
740            if tx > *tx_end {
741                // request tx is higher than highest static file tx
742                return None
743            }
744            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
745            if tx_start <= tx {
746                return Some(self.find_fixed_range(segment, block_range.end()))
747            }
748        }
749        None
750    }
751
752    /// Updates the inner transaction and block indexes alongside the internal cached providers in
753    /// `self.map`.
754    ///
755    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
756    ///
757    /// If `segment_max_block` is None it means there's no static file for this segment.
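    ///
    /// A minimal sketch of the semantics (block numbers are placeholders):
    ///
    /// ```ignore
    /// // After appending header data up to block 1_250_000:
    /// provider.update_index(StaticFileSegment::Headers, Some(1_250_000))?;
    /// // After removing all static files for the segment:
    /// provider.update_index(StaticFileSegment::Headers, None)?;
    /// ```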
758    pub fn update_index(
759        &self,
760        segment: StaticFileSegment,
761        segment_max_block: Option<BlockNumber>,
762    ) -> ProviderResult<()> {
763        let mut min_block = self.static_files_min_block.write();
764        let mut max_block = self.static_files_max_block.write();
765        let mut expected_block_index = self.static_files_expected_block_index.write();
766        let mut tx_index = self.static_files_tx_index.write();
767
768        match segment_max_block {
769            Some(segment_max_block) => {
770                // Update the max block for the segment
771                max_block.insert(segment, segment_max_block);
772                let fixed_range = self.find_fixed_range_with_block_index(
773                    segment,
774                    expected_block_index.get(&segment),
775                    segment_max_block,
776                );
777
778                let jar = NippyJar::<SegmentHeader>::load(
779                    &self.path.join(segment.filename(&fixed_range)),
780                )
781                .map_err(ProviderError::other)?;
782
783                // Update min_block to track the lowest block range of the segment.
784                // This is initially set by initialize_index() on node startup, but must be updated
785                // as the file grows to prevent stale values.
786                //
787                // Without this update, min_block can remain at genesis (e.g. Some([0..=0]) or None)
788                // even after syncing to higher blocks (e.g. [0..=100]). A stale
789                // min_block causes get_lowest_static_file_block() to return the
790                // wrong end value, which breaks pruning logic that relies on it for
791                // safety checks.
792                //
793                // Example progression:
794                // 1. Node starts, initialize_index() sets min_block = [0..=0]
795                // 2. Sync to block 100, this update sets min_block = [0..=100]
796                // 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
797                //    this update, it would incorrectly return 0 (stale)
798                if let Some(current_block_range) = jar.user_header().block_range() {
799                    min_block
800                        .entry(segment)
801                        .and_modify(|current_min| {
802                            // delete_jar WILL ALWAYS re-initialize all indexes, so we can be
803                            // sure that current_min is always the lowest.
804                            if current_block_range.start() == current_min.start() {
805                                *current_min = current_block_range;
806                            }
807                        })
808                        .or_insert(current_block_range);
809                }
810
811                // Update the expected block index
812                expected_block_index
813                    .entry(segment)
814                    .and_modify(|index| {
815                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
816
817                        index.insert(fixed_range.end(), fixed_range);
818                    })
819                    .or_insert_with(|| BTreeMap::from([(fixed_range.end(), fixed_range)]));
820
821                // Updates the tx index by first removing all entries which have a higher
822                // block_start than our current static file.
823                if let Some(tx_range) = jar.user_header().tx_range() {
824                    // Current block range has the same block start as `fixed_range`, but block end
825                    // might be different if we are still filling this static file.
826                    if let Some(current_block_range) = jar.user_header().block_range() {
827                        let tx_end = tx_range.end();
828
829                        // Considering that `update_index` is called when we either append/truncate,
830                        // we are sure that we are handling the latest data
831                        // points.
832                        //
833                        // Here we remove every entry of the index that has a block start higher
834                        // than or equal to our current one. This is important in the case
835                        // where pruning a lot of rows results in the deletion of a file (and
836                        // thus a higher block range).
837                        tx_index
838                            .entry(segment)
839                            .and_modify(|index| {
840                                index.retain(|_, block_range| {
841                                    block_range.start() < fixed_range.start()
842                                });
843                                index.insert(tx_end, current_block_range);
844                            })
845                            .or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
846                    }
847                } else if segment.is_tx_based() {
848                    // The unwound file has no more transactions/receipts. However, the highest
849                    // block is within this file's block range. We only retain
850                    // entries with block ranges before the current one.
851                    tx_index.entry(segment).and_modify(|index| {
852                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
853                    });
854
855                    // If the index is empty, just remove it.
856                    if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
857                        tx_index.remove(&segment);
858                    }
859                }
860
861                // Update the cached provider.
862                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
863
864                // Delete any cached provider that no longer has an associated jar.
865                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
866            }
867            None => {
868                max_block.remove(&segment);
869                min_block.remove(&segment);
870                expected_block_index.remove(&segment);
871                tx_index.remove(&segment);
872            }
873        };
874
875        Ok(())
876    }
877
878    /// Initializes the inner transaction and block index
879    pub fn initialize_index(&self) -> ProviderResult<()> {
880        let mut min_block = self.static_files_min_block.write();
881        let mut max_block = self.static_files_max_block.write();
882        let mut expected_block_index = self.static_files_expected_block_index.write();
883        let mut tx_index = self.static_files_tx_index.write();
884
885        min_block.clear();
886        max_block.clear();
887        tx_index.clear();
888
889        for (segment, headers) in iter_static_files(&self.path).map_err(ProviderError::other)? {
890            // Update first and last block for each segment
891            if let Some((block_range, _)) = headers.first() {
892                min_block.insert(segment, *block_range);
893            }
894            if let Some((block_range, _)) = headers.last() {
895                max_block.insert(segment, block_range.end());
896            }
897
898            for (block_range, header) in headers {
899                // Update max expected block -> expected_block_range index
900                expected_block_index
901                    .entry(segment)
902                    .and_modify(|index| {
903                        index.insert(header.expected_block_end(), header.expected_block_range());
904                    })
905                    .or_insert_with(|| {
906                        BTreeMap::from([(
907                            header.expected_block_end(),
908                            header.expected_block_range(),
909                        )])
910                    });
911
912                // Update max tx -> block_range index
913                if let Some(tx_range) = header.tx_range() {
914                    let tx_end = tx_range.end();
915
916                    tx_index
917                        .entry(segment)
918                        .and_modify(|index| {
919                            index.insert(tx_end, block_range);
920                        })
921                        .or_insert_with(|| BTreeMap::from([(tx_end, block_range)]));
922                }
923            }
924        }
925
926        // If this is a re-initialization, we need to clear this as well
927        self.map.clear();
928
929        // initialize the expired history height to the lowest static file block
930        if let Some(lowest_range) = min_block.get(&StaticFileSegment::Transactions) {
931            // the earliest height is the lowest available block number
932            self.earliest_history_height
933                .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
934        }
935
936        Ok(())
937    }
938
939    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
940    /// target to unwind to.
941    ///
942    /// Two types of consistency checks are performed:
943    ///
944    /// 1) When a static file fails to commit but the underlying data was changed.
945    /// 2) When a static file was committed, but the required database transaction was not.
946    ///
947    /// For 1) the provider can self-heal if `self.access.is_read_only()` is `false`. Otherwise, it
948    /// will return an error.
949    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
950    /// to heal.
951    ///
952    /// For each static file segment:
953    /// * the corresponding database table should overlap or have continuity in their keys
954    ///   ([`TxNumber`] or [`BlockNumber`]).
955    /// * its highest block should match the stage checkpoint block number if it's equal to or
956    ///   higher than the last entry of the corresponding database table.
957    ///
958    /// Returns an [`Option`] of [`PipelineTarget::Unwind`] if further healing is required.
959    ///
960    /// WARNING: No static file writer should be held before calling this function, otherwise it
961    /// will deadlock.
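    ///
    /// A minimal sketch (`db_provider` stands in for a database provider implementing the
    /// required traits):
    ///
    /// ```ignore
    /// if let Some(PipelineTarget::Unwind(block)) = provider.check_consistency(&db_provider)? {
    ///     // the pipeline should unwind to `block` before resuming sync
    /// }
    /// ```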
962    pub fn check_consistency<Provider>(
963        &self,
964        provider: &Provider,
965    ) -> ProviderResult<Option<PipelineTarget>>
966    where
967        Provider: DBProvider
968            + BlockReader
969            + StageCheckpointReader
970            + ChainSpecProvider
971            + StorageSettingsCache,
972        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
973    {
974        // OVM historical import is broken and does not work with this check. It imported
975        // duplicated receipts, resulting in more receipts than the expected transaction
976        // range.
977        //
978        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
979        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
980        if provider.chain_spec().is_optimism() &&
981            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
982        {
983            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
984            const OVM_HEADER_1_HASH: B256 =
985                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
986            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
987                info!(target: "reth::cli",
988                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
989                );
990                return Ok(None)
991            }
992        }
993
994        info!(target: "reth::cli", "Verifying storage consistency.");
995
996        let mut unwind_target: Option<BlockNumber> = None;
997        let mut update_unwind_target = |new_target: BlockNumber| {
998            if let Some(target) = unwind_target.as_mut() {
999                *target = (*target).min(new_target);
1000            } else {
1001                unwind_target = Some(new_target);
1002            }
1003        };
1004
1005        for segment in StaticFileSegment::iter() {
1006            match segment {
1007                StaticFileSegment::Headers | StaticFileSegment::Transactions => {}
1008                StaticFileSegment::Receipts => {
1009                    if EitherWriter::receipts_destination(provider).is_database() {
1010                        // Old pruned nodes (including full node) do not store receipts as static
1011                        // files.
1012                        continue
1013                    }
1014
1015                    if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
1016                        NamedChain::Chiado == provider.chain_spec().chain_id()
1017                    {
1018                        // Gnosis and Chiado's historical import is broken and does not work with
1019                        // this check. They import receipts along with
1020                        // importing headers/bodies.
1021                        continue;
1022                    }
1023                }
1024            }
1025
1026            let initial_highest_block = self.get_highest_static_file_block(segment);
1027
1028            // File consistency is broken if:
1029            //
1030            // * appending data was interrupted before a config commit, then the data file will be
1031            //   truncated according to the config.
1032            //
1033            // * pruning data was interrupted before a config commit, then we have deleted data that
1034            //   we are expected to still have. We need to check the Database and unwind everything
1035            //   accordingly.
1036            if self.access.is_read_only() {
1037                self.check_segment_consistency(segment)?;
1038            } else {
1039                // Fetching the writer will attempt to heal any file level inconsistency.
1040                self.latest_writer(segment)?;
1041            }
1042
1043            // Only applies to block-based static files. (Headers)
1044            //
1045            // The updated `highest_block` may have decreased if we healed from a pruning
1046            // interruption.
1047            let mut highest_block = self.get_highest_static_file_block(segment);
1048            if initial_highest_block != highest_block {
1049                info!(
1050                    target: "reth::providers::static_file",
1051                    ?initial_highest_block,
1052                    unwind_target = highest_block,
1053                    ?segment,
1054                    "Setting unwind target."
1055                );
1056                update_unwind_target(highest_block.unwrap_or_default());
1057            }
1058
1059            // Only applies to transaction-based static files. (Receipts & Transactions)
1060            //
1061            // Make sure the last transaction matches the last block from its indices, since a heal
1062            // from a pruning interruption might have decreased the number of transactions without
1063            // being able to update the last block of the static file segment.
1064            let highest_tx = self.get_highest_static_file_tx(segment);
1065            if let Some(highest_tx) = highest_tx {
1066                let mut last_block = highest_block.unwrap_or_default();
1067                loop {
1068                    if let Some(indices) = provider.block_body_indices(last_block)? {
1069                        if indices.last_tx_num() <= highest_tx {
1070                            break
1071                        }
1072                    } else {
1073                        // If the block body indices cannot be found, it means that the static
1074                        // files are ahead of the database, and the `ensure_invariants` check will
1075                        // fix it by comparing with stage checkpoints.
1076                        break
1077                    }
1078                    if last_block == 0 {
1079                        break
1080                    }
1081                    last_block -= 1;
1082
1083                    info!(
1084                        target: "reth::providers::static_file",
1085                        highest_block = self.get_highest_static_file_block(segment),
1086                        unwind_target = last_block,
1087                        ?segment,
1088                        "Setting unwind target."
1089                    );
1090                    highest_block = Some(last_block);
1091                    update_unwind_target(last_block);
1092                }
1093            }
1094
1095            if let Some(unwind) = match segment {
1096                StaticFileSegment::Headers => self
1097                    .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
1098                        provider,
1099                        segment,
1100                        highest_block,
1101                        highest_block,
1102                    )?,
1103                StaticFileSegment::Transactions => self
1104                    .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
1105                        provider,
1106                        segment,
1107                        highest_tx,
1108                        highest_block,
1109                    )?,
1110                StaticFileSegment::Receipts => self
1111                    .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
1112                        provider,
1113                        segment,
1114                        highest_tx,
1115                        highest_block,
1116                    )?,
1117            } {
1118                update_unwind_target(unwind);
1119            }
1120        }
1121
1122        Ok(unwind_target.map(PipelineTarget::Unwind))
1123    }
1124
1125    /// Checks consistency of the latest static file for the given segment and returns an error
1126    /// if it is at fault. Read-only.
1127    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1128        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1129            let file_path = self
1130                .directory()
1131                .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1132
1133            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1134
1135            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1136        }
1137        Ok(())
1138    }
1139
1140    /// Check invariants for each corresponding table and static file segment:
1141    ///
1142    /// * the corresponding database table should overlap or have continuity in their keys
1143    ///   ([`TxNumber`] or [`BlockNumber`]).
1144    /// * its highest block should match the stage checkpoint block number if it's equal to or
1145    ///   higher than the last entry of the corresponding database table.
1146    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
1147    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
1148    ///     target.
1149    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
1150    ///     this case, the rows will be removed and [`None`] will be returned.
1151    ///
1152    /// * If the database tables overlap with static files and have contiguous keys, or the
1153    ///   checkpoint block matches the highest static file block, then [`None`] will be returned.
1154    fn ensure_invariants<Provider, T: Table<Key = u64>>(
1155        &self,
1156        provider: &Provider,
1157        segment: StaticFileSegment,
1158        highest_static_file_entry: Option<u64>,
1159        highest_static_file_block: Option<BlockNumber>,
1160    ) -> ProviderResult<Option<BlockNumber>>
1161    where
1162        Provider: DBProvider + BlockReader + StageCheckpointReader,
1163    {
1164        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
1165
1166        if let Some((db_first_entry, _)) = db_cursor.first()? {
1167            if let (Some(highest_entry), Some(highest_block)) =
1168                (highest_static_file_entry, highest_static_file_block)
1169            {
1170                // If there is a gap between the entry found in the static file and the
1171                // database, then we have most likely lost static file data and need to unwind so we
1172                // can load it again.
1173                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
1174                    info!(
1175                        target: "reth::providers::static_file",
1176                        ?db_first_entry,
1177                        ?highest_entry,
1178                        unwind_target = highest_block,
1179                        ?segment,
1180                        "Setting unwind target."
1181                    );
1182                    return Ok(Some(highest_block))
1183                }
1184            }
1185
1186            if let Some((db_last_entry, _)) = db_cursor.last()? &&
1187                highest_static_file_entry
1188                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
1189            {
1190                return Ok(None)
1191            }
1192        }
1193
1194        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
1195        let highest_static_file_block = highest_static_file_block.unwrap_or_default();
1196
1197        // If static file entry is ahead of the database entries, then ensure the checkpoint block
1198        // number matches.
1199        let checkpoint_block_number = provider
1200            .get_stage_checkpoint(match segment {
1201                StaticFileSegment::Headers => StageId::Headers,
1202                StaticFileSegment::Transactions => StageId::Bodies,
1203                StaticFileSegment::Receipts => StageId::Execution,
1204            })?
1205            .unwrap_or_default()
1206            .block_number;
1207
1208        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
1209        if checkpoint_block_number > highest_static_file_block {
1210            info!(
1211                target: "reth::providers::static_file",
1212                checkpoint_block_number,
1213                unwind_target = highest_static_file_block,
1214                ?segment,
1215                "Setting unwind target."
1216            );
1217            return Ok(Some(highest_static_file_block))
1218        }
1219
1220        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
1221        // static files on executing a stage, or the reverse on unwinding a stage.
1222        // All we need to do is to prune the extra static file rows.
1223        if checkpoint_block_number < highest_static_file_block {
1224            info!(
1225                target: "reth::providers",
1226                ?segment,
1227                from = highest_static_file_block,
1228                to = checkpoint_block_number,
1229                "Unwinding static file segment."
1230            );
1231            let mut writer = self.latest_writer(segment)?;
1232            match segment {
1233                StaticFileSegment::Headers => {
1234                    // TODO(joshie): is_block_meta
1235                    writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
1236                }
1237                StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
1238                    if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
1239                        let number = highest_static_file_entry - block.last_tx_num();
1240
1241                        match segment {
1242                            StaticFileSegment::Transactions => {
1243                                writer.prune_transactions(number, checkpoint_block_number)?
1244                            }
1245                            StaticFileSegment::Receipts => {
1246                                writer.prune_receipts(number, checkpoint_block_number)?
1247                            }
1248                            StaticFileSegment::Headers => unreachable!(),
1249                        }
1250                    }
1251                }
1252            }
1253            writer.commit()?;
1254        }
1255
1256        Ok(None)
1257    }
1258
1259    /// Returns the earliest block number that has not been expired and is therefore still
1260    /// available.
1261    ///
1262    /// This means that the highest expired block (or expired block height) is
1263    /// `earliest_history_height.saturating_sub(1)`.
1264    ///
1265    /// Returns `0` if no history has been expired.
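    ///
    /// A minimal usage sketch, assuming `provider` is an already constructed
    /// [`StaticFileProvider`]:
    ///
    /// ```ignore
    /// let earliest = provider.earliest_history_height();
    /// // As documented above, the highest expired block height (meaningful only when some
    /// // history has actually been expired) is:
    /// let highest_expired = earliest.saturating_sub(1);
    /// ```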
1266    pub fn earliest_history_height(&self) -> BlockNumber {
1267        self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1268    }
1269
1270    /// Gets the lowest static file's block range if it exists for a static file segment.
1271    ///
1272    /// If there is nothing on disk for the given segment, this will return [`None`].
1273    pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1274        self.static_files_min_block.read().get(&segment).copied()
1275    }
1276
1277    /// Gets the lowest static file's block range start if it exists for a static file segment.
1278    ///
1279    /// For example if the lowest static file has blocks 0-499, this will return 0.
1280    ///
1281    /// If there is nothing on disk for the given segment, this will return [`None`].
1282    pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1283        self.get_lowest_range(segment).map(|range| range.start())
1284    }
1285
1286    /// Gets the lowest static file's block range end if it exists for a static file segment.
1287    ///
1288    /// For example if the lowest static file has blocks 0-499, this will return 499.
1289    ///
1290    /// If there is nothing on disk for the given segment, this will return [`None`].
1291    pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1292        self.get_lowest_range(segment).map(|range| range.end())
1293    }
1294
1295    /// Gets the highest static file's block height if it exists for a static file segment.
1296    ///
1297    /// If there is nothing on disk for the given segment, this will return [`None`].
1298    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1299        self.static_files_max_block.read().get(&segment).copied()
1300    }
1301
1302    /// Gets the highest static file transaction number.
1303    ///
1304    /// If there is nothing on disk for the given segment, this will return [`None`].
1305    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1306        self.static_files_tx_index
1307            .read()
1308            .get(&segment)
1309            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1310    }
1311
1312    /// Gets the highest static file block for all segments.
1313    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1314        HighestStaticFiles {
            headers: self.get_highest_static_file_block(StaticFileSegment::Headers),
1315            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
            transactions: self.get_highest_static_file_block(StaticFileSegment::Transactions),
1316        }
1317    }
1318
1319    /// Iterates through the `static_files` of a segment in reverse order, executing a function on
1320    /// each one until it returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
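    ///
    /// A hedged sketch of a lookup by block hash, assuming `provider` is a
    /// `StaticFileProvider<EthPrimitives>` and `block_hash` is the hash being searched for; it
    /// mirrors how this provider implements [`HeaderProvider::header`] further below:
    ///
    /// ```ignore
    /// let header = provider.find_static_file(StaticFileSegment::Headers, |jar_provider| {
    ///     Ok(jar_provider
    ///         .cursor()?
    ///         .get_two::<HeaderWithHashMask<Header>>((&block_hash).into())?
    ///         .and_then(|(header, hash)| (hash == block_hash).then_some(header)))
    /// })?;
    /// ```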
1321    pub fn find_static_file<T>(
1322        &self,
1323        segment: StaticFileSegment,
1324        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
1325    ) -> ProviderResult<Option<T>> {
1326        if let Some(ranges) = self.static_files_expected_block_index.read().get(&segment) {
1327            // Iterate through all ranges in reverse order (highest to lowest)
1328            for range in ranges.values().rev() {
1329                if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
1330                    return Ok(Some(res))
1331                }
1332            }
1333        }
1334
1335        Ok(None)
1336    }
1337
1338    /// Fetches data within a specified range across multiple static files.
1339    ///
1340    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
1341    /// It continues fetching until the end of the range is reached or the provided `predicate`
1342    /// returns false.
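    ///
    /// A hedged sketch, assuming `provider` is a `StaticFileProvider<EthPrimitives>` and `limit`
    /// is a hypothetical cap on the number of items to collect; it mirrors how this provider
    /// implements [`HeaderProvider::headers_range`] further below:
    ///
    /// ```ignore
    /// let mut seen = 0;
    /// let headers = provider.fetch_range_with_predicate(
    ///     StaticFileSegment::Headers,
    ///     0..1_000,
    ///     |cursor, number| cursor.get_one::<HeaderMask<Header>>(number.into()),
    ///     |_header| {
    ///         seen += 1;
    ///         seen <= limit
    ///     },
    /// )?;
    /// ```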
1343    pub fn fetch_range_with_predicate<T, F, P>(
1344        &self,
1345        segment: StaticFileSegment,
1346        range: Range<u64>,
1347        mut get_fn: F,
1348        mut predicate: P,
1349    ) -> ProviderResult<Vec<T>>
1350    where
1351        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1352        P: FnMut(&T) -> bool,
1353    {
1354        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1355
1356        /// Resolves to the provider for the given block or transaction number.
1357        ///
1358        /// If the static file is missing, the data collected so far (`result`) is returned early.
1359        macro_rules! get_provider {
1360            ($number:expr) => {{
1361                match self.get_segment_provider(segment, $number) {
1362                    Ok(provider) => provider,
1363                    Err(
1364                        ProviderError::MissingStaticFileBlock(_, _) |
1365                        ProviderError::MissingStaticFileTx(_, _),
1366                    ) => return Ok(result),
1367                    Err(err) => return Err(err),
1368                }
1369            }};
1370        }
1371
1372        let mut provider = get_provider!(range.start);
1373        let mut cursor = provider.cursor()?;
1374
1375        // advances number in range
1376        'outer: for number in range {
1377            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
1378            // access data in two different static files, it halts further attempts by returning
1379            // the data collected so far, effectively preventing infinite retry loops.
1380            let mut retrying = false;
1381
1382            // advances static files if `get_fn` returns None
1383            'inner: loop {
1384                match get_fn(&mut cursor, number)? {
1385                    Some(res) => {
1386                        if !predicate(&res) {
1387                            break 'outer
1388                        }
1389                        result.push(res);
1390                        break 'inner
1391                    }
1392                    None => {
1393                        if retrying {
1394                            return Ok(result)
1395                        }
1396                        // There is a very small chance of hitting a deadlock if two consecutive
1397                        // static files share the same bucket in the internal dashmap and we don't
1398                        // drop the current provider before requesting the next one.
1400                        drop(cursor);
1401                        drop(provider);
1402                        provider = get_provider!(number);
1403                        cursor = provider.cursor()?;
1404                        retrying = true;
1405                    }
1406                }
1407            }
1408        }
1409
1410        Ok(result)
1411    }
1412
1413    /// Fetches data within a specified range across multiple static files.
1414    ///
1415    /// Returns an iterator over the data.
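    ///
    /// A hedged sketch, assuming `provider` is a `StaticFileProvider<EthPrimitives>`; it streams
    /// canonical block hashes lazily, whereas [`BlockHashReader::canonical_hashes_range`] below
    /// collects them eagerly:
    ///
    /// ```ignore
    /// let hashes = provider.fetch_range_iter(
    ///     StaticFileSegment::Headers,
    ///     0..100,
    ///     |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
    /// )?;
    /// for hash in hashes {
    ///     println!("{:?}", hash?);
    /// }
    /// ```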
1416    pub fn fetch_range_iter<'a, T, F>(
1417        &'a self,
1418        segment: StaticFileSegment,
1419        range: Range<u64>,
1420        get_fn: F,
1421    ) -> ProviderResult<impl Iterator<Item = ProviderResult<T>> + 'a>
1422    where
1423        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1424        T: std::fmt::Debug,
1425    {
1426        let mut provider = Some(self.get_segment_provider(segment, range.start)?);
1427        Ok(range.filter_map(move |number| {
1428            match get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose() {
1429                Some(result) => Some(result),
1430                None => {
1431                    // There is a very small chance of hitting a deadlock if two consecutive static
1432                    // files share the same bucket in the internal dashmap and we don't drop the
1433                    // current provider before requesting the next one.
1435                    provider.take();
1436                    provider = Some(self.get_segment_provider(segment, number).ok()?);
1437                    get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose()
1438                }
1439            }
1440        }))
1441    }
1442
1443    /// Returns the directory where `static_files` are located.
1444    pub fn directory(&self) -> &Path {
1445        &self.path
1446    }
1447
1448    /// Retrieves data from the database or static file, wherever it's available.
1449    ///
1450    /// # Arguments
1451    /// * `segment` - The segment of the static file to check against.
1452    /// * `index_key` - Requested index key, usually a block or transaction number.
1453    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
1454    ///   file provider.
1455    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
1456    ///   when the static file doesn't contain the required data or is not available.
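    ///
    /// A hedged sketch, assuming `provider` is a `StaticFileProvider<EthPrimitives>`, `number` is
    /// the requested block number, and `db_provider` is a hypothetical database-backed provider
    /// exposing the same header API:
    ///
    /// ```ignore
    /// let header = provider.get_with_static_file_or_database(
    ///     StaticFileSegment::Headers,
    ///     number,
    ///     |static_file| static_file.header_by_number(number),
    ///     || db_provider.header_by_number(number),
    /// )?;
    /// ```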
1457    pub fn get_with_static_file_or_database<T, FS, FD>(
1458        &self,
1459        segment: StaticFileSegment,
1460        number: u64,
1461        fetch_from_static_file: FS,
1462        fetch_from_database: FD,
1463    ) -> ProviderResult<Option<T>>
1464    where
1465        FS: Fn(&Self) -> ProviderResult<Option<T>>,
1466        FD: Fn() -> ProviderResult<Option<T>>,
1467    {
1468        // Determine the segment's highest block or transaction number in static files, if any.
1469        let static_file_upper_bound = if segment.is_block_based() {
1470            self.get_highest_static_file_block(segment)
1471        } else {
1472            self.get_highest_static_file_tx(segment)
1473        };
1474
1475        if static_file_upper_bound
1476            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1477        {
1478            return fetch_from_static_file(self)
1479        }
1480        fetch_from_database()
1481    }
1482
1483    /// Gets data within a specified range, potentially spanning different `static_files` and
1484    /// database.
1485    ///
1486    /// # Arguments
1487    /// * `segment` - The segment of the static file to query.
1488    /// * `block_range` - The range of data to fetch.
1489    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
1490    /// * `fetch_from_database` - A function to fetch data from the database.
1491    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
1492    ///   terminated when this function returns false, thereby filtering the data based on the
1493    ///   provided condition.
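    ///
    /// A hedged sketch, assuming `provider` is a `StaticFileProvider<EthPrimitives>` and
    /// `fetch_headers_from_db` is a hypothetical closure that reads the remaining block range
    /// from the database:
    ///
    /// ```ignore
    /// let headers = provider.get_range_with_static_file_or_database(
    ///     StaticFileSegment::Headers,
    ///     0..200,
    ///     |static_file, range, predicate| {
    ///         static_file.fetch_range_with_predicate(
    ///             StaticFileSegment::Headers,
    ///             range,
    ///             |cursor, number| cursor.get_one::<HeaderMask<Header>>(number.into()),
    ///             predicate,
    ///         )
    ///     },
    ///     |range, _predicate| fetch_headers_from_db(range),
    ///     |_header| true,
    /// )?;
    /// ```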
1494    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1495        &self,
1496        segment: StaticFileSegment,
1497        mut block_or_tx_range: Range<u64>,
1498        fetch_from_static_file: FS,
1499        mut fetch_from_database: FD,
1500        mut predicate: P,
1501    ) -> ProviderResult<Vec<T>>
1502    where
1503        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1504        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1505        P: FnMut(&T) -> bool,
1506    {
1507        let mut data = Vec::new();
1508
1509        // Determine the segment's highest block or transaction number in static files, if any.
1510        if let Some(static_file_upper_bound) = if segment.is_block_based() {
1511            self.get_highest_static_file_block(segment)
1512        } else {
1513            self.get_highest_static_file_tx(segment)
1514        } && block_or_tx_range.start <= static_file_upper_bound
1515        {
1516            let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1517            data.extend(fetch_from_static_file(
1518                self,
1519                block_or_tx_range.start..end,
1520                &mut predicate,
1521            )?);
1522            block_or_tx_range.start = end;
1523        }
1524
1525        if block_or_tx_range.end > block_or_tx_range.start {
1526            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1527        }
1528
1529        Ok(data)
1530    }
1531
1532    /// Returns `static_files` directory
1533    #[cfg(any(test, feature = "test-utils"))]
1534    pub fn path(&self) -> &Path {
1535        &self.path
1536    }
1537
1538    /// Returns `static_files` transaction index
1539    #[cfg(any(test, feature = "test-utils"))]
1540    pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
1541        &self.static_files_tx_index
1542    }
1543
1544    /// Returns `static_files` expected block index
1545    #[cfg(any(test, feature = "test-utils"))]
1546    pub fn expected_block_index(&self) -> &RwLock<SegmentRanges> {
1547        &self.static_files_expected_block_index
1548    }
1549}
1550
1551/// Helper trait to manage the different [`StaticFileProviderRW`] writers of an `Arc<StaticFileProvider>`.
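///
/// A hedged usage sketch, assuming `provider` is a read-write `StaticFileProvider<EthPrimitives>`:
///
/// ```ignore
/// // Get (or create) the writer for the segment at its current highest block.
/// let _writer = provider.latest_writer(StaticFileSegment::Headers)?;
/// // ... append or prune rows through the writer ...
/// // Persist pending changes across all open segment writers.
/// provider.commit()?;
/// ```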
1552pub trait StaticFileWriter {
1553    /// The primitives type used by the static file provider.
1554    type Primitives: Send + Sync + 'static;
1555
1556    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
1557    fn get_writer(
1558        &self,
1559        block: BlockNumber,
1560        segment: StaticFileSegment,
1561    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1562
1563    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
1564    /// [`StaticFileSegment`].
1565    fn latest_writer(
1566        &self,
1567        segment: StaticFileSegment,
1568    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1569
1570    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
1571    fn commit(&self) -> ProviderResult<()>;
1572
1573    /// Returns `true` if the static file provider has unwind queued.
1574    fn has_unwind_queued(&self) -> bool;
1575}
1576
1577impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1578    type Primitives = N;
1579
1580    fn get_writer(
1581        &self,
1582        block: BlockNumber,
1583        segment: StaticFileSegment,
1584    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1585        if self.access.is_read_only() {
1586            return Err(ProviderError::ReadOnlyStaticFileAccess)
1587        }
1588
1589        trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1590        self.writers.get_or_create(segment, || {
1591            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1592        })
1593    }
1594
1595    fn latest_writer(
1596        &self,
1597        segment: StaticFileSegment,
1598    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1599        self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1600    }
1601
1602    fn commit(&self) -> ProviderResult<()> {
1603        self.writers.commit()
1604    }
1605
1606    fn has_unwind_queued(&self) -> bool {
1607        self.writers.has_unwind_queued()
1608    }
1609}
1610
1611impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1612    type Header = N::BlockHeader;
1613
1614    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1615        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1616            Ok(jar_provider
1617                .cursor()?
1618                .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
1619                .and_then(|(header, hash)| {
1620                    if hash == block_hash {
1621                        return Some(header)
1622                    }
1623                    None
1624                }))
1625        })
1626    }
1627
1628    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1629        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1630            .and_then(|provider| provider.header_by_number(num))
1631            .or_else(|err| {
1632                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1633                    Ok(None)
1634                } else {
1635                    Err(err)
1636                }
1637            })
1638    }
1639
1640    fn headers_range(
1641        &self,
1642        range: impl RangeBounds<BlockNumber>,
1643    ) -> ProviderResult<Vec<Self::Header>> {
1644        self.fetch_range_with_predicate(
1645            StaticFileSegment::Headers,
1646            to_range(range),
1647            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1648            |_| true,
1649        )
1650    }
1651
1652    fn sealed_header(
1653        &self,
1654        num: BlockNumber,
1655    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1656        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1657            .and_then(|provider| provider.sealed_header(num))
1658            .or_else(|err| {
1659                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1660                    Ok(None)
1661                } else {
1662                    Err(err)
1663                }
1664            })
1665    }
1666
1667    fn sealed_headers_while(
1668        &self,
1669        range: impl RangeBounds<BlockNumber>,
1670        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1671    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1672        self.fetch_range_with_predicate(
1673            StaticFileSegment::Headers,
1674            to_range(range),
1675            |cursor, number| {
1676                Ok(cursor
1677                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1678                    .map(|(header, hash)| SealedHeader::new(header, hash)))
1679            },
1680            predicate,
1681        )
1682    }
1683}
1684
1685impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1686    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1687        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1688            .and_then(|provider| provider.block_hash(num))
1689            .or_else(|err| {
1690                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1691                    Ok(None)
1692                } else {
1693                    Err(err)
1694                }
1695            })
1696    }
1697
1698    fn canonical_hashes_range(
1699        &self,
1700        start: BlockNumber,
1701        end: BlockNumber,
1702    ) -> ProviderResult<Vec<B256>> {
1703        self.fetch_range_with_predicate(
1704            StaticFileSegment::Headers,
1705            start..end,
1706            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1707            |_| true,
1708        )
1709    }
1710}
1711
1712impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1713    for StaticFileProvider<N>
1714{
1715    type Receipt = N::Receipt;
1716
1717    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1718        self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
1719            .and_then(|provider| provider.receipt(num))
1720            .or_else(|err| {
1721                if let ProviderError::MissingStaticFileTx(_, _) = err {
1722                    Ok(None)
1723                } else {
1724                    Err(err)
1725                }
1726            })
1727    }
1728
1729    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1730        if let Some(num) = self.transaction_id(hash)? {
1731            return self.receipt(num)
1732        }
1733        Ok(None)
1734    }
1735
1736    fn receipts_by_block(
1737        &self,
1738        _block: BlockHashOrNumber,
1739    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1740        unreachable!()
1741    }
1742
1743    fn receipts_by_tx_range(
1744        &self,
1745        range: impl RangeBounds<TxNumber>,
1746    ) -> ProviderResult<Vec<Self::Receipt>> {
1747        self.fetch_range_with_predicate(
1748            StaticFileSegment::Receipts,
1749            to_range(range),
1750            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1751            |_| true,
1752        )
1753    }
1754
1755    fn receipts_by_block_range(
1756        &self,
1757        _block_range: RangeInclusive<BlockNumber>,
1758    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1759        Err(ProviderError::UnsupportedProvider)
1760    }
1761}
1762
1763impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
1764    for StaticFileProvider<N>
1765{
1766    fn transaction_hashes_by_range(
1767        &self,
1768        tx_range: Range<TxNumber>,
1769    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1770        let tx_range_size = (tx_range.end - tx_range.start) as usize;
1771
1772        // Transactions vary in size, so chunks will not all take the same processing time. If
1773        // chunks are too big, there will be idle threads waiting for work. Choose an arbitrary
1774        // smaller value to make sure that doesn't happen.
1775        let chunk_size = 100;
1776
1777        // iterator over the chunks
1778        let chunks = tx_range
1779            .clone()
1780            .step_by(chunk_size)
1781            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1782        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1783
1784        for chunk_range in chunks {
1785            let (channel_tx, channel_rx) = mpsc::channel();
1786            channels.push(channel_rx);
1787
1788            let manager = self.clone();
1789
1790            // Spawn the task onto the global rayon pool
1791            // This task will send the results through the channel after it has calculated
1792            // the hash.
1793            rayon::spawn(move || {
1794                let mut rlp_buf = Vec::with_capacity(128);
1795                let _ = manager.fetch_range_with_predicate(
1796                    StaticFileSegment::Transactions,
1797                    chunk_range,
1798                    |cursor, number| {
1799                        Ok(cursor
1800                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1801                            .map(|transaction| {
1802                                rlp_buf.clear();
1803                                let _ = channel_tx
1804                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
1805                            }))
1806                    },
1807                    |_| true,
1808                );
1809            });
1810        }
1811
1812        let mut tx_list = Vec::with_capacity(tx_range_size);
1813
1814        // Iterate over channels and append the tx hashes unsorted
1815        for channel in channels {
1816            while let Ok(tx) = channel.recv() {
1817                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1818                tx_list.push((tx_hash, tx_id));
1819            }
1820        }
1821
1822        Ok(tx_list)
1823    }
1824}
1825
1826impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1827    for StaticFileProvider<N>
1828{
1829    type Transaction = N::SignedTx;
1830
1831    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1832        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1833            let mut cursor = jar_provider.cursor()?;
1834            if cursor
1835                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1836                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1837                .is_some()
1838            {
1839                Ok(cursor.number())
1840            } else {
1841                Ok(None)
1842            }
1843        })
1844    }
1845
1846    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1847        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1848            .and_then(|provider| provider.transaction_by_id(num))
1849            .or_else(|err| {
1850                if let ProviderError::MissingStaticFileTx(_, _) = err {
1851                    Ok(None)
1852                } else {
1853                    Err(err)
1854                }
1855            })
1856    }
1857
1858    fn transaction_by_id_unhashed(
1859        &self,
1860        num: TxNumber,
1861    ) -> ProviderResult<Option<Self::Transaction>> {
1862        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1863            .and_then(|provider| provider.transaction_by_id_unhashed(num))
1864            .or_else(|err| {
1865                if let ProviderError::MissingStaticFileTx(_, _) = err {
1866                    Ok(None)
1867                } else {
1868                    Err(err)
1869                }
1870            })
1871    }
1872
1873    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1874        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1875            Ok(jar_provider
1876                .cursor()?
1877                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
1878                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
1879        })
1880    }
1881
1882    fn transaction_by_hash_with_meta(
1883        &self,
1884        _hash: TxHash,
1885    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1886        // Required data not present in static_files
1887        Err(ProviderError::UnsupportedProvider)
1888    }
1889
1890    fn transactions_by_block(
1891        &self,
1892        _block_id: BlockHashOrNumber,
1893    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1894        // Required data not present in static_files
1895        Err(ProviderError::UnsupportedProvider)
1896    }
1897
1898    fn transactions_by_block_range(
1899        &self,
1900        _range: impl RangeBounds<BlockNumber>,
1901    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1902        // Required data not present in static_files
1903        Err(ProviderError::UnsupportedProvider)
1904    }
1905
1906    fn transactions_by_tx_range(
1907        &self,
1908        range: impl RangeBounds<TxNumber>,
1909    ) -> ProviderResult<Vec<Self::Transaction>> {
1910        self.fetch_range_with_predicate(
1911            StaticFileSegment::Transactions,
1912            to_range(range),
1913            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
1914            |_| true,
1915        )
1916    }
1917
1918    fn senders_by_tx_range(
1919        &self,
1920        range: impl RangeBounds<TxNumber>,
1921    ) -> ProviderResult<Vec<Address>> {
1922        let txes = self.transactions_by_tx_range(range)?;
1923        Ok(reth_primitives_traits::transaction::recover::recover_signers(&txes)?)
1924    }
1925
1926    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1927        match self.transaction_by_id_unhashed(id)? {
1928            Some(tx) => Ok(tx.recover_signer().ok()),
1929            None => Ok(None),
1930        }
1931    }
1932}
1933
1934impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
1935    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1936        // Required data not present in static_files
1937        Err(ProviderError::UnsupportedProvider)
1938    }
1939
1940    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1941        // Required data not present in static_files
1942        Err(ProviderError::UnsupportedProvider)
1943    }
1944
1945    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1946        Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
1947    }
1948
1949    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1950        // Required data not present in static_files
1951        Err(ProviderError::UnsupportedProvider)
1952    }
1953}
1954
1955/* Cannot be successfully implemented but must exist for trait requirements */
1956
1957impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
1958    for StaticFileProvider<N>
1959{
1960    type Block = N::Block;
1961
1962    fn find_block_by_hash(
1963        &self,
1964        _hash: B256,
1965        _source: BlockSource,
1966    ) -> ProviderResult<Option<Self::Block>> {
1967        // Required data not present in static_files
1968        Err(ProviderError::UnsupportedProvider)
1969    }
1970
1971    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1972        // Required data not present in static_files
1973        Err(ProviderError::UnsupportedProvider)
1974    }
1975
1976    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1977        // Required data not present in static_files
1978        Err(ProviderError::UnsupportedProvider)
1979    }
1980
1981    fn pending_block_and_receipts(
1982        &self,
1983    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1984        // Required data not present in static_files
1985        Err(ProviderError::UnsupportedProvider)
1986    }
1987
1988    fn recovered_block(
1989        &self,
1990        _id: BlockHashOrNumber,
1991        _transaction_kind: TransactionVariant,
1992    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1993        // Required data not present in static_files
1994        Err(ProviderError::UnsupportedProvider)
1995    }
1996
1997    fn sealed_block_with_senders(
1998        &self,
1999        _id: BlockHashOrNumber,
2000        _transaction_kind: TransactionVariant,
2001    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2002        // Required data not present in static_files
2003        Err(ProviderError::UnsupportedProvider)
2004    }
2005
2006    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
2007        // Required data not present in static_files
2008        Err(ProviderError::UnsupportedProvider)
2009    }
2010
2011    fn block_with_senders_range(
2012        &self,
2013        _range: RangeInclusive<BlockNumber>,
2014    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
2015        Err(ProviderError::UnsupportedProvider)
2016    }
2017
2018    fn recovered_block_range(
2019        &self,
2020        _range: RangeInclusive<BlockNumber>,
2021    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
2022        Err(ProviderError::UnsupportedProvider)
2023    }
2024
2025    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
2026        Err(ProviderError::UnsupportedProvider)
2027    }
2028}
2029
2030impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
2031    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2032        Err(ProviderError::UnsupportedProvider)
2033    }
2034
2035    fn block_body_indices_range(
2036        &self,
2037        _range: RangeInclusive<BlockNumber>,
2038    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2039        Err(ProviderError::UnsupportedProvider)
2040    }
2041}
2042
2043impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2044    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2045        match T::NAME {
2046            tables::CanonicalHeaders::NAME |
2047            tables::Headers::<Header>::NAME |
2048            tables::HeaderTerminalDifficulties::NAME => Ok(self
2049                .get_highest_static_file_block(StaticFileSegment::Headers)
2050                .map(|block| block + 1)
2051                .unwrap_or_default()
2052                as usize),
2053            tables::Receipts::<Receipt>::NAME => Ok(self
2054                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2055                .map(|receipts| receipts + 1)
2056                .unwrap_or_default() as usize),
2057            tables::Transactions::<TransactionSigned>::NAME => Ok(self
2058                .get_highest_static_file_tx(StaticFileSegment::Transactions)
2059                .map(|txs| txs + 1)
2060                .unwrap_or_default()
2061                as usize),
2062            _ => Err(ProviderError::UnsupportedProvider),
2063        }
2064    }
2065}
2066
2067/// Calculates the tx hash for the given transaction and its id.
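///
/// A minimal sketch, assuming `tx` is any value implementing [`Encodable2718`], e.g. a signed
/// transaction:
///
/// ```ignore
/// let mut rlp_buf = Vec::with_capacity(128);
/// let (hash, tx_id) = calculate_hash((7, tx), &mut rlp_buf).map_err(|boxed| *boxed)?;
/// assert_eq!(tx_id, 7);
/// ```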
2068#[inline]
2069fn calculate_hash<T>(
2070    entry: (TxNumber, T),
2071    rlp_buf: &mut Vec<u8>,
2072) -> Result<(B256, TxNumber), Box<ProviderError>>
2073where
2074    T: Encodable2718,
2075{
2076    let (tx_id, tx) = entry;
2077    tx.encode_2718(rlp_buf);
2078    Ok((keccak256(rlp_buf), tx_id))
2079}
2080
2081#[cfg(test)]
2082mod tests {
2083    use std::collections::BTreeMap;
2084
2085    use reth_chain_state::EthPrimitives;
2086    use reth_db::test_utils::create_test_static_files_dir;
2087    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};
2088
2089    use crate::StaticFileProviderBuilder;
2090
2091    #[test]
2092    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
2093        let (static_dir, _) = create_test_static_files_dir();
2094        let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
2095            .with_blocks_per_file(100)
2096            .build()?;
2097
2098        let segment = StaticFileSegment::Headers;
2099
2100        // Test with None - should use default behavior
2101        assert_eq!(
2102            sf_rw.find_fixed_range_with_block_index(segment, None, 0),
2103            SegmentRangeInclusive::new(0, 99)
2104        );
2105        assert_eq!(
2106            sf_rw.find_fixed_range_with_block_index(segment, None, 250),
2107            SegmentRangeInclusive::new(200, 299)
2108        );
2109
2110        // Test with empty index - should fall back to default behavior
2111        assert_eq!(
2112            sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
2113            SegmentRangeInclusive::new(100, 199)
2114        );
2115
2116        // Create block index with existing ranges
2117        let block_index = BTreeMap::from_iter([
2118            (99, SegmentRangeInclusive::new(0, 99)),
2119            (199, SegmentRangeInclusive::new(100, 199)),
2120            (299, SegmentRangeInclusive::new(200, 299)),
2121        ]);
2122
2123        // Test blocks within existing ranges - should return the matching range
2124        assert_eq!(
2125            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 0),
2126            SegmentRangeInclusive::new(0, 99)
2127        );
2128        assert_eq!(
2129            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 50),
2130            SegmentRangeInclusive::new(0, 99)
2131        );
2132        assert_eq!(
2133            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 99),
2134            SegmentRangeInclusive::new(0, 99)
2135        );
2136        assert_eq!(
2137            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 100),
2138            SegmentRangeInclusive::new(100, 199)
2139        );
2140        assert_eq!(
2141            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 150),
2142            SegmentRangeInclusive::new(100, 199)
2143        );
2144        assert_eq!(
2145            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 199),
2146            SegmentRangeInclusive::new(100, 199)
2147        );
2148
2149        // Test blocks beyond existing ranges - should derive new ranges from the last range
2150        // Block 300 is exactly one segment after the last range
2151        assert_eq!(
2152            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 300),
2153            SegmentRangeInclusive::new(300, 399)
2154        );
2155        assert_eq!(
2156            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 350),
2157            SegmentRangeInclusive::new(300, 399)
2158        );
2159
2160        // Block 500 skips one segment (300-399)
2161        assert_eq!(
2162            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 500),
2163            SegmentRangeInclusive::new(500, 599)
2164        );
2165
2166        // Block 1000 skips many segments
2167        assert_eq!(
2168            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 1000),
2169            SegmentRangeInclusive::new(1000, 1099)
2170        );
2171
2172        // Test with block index having different sizes than blocks_per_file setting
2173        // This simulates the scenario where blocks_per_file was changed between runs
2174        let mixed_size_index = BTreeMap::from_iter([
2175            (49, SegmentRangeInclusive::new(0, 49)),     // 50 blocks
2176            (149, SegmentRangeInclusive::new(50, 149)),  // 100 blocks
2177            (349, SegmentRangeInclusive::new(150, 349)), // 200 blocks
2178        ]);
2179
2180        // Blocks within existing ranges should return those ranges regardless of size
2181        assert_eq!(
2182            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 25),
2183            SegmentRangeInclusive::new(0, 49)
2184        );
2185        assert_eq!(
2186            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 100),
2187            SegmentRangeInclusive::new(50, 149)
2188        );
2189        assert_eq!(
2190            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 200),
2191            SegmentRangeInclusive::new(150, 349)
2192        );
2193
2194        // Block after the last range should derive using current blocks_per_file (100)
2195        // from the end of the last range (349)
2196        assert_eq!(
2197            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 350),
2198            SegmentRangeInclusive::new(350, 449)
2199        );
2200        assert_eq!(
2201            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 450),
2202            SegmentRangeInclusive::new(450, 549)
2203        );
2204        assert_eq!(
2205            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 550),
2206            SegmentRangeInclusive::new(550, 649)
2207        );
2208
2209        Ok(())
2210    }
2211}