reth_provider/providers/static_file/manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
7    EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
8    TransactionVariant, TransactionsProvider, TransactionsProviderExt,
9};
10use alloy_consensus::{transaction::TransactionMeta, Header};
11use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
12use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
13use dashmap::DashMap;
14use notify::{RecommendedWatcher, RecursiveMode, Watcher};
15use parking_lot::RwLock;
16use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
17use reth_db::{
18    lockfile::StorageLock,
19    static_file::{
20        iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
21        StaticFileCursor, TransactionMask, TransactionSenderMask,
22    },
23};
24use reth_db_api::{
25    cursor::DbCursorRO,
26    models::StoredBlockBodyIndices,
27    table::{Decompress, Table, Value},
28    tables,
29    transaction::DbTx,
30};
31use reth_ethereum_primitives::{Receipt, TransactionSigned};
32use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
33use reth_node_types::NodePrimitives;
34use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
35use reth_stages_types::{PipelineTarget, StageId};
36use reth_static_file_types::{
37    find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
38    DEFAULT_BLOCKS_PER_STATIC_FILE,
39};
40use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, StorageSettingsCache};
41use reth_storage_errors::provider::{ProviderError, ProviderResult};
42use std::{
43    collections::{BTreeMap, HashMap},
44    fmt::Debug,
45    ops::{Deref, Range, RangeBounds, RangeInclusive},
46    path::{Path, PathBuf},
47    sync::{atomic::AtomicU64, mpsc, Arc},
48};
49use tracing::{debug, info, trace, warn};
50
/// Alias type for a map that can be queried for block or transaction ranges. It uses `u64` to
/// represent either a block or a transaction number end of a static file range.
///
/// Keyed by the inclusive upper bound (max block or max tx) of each static file range.
type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
54
/// Access mode on a static file provider. RO/RW.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` if read-only access.
    pub const fn is_read_only(&self) -> bool {
        match self {
            Self::RO => true,
            Self::RW => false,
        }
    }

    /// Returns `true` if read-write access.
    pub const fn is_read_write(&self) -> bool {
        // Exactly the complement of read-only: there are only two variants.
        !self.is_read_only()
    }
}
76
/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
///
/// "Static files" contain immutable chain history data, such as:
///  - transactions
///  - headers
///  - receipts
///
/// This provider type is responsible for reading and writing to static files.
///
/// Wraps its state in an [`Arc`], so cloning is cheap and all clones share the same
/// [`StaticFileProviderInner`].
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
87
88impl<N> Clone for StaticFileProvider<N> {
89    fn clone(&self) -> Self {
90        Self(self.0.clone())
91    }
92}
93
/// Builder for [`StaticFileProvider`] that allows configuration before initialization.
#[derive(Debug)]
pub struct StaticFileProviderBuilder<N> {
    /// The provider state being configured; wrapped into a [`StaticFileProvider`] on `build`.
    inner: StaticFileProviderInner<N>,
}
99
100impl<N: NodePrimitives> StaticFileProviderBuilder<N> {
101    /// Creates a new builder with read-write access.
102    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
103        StaticFileProviderInner::new(path, StaticFileAccess::RW).map(|inner| Self { inner })
104    }
105
106    /// Creates a new builder with read-only access.
107    pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
108        StaticFileProviderInner::new(path, StaticFileAccess::RO).map(|inner| Self { inner })
109    }
110
111    /// Set custom blocks per file for specific segments.
112    ///
113    /// Each static file segment is stored across multiple files, and each of these files contains
114    /// up to the specified number of blocks of data. When the file gets full, a new file is
115    /// created with the new block range.
116    ///
117    /// This setting affects the size of each static file, and can be set per segment.
118    ///
119    /// If it is changed for an existing node, existing static files will not be affected and will
120    /// be finished with the old blocks per file setting, but new static files will use the new
121    /// setting.
122    pub fn with_blocks_per_file_for_segments(
123        mut self,
124        segments: HashMap<StaticFileSegment, u64>,
125    ) -> Self {
126        self.inner.blocks_per_file.extend(segments);
127        self
128    }
129
130    /// Set a custom number of blocks per file for all segments.
131    pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
132        for segment in StaticFileSegment::iter() {
133            self.inner.blocks_per_file.insert(segment, blocks_per_file);
134        }
135        self
136    }
137
138    /// Set a custom number of blocks per file for a specific segment.
139    pub fn with_blocks_per_file_for_segment(
140        mut self,
141        segment: StaticFileSegment,
142        blocks_per_file: u64,
143    ) -> Self {
144        self.inner.blocks_per_file.insert(segment, blocks_per_file);
145        self
146    }
147
148    /// Enables metrics on the [`StaticFileProvider`].
149    pub fn with_metrics(mut self) -> Self {
150        self.inner.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
151        self
152    }
153
154    /// Sets the genesis block number for the [`StaticFileProvider`].
155    ///
156    /// This configures the genesis block number, which is used to determine the starting point
157    /// for block indexing and querying operations.
158    ///
159    /// # Arguments
160    ///
161    /// * `genesis_block_number` - The block number of the genesis block.
162    ///
163    /// # Returns
164    ///
165    /// Returns `Self` to allow method chaining.
166    pub const fn with_genesis_block_number(mut self, genesis_block_number: u64) -> Self {
167        self.inner.genesis_block_number = genesis_block_number;
168        self
169    }
170
171    /// Builds the final [`StaticFileProvider`] and initializes the index.
172    pub fn build(self) -> ProviderResult<StaticFileProvider<N>> {
173        let provider = StaticFileProvider(Arc::new(self.inner));
174        provider.initialize_index()?;
175        Ok(provider)
176    }
177}
178
impl<N: NodePrimitives> StaticFileProvider<N> {
    /// Creates a new [`StaticFileProvider`] with read-only access.
    ///
    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
    /// new data won't be detected or queryable.
    ///
    /// Watching is recommended if the read-only provider is used on a directory that an active node
    /// instance is modifying.
    ///
    /// See also [`StaticFileProvider::watch_directory`].
    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
        let provider = StaticFileProviderBuilder::read_only(path)?.build()?;

        if watch_directory {
            provider.watch_directory();
        }

        Ok(provider)
    }

    /// Creates a new [`StaticFileProvider`] with read-write access.
    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
        StaticFileProviderBuilder::read_write(path)?.build()
    }

    /// Watches the directory for changes and updates the in-memory index when modifications
    /// are detected.
    ///
    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
    /// receive `update_index` notifications from a node that appends/truncates data.
    pub fn watch_directory(&self) {
        let provider = self.clone();
        // The watcher runs on a dedicated background thread; filesystem events are forwarded
        // from the `notify` callback into a channel consumed by the loop below.
        std::thread::spawn(move || {
            let (tx, rx) = std::sync::mpsc::channel();
            let mut watcher = RecommendedWatcher::new(
                move |res| tx.send(res).unwrap(),
                notify::Config::default(),
            )
            .expect("failed to create watcher");

            watcher
                .watch(&provider.path, RecursiveMode::NonRecursive)
                .expect("failed to watch path");

            // Some backends send repeated modified events
            let mut last_event_timestamp = None;

            while let Ok(res) = rx.recv() {
                match res {
                    Ok(event) => {
                        // We only care about modified data events
                        if !matches!(
                            event.kind,
                            notify::EventKind::Modify(_) |
                                notify::EventKind::Create(_) |
                                notify::EventKind::Remove(_)
                        ) {
                            continue
                        }

                        // We only trigger a re-initialization if a configuration file was
                        // modified. This means that a
                        // static_file_provider.commit() was called on the node after
                        // appending/truncating rows
                        for segment in event.paths {
                            // Ensure it's a file with the .conf extension
                            if segment
                                .extension()
                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
                            {
                                continue
                            }

                            // Ensure it's well formatted static file name
                            if StaticFileSegment::parse_filename(
                                &segment.file_stem().expect("qed").to_string_lossy(),
                            )
                            .is_none()
                            {
                                continue
                            }

                            // If we can read the metadata and modified timestamp, ensure this is
                            // not an old or repeated event.
                            if let Ok(current_modified_timestamp) =
                                std::fs::metadata(&segment).and_then(|m| m.modified())
                            {
                                if last_event_timestamp.is_some_and(|last_timestamp| {
                                    last_timestamp >= current_modified_timestamp
                                }) {
                                    continue
                                }
                                last_event_timestamp = Some(current_modified_timestamp);
                            }

                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
                            // One relevant .conf change re-reads the whole index, so stop
                            // scanning the remaining paths of this event.
                            if let Err(err) = provider.initialize_index() {
                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
                            }
                            break
                        }
                    }

                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
                }
            }
        });
    }
}
288
289impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
290    type Target = StaticFileProviderInner<N>;
291
292    fn deref(&self) -> &Self::Target {
293        &self.0
294    }
295}
296
/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
    /// segments and ranges. Keyed by `(fixed range end, segment)`.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Indexes per segment.
    indexes: RwLock<HashMap<StaticFileSegment, StaticFileSegmentIndex>>,
    /// This is an additional index that tracks the expired height, this will track the highest
    /// block number that has been expired (missing). The first, non expired block is
    /// `expired_history_height + 1`.
    ///
    /// This is effectively the transaction range that has been expired:
    /// [`StaticFileProvider::delete_segment_below_block`] and mirrors
    /// `static_files_min_block[transactions] - blocks_per_file`.
    ///
    /// This additional tracker exists for more efficient lookups because the node must be aware of
    /// the expired height.
    earliest_history_height: AtomicU64,
    /// Directory where `static_files` are located
    path: PathBuf,
    /// Maintains a writer set of [`StaticFileSegment`].
    writers: StaticFileWriters<N>,
    /// Metrics for the static files.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access rights of the provider.
    access: StaticFileAccess,
    /// Number of blocks per file, per segment.
    blocks_per_file: HashMap<StaticFileSegment, u64>,
    /// Write lock for when access is [`StaticFileAccess::RW`]. `None` for read-only access.
    _lock_file: Option<StorageLock>,
    /// Genesis block number. Defaults to 0.
    genesis_block_number: u64,
}
331
impl<N: NodePrimitives> StaticFileProviderInner<N> {
    /// Creates a new [`StaticFileProviderInner`].
    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
        // Read-write access acquires an exclusive storage lock on the directory; read-only
        // access does not lock.
        let _lock_file = if access.is_read_write() {
            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
        } else {
            None
        };

        // Seed every segment with the default blocks-per-file; the builder may override these.
        let mut blocks_per_file = HashMap::new();
        for segment in StaticFileSegment::iter() {
            blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
        }

        let provider = Self {
            map: Default::default(),
            indexes: Default::default(),
            writers: Default::default(),
            earliest_history_height: Default::default(),
            path: path.as_ref().to_path_buf(),
            metrics: None,
            access,
            blocks_per_file,
            _lock_file,
            genesis_block_number: 0,
        };

        Ok(provider)
    }

    /// Returns `true` if the provider has read-only access.
    pub const fn is_read_only(&self) -> bool {
        self.access.is_read_only()
    }

    /// Each static file has a fixed number of blocks. This gives out the range where the requested
    /// block is positioned.
    ///
    /// If the specified block falls into one of the ranges of already initialized static files,
    /// this function will return that range.
    ///
    /// If no matching file exists, this function will derive a new range from the end of the last
    /// existing file, if any.
    pub fn find_fixed_range_with_block_index(
        &self,
        segment: StaticFileSegment,
        block_index: Option<&SegmentRanges>,
        block: BlockNumber,
    ) -> SegmentRangeInclusive {
        let blocks_per_file =
            self.blocks_per_file.get(&segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);

        if let Some(block_index) = block_index {
            // Find first block range that contains the requested block
            if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
            {
                // Found matching range for an existing file using block index
                return *range
            } else if let Some((_, range)) = block_index.last_key_value() {
                // Didn't find matching range for an existing file, derive a new range from the end
                // of the last existing file range.
                //
                // `block` is always higher than `range.end()` here, because we iterated over all
                // `block_index` ranges above and didn't find one that contains our block
                let blocks_after_last_range = block - range.end();
                // Number of whole blocks_per_file-sized files that fit between the last range
                // and the requested block; these files do not exist and are skipped over.
                let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
                let start = range.end() + 1 + segments_to_skip * blocks_per_file;
                return SegmentRangeInclusive::new(start, start + blocks_per_file - 1)
            }
        }
        // No block index is available, derive a new range using the fixed number of blocks,
        // starting from the beginning.
        find_fixed_range(block, blocks_per_file)
    }

    /// Each static file has a fixed number of blocks. This gives out the range where the requested
    /// block is positioned.
    ///
    /// If the specified block falls into one of the ranges of already initialized static files,
    /// this function will return that range.
    ///
    /// If no matching file exists, this function will derive a new range from the end of the last
    /// existing file, if any.
    ///
    /// This function will block indefinitely if a write lock for
    /// [`Self::indexes`] is already acquired. In that case, use
    /// [`Self::find_fixed_range_with_block_index`].
    pub fn find_fixed_range(
        &self,
        segment: StaticFileSegment,
        block: BlockNumber,
    ) -> SegmentRangeInclusive {
        self.find_fixed_range_with_block_index(
            segment,
            self.indexes
                .read()
                .get(&segment)
                .map(|index| &index.expected_block_ranges_by_max_block),
            block,
        )
    }

    /// Get genesis block number
    pub const fn genesis_block_number(&self) -> u64 {
        self.genesis_block_number
    }
}
438
439impl<N: NodePrimitives> StaticFileProvider<N> {
440    /// Reports metrics for the static files.
441    pub fn report_metrics(&self) -> ProviderResult<()> {
442        let Some(metrics) = &self.metrics else { return Ok(()) };
443
444        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
445        for (segment, headers) in static_files {
446            let mut entries = 0;
447            let mut size = 0;
448
449            for (block_range, _) in &headers {
450                let fixed_block_range = self.find_fixed_range(segment, block_range.start());
451                let jar_provider = self
452                    .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
453                    .ok_or_else(|| {
454                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
455                    })?;
456
457                entries += jar_provider.rows();
458
459                let data_path = jar_provider.data_path().to_path_buf();
460                let index_path = jar_provider.index_path();
461                let offsets_path = jar_provider.offsets_path();
462                let config_path = jar_provider.config_path();
463
464                // can release jar early
465                drop(jar_provider);
466
467                let data_size = reth_fs_util::metadata(data_path)
468                    .map(|metadata| metadata.len())
469                    .unwrap_or_default();
470                let index_size = reth_fs_util::metadata(index_path)
471                    .map(|metadata| metadata.len())
472                    .unwrap_or_default();
473                let offsets_size = reth_fs_util::metadata(offsets_path)
474                    .map(|metadata| metadata.len())
475                    .unwrap_or_default();
476                let config_size = reth_fs_util::metadata(config_path)
477                    .map(|metadata| metadata.len())
478                    .unwrap_or_default();
479
480                size += data_size + index_size + offsets_size + config_size;
481            }
482
483            metrics.record_segment(segment, size, headers.len(), entries);
484        }
485
486        Ok(())
487    }
488
489    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
490    /// either block or transaction.
491    pub fn get_segment_provider(
492        &self,
493        segment: StaticFileSegment,
494        number: u64,
495    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
496        if segment.is_block_based() {
497            self.get_segment_provider_for_block(segment, number, None)
498        } else {
499            self.get_segment_provider_for_transaction(segment, number, None)
500        }
501    }
502
503    /// Gets the [`StaticFileJarProvider`] of the requested segment and start index that can be
504    /// either block or transaction.
505    ///
506    /// If the segment is not found, returns [`None`].
507    pub fn get_maybe_segment_provider(
508        &self,
509        segment: StaticFileSegment,
510        number: u64,
511    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
512        let provider = if segment.is_block_based() {
513            self.get_segment_provider_for_block(segment, number, None)
514        } else {
515            self.get_segment_provider_for_transaction(segment, number, None)
516        };
517
518        match provider {
519            Ok(provider) => Ok(Some(provider)),
520            Err(
521                ProviderError::MissingStaticFileBlock(_, _) |
522                ProviderError::MissingStaticFileTx(_, _),
523            ) => Ok(None),
524            Err(err) => Err(err),
525        }
526    }
527
528    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
529    pub fn get_segment_provider_for_block(
530        &self,
531        segment: StaticFileSegment,
532        block: BlockNumber,
533        path: Option<&Path>,
534    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
535        self.get_segment_provider_for_range(
536            segment,
537            || self.get_segment_ranges_from_block(segment, block),
538            path,
539        )?
540        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
541    }
542
543    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
544    pub fn get_segment_provider_for_transaction(
545        &self,
546        segment: StaticFileSegment,
547        tx: TxNumber,
548        path: Option<&Path>,
549    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
550        self.get_segment_provider_for_range(
551            segment,
552            || self.get_segment_ranges_from_transaction(segment, tx),
553            path,
554        )?
555        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
556    }
557
558    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
559    ///
560    /// `fn_range` should make sure the range goes through `find_fixed_range`.
561    pub fn get_segment_provider_for_range(
562        &self,
563        segment: StaticFileSegment,
564        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
565        path: Option<&Path>,
566    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
567        // If we have a path, then get the block range from its name.
568        // Otherwise, check `self.available_static_files`
569        let block_range = match path {
570            Some(path) => StaticFileSegment::parse_filename(
571                &path
572                    .file_name()
573                    .ok_or_else(|| {
574                        ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
575                    })?
576                    .to_string_lossy(),
577            )
578            .and_then(|(parsed_segment, block_range)| {
579                if parsed_segment == segment {
580                    return Some(block_range)
581                }
582                None
583            }),
584            None => fn_range(),
585        };
586
587        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
588        if let Some(block_range) = block_range {
589            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
590        }
591
592        Ok(None)
593    }
594
595    /// Gets the [`StaticFileJarProvider`] of the requested path.
596    pub fn get_segment_provider_for_path(
597        &self,
598        path: &Path,
599    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
600        StaticFileSegment::parse_filename(
601            &path
602                .file_name()
603                .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
604                .to_string_lossy(),
605        )
606        .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
607        .transpose()
608    }
609
610    /// Given a segment and block range it removes the cached provider from the map.
611    ///
612    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
613    pub fn remove_cached_provider(
614        &self,
615        segment: StaticFileSegment,
616        fixed_block_range_end: BlockNumber,
617    ) {
618        self.map.remove(&(fixed_block_range_end, segment));
619    }
620
    /// This handles history expiry by deleting all static files for the given segment below the
    /// given block.
    ///
    /// For example if block is 1M and the blocks per file are 500K this will delete all individual
    /// files below 1M, so 0-499K and 500K-999K.
    ///
    /// This will not delete the file that contains the block itself, because files can only be
    /// removed entirely.
    ///
    /// # Safety
    ///
    /// This method will never delete the highest static file for the segment, even if the
    /// requested block is higher than the highest block in static files. This ensures we always
    /// maintain at least one static file if any exist.
    ///
    /// Returns a list of `SegmentHeader`s from the deleted jars.
    pub fn delete_segment_below_block(
        &self,
        segment: StaticFileSegment,
        block: BlockNumber,
    ) -> ProviderResult<Vec<SegmentHeader>> {
        // Nothing to delete if block is 0.
        if block == 0 {
            return Ok(Vec::new())
        }

        // Snapshot the highest block once up-front; the lowest range end is re-read every
        // iteration because `delete_jar` re-initializes the index.
        let highest_block = self.get_highest_static_file_block(segment);
        let mut deleted_headers = Vec::new();

        loop {
            // No lowest range end means there is nothing tracked for this segment to delete.
            let Some(block_height) = self.get_lowest_range_end(segment) else {
                return Ok(deleted_headers)
            };

            // Stop if we've reached the target block or the highest static file
            if block_height >= block || Some(block_height) == highest_block {
                return Ok(deleted_headers)
            }

            debug!(
                target: "provider::static_file",
                ?segment,
                ?block_height,
                "Deleting static file below block"
            );

            // now we need to wipe the static file, this will take care of updating the index and
            // advance the lowest tracked block height for the segment.
            let header = self.delete_jar(segment, block_height).inspect_err(|err| {
                warn!( target: "provider::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
            })?;

            deleted_headers.push(header);
        }
    }
676
    /// Given a segment and block, it deletes the jar and all files from the respective block range.
    ///
    /// CAUTION: destructive. Deletes files on disk.
    ///
    /// This will re-initialize the index after deletion, so all files are tracked.
    ///
    /// Returns the `SegmentHeader` of the deleted jar.
    pub fn delete_jar(
        &self,
        segment: StaticFileSegment,
        block: BlockNumber,
    ) -> ProviderResult<SegmentHeader> {
        let fixed_block_range = self.find_fixed_range(segment, block);
        let key = (fixed_block_range.end(), segment);
        // Prefer the cached jar (evicting it from the cache at the same time); otherwise load
        // it from disk so it can be deleted.
        let jar = if let Some((_, jar)) = self.map.remove(&key) {
            jar.jar
        } else {
            let file = self.path.join(segment.filename(&fixed_block_range));
            debug!(
                target: "provider::static_file",
                ?file,
                ?fixed_block_range,
                ?block,
                "Loading static file jar for deletion"
            );
            NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
        };

        // Copy the header out before the jar (and its files) are destroyed.
        let header = *jar.user_header();
        jar.delete().map_err(ProviderError::other)?;

        // SAFETY: this is currently necessary to ensure that certain indexes like
        // `static_files_min_block` have the correct values after pruning.
        self.initialize_index()?;

        Ok(header)
    }
714
    /// Given a segment and block range it returns a cached
    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
    /// many.
    fn get_or_create_jar_provider(
        &self,
        segment: StaticFileSegment,
        fixed_block_range: &SegmentRangeInclusive,
    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
        let key = (fixed_block_range.end(), segment);

        // Avoid using `entry` directly to avoid a write lock in the common case.
        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
            jar.into()
        } else {
            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
            let path = self.path.join(segment.filename(fixed_block_range));
            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
            // Insert under the entry's write guard, then downgrade it to a read guard before
            // handing out the provider.
            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
        };

        if let Some(metrics) = &self.metrics {
            provider = provider.with_metrics(metrics.clone());
        }
        Ok(provider)
    }
742
743    /// Gets a static file segment's block range from the provider inner block
744    /// index.
745    fn get_segment_ranges_from_block(
746        &self,
747        segment: StaticFileSegment,
748        block: u64,
749    ) -> Option<SegmentRangeInclusive> {
750        let indexes = self.indexes.read();
751        let index = indexes.get(&segment)?;
752
753        (index.max_block >= block).then(|| {
754            self.find_fixed_range_with_block_index(
755                segment,
756                Some(&index.expected_block_ranges_by_max_block),
757                block,
758            )
759        })
760    }
761
762    /// Gets a static file segment's fixed block range from the provider inner
763    /// transaction index.
764    fn get_segment_ranges_from_transaction(
765        &self,
766        segment: StaticFileSegment,
767        tx: u64,
768    ) -> Option<SegmentRangeInclusive> {
769        let indexes = self.indexes.read();
770        let index = indexes.get(&segment)?;
771        let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;
772
773        // It's more probable that the request comes from a newer tx height, so we iterate
774        // the static_files in reverse.
775        let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();
776
777        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
778            if tx > *tx_end {
779                // request tx is higher than highest static file tx
780                return None
781            }
782            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
783            if tx_start <= tx {
784                return Some(self.find_fixed_range_with_block_index(
785                    segment,
786                    Some(&index.expected_block_ranges_by_max_block),
787                    block_range.end(),
788                ))
789            }
790        }
791        None
792    }
793
    /// Updates the inner transaction and block indexes alongside the internal cached providers in
    /// `self.map`.
    ///
    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
    ///
    /// If `segment_max_block` is None it means there's no static file for this segment.
    ///
    /// Holds the `self.indexes` write lock for the entire call.
    pub fn update_index(
        &self,
        segment: StaticFileSegment,
        segment_max_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        debug!(
            target: "provider::static_file",
            ?segment,
            ?segment_max_block,
            "Updating provider index"
        );
        let mut indexes = self.indexes.write();

        match segment_max_block {
            Some(segment_max_block) => {
                // Fixed (expected) block range of the static file containing
                // `segment_max_block`.
                let fixed_range = self.find_fixed_range_with_block_index(
                    segment,
                    indexes.get(&segment).map(|index| &index.expected_block_ranges_by_max_block),
                    segment_max_block,
                );

                // Load the on-disk jar for that range so its actual block/tx ranges can be read
                // below when updating the min-block and tx indexes.
                let jar = NippyJar::<SegmentHeader>::load(
                    &self.path.join(segment.filename(&fixed_range)),
                )
                .map_err(ProviderError::other)?;

                let index = indexes
                    .entry(segment)
                    .and_modify(|index| {
                        // Update max block
                        index.max_block = segment_max_block;

                        // Update expected block range index

                        // Remove all expected block ranges that are less than the new max block
                        index
                            .expected_block_ranges_by_max_block
                            .retain(|_, block_range| block_range.start() < fixed_range.start());
                        // Insert new expected block range
                        index
                            .expected_block_ranges_by_max_block
                            .insert(fixed_range.end(), fixed_range);
                    })
                    .or_insert_with(|| StaticFileSegmentIndex {
                        min_block_range: None,
                        max_block: segment_max_block,
                        expected_block_ranges_by_max_block: BTreeMap::from([(
                            fixed_range.end(),
                            fixed_range,
                        )]),
                        available_block_ranges_by_max_tx: None,
                    });

                // Update min_block to track the lowest block range of the segment.
                // This is initially set by initialize_index() on node startup, but must be updated
                // as the file grows to prevent stale values.
                //
                // Without this update, min_block can remain at genesis (e.g. Some([0..=0]) or None)
                // even after syncing to higher blocks (e.g. [0..=100]). A stale
                // min_block causes get_lowest_static_file_block() to return the
                // wrong end value, which breaks pruning logic that relies on it for
                // safety checks.
                //
                // Example progression:
                // 1. Node starts, initialize_index() sets min_block = [0..=0]
                // 2. Sync to block 100, this update sets min_block = [0..=100]
                // 3. Pruner calls get_lowest_static_file_block() -> returns 100 (correct). Without
                //    this update, it would incorrectly return 0 (stale)
                if let Some(current_block_range) = jar.user_header().block_range() {
                    if let Some(min_block_range) = index.min_block_range.as_mut() {
                        // delete_jar WILL ALWAYS re-initialize all indexes, so we are always
                        // sure that current_min is always the lowest.
                        if current_block_range.start() == min_block_range.start() {
                            *min_block_range = current_block_range;
                        }
                    } else {
                        index.min_block_range = Some(current_block_range);
                    }
                }

                // Updates the tx index by first removing all entries which have a higher
                // block_start than our current static file.
                if let Some(tx_range) = jar.user_header().tx_range() {
                    // Current block range has the same block start as `fixed_range``, but block end
                    // might be different if we are still filling this static file.
                    if let Some(current_block_range) = jar.user_header().block_range() {
                        let tx_end = tx_range.end();

                        // Considering that `update_index` is called when we either append/truncate,
                        // we are sure that we are handling the latest data
                        // points.
                        //
                        // Here we remove every entry of the index that has a block start higher or
                        // equal than our current one. This is important in the case
                        // that we prune a lot of rows resulting in a file (and thus
                        // a higher block range) deletion.
                        if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                            index
                                .retain(|_, block_range| block_range.start() < fixed_range.start());
                            index.insert(tx_end, current_block_range);
                        } else {
                            index.available_block_ranges_by_max_tx =
                                Some(BTreeMap::from([(tx_end, current_block_range)]));
                        }
                    }
                } else if segment.is_tx_based() {
                    // The unwinded file has no more transactions/receipts. However, the highest
                    // block is within this files' block range. We only retain
                    // entries with block ranges before the current one.
                    if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
                    }

                    // If the index is empty, just remove it.
                    index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
                }

                // Update the cached provider.
                debug!(target: "provider::static_file", ?segment, "Inserting updated jar into cache");
                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

                // Delete any cached provider that no longer has an associated jar.
                debug!(target: "provider::static_file", ?segment, "Cleaning up jar map");
                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
            }
            None => {
                // No static file exists for this segment anymore: drop its index entirely.
                debug!(target: "provider::static_file", ?segment, "Removing segment from index");
                indexes.remove(&segment);
            }
        };

        debug!(target: "provider::static_file", ?segment, "Updated provider index");
        Ok(())
    }
934
935    /// Initializes the inner transaction and block index
936    pub fn initialize_index(&self) -> ProviderResult<()> {
937        let mut indexes = self.indexes.write();
938        indexes.clear();
939
940        for (segment, headers) in iter_static_files(&self.path).map_err(ProviderError::other)? {
941            // Update first and last block for each segment
942            //
943            // It's safe to call `expect` here, because every segment has at least one header
944            // associated with it.
945            let min_block_range = Some(headers.first().expect("headers are not empty").0);
946            let max_block = headers.last().expect("headers are not empty").0.end();
947
948            let mut expected_block_ranges_by_max_block = BTreeMap::default();
949            let mut available_block_ranges_by_max_tx = None;
950
951            for (block_range, header) in headers {
952                // Update max expected block -> expected_block_range index
953                expected_block_ranges_by_max_block
954                    .insert(header.expected_block_end(), header.expected_block_range());
955
956                // Update max tx -> block_range index
957                if let Some(tx_range) = header.tx_range() {
958                    let tx_end = tx_range.end();
959
960                    available_block_ranges_by_max_tx
961                        .get_or_insert_with(BTreeMap::default)
962                        .insert(tx_end, block_range);
963                }
964            }
965
966            indexes.insert(
967                segment,
968                StaticFileSegmentIndex {
969                    min_block_range,
970                    max_block,
971                    expected_block_ranges_by_max_block,
972                    available_block_ranges_by_max_tx,
973                },
974            );
975        }
976
977        // If this is a re-initialization, we need to clear this as well
978        self.map.clear();
979
980        // initialize the expired history height to the lowest static file block
981        if let Some(lowest_range) =
982            indexes.get(&StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
983        {
984            // the earliest height is the lowest available block number
985            self.earliest_history_height
986                .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
987        }
988
989        Ok(())
990    }
991
    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
    /// target to unwind to.
    ///
    /// Two types of consistency checks are done for:
    ///
    /// 1) When a static file fails to commit but the underlying data was changed.
    /// 2) When a static file was committed, but the required database transaction was not.
    ///
    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
    /// will return an error.
    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
    /// to heal.
    ///
    /// For each static file segment:
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///
    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
    ///
    /// WARNING: No static file writer should be held before calling this function, otherwise it
    /// will deadlock.
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<PipelineTarget>>
    where
        Provider: DBProvider
            + BlockReader
            + StageCheckpointReader
            + ChainSpecProvider
            + StorageSettingsCache,
        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
    {
        // OVM historical import is broken and does not work with this check. It's importing
        // duplicated receipts resulting in having more receipts than the expected transaction
        // range.
        //
        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
        if provider.chain_spec().is_optimism() &&
            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
        {
            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
            const OVM_HEADER_1_HASH: B256 =
                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
                info!(target: "reth::cli",
                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
                );
                return Ok(None)
            }
        }

        info!(target: "reth::cli", "Verifying storage consistency.");

        // Minimum unwind target across all checked segments; `None` means no unwind is needed.
        let mut unwind_target: Option<BlockNumber> = None;
        let mut update_unwind_target = |new_target: BlockNumber| {
            if let Some(target) = unwind_target.as_mut() {
                *target = (*target).min(new_target);
            } else {
                unwind_target = Some(new_target);
            }
        };

        for segment in self.segments_to_check(provider) {
            debug!(target: "reth::providers::static_file", ?segment, "Checking consistency for segment");

            // Heal file-level inconsistencies and get before/after highest block
            let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?;

            // Only applies to block-based static files. (Headers)
            //
            // The updated `highest_block` may have decreased if we healed from a pruning
            // interruption.
            if initial_highest_block != highest_block {
                info!(
                    target: "reth::providers::static_file",
                    ?initial_highest_block,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                update_unwind_target(highest_block.unwrap_or_default());
            }

            // Only applies to transaction-based static files. (Receipts & Transactions)
            //
            // Make sure the last transaction matches the last block from its indices, since a heal
            // from a pruning interruption might have decreased the number of transactions without
            // being able to update the last block of the static file segment.
            let highest_tx = self.get_highest_static_file_tx(segment);
            debug!(target: "reth::providers::static_file", ?segment, ?highest_tx, ?highest_block, "Highest transaction for segment");
            if let Some(highest_tx) = highest_tx {
                let mut last_block = highest_block.unwrap_or_default();
                debug!(target: "reth::providers::static_file", ?segment, last_block, highest_tx, "Verifying last transaction matches last block indices");
                // Walk backwards from the highest block until a block is found whose body
                // indices are fully covered by the static file's highest tx. Each step back
                // lowers `highest_block` and the unwind target.
                loop {
                    if let Some(indices) = provider.block_body_indices(last_block)? {
                        debug!(target: "reth::providers::static_file", ?segment, last_block, last_tx_num = indices.last_tx_num(), highest_tx, "Found block body indices");
                        if indices.last_tx_num() <= highest_tx {
                            break
                        }
                    } else {
                        debug!(target: "reth::providers::static_file", ?segment, last_block, "Block body indices not found, static files ahead of database");
                        // If the block body indices can not be found, then it means that static
                        // files is ahead of database, and the `ensure_invariants` check will fix
                        // it by comparing with stage checkpoints.
                        break
                    }
                    if last_block == 0 {
                        debug!(target: "reth::providers::static_file", ?segment, "Reached block 0 in verification loop");
                        break
                    }
                    last_block -= 1;

                    // Only reached when we had to step back a block: record the lower target.
                    info!(
                        target: "reth::providers::static_file",
                        highest_block = self.get_highest_static_file_block(segment),
                        unwind_target = last_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    highest_block = Some(last_block);
                    update_unwind_target(last_block);
                }
            }

            debug!(target: "reth::providers::static_file", ?segment, "Ensuring invariants for segment");
            if let Some(unwind) = match segment {
                StaticFileSegment::Headers => self
                    .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                        provider,
                        segment,
                        highest_block,
                        highest_block,
                    )?,
                StaticFileSegment::Transactions => self
                    .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
                StaticFileSegment::Receipts => self
                    .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
                StaticFileSegment::TransactionSenders => self
                    .ensure_invariants::<_, tables::TransactionSenders>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
            } {
                debug!(target: "reth::providers::static_file", ?segment, unwind_target=unwind, "Invariants check returned unwind target");
                update_unwind_target(unwind);
            } else {
                debug!(target: "reth::providers::static_file", ?segment, "Invariants check completed, no unwind needed");
            }
        }

        Ok(unwind_target.map(PipelineTarget::Unwind))
    }
1160
1161    /// Heals file-level (`NippyJar`) inconsistencies for eligible static file segments.
1162    ///
1163    /// Call before [`Self::check_consistency`] so files are internally consistent.
1164    /// Uses the same segment-skip logic as [`Self::check_consistency`], but does not compare with
1165    /// database checkpoints or prune against them.
1166    pub fn check_file_consistency<Provider>(&self, provider: &Provider) -> ProviderResult<()>
1167    where
1168        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache,
1169    {
1170        info!(target: "reth::cli", "Healing static file inconsistencies.");
1171
1172        for segment in self.segments_to_check(provider) {
1173            let _ = self.maybe_heal_segment(segment)?;
1174        }
1175
1176        Ok(())
1177    }
1178
1179    /// Returns the static file segments that should be checked/healed for this provider.
1180    fn segments_to_check<'a, Provider>(
1181        &'a self,
1182        provider: &'a Provider,
1183    ) -> impl Iterator<Item = StaticFileSegment> + 'a
1184    where
1185        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache,
1186    {
1187        StaticFileSegment::iter()
1188            .filter(move |segment| self.should_check_segment(provider, *segment))
1189    }
1190
1191    fn should_check_segment<Provider>(
1192        &self,
1193        provider: &Provider,
1194        segment: StaticFileSegment,
1195    ) -> bool
1196    where
1197        Provider: DBProvider + ChainSpecProvider + StorageSettingsCache,
1198    {
1199        match segment {
1200            StaticFileSegment::Headers | StaticFileSegment::Transactions => true,
1201            StaticFileSegment::Receipts => {
1202                if EitherWriter::receipts_destination(provider).is_database() {
1203                    // Old pruned nodes (including full node) do not store receipts as static
1204                    // files.
1205                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database");
1206                    return false
1207                }
1208
1209                if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
1210                    NamedChain::Chiado == provider.chain_spec().chain_id()
1211                {
1212                    // Gnosis and Chiado's historical import is broken and does not work with
1213                    // this check. They are importing receipts along
1214                    // with importing headers/bodies.
1215                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: broken historical import for gnosis/chiado");
1216                    return false;
1217                }
1218
1219                true
1220            }
1221            StaticFileSegment::TransactionSenders => {
1222                !EitherWriterDestination::senders(provider).is_database()
1223            }
1224        }
1225    }
1226
1227    /// Checks consistency of the latest static file segment and throws an error if at fault.
1228    /// Read-only.
1229    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1230        debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency");
1231        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1232            let file_path = self
1233                .directory()
1234                .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1235            debug!(target: "reth::providers::static_file", ?segment, ?file_path, latest_block, "Loading NippyJar for consistency check");
1236
1237            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1238            debug!(target: "reth::providers::static_file", ?segment, "NippyJar loaded, checking consistency");
1239
1240            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1241            debug!(target: "reth::providers::static_file", ?segment, "NippyJar consistency check passed");
1242        } else {
1243            debug!(target: "reth::providers::static_file", ?segment, "No static file block found, skipping consistency check");
1244        }
1245        Ok(())
1246    }
1247
1248    /// Attempts to heal file-level (`NippyJar`) inconsistencies for a single static file segment.
1249    ///
1250    /// Returns the highest block before and after healing, which can be used to detect
1251    /// if healing from a pruning interruption decreased the highest block.
1252    ///
1253    /// File consistency is broken if:
1254    ///
1255    /// * appending data was interrupted before a config commit, then data file will be truncated
1256    ///   according to the config.
1257    ///
1258    /// * pruning data was interrupted before a config commit, then we have deleted data that we are
1259    ///   expected to still have. We need to check the Database and unwind everything accordingly.
1260    ///
1261    /// **Note:** In read-only mode, this will return an error if a consistency issue is detected,
1262    /// since healing requires write access.
1263    fn maybe_heal_segment(
1264        &self,
1265        segment: StaticFileSegment,
1266    ) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
1267        let initial_highest_block = self.get_highest_static_file_block(segment);
1268        debug!(target: "reth::providers::static_file", ?segment, ?initial_highest_block, "Initial highest block for segment");
1269
1270        if self.access.is_read_only() {
1271            // Read-only mode: cannot modify files, so just validate consistency and error if
1272            // broken.
1273            debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency (read-only)");
1274            self.check_segment_consistency(segment)?;
1275        } else {
1276            // Writable mode: fetching the writer will automatically heal any file-level
1277            // inconsistency by truncating data to match the last committed config.
1278            debug!(target: "reth::providers::static_file", ?segment, "Fetching latest writer which might heal any potential inconsistency");
1279            self.latest_writer(segment)?;
1280        }
1281
1282        // The updated `highest_block` may have decreased if we healed from a pruning
1283        // interruption.
1284        let highest_block = self.get_highest_static_file_block(segment);
1285
1286        Ok((initial_highest_block, highest_block))
1287    }
1288
1289    /// Check invariants for each corresponding table and static file segment:
1290    ///
1291    /// * the corresponding database table should overlap or have continuity in their keys
1292    ///   ([`TxNumber`] or [`BlockNumber`]).
1293    /// * its highest block should match the stage checkpoint block number if it's equal or higher
1294    ///   than the corresponding database table last entry.
1295    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
1296    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
1297    ///     target.
1298    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
1299    ///     this case, the rows will be removed and [`None`] will be returned.
1300    ///
1301    /// * If the database tables overlap with static files and have contiguous keys, or the
1302    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
1303    fn ensure_invariants<Provider, T: Table<Key = u64>>(
1304        &self,
1305        provider: &Provider,
1306        segment: StaticFileSegment,
1307        highest_static_file_entry: Option<u64>,
1308        highest_static_file_block: Option<BlockNumber>,
1309    ) -> ProviderResult<Option<BlockNumber>>
1310    where
1311        Provider: DBProvider + BlockReader + StageCheckpointReader,
1312    {
1313        debug!(target: "reth::providers::static_file", ?segment, ?highest_static_file_entry, ?highest_static_file_block, "Ensuring invariants");
1314        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
1315
1316        if let Some((db_first_entry, _)) = db_cursor.first()? {
1317            debug!(target: "reth::providers::static_file", ?segment, db_first_entry, "Found first database entry");
1318            if let (Some(highest_entry), Some(highest_block)) =
1319                (highest_static_file_entry, highest_static_file_block)
1320            {
1321                // If there is a gap between the entry found in static file and
1322                // database, then we have most likely lost static file data and need to unwind so we
1323                // can load it again
1324                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
1325                    info!(
1326                        target: "reth::providers::static_file",
1327                        ?db_first_entry,
1328                        ?highest_entry,
1329                        unwind_target = highest_block,
1330                        ?segment,
1331                        "Setting unwind target."
1332                    );
1333                    return Ok(Some(highest_block))
1334                }
1335            }
1336
1337            if let Some((db_last_entry, _)) = db_cursor.last()? &&
1338                highest_static_file_entry
1339                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
1340            {
1341                debug!(target: "reth::providers::static_file", ?segment, db_last_entry, ?highest_static_file_entry, "Database has entries beyond static files, no unwind needed");
1342                return Ok(None)
1343            }
1344        } else {
1345            debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
1346        }
1347
1348        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
1349        let highest_static_file_block = highest_static_file_block.unwrap_or_default();
1350
1351        // If static file entry is ahead of the database entries, then ensure the checkpoint block
1352        // number matches.
1353        let stage_id = match segment {
1354            StaticFileSegment::Headers => StageId::Headers,
1355            StaticFileSegment::Transactions => StageId::Bodies,
1356            StaticFileSegment::Receipts => StageId::Execution,
1357            StaticFileSegment::TransactionSenders => StageId::SenderRecovery,
1358        };
1359        let checkpoint_block_number =
1360            provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
1361        debug!(target: "reth::providers::static_file", ?segment, ?stage_id, checkpoint_block_number, highest_static_file_block, "Retrieved stage checkpoint");
1362
1363        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
1364        if checkpoint_block_number > highest_static_file_block {
1365            info!(
1366                target: "reth::providers::static_file",
1367                checkpoint_block_number,
1368                unwind_target = highest_static_file_block,
1369                ?segment,
1370                "Setting unwind target."
1371            );
1372            return Ok(Some(highest_static_file_block))
1373        }
1374
1375        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
1376        // static files on executing a stage, or the reverse on unwinding a stage.
1377        // All we need to do is to prune the extra static file rows.
1378        if checkpoint_block_number < highest_static_file_block {
1379            info!(
1380                target: "reth::providers",
1381                ?segment,
1382                from = highest_static_file_block,
1383                to = checkpoint_block_number,
1384                "Unwinding static file segment."
1385            );
1386            let mut writer = self.latest_writer(segment)?;
1387            match segment {
1388                StaticFileSegment::Headers => {
1389                    let prune_count = highest_static_file_block - checkpoint_block_number;
1390                    debug!(target: "reth::providers::static_file", ?segment, prune_count, "Pruning headers");
1391                    // TODO(joshie): is_block_meta
1392                    writer.prune_headers(prune_count)?;
1393                }
1394                StaticFileSegment::Transactions |
1395                StaticFileSegment::Receipts |
1396                StaticFileSegment::TransactionSenders => {
1397                    if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
1398                        let number = highest_static_file_entry - block.last_tx_num();
1399                        debug!(target: "reth::providers::static_file", ?segment, prune_count = number, checkpoint_block_number, "Pruning transaction based segment");
1400
1401                        match segment {
1402                            StaticFileSegment::Transactions => {
1403                                writer.prune_transactions(number, checkpoint_block_number)?
1404                            }
1405                            StaticFileSegment::Receipts => {
1406                                writer.prune_receipts(number, checkpoint_block_number)?
1407                            }
1408                            StaticFileSegment::TransactionSenders => {
1409                                writer.prune_transaction_senders(number, checkpoint_block_number)?
1410                            }
1411                            StaticFileSegment::Headers => unreachable!(),
1412                        }
1413                    } else {
1414                        debug!(target: "reth::providers::static_file", ?segment, checkpoint_block_number, "No block body indices found for checkpoint block");
1415                    }
1416                }
1417            }
1418            debug!(target: "reth::providers::static_file", ?segment, "Committing writer after pruning");
1419            writer.commit()?;
1420            debug!(target: "reth::providers::static_file", ?segment, "Writer committed successfully");
1421        }
1422
1423        debug!(target: "reth::providers::static_file", ?segment, "Invariants ensured, returning None");
1424        Ok(None)
1425    }
1426
1427    /// Returns the earliest available block number that has not been expired and is still
1428    /// available.
1429    ///
1430    /// This means that the highest expired block (or expired block height) is
1431    /// `earliest_history_height.saturating_sub(1)`.
1432    ///
1433    /// Returns `0` if no history has been expired.
1434    pub fn earliest_history_height(&self) -> BlockNumber {
1435        self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1436    }
1437
1438    /// Gets the lowest static file's block range if it exists for a static file segment.
1439    ///
1440    /// If there is nothing on disk for the given segment, this will return [`None`].
1441    pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1442        self.indexes.read().get(&segment).and_then(|index| index.min_block_range)
1443    }
1444
1445    /// Gets the lowest static file's block range start if it exists for a static file segment.
1446    ///
1447    /// For example if the lowest static file has blocks 0-499, this will return 0.
1448    ///
1449    /// If there is nothing on disk for the given segment, this will return [`None`].
1450    pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1451        self.get_lowest_range(segment).map(|range| range.start())
1452    }
1453
1454    /// Gets the lowest static file's block range end if it exists for a static file segment.
1455    ///
1456    /// For example if the static file has blocks 0-499, this will return 499.
1457    ///
1458    /// If there is nothing on disk for the given segment, this will return [`None`].
1459    pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1460        self.get_lowest_range(segment).map(|range| range.end())
1461    }
1462
1463    /// Gets the highest static file's block height if it exists for a static file segment.
1464    ///
1465    /// If there is nothing on disk for the given segment, this will return [`None`].
1466    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1467        self.indexes.read().get(&segment).map(|index| index.max_block)
1468    }
1469
1470    /// Gets the highest static file transaction.
1471    ///
1472    /// If there is nothing on disk for the given segment, this will return [`None`].
1473    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1474        self.indexes
1475            .read()
1476            .get(&segment)
1477            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1478            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1479    }
1480
1481    /// Gets the highest static file block for all segments.
1482    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1483        HighestStaticFiles {
1484            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1485        }
1486    }
1487
    /// Iterates through segment `static_files` in reverse order, executing a function until it
    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
    ///
    /// Returns `Ok(None)` when the segment has no static files or `func` found no match in any
    /// of them.
    pub fn find_static_file<T>(
        &self,
        segment: StaticFileSegment,
        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
    ) -> ProviderResult<Option<T>> {
        // NOTE(review): the `indexes` read-guard is a temporary in the `if let` scrutinee and
        // so appears to be held for the whole loop below, including the `func` calls — confirm
        // `func` never tries to take a write lock on `indexes`.
        if let Some(ranges) =
            self.indexes.read().get(&segment).map(|index| &index.expected_block_ranges_by_max_block)
        {
            // Iterate through all ranges in reverse order (highest to lowest)
            for range in ranges.values().rev() {
                if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
                    return Ok(Some(res))
                }
            }
        }

        Ok(None)
    }
1508
    /// Fetches data within a specified range across multiple static files.
    ///
    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
    /// It continues fetching until the end of the range is reached or the provided `predicate`
    /// returns false.
    ///
    /// If the static file covering a requested number is missing, the data collected so far is
    /// returned instead of an error.
    pub fn fetch_range_with_predicate<T, F, P>(
        &self,
        segment: StaticFileSegment,
        range: Range<u64>,
        mut get_fn: F,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
        P: FnMut(&T) -> bool,
    {
        // Capacity is capped at 100 so huge ranges don't pre-allocate excessively.
        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);

        /// Resolves to the provider for the given block or transaction number.
        ///
        /// If the static file is missing, the `result` is returned.
        macro_rules! get_provider {
            ($number:expr) => {{
                match self.get_segment_provider(segment, $number) {
                    Ok(provider) => provider,
                    Err(
                        ProviderError::MissingStaticFileBlock(_, _) |
                        ProviderError::MissingStaticFileTx(_, _),
                    ) => return Ok(result),
                    Err(err) => return Err(err),
                }
            }};
        }

        let mut provider = get_provider!(range.start);
        let mut cursor = provider.cursor()?;

        // advances number in range
        'outer: for number in range {
            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
            // access data in two different static files, it halts further attempts by returning
            // the results collected so far, effectively preventing infinite retry loops.
            let mut retrying = false;

            // advances static files if `get_fn` returns None
            'inner: loop {
                match get_fn(&mut cursor, number)? {
                    Some(res) => {
                        // Stop the entire fetch as soon as the predicate rejects an item.
                        if !predicate(&res) {
                            break 'outer
                        }
                        result.push(res);
                        break 'inner
                    }
                    None => {
                        // `number` may live in the next static file: retry once with a freshly
                        // resolved provider before giving up.
                        if retrying {
                            return Ok(result)
                        }
                        // There is a very small chance of hitting a deadlock if two consecutive
                        // static files share the same bucket in the
                        // internal dashmap and we don't drop the current provider
                        // before requesting the next one.
                        drop(cursor);
                        drop(provider);
                        provider = get_provider!(number);
                        cursor = provider.cursor()?;
                        retrying = true;
                    }
                }
            }
        }

        result.shrink_to_fit();

        Ok(result)
    }
1585
    /// Fetches data within a specified range across multiple static files.
    ///
    /// Returns an iterator over the data. Yields [`None`] if the data for the specified number is
    /// not found.
    pub fn fetch_range_iter<'a, T, F>(
        &'a self,
        segment: StaticFileSegment,
        range: Range<u64>,
        get_fn: F,
    ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
    where
        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
        T: std::fmt::Debug,
    {
        // Provider for the static file currently being read; `None` when no file covers the
        // current number. It is moved into the closure and swapped as the range crosses file
        // boundaries.
        let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
        Ok(range.map(move |number| {
            match provider
                .as_ref()
                .map(|provider| get_fn(&mut provider.cursor()?, number))
                .and_then(|result| result.transpose())
            {
                Some(result) => result.map(Some),
                None => {
                    // There is a very small chance of hitting a deadlock if two consecutive
                    // static files share the same bucket in the internal dashmap and we don't drop
                    // the current provider before requesting the next one.
                    provider.take();
                    provider = self.get_maybe_segment_provider(segment, number)?;
                    // Retry once against the newly resolved provider; a second miss yields
                    // `Ok(None)` for this number.
                    provider
                        .as_ref()
                        .map(|provider| get_fn(&mut provider.cursor()?, number))
                        .and_then(|result| result.transpose())
                        .transpose()
                }
            }
        }))
    }
1623
    /// Returns the directory where `static_files` are located.
    pub fn directory(&self) -> &Path {
        &self.path
    }
1628
1629    /// Retrieves data from the database or static file, wherever it's available.
1630    ///
1631    /// # Arguments
1632    /// * `segment` - The segment of the static file to check against.
1633    /// * `index_key` - Requested index key, usually a block or transaction number.
1634    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
1635    ///   file provider.
1636    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
1637    ///   when the static file doesn't contain the required data or is not available.
1638    pub fn get_with_static_file_or_database<T, FS, FD>(
1639        &self,
1640        segment: StaticFileSegment,
1641        number: u64,
1642        fetch_from_static_file: FS,
1643        fetch_from_database: FD,
1644    ) -> ProviderResult<Option<T>>
1645    where
1646        FS: Fn(&Self) -> ProviderResult<Option<T>>,
1647        FD: Fn() -> ProviderResult<Option<T>>,
1648    {
1649        // If there is, check the maximum block or transaction number of the segment.
1650        let static_file_upper_bound = if segment.is_block_based() {
1651            self.get_highest_static_file_block(segment)
1652        } else {
1653            self.get_highest_static_file_tx(segment)
1654        };
1655
1656        if static_file_upper_bound
1657            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1658        {
1659            return fetch_from_static_file(self)
1660        }
1661        fetch_from_database()
1662    }
1663
1664    /// Gets data within a specified range, potentially spanning different `static_files` and
1665    /// database.
1666    ///
1667    /// # Arguments
1668    /// * `segment` - The segment of the static file to query.
1669    /// * `block_or_tx_range` - The range of data to fetch.
1670    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
1671    /// * `fetch_from_database` - A function to fetch data from the database.
1672    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
1673    ///   terminated when this function returns false, thereby filtering the data based on the
1674    ///   provided condition.
1675    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1676        &self,
1677        segment: StaticFileSegment,
1678        mut block_or_tx_range: Range<u64>,
1679        fetch_from_static_file: FS,
1680        mut fetch_from_database: FD,
1681        mut predicate: P,
1682    ) -> ProviderResult<Vec<T>>
1683    where
1684        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1685        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1686        P: FnMut(&T) -> bool,
1687    {
1688        let mut data = Vec::new();
1689
1690        // If there is, check the maximum block or transaction number of the segment.
1691        if let Some(static_file_upper_bound) = if segment.is_block_based() {
1692            self.get_highest_static_file_block(segment)
1693        } else {
1694            self.get_highest_static_file_tx(segment)
1695        } && block_or_tx_range.start <= static_file_upper_bound
1696        {
1697            let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1698            data.extend(fetch_from_static_file(
1699                self,
1700                block_or_tx_range.start..end,
1701                &mut predicate,
1702            )?);
1703            block_or_tx_range.start = end;
1704        }
1705
1706        if block_or_tx_range.end > block_or_tx_range.start {
1707            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1708        }
1709
1710        Ok(data)
1711    }
1712
    /// Returns the static files directory (test helper).
    #[cfg(any(test, feature = "test-utils"))]
    pub fn path(&self) -> &Path {
        &self.path
    }
1718
1719    /// Returns transaction index
1720    #[cfg(any(test, feature = "test-utils"))]
1721    pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
1722        self.indexes
1723            .read()
1724            .get(&segment)
1725            .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1726            .cloned()
1727    }
1728
1729    /// Returns expected block index
1730    #[cfg(any(test, feature = "test-utils"))]
1731    pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
1732        self.indexes
1733            .read()
1734            .get(&segment)
1735            .map(|index| &index.expected_block_ranges_by_max_block)
1736            .cloned()
1737    }
1738}
1739
/// In-memory index describing the static files on disk for a single segment.
///
/// Kept per segment by the provider and updated via [`StaticFileProvider::update_index`].
#[derive(Debug)]
struct StaticFileSegmentIndex {
    /// Min static file block range.
    ///
    /// This index is initialized on launch to keep track of the lowest, non-expired static file
    /// per segment and gets updated on [`StaticFileProvider::update_index`].
    ///
    /// This tracks the lowest static file per segment together with the block range in that
    /// file. E.g. static file is batched in 500k block intervals then the lowest static file
    /// is [0..499K], and the block range is start = 0, end = 499K.
    ///
    /// This index is mainly used for history expiry, which targets transactions, e.g. pre-merge
    /// history expiry would lead to removing all static files below the merge height.
    min_block_range: Option<SegmentRangeInclusive>,
    /// Max static file block.
    max_block: u64,
    /// Expected static file block ranges indexed by max expected blocks.
    ///
    /// For example, a static file for expected block range `0..=499_000` may have only block range
    /// `0..=1000` contained in it, as it's not fully filled yet. This index maps the max expected
    /// block to the expected range, i.e. block `499_000` to block range `0..=499_000`.
    expected_block_ranges_by_max_block: SegmentRanges,
    /// Available on disk static file block ranges indexed by max transactions.
    ///
    /// For example, a static file for block range `0..=499_000` may only have block range
    /// `0..=1000` and transaction range `0..=2000` contained in it. This index maps the max
    /// available transaction to the available block range, i.e. transaction `2000` to block range
    /// `0..=1000`.
    ///
    /// [`None`] for segments that carry no transaction data (e.g. headers).
    available_block_ranges_by_max_tx: Option<SegmentRanges>,
}
1770
/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider>`.
pub trait StaticFileWriter {
    /// The primitives type used by the static file provider.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`]
    /// covering the given block.
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
    /// [`StaticFileSegment`].
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
    fn commit(&self) -> ProviderResult<()>;

    /// Returns `true` if the static file provider has unwind queued.
    fn has_unwind_queued(&self) -> bool;
}
1796
1797impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1798    type Primitives = N;
1799
1800    fn get_writer(
1801        &self,
1802        block: BlockNumber,
1803        segment: StaticFileSegment,
1804    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1805        if self.access.is_read_only() {
1806            return Err(ProviderError::ReadOnlyStaticFileAccess)
1807        }
1808
1809        trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1810        self.writers.get_or_create(segment, || {
1811            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1812        })
1813    }
1814
1815    fn latest_writer(
1816        &self,
1817        segment: StaticFileSegment,
1818    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1819        let genesis_number = self.0.as_ref().genesis_block_number();
1820        self.get_writer(
1821            self.get_highest_static_file_block(segment).unwrap_or(genesis_number),
1822            segment,
1823        )
1824    }
1825
1826    fn commit(&self) -> ProviderResult<()> {
1827        self.writers.commit()
1828    }
1829
1830    fn has_unwind_queued(&self) -> bool {
1831        self.writers.has_unwind_queued()
1832    }
1833}
1834
1835impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1836    type Header = N::BlockHeader;
1837
1838    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1839        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1840            Ok(jar_provider
1841                .cursor()?
1842                .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
1843                .and_then(|(header, hash)| {
1844                    if hash == block_hash {
1845                        return Some(header)
1846                    }
1847                    None
1848                }))
1849        })
1850    }
1851
1852    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1853        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1854            .and_then(|provider| provider.header_by_number(num))
1855            .or_else(|err| {
1856                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1857                    Ok(None)
1858                } else {
1859                    Err(err)
1860                }
1861            })
1862    }
1863
1864    fn headers_range(
1865        &self,
1866        range: impl RangeBounds<BlockNumber>,
1867    ) -> ProviderResult<Vec<Self::Header>> {
1868        self.fetch_range_with_predicate(
1869            StaticFileSegment::Headers,
1870            to_range(range),
1871            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1872            |_| true,
1873        )
1874    }
1875
1876    fn sealed_header(
1877        &self,
1878        num: BlockNumber,
1879    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1880        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1881            .and_then(|provider| provider.sealed_header(num))
1882            .or_else(|err| {
1883                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1884                    Ok(None)
1885                } else {
1886                    Err(err)
1887                }
1888            })
1889    }
1890
1891    fn sealed_headers_while(
1892        &self,
1893        range: impl RangeBounds<BlockNumber>,
1894        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1895    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1896        self.fetch_range_with_predicate(
1897            StaticFileSegment::Headers,
1898            to_range(range),
1899            |cursor, number| {
1900                Ok(cursor
1901                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1902                    .map(|(header, hash)| SealedHeader::new(header, hash)))
1903            },
1904            predicate,
1905        )
1906    }
1907}
1908
1909impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1910    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1911        self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1912            .and_then(|provider| provider.block_hash(num))
1913            .or_else(|err| {
1914                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1915                    Ok(None)
1916                } else {
1917                    Err(err)
1918                }
1919            })
1920    }
1921
1922    fn canonical_hashes_range(
1923        &self,
1924        start: BlockNumber,
1925        end: BlockNumber,
1926    ) -> ProviderResult<Vec<B256>> {
1927        self.fetch_range_with_predicate(
1928            StaticFileSegment::Headers,
1929            start..end,
1930            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1931            |_| true,
1932        )
1933    }
1934}
1935
1936impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1937    for StaticFileProvider<N>
1938{
1939    type Receipt = N::Receipt;
1940
1941    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1942        self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
1943            .and_then(|provider| provider.receipt(num))
1944            .or_else(|err| {
1945                if let ProviderError::MissingStaticFileTx(_, _) = err {
1946                    Ok(None)
1947                } else {
1948                    Err(err)
1949                }
1950            })
1951    }
1952
1953    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1954        if let Some(num) = self.transaction_id(hash)? {
1955            return self.receipt(num)
1956        }
1957        Ok(None)
1958    }
1959
1960    fn receipts_by_block(
1961        &self,
1962        _block: BlockHashOrNumber,
1963    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1964        unreachable!()
1965    }
1966
1967    fn receipts_by_tx_range(
1968        &self,
1969        range: impl RangeBounds<TxNumber>,
1970    ) -> ProviderResult<Vec<Self::Receipt>> {
1971        self.fetch_range_with_predicate(
1972            StaticFileSegment::Receipts,
1973            to_range(range),
1974            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1975            |_| true,
1976        )
1977    }
1978
1979    fn receipts_by_block_range(
1980        &self,
1981        _block_range: RangeInclusive<BlockNumber>,
1982    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1983        Err(ProviderError::UnsupportedProvider)
1984    }
1985}
1986
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
    for StaticFileProvider<N>
{
    /// Hashes all transactions in `tx_range` in parallel on the rayon pool and returns the
    /// unsorted `(hash, tx_number)` pairs.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        let tx_range_size = (tx_range.end - tx_range.start) as usize;

        // Transactions are different size, so chunks will not all take the same processing time. If
        // chunks are too big, there will be idle threads waiting for work. Choosing an
        // arbitrary smaller value to make sure it doesn't happen.
        let chunk_size = 100;

        // iterator over the chunks
        let chunks = tx_range
            .clone()
            .step_by(chunk_size)
            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));

        for chunk_range in chunks {
            // One channel per chunk; receivers are drained in chunk order below.
            let (channel_tx, channel_rx) = mpsc::channel();
            channels.push(channel_rx);

            let manager = self.clone();

            // Spawn the task onto the global rayon pool
            // This task will send the results through the channel after it has calculated
            // the hash.
            rayon::spawn(move || {
                // Reusable RLP buffer to avoid re-allocating per transaction.
                let mut rlp_buf = Vec::with_capacity(128);
                let _ = manager.fetch_range_with_predicate(
                    StaticFileSegment::Transactions,
                    chunk_range,
                    |cursor, number| {
                        Ok(cursor
                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
                            .map(|transaction| {
                                rlp_buf.clear();
                                // Send errors are ignored: the receiver side propagates any
                                // hashing error via the channel payload.
                                let _ = channel_tx
                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
                            }))
                    },
                    |_| true,
                );
            });
        }

        let mut tx_list = Vec::with_capacity(tx_range_size);

        // Iterate over channels and append the tx hashes unsorted
        for channel in channels {
            // `recv` returns `Err` once the sending task finishes and drops its sender,
            // ending this channel's drain.
            while let Ok(tx) = channel.recv() {
                // Unbox the error from `calculate_hash` before propagating it.
                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
                tx_list.push((tx_hash, tx_id));
            }
        }

        Ok(tx_list)
    }
}
2049
2050impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
2051    for StaticFileProvider<N>
2052{
2053    type Transaction = N::SignedTx;
2054
2055    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
2056        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2057            let mut cursor = jar_provider.cursor()?;
2058            if cursor
2059                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
2060                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
2061                .is_some()
2062            {
2063                Ok(cursor.number())
2064            } else {
2065                Ok(None)
2066            }
2067        })
2068    }
2069
2070    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2071        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2072            .and_then(|provider| provider.transaction_by_id(num))
2073            .or_else(|err| {
2074                if let ProviderError::MissingStaticFileTx(_, _) = err {
2075                    Ok(None)
2076                } else {
2077                    Err(err)
2078                }
2079            })
2080    }
2081
2082    fn transaction_by_id_unhashed(
2083        &self,
2084        num: TxNumber,
2085    ) -> ProviderResult<Option<Self::Transaction>> {
2086        self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2087            .and_then(|provider| provider.transaction_by_id_unhashed(num))
2088            .or_else(|err| {
2089                if let ProviderError::MissingStaticFileTx(_, _) = err {
2090                    Ok(None)
2091                } else {
2092                    Err(err)
2093                }
2094            })
2095    }
2096
2097    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2098        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2099            Ok(jar_provider
2100                .cursor()?
2101                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2102                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2103        })
2104    }
2105
    /// Always returns [`ProviderError::UnsupportedProvider`]: the extra
    /// [`TransactionMeta`] this method must produce is not available from
    /// static files alone.
    fn transaction_by_hash_with_meta(
        &self,
        _hash: TxHash,
    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
2113
    /// Always returns [`ProviderError::UnsupportedProvider`]: resolving a
    /// block id to its transactions requires data not stored in static files.
    fn transactions_by_block(
        &self,
        _block_id: BlockHashOrNumber,
    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
2121
    /// Always returns [`ProviderError::UnsupportedProvider`]: mapping block
    /// numbers to their transactions requires data not stored in static files.
    fn transactions_by_block_range(
        &self,
        _range: impl RangeBounds<BlockNumber>,
    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
2129
2130    fn transactions_by_tx_range(
2131        &self,
2132        range: impl RangeBounds<TxNumber>,
2133    ) -> ProviderResult<Vec<Self::Transaction>> {
2134        self.fetch_range_with_predicate(
2135            StaticFileSegment::Transactions,
2136            to_range(range),
2137            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2138            |_| true,
2139        )
2140    }
2141
2142    fn senders_by_tx_range(
2143        &self,
2144        range: impl RangeBounds<TxNumber>,
2145    ) -> ProviderResult<Vec<Address>> {
2146        self.fetch_range_with_predicate(
2147            StaticFileSegment::TransactionSenders,
2148            to_range(range),
2149            |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2150            |_| true,
2151        )
2152    }
2153
2154    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2155        self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2156            .and_then(|provider| provider.transaction_sender(id))
2157            .or_else(|err| {
2158                if let ProviderError::MissingStaticFileTx(_, _) = err {
2159                    Ok(None)
2160                } else {
2161                    Err(err)
2162                }
2163            })
2164    }
2165}
2166
2167impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
2168    fn chain_info(&self) -> ProviderResult<ChainInfo> {
2169        // Required data not present in static_files
2170        Err(ProviderError::UnsupportedProvider)
2171    }
2172
2173    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
2174        // Required data not present in static_files
2175        Err(ProviderError::UnsupportedProvider)
2176    }
2177
2178    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
2179        Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
2180    }
2181
2182    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
2183        // Required data not present in static_files
2184        Err(ProviderError::UnsupportedProvider)
2185    }
2186}
2187
2188/* Cannot be successfully implemented but must exist for trait requirements */
2189
/// [`BlockReader`] implementation for [`StaticFileProvider`].
///
/// Every method returns [`ProviderError::UnsupportedProvider`]; the data
/// required to assemble full blocks is not present in static files. The impl
/// exists only to satisfy trait bounds (see the note above this impl block in
/// the original file).
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
2262
/// [`BlockBodyIndicesProvider`] implementation for [`StaticFileProvider`].
///
/// Body indices are not stored in static files, so both methods return
/// [`ProviderError::UnsupportedProvider`]; the impl exists only to satisfy
/// trait bounds.
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    fn block_body_indices_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
2275
2276impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2277    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2278        match T::NAME {
2279            tables::CanonicalHeaders::NAME |
2280            tables::Headers::<Header>::NAME |
2281            tables::HeaderTerminalDifficulties::NAME => Ok(self
2282                .get_highest_static_file_block(StaticFileSegment::Headers)
2283                .map(|block| block + 1)
2284                .unwrap_or_default()
2285                as usize),
2286            tables::Receipts::<Receipt>::NAME => Ok(self
2287                .get_highest_static_file_tx(StaticFileSegment::Receipts)
2288                .map(|receipts| receipts + 1)
2289                .unwrap_or_default() as usize),
2290            tables::Transactions::<TransactionSigned>::NAME => Ok(self
2291                .get_highest_static_file_tx(StaticFileSegment::Transactions)
2292                .map(|txs| txs + 1)
2293                .unwrap_or_default()
2294                as usize),
2295            tables::TransactionSenders::NAME => Ok(self
2296                .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
2297                .map(|txs| txs + 1)
2298                .unwrap_or_default() as usize),
2299            _ => Err(ProviderError::UnsupportedProvider),
2300        }
2301    }
2302}
2303
2304/// Calculates the tx hash for the given transaction and its id.
2305#[inline]
2306fn calculate_hash<T>(
2307    entry: (TxNumber, T),
2308    rlp_buf: &mut Vec<u8>,
2309) -> Result<(B256, TxNumber), Box<ProviderError>>
2310where
2311    T: Encodable2718,
2312{
2313    let (tx_id, tx) = entry;
2314    tx.encode_2718(rlp_buf);
2315    Ok((keccak256(rlp_buf), tx_id))
2316}
2317
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use reth_chain_state::EthPrimitives;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

    use crate::StaticFileProviderBuilder;

    #[test]
    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
        let (static_dir, _) = create_test_static_files_dir();
        let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
            .with_blocks_per_file(100)
            .build()?;

        let segment = StaticFileSegment::Headers;

        // With no index at all, the default fixed-range computation based on
        // the configured blocks-per-file applies.
        for (block, (start, end)) in [(0, (0, 99)), (250, (200, 299))] {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, None, block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // An empty index falls back to the same default behavior.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
            SegmentRangeInclusive::new(100, 199)
        );

        // Index with three contiguous 100-block ranges.
        let block_index = BTreeMap::from_iter([
            (99, SegmentRangeInclusive::new(0, 99)),
            (199, SegmentRangeInclusive::new(100, 199)),
            (299, SegmentRangeInclusive::new(200, 299)),
        ]);

        // Blocks inside an indexed range resolve to that range; blocks past
        // the index derive new 100-block ranges from the end of the last
        // range, including when whole segments are skipped (500, 1000).
        let cases = [
            (0, (0, 99)),
            (50, (0, 99)),
            (99, (0, 99)),
            (100, (100, 199)),
            (150, (100, 199)),
            (199, (100, 199)),
            (300, (300, 399)),
            (350, (300, 399)),
            (500, (500, 599)),
            (1000, (1000, 1099)),
        ];
        for (block, (start, end)) in cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // Index whose ranges differ in size from the configured
        // blocks-per-file, simulating the setting changing between runs.
        let mixed_size_index = BTreeMap::from_iter([
            (49, SegmentRangeInclusive::new(0, 49)),     // 50 blocks
            (149, SegmentRangeInclusive::new(50, 149)),  // 100 blocks
            (349, SegmentRangeInclusive::new(150, 349)), // 200 blocks
        ]);

        // Indexed ranges win regardless of their size; anything after the
        // last range (ending at 349) is derived with the current 100-block
        // setting.
        let mixed_cases = [
            (25, (0, 49)),
            (100, (50, 149)),
            (200, (150, 349)),
            (350, (350, 449)),
            (450, (450, 549)),
            (550, (550, 649)),
        ];
        for (block, (start, end)) in mixed_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        Ok(())
    }
}