reth_provider/providers/static_file/
manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider,
7    ReceiptProvider, StageCheckpointReader, StatsReader, TransactionVariant, TransactionsProvider,
8    TransactionsProviderExt, WithdrawalsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, Header};
11use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
12use alloy_primitives::{
13    b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
14};
15use dashmap::DashMap;
16use notify::{RecommendedWatcher, RecursiveMode, Watcher};
17use parking_lot::RwLock;
18use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
19use reth_db::{
20    lockfile::StorageLock,
21    static_file::{
22        iter_static_files, BlockHashMask, BodyIndicesMask, HeaderMask, HeaderWithHashMask,
23        ReceiptMask, StaticFileCursor, TDWithHashMask, TransactionMask,
24    },
25};
26use reth_db_api::{
27    cursor::DbCursorRO,
28    models::StoredBlockBodyIndices,
29    table::{Decompress, Table, Value},
30    tables,
31    transaction::DbTx,
32};
33use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
34use reth_node_types::{FullNodePrimitives, NodePrimitives};
35use reth_primitives::{
36    static_file::{
37        find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive,
38        DEFAULT_BLOCKS_PER_STATIC_FILE,
39    },
40    Receipt, RecoveredBlock, SealedBlock, SealedHeader, StaticFileSegment, TransactionSigned,
41};
42use reth_primitives_traits::SignedTransaction;
43use reth_stages_types::{PipelineTarget, StageId};
44use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, OmmersProvider};
45use reth_storage_errors::provider::{ProviderError, ProviderResult};
46use std::{
47    collections::{hash_map::Entry, BTreeMap, HashMap},
48    fmt::Debug,
49    marker::PhantomData,
50    ops::{Deref, Range, RangeBounds, RangeInclusive},
51    path::{Path, PathBuf},
52    sync::{mpsc, Arc},
53};
54use tracing::{info, trace, warn};
55
/// Alias type mapping each [`StaticFileSegment`] to an ordered index of its static files:
/// the key is the highest `TxNumber` contained in a file and the value is that file's
/// block range. Ordered so the file covering a given transaction can be located quickly.
type SegmentRanges = HashMap<StaticFileSegment, BTreeMap<TxNumber, SegmentRangeInclusive>>;
60
/// Access mode on a static file provider: read-only (`RO`, the default) or read-write (`RW`).
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}
70
71impl StaticFileAccess {
72    /// Returns `true` if read-only access.
73    pub const fn is_read_only(&self) -> bool {
74        matches!(self, Self::RO)
75    }
76
77    /// Returns `true` if read-write access.
78    pub const fn is_read_write(&self) -> bool {
79        matches!(self, Self::RW)
80    }
81}
82
/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
///
/// "Static files" contain immutable chain history data, such as:
///  - transactions
///  - headers
///  - receipts
///
/// This provider type is responsible for reading and writing to static files.
// Cheaply cloneable handle: all state lives behind the shared `Arc`.
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
93
94impl<N> Clone for StaticFileProvider<N> {
95    fn clone(&self) -> Self {
96        Self(self.0.clone())
97    }
98}
99
impl<N: NodePrimitives> StaticFileProvider<N> {
    /// Creates a new [`StaticFileProvider`] with the given [`StaticFileAccess`].
    ///
    /// Builds the inner provider and eagerly populates the block/transaction indexes
    /// from the static files found on disk.
    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
        let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
        provider.initialize_index()?;
        Ok(provider)
    }

    /// Creates a new [`StaticFileProvider`] with read-only access.
    ///
    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
    /// new data won't be detected or queryable.
    ///
    /// Watching is recommended if the read-only provider is used on a directory that an active node
    /// instance is modifying.
    ///
    /// See also [`StaticFileProvider::watch_directory`].
    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
        let provider = Self::new(path, StaticFileAccess::RO)?;

        if watch_directory {
            provider.watch_directory();
        }

        Ok(provider)
    }

    /// Creates a new [`StaticFileProvider`] with read-write access.
    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
        Self::new(path, StaticFileAccess::RW)
    }

    /// Watches the directory for changes and updates the in-memory index when modifications
    /// are detected.
    ///
    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
    /// receive `update_index` notifications from a node that appends/truncates data.
    pub fn watch_directory(&self) {
        // The detached watcher thread owns a clone of this handle, keeping the inner
        // provider alive for the lifetime of the process.
        let provider = self.clone();
        std::thread::spawn(move || {
            let (tx, rx) = std::sync::mpsc::channel();
            let mut watcher = RecommendedWatcher::new(
                move |res| tx.send(res).unwrap(),
                notify::Config::default(),
            )
            .expect("failed to create watcher");

            watcher
                .watch(&provider.path, RecursiveMode::NonRecursive)
                .expect("failed to watch path");

            // Some backends send repeated modified events
            let mut last_event_timestamp = None;

            while let Ok(res) = rx.recv() {
                match res {
                    Ok(event) => {
                        // We only care about modified data events
                        if !matches!(
                            event.kind,
                            notify::EventKind::Modify(_) |
                                notify::EventKind::Create(_) |
                                notify::EventKind::Remove(_)
                        ) {
                            continue
                        }

                        // We only trigger a re-initialization if a configuration file was
                        // modified. This means that a
                        // static_file_provider.commit() was called on the node after
                        // appending/truncating rows
                        for segment in event.paths {
                            // Ensure it's a file with the .conf extension
                            #[allow(clippy::nonminimal_bool)]
                            if !segment
                                .extension()
                                .is_some_and(|s| s.to_str() == Some(CONFIG_FILE_EXTENSION))
                            {
                                continue
                            }

                            // Ensure it's well formatted static file name
                            if StaticFileSegment::parse_filename(
                                &segment.file_stem().expect("qed").to_string_lossy(),
                            )
                            .is_none()
                            {
                                continue
                            }

                            // If we can read the metadata and modified timestamp, ensure this is
                            // not an old or repeated event.
                            if let Ok(current_modified_timestamp) =
                                std::fs::metadata(&segment).and_then(|m| m.modified())
                            {
                                if last_event_timestamp.is_some_and(|last_timestamp| {
                                    last_timestamp >= current_modified_timestamp
                                }) {
                                    continue
                                }
                                last_event_timestamp = Some(current_modified_timestamp);
                            }

                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
                            // Rebuild the whole in-memory index; a failure here is logged
                            // and the watcher keeps running for subsequent events.
                            if let Err(err) = provider.initialize_index() {
                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
                            }
                            // One matching config file is enough to trigger a full rebuild.
                            break
                        }
                    }

                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
                }
            }
        });
    }
}
217
218impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
219    type Target = StaticFileProviderInner<N>;
220
221    fn deref(&self) -> &Self::Target {
222        &self.0
223    }
224}
225
/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
    /// segments and ranges.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Max static file block for each segment
    static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
    /// Available static file block ranges on disk indexed by max transactions.
    static_files_tx_index: RwLock<SegmentRanges>,
    /// Directory where `static_files` are located
    path: PathBuf,
    /// Maintains a writer set of [`StaticFileSegment`].
    writers: StaticFileWriters<N>,
    /// Metrics for the static files. `None` unless enabled via `with_metrics`.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access rights of the provider.
    access: StaticFileAccess,
    /// Number of blocks per file.
    blocks_per_file: u64,
    /// Write lock for when access is [`StaticFileAccess::RW`]; `None` in read-only mode.
    _lock_file: Option<StorageLock>,
    /// Node primitives
    _pd: PhantomData<N>,
}
251
252impl<N: NodePrimitives> StaticFileProviderInner<N> {
253    /// Creates a new [`StaticFileProviderInner`].
254    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
255        let _lock_file = if access.is_read_write() {
256            StorageLock::try_acquire(path.as_ref())?.into()
257        } else {
258            None
259        };
260
261        let provider = Self {
262            map: Default::default(),
263            writers: Default::default(),
264            static_files_max_block: Default::default(),
265            static_files_tx_index: Default::default(),
266            path: path.as_ref().to_path_buf(),
267            metrics: None,
268            access,
269            blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE,
270            _lock_file,
271            _pd: Default::default(),
272        };
273
274        Ok(provider)
275    }
276
277    pub const fn is_read_only(&self) -> bool {
278        self.access.is_read_only()
279    }
280
281    /// Each static file has a fixed number of blocks. This gives out the range where the requested
282    /// block is positioned.
283    pub const fn find_fixed_range(&self, block: BlockNumber) -> SegmentRangeInclusive {
284        find_fixed_range(block, self.blocks_per_file)
285    }
286}
287
288impl<N: NodePrimitives> StaticFileProvider<N> {
289    /// Set a custom number of blocks per file.
290    #[cfg(any(test, feature = "test-utils"))]
291    pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self {
292        let mut provider =
293            Arc::try_unwrap(self.0).expect("should be called when initializing only");
294        provider.blocks_per_file = blocks_per_file;
295        Self(Arc::new(provider))
296    }
297
298    /// Enables metrics on the [`StaticFileProvider`].
299    pub fn with_metrics(self) -> Self {
300        let mut provider =
301            Arc::try_unwrap(self.0).expect("should be called when initializing only");
302        provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
303        Self(Arc::new(provider))
304    }
305
306    /// Reports metrics for the static files.
307    pub fn report_metrics(&self) -> ProviderResult<()> {
308        let Some(metrics) = &self.metrics else { return Ok(()) };
309
310        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
311        for (segment, ranges) in static_files {
312            let mut entries = 0;
313            let mut size = 0;
314
315            for (block_range, _) in &ranges {
316                let fixed_block_range = self.find_fixed_range(block_range.start());
317                let jar_provider = self
318                    .get_segment_provider(segment, || Some(fixed_block_range), None)?
319                    .ok_or_else(|| {
320                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
321                    })?;
322
323                entries += jar_provider.rows();
324
325                let data_size = reth_fs_util::metadata(jar_provider.data_path())
326                    .map(|metadata| metadata.len())
327                    .unwrap_or_default();
328                let index_size = reth_fs_util::metadata(jar_provider.index_path())
329                    .map(|metadata| metadata.len())
330                    .unwrap_or_default();
331                let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
332                    .map(|metadata| metadata.len())
333                    .unwrap_or_default();
334                let config_size = reth_fs_util::metadata(jar_provider.config_path())
335                    .map(|metadata| metadata.len())
336                    .unwrap_or_default();
337
338                size += data_size + index_size + offsets_size + config_size;
339            }
340
341            metrics.record_segment(segment, size, ranges.len(), entries);
342        }
343
344        Ok(())
345    }
346
347    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
348    pub fn get_segment_provider_from_block(
349        &self,
350        segment: StaticFileSegment,
351        block: BlockNumber,
352        path: Option<&Path>,
353    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
354        self.get_segment_provider(
355            segment,
356            || self.get_segment_ranges_from_block(segment, block),
357            path,
358        )?
359        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
360    }
361
362    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
363    pub fn get_segment_provider_from_transaction(
364        &self,
365        segment: StaticFileSegment,
366        tx: TxNumber,
367        path: Option<&Path>,
368    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
369        self.get_segment_provider(
370            segment,
371            || self.get_segment_ranges_from_transaction(segment, tx),
372            path,
373        )?
374        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
375    }
376
377    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
378    ///
379    /// `fn_range` should make sure the range goes through `find_fixed_range`.
380    pub fn get_segment_provider(
381        &self,
382        segment: StaticFileSegment,
383        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
384        path: Option<&Path>,
385    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
386        // If we have a path, then get the block range from its name.
387        // Otherwise, check `self.available_static_files`
388        let block_range = match path {
389            Some(path) => StaticFileSegment::parse_filename(
390                &path
391                    .file_name()
392                    .ok_or_else(|| {
393                        ProviderError::MissingStaticFilePath(segment, path.to_path_buf())
394                    })?
395                    .to_string_lossy(),
396            )
397            .and_then(|(parsed_segment, block_range)| {
398                if parsed_segment == segment {
399                    return Some(block_range)
400                }
401                None
402            }),
403            None => fn_range(),
404        };
405
406        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
407        if let Some(block_range) = block_range {
408            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
409        }
410
411        Ok(None)
412    }
413
    /// Given a segment and block range it removes the cached provider from the map.
    ///
    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
    pub fn remove_cached_provider(
        &self,
        segment: StaticFileSegment,
        fixed_block_range_end: BlockNumber,
    ) {
        // Removal takes the map shard's write lock — hence the deadlock warning above.
        self.map.remove(&(fixed_block_range_end, segment));
    }
424
425    /// Given a segment and block, it deletes the jar and all files from the respective block range.
426    ///
427    /// CAUTION: destructive. Deletes files on disk.
428    pub fn delete_jar(&self, segment: StaticFileSegment, block: BlockNumber) -> ProviderResult<()> {
429        let fixed_block_range = self.find_fixed_range(block);
430        let key = (fixed_block_range.end(), segment);
431        let jar = if let Some((_, jar)) = self.map.remove(&key) {
432            jar.jar
433        } else {
434            NippyJar::<SegmentHeader>::load(&self.path.join(segment.filename(&fixed_block_range)))
435                .map_err(ProviderError::other)?
436        };
437
438        jar.delete().map_err(ProviderError::other)?;
439
440        let mut segment_max_block = None;
441        if fixed_block_range.start() > 0 {
442            segment_max_block = Some(fixed_block_range.start() - 1)
443        };
444        self.update_index(segment, segment_max_block)?;
445
446        Ok(())
447    }
448
    /// Given a segment and block range it returns a cached
    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
    /// many.
    fn get_or_create_jar_provider(
        &self,
        segment: StaticFileSegment,
        fixed_block_range: &SegmentRangeInclusive,
    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
        let key = (fixed_block_range.end(), segment);

        // Avoid using `entry` directly to avoid a write lock in the common case.
        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
            jar.into()
        } else {
            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
            // Cache miss: load the jar from disk, insert it, then downgrade the entry's
            // write guard to a read guard before handing it out.
            let path = self.path.join(segment.filename(fixed_block_range));
            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
        };

        // Attach metrics recording to the returned provider when enabled.
        if let Some(metrics) = &self.metrics {
            provider = provider.with_metrics(metrics.clone());
        }
        Ok(provider)
    }
476
477    /// Gets a static file segment's block range from the provider inner block
478    /// index.
479    fn get_segment_ranges_from_block(
480        &self,
481        segment: StaticFileSegment,
482        block: u64,
483    ) -> Option<SegmentRangeInclusive> {
484        self.static_files_max_block
485            .read()
486            .get(&segment)
487            .filter(|max| **max >= block)
488            .map(|_| self.find_fixed_range(block))
489    }
490
    /// Gets a static file segment's fixed block range from the provider inner
    /// transaction index.
    ///
    /// Returns `None` when the segment has no transaction index or `tx` falls outside
    /// every indexed file.
    fn get_segment_ranges_from_transaction(
        &self,
        segment: StaticFileSegment,
        tx: u64,
    ) -> Option<SegmentRangeInclusive> {
        let static_files = self.static_files_tx_index.read();
        let segment_static_files = static_files.get(&segment)?;

        // It's more probable that the request comes from a newer tx height, so we iterate
        // the static_files in reverse.
        let mut static_files_rev_iter = segment_static_files.iter().rev().peekable();

        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
            if tx > *tx_end {
                // request tx is higher than highest static file tx
                return None
            }
            // A file's first tx is one past the previous file's last tx (the peeked entry
            // in reverse order); the oldest file starts at tx 0.
            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
            if tx_start <= tx {
                return Some(self.find_fixed_range(block_range.end()))
            }
        }
        None
    }
517
    /// Updates the inner transaction and block indexes alongside the internal cached providers in
    /// `self.map`.
    ///
    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
    ///
    /// If `segment_max_block` is None it means there's no static file for this segment.
    pub fn update_index(
        &self,
        segment: StaticFileSegment,
        segment_max_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        // Both index locks are taken up-front and held for the whole update so readers
        // never observe the two indexes out of sync with each other.
        let mut max_block = self.static_files_max_block.write();
        let mut tx_index = self.static_files_tx_index.write();

        match segment_max_block {
            Some(segment_max_block) => {
                // Update the max block for the segment
                max_block.insert(segment, segment_max_block);
                let fixed_range = self.find_fixed_range(segment_max_block);

                // Load the jar that contains `segment_max_block` to read its header ranges.
                let jar = NippyJar::<SegmentHeader>::load(
                    &self.path.join(segment.filename(&fixed_range)),
                )
                .map_err(ProviderError::other)?;

                // Updates the tx index by first removing all entries which have a higher
                // block_start than our current static file.
                if let Some(tx_range) = jar.user_header().tx_range() {
                    let tx_end = tx_range.end();

                    // Current block range has the same block start as `fixed_range``, but block end
                    // might be different if we are still filling this static file.
                    if let Some(current_block_range) = jar.user_header().block_range().copied() {
                        // Considering that `update_index` is called when we either append/truncate,
                        // we are sure that we are handling the latest data
                        // points.
                        //
                        // Here we remove every entry of the index that has a block start higher or
                        // equal than our current one. This is important in the case
                        // that we prune a lot of rows resulting in a file (and thus
                        // a higher block range) deletion.
                        tx_index
                            .entry(segment)
                            .and_modify(|index| {
                                index.retain(|_, block_range| {
                                    block_range.start() < fixed_range.start()
                                });
                                index.insert(tx_end, current_block_range);
                            })
                            .or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
                    }
                } else if segment.is_tx_based() {
                    // The unwinded file has no more transactions/receipts. However, the highest
                    // block is within this files' block range. We only retain
                    // entries with block ranges before the current one.
                    tx_index.entry(segment).and_modify(|index| {
                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
                    });

                    // If the index is empty, just remove it.
                    if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
                        tx_index.remove(&segment);
                    }
                }

                // Update the cached provider.
                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

                // Delete any cached provider that no longer has an associated jar.
                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
            }
            None => {
                // No static file left for this segment: drop both index entries entirely.
                tx_index.remove(&segment);
                max_block.remove(&segment);
            }
        };

        Ok(())
    }
597
598    /// Initializes the inner transaction and block index
599    pub fn initialize_index(&self) -> ProviderResult<()> {
600        let mut max_block = self.static_files_max_block.write();
601        let mut tx_index = self.static_files_tx_index.write();
602
603        max_block.clear();
604        tx_index.clear();
605
606        for (segment, ranges) in iter_static_files(&self.path).map_err(ProviderError::other)? {
607            // Update last block for each segment
608            if let Some((block_range, _)) = ranges.last() {
609                max_block.insert(segment, block_range.end());
610            }
611
612            // Update tx -> block_range index
613            for (block_range, tx_range) in ranges {
614                if let Some(tx_range) = tx_range {
615                    let tx_end = tx_range.end();
616
617                    match tx_index.entry(segment) {
618                        Entry::Occupied(mut index) => {
619                            index.get_mut().insert(tx_end, block_range);
620                        }
621                        Entry::Vacant(index) => {
622                            index.insert(BTreeMap::from([(tx_end, block_range)]));
623                        }
624                    };
625                }
626            }
627        }
628
629        // If this is a re-initialization, we need to clear this as well
630        self.map.clear();
631
632        Ok(())
633    }
634
635    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
636    /// target to unwind to.
637    ///
638    /// Two types of consistency checks are done for:
639    ///
640    /// 1) When a static file fails to commit but the underlying data was changed.
641    /// 2) When a static file was committed, but the required database transaction was not.
642    ///
643    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
644    /// will return an error.
645    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
646    /// to heal.
647    ///
648    /// For each static file segment:
649    /// * the corresponding database table should overlap or have continuity in their keys
650    ///   ([`TxNumber`] or [`BlockNumber`]).
651    /// * its highest block should match the stage checkpoint block number if it's equal or higher
652    ///   than the corresponding database table last entry.
653    ///
654    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
655    ///
656    /// WARNING: No static file writer should be held before calling this function, otherwise it
657    /// will deadlock.
658    #[allow(clippy::while_let_loop)]
659    pub fn check_consistency<Provider>(
660        &self,
661        provider: &Provider,
662        has_receipt_pruning: bool,
663    ) -> ProviderResult<Option<PipelineTarget>>
664    where
665        Provider: DBProvider + BlockReader + StageCheckpointReader + ChainSpecProvider,
666        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
667    {
668        // OVM historical import is broken and does not work with this check. It's importing
669        // duplicated receipts resulting in having more receipts than the expected transaction
670        // range.
671        //
672        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
673        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
674        if provider.chain_spec().is_optimism() &&
675            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
676        {
677            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
678            const OVM_HEADER_1_HASH: B256 =
679                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
680            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
681                info!(target: "reth::cli",
682                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
683                );
684                return Ok(None)
685            }
686        }
687
688        info!(target: "reth::cli", "Verifying storage consistency.");
689
690        let mut unwind_target: Option<BlockNumber> = None;
691        let mut update_unwind_target = |new_target: BlockNumber| {
692            if let Some(target) = unwind_target.as_mut() {
693                *target = (*target).min(new_target);
694            } else {
695                unwind_target = Some(new_target);
696            }
697        };
698
699        for segment in StaticFileSegment::iter() {
700            // Not integrated yet
701            if segment.is_block_meta() {
702                continue
703            }
704
705            if has_receipt_pruning && segment.is_receipts() {
706                // Pruned nodes (including full node) do not store receipts as static files.
707                continue
708            }
709
710            let initial_highest_block = self.get_highest_static_file_block(segment);
711
712            //  File consistency is broken if:
713            //
714            // * appending data was interrupted before a config commit, then data file will be
715            //   truncated according to the config.
716            //
717            // * pruning data was interrupted before a config commit, then we have deleted data that
718            //   we are expected to still have. We need to check the Database and unwind everything
719            //   accordingly.
720            if self.access.is_read_only() {
721                self.check_segment_consistency(segment)?;
722            } else {
723                // Fetching the writer will attempt to heal any file level inconsistency.
724                self.latest_writer(segment)?;
725            }
726
727            // Only applies to block-based static files. (Headers)
728            //
729            // The updated `highest_block` may have decreased if we healed from a pruning
730            // interruption.
731            let mut highest_block = self.get_highest_static_file_block(segment);
732            if initial_highest_block != highest_block {
733                info!(
734                    target: "reth::providers::static_file",
735                    ?initial_highest_block,
736                    unwind_target = highest_block,
737                    ?segment,
738                    "Setting unwind target."
739                );
740                update_unwind_target(highest_block.unwrap_or_default());
741            }
742
743            // Only applies to transaction-based static files. (Receipts & Transactions)
744            //
745            // Make sure the last transaction matches the last block from its indices, since a heal
746            // from a pruning interruption might have decreased the number of transactions without
747            // being able to update the last block of the static file segment.
748            let highest_tx = self.get_highest_static_file_tx(segment);
749            if let Some(highest_tx) = highest_tx {
750                let mut last_block = highest_block.unwrap_or_default();
751                loop {
752                    if let Some(indices) = provider.block_body_indices(last_block)? {
753                        if indices.last_tx_num() <= highest_tx {
754                            break
755                        }
756                    } else {
757                        // If the block body indices can not be found, then it means that static
758                        // files is ahead of database, and the `ensure_invariants` check will fix
759                        // it by comparing with stage checkpoints.
760                        break
761                    }
762                    if last_block == 0 {
763                        break
764                    }
765                    last_block -= 1;
766
767                    info!(
768                        target: "reth::providers::static_file",
769                        highest_block = self.get_highest_static_file_block(segment),
770                        unwind_target = last_block,
771                        ?segment,
772                        "Setting unwind target."
773                    );
774                    highest_block = Some(last_block);
775                    update_unwind_target(last_block);
776                }
777            }
778
779            if let Some(unwind) = match segment {
780                StaticFileSegment::Headers => self
781                    .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
782                        provider,
783                        segment,
784                        highest_block,
785                        highest_block,
786                    )?,
787                StaticFileSegment::Transactions => self
788                    .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
789                        provider,
790                        segment,
791                        highest_tx,
792                        highest_block,
793                    )?,
794                StaticFileSegment::Receipts => self
795                    .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
796                        provider,
797                        segment,
798                        highest_tx,
799                        highest_block,
800                    )?,
801                StaticFileSegment::BlockMeta => self
802                    .ensure_invariants::<_, tables::BlockBodyIndices>(
803                        provider,
804                        segment,
805                        highest_block,
806                        highest_block,
807                    )?,
808            } {
809                update_unwind_target(unwind);
810            }
811        }
812
813        Ok(unwind_target.map(PipelineTarget::Unwind))
814    }
815
816    /// Checks consistency of the latest static file segment and throws an error if at fault.
817    /// Read-only.
818    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
819        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
820            let file_path =
821                self.directory().join(segment.filename(&self.find_fixed_range(latest_block)));
822
823            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
824
825            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
826        }
827        Ok(())
828    }
829
    /// Check invariants for each corresponding table and static file segment:
    ///
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
    ///     target.
    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
    ///     this case, the rows will be removed and [`None`] will be returned.
    ///
    /// * If the database tables overlap with static files and have contiguous keys, or the
    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_entry, _)) = db_cursor.first()? {
            // Both static file bounds must exist to compare the static file tip against the
            // database's first key.
            if let (Some(highest_entry), Some(highest_block)) =
                (highest_static_file_entry, highest_static_file_block)
            {
                // If there is a gap between the entry found in static file and
                // database, then we have most likely lost static file data and need to unwind so we
                // can load it again
                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                    info!(
                        target: "reth::providers::static_file",
                        ?db_first_entry,
                        ?highest_entry,
                        unwind_target = highest_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    return Ok(Some(highest_block))
                }
            }

            if let Some((db_last_entry, _)) = db_cursor.last()? {
                // The database has entries past the static file tip (or there is no static file
                // entry at all), so the static files are not ahead — nothing to heal or unwind.
                if highest_static_file_entry
                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
                {
                    return Ok(None)
                }
            }
        }

        // Past this point the static file is at or ahead of the database (or the table is
        // empty); missing bounds are treated as 0.
        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // If static file entry is ahead of the database entries, then ensure the checkpoint block
        // number matches.
        let checkpoint_block_number = provider
            .get_stage_checkpoint(match segment {
                StaticFileSegment::Headers => StageId::Headers,
                StaticFileSegment::Transactions | StaticFileSegment::BlockMeta => StageId::Bodies,
                StaticFileSegment::Receipts => StageId::Execution,
            })?
            .unwrap_or_default()
            .block_number;

        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
        // static files on executing a stage, or the reverse on unwinding a stage.
        // All we need to do is to prune the extra static file rows.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            if segment.is_headers() {
                // TODO(joshie): is_block_meta
                writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
            } else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                // todo joshie: is querying block_body_indices a potential issue once bbi is moved
                // to sf as well
                // NOTE(review): assumes `highest_static_file_entry >= block.last_tx_num()`;
                // an underflow here would panic in debug builds — confirm the earlier cursor
                // checks guarantee this ordering.
                let number = highest_static_file_entry - block.last_tx_num();
                if segment.is_receipts() {
                    writer.prune_receipts(number, checkpoint_block_number)?;
                } else {
                    writer.prune_transactions(number, checkpoint_block_number)?;
                }
            }
            writer.commit()?;
        }

        Ok(None)
    }
941
942    /// Gets the highest static file block if it exists for a static file segment.
943    ///
944    /// If there is nothing on disk for the given segment, this will return [`None`].
945    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
946        self.static_files_max_block.read().get(&segment).copied()
947    }
948
949    /// Gets the highest static file transaction.
950    ///
951    /// If there is nothing on disk for the given segment, this will return [`None`].
952    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
953        self.static_files_tx_index
954            .read()
955            .get(&segment)
956            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
957    }
958
959    /// Gets the highest static file block for all segments.
960    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
961        HighestStaticFiles {
962            headers: self.get_highest_static_file_block(StaticFileSegment::Headers),
963            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
964            transactions: self.get_highest_static_file_block(StaticFileSegment::Transactions),
965            block_meta: self.get_highest_static_file_block(StaticFileSegment::BlockMeta),
966        }
967    }
968
969    /// Iterates through segment `static_files` in reverse order, executing a function until it
970    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
971    pub fn find_static_file<T>(
972        &self,
973        segment: StaticFileSegment,
974        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
975    ) -> ProviderResult<Option<T>> {
976        if let Some(highest_block) = self.get_highest_static_file_block(segment) {
977            let mut range = self.find_fixed_range(highest_block);
978            while range.end() > 0 {
979                if let Some(res) = func(self.get_or_create_jar_provider(segment, &range)?)? {
980                    return Ok(Some(res))
981                }
982                range = SegmentRangeInclusive::new(
983                    range.start().saturating_sub(self.blocks_per_file),
984                    range.end().saturating_sub(self.blocks_per_file),
985                );
986            }
987        }
988
989        Ok(None)
990    }
991
    /// Fetches data within a specified range across multiple static files.
    ///
    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
    /// It continues fetching until the end of the range is reached or the provided `predicate`
    /// returns false.
    ///
    /// If an item is missing from the currently open static file, the file covering that number
    /// is opened and the lookup retried exactly once before returning an error.
    pub fn fetch_range_with_predicate<T, F, P>(
        &self,
        segment: StaticFileSegment,
        range: Range<u64>,
        mut get_fn: F,
        mut predicate: P,
    ) -> ProviderResult<Vec<T>>
    where
        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
        P: FnMut(&T) -> bool,
    {
        // Selects the jar provider covering `start`, keyed by block or tx number depending on
        // the segment type.
        let get_provider = |start: u64| {
            if segment.is_block_based() {
                self.get_segment_provider_from_block(segment, start, None)
            } else {
                self.get_segment_provider_from_transaction(segment, start, None)
            }
        };

        // Pre-allocation is capped at 100 to avoid over-reserving for huge ranges.
        // NOTE(review): assumes `range.end >= range.start` — an inverted range would underflow
        // here in debug builds; confirm callers always pass a well-formed range.
        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
        let mut provider = get_provider(range.start)?;
        let mut cursor = provider.cursor()?;

        // advances number in range
        'outer: for number in range {
            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
            // access data in two different static files, it halts further attempts by returning
            // an error, effectively preventing infinite retry loops.
            let mut retrying = false;

            // advances static files if `get_fn` returns None
            'inner: loop {
                match get_fn(&mut cursor, number)? {
                    Some(res) => {
                        // Stop the whole fetch as soon as the predicate rejects an item; the
                        // rejected item is not included in the result.
                        if !predicate(&res) {
                            break 'outer
                        }
                        result.push(res);
                        break 'inner
                    }
                    None => {
                        if retrying {
                            warn!(
                                target: "provider::static_file",
                                ?segment,
                                ?number,
                                "Could not find block or tx number on a range request"
                            );

                            let err = if segment.is_block_based() {
                                ProviderError::MissingStaticFileBlock(segment, number)
                            } else {
                                ProviderError::MissingStaticFileTx(segment, number)
                            };
                            return Err(err)
                        }
                        // There is a very small chance of hitting a deadlock if two consecutive
                        // static files share the same bucket in the
                        // internal dashmap and we don't drop the current provider
                        // before requesting the next one.
                        drop(cursor);
                        drop(provider);
                        provider = get_provider(number)?;
                        cursor = provider.cursor()?;
                        retrying = true;
                    }
                }
            }
        }

        Ok(result)
    }
1069
    /// Fetches data within a specified range across multiple static files.
    ///
    /// Returns an iterator over the data
    ///
    /// NOTE(review): cursor/provider errors inside the returned iterator are swallowed by the
    /// `.ok()?` calls, which silently ends iteration early instead of yielding an `Err` —
    /// confirm this best-effort behavior is intended by callers.
    pub fn fetch_range_iter<'a, T, F>(
        &'a self,
        segment: StaticFileSegment,
        range: Range<u64>,
        get_fn: F,
    ) -> ProviderResult<impl Iterator<Item = ProviderResult<T>> + 'a>
    where
        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
        T: std::fmt::Debug,
    {
        // Selects the jar provider covering `start`, keyed by block or tx number depending on
        // the segment type.
        let get_provider = move |start: u64| {
            if segment.is_block_based() {
                self.get_segment_provider_from_block(segment, start, None)
            } else {
                self.get_segment_provider_from_transaction(segment, start, None)
            }
        };

        let mut provider = Some(get_provider(range.start)?);
        Ok(range.filter_map(move |number| {
            // `provider` is always `Some` here: it is only emptied immediately before being
            // re-assigned below, hence the `expect("qed")`.
            match get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose() {
                Some(result) => Some(result),
                None => {
                    // There is a very small chance of hitting a deadlock if two consecutive static
                    // files share the same bucket in the internal dashmap and
                    // we don't drop the current provider before requesting the
                    // next one.
                    provider.take();
                    provider = Some(get_provider(number).ok()?);
                    get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose()
                }
            }
        }))
    }
1107
    /// Returns the directory where the `static_files` are located on disk.
    pub fn directory(&self) -> &Path {
        &self.path
    }
1112
1113    /// Retrieves data from the database or static file, wherever it's available.
1114    ///
1115    /// # Arguments
1116    /// * `segment` - The segment of the static file to check against.
1117    /// * `index_key` - Requested index key, usually a block or transaction number.
1118    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
1119    ///   file provider.
1120    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
1121    ///   when the static file doesn't contain the required data or is not available.
1122    pub fn get_with_static_file_or_database<T, FS, FD>(
1123        &self,
1124        segment: StaticFileSegment,
1125        number: u64,
1126        fetch_from_static_file: FS,
1127        fetch_from_database: FD,
1128    ) -> ProviderResult<Option<T>>
1129    where
1130        FS: Fn(&Self) -> ProviderResult<Option<T>>,
1131        FD: Fn() -> ProviderResult<Option<T>>,
1132    {
1133        // If there is, check the maximum block or transaction number of the segment.
1134        let static_file_upper_bound = if segment.is_block_based() {
1135            self.get_highest_static_file_block(segment)
1136        } else {
1137            self.get_highest_static_file_tx(segment)
1138        };
1139
1140        if static_file_upper_bound
1141            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1142        {
1143            return fetch_from_static_file(self)
1144        }
1145        fetch_from_database()
1146    }
1147
1148    /// Gets data within a specified range, potentially spanning different `static_files` and
1149    /// database.
1150    ///
1151    /// # Arguments
1152    /// * `segment` - The segment of the static file to query.
1153    /// * `block_range` - The range of data to fetch.
1154    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
1155    /// * `fetch_from_database` - A function to fetch data from the database.
1156    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
1157    ///   terminated when this function returns false, thereby filtering the data based on the
1158    ///   provided condition.
1159    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1160        &self,
1161        segment: StaticFileSegment,
1162        mut block_or_tx_range: Range<u64>,
1163        fetch_from_static_file: FS,
1164        mut fetch_from_database: FD,
1165        mut predicate: P,
1166    ) -> ProviderResult<Vec<T>>
1167    where
1168        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1169        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1170        P: FnMut(&T) -> bool,
1171    {
1172        let mut data = Vec::new();
1173
1174        // If there is, check the maximum block or transaction number of the segment.
1175        if let Some(static_file_upper_bound) = if segment.is_block_based() {
1176            self.get_highest_static_file_block(segment)
1177        } else {
1178            self.get_highest_static_file_tx(segment)
1179        } {
1180            if block_or_tx_range.start <= static_file_upper_bound {
1181                let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1182                data.extend(fetch_from_static_file(
1183                    self,
1184                    block_or_tx_range.start..end,
1185                    &mut predicate,
1186                )?);
1187                block_or_tx_range.start = end;
1188            }
1189        }
1190
1191        if block_or_tx_range.end > block_or_tx_range.start {
1192            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1193        }
1194
1195        Ok(data)
1196    }
1197
    /// Returns the `static_files` directory path. Test-only accessor.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn path(&self) -> &Path {
        &self.path
    }
1203
    /// Returns the `static_files` transaction index. Test-only accessor.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
        &self.static_files_tx_index
    }
1209}
1210
/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider>`.
pub trait StaticFileWriter {
    /// The primitives type used by the static file provider.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
    ///
    /// `block` is the block number the caller intends to write; implementations use it to
    /// locate or create the backing writer.
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
    /// [`StaticFileSegment`].
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
    fn commit(&self) -> ProviderResult<()>;
}
1233
1234impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1235    type Primitives = N;
1236
1237    fn get_writer(
1238        &self,
1239        block: BlockNumber,
1240        segment: StaticFileSegment,
1241    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1242        if self.access.is_read_only() {
1243            return Err(ProviderError::ReadOnlyStaticFileAccess)
1244        }
1245
1246        trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1247        self.writers.get_or_create(segment, || {
1248            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1249        })
1250    }
1251
1252    fn latest_writer(
1253        &self,
1254        segment: StaticFileSegment,
1255    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1256        self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1257    }
1258
1259    fn commit(&self) -> ProviderResult<()> {
1260        self.writers.commit()
1261    }
1262}
1263
1264impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1265    type Header = N::BlockHeader;
1266
1267    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
1268        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1269            Ok(jar_provider
1270                .cursor()?
1271                .get_two::<HeaderWithHashMask<Self::Header>>(block_hash.into())?
1272                .and_then(|(header, hash)| {
1273                    if &hash == block_hash {
1274                        return Some(header)
1275                    }
1276                    None
1277                }))
1278        })
1279    }
1280
1281    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1282        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1283            .and_then(|provider| provider.header_by_number(num))
1284            .or_else(|err| {
1285                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1286                    Ok(None)
1287                } else {
1288                    Err(err)
1289                }
1290            })
1291    }
1292
1293    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1294        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1295            Ok(jar_provider
1296                .cursor()?
1297                .get_two::<TDWithHashMask>(block_hash.into())?
1298                .and_then(|(td, hash)| (&hash == block_hash).then_some(td.0)))
1299        })
1300    }
1301
1302    fn header_td_by_number(&self, num: BlockNumber) -> ProviderResult<Option<U256>> {
1303        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1304            .and_then(|provider| provider.header_td_by_number(num))
1305            .or_else(|err| {
1306                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1307                    Ok(None)
1308                } else {
1309                    Err(err)
1310                }
1311            })
1312    }
1313
1314    fn headers_range(
1315        &self,
1316        range: impl RangeBounds<BlockNumber>,
1317    ) -> ProviderResult<Vec<Self::Header>> {
1318        self.fetch_range_with_predicate(
1319            StaticFileSegment::Headers,
1320            to_range(range),
1321            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1322            |_| true,
1323        )
1324    }
1325
1326    fn sealed_header(
1327        &self,
1328        num: BlockNumber,
1329    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1330        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1331            .and_then(|provider| provider.sealed_header(num))
1332            .or_else(|err| {
1333                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1334                    Ok(None)
1335                } else {
1336                    Err(err)
1337                }
1338            })
1339    }
1340
1341    fn sealed_headers_while(
1342        &self,
1343        range: impl RangeBounds<BlockNumber>,
1344        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1345    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1346        self.fetch_range_with_predicate(
1347            StaticFileSegment::Headers,
1348            to_range(range),
1349            |cursor, number| {
1350                Ok(cursor
1351                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1352                    .map(|(header, hash)| SealedHeader::new(header, hash)))
1353            },
1354            predicate,
1355        )
1356    }
1357}
1358
1359impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1360    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1361        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)?.block_hash(num)
1362    }
1363
1364    fn canonical_hashes_range(
1365        &self,
1366        start: BlockNumber,
1367        end: BlockNumber,
1368    ) -> ProviderResult<Vec<B256>> {
1369        self.fetch_range_with_predicate(
1370            StaticFileSegment::Headers,
1371            start..end,
1372            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1373            |_| true,
1374        )
1375    }
1376}
1377
1378impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1379    for StaticFileProvider<N>
1380{
1381    type Receipt = N::Receipt;
1382
1383    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1384        self.get_segment_provider_from_transaction(StaticFileSegment::Receipts, num, None)
1385            .and_then(|provider| provider.receipt(num))
1386            .or_else(|err| {
1387                if let ProviderError::MissingStaticFileTx(_, _) = err {
1388                    Ok(None)
1389                } else {
1390                    Err(err)
1391                }
1392            })
1393    }
1394
1395    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1396        if let Some(num) = self.transaction_id(hash)? {
1397            return self.receipt(num)
1398        }
1399        Ok(None)
1400    }
1401
1402    fn receipts_by_block(
1403        &self,
1404        _block: BlockHashOrNumber,
1405    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1406        unreachable!()
1407    }
1408
1409    fn receipts_by_tx_range(
1410        &self,
1411        range: impl RangeBounds<TxNumber>,
1412    ) -> ProviderResult<Vec<Self::Receipt>> {
1413        self.fetch_range_with_predicate(
1414            StaticFileSegment::Receipts,
1415            to_range(range),
1416            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1417            |_| true,
1418        )
1419    }
1420}
1421
impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>>
    TransactionsProviderExt for StaticFileProvider<N>
{
    /// Calculates the hash of every transaction in `tx_range` in parallel on the global rayon
    /// pool and returns `(hash, tx_number)` pairs in no particular order.
    fn transaction_hashes_by_range(
        &self,
        tx_range: Range<TxNumber>,
    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
        let tx_range_size = (tx_range.end - tx_range.start) as usize;

        // Transactions are different size, so chunks will not all take the same processing time. If
        // chunks are too big, there will be idle threads waiting for work. Choosing an
        // arbitrary smaller value to make sure it doesn't happen.
        let chunk_size = 100;

        // iterator over the chunks
        let chunks = tx_range
            .clone()
            .step_by(chunk_size)
            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));

        for chunk_range in chunks {
            // One channel per chunk; the receivers are drained in chunk order below.
            let (channel_tx, channel_rx) = mpsc::channel();
            channels.push(channel_rx);

            let manager = self.clone();

            // Spawn the task onto the global rayon pool
            // This task will send the results through the channel after it has calculated
            // the hash.
            rayon::spawn(move || {
                // Reused RLP buffer to avoid a fresh allocation per transaction.
                let mut rlp_buf = Vec::with_capacity(128);
                // Fetch errors are deliberately discarded: a failed chunk simply sends fewer
                // results before its sender is dropped.
                let _ = manager.fetch_range_with_predicate(
                    StaticFileSegment::Transactions,
                    chunk_range,
                    |cursor, number| {
                        Ok(cursor
                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
                            .map(|transaction| {
                                rlp_buf.clear();
                                // Send errors are ignored: the receiver may already be gone.
                                let _ = channel_tx
                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
                            }))
                    },
                    |_| true,
                );
            });
        }

        let mut tx_list = Vec::with_capacity(tx_range_size);

        // Iterate over channels and append the tx hashes unsorted
        for channel in channels {
            // `recv` fails once the chunk's sender is dropped, i.e. its rayon task finished.
            while let Ok(tx) = channel.recv() {
                // Unbox the error produced by `calculate_hash` before propagating it.
                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
                tx_list.push((tx_hash, tx_id));
            }
        }

        Ok(tx_list)
    }
}
1484
1485impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1486    for StaticFileProvider<N>
1487{
1488    type Transaction = N::SignedTx;
1489
1490    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1491        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1492            let mut cursor = jar_provider.cursor()?;
1493            if cursor
1494                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1495                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1496                .is_some()
1497            {
1498                Ok(cursor.number())
1499            } else {
1500                Ok(None)
1501            }
1502        })
1503    }
1504
1505    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1506        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1507            .and_then(|provider| provider.transaction_by_id(num))
1508            .or_else(|err| {
1509                if let ProviderError::MissingStaticFileTx(_, _) = err {
1510                    Ok(None)
1511                } else {
1512                    Err(err)
1513                }
1514            })
1515    }
1516
1517    fn transaction_by_id_unhashed(
1518        &self,
1519        num: TxNumber,
1520    ) -> ProviderResult<Option<Self::Transaction>> {
1521        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1522            .and_then(|provider| provider.transaction_by_id_unhashed(num))
1523            .or_else(|err| {
1524                if let ProviderError::MissingStaticFileTx(_, _) = err {
1525                    Ok(None)
1526                } else {
1527                    Err(err)
1528                }
1529            })
1530    }
1531
1532    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1533        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1534            Ok(jar_provider
1535                .cursor()?
1536                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
1537                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
1538        })
1539    }
1540
1541    fn transaction_by_hash_with_meta(
1542        &self,
1543        _hash: TxHash,
1544    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1545        // Required data not present in static_files
1546        Err(ProviderError::UnsupportedProvider)
1547    }
1548
1549    fn transaction_block(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1550        // Required data not present in static_files
1551        Err(ProviderError::UnsupportedProvider)
1552    }
1553
1554    fn transactions_by_block(
1555        &self,
1556        _block_id: BlockHashOrNumber,
1557    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1558        // Required data not present in static_files
1559        Err(ProviderError::UnsupportedProvider)
1560    }
1561
1562    fn transactions_by_block_range(
1563        &self,
1564        _range: impl RangeBounds<BlockNumber>,
1565    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1566        // Required data not present in static_files
1567        Err(ProviderError::UnsupportedProvider)
1568    }
1569
1570    fn transactions_by_tx_range(
1571        &self,
1572        range: impl RangeBounds<TxNumber>,
1573    ) -> ProviderResult<Vec<Self::Transaction>> {
1574        self.fetch_range_with_predicate(
1575            StaticFileSegment::Transactions,
1576            to_range(range),
1577            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
1578            |_| true,
1579        )
1580    }
1581
1582    fn senders_by_tx_range(
1583        &self,
1584        range: impl RangeBounds<TxNumber>,
1585    ) -> ProviderResult<Vec<Address>> {
1586        let txes = self.transactions_by_tx_range(range)?;
1587        Ok(reth_primitives_traits::transaction::recover::recover_signers(&txes)?)
1588    }
1589
1590    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1591        match self.transaction_by_id_unhashed(id)? {
1592            Some(tx) => Ok(tx.recover_signer().ok()),
1593            None => Ok(None),
1594        }
1595    }
1596}
1597
1598/* Cannot be successfully implemented but must exist for trait requirements */
1599
impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
    /// Unsupported: chain info is not stored in static files.
    fn chain_info(&self) -> ProviderResult<ChainInfo> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: the best block number is not stored in static files.
    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: the last block number is not stored in static files.
    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: hash-to-number lookups are not possible with static files alone.
    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }
}
1621
impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    /// Unsupported: full-block hash lookups are not possible with static files alone.
    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: complete blocks are not stored in static files.
    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: pending blocks are never in static files.
    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: pending blocks are never in static files.
    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: pending blocks are never in static files.
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: complete blocks are not stored in static files.
    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: complete blocks are not stored in static files.
    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: complete blocks are not stored in static files.
    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        // Required data not present in static_files
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: complete blocks are not stored in static files.
    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported: complete blocks are not stored in static files.
    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
1695
1696impl<N: NodePrimitives> WithdrawalsProvider for StaticFileProvider<N> {
1697    fn withdrawals_by_block(
1698        &self,
1699        id: BlockHashOrNumber,
1700        timestamp: u64,
1701    ) -> ProviderResult<Option<Withdrawals>> {
1702        if let Some(num) = id.as_number() {
1703            return self
1704                .get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1705                .and_then(|provider| provider.withdrawals_by_block(id, timestamp))
1706                .or_else(|err| {
1707                    if let ProviderError::MissingStaticFileBlock(_, _) = err {
1708                        Ok(None)
1709                    } else {
1710                        Err(err)
1711                    }
1712                })
1713        }
1714        // Only accepts block number queries
1715        Err(ProviderError::UnsupportedProvider)
1716    }
1717}
1718
1719impl<N: FullNodePrimitives<BlockHeader: Value>> OmmersProvider for StaticFileProvider<N> {
1720    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1721        if let Some(num) = id.as_number() {
1722            return self
1723                .get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1724                .and_then(|provider| provider.ommers(id))
1725                .or_else(|err| {
1726                    if let ProviderError::MissingStaticFileBlock(_, _) = err {
1727                        Ok(None)
1728                    } else {
1729                        Err(err)
1730                    }
1731                })
1732        }
1733        // Only accepts block number queries
1734        Err(ProviderError::UnsupportedProvider)
1735    }
1736}
1737
1738impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
1739    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1740        self.get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1741            .and_then(|provider| provider.block_body_indices(num))
1742            .or_else(|err| {
1743                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1744                    Ok(None)
1745                } else {
1746                    Err(err)
1747                }
1748            })
1749    }
1750
1751    fn block_body_indices_range(
1752        &self,
1753        range: RangeInclusive<BlockNumber>,
1754    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1755        self.fetch_range_with_predicate(
1756            StaticFileSegment::BlockMeta,
1757            *range.start()..*range.end() + 1,
1758            |cursor, number| cursor.get_one::<BodyIndicesMask>(number.into()),
1759            |_| true,
1760        )
1761    }
1762}
1763
1764impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
1765    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
1766        match T::NAME {
1767            tables::CanonicalHeaders::NAME |
1768            tables::Headers::<Header>::NAME |
1769            tables::HeaderTerminalDifficulties::NAME => Ok(self
1770                .get_highest_static_file_block(StaticFileSegment::Headers)
1771                .map(|block| block + 1)
1772                .unwrap_or_default()
1773                as usize),
1774            tables::Receipts::<Receipt>::NAME => Ok(self
1775                .get_highest_static_file_tx(StaticFileSegment::Receipts)
1776                .map(|receipts| receipts + 1)
1777                .unwrap_or_default() as usize),
1778            tables::Transactions::<TransactionSigned>::NAME => Ok(self
1779                .get_highest_static_file_tx(StaticFileSegment::Transactions)
1780                .map(|txs| txs + 1)
1781                .unwrap_or_default()
1782                as usize),
1783            _ => Err(ProviderError::UnsupportedProvider),
1784        }
1785    }
1786}
1787
1788/// Calculates the tx hash for the given transaction and its id.
1789#[inline]
1790fn calculate_hash<T>(
1791    entry: (TxNumber, T),
1792    rlp_buf: &mut Vec<u8>,
1793) -> Result<(B256, TxNumber), Box<ProviderError>>
1794where
1795    T: Encodable2718,
1796{
1797    let (tx_id, tx) = entry;
1798    tx.encode_2718(rlp_buf);
1799    Ok((keccak256(rlp_buf), tx_id))
1800}