reth_provider/providers/static_file/
manager.rs

1use super::{
2    metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3    StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6    to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider,
7    ReceiptProvider, StageCheckpointReader, StatsReader, TransactionVariant, TransactionsProvider,
8    TransactionsProviderExt,
9};
10use alloy_consensus::{
11    transaction::{SignerRecoverable, TransactionMeta},
12    Header,
13};
14use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
15use alloy_primitives::{
16    b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
17};
18use dashmap::DashMap;
19use notify::{RecommendedWatcher, RecursiveMode, Watcher};
20use parking_lot::RwLock;
21use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
22use reth_db::{
23    lockfile::StorageLock,
24    static_file::{
25        iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
26        StaticFileCursor, TDWithHashMask, TransactionMask,
27    },
28};
29use reth_db_api::{
30    cursor::DbCursorRO,
31    models::StoredBlockBodyIndices,
32    table::{Decompress, Table, Value},
33    tables,
34    transaction::DbTx,
35};
36use reth_ethereum_primitives::{Receipt, TransactionSigned};
37use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
38use reth_node_types::{FullNodePrimitives, NodePrimitives};
39use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
40use reth_stages_types::{PipelineTarget, StageId};
41use reth_static_file_types::{
42    find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
43    DEFAULT_BLOCKS_PER_STATIC_FILE,
44};
45use reth_storage_api::{BlockBodyIndicesProvider, DBProvider};
46use reth_storage_errors::provider::{ProviderError, ProviderResult};
47use std::{
48    collections::{hash_map::Entry, BTreeMap, HashMap},
49    fmt::Debug,
50    marker::PhantomData,
51    ops::{Deref, Range, RangeBounds, RangeInclusive},
52    path::{Path, PathBuf},
53    sync::{atomic::AtomicU64, mpsc, Arc},
54};
55use tracing::{debug, info, trace, warn};
56
57/// Alias type for a map that can be queried for block ranges from a transaction
58/// segment respectively. It uses `TxNumber` to represent the transaction end of a static file
59/// range.
60type SegmentRanges = HashMap<StaticFileSegment, BTreeMap<TxNumber, SegmentRangeInclusive>>;
61
/// How a static file provider may touch its files: read-only (`RO`, the default) or
/// read-write (`RW`).
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` when this is [`StaticFileAccess::RO`].
    pub const fn is_read_only(&self) -> bool {
        match self {
            Self::RO => true,
            Self::RW => false,
        }
    }

    /// Returns `true` when this is [`StaticFileAccess::RW`].
    ///
    /// There are only two modes, so this is exactly the negation of
    /// [`StaticFileAccess::is_read_only`].
    pub const fn is_read_write(&self) -> bool {
        !self.is_read_only()
    }
}
83
84/// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`].
85///
86/// "Static files" contain immutable chain history data, such as:
87///  - transactions
88///  - headers
89///  - receipts
90///
91/// This provider type is responsible for reading and writing to static files.
92#[derive(Debug)]
93pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
94
95impl<N> Clone for StaticFileProvider<N> {
96    fn clone(&self) -> Self {
97        Self(self.0.clone())
98    }
99}
100
101impl<N: NodePrimitives> StaticFileProvider<N> {
102    /// Creates a new [`StaticFileProvider`] with the given [`StaticFileAccess`].
103    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
104        let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
105        provider.initialize_index()?;
106        Ok(provider)
107    }
108
109    /// Creates a new [`StaticFileProvider`] with read-only access.
110    ///
111    /// Set `watch_directory` to `true` to track the most recent changes in static files. Otherwise,
112    /// new data won't be detected or queryable.
113    ///
114    /// Watching is recommended if the read-only provider is used on a directory that an active node
115    /// instance is modifying.
116    ///
117    /// See also [`StaticFileProvider::watch_directory`].
118    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
119        let provider = Self::new(path, StaticFileAccess::RO)?;
120
121        if watch_directory {
122            provider.watch_directory();
123        }
124
125        Ok(provider)
126    }
127
128    /// Creates a new [`StaticFileProvider`] with read-write access.
129    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
130        Self::new(path, StaticFileAccess::RW)
131    }
132
133    /// Watches the directory for changes and updates the in-memory index when modifications
134    /// are detected.
135    ///
136    /// This may be necessary, since a non-node process that owns a [`StaticFileProvider`] does not
137    /// receive `update_index` notifications from a node that appends/truncates data.
138    pub fn watch_directory(&self) {
139        let provider = self.clone();
140        std::thread::spawn(move || {
141            let (tx, rx) = std::sync::mpsc::channel();
142            let mut watcher = RecommendedWatcher::new(
143                move |res| tx.send(res).unwrap(),
144                notify::Config::default(),
145            )
146            .expect("failed to create watcher");
147
148            watcher
149                .watch(&provider.path, RecursiveMode::NonRecursive)
150                .expect("failed to watch path");
151
152            // Some backends send repeated modified events
153            let mut last_event_timestamp = None;
154
155            while let Ok(res) = rx.recv() {
156                match res {
157                    Ok(event) => {
158                        // We only care about modified data events
159                        if !matches!(
160                            event.kind,
161                            notify::EventKind::Modify(_) |
162                                notify::EventKind::Create(_) |
163                                notify::EventKind::Remove(_)
164                        ) {
165                            continue
166                        }
167
168                        // We only trigger a re-initialization if a configuration file was
169                        // modified. This means that a
170                        // static_file_provider.commit() was called on the node after
171                        // appending/truncating rows
172                        for segment in event.paths {
173                            // Ensure it's a file with the .conf extension
174                            if segment
175                                .extension()
176                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
177                            {
178                                continue
179                            }
180
181                            // Ensure it's well formatted static file name
182                            if StaticFileSegment::parse_filename(
183                                &segment.file_stem().expect("qed").to_string_lossy(),
184                            )
185                            .is_none()
186                            {
187                                continue
188                            }
189
190                            // If we can read the metadata and modified timestamp, ensure this is
191                            // not an old or repeated event.
192                            if let Ok(current_modified_timestamp) =
193                                std::fs::metadata(&segment).and_then(|m| m.modified())
194                            {
195                                if last_event_timestamp.is_some_and(|last_timestamp| {
196                                    last_timestamp >= current_modified_timestamp
197                                }) {
198                                    continue
199                                }
200                                last_event_timestamp = Some(current_modified_timestamp);
201                            }
202
203                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
204                            if let Err(err) = provider.initialize_index() {
205                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
206                            }
207                            break
208                        }
209                    }
210
211                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
212                }
213            }
214        });
215    }
216}
217
218impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
219    type Target = StaticFileProviderInner<N>;
220
221    fn deref(&self) -> &Self::Target {
222        &self.0
223    }
224}
225
226/// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`].
227#[derive(Debug)]
228pub struct StaticFileProviderInner<N> {
229    /// Maintains a map which allows for concurrent access to different `NippyJars`, over different
230    /// segments and ranges.
231    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
232    /// Min static file range for each segment.
233    /// This index is initialized on launch to keep track of the lowest, non-expired static file
234    /// per segment.
235    ///
236    /// This tracks the lowest static file per segment together with the block range in that
237    /// file. E.g. static file is batched in 500k block intervals then the lowest static file
238    /// is [0..499K], and the block range is start = 0, end = 499K.
239    /// This index is mainly used to History expiry, which targets transactions, e.g. pre-merge
240    /// history expiry would lead to removing all static files below the merge height.
241    static_files_min_block: RwLock<HashMap<StaticFileSegment, SegmentRangeInclusive>>,
242    /// This is an additional index that tracks the expired height, this will track the highest
243    /// block number that has been expired (missing). The first, non expired block is
244    /// `expired_history_height + 1`.
245    ///
246    /// This is effectively the transaction range that has been expired:
247    /// [`StaticFileProvider::delete_transactions_below`] and mirrors
248    /// `static_files_min_block[transactions] - blocks_per_file`.
249    ///
250    /// This additional tracker exists for more efficient lookups because the node must be aware of
251    /// the expired height.
252    earliest_history_height: AtomicU64,
253    /// Max static file block for each segment
254    static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
255    /// Available static file block ranges on disk indexed by max transactions.
256    static_files_tx_index: RwLock<SegmentRanges>,
257    /// Directory where `static_files` are located
258    path: PathBuf,
259    /// Maintains a writer set of [`StaticFileSegment`].
260    writers: StaticFileWriters<N>,
261    /// Metrics for the static files.
262    metrics: Option<Arc<StaticFileProviderMetrics>>,
263    /// Access rights of the provider.
264    access: StaticFileAccess,
265    /// Number of blocks per file.
266    blocks_per_file: u64,
267    /// Write lock for when access is [`StaticFileAccess::RW`].
268    _lock_file: Option<StorageLock>,
269    /// Node primitives
270    _pd: PhantomData<N>,
271}
272
273impl<N: NodePrimitives> StaticFileProviderInner<N> {
274    /// Creates a new [`StaticFileProviderInner`].
275    fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
276        let _lock_file = if access.is_read_write() {
277            StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
278        } else {
279            None
280        };
281
282        let provider = Self {
283            map: Default::default(),
284            writers: Default::default(),
285            static_files_min_block: Default::default(),
286            earliest_history_height: Default::default(),
287            static_files_max_block: Default::default(),
288            static_files_tx_index: Default::default(),
289            path: path.as_ref().to_path_buf(),
290            metrics: None,
291            access,
292            blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE,
293            _lock_file,
294            _pd: Default::default(),
295        };
296
297        Ok(provider)
298    }
299
300    pub const fn is_read_only(&self) -> bool {
301        self.access.is_read_only()
302    }
303
304    /// Each static file has a fixed number of blocks. This gives out the range where the requested
305    /// block is positioned.
306    pub const fn find_fixed_range(&self, block: BlockNumber) -> SegmentRangeInclusive {
307        find_fixed_range(block, self.blocks_per_file)
308    }
309}
310
311impl<N: NodePrimitives> StaticFileProvider<N> {
312    /// Set a custom number of blocks per file.
313    #[cfg(any(test, feature = "test-utils"))]
314    pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self {
315        let mut provider =
316            Arc::try_unwrap(self.0).expect("should be called when initializing only");
317        provider.blocks_per_file = blocks_per_file;
318        Self(Arc::new(provider))
319    }
320
321    /// Enables metrics on the [`StaticFileProvider`].
322    pub fn with_metrics(self) -> Self {
323        let mut provider =
324            Arc::try_unwrap(self.0).expect("should be called when initializing only");
325        provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
326        Self(Arc::new(provider))
327    }
328
329    /// Reports metrics for the static files.
330    pub fn report_metrics(&self) -> ProviderResult<()> {
331        let Some(metrics) = &self.metrics else { return Ok(()) };
332
333        let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
334        for (segment, ranges) in static_files {
335            let mut entries = 0;
336            let mut size = 0;
337
338            for (block_range, _) in &ranges {
339                let fixed_block_range = self.find_fixed_range(block_range.start());
340                let jar_provider = self
341                    .get_segment_provider(segment, || Some(fixed_block_range), None)?
342                    .ok_or_else(|| {
343                        ProviderError::MissingStaticFileBlock(segment, block_range.start())
344                    })?;
345
346                entries += jar_provider.rows();
347
348                let data_size = reth_fs_util::metadata(jar_provider.data_path())
349                    .map(|metadata| metadata.len())
350                    .unwrap_or_default();
351                let index_size = reth_fs_util::metadata(jar_provider.index_path())
352                    .map(|metadata| metadata.len())
353                    .unwrap_or_default();
354                let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
355                    .map(|metadata| metadata.len())
356                    .unwrap_or_default();
357                let config_size = reth_fs_util::metadata(jar_provider.config_path())
358                    .map(|metadata| metadata.len())
359                    .unwrap_or_default();
360
361                size += data_size + index_size + offsets_size + config_size;
362            }
363
364            metrics.record_segment(segment, size, ranges.len(), entries);
365        }
366
367        Ok(())
368    }
369
370    /// Gets the [`StaticFileJarProvider`] of the requested segment and block.
371    pub fn get_segment_provider_from_block(
372        &self,
373        segment: StaticFileSegment,
374        block: BlockNumber,
375        path: Option<&Path>,
376    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
377        self.get_segment_provider(
378            segment,
379            || self.get_segment_ranges_from_block(segment, block),
380            path,
381        )?
382        .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
383    }
384
385    /// Gets the [`StaticFileJarProvider`] of the requested segment and transaction.
386    pub fn get_segment_provider_from_transaction(
387        &self,
388        segment: StaticFileSegment,
389        tx: TxNumber,
390        path: Option<&Path>,
391    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
392        self.get_segment_provider(
393            segment,
394            || self.get_segment_ranges_from_transaction(segment, tx),
395            path,
396        )?
397        .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
398    }
399
400    /// Gets the [`StaticFileJarProvider`] of the requested segment and block or transaction.
401    ///
402    /// `fn_range` should make sure the range goes through `find_fixed_range`.
403    pub fn get_segment_provider(
404        &self,
405        segment: StaticFileSegment,
406        fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
407        path: Option<&Path>,
408    ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
409        // If we have a path, then get the block range from its name.
410        // Otherwise, check `self.available_static_files`
411        let block_range = match path {
412            Some(path) => StaticFileSegment::parse_filename(
413                &path
414                    .file_name()
415                    .ok_or_else(|| {
416                        ProviderError::MissingStaticFilePath(segment, path.to_path_buf())
417                    })?
418                    .to_string_lossy(),
419            )
420            .and_then(|(parsed_segment, block_range)| {
421                if parsed_segment == segment {
422                    return Some(block_range)
423                }
424                None
425            }),
426            None => fn_range(),
427        };
428
429        // Return cached `LoadedJar` or insert it for the first time, and then, return it.
430        if let Some(block_range) = block_range {
431            return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
432        }
433
434        Ok(None)
435    }
436
437    /// Given a segment and block range it removes the cached provider from the map.
438    ///
439    /// CAUTION: cached provider should be dropped before calling this or IT WILL deadlock.
440    pub fn remove_cached_provider(
441        &self,
442        segment: StaticFileSegment,
443        fixed_block_range_end: BlockNumber,
444    ) {
445        self.map.remove(&(fixed_block_range_end, segment));
446    }
447
448    /// This handles history expiry by deleting all transaction static files below the given block.
449    ///
450    /// For example if block is 1M and the blocks per file are 500K this will delete all individual
451    /// files below 1M, so 0-499K and 500K-999K.
452    ///
453    /// This will not delete the file that contains the block itself, because files can only be
454    /// removed entirely.
455    pub fn delete_transactions_below(&self, block: BlockNumber) -> ProviderResult<()> {
456        // Nothing to delete if block is 0.
457        if block == 0 {
458            return Ok(())
459        }
460
461        loop {
462            let Some(block_height) =
463                self.get_lowest_static_file_block(StaticFileSegment::Transactions)
464            else {
465                return Ok(())
466            };
467
468            if block_height >= block {
469                return Ok(())
470            }
471
472            debug!(
473                target: "provider::static_file",
474                ?block_height,
475                "Deleting transaction static file below block"
476            );
477
478            // now we need to wipe the static file, this will take care of updating the index and
479            // advance the lowest tracked block height for the transactions segment.
480            self.delete_jar(StaticFileSegment::Transactions, block_height)
481                .inspect_err(|err| {
482                    warn!( target: "provider::static_file", %block_height, ?err, "Failed to delete transaction static file below block")
483                })
484                ?;
485        }
486    }
487
488    /// Given a segment and block, it deletes the jar and all files from the respective block range.
489    ///
490    /// CAUTION: destructive. Deletes files on disk.
491    ///
492    /// This will re-initialize the index after deletion, so all files are tracked.
493    pub fn delete_jar(&self, segment: StaticFileSegment, block: BlockNumber) -> ProviderResult<()> {
494        let fixed_block_range = self.find_fixed_range(block);
495        let key = (fixed_block_range.end(), segment);
496        let jar = if let Some((_, jar)) = self.map.remove(&key) {
497            jar.jar
498        } else {
499            let file = self.path.join(segment.filename(&fixed_block_range));
500            debug!(
501                target: "provider::static_file",
502                ?file,
503                ?fixed_block_range,
504                ?block,
505                "Loading static file jar for deletion"
506            );
507            NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
508        };
509
510        jar.delete().map_err(ProviderError::other)?;
511
512        self.initialize_index()?;
513
514        Ok(())
515    }
516
517    /// Given a segment and block range it returns a cached
518    /// [`StaticFileJarProvider`]. TODO(joshie): we should check the size and pop N if there's too
519    /// many.
520    fn get_or_create_jar_provider(
521        &self,
522        segment: StaticFileSegment,
523        fixed_block_range: &SegmentRangeInclusive,
524    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
525        let key = (fixed_block_range.end(), segment);
526
527        // Avoid using `entry` directly to avoid a write lock in the common case.
528        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
529        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
530            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
531            jar.into()
532        } else {
533            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
534            let path = self.path.join(segment.filename(fixed_block_range));
535            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
536            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
537        };
538
539        if let Some(metrics) = &self.metrics {
540            provider = provider.with_metrics(metrics.clone());
541        }
542        Ok(provider)
543    }
544
545    /// Gets a static file segment's block range from the provider inner block
546    /// index.
547    fn get_segment_ranges_from_block(
548        &self,
549        segment: StaticFileSegment,
550        block: u64,
551    ) -> Option<SegmentRangeInclusive> {
552        self.static_files_max_block
553            .read()
554            .get(&segment)
555            .filter(|max| **max >= block)
556            .map(|_| self.find_fixed_range(block))
557    }
558
559    /// Gets a static file segment's fixed block range from the provider inner
560    /// transaction index.
561    fn get_segment_ranges_from_transaction(
562        &self,
563        segment: StaticFileSegment,
564        tx: u64,
565    ) -> Option<SegmentRangeInclusive> {
566        let static_files = self.static_files_tx_index.read();
567        let segment_static_files = static_files.get(&segment)?;
568
569        // It's more probable that the request comes from a newer tx height, so we iterate
570        // the static_files in reverse.
571        let mut static_files_rev_iter = segment_static_files.iter().rev().peekable();
572
573        while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
574            if tx > *tx_end {
575                // request tx is higher than highest static file tx
576                return None
577            }
578            let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
579            if tx_start <= tx {
580                return Some(self.find_fixed_range(block_range.end()))
581            }
582        }
583        None
584    }
585
586    /// Updates the inner transaction and block indexes alongside the internal cached providers in
587    /// `self.map`.
588    ///
589    /// Any entry higher than `segment_max_block` will be deleted from the previous structures.
590    ///
591    /// If `segment_max_block` is None it means there's no static file for this segment.
592    pub fn update_index(
593        &self,
594        segment: StaticFileSegment,
595        segment_max_block: Option<BlockNumber>,
596    ) -> ProviderResult<()> {
597        let mut max_block = self.static_files_max_block.write();
598        let mut tx_index = self.static_files_tx_index.write();
599
600        match segment_max_block {
601            Some(segment_max_block) => {
602                // Update the max block for the segment
603                max_block.insert(segment, segment_max_block);
604                let fixed_range = self.find_fixed_range(segment_max_block);
605
606                let jar = NippyJar::<SegmentHeader>::load(
607                    &self.path.join(segment.filename(&fixed_range)),
608                )
609                .map_err(ProviderError::other)?;
610
611                // Updates the tx index by first removing all entries which have a higher
612                // block_start than our current static file.
613                if let Some(tx_range) = jar.user_header().tx_range() {
614                    let tx_end = tx_range.end();
615
616                    // Current block range has the same block start as `fixed_range``, but block end
617                    // might be different if we are still filling this static file.
618                    if let Some(current_block_range) = jar.user_header().block_range().copied() {
619                        // Considering that `update_index` is called when we either append/truncate,
620                        // we are sure that we are handling the latest data
621                        // points.
622                        //
623                        // Here we remove every entry of the index that has a block start higher or
624                        // equal than our current one. This is important in the case
625                        // that we prune a lot of rows resulting in a file (and thus
626                        // a higher block range) deletion.
627                        tx_index
628                            .entry(segment)
629                            .and_modify(|index| {
630                                index.retain(|_, block_range| {
631                                    block_range.start() < fixed_range.start()
632                                });
633                                index.insert(tx_end, current_block_range);
634                            })
635                            .or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
636                    }
637                } else if segment.is_tx_based() {
638                    // The unwinded file has no more transactions/receipts. However, the highest
639                    // block is within this files' block range. We only retain
640                    // entries with block ranges before the current one.
641                    tx_index.entry(segment).and_modify(|index| {
642                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
643                    });
644
645                    // If the index is empty, just remove it.
646                    if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
647                        tx_index.remove(&segment);
648                    }
649                }
650
651                // Update the cached provider.
652                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
653
654                // Delete any cached provider that no longer has an associated jar.
655                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
656            }
657            None => {
658                tx_index.remove(&segment);
659                max_block.remove(&segment);
660            }
661        };
662
663        Ok(())
664    }
665
666    /// Initializes the inner transaction and block index
667    pub fn initialize_index(&self) -> ProviderResult<()> {
668        let mut min_block = self.static_files_min_block.write();
669        let mut max_block = self.static_files_max_block.write();
670        let mut tx_index = self.static_files_tx_index.write();
671
672        min_block.clear();
673        max_block.clear();
674        tx_index.clear();
675
676        for (segment, ranges) in iter_static_files(&self.path).map_err(ProviderError::other)? {
677            // Update first and last block for each segment
678            if let Some((first_block_range, _)) = ranges.first() {
679                min_block.insert(segment, *first_block_range);
680            }
681            if let Some((last_block_range, _)) = ranges.last() {
682                max_block.insert(segment, last_block_range.end());
683            }
684
685            // Update tx -> block_range index
686            for (block_range, tx_range) in ranges {
687                if let Some(tx_range) = tx_range {
688                    let tx_end = tx_range.end();
689
690                    match tx_index.entry(segment) {
691                        Entry::Occupied(mut index) => {
692                            index.get_mut().insert(tx_end, block_range);
693                        }
694                        Entry::Vacant(index) => {
695                            index.insert(BTreeMap::from([(tx_end, block_range)]));
696                        }
697                    };
698                }
699            }
700        }
701
702        // If this is a re-initialization, we need to clear this as well
703        self.map.clear();
704
705        // initialize the expired history height to the lowest static file block
706        if let Some(lowest_range) = min_block.get(&StaticFileSegment::Transactions) {
707            // the earliest height is the lowest available block number
708            self.earliest_history_height
709                .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
710        }
711
712        Ok(())
713    }
714
    /// Ensures that any broken invariants which cannot be healed on the spot return a pipeline
    /// target to unwind to.
    ///
    /// Two types of consistency checks are done for:
    ///
    /// 1) When a static file fails to commit but the underlying data was changed.
    /// 2) When a static file was committed, but the required database transaction was not.
    ///
    /// For 1) it can self-heal if `self.access.is_read_only()` is set to `false`. Otherwise, it
    /// will return an error.
    /// For 2) the invariants below are checked, and if broken, might require a pipeline unwind
    /// to heal.
    ///
    /// For each static file segment:
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///
    /// Segments that are not checked:
    /// * Receipts, when `has_receipt_pruning` is `true`.
    /// * Receipts, on Gnosis and Chiado.
    /// * Every segment on OP mainnet once the first OVM block hash is found in the database (the
    ///   function returns `Ok(None)` right away in that case).
    ///
    /// When several segments (or several checks within a segment) request an unwind, the lowest
    /// requested block number is returned.
    ///
    /// Returns a [`Option`] of [`PipelineTarget::Unwind`] if any healing is further required.
    ///
    /// WARNING: No static file writer should be held before calling this function, otherwise it
    /// will deadlock.
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
        has_receipt_pruning: bool,
    ) -> ProviderResult<Option<PipelineTarget>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader + ChainSpecProvider,
        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
    {
        // OVM historical import is broken and does not work with this check. It's importing
        // duplicated receipts resulting in having more receipts than the expected transaction
        // range.
        //
        // If we detect an OVM import was done (block #1 <https://optimistic.etherscan.io/block/1>), skip it.
        // More on [#11099](https://github.com/paradigmxyz/reth/pull/11099).
        if provider.chain_spec().is_optimism() &&
            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
        {
            // check whether we have the first OVM block: <https://optimistic.etherscan.io/block/0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453>
            const OVM_HEADER_1_HASH: B256 =
                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
                info!(target: "reth::cli",
                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
                );
                return Ok(None)
            }
        }

        info!(target: "reth::cli", "Verifying storage consistency.");

        // Lowest unwind target requested so far across all segments; `None` means no unwind.
        let mut unwind_target: Option<BlockNumber> = None;
        // Records a requested unwind, keeping only the minimum of all requests.
        let mut update_unwind_target = |new_target: BlockNumber| {
            if let Some(target) = unwind_target.as_mut() {
                *target = (*target).min(new_target);
            } else {
                unwind_target = Some(new_target);
            }
        };

        for segment in StaticFileSegment::iter() {
            if has_receipt_pruning && segment.is_receipts() {
                // Pruned nodes (including full node) do not store receipts as static files.
                continue
            }

            if segment.is_receipts() &&
                (NamedChain::Gnosis == provider.chain_spec().chain_id() ||
                    NamedChain::Chiado == provider.chain_spec().chain_id())
            {
                // Gnosis and Chiado's historical import is broken and does not work with this
                // check. They are importing receipts along with importing
                // headers/bodies.
                continue;
            }

            // Snapshot taken before any file-level healing, so a decrease caused by healing can
            // be detected below.
            let initial_highest_block = self.get_highest_static_file_block(segment);

            // File consistency is broken if:
            //
            // * appending data was interrupted before a config commit, then data file will be
            //   truncated according to the config.
            //
            // * pruning data was interrupted before a config commit, then we have deleted data that
            //   we are expected to still have. We need to check the Database and unwind everything
            //   accordingly.
            if self.access.is_read_only() {
                // Read-only access cannot heal: this only reports an error on inconsistency.
                self.check_segment_consistency(segment)?;
            } else {
                // Fetching the writer will attempt to heal any file level inconsistency.
                self.latest_writer(segment)?;
            }

            // Only applies to block-based static files. (Headers)
            //
            // The updated `highest_block` may have decreased if we healed from a pruning
            // interruption.
            let mut highest_block = self.get_highest_static_file_block(segment);
            if initial_highest_block != highest_block {
                info!(
                    target: "reth::providers::static_file",
                    ?initial_highest_block,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                update_unwind_target(highest_block.unwrap_or_default());
            }

            // Only applies to transaction-based static files. (Receipts & Transactions)
            //
            // Make sure the last transaction matches the last block from its indices, since a heal
            // from a pruning interruption might have decreased the number of transactions without
            // being able to update the last block of the static file segment.
            let highest_tx = self.get_highest_static_file_tx(segment);
            if let Some(highest_tx) = highest_tx {
                // Walk backwards from the highest block until reaching a block whose last
                // transaction is covered by the static file (or block 0, or missing indices).
                let mut last_block = highest_block.unwrap_or_default();
                loop {
                    if let Some(indices) = provider.block_body_indices(last_block)? {
                        if indices.last_tx_num() <= highest_tx {
                            break
                        }
                    } else {
                        // If the block body indices can not be found, then it means that static
                        // files is ahead of database, and the `ensure_invariants` check will fix
                        // it by comparing with stage checkpoints.
                        break
                    }
                    if last_block == 0 {
                        break
                    }
                    last_block -= 1;

                    info!(
                        target: "reth::providers::static_file",
                        highest_block = self.get_highest_static_file_block(segment),
                        unwind_target = last_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    // The lowered block is also what `ensure_invariants` compares against below.
                    highest_block = Some(last_block);
                    update_unwind_target(last_block);
                }
            }

            // Headers are keyed by block number; transactions and receipts by transaction number.
            if let Some(unwind) = match segment {
                StaticFileSegment::Headers => self
                    .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                        provider,
                        segment,
                        highest_block,
                        highest_block,
                    )?,
                StaticFileSegment::Transactions => self
                    .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
                StaticFileSegment::Receipts => self
                    .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
            } {
                update_unwind_target(unwind);
            }
        }

        Ok(unwind_target.map(PipelineTarget::Unwind))
    }
892
893    /// Checks consistency of the latest static file segment and throws an error if at fault.
894    /// Read-only.
895    pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
896        if let Some(latest_block) = self.get_highest_static_file_block(segment) {
897            let file_path =
898                self.directory().join(segment.filename(&self.find_fixed_range(latest_block)));
899
900            let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
901
902            NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
903        }
904        Ok(())
905    }
906
    /// Check invariants for each corresponding table and static file segment:
    ///
    /// * the corresponding database table should overlap or have continuity in their keys
    ///   ([`TxNumber`] or [`BlockNumber`]).
    /// * its highest block should match the stage checkpoint block number if it's equal or higher
    ///   than the corresponding database table last entry.
    ///   * If the checkpoint block is higher, then request a pipeline unwind to the static file
    ///     block. This is expressed by returning [`Some`] with the requested pipeline unwind
    ///     target.
    ///   * If the checkpoint block is lower, then heal by removing rows from the static file. In
    ///     this case, the rows will be removed and [`None`] will be returned.
    ///
    /// * If the database tables overlap with static files and have contiguous keys, or the
    ///   checkpoint block matches the highest static files block, then [`None`] will be returned.
    ///
    /// # Arguments
    /// * `highest_static_file_entry` - highest key of table `T` stored in the static file (`None`
    ///   if the static file holds no entries).
    /// * `highest_static_file_block` - highest block of the static file segment (`None` if there
    ///   is none).
    ///
    /// The checkpoint used for comparison is the `Headers` stage for headers, `Bodies` for
    /// transactions and `Execution` for receipts.
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_entry, _)) = db_cursor.first()? {
            if let (Some(highest_entry), Some(highest_block)) =
                (highest_static_file_entry, highest_static_file_block)
            {
                // If there is a gap between the entry found in static file and
                // database, then we have most likely lost static file data and need to unwind so we
                // can load it again
                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                    info!(
                        target: "reth::providers::static_file",
                        ?db_first_entry,
                        ?highest_entry,
                        unwind_target = highest_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    return Ok(Some(highest_block))
                }
            }

            // The database already extends past the static file (or the static file is empty):
            // there are no extra static file rows to reconcile with the checkpoint.
            if let Some((db_last_entry, _)) = db_cursor.last()? &&
                highest_static_file_entry
                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
            {
                return Ok(None)
            }
        }

        // From here on the table is empty or the static file is at/ahead of its last entry.
        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // If static file entry is ahead of the database entries, then ensure the checkpoint block
        // number matches.
        let checkpoint_block_number = provider
            .get_stage_checkpoint(match segment {
                StaticFileSegment::Headers => StageId::Headers,
                StaticFileSegment::Transactions => StageId::Bodies,
                StaticFileSegment::Receipts => StageId::Execution,
            })?
            .unwrap_or_default()
            .block_number;

        // If the checkpoint is ahead, then we lost static file data. May be data corruption.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // If the checkpoint is behind, then we failed to do a database commit **but committed** to
        // static files on executing a stage, or the reverse on unwinding a stage.
        // All we need to do is to prune the extra static file rows.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            if segment.is_headers() {
                // TODO(joshie): is_block_meta
                writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
            } else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                // todo joshie: is querying block_body_indices a potential issue once bbi is moved
                // to sf as well
                // Number of transaction-keyed rows past the checkpoint block's last transaction.
                // NOTE(review): assumes `block.last_tx_num() <= highest_static_file_entry`;
                // otherwise this u64 subtraction underflows — confirm the invariant holds.
                let number = highest_static_file_entry - block.last_tx_num();
                if segment.is_receipts() {
                    writer.prune_receipts(number, checkpoint_block_number)?;
                } else {
                    writer.prune_transactions(number, checkpoint_block_number)?;
                }
            }
            // Note: if the checkpoint block's body indices are missing, nothing is pruned for
            // transaction-based segments, but the writer is still committed.
            writer.commit()?;
        }

        Ok(None)
    }
1017
1018    /// Returns the earliest available block number that has not been expired and is still
1019    /// available.
1020    ///
1021    /// This means that the highest expired block (or expired block height) is
1022    /// `earliest_history_height.saturating_sub(1)`.
1023    ///
1024    /// Returns `0` if no history has been expired.
1025    pub fn earliest_history_height(&self) -> BlockNumber {
1026        self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1027    }
1028
1029    /// Gets the lowest transaction static file block if it exists.
1030    ///
1031    /// For example if the transactions static file has blocks 0-499, this will return 499..
1032    ///
1033    /// If there is nothing on disk for the given segment, this will return [`None`].
1034    pub fn get_lowest_transaction_static_file_block(&self) -> Option<BlockNumber> {
1035        self.get_lowest_static_file_block(StaticFileSegment::Transactions)
1036    }
1037
1038    /// Gets the lowest static file's block height if it exists for a static file segment.
1039    ///
1040    /// For example if the static file has blocks 0-499, this will return 499..
1041    ///
1042    /// If there is nothing on disk for the given segment, this will return [`None`].
1043    pub fn get_lowest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1044        self.static_files_min_block.read().get(&segment).map(|range| range.end())
1045    }
1046
1047    /// Gets the highest static file's block height if it exists for a static file segment.
1048    ///
1049    /// If there is nothing on disk for the given segment, this will return [`None`].
1050    pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1051        self.static_files_max_block.read().get(&segment).copied()
1052    }
1053
1054    /// Gets the highest static file transaction.
1055    ///
1056    /// If there is nothing on disk for the given segment, this will return [`None`].
1057    pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1058        self.static_files_tx_index
1059            .read()
1060            .get(&segment)
1061            .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1062    }
1063
1064    /// Gets the highest static file block for all segments.
1065    pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1066        HighestStaticFiles {
1067            headers: self.get_highest_static_file_block(StaticFileSegment::Headers),
1068            receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1069            transactions: self.get_highest_static_file_block(StaticFileSegment::Transactions),
1070        }
1071    }
1072
1073    /// Iterates through segment `static_files` in reverse order, executing a function until it
1074    /// returns some object. Useful for finding objects by [`TxHash`] or [`BlockHash`].
1075    pub fn find_static_file<T>(
1076        &self,
1077        segment: StaticFileSegment,
1078        func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
1079    ) -> ProviderResult<Option<T>> {
1080        if let Some(highest_block) = self.get_highest_static_file_block(segment) {
1081            let mut range = self.find_fixed_range(highest_block);
1082            while range.end() > 0 {
1083                if let Some(res) = func(self.get_or_create_jar_provider(segment, &range)?)? {
1084                    return Ok(Some(res))
1085                }
1086                range = SegmentRangeInclusive::new(
1087                    range.start().saturating_sub(self.blocks_per_file),
1088                    range.end().saturating_sub(self.blocks_per_file),
1089                );
1090            }
1091        }
1092
1093        Ok(None)
1094    }
1095
1096    /// Fetches data within a specified range across multiple static files.
1097    ///
1098    /// This function iteratively retrieves data using `get_fn` for each item in the given range.
1099    /// It continues fetching until the end of the range is reached or the provided `predicate`
1100    /// returns false.
1101    pub fn fetch_range_with_predicate<T, F, P>(
1102        &self,
1103        segment: StaticFileSegment,
1104        range: Range<u64>,
1105        mut get_fn: F,
1106        mut predicate: P,
1107    ) -> ProviderResult<Vec<T>>
1108    where
1109        F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1110        P: FnMut(&T) -> bool,
1111    {
1112        let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1113
1114        /// Resolves to the provider for the given block or transaction number.
1115        ///
1116        /// If the static file is missing, the `result` is returned.
1117        macro_rules! get_provider {
1118            ($number:expr) => {{
1119                let provider = if segment.is_block_based() {
1120                    self.get_segment_provider_from_block(segment, $number, None)
1121                } else {
1122                    self.get_segment_provider_from_transaction(segment, $number, None)
1123                };
1124
1125                match provider {
1126                    Ok(provider) => provider,
1127                    Err(
1128                        ProviderError::MissingStaticFileBlock(_, _) |
1129                        ProviderError::MissingStaticFileTx(_, _),
1130                    ) => return Ok(result),
1131                    Err(err) => return Err(err),
1132                }
1133            }};
1134        }
1135
1136        let mut provider = get_provider!(range.start);
1137        let mut cursor = provider.cursor()?;
1138
1139        // advances number in range
1140        'outer: for number in range {
1141            // The `retrying` flag ensures a single retry attempt per `number`. If `get_fn` fails to
1142            // access data in two different static files, it halts further attempts by returning
1143            // an error, effectively preventing infinite retry loops.
1144            let mut retrying = false;
1145
1146            // advances static files if `get_fn` returns None
1147            'inner: loop {
1148                match get_fn(&mut cursor, number)? {
1149                    Some(res) => {
1150                        if !predicate(&res) {
1151                            break 'outer
1152                        }
1153                        result.push(res);
1154                        break 'inner
1155                    }
1156                    None => {
1157                        if retrying {
1158                            return Ok(result)
1159                        }
1160                        // There is a very small chance of hitting a deadlock if two consecutive
1161                        // static files share the same bucket in the
1162                        // internal dashmap and we don't drop the current provider
1163                        // before requesting the next one.
1164                        drop(cursor);
1165                        drop(provider);
1166                        provider = get_provider!(number);
1167                        cursor = provider.cursor()?;
1168                        retrying = true;
1169                    }
1170                }
1171            }
1172        }
1173
1174        Ok(result)
1175    }
1176
1177    /// Fetches data within a specified range across multiple static files.
1178    ///
1179    /// Returns an iterator over the data
1180    pub fn fetch_range_iter<'a, T, F>(
1181        &'a self,
1182        segment: StaticFileSegment,
1183        range: Range<u64>,
1184        get_fn: F,
1185    ) -> ProviderResult<impl Iterator<Item = ProviderResult<T>> + 'a>
1186    where
1187        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1188        T: std::fmt::Debug,
1189    {
1190        let get_provider = move |start: u64| {
1191            if segment.is_block_based() {
1192                self.get_segment_provider_from_block(segment, start, None)
1193            } else {
1194                self.get_segment_provider_from_transaction(segment, start, None)
1195            }
1196        };
1197
1198        let mut provider = Some(get_provider(range.start)?);
1199        Ok(range.filter_map(move |number| {
1200            match get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose() {
1201                Some(result) => Some(result),
1202                None => {
1203                    // There is a very small chance of hitting a deadlock if two consecutive static
1204                    // files share the same bucket in the internal dashmap and
1205                    // we don't drop the current provider before requesting the
1206                    // next one.
1207                    provider.take();
1208                    provider = Some(get_provider(number).ok()?);
1209                    get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose()
1210                }
1211            }
1212        }))
1213    }
1214
1215    /// Returns directory where `static_files` are located.
1216    pub fn directory(&self) -> &Path {
1217        &self.path
1218    }
1219
1220    /// Retrieves data from the database or static file, wherever it's available.
1221    ///
1222    /// # Arguments
1223    /// * `segment` - The segment of the static file to check against.
1224    /// * `index_key` - Requested index key, usually a block or transaction number.
1225    /// * `fetch_from_static_file` - A closure that defines how to fetch the data from the static
1226    ///   file provider.
1227    /// * `fetch_from_database` - A closure that defines how to fetch the data from the database
1228    ///   when the static file doesn't contain the required data or is not available.
1229    pub fn get_with_static_file_or_database<T, FS, FD>(
1230        &self,
1231        segment: StaticFileSegment,
1232        number: u64,
1233        fetch_from_static_file: FS,
1234        fetch_from_database: FD,
1235    ) -> ProviderResult<Option<T>>
1236    where
1237        FS: Fn(&Self) -> ProviderResult<Option<T>>,
1238        FD: Fn() -> ProviderResult<Option<T>>,
1239    {
1240        // If there is, check the maximum block or transaction number of the segment.
1241        let static_file_upper_bound = if segment.is_block_based() {
1242            self.get_highest_static_file_block(segment)
1243        } else {
1244            self.get_highest_static_file_tx(segment)
1245        };
1246
1247        if static_file_upper_bound
1248            .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1249        {
1250            return fetch_from_static_file(self)
1251        }
1252        fetch_from_database()
1253    }
1254
1255    /// Gets data within a specified range, potentially spanning different `static_files` and
1256    /// database.
1257    ///
1258    /// # Arguments
1259    /// * `segment` - The segment of the static file to query.
1260    /// * `block_range` - The range of data to fetch.
1261    /// * `fetch_from_static_file` - A function to fetch data from the `static_file`.
1262    /// * `fetch_from_database` - A function to fetch data from the database.
1263    /// * `predicate` - A function used to evaluate each item in the fetched data. Fetching is
1264    ///   terminated when this function returns false, thereby filtering the data based on the
1265    ///   provided condition.
1266    pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1267        &self,
1268        segment: StaticFileSegment,
1269        mut block_or_tx_range: Range<u64>,
1270        fetch_from_static_file: FS,
1271        mut fetch_from_database: FD,
1272        mut predicate: P,
1273    ) -> ProviderResult<Vec<T>>
1274    where
1275        FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1276        FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1277        P: FnMut(&T) -> bool,
1278    {
1279        let mut data = Vec::new();
1280
1281        // If there is, check the maximum block or transaction number of the segment.
1282        if let Some(static_file_upper_bound) = if segment.is_block_based() {
1283            self.get_highest_static_file_block(segment)
1284        } else {
1285            self.get_highest_static_file_tx(segment)
1286        } && block_or_tx_range.start <= static_file_upper_bound
1287        {
1288            let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1289            data.extend(fetch_from_static_file(
1290                self,
1291                block_or_tx_range.start..end,
1292                &mut predicate,
1293            )?);
1294            block_or_tx_range.start = end;
1295        }
1296
1297        if block_or_tx_range.end > block_or_tx_range.start {
1298            data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1299        }
1300
1301        Ok(data)
1302    }
1303
1304    /// Returns `static_files` directory
1305    #[cfg(any(test, feature = "test-utils"))]
1306    pub fn path(&self) -> &Path {
1307        &self.path
1308    }
1309
1310    /// Returns `static_files` transaction index
1311    #[cfg(any(test, feature = "test-utils"))]
1312    pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
1313        &self.static_files_tx_index
1314    }
1315}
1316
1317/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider`
1318pub trait StaticFileWriter {
1319    /// The primitives type used by the static file provider.
1320    type Primitives: Send + Sync + 'static;
1321
1322    /// Returns a mutable reference to a [`StaticFileProviderRW`] of a [`StaticFileSegment`].
1323    fn get_writer(
1324        &self,
1325        block: BlockNumber,
1326        segment: StaticFileSegment,
1327    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1328
1329    /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest
1330    /// [`StaticFileSegment`].
1331    fn latest_writer(
1332        &self,
1333        segment: StaticFileSegment,
1334    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1335
1336    /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`].
1337    fn commit(&self) -> ProviderResult<()>;
1338
1339    /// Returns `true` if the static file provider has unwind queued.
1340    fn has_unwind_queued(&self) -> bool;
1341}
1342
1343impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1344    type Primitives = N;
1345
1346    fn get_writer(
1347        &self,
1348        block: BlockNumber,
1349        segment: StaticFileSegment,
1350    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1351        if self.access.is_read_only() {
1352            return Err(ProviderError::ReadOnlyStaticFileAccess)
1353        }
1354
1355        trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1356        self.writers.get_or_create(segment, || {
1357            StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1358        })
1359    }
1360
1361    fn latest_writer(
1362        &self,
1363        segment: StaticFileSegment,
1364    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1365        self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1366    }
1367
1368    fn commit(&self) -> ProviderResult<()> {
1369        self.writers.commit()
1370    }
1371
1372    fn has_unwind_queued(&self) -> bool {
1373        self.writers.has_unwind_queued()
1374    }
1375}
1376
1377impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1378    type Header = N::BlockHeader;
1379
1380    fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1381        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1382            Ok(jar_provider
1383                .cursor()?
1384                .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
1385                .and_then(|(header, hash)| {
1386                    if hash == block_hash {
1387                        return Some(header)
1388                    }
1389                    None
1390                }))
1391        })
1392    }
1393
1394    fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1395        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1396            .and_then(|provider| provider.header_by_number(num))
1397            .or_else(|err| {
1398                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1399                    Ok(None)
1400                } else {
1401                    Err(err)
1402                }
1403            })
1404    }
1405
1406    fn header_td(&self, block_hash: BlockHash) -> ProviderResult<Option<U256>> {
1407        self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1408            Ok(jar_provider
1409                .cursor()?
1410                .get_two::<TDWithHashMask>((&block_hash).into())?
1411                .and_then(|(td, hash)| (hash == block_hash).then_some(td.0)))
1412        })
1413    }
1414
1415    fn header_td_by_number(&self, num: BlockNumber) -> ProviderResult<Option<U256>> {
1416        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1417            .and_then(|provider| provider.header_td_by_number(num))
1418            .or_else(|err| {
1419                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1420                    Ok(None)
1421                } else {
1422                    Err(err)
1423                }
1424            })
1425    }
1426
1427    fn headers_range(
1428        &self,
1429        range: impl RangeBounds<BlockNumber>,
1430    ) -> ProviderResult<Vec<Self::Header>> {
1431        self.fetch_range_with_predicate(
1432            StaticFileSegment::Headers,
1433            to_range(range),
1434            |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1435            |_| true,
1436        )
1437    }
1438
1439    fn sealed_header(
1440        &self,
1441        num: BlockNumber,
1442    ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1443        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1444            .and_then(|provider| provider.sealed_header(num))
1445            .or_else(|err| {
1446                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1447                    Ok(None)
1448                } else {
1449                    Err(err)
1450                }
1451            })
1452    }
1453
1454    fn sealed_headers_while(
1455        &self,
1456        range: impl RangeBounds<BlockNumber>,
1457        predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1458    ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1459        self.fetch_range_with_predicate(
1460            StaticFileSegment::Headers,
1461            to_range(range),
1462            |cursor, number| {
1463                Ok(cursor
1464                    .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1465                    .map(|(header, hash)| SealedHeader::new(header, hash)))
1466            },
1467            predicate,
1468        )
1469    }
1470}
1471
1472impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1473    fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1474        self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1475            .and_then(|provider| provider.block_hash(num))
1476            .or_else(|err| {
1477                if let ProviderError::MissingStaticFileBlock(_, _) = err {
1478                    Ok(None)
1479                } else {
1480                    Err(err)
1481                }
1482            })
1483    }
1484
1485    fn canonical_hashes_range(
1486        &self,
1487        start: BlockNumber,
1488        end: BlockNumber,
1489    ) -> ProviderResult<Vec<B256>> {
1490        self.fetch_range_with_predicate(
1491            StaticFileSegment::Headers,
1492            start..end,
1493            |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1494            |_| true,
1495        )
1496    }
1497}
1498
1499impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1500    for StaticFileProvider<N>
1501{
1502    type Receipt = N::Receipt;
1503
1504    fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1505        self.get_segment_provider_from_transaction(StaticFileSegment::Receipts, num, None)
1506            .and_then(|provider| provider.receipt(num))
1507            .or_else(|err| {
1508                if let ProviderError::MissingStaticFileTx(_, _) = err {
1509                    Ok(None)
1510                } else {
1511                    Err(err)
1512                }
1513            })
1514    }
1515
1516    fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1517        if let Some(num) = self.transaction_id(hash)? {
1518            return self.receipt(num)
1519        }
1520        Ok(None)
1521    }
1522
1523    fn receipts_by_block(
1524        &self,
1525        _block: BlockHashOrNumber,
1526    ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1527        unreachable!()
1528    }
1529
1530    fn receipts_by_tx_range(
1531        &self,
1532        range: impl RangeBounds<TxNumber>,
1533    ) -> ProviderResult<Vec<Self::Receipt>> {
1534        self.fetch_range_with_predicate(
1535            StaticFileSegment::Receipts,
1536            to_range(range),
1537            |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1538            |_| true,
1539        )
1540    }
1541
1542    fn receipts_by_block_range(
1543        &self,
1544        _block_range: RangeInclusive<BlockNumber>,
1545    ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1546        Err(ProviderError::UnsupportedProvider)
1547    }
1548}
1549
1550impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>>
1551    TransactionsProviderExt for StaticFileProvider<N>
1552{
1553    fn transaction_hashes_by_range(
1554        &self,
1555        tx_range: Range<TxNumber>,
1556    ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1557        let tx_range_size = (tx_range.end - tx_range.start) as usize;
1558
1559        // Transactions are different size, so chunks will not all take the same processing time. If
1560        // chunks are too big, there will be idle threads waiting for work. Choosing an
1561        // arbitrary smaller value to make sure it doesn't happen.
1562        let chunk_size = 100;
1563
1564        // iterator over the chunks
1565        let chunks = tx_range
1566            .clone()
1567            .step_by(chunk_size)
1568            .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1569        let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1570
1571        for chunk_range in chunks {
1572            let (channel_tx, channel_rx) = mpsc::channel();
1573            channels.push(channel_rx);
1574
1575            let manager = self.clone();
1576
1577            // Spawn the task onto the global rayon pool
1578            // This task will send the results through the channel after it has calculated
1579            // the hash.
1580            rayon::spawn(move || {
1581                let mut rlp_buf = Vec::with_capacity(128);
1582                let _ = manager.fetch_range_with_predicate(
1583                    StaticFileSegment::Transactions,
1584                    chunk_range,
1585                    |cursor, number| {
1586                        Ok(cursor
1587                            .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1588                            .map(|transaction| {
1589                                rlp_buf.clear();
1590                                let _ = channel_tx
1591                                    .send(calculate_hash((number, transaction), &mut rlp_buf));
1592                            }))
1593                    },
1594                    |_| true,
1595                );
1596            });
1597        }
1598
1599        let mut tx_list = Vec::with_capacity(tx_range_size);
1600
1601        // Iterate over channels and append the tx hashes unsorted
1602        for channel in channels {
1603            while let Ok(tx) = channel.recv() {
1604                let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1605                tx_list.push((tx_hash, tx_id));
1606            }
1607        }
1608
1609        Ok(tx_list)
1610    }
1611}
1612
1613impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1614    for StaticFileProvider<N>
1615{
1616    type Transaction = N::SignedTx;
1617
1618    fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1619        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1620            let mut cursor = jar_provider.cursor()?;
1621            if cursor
1622                .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1623                .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1624                .is_some()
1625            {
1626                Ok(cursor.number())
1627            } else {
1628                Ok(None)
1629            }
1630        })
1631    }
1632
1633    fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1634        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1635            .and_then(|provider| provider.transaction_by_id(num))
1636            .or_else(|err| {
1637                if let ProviderError::MissingStaticFileTx(_, _) = err {
1638                    Ok(None)
1639                } else {
1640                    Err(err)
1641                }
1642            })
1643    }
1644
1645    fn transaction_by_id_unhashed(
1646        &self,
1647        num: TxNumber,
1648    ) -> ProviderResult<Option<Self::Transaction>> {
1649        self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1650            .and_then(|provider| provider.transaction_by_id_unhashed(num))
1651            .or_else(|err| {
1652                if let ProviderError::MissingStaticFileTx(_, _) = err {
1653                    Ok(None)
1654                } else {
1655                    Err(err)
1656                }
1657            })
1658    }
1659
1660    fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1661        self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1662            Ok(jar_provider
1663                .cursor()?
1664                .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
1665                .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
1666        })
1667    }
1668
1669    fn transaction_by_hash_with_meta(
1670        &self,
1671        _hash: TxHash,
1672    ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1673        // Required data not present in static_files
1674        Err(ProviderError::UnsupportedProvider)
1675    }
1676
1677    fn transaction_block(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1678        // Required data not present in static_files
1679        Err(ProviderError::UnsupportedProvider)
1680    }
1681
1682    fn transactions_by_block(
1683        &self,
1684        _block_id: BlockHashOrNumber,
1685    ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1686        // Required data not present in static_files
1687        Err(ProviderError::UnsupportedProvider)
1688    }
1689
1690    fn transactions_by_block_range(
1691        &self,
1692        _range: impl RangeBounds<BlockNumber>,
1693    ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1694        // Required data not present in static_files
1695        Err(ProviderError::UnsupportedProvider)
1696    }
1697
1698    fn transactions_by_tx_range(
1699        &self,
1700        range: impl RangeBounds<TxNumber>,
1701    ) -> ProviderResult<Vec<Self::Transaction>> {
1702        self.fetch_range_with_predicate(
1703            StaticFileSegment::Transactions,
1704            to_range(range),
1705            |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
1706            |_| true,
1707        )
1708    }
1709
1710    fn senders_by_tx_range(
1711        &self,
1712        range: impl RangeBounds<TxNumber>,
1713    ) -> ProviderResult<Vec<Address>> {
1714        let txes = self.transactions_by_tx_range(range)?;
1715        Ok(reth_primitives_traits::transaction::recover::recover_signers(&txes)?)
1716    }
1717
1718    fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1719        match self.transaction_by_id_unhashed(id)? {
1720            Some(tx) => Ok(tx.recover_signer().ok()),
1721            None => Ok(None),
1722        }
1723    }
1724}
1725
1726impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
1727    fn chain_info(&self) -> ProviderResult<ChainInfo> {
1728        // Required data not present in static_files
1729        Err(ProviderError::UnsupportedProvider)
1730    }
1731
1732    fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1733        // Required data not present in static_files
1734        Err(ProviderError::UnsupportedProvider)
1735    }
1736
1737    fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1738        Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
1739    }
1740
1741    fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1742        // Required data not present in static_files
1743        Err(ProviderError::UnsupportedProvider)
1744    }
1745}
1746
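/* The block-level readers below cannot be served from static files alone (the required data is
 * not stored there), but they must exist to satisfy trait bounds; they return
 * `ProviderError::UnsupportedProvider`. */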
1748
1749impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
1750    for StaticFileProvider<N>
1751{
1752    type Block = N::Block;
1753
1754    fn find_block_by_hash(
1755        &self,
1756        _hash: B256,
1757        _source: BlockSource,
1758    ) -> ProviderResult<Option<Self::Block>> {
1759        // Required data not present in static_files
1760        Err(ProviderError::UnsupportedProvider)
1761    }
1762
1763    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1764        // Required data not present in static_files
1765        Err(ProviderError::UnsupportedProvider)
1766    }
1767
1768    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1769        // Required data not present in static_files
1770        Err(ProviderError::UnsupportedProvider)
1771    }
1772
1773    fn pending_block_and_receipts(
1774        &self,
1775    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
1776        // Required data not present in static_files
1777        Err(ProviderError::UnsupportedProvider)
1778    }
1779
1780    fn recovered_block(
1781        &self,
1782        _id: BlockHashOrNumber,
1783        _transaction_kind: TransactionVariant,
1784    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1785        // Required data not present in static_files
1786        Err(ProviderError::UnsupportedProvider)
1787    }
1788
1789    fn sealed_block_with_senders(
1790        &self,
1791        _id: BlockHashOrNumber,
1792        _transaction_kind: TransactionVariant,
1793    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1794        // Required data not present in static_files
1795        Err(ProviderError::UnsupportedProvider)
1796    }
1797
1798    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1799        // Required data not present in static_files
1800        Err(ProviderError::UnsupportedProvider)
1801    }
1802
1803    fn block_with_senders_range(
1804        &self,
1805        _range: RangeInclusive<BlockNumber>,
1806    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1807        Err(ProviderError::UnsupportedProvider)
1808    }
1809
1810    fn recovered_block_range(
1811        &self,
1812        _range: RangeInclusive<BlockNumber>,
1813    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1814        Err(ProviderError::UnsupportedProvider)
1815    }
1816
1817    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1818        Err(ProviderError::UnsupportedProvider)
1819    }
1820}
1821
1822impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
1823    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1824        Err(ProviderError::UnsupportedProvider)
1825    }
1826
1827    fn block_body_indices_range(
1828        &self,
1829        _range: RangeInclusive<BlockNumber>,
1830    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1831        Err(ProviderError::UnsupportedProvider)
1832    }
1833}
1834
1835impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
1836    fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
1837        match T::NAME {
1838            tables::CanonicalHeaders::NAME |
1839            tables::Headers::<Header>::NAME |
1840            tables::HeaderTerminalDifficulties::NAME => Ok(self
1841                .get_highest_static_file_block(StaticFileSegment::Headers)
1842                .map(|block| block + 1)
1843                .unwrap_or_default()
1844                as usize),
1845            tables::Receipts::<Receipt>::NAME => Ok(self
1846                .get_highest_static_file_tx(StaticFileSegment::Receipts)
1847                .map(|receipts| receipts + 1)
1848                .unwrap_or_default() as usize),
1849            tables::Transactions::<TransactionSigned>::NAME => Ok(self
1850                .get_highest_static_file_tx(StaticFileSegment::Transactions)
1851                .map(|txs| txs + 1)
1852                .unwrap_or_default()
1853                as usize),
1854            _ => Err(ProviderError::UnsupportedProvider),
1855        }
1856    }
1857}
1858
1859/// Calculates the tx hash for the given transaction and its id.
1860#[inline]
1861fn calculate_hash<T>(
1862    entry: (TxNumber, T),
1863    rlp_buf: &mut Vec<u8>,
1864) -> Result<(B256, TxNumber), Box<ProviderError>>
1865where
1866    T: Encodable2718,
1867{
1868    let (tx_id, tx) = entry;
1869    tx.encode_2718(rlp_buf);
1870    Ok((keccak256(rlp_buf), tx_id))
1871}