reth_provider/providers/static_file/
manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider,
7    ReceiptProvider, StageCheckpointReader, StatsReader, TransactionVariant, TransactionsProvider,
8    TransactionsProviderExt, WithdrawalsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, Header};
11use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
12use alloy_primitives::{
13    b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
14};
15use dashmap::DashMap;
16use notify::{RecommendedWatcher, RecursiveMode, Watcher};
17use parking_lot::RwLock;
18use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
19use reth_db::{
20    lockfile::StorageLock,
21    static_file::{
22        iter_static_files, BlockHashMask, BodyIndicesMask, HeaderMask, HeaderWithHashMask,
23        ReceiptMask, StaticFileCursor, TDWithHashMask, TransactionMask,
24    },
25};
26use reth_db_api::{
27    cursor::DbCursorRO,
28    models::StoredBlockBodyIndices,
29    table::{Decompress, Table, Value},
30    tables,
31    transaction::DbTx,
32};
33use reth_ethereum_primitives::{Receipt, TransactionSigned};
34use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
35use reth_node_types::{FullNodePrimitives, NodePrimitives};
36use reth_primitives_traits::{RecoveredBlock, SealedBlock, SealedHeader, SignedTransaction};
37use reth_stages_types::{PipelineTarget, StageId};
38use reth_static_file_types::{
39    find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
40    DEFAULT_BLOCKS_PER_STATIC_FILE,
41};
42use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, OmmersProvider};
43use reth_storage_errors::provider::{ProviderError, ProviderResult};
44use std::{
45    collections::{hash_map::Entry, BTreeMap, HashMap},
46    fmt::Debug,
47    marker::PhantomData,
48    ops::{Deref, Range, RangeBounds, RangeInclusive},
49    path::{Path, PathBuf},
50    sync::{mpsc, Arc},
51};
52use tracing::{info, trace, warn};
53
/// Per-segment transaction index of the static files on disk.
///
/// For each [`StaticFileSegment`], maps the last [`TxNumber`] stored in a static file to that
/// file's block range. Keying by the last transaction lets a transaction number be resolved to
/// the static file that contains it.
type SegmentRanges = HashMap<StaticFileSegment, BTreeMap<TxNumber, SegmentRangeInclusive>>;
58
/// Access mode of a static file provider: read-only ([`StaticFileAccess::RO`], the default) or
/// read-write ([`StaticFileAccess::RW`]).
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` if this is [`StaticFileAccess::RO`].
    pub const fn is_read_only(&self) -> bool {
        match self {
            Self::RO => true,
            Self::RW => false,
        }
    }

    /// Returns `true` if this is [`StaticFileAccess::RW`].
    pub const fn is_read_write(&self) -> bool {
        match self {
            Self::RO => false,
            Self::RW => true,
        }
    }
}
80
81/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
82///
83/// "Static files" contain immutable chain history data, such as:
84///  - transactions
85///  - headers
86///  - receipts
87///
88/// This provider type is responsible for reading and writing to static files.
89#[derive(Debug)]
90pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
91
92impl<N> Clone for StaticFileProvider<N> {
93    fn clone(&self) -> Self {
94        Self(self.0.clone())
95    }
96}
97
98impl<N: NodePrimitives> StaticFileProvider<N> {
99    /// Creates a new [`StaticFileProvider`] with the given [`StaticFileAccess`].
100    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
101        let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
102        provider.initialize_index()?;
103        Ok(provider)
104    }
105
106    /// Creates a new [`StaticFileProvider`] with read-only access.
107    ///
108    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
109    /// new data won't be detected or queryable.
110    ///
111    /// Watching is recommended if the read-only provider is used on a directory that an active node
112    /// instance is modifying.
113    ///
114    /// See also [`StaticFileProvider::watch_directory`].
115    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
116        let provider = Self::new(path, StaticFileAccess::RO)?;
117
118        if watch_directory {
119            provider.watch_directory();
120        }
121
122        Ok(provider)
123    }
124
125    /// Creates a new [`StaticFileProvider`] with read-write access.
126    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
127        Self::new(path, StaticFileAccess::RW)
128    }
129
130    /// Watches the directory for changes and updates the in-memory index when modifications
131    /// are detected.
132    ///
133    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
134    /// receive `update_index` notifications from a node that appends/truncates data.
135    pub fn watch_directory(&self) {
136        let provider = self.clone();
137        std::thread::spawn(move || {
138            let (tx, rx) = std::sync::mpsc::channel();
139            let mut watcher = RecommendedWatcher::new(
140                move |res| tx.send(res).unwrap(),
141                notify::Config::default(),
142            )
143            .expect("failed to create watcher");
144
145            watcher
146                .watch(&provider.path, RecursiveMode::NonRecursive)
147                .expect("failed to watch path");
148
149            // Some backends send repeated modified events
150            let mut last_event_timestamp = None;
151
152            while let Ok(res) = rx.recv() {
153                match res {
154                    Ok(event) => {
155                        // We only care about modified data events
156                        if !matches!(
157                            event.kind,
158                            notify::EventKind::Modify(_) |
159                                notify::EventKind::Create(_) |
160                                notify::EventKind::Remove(_)
161                        ) {
162                            continue
163                        }
164
165                        // We only trigger a re-initialization if a configuration file was
166                        // modified. This means that a
167                        // static_file_provider.commit() was called on the node after
168                        // appending/truncating rows
169                        for segment in event.paths {
170                            // Ensure it's a file with the .conf extension
171                            if segment
172                                .extension()
173                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
174                            {
175                                continue
176                            }
177
178                            // Ensure it's well formatted static file name
179                            if StaticFileSegment::parse_filename(
180                                &segment.file_stem().expect("qed").to_string_lossy(),
181                            )
182                            .is_none()
183                            {
184                                continue
185                            }
186
187                            // If we can read the metadata and modified timestamp, ensure this is
188                            // not an old or repeated event.
189                            if let Ok(current_modified_timestamp) =
190                                std::fs::metadata(&segment).and_then(|m| m.modified())
191                            {
192                                if last_event_timestamp.is_some_and(|last_timestamp| {
193                                    last_timestamp >= current_modified_timestamp
194                                }) {
195                                    continue
196                                }
197                                last_event_timestamp = Some(current_modified_timestamp);
198                            }
199
200                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
201                            if let Err(err) = provider.initialize_index() {
202                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
203                            }
204                            break
205                        }
206                    }
207
208                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
209                }
210            }
211        });
212    }
213}
214
215impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
216    type Target = StaticFileProviderInner<N>;
217
218    fn deref(&self) -> &Self::Target {
219        &self.0
220    }
221}
222
/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
///
/// This is the state shared by every clone of a [`StaticFileProvider`]: the cache of loaded
/// jars, the in-memory block/transaction indexes, the writers and the access configuration.
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
    /// segments and ranges. Keyed by `(fixed block range end, segment)`.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Highest block stored in static files, for each segment.
    static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
    /// Available static file block ranges on disk, indexed per segment by the last transaction
    /// number of each file.
    static_files_tx_index: RwLock<SegmentRanges>,
    /// Directory where the static files are located.
    path: PathBuf,
    /// Maintains a writer set of [`StaticFileSegment`].
    writers: StaticFileWriters<N>,
    /// Metrics for the static files. `None` unless enabled via `with_metrics`.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access rights of the provider.
    access: StaticFileAccess,
    /// Number of blocks per file.
    blocks_per_file: u64,
    /// Storage lock acquired when access is [`StaticFileAccess::RW`] (`None` for read-only); it is
    /// kept alive for as long as this value exists.
    _lock_file: Option<StorageLock>,
    /// Node primitives
    _pd: PhantomData<N>,
}
248
249impl<N: NodePrimitives> StaticFileProviderInner<N> {
250    /// Creates a new [`StaticFileProviderInner`].
251    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
252        let _lock_file = if access.is_read_write() {
253            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
254        } else {
255            None
256        };
257
258        let provider = Self {
259            map: Default::default(),
260            writers: Default::default(),
261            static_files_max_block: Default::default(),
262            static_files_tx_index: Default::default(),
263            path: path.as_ref().to_path_buf(),
264            metrics: None,
265            access,
266            blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE,
267            _lock_file,
268            _pd: Default::default(),
269        };
270
271        Ok(provider)
272    }
273
274    pub const fn is_read_only(&self) -> bool {
275        self.access.is_read_only()
276    }
277
278    /// Each static file has a fixed number of blocks. This gives out the range where the requested
279    /// block is positioned.
280    pub const fn find_fixed_range(&self, block: BlockNumber) -> SegmentRangeInclusive {
281        find_fixed_range(block, self.blocks_per_file)
282    }
283}
284
285impl<N: NodePrimitives> StaticFileProvider<N> {
286    /// Set a custom number of blocks per file.
287    #[cfg(any(test, feature = "test-utils"))]
288    pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self {
289        let mut provider =
290            Arc::try_unwrap(self.0).expect("should be called when initializing only");
291        provider.blocks_per_file = blocks_per_file;
292        Self(Arc::new(provider))
293    }
294
295    /// Enables metrics on the [`StaticFileProvider`].
296    pub fn with_metrics(self) -> Self {
297        let mut provider =
298            Arc::try_unwrap(self.0).expect("should be called when initializing only");
299        provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
300        Self(Arc::new(provider))
301    }
302
303    /// Reports metrics for the static files.
304    pub fn report_metrics(&self) -> ProviderResult<()> {
305        let Some(metrics) = &self.metrics else { return Ok(()) };
306
307        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
308        for (segment, ranges) in static_files {
309            let mut entries = 0;
310            let mut size = 0;
311
312            for (block_range, _) in &ranges {
313                let fixed_block_range = self.find_fixed_range(block_range.start());
314                let jar_provider = self
315                    .get_segment_provider(segment, || Some(fixed_block_range), None)?
316                    .ok_or_else(|| {
317                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
318                    })?;
319
320                entries += jar_provider.rows();
321
322                let data_size = reth_fs_util::metadata(jar_provider.data_path())
323                    .map(|metadata| metadata.len())
324                    .unwrap_or_default();
325                let index_size = reth_fs_util::metadata(jar_provider.index_path())
326                    .map(|metadata| metadata.len())
327                    .unwrap_or_default();
328                let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
329                    .map(|metadata| metadata.len())
330                    .unwrap_or_default();
331                let config_size = reth_fs_util::metadata(jar_provider.config_path())
332                    .map(|metadata| metadata.len())
333                    .unwrap_or_default();
334
335                size += data_size + index_size + offsets_size + config_size;
336            }
337
338            metrics.record_segment(segment, size, ranges.len(), entries);
339        }
340
341        Ok(())
342    }
343
344    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
345    pub fn get_segment_provider_from_block(
346        &self,
347        segment: StaticFileSegment,
348        block: BlockNumber,
349        path: Option<&Path>,
350    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
351        self.get_segment_provider(
352            segment,
353            || self.get_segment_ranges_from_block(segment, block),
354            path,
355        )?
356        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
357    }
358
359    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
360    pub fn get_segment_provider_from_transaction(
361        &self,
362        segment: StaticFileSegment,
363        tx: TxNumber,
364        path: Option<&Path>,
365    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
366        self.get_segment_provider(
367            segment,
368            || self.get_segment_ranges_from_transaction(segment, tx),
369            path,
370        )?
371        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
372    }
373
374    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
375    ///
376    /// `fn_range` should make sure the range goes through `find_fixed_range`.
377    pub fn get_segment_provider(
378        &self,
379        segment: StaticFileSegment,
380        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
381        path: Option<&Path>,
382    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
383        // If we have a path, then get the block range from its name.
384        // Otherwise, check `self.available_static_files`
385        let block_range = match path {
386            Some(path) => StaticFileSegment::parse_filename(
387                &path
388                    .file_name()
389                    .ok_or_else(|| {
390                        ProviderError::MissingStaticFilePath(segment, path.to_path_buf())
391                    })?
392                    .to_string_lossy(),
393            )
394            .and_then(|(parsed_segment, block_range)| {
395                if parsed_segment == segment {
396                    return Some(block_range)
397                }
398                None
399            }),
400            None => fn_range(),
401        };
402
403        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
404        if let Some(block_range) = block_range {
405            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
406        }
407
408        Ok(None)
409    }
410
411    /// Given a segment and block range it removes the cached provider from the map.
412    ///
413    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
414    pub fn remove_cached_provider(
415        &self,
416        segment: StaticFileSegment,
417        fixed_block_range_end: BlockNumber,
418    ) {
419        self.map.remove(&(fixed_block_range_end, segment));
420    }
421
422    /// Given a segment and block, it deletes the jar and all files from the respective block range.
423    ///
424    /// CAUTION: destructive. Deletes files on disk.
425    pub fn delete_jar(&self, segment: StaticFileSegment, block: BlockNumber) -> ProviderResult<()> {
426        let fixed_block_range = self.find_fixed_range(block);
427        let key = (fixed_block_range.end(), segment);
428        let jar = if let Some((_, jar)) = self.map.remove(&key) {
429            jar.jar
430        } else {
431            NippyJar::<SegmentHeader>::load(&self.path.join(segment.filename(&fixed_block_range)))
432                .map_err(ProviderError::other)?
433        };
434
435        jar.delete().map_err(ProviderError::other)?;
436
437        let mut segment_max_block = None;
438        if fixed_block_range.start() > 0 {
439            segment_max_block = Some(fixed_block_range.start() - 1)
440        };
441        self.update_index(segment, segment_max_block)?;
442
443        Ok(())
444    }
445
446    /// Given a segment and block range it returns a cached
447    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
448    /// many.
449    fn get_or_create_jar_provider(
450        &self,
451        segment: StaticFileSegment,
452        fixed_block_range: &SegmentRangeInclusive,
453    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
454        let key = (fixed_block_range.end(), segment);
455
456        // Avoid using `entry` directly to avoid a write lock in the common case.
457        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
458        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
459            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
460            jar.into()
461        } else {
462            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
463            let path = self.path.join(segment.filename(fixed_block_range));
464            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
465            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
466        };
467
468        if let Some(metrics) = &self.metrics {
469            provider = provider.with_metrics(metrics.clone());
470        }
471        Ok(provider)
472    }
473
474    /// Gets a static file segment's block range from the provider inner block
475    /// index.
476    fn get_segment_ranges_from_block(
477        &self,
478        segment: StaticFileSegment,
479        block: u64,
480    ) -> Option<SegmentRangeInclusive> {
481        self.static_files_max_block
482            .read()
483            .get(&segment)
484            .filter(|max| **max >= block)
485            .map(|_| self.find_fixed_range(block))
486    }
487
488    /// Gets a static file segment's fixed block range from the provider inner
489    /// transaction index.
490    fn get_segment_ranges_from_transaction(
491        &self,
492        segment: StaticFileSegment,
493        tx: u64,
494    ) -> Option<SegmentRangeInclusive> {
495        let static_files = self.static_files_tx_index.read();
496        let segment_static_files = static_files.get(&segment)?;
497
498        // It's more probable that the request comes from a newer tx height, so we iterate
499        // the static_files in reverse.
500        let mut static_files_rev_iter = segment_static_files.iter().rev().peekable();
501
502        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
503            if tx > *tx_end {
504                // request tx is higher than highest static file tx
505                return None
506            }
507            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
508            if tx_start <= tx {
509                return Some(self.find_fixed_range(block_range.end()))
510            }
511        }
512        None
513    }
514
515    /// Updates the inner transaction and block indexes alongside the internal cached providers in
516    /// `self.map`.
517    ///
518    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
519    ///
520    /// If `segment_max_block` is None it means there's no static file for this segment.
521    pub fn update_index(
522        &self,
523        segment: StaticFileSegment,
524        segment_max_block: Option<BlockNumber>,
525    ) -> ProviderResult<()> {
526        let mut max_block = self.static_files_max_block.write();
527        let mut tx_index = self.static_files_tx_index.write();
528
529        match segment_max_block {
530            Some(segment_max_block) => {
531                // Update the max block for the segment
532                max_block.insert(segment, segment_max_block);
533                let fixed_range = self.find_fixed_range(segment_max_block);
534
535                let jar = NippyJar::<SegmentHeader>::load(
536                    &self.path.join(segment.filename(&fixed_range)),
537                )
538                .map_err(ProviderError::other)?;
539
540                // Updates the tx index by first removing all entries which have a higher
541                // block_start than our current static file.
542                if let Some(tx_range) = jar.user_header().tx_range() {
543                    let tx_end = tx_range.end();
544
545                    // Current block range has the same block start as `fixed_range``, but block end
546                    // might be different if we are still filling this static file.
547                    if let Some(current_block_range) = jar.user_header().block_range().copied() {
548                        // Considering that `update_index` is called when we either append/truncate,
549                        // we are sure that we are handling the latest data
550                        // points.
551                        //
552                        // Here we remove every entry of the index that has a block start higher or
553                        // equal than our current one. This is important in the case
554                        // that we prune a lot of rows resulting in a file (and thus
555                        // a higher block range) deletion.
556                        tx_index
557                            .entry(segment)
558                            .and_modify(|index| {
559                                index.retain(|_, block_range| {
560                                    block_range.start() < fixed_range.start()
561                                });
562                                index.insert(tx_end, current_block_range);
563                            })
564                            .or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
565                    }
566                } else if segment.is_tx_based() {
567                    // The unwinded file has no more transactions/receipts. However, the highest
568                    // block is within this files' block range. We only retain
569                    // entries with block ranges before the current one.
570                    tx_index.entry(segment).and_modify(|index| {
571                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
572                    });
573
574                    // If the index is empty, just remove it.
575                    if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
576                        tx_index.remove(&segment);
577                    }
578                }
579
580                // Update the cached provider.
581                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
582
583                // Delete any cached provider that no longer has an associated jar.
584                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
585            }
586            None => {
587                tx_index.remove(&segment);
588                max_block.remove(&segment);
589            }
590        };
591
592        Ok(())
593    }
594
595    /// Initializes the inner transaction and block index
596    pub fn initialize_index(&self) -> ProviderResult<()> {
597        let mut max_block = self.static_files_max_block.write();
598        let mut tx_index = self.static_files_tx_index.write();
599
600        max_block.clear();
601        tx_index.clear();
602
603        for (segment, ranges) in iter_static_files(&self.path).map_err(ProviderError::other)? {
604            // Update last block for each segment
605            if let Some((block_range, _)) = ranges.last() {
606                max_block.insert(segment, block_range.end());
607            }
608
609            // Update tx -> block_range index
610            for (block_range, tx_range) in ranges {
611                if let Some(tx_range) = tx_range {
612                    let tx_end = tx_range.end();
613
614                    match tx_index.entry(segment) {
615                        Entry::Occupied(mut index) => {
616                            index.get_mut().insert(tx_end, block_range);
617                        }
618                        Entry::Vacant(index) => {
619                            index.insert(BTreeMap::from([(tx_end, block_range)]));
620                        }
621                    };
622                }
623            }
624        }
625
626        // If this is a re-initialization, we need to clear this as well
627        self.map.clear();
628
629        Ok(())
630    }
631
632    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
633    /// target to unwind to.
634    ///
635    /// Two types of consistency checks are done for:
636    ///
637    /// 1) When a static file fails to commit but the underlying data was changed.
638    /// 2) When a static file was committed, but the required database transaction was not.
639    ///
640    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
641    /// will return an error.
642    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
643    /// to heal.
644    ///
645    /// For each static file segment:
646    /// * the corresponding database table should overlap or have continuity in their keys
647    ///   ([`TxNumber`] or [`BlockNumber`]).
648    /// * its highest block should match the stage checkpoint block number if it's equal or higher
649    ///   than the corresponding database table last entry.
650    ///
651    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
652    ///
653    /// WARNING: No static file writer should be held before calling this function, otherwise it
654    /// will deadlock.
655    pub fn check_consistency<Provider>(
656        &self,
657        provider: &Provider,
658        has_receipt_pruning: bool,
659    ) -> ProviderResult<Option<PipelineTarget>>
660    where
661        Provider: DBProvider + BlockReader + StageCheckpointReader + ChainSpecProvider,
662        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
663    {
664        // OVM historical import is broken and does not work with this check. It's importing
665        // duplicated receipts resulting in having more receipts than the expected transaction
666        // range.
667        //
668        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
669        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
670        if provider.chain_spec().is_optimism() &&
671            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
672        {
673            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
674            const OVM_HEADER_1_HASH: B256 =
675                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
676            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
677                info!(target: "reth::cli",
678                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
679                );
680                return Ok(None)
681            }
682        }
683
684        info!(target: "reth::cli", "Verifying storage consistency.");
685
686        let mut unwind_target: Option<BlockNumber> = None;
687        let mut update_unwind_target = |new_target: BlockNumber| {
688            if let Some(target) = unwind_target.as_mut() {
689                *target = (*target).min(new_target);
690            } else {
691                unwind_target = Some(new_target);
692            }
693        };
694
695        for segment in StaticFileSegment::iter() {
696            // Not integrated yet
697            if segment.is_block_meta() {
698                continue
699            }
700
701            if has_receipt_pruning && segment.is_receipts() {
702                // Pruned nodes (including full node) do not store receipts as static files.
703                continue
704            }
705
706            let initial_highest_block = self.get_highest_static_file_block(segment);
707
708            //  File consistency is broken if:
709            //
710            // * appending data was interrupted before a config commit, then data file will be
711            //   truncated according to the config.
712            //
713            // * pruning data was interrupted before a config commit, then we have deleted data that
714            //   we are expected to still have. We need to check the Database and unwind everything
715            //   accordingly.
716            if self.access.is_read_only() {
717                self.check_segment_consistency(segment)?;
718            } else {
719                // Fetching the writer will attempt to heal any file level inconsistency.
720                self.latest_writer(segment)?;
721            }
722
723            // Only applies to block-based static files. (Headers)
724            //
725            // The updated `highest_block` may have decreased if we healed from a pruning
726            // interruption.
727            let mut highest_block = self.get_highest_static_file_block(segment);
728            if initial_highest_block != highest_block {
729                info!(
730                    target: "reth::providers::static_file",
731                    ?initial_highest_block,
732                    unwind_target = highest_block,
733                    ?segment,
734                    "Setting unwind target."
735                );
736                update_unwind_target(highest_block.unwrap_or_default());
737            }
738
739            // Only applies to transaction-based static files. (Receipts & Transactions)
740            //
741            // Make sure the last transaction matches the last block from its indices, since a heal
742            // from a pruning interruption might have decreased the number of transactions without
743            // being able to update the last block of the static file segment.
744            let highest_tx = self.get_highest_static_file_tx(segment);
745            if let Some(highest_tx) = highest_tx {
746                let mut last_block = highest_block.unwrap_or_default();
747                loop {
748                    if let Some(indices) = provider.block_body_indices(last_block)? {
749                        if indices.last_tx_num() <= highest_tx {
750                            break
751                        }
752                    } else {
753                        // If the block body indices can not be found, then it means that static
754                        // files is ahead of database, and the `ensure_invariants` check will fix
755                        // it by comparing with stage checkpoints.
756                        break
757                    }
758                    if last_block == 0 {
759                        break
760                    }
761                    last_block -= 1;
762
763                    info!(
764                        target: "reth::providers::static_file",
765                        highest_block = self.get_highest_static_file_block(segment),
766                        unwind_target = last_block,
767                        ?segment,
768                        "Setting unwind target."
769                    );
770                    highest_block = Some(last_block);
771                    update_unwind_target(last_block);
772                }
773            }
774
775            if let Some(unwind) = match segment {
776                StaticFileSegment::Headers => self
777                    .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
778                        provider,
779                        segment,
780                        highest_block,
781                        highest_block,
782                    )?,
783                StaticFileSegment::Transactions => self
784                    .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
785                        provider,
786                        segment,
787                        highest_tx,
788                        highest_block,
789                    )?,
790                StaticFileSegment::Receipts => self
791                    .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
792                        provider,
793                        segment,
794                        highest_tx,
795                        highest_block,
796                    )?,
797                StaticFileSegment::BlockMeta => self
798                    .ensure_invariants::<_, tables::BlockBodyIndices>(
799                        provider,
800                        segment,
801                        highest_block,
802                        highest_block,
803                    )?,
804            } {
805                update_unwind_target(unwind);
806            }
807        }
808
809        Ok(unwind_target.map(PipelineTarget::Unwind))
810    }
811
812    /// Checks consistency of the latest static file segment and throws an error if at fault.
813    /// Read-only.
814    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
815        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
816            let file_path =
817                self.directory().join(segment.filename(&self.find_fixed_range(latest_block)));
818
819            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
820
821            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
822        }
823        Ok(())
824    }
825
    /// Check invariants for each corresponding table and static file segment:
    ///
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
    ///     target.
    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
    ///     this case, the rows will be removed and [`None`] will be returned.
    ///
    /// * If the database tables overlap with static files and have contiguous keys, or the
    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
    ///
    /// # Arguments
    ///
    /// * `provider` - database provider used to read table `T`, the stage checkpoint associated
    ///   with `segment` and, for transaction-based segments, block body indices.
    /// * `segment` - the static file segment being checked.
    /// * `highest_static_file_entry` - highest key stored in the static file for this segment. In
    ///   `check_consistency` this is the highest block for headers and the highest transaction
    ///   number for transactions/receipts. Treated as `0` when `None` once the database-table
    ///   checks have been passed.
    /// * `highest_static_file_block` - highest block stored in the static file for this segment,
    ///   also treated as `0` when `None` after the database-table checks.
    ///
    /// # Returns
    ///
    /// `Some(block)` with the pipeline unwind target when static file data appears to be lost,
    /// otherwise `None`. In the `None` case, extra static file rows may have been pruned and
    /// committed.
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_entry, _)) = db_cursor.first()? {
            if let (Some(highest_entry), Some(highest_block)) =
                (highest_static_file_entry, highest_static_file_block)
            {
                // If there is a gap between the entry found in static file and
                // database, then we have most likely lost static file data and need to unwind so we
                // can load it again
                // (The condition below is equivalent to `db_first_entry > highest_entry + 1`.)
                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                    info!(
                        target: "reth::providers::static_file",
                        ?db_first_entry,
                        ?highest_entry,
                        unwind_target = highest_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    return Ok(Some(highest_block))
                }
            }

            // If the database table already extends past the static file's highest entry (or the
            // static file has no entry at all), the checkpoint checks below are skipped.
            if let Some((db_last_entry, _)) = db_cursor.last()? {
                if highest_static_file_entry
                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
                {
                    return Ok(None)
                }
            }
        }

        // From here on, a missing static file entry/block is treated as 0.
        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // If static file entry is ahead of the database entries, then ensure the checkpoint block
        // number matches.
        let checkpoint_block_number = provider
            .get_stage_checkpoint(match segment {
                StaticFileSegment::Headers => StageId::Headers,
                StaticFileSegment::Transactions | StaticFileSegment::BlockMeta => StageId::Bodies,
                StaticFileSegment::Receipts => StageId::Execution,
            })?
            .unwrap_or_default()
            .block_number;

        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
        // static files on executing a stage, or the reverse on unwinding a stage.
        // All we need to do is to prune the extra static file rows.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            if segment.is_headers() {
                // TODO(joshie): is_block_meta
                // Headers are block-keyed: drop one row per block above the checkpoint.
                writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
            } else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                // todo joshie: is querying block_body_indices a potential issue once bbi is moved
                // to sf as well
                // Transaction-based segments: count the transactions stored after the
                // checkpoint block's last transaction.
                // NOTE(review): assumes `block.last_tx_num() <= highest_static_file_entry`;
                // otherwise this subtraction underflows — confirm this invariant always holds.
                let number = highest_static_file_entry - block.last_tx_num();
                if segment.is_receipts() {
                    writer.prune_receipts(number, checkpoint_block_number)?;
                } else {
                    writer.prune_transactions(number, checkpoint_block_number)?;
                }
            }
            // Note: if the body indices for the checkpoint block are missing, nothing is pruned
            // above, but the writer is still committed.
            writer.commit()?;
        }

        Ok(None)
    }
937
938    /// Gets the highest static file block if it exists for a static file segment.
939    ///
940    /// If there is nothing on disk for the given segment, this will return [`None`].
941    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
942        self.static_files_max_block.read().get(&segment).copied()
943    }
944
945    /// Gets the highest static file transaction.
946    ///
947    /// If there is nothing on disk for the given segment, this will return [`None`].
948    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
949        self.static_files_tx_index
950            .read()
951            .get(&segment)
952            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
953    }
954
955    /// Gets the highest static file block for all segments.
956    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
957        HighestStaticFiles {
958            headers: self.get_highest_static_file_block(StaticFileSegment::Headers),
959            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
960            transactions: self.get_highest_static_file_block(StaticFileSegment::Transactions),
961            block_meta: self.get_highest_static_file_block(StaticFileSegment::BlockMeta),
962        }
963    }
964
965    /// Iterates through segment `static_files` in reverse order, executing a function until it
966    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
967    pub fn find_static_file<T>(
968        &self,
969        segment: StaticFileSegment,
970        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
971    ) -> ProviderResult<Option<T>> {
972        if let Some(highest_block) = self.get_highest_static_file_block(segment) {
973            let mut range = self.find_fixed_range(highest_block);
974            while range.end() > 0 {
975                if let Some(res) = func(self.get_or_create_jar_provider(segment, &range)?)? {
976                    return Ok(Some(res))
977                }
978                range = SegmentRangeInclusive::new(
979                    range.start().saturating_sub(self.blocks_per_file),
980                    range.end().saturating_sub(self.blocks_per_file),
981                );
982            }
983        }
984
985        Ok(None)
986    }
987
988    /// Fetches data within a specified range across multiple static files.
989    ///
990    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
991    /// It continues fetching until the end of the range is reached or the provided `predicate`
992    /// returns false.
993    pub fn fetch_range_with_predicate<T, F, P>(
994        &self,
995        segment: StaticFileSegment,
996        range: Range<u64>,
997        mut get_fn: F,
998        mut predicate: P,
999    ) -> ProviderResult<Vec<T>>
1000    where
1001        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1002        P: FnMut(&T) -> bool,
1003    {
1004        let get_provider = |start: u64| {
1005            if segment.is_block_based() {
1006                self.get_segment_provider_from_block(segment, start, None)
1007            } else {
1008                self.get_segment_provider_from_transaction(segment, start, None)
1009            }
1010        };
1011
1012        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1013        let mut provider = get_provider(range.start)?;
1014        let mut cursor = provider.cursor()?;
1015
1016        // advances number in range
1017        'outer: for number in range {
1018            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
1019            // access data in two different static files, it halts further attempts by returning
1020            // an error, effectively preventing infinite retry loops.
1021            let mut retrying = false;
1022
1023            // advances static files if `get_fn` returns None
1024            'inner: loop {
1025                match get_fn(&mut cursor, number)? {
1026                    Some(res) => {
1027                        if !predicate(&res) {
1028                            break 'outer
1029                        }
1030                        result.push(res);
1031                        break 'inner
1032                    }
1033                    None => {
1034                        if retrying {
1035                            warn!(
1036                                target: "provider::static_file",
1037                                ?segment,
1038                                ?number,
1039                                "Could not find block or tx number on a range request"
1040                            );
1041
1042                            let err = if segment.is_block_based() {
1043                                ProviderError::MissingStaticFileBlock(segment, number)
1044                            } else {
1045                                ProviderError::MissingStaticFileTx(segment, number)
1046                            };
1047                            return Err(err)
1048                        }
1049                        // There is a very small chance of hitting a deadlock if two consecutive
1050                        // static files share the same bucket in the
1051                        // internal dashmap and we don't drop the current provider
1052                        // before requesting the next one.
1053                        drop(cursor);
1054                        drop(provider);
1055                        provider = get_provider(number)?;
1056                        cursor = provider.cursor()?;
1057                        retrying = true;
1058                    }
1059                }
1060            }
1061        }
1062
1063        Ok(result)
1064    }
1065
1066    /// Fetches data within a specified range across multiple static files.
1067    ///
1068    /// Returns an iterator over the data
1069    pub fn fetch_range_iter<'a, T, F>(
1070        &'a self,
1071        segment: StaticFileSegment,
1072        range: Range<u64>,
1073        get_fn: F,
1074    ) -> ProviderResult<impl Iterator<Item = ProviderResult<T>> + 'a>
1075    where
1076        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1077        T: std::fmt::Debug,
1078    {
1079        let get_provider = move |start: u64| {
1080            if segment.is_block_based() {
1081                self.get_segment_provider_from_block(segment, start, None)
1082            } else {
1083                self.get_segment_provider_from_transaction(segment, start, None)
1084            }
1085        };
1086
1087        let mut provider = Some(get_provider(range.start)?);
1088        Ok(range.filter_map(move |number| {
1089            match get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose() {
1090                Some(result) => Some(result),
1091                None => {
1092                    // There is a very small chance of hitting a deadlock if two consecutive static
1093                    // files share the same bucket in the internal dashmap and
1094                    // we don't drop the current provider before requesting the
1095                    // next one.
1096                    provider.take();
1097                    provider = Some(get_provider(number).ok()?);
1098                    get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose()
1099                }
1100            }
1101        }))
1102    }
1103
1104    /// Returns directory where `static_files` are located.
1105    pub fn directory(&self) -> &Path {
1106        &self.path
1107    }
1108
1109    /// Retrieves data from the database or static file, wherever it's available.
1110    ///
1111    /// # Arguments
1112    /// * `segment` - The segment of the static file to check against.
1113    /// * `index_key` - Requested index key, usually a block or transaction number.
1114    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
1115    ///   file provider.
1116    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
1117    ///   when the static file doesn't contain the required data or is not available.
1118    pub fn get_with_static_file_or_database<T, FS, FD>(
1119        &self,
1120        segment: StaticFileSegment,
1121        number: u64,
1122        fetch_from_static_file: FS,
1123        fetch_from_database: FD,
1124    ) -> ProviderResult<Option<T>>
1125    where
1126        FS: Fn(&Self) -> ProviderResult<Option<T>>,
1127        FD: Fn() -> ProviderResult<Option<T>>,
1128    {
1129        // If there is, check the maximum block or transaction number of the segment.
1130        let static_file_upper_bound = if segment.is_block_based() {
1131            self.get_highest_static_file_block(segment)
1132        } else {
1133            self.get_highest_static_file_tx(segment)
1134        };
1135
1136        if static_file_upper_bound
1137            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1138        {
1139            return fetch_from_static_file(self)
1140        }
1141        fetch_from_database()
1142    }
1143
1144    /// Gets data within a specified range, potentially spanning different `static_files` and
1145    /// database.
1146    ///
1147    /// # Arguments
1148    /// * `segment` - The segment of the static file to query.
1149    /// * `block_range` - The range of data to fetch.
1150    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
1151    /// * `fetch_from_database` - A function to fetch data from the database.
1152    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
1153    ///   terminated when this function returns false, thereby filtering the data based on the
1154    ///   provided condition.
1155    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1156        &self,
1157        segment: StaticFileSegment,
1158        mut block_or_tx_range: Range<u64>,
1159        fetch_from_static_file: FS,
1160        mut fetch_from_database: FD,
1161        mut predicate: P,
1162    ) -> ProviderResult<Vec<T>>
1163    where
1164        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1165        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1166        P: FnMut(&T) -> bool,
1167    {
1168        let mut data = Vec::new();
1169
1170        // If there is, check the maximum block or transaction number of the segment.
1171        if let Some(static_file_upper_bound) = if segment.is_block_based() {
1172            self.get_highest_static_file_block(segment)
1173        } else {
1174            self.get_highest_static_file_tx(segment)
1175        } {
1176            if block_or_tx_range.start <= static_file_upper_bound {
1177                let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1178                data.extend(fetch_from_static_file(
1179                    self,
1180                    block_or_tx_range.start..end,
1181                    &mut predicate,
1182                )?);
1183                block_or_tx_range.start = end;
1184            }
1185        }
1186
1187        if block_or_tx_range.end > block_or_tx_range.start {
1188            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1189        }
1190
1191        Ok(data)
1192    }
1193
1194    /// Returns `static_files` directory
1195    #[cfg(any(test, feature = "test-utils"))]
1196    pub fn path(&self) -> &Path {
1197        &self.path
1198    }
1199
1200    /// Returns `static_files` transaction index
1201    #[cfg(any(test, feature = "test-utils"))]
1202    pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
1203        &self.static_files_tx_index
1204    }
1205}
1206
1207/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider`
1208pub trait StaticFileWriter {
1209    /// The primitives type used by the static file provider.
1210    type Primitives: Send + Sync + 'static;
1211
1212    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
1213    fn get_writer(
1214        &self,
1215        block: BlockNumber,
1216        segment: StaticFileSegment,
1217    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1218
1219    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
1220    /// [`StaticFileSegment`].
1221    fn latest_writer(
1222        &self,
1223        segment: StaticFileSegment,
1224    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1225
1226    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
1227    fn commit(&self) -> ProviderResult<()>;
1228}
1229
1230impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1231    type Primitives = N;
1232
1233    fn get_writer(
1234        &self,
1235        block: BlockNumber,
1236        segment: StaticFileSegment,
1237    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1238        if self.access.is_read_only() {
1239            return Err(ProviderError::ReadOnlyStaticFileAccess)
1240        }
1241
1242        trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1243        self.writers.get_or_create(segment, || {
1244            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1245        })
1246    }
1247
1248    fn latest_writer(
1249        &self,
1250        segment: StaticFileSegment,
1251    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1252        self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1253    }
1254
1255    fn commit(&self) -> ProviderResult<()> {
1256        self.writers.commit()
1257    }
1258}
1259
1260impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1261    type Header = N::BlockHeader;
1262
1263    fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
1264        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1265            Ok(jar_provider
1266                .cursor()?
1267                .get_two::<HeaderWithHashMask<Self::Header>>(block_hash.into())?
1268                .and_then(|(header, hash)| {
1269                    if &hash == block_hash {
1270                        return Some(header)
1271                    }
1272                    None
1273                }))
1274        })
1275    }
1276
1277    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1278        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1279            .and_then(|provider| provider.header_by_number(num))
1280            .or_else(|err| {
1281                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1282                    Ok(None)
1283                } else {
1284                    Err(err)
1285                }
1286            })
1287    }
1288
1289    fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1290        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1291            Ok(jar_provider
1292                .cursor()?
1293                .get_two::<TDWithHashMask>(block_hash.into())?
1294                .and_then(|(td, hash)| (&hash == block_hash).then_some(td.0)))
1295        })
1296    }
1297
1298    fn header_td_by_number(&self, num: BlockNumber) -> ProviderResult<Option<U256>> {
1299        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1300            .and_then(|provider| provider.header_td_by_number(num))
1301            .or_else(|err| {
1302                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1303                    Ok(None)
1304                } else {
1305                    Err(err)
1306                }
1307            })
1308    }
1309
1310    fn headers_range(
1311        &self,
1312        range: impl RangeBounds<BlockNumber>,
1313    ) -> ProviderResult<Vec<Self::Header>> {
1314        self.fetch_range_with_predicate(
1315            StaticFileSegment::Headers,
1316            to_range(range),
1317            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1318            |_| true,
1319        )
1320    }
1321
1322    fn sealed_header(
1323        &self,
1324        num: BlockNumber,
1325    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1326        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1327            .and_then(|provider| provider.sealed_header(num))
1328            .or_else(|err| {
1329                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1330                    Ok(None)
1331                } else {
1332                    Err(err)
1333                }
1334            })
1335    }
1336
1337    fn sealed_headers_while(
1338        &self,
1339        range: impl RangeBounds<BlockNumber>,
1340        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1341    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1342        self.fetch_range_with_predicate(
1343            StaticFileSegment::Headers,
1344            to_range(range),
1345            |cursor, number| {
1346                Ok(cursor
1347                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1348                    .map(|(header, hash)| SealedHeader::new(header, hash)))
1349            },
1350            predicate,
1351        )
1352    }
1353}
1354
1355impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1356    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1357        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)?.block_hash(num)
1358    }
1359
1360    fn canonical_hashes_range(
1361        &self,
1362        start: BlockNumber,
1363        end: BlockNumber,
1364    ) -> ProviderResult<Vec<B256>> {
1365        self.fetch_range_with_predicate(
1366            StaticFileSegment::Headers,
1367            start..end,
1368            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1369            |_| true,
1370        )
1371    }
1372}
1373
1374impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1375    for StaticFileProvider<N>
1376{
1377    type Receipt = N::Receipt;
1378
1379    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1380        self.get_segment_provider_from_transaction(StaticFileSegment::Receipts, num, None)
1381            .and_then(|provider| provider.receipt(num))
1382            .or_else(|err| {
1383                if let ProviderError::MissingStaticFileTx(_, _) = err {
1384                    Ok(None)
1385                } else {
1386                    Err(err)
1387                }
1388            })
1389    }
1390
1391    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1392        if let Some(num) = self.transaction_id(hash)? {
1393            return self.receipt(num)
1394        }
1395        Ok(None)
1396    }
1397
1398    fn receipts_by_block(
1399        &self,
1400        _block: BlockHashOrNumber,
1401    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1402        unreachable!()
1403    }
1404
1405    fn receipts_by_tx_range(
1406        &self,
1407        range: impl RangeBounds<TxNumber>,
1408    ) -> ProviderResult<Vec<Self::Receipt>> {
1409        self.fetch_range_with_predicate(
1410            StaticFileSegment::Receipts,
1411            to_range(range),
1412            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1413            |_| true,
1414        )
1415    }
1416}
1417
1418impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>>
1419    TransactionsProviderExt for StaticFileProvider<N>
1420{
1421    fn transaction_hashes_by_range(
1422        &self,
1423        tx_range: Range<TxNumber>,
1424    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1425        let tx_range_size = (tx_range.end - tx_range.start) as usize;
1426
1427        // Transactions are different size, so chunks will not all take the same processing time. If
1428        // chunks are too big, there will be idle threads waiting for work. Choosing an
1429        // arbitrary smaller value to make sure it doesn't happen.
1430        let chunk_size = 100;
1431
1432        // iterator over the chunks
1433        let chunks = tx_range
1434            .clone()
1435            .step_by(chunk_size)
1436            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1437        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1438
1439        for chunk_range in chunks {
1440            let (channel_tx, channel_rx) = mpsc::channel();
1441            channels.push(channel_rx);
1442
1443            let manager = self.clone();
1444
1445            // Spawn the task onto the global rayon pool
1446            // This task will send the results through the channel after it has calculated
1447            // the hash.
1448            rayon::spawn(move || {
1449                let mut rlp_buf = Vec::with_capacity(128);
1450                let _ = manager.fetch_range_with_predicate(
1451                    StaticFileSegment::Transactions,
1452                    chunk_range,
1453                    |cursor, number| {
1454                        Ok(cursor
1455                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1456                            .map(|transaction| {
1457                                rlp_buf.clear();
1458                                let _ = channel_tx
1459                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
1460                            }))
1461                    },
1462                    |_| true,
1463                );
1464            });
1465        }
1466
1467        let mut tx_list = Vec::with_capacity(tx_range_size);
1468
1469        // Iterate over channels and append the tx hashes unsorted
1470        for channel in channels {
1471            while let Ok(tx) = channel.recv() {
1472                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1473                tx_list.push((tx_hash, tx_id));
1474            }
1475        }
1476
1477        Ok(tx_list)
1478    }
1479}
1480
1481impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1482    for StaticFileProvider<N>
1483{
1484    type Transaction = N::SignedTx;
1485
1486    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1487        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1488            let mut cursor = jar_provider.cursor()?;
1489            if cursor
1490                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1491                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1492                .is_some()
1493            {
1494                Ok(cursor.number())
1495            } else {
1496                Ok(None)
1497            }
1498        })
1499    }
1500
1501    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1502        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1503            .and_then(|provider| provider.transaction_by_id(num))
1504            .or_else(|err| {
1505                if let ProviderError::MissingStaticFileTx(_, _) = err {
1506                    Ok(None)
1507                } else {
1508                    Err(err)
1509                }
1510            })
1511    }
1512
1513    fn transaction_by_id_unhashed(
1514        &self,
1515        num: TxNumber,
1516    ) -> ProviderResult<Option<Self::Transaction>> {
1517        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1518            .and_then(|provider| provider.transaction_by_id_unhashed(num))
1519            .or_else(|err| {
1520                if let ProviderError::MissingStaticFileTx(_, _) = err {
1521                    Ok(None)
1522                } else {
1523                    Err(err)
1524                }
1525            })
1526    }
1527
1528    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1529        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1530            Ok(jar_provider
1531                .cursor()?
1532                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
1533                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
1534        })
1535    }
1536
1537    fn transaction_by_hash_with_meta(
1538        &self,
1539        _hash: TxHash,
1540    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1541        // Required data not present in static_files
1542        Err(ProviderError::UnsupportedProvider)
1543    }
1544
1545    fn transaction_block(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1546        // Required data not present in static_files
1547        Err(ProviderError::UnsupportedProvider)
1548    }
1549
1550    fn transactions_by_block(
1551        &self,
1552        _block_id: BlockHashOrNumber,
1553    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1554        // Required data not present in static_files
1555        Err(ProviderError::UnsupportedProvider)
1556    }
1557
1558    fn transactions_by_block_range(
1559        &self,
1560        _range: impl RangeBounds<BlockNumber>,
1561    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1562        // Required data not present in static_files
1563        Err(ProviderError::UnsupportedProvider)
1564    }
1565
1566    fn transactions_by_tx_range(
1567        &self,
1568        range: impl RangeBounds<TxNumber>,
1569    ) -> ProviderResult<Vec<Self::Transaction>> {
1570        self.fetch_range_with_predicate(
1571            StaticFileSegment::Transactions,
1572            to_range(range),
1573            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
1574            |_| true,
1575        )
1576    }
1577
1578    fn senders_by_tx_range(
1579        &self,
1580        range: impl RangeBounds<TxNumber>,
1581    ) -> ProviderResult<Vec<Address>> {
1582        let txes = self.transactions_by_tx_range(range)?;
1583        Ok(reth_primitives_traits::transaction::recover::recover_signers(&txes)?)
1584    }
1585
1586    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1587        match self.transaction_by_id_unhashed(id)? {
1588            Some(tx) => Ok(tx.recover_signer().ok()),
1589            None => Ok(None),
1590        }
1591    }
1592}
1593
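/* The `BlockNumReader` and `BlockReader` impls below cannot be served from static files because
 * the required data is not stored there; they exist only to satisfy trait requirements. */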
1595
1596impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
1597    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1598        // Required data not present in static_files
1599        Err(ProviderError::UnsupportedProvider)
1600    }
1601
1602    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1603        // Required data not present in static_files
1604        Err(ProviderError::UnsupportedProvider)
1605    }
1606
1607    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1608        // Required data not present in static_files
1609        Err(ProviderError::UnsupportedProvider)
1610    }
1611
1612    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1613        // Required data not present in static_files
1614        Err(ProviderError::UnsupportedProvider)
1615    }
1616}
1617
1618impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
1619    for StaticFileProvider<N>
1620{
1621    type Block = N::Block;
1622
1623    fn find_block_by_hash(
1624        &self,
1625        _hash: B256,
1626        _source: BlockSource,
1627    ) -> ProviderResult<Option<Self::Block>> {
1628        // Required data not present in static_files
1629        Err(ProviderError::UnsupportedProvider)
1630    }
1631
1632    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1633        // Required data not present in static_files
1634        Err(ProviderError::UnsupportedProvider)
1635    }
1636
1637    fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
1638        // Required data not present in static_files
1639        Err(ProviderError::UnsupportedProvider)
1640    }
1641
1642    fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1643        // Required data not present in static_files
1644        Err(ProviderError::UnsupportedProvider)
1645    }
1646
1647    fn pending_block_and_receipts(
1648        &self,
1649    ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
1650        // Required data not present in static_files
1651        Err(ProviderError::UnsupportedProvider)
1652    }
1653
1654    fn recovered_block(
1655        &self,
1656        _id: BlockHashOrNumber,
1657        _transaction_kind: TransactionVariant,
1658    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1659        // Required data not present in static_files
1660        Err(ProviderError::UnsupportedProvider)
1661    }
1662
1663    fn sealed_block_with_senders(
1664        &self,
1665        _id: BlockHashOrNumber,
1666        _transaction_kind: TransactionVariant,
1667    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1668        // Required data not present in static_files
1669        Err(ProviderError::UnsupportedProvider)
1670    }
1671
1672    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1673        // Required data not present in static_files
1674        Err(ProviderError::UnsupportedProvider)
1675    }
1676
1677    fn block_with_senders_range(
1678        &self,
1679        _range: RangeInclusive<BlockNumber>,
1680    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1681        Err(ProviderError::UnsupportedProvider)
1682    }
1683
1684    fn recovered_block_range(
1685        &self,
1686        _range: RangeInclusive<BlockNumber>,
1687    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1688        Err(ProviderError::UnsupportedProvider)
1689    }
1690}
1691
1692impl<N: NodePrimitives> WithdrawalsProvider for StaticFileProvider<N> {
1693    fn withdrawals_by_block(
1694        &self,
1695        id: BlockHashOrNumber,
1696        timestamp: u64,
1697    ) -> ProviderResult<Option<Withdrawals>> {
1698        if let Some(num) = id.as_number() {
1699            return self
1700                .get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1701                .and_then(|provider| provider.withdrawals_by_block(id, timestamp))
1702                .or_else(|err| {
1703                    if let ProviderError::MissingStaticFileBlock(_, _) = err {
1704                        Ok(None)
1705                    } else {
1706                        Err(err)
1707                    }
1708                })
1709        }
1710        // Only accepts block number queries
1711        Err(ProviderError::UnsupportedProvider)
1712    }
1713}
1714
1715impl<N: FullNodePrimitives<BlockHeader: Value>> OmmersProvider for StaticFileProvider<N> {
1716    fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1717        if let Some(num) = id.as_number() {
1718            return self
1719                .get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1720                .and_then(|provider| provider.ommers(id))
1721                .or_else(|err| {
1722                    if let ProviderError::MissingStaticFileBlock(_, _) = err {
1723                        Ok(None)
1724                    } else {
1725                        Err(err)
1726                    }
1727                })
1728        }
1729        // Only accepts block number queries
1730        Err(ProviderError::UnsupportedProvider)
1731    }
1732}
1733
1734impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
1735    fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1736        self.get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1737            .and_then(|provider| provider.block_body_indices(num))
1738            .or_else(|err| {
1739                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1740                    Ok(None)
1741                } else {
1742                    Err(err)
1743                }
1744            })
1745    }
1746
1747    fn block_body_indices_range(
1748        &self,
1749        range: RangeInclusive<BlockNumber>,
1750    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1751        self.fetch_range_with_predicate(
1752            StaticFileSegment::BlockMeta,
1753            *range.start()..*range.end() + 1,
1754            |cursor, number| cursor.get_one::<BodyIndicesMask>(number.into()),
1755            |_| true,
1756        )
1757    }
1758}
1759
1760impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
1761    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
1762        match T::NAME {
1763            tables::CanonicalHeaders::NAME |
1764            tables::Headers::<Header>::NAME |
1765            tables::HeaderTerminalDifficulties::NAME => Ok(self
1766                .get_highest_static_file_block(StaticFileSegment::Headers)
1767                .map(|block| block + 1)
1768                .unwrap_or_default()
1769                as usize),
1770            tables::Receipts::<Receipt>::NAME => Ok(self
1771                .get_highest_static_file_tx(StaticFileSegment::Receipts)
1772                .map(|receipts| receipts + 1)
1773                .unwrap_or_default() as usize),
1774            tables::Transactions::<TransactionSigned>::NAME => Ok(self
1775                .get_highest_static_file_tx(StaticFileSegment::Transactions)
1776                .map(|txs| txs + 1)
1777                .unwrap_or_default()
1778                as usize),
1779            _ => Err(ProviderError::UnsupportedProvider),
1780        }
1781    }
1782}
1783
1784/// Calculates the tx hash for the given transaction and its id.
1785#[inline]
1786fn calculate_hash<T>(
1787    entry: (TxNumber, T),
1788    rlp_buf: &mut Vec<u8>,
1789) -> Result<(B256, TxNumber), Box<ProviderError>>
1790where
1791    T: Encodable2718,
1792{
1793    let (tx_id, tx) = entry;
1794    tx.encode_2718(rlp_buf);
1795    Ok((keccak256(rlp_buf), tx_id))
1796}