1use super::{
2 metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3 StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6 changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker},
7 to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
8 EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
9 TransactionVariant, TransactionsProvider, TransactionsProviderExt,
10};
11use alloy_consensus::{
12 transaction::{TransactionMeta, TxHashRef},
13 Header,
14};
15use alloy_eips::BlockHashOrNumber;
16use alloy_primitives::{b256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
17
18use parking_lot::RwLock;
19use reth_chain_state::ExecutedBlock;
20use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
21use reth_db::{
22 lockfile::StorageLock,
23 static_file::{
24 iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
25 StaticFileCursor, StorageChangesetMask, TransactionMask, TransactionSenderMask,
26 },
27};
28use reth_db_api::{
29 cursor::DbCursorRO,
30 models::{AccountBeforeTx, BlockNumberAddress, StorageBeforeTx, StoredBlockBodyIndices},
31 table::{Decompress, Table, Value},
32 tables,
33 transaction::DbTx,
34};
35use reth_ethereum_primitives::{Receipt, TransactionSigned};
36use reth_nippy_jar::{NippyJar, NippyJarChecker};
37use reth_node_types::NodePrimitives;
38use reth_primitives_traits::{
39 dashmap::DashMap, AlloyBlockHeader as _, BlockBody as _, RecoveredBlock, SealedHeader,
40 SignedTransaction, StorageEntry,
41};
42use reth_prune_types::PruneSegment;
43use reth_stages_types::PipelineTarget;
44use reth_static_file_types::{
45 find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
46 StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
47};
48use reth_storage_api::{
49 BlockBodyIndicesProvider, ChangeSetReader, DBProvider, PruneCheckpointReader,
50 StorageChangeSetReader, StorageSettingsCache,
51};
52use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
53use std::{
54 collections::BTreeMap,
55 fmt::Debug,
56 ops::{Bound, Deref, Range, RangeBounds, RangeInclusive},
57 path::{Path, PathBuf},
58 sync::{atomic::AtomicU64, mpsc, Arc},
59};
60use tracing::{debug, info, info_span, instrument, trace, warn};
61
62type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
65
66#[derive(Debug, Default, PartialEq, Eq)]
68pub enum StaticFileAccess {
69 #[default]
71 RO,
72 RW,
74}
75
76impl StaticFileAccess {
77 pub const fn is_read_only(&self) -> bool {
79 matches!(self, Self::RO)
80 }
81
82 pub const fn is_read_write(&self) -> bool {
84 matches!(self, Self::RW)
85 }
86}
87
88#[derive(Debug, Clone, Copy, Default)]
92pub struct StaticFileWriteCtx {
93 pub write_senders: bool,
95 pub write_receipts: bool,
97 pub write_account_changesets: bool,
99 pub write_storage_changesets: bool,
101 pub tip: BlockNumber,
103 pub receipts_prune_mode: Option<reth_prune_types::PruneMode>,
105 pub receipts_prunable: bool,
107}
108
109#[derive(Debug)]
118pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
119
120impl<N> Clone for StaticFileProvider<N> {
121 fn clone(&self) -> Self {
122 Self(self.0.clone())
123 }
124}
125
126#[derive(Debug)]
128pub struct StaticFileProviderBuilder<P> {
129 access: StaticFileAccess,
130 use_metrics: bool,
131 blocks_per_file: StaticFileMap<u64>,
132 path: P,
133 genesis_block_number: u64,
134}
135
136impl<P: AsRef<Path>> StaticFileProviderBuilder<P> {
137 pub fn read_write(path: P) -> Self {
139 Self {
140 path,
141 access: StaticFileAccess::RW,
142 blocks_per_file: Default::default(),
143 use_metrics: false,
144 genesis_block_number: 0,
145 }
146 }
147
148 pub fn read_only(path: P) -> Self {
150 Self {
151 path,
152 access: StaticFileAccess::RO,
153 blocks_per_file: Default::default(),
154 use_metrics: false,
155 genesis_block_number: 0,
156 }
157 }
158
159 pub fn with_blocks_per_file_for_segments(
171 mut self,
172 segments: &<StaticFileMap<u64> as Deref>::Target,
173 ) -> Self {
174 for (segment, &blocks_per_file) in segments {
175 self.blocks_per_file.insert(segment, blocks_per_file);
176 }
177 self
178 }
179
180 pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
182 for segment in StaticFileSegment::iter() {
183 self.blocks_per_file.insert(segment, blocks_per_file);
184 }
185 self
186 }
187
188 pub fn with_blocks_per_file_for_segment(
190 mut self,
191 segment: StaticFileSegment,
192 blocks_per_file: u64,
193 ) -> Self {
194 self.blocks_per_file.insert(segment, blocks_per_file);
195 self
196 }
197
198 pub const fn with_metrics(mut self) -> Self {
200 self.use_metrics = true;
201 self
202 }
203
204 pub const fn with_genesis_block_number(mut self, genesis_block_number: u64) -> Self {
217 self.genesis_block_number = genesis_block_number;
218 self
219 }
220
221 pub fn build<N: NodePrimitives>(self) -> ProviderResult<StaticFileProvider<N>> {
223 let mut provider = StaticFileProviderInner::new(self.path, self.access)?;
224 if self.use_metrics {
225 provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
226 }
227
228 for (segment, blocks_per_file) in *self.blocks_per_file {
229 provider.blocks_per_file.insert(segment, blocks_per_file);
230 }
231 provider.genesis_block_number = self.genesis_block_number;
232
233 let provider = StaticFileProvider(Arc::new(provider));
234 provider.initialize_index()?;
235 Ok(provider)
236 }
237}
238
239impl<N: NodePrimitives> StaticFileProvider<N> {
240 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
242 let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
243 provider.initialize_index()?;
244 Ok(provider)
245 }
246}
247
248impl<N: NodePrimitives> StaticFileProvider<N> {
249 pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
254 Self::new(path, StaticFileAccess::RO)
255 }
256
257 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
259 Self::new(path, StaticFileAccess::RW)
260 }
261}
262
263impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
264 type Target = StaticFileProviderInner<N>;
265
266 fn deref(&self) -> &Self::Target {
267 &self.0
268 }
269}
270
271#[derive(Debug)]
273pub struct StaticFileProviderInner<N> {
274 map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
277 indexes: RwLock<StaticFileMap<StaticFileSegmentIndex>>,
279 earliest_history_height: AtomicU64,
290 path: PathBuf,
292 writers: StaticFileWriters<N>,
294 metrics: Option<Arc<StaticFileProviderMetrics>>,
296 access: StaticFileAccess,
298 blocks_per_file: StaticFileMap<u64>,
300 _lock_file: Option<StorageLock>,
302 genesis_block_number: u64,
304}
305
306impl<N: NodePrimitives> StaticFileProviderInner<N> {
307 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
309 let _lock_file = if access.is_read_write() {
310 StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
311 } else {
312 None
313 };
314
315 let mut blocks_per_file = StaticFileMap::default();
316 for segment in StaticFileSegment::iter() {
317 blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
318 }
319
320 let provider = Self {
321 map: Default::default(),
322 indexes: Default::default(),
323 writers: Default::default(),
324 earliest_history_height: Default::default(),
325 path: path.as_ref().to_path_buf(),
326 metrics: None,
327 access,
328 blocks_per_file,
329 _lock_file,
330 genesis_block_number: 0,
331 };
332
333 Ok(provider)
334 }
335
336 pub const fn is_read_only(&self) -> bool {
337 self.access.is_read_only()
338 }
339
340 pub fn find_fixed_range_with_block_index(
349 &self,
350 segment: StaticFileSegment,
351 block_index: Option<&SegmentRanges>,
352 block: BlockNumber,
353 ) -> SegmentRangeInclusive {
354 let blocks_per_file =
355 self.blocks_per_file.get(segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
356
357 if let Some(block_index) = block_index {
358 if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
360 {
361 return *range;
363 } else if let Some((_, range)) = block_index.last_key_value() {
364 let blocks_after_last_range = block - range.end();
370 let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
371 let start = range.end() + 1 + segments_to_skip * blocks_per_file;
372 return SegmentRangeInclusive::new(start, start + blocks_per_file - 1);
373 }
374 }
375 find_fixed_range(block, blocks_per_file)
378 }
379
380 pub fn find_fixed_range(
393 &self,
394 segment: StaticFileSegment,
395 block: BlockNumber,
396 ) -> SegmentRangeInclusive {
397 self.find_fixed_range_with_block_index(
398 segment,
399 self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block),
400 block,
401 )
402 }
403
404 pub const fn genesis_block_number(&self) -> u64 {
406 self.genesis_block_number
407 }
408}
409
410impl<N: NodePrimitives> StaticFileProvider<N> {
411 pub fn report_metrics(&self) -> ProviderResult<()> {
416 let Some(metrics) = &self.metrics else { return Ok(()) };
417
418 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
419 for (segment, headers) in &*static_files {
420 let mut entries = 0;
421 let mut size = 0;
422
423 for (block_range, _) in headers {
424 let fixed_block_range = self.find_fixed_range(segment, block_range.start());
425 let jar_provider = self
426 .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
427 .ok_or_else(|| {
428 ProviderError::MissingStaticFileBlock(segment, block_range.start())
429 })?;
430
431 entries += jar_provider.rows();
432 size += jar_provider.size() as u64;
433 }
434
435 metrics.record_segment(segment, size, headers.len(), entries);
436 }
437
438 Ok(())
439 }
440
441 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
443 fn write_headers(
444 w: &mut StaticFileProviderRWRefMut<'_, N>,
445 blocks: &[ExecutedBlock<N>],
446 ) -> ProviderResult<()> {
447 for block in blocks {
448 let b = block.recovered_block();
449 w.append_header(b.header(), &b.hash())?;
450 }
451 Ok(())
452 }
453
454 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
456 fn write_transactions(
457 w: &mut StaticFileProviderRWRefMut<'_, N>,
458 blocks: &[ExecutedBlock<N>],
459 tx_nums: &[TxNumber],
460 ) -> ProviderResult<()> {
461 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
462 let b = block.recovered_block();
463 w.increment_block(b.number())?;
464 for (i, tx) in b.body().transactions().iter().enumerate() {
465 w.append_transaction(first_tx + i as u64, tx)?;
466 }
467 }
468 Ok(())
469 }
470
471 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
473 fn write_transaction_senders(
474 w: &mut StaticFileProviderRWRefMut<'_, N>,
475 blocks: &[ExecutedBlock<N>],
476 tx_nums: &[TxNumber],
477 ) -> ProviderResult<()> {
478 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
479 let b = block.recovered_block();
480 w.increment_block(b.number())?;
481 for (i, sender) in b.senders_iter().enumerate() {
482 w.append_transaction_sender(first_tx + i as u64, sender)?;
483 }
484 }
485 Ok(())
486 }
487
488 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
490 fn write_receipts(
491 w: &mut StaticFileProviderRWRefMut<'_, N>,
492 blocks: &[ExecutedBlock<N>],
493 tx_nums: &[TxNumber],
494 ctx: &StaticFileWriteCtx,
495 ) -> ProviderResult<()> {
496 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
497 let block_number = block.recovered_block().number();
498 w.increment_block(block_number)?;
499
500 if ctx.receipts_prunable &&
502 ctx.receipts_prune_mode
503 .is_some_and(|mode| mode.should_prune(block_number, ctx.tip))
504 {
505 continue
506 }
507
508 for (i, receipt) in block.execution_outcome().receipts.iter().enumerate() {
509 w.append_receipt(first_tx + i as u64, receipt)?;
510 }
511 }
512 Ok(())
513 }
514
515 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
517 fn write_account_changesets(
518 w: &mut StaticFileProviderRWRefMut<'_, N>,
519 blocks: &[ExecutedBlock<N>],
520 ) -> ProviderResult<()> {
521 for block in blocks {
522 let block_number = block.recovered_block().number();
523 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
524
525 let changeset: Vec<_> = reverts
526 .accounts
527 .into_iter()
528 .flatten()
529 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
530 .collect();
531 w.append_account_changeset(changeset, block_number)?;
532 }
533 Ok(())
534 }
535
536 #[instrument(level = "debug", target = "providers::db", skip_all)]
538 fn write_storage_changesets(
539 w: &mut StaticFileProviderRWRefMut<'_, N>,
540 blocks: &[ExecutedBlock<N>],
541 ) -> ProviderResult<()> {
542 for block in blocks {
543 let block_number = block.recovered_block().number();
544 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
545
546 let changeset: Vec<_> = reverts
547 .storage
548 .into_iter()
549 .flatten()
550 .flat_map(|revert| {
551 revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
552 StorageBeforeTx {
553 address: revert.address,
554 key: B256::from(key.to_be_bytes()),
555 value: revert_to_slot.to_previous_value(),
556 }
557 })
558 })
559 .collect();
560 w.append_storage_changeset(changeset, block_number)?;
561 }
562 Ok(())
563 }
564
565 #[instrument(level = "debug", target = "providers::static_file", skip_all, fields(?segment))]
570 fn write_segment<F>(
571 &self,
572 segment: StaticFileSegment,
573 first_block_number: BlockNumber,
574 f: F,
575 ) -> ProviderResult<()>
576 where
577 F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()>,
578 {
579 let mut w = self.get_writer(first_block_number, segment)?;
580 f(&mut w)?;
581 w.sync_all()
582 }
583
584 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
589 pub fn write_blocks_data(
590 &self,
591 blocks: &[ExecutedBlock<N>],
592 tx_nums: &[TxNumber],
593 ctx: StaticFileWriteCtx,
594 runtime: &reth_tasks::Runtime,
595 ) -> ProviderResult<()> {
596 if blocks.is_empty() {
597 return Ok(());
598 }
599
600 let first_block_number = blocks[0].recovered_block().number();
601
602 let mut r_headers = None;
603 let mut r_txs = None;
604 let mut r_senders = None;
605 let mut r_receipts = None;
606 let mut r_account_changesets = None;
607 let mut r_storage_changesets = None;
608
609 let span = tracing::Span::current();
612 runtime.storage_pool().in_place_scope(|s| {
613 s.spawn(|_| {
614 let _guard = span.enter();
615 r_headers =
616 Some(self.write_segment(StaticFileSegment::Headers, first_block_number, |w| {
617 Self::write_headers(w, blocks)
618 }));
619 });
620
621 s.spawn(|_| {
622 let _guard = span.enter();
623 r_txs = Some(self.write_segment(
624 StaticFileSegment::Transactions,
625 first_block_number,
626 |w| Self::write_transactions(w, blocks, tx_nums),
627 ));
628 });
629
630 if ctx.write_senders {
631 s.spawn(|_| {
632 let _guard = span.enter();
633 r_senders = Some(self.write_segment(
634 StaticFileSegment::TransactionSenders,
635 first_block_number,
636 |w| Self::write_transaction_senders(w, blocks, tx_nums),
637 ));
638 });
639 }
640
641 if ctx.write_receipts {
642 s.spawn(|_| {
643 let _guard = span.enter();
644 r_receipts = Some(self.write_segment(
645 StaticFileSegment::Receipts,
646 first_block_number,
647 |w| Self::write_receipts(w, blocks, tx_nums, &ctx),
648 ));
649 });
650 }
651
652 if ctx.write_account_changesets {
653 s.spawn(|_| {
654 let _guard = span.enter();
655 r_account_changesets = Some(self.write_segment(
656 StaticFileSegment::AccountChangeSets,
657 first_block_number,
658 |w| Self::write_account_changesets(w, blocks),
659 ));
660 });
661 }
662
663 if ctx.write_storage_changesets {
664 s.spawn(|_| {
665 let _guard = span.enter();
666 r_storage_changesets = Some(self.write_segment(
667 StaticFileSegment::StorageChangeSets,
668 first_block_number,
669 |w| Self::write_storage_changesets(w, blocks),
670 ));
671 });
672 }
673 });
674
675 r_headers.ok_or(StaticFileWriterError::ThreadPanic("headers"))??;
676 r_txs.ok_or(StaticFileWriterError::ThreadPanic("transactions"))??;
677 if ctx.write_senders {
678 r_senders.ok_or(StaticFileWriterError::ThreadPanic("senders"))??;
679 }
680 if ctx.write_receipts {
681 r_receipts.ok_or(StaticFileWriterError::ThreadPanic("receipts"))??;
682 }
683 if ctx.write_account_changesets {
684 r_account_changesets
685 .ok_or(StaticFileWriterError::ThreadPanic("account_changesets"))??;
686 }
687 if ctx.write_storage_changesets {
688 r_storage_changesets
689 .ok_or(StaticFileWriterError::ThreadPanic("storage_changesets"))??;
690 }
691 Ok(())
692 }
693
694 pub fn get_segment_provider(
697 &self,
698 segment: StaticFileSegment,
699 number: u64,
700 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
701 if segment.is_block_or_change_based() {
702 self.get_segment_provider_for_block(segment, number, None)
703 } else {
704 self.get_segment_provider_for_transaction(segment, number, None)
705 }
706 }
707
708 pub fn get_maybe_segment_provider(
713 &self,
714 segment: StaticFileSegment,
715 number: u64,
716 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
717 let provider = if segment.is_block_or_change_based() {
718 self.get_segment_provider_for_block(segment, number, None)
719 } else {
720 self.get_segment_provider_for_transaction(segment, number, None)
721 };
722
723 match provider {
724 Ok(provider) => Ok(Some(provider)),
725 Err(
726 ProviderError::MissingStaticFileBlock(_, _) |
727 ProviderError::MissingStaticFileTx(_, _),
728 ) => Ok(None),
729 Err(err) => Err(err),
730 }
731 }
732
733 pub fn get_segment_provider_for_block(
735 &self,
736 segment: StaticFileSegment,
737 block: BlockNumber,
738 path: Option<&Path>,
739 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
740 self.get_segment_provider_for_range(
741 segment,
742 || self.get_segment_ranges_from_block(segment, block),
743 path,
744 )?
745 .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
746 }
747
748 pub fn get_segment_provider_for_transaction(
750 &self,
751 segment: StaticFileSegment,
752 tx: TxNumber,
753 path: Option<&Path>,
754 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
755 self.get_segment_provider_for_range(
756 segment,
757 || self.get_segment_ranges_from_transaction(segment, tx),
758 path,
759 )?
760 .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
761 }
762
763 pub fn get_segment_provider_for_range(
767 &self,
768 segment: StaticFileSegment,
769 fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
770 path: Option<&Path>,
771 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
772 let block_range = match path {
775 Some(path) => StaticFileSegment::parse_filename(
776 &path
777 .file_name()
778 .ok_or_else(|| {
779 ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
780 })?
781 .to_string_lossy(),
782 )
783 .and_then(|(parsed_segment, block_range)| {
784 if parsed_segment == segment {
785 return Some(block_range);
786 }
787 None
788 }),
789 None => fn_range(),
790 };
791
792 if let Some(block_range) = block_range {
794 return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?));
795 }
796
797 Ok(None)
798 }
799
800 pub fn get_segment_provider_for_path(
802 &self,
803 path: &Path,
804 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
805 StaticFileSegment::parse_filename(
806 &path
807 .file_name()
808 .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
809 .to_string_lossy(),
810 )
811 .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
812 .transpose()
813 }
814
815 pub fn remove_cached_provider(
819 &self,
820 segment: StaticFileSegment,
821 fixed_block_range_end: BlockNumber,
822 ) {
823 self.map.remove(&(fixed_block_range_end, segment));
824 }
825
826 pub fn delete_segment_below_block(
843 &self,
844 segment: StaticFileSegment,
845 block: BlockNumber,
846 ) -> ProviderResult<Vec<SegmentHeader>> {
847 if block == 0 {
849 return Ok(Vec::new());
850 }
851
852 let highest_block = self.get_highest_static_file_block(segment);
853 let mut deleted_headers = Vec::new();
854
855 loop {
856 let Some(block_height) = self.get_lowest_range_end(segment) else {
857 return Ok(deleted_headers);
858 };
859
860 if block_height >= block || Some(block_height) == highest_block {
862 return Ok(deleted_headers);
863 }
864
865 debug!(
866 target: "providers::static_file",
867 ?segment,
868 ?block_height,
869 "Deleting static file below block"
870 );
871
872 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
875 warn!( target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
876 })?;
877
878 deleted_headers.push(header);
879 }
880 }
881
882 pub fn delete_jar(
890 &self,
891 segment: StaticFileSegment,
892 block: BlockNumber,
893 ) -> ProviderResult<SegmentHeader> {
894 let fixed_block_range = self.find_fixed_range(segment, block);
895 let key = (fixed_block_range.end(), segment);
896 let file = self.path.join(segment.filename(&fixed_block_range));
897 let jar = if let Some((_, jar)) = self.map.remove(&key) {
898 jar.jar
899 } else {
900 debug!(
901 target: "providers::static_file",
902 ?file,
903 ?fixed_block_range,
904 ?block,
905 "Loading static file jar for deletion"
906 );
907 NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
908 };
909
910 let header = jar.user_header().clone();
911
912 if segment.is_change_based() {
914 let csoff_path = file.with_extension("csoff");
915 if csoff_path.exists() {
916 std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
917 }
918 }
919
920 jar.delete().map_err(ProviderError::other)?;
921
922 self.initialize_index()?;
925
926 Ok(header)
927 }
928
929 pub fn delete_segment(&self, segment: StaticFileSegment) -> ProviderResult<Vec<SegmentHeader>> {
937 let mut deleted_headers = Vec::new();
938
939 self.writers.remove(segment);
940
941 while let Some(block_height) = self.get_highest_static_file_block(segment) {
942 debug!(
943 target: "providers::static_file",
944 ?segment,
945 ?block_height,
946 "Deleting static file jar"
947 );
948
949 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
950 warn!(target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file jar")
951 })?;
952
953 deleted_headers.push(header);
954 }
955
956 Ok(deleted_headers)
957 }
958
959 fn get_or_create_jar_provider(
963 &self,
964 segment: StaticFileSegment,
965 fixed_block_range: &SegmentRangeInclusive,
966 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
967 let key = (fixed_block_range.end(), segment);
968
969 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Getting provider");
971 let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
972 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
973 jar.into()
974 } else {
975 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
976 let path = self.path.join(segment.filename(fixed_block_range));
977 let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
978 self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
979 };
980
981 if let Some(metrics) = &self.metrics {
982 provider = provider.with_metrics(metrics.clone());
983 }
984 Ok(provider)
985 }
986
987 fn get_segment_ranges_from_block(
990 &self,
991 segment: StaticFileSegment,
992 block: u64,
993 ) -> Option<SegmentRangeInclusive> {
994 let indexes = self.indexes.read();
995 let index = indexes.get(segment)?;
996
997 (index.max_block >= block).then(|| {
998 self.find_fixed_range_with_block_index(
999 segment,
1000 Some(&index.expected_block_ranges_by_max_block),
1001 block,
1002 )
1003 })
1004 }
1005
1006 fn get_segment_ranges_from_transaction(
1009 &self,
1010 segment: StaticFileSegment,
1011 tx: u64,
1012 ) -> Option<SegmentRangeInclusive> {
1013 let indexes = self.indexes.read();
1014 let index = indexes.get(segment)?;
1015 let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;
1016
1017 let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();
1020
1021 while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
1022 if tx > *tx_end {
1023 return None;
1025 }
1026 let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
1027 if tx_start <= tx {
1028 return Some(self.find_fixed_range_with_block_index(
1029 segment,
1030 Some(&index.expected_block_ranges_by_max_block),
1031 block_range.end(),
1032 ));
1033 }
1034 }
1035 None
1036 }
1037
1038 pub fn update_index(
1045 &self,
1046 segment: StaticFileSegment,
1047 segment_max_block: Option<BlockNumber>,
1048 ) -> ProviderResult<()> {
1049 debug!(
1050 target: "providers::static_file",
1051 ?segment,
1052 ?segment_max_block,
1053 "Updating provider index"
1054 );
1055 let mut indexes = self.indexes.write();
1056
1057 match segment_max_block {
1058 Some(segment_max_block) => {
1059 let fixed_range = self.find_fixed_range_with_block_index(
1060 segment,
1061 indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
1062 segment_max_block,
1063 );
1064
1065 let jar = NippyJar::<SegmentHeader>::load(
1066 &self.path.join(segment.filename(&fixed_range)),
1067 )
1068 .map_err(ProviderError::other)?;
1069
1070 let index = indexes
1071 .entry(segment)
1072 .and_modify(|index| {
1073 index.max_block = segment_max_block;
1075
1076 index
1080 .expected_block_ranges_by_max_block
1081 .retain(|_, block_range| block_range.start() < fixed_range.start());
1082 index
1084 .expected_block_ranges_by_max_block
1085 .insert(fixed_range.end(), fixed_range);
1086 })
1087 .or_insert_with(|| StaticFileSegmentIndex {
1088 min_block_range: None,
1089 max_block: segment_max_block,
1090 expected_block_ranges_by_max_block: BTreeMap::from([(
1091 fixed_range.end(),
1092 fixed_range,
1093 )]),
1094 available_block_ranges_by_max_tx: None,
1095 });
1096
1097 if let Some(current_block_range) = jar.user_header().block_range() {
1113 if let Some(min_block_range) = index.min_block_range.as_mut() {
1114 if current_block_range.start() == min_block_range.start() {
1117 *min_block_range = current_block_range;
1118 }
1119 } else {
1120 index.min_block_range = Some(current_block_range);
1121 }
1122 }
1123
1124 if let Some(tx_range) = jar.user_header().tx_range() {
1127 if let Some(current_block_range) = jar.user_header().block_range() {
1130 let tx_end = tx_range.end();
1131
1132 if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
1141 index
1142 .retain(|_, block_range| block_range.start() < fixed_range.start());
1143 index.insert(tx_end, current_block_range);
1144 } else {
1145 index.available_block_ranges_by_max_tx =
1146 Some(BTreeMap::from([(tx_end, current_block_range)]));
1147 }
1148 }
1149 } else if segment.is_tx_based() {
1150 if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
1154 index.retain(|_, block_range| block_range.start() < fixed_range.start());
1155 }
1156
1157 index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
1159 }
1160
1161 debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
1163 self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
1164
1165 debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
1167 self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
1168 }
1169 None => {
1170 debug!(target: "providers::static_file", ?segment, "Removing segment from index");
1171 indexes.remove(segment);
1172 }
1173 };
1174
1175 debug!(target: "providers::static_file", ?segment, "Updated provider index");
1176 Ok(())
1177 }
1178
1179 pub fn initialize_index(&self) -> ProviderResult<()> {
1181 let mut indexes = self.indexes.write();
1182 indexes.clear();
1183
1184 for (segment, headers) in &*iter_static_files(&self.path).map_err(ProviderError::other)? {
1185 let min_block_range = Some(headers.first().expect("headers are not empty").0);
1190 let max_block = headers.last().expect("headers are not empty").0.end();
1191
1192 let mut expected_block_ranges_by_max_block = BTreeMap::default();
1193 let mut available_block_ranges_by_max_tx = None;
1194
1195 for (block_range, header) in headers {
1196 expected_block_ranges_by_max_block
1198 .insert(header.expected_block_end(), header.expected_block_range());
1199
1200 if let Some(tx_range) = header.tx_range() {
1202 let tx_end = tx_range.end();
1203
1204 available_block_ranges_by_max_tx
1205 .get_or_insert_with(BTreeMap::default)
1206 .insert(tx_end, *block_range);
1207 }
1208 }
1209
1210 indexes.insert(
1211 segment,
1212 StaticFileSegmentIndex {
1213 min_block_range,
1214 max_block,
1215 expected_block_ranges_by_max_block,
1216 available_block_ranges_by_max_tx,
1217 },
1218 );
1219 }
1220
1221 self.map.clear();
1223
1224 if let Some(lowest_range) =
1226 indexes.get(StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
1227 {
1228 self.earliest_history_height
1230 .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
1231 }
1232
1233 Ok(())
1234 }
1235
1236 #[instrument(skip(self, provider), fields(read_only = self.is_read_only()))]
1260 pub fn check_consistency<Provider>(
1261 &self,
1262 provider: &Provider,
1263 ) -> ProviderResult<Option<PipelineTarget>>
1264 where
1265 Provider: DBProvider
1266 + BlockReader
1267 + StageCheckpointReader
1268 + PruneCheckpointReader
1269 + ChainSpecProvider
1270 + StorageSettingsCache,
1271 N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
1272 {
1273 if provider.chain_spec().is_optimism() &&
1280 reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
1281 {
1282 const OVM_HEADER_1_HASH: B256 =
1284 b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
1285 if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
1286 info!(target: "reth::cli",
1287 "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
1288 );
1289 return Ok(None);
1290 }
1291 }
1292
1293 info!(target: "reth::cli", "Verifying storage consistency.");
1294
1295 let mut unwind_target: Option<BlockNumber> = None;
1296
1297 let mut update_unwind_target = |new_target| {
1298 unwind_target =
1299 unwind_target.map(|current| current.min(new_target)).or(Some(new_target));
1300 };
1301
1302 for segment in self.segments_to_check(provider) {
1303 let span = info_span!(
1304 "Checking consistency for segment",
1305 ?segment,
1306 initial_highest_block = tracing::field::Empty,
1307 highest_block = tracing::field::Empty,
1308 highest_tx = tracing::field::Empty,
1309 );
1310 let _guard = span.enter();
1311
1312 debug!(target: "reth::providers::static_file", "Checking consistency for segment");
1313
1314 let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?;
1316 span.record("initial_highest_block", initial_highest_block);
1317 span.record("highest_block", highest_block);
1318
1319 if initial_highest_block != highest_block {
1324 info!(
1325 target: "reth::providers::static_file",
1326 unwind_target = highest_block,
1327 "Setting unwind target."
1328 );
1329 update_unwind_target(highest_block.unwrap_or_default());
1330 }
1331
1332 let highest_tx = self.get_highest_static_file_tx(segment);
1338 span.record("highest_tx", highest_tx);
1339 debug!(target: "reth::providers::static_file", "Checking tx index segment");
1340
1341 if let Some(highest_tx) = highest_tx {
1342 let mut last_block = highest_block.unwrap_or_default();
1343 debug!(target: "reth::providers::static_file", last_block, highest_tx, "Verifying last transaction matches last block indices");
1344 loop {
1345 let Some(indices) = provider.block_body_indices(last_block)? else {
1346 debug!(target: "reth::providers::static_file", last_block, "Block body indices not found, static files ahead of database");
1347 break
1351 };
1352
1353 debug!(target: "reth::providers::static_file", last_block, last_tx_num = indices.last_tx_num(), "Found block body indices");
1354
1355 if indices.last_tx_num() <= highest_tx {
1356 break
1357 }
1358
1359 if last_block == 0 {
1360 debug!(target: "reth::providers::static_file", "Reached block 0 in verification loop");
1361 break
1362 }
1363
1364 last_block -= 1;
1365
1366 info!(
1367 target: "reth::providers::static_file",
1368 highest_block = self.get_highest_static_file_block(segment),
1369 unwind_target = last_block,
1370 "Setting unwind target."
1371 );
1372 span.record("highest_block", last_block);
1373 highest_block = Some(last_block);
1374 update_unwind_target(last_block);
1375 }
1376 }
1377
1378 debug!(target: "reth::providers::static_file", "Ensuring invariants for segment");
1379
1380 match self.ensure_invariants_for(provider, segment, highest_tx, highest_block)? {
1381 Some(unwind) => {
1382 debug!(target: "reth::providers::static_file", unwind_target=unwind, "Invariants check returned unwind target");
1383 update_unwind_target(unwind);
1384 }
1385 None => {
1386 debug!(target: "reth::providers::static_file", "Invariants check completed, no unwind needed")
1387 }
1388 }
1389 }
1390
1391 Ok(unwind_target.map(PipelineTarget::Unwind))
1392 }
1393
1394 pub fn check_file_consistency<Provider>(&self, provider: &Provider) -> ProviderResult<()>
1403 where
1404 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1405 {
1406 info!(target: "reth::cli", "Healing static file inconsistencies.");
1407
1408 for segment in self.segments_to_check(provider) {
1409 let _guard = info_span!("healing_static_file_segment", ?segment).entered();
1410 let _ = self.maybe_heal_segment(segment)?;
1411 }
1412
1413 Ok(())
1414 }
1415
1416 fn segments_to_check<'a, Provider>(
1418 &'a self,
1419 provider: &'a Provider,
1420 ) -> impl Iterator<Item = StaticFileSegment> + 'a
1421 where
1422 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1423 {
1424 StaticFileSegment::iter()
1425 .filter(move |segment| self.should_check_segment(provider, *segment))
1426 }
1427
1428 fn should_check_segment<Provider>(
1430 &self,
1431 provider: &Provider,
1432 segment: StaticFileSegment,
1433 ) -> bool
1434 where
1435 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1436 {
1437 match segment {
1438 StaticFileSegment::Headers | StaticFileSegment::Transactions => true,
1439 StaticFileSegment::Receipts => {
1440 if EitherWriter::receipts_destination(provider).is_database() {
1441 debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database");
1444 return false;
1445 }
1446
1447 if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
1448 NamedChain::Chiado == provider.chain_spec().chain_id()
1449 {
1450 debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: broken historical import for gnosis/chiado");
1454 return false;
1455 }
1456
1457 true
1458 }
1459 StaticFileSegment::TransactionSenders => {
1460 if EitherWriterDestination::senders(provider).is_database() {
1461 debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: senders stored in database");
1462 return false;
1463 }
1464
1465 if Self::is_segment_fully_pruned(provider, PruneSegment::SenderRecovery) {
1466 debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: fully pruned");
1467 return false;
1468 }
1469
1470 true
1471 }
1472 StaticFileSegment::AccountChangeSets => {
1473 if EitherWriter::account_changesets_destination(provider).is_database() {
1474 debug!(target: "reth::providers::static_file", ?segment, "Skipping account changesets segment: changesets stored in database");
1475 return false;
1476 }
1477 true
1478 }
1479 StaticFileSegment::StorageChangeSets => {
1480 if EitherWriter::storage_changesets_destination(provider).is_database() {
1481 debug!(target: "reth::providers::static_file", ?segment, "Skipping storage changesets segment: changesets stored in database");
1482 return false
1483 }
1484 true
1485 }
1486 }
1487 }
1488
1489 fn is_segment_fully_pruned<Provider>(provider: &Provider, segment: PruneSegment) -> bool
1493 where
1494 Provider: PruneCheckpointReader,
1495 {
1496 provider
1497 .get_prune_checkpoint(segment)
1498 .ok()
1499 .flatten()
1500 .is_some_and(|checkpoint| checkpoint.prune_mode.is_full())
1501 }
1502
1503 fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1508 debug!(target: "reth::providers::static_file", "Checking segment consistency");
1509 if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1510 let file_path = self
1511 .directory()
1512 .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1513 debug!(target: "reth::providers::static_file", ?file_path, latest_block, "Loading NippyJar for consistency check");
1514
1515 let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1516 debug!(target: "reth::providers::static_file", "NippyJar loaded, checking consistency");
1517
1518 NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1519 debug!(target: "reth::providers::static_file", "NippyJar consistency check passed");
1520 } else {
1521 debug!(target: "reth::providers::static_file", "No static file block found, skipping consistency check");
1522 }
1523 Ok(())
1524 }
1525
1526 fn maybe_heal_segment(
1542 &self,
1543 segment: StaticFileSegment,
1544 ) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
1545 let initial_highest_block = self.get_highest_static_file_block(segment);
1546 debug!(target: "reth::providers::static_file", ?initial_highest_block, "Initial highest block for segment");
1547
1548 if self.access.is_read_only() {
1549 debug!(target: "reth::providers::static_file", "Checking segment consistency (read-only)");
1552 self.check_segment_consistency(segment)?;
1553 } else {
1554 debug!(target: "reth::providers::static_file", "Fetching latest writer which might heal any potential inconsistency");
1557 self.latest_writer(segment)?;
1558 }
1559
1560 let highest_block = self.get_highest_static_file_block(segment);
1563
1564 Ok((initial_highest_block, highest_block))
1565 }
1566
1567 fn ensure_invariants_for<Provider>(
1569 &self,
1570 provider: &Provider,
1571 segment: StaticFileSegment,
1572 highest_tx: Option<u64>,
1573 highest_block: Option<BlockNumber>,
1574 ) -> ProviderResult<Option<BlockNumber>>
1575 where
1576 Provider: DBProvider + BlockReader + StageCheckpointReader,
1577 N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
1578 {
1579 match segment {
1580 StaticFileSegment::Headers => self
1581 .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
1582 provider,
1583 segment,
1584 highest_block,
1585 highest_block,
1586 ),
1587 StaticFileSegment::Transactions => self
1588 .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
1589 provider,
1590 segment,
1591 highest_tx,
1592 highest_block,
1593 ),
1594 StaticFileSegment::Receipts => self
1595 .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
1596 provider,
1597 segment,
1598 highest_tx,
1599 highest_block,
1600 ),
1601 StaticFileSegment::TransactionSenders => self
1602 .ensure_invariants::<_, tables::TransactionSenders>(
1603 provider,
1604 segment,
1605 highest_tx,
1606 highest_block,
1607 ),
1608 StaticFileSegment::AccountChangeSets => self
1609 .ensure_invariants::<_, tables::AccountChangeSets>(
1610 provider,
1611 segment,
1612 highest_tx,
1613 highest_block,
1614 ),
1615 StaticFileSegment::StorageChangeSets => self
1616 .ensure_changeset_invariants_by_block::<_, tables::StorageChangeSets, _>(
1617 provider,
1618 segment,
1619 highest_block,
1620 |key| key.block_number(),
1621 ),
1622 }
1623 }
1624
1625 #[instrument(skip(self, provider, segment), fields(table = T::NAME))]
1640 fn ensure_invariants<Provider, T: Table<Key = u64>>(
1641 &self,
1642 provider: &Provider,
1643 segment: StaticFileSegment,
1644 highest_static_file_entry: Option<u64>,
1645 highest_static_file_block: Option<BlockNumber>,
1646 ) -> ProviderResult<Option<BlockNumber>>
1647 where
1648 Provider: DBProvider + BlockReader + StageCheckpointReader,
1649 {
1650 debug!(target: "reth::providers::static_file", "Ensuring invariants");
1651 let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
1652
1653 if let Some((db_first_entry, _)) = db_cursor.first()? {
1654 debug!(target: "reth::providers::static_file", db_first_entry, "Found first database entry");
1655 if let (Some(highest_entry), Some(highest_block)) =
1656 (highest_static_file_entry, highest_static_file_block)
1657 {
1658 if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
1662 info!(
1663 target: "reth::providers::static_file",
1664 ?db_first_entry,
1665 ?highest_entry,
1666 unwind_target = highest_block,
1667 "Setting unwind target."
1668 );
1669 return Ok(Some(highest_block));
1670 }
1671 }
1672
1673 if let Some((db_last_entry, _)) = db_cursor.last()? &&
1674 highest_static_file_entry
1675 .is_none_or(|highest_entry| db_last_entry > highest_entry)
1676 {
1677 debug!(target: "reth::providers::static_file", db_last_entry, "Database has entries beyond static files, no unwind needed");
1678 return Ok(None)
1679 }
1680 } else {
1681 debug!(target: "reth::providers::static_file", "No database entries found");
1682 }
1683
1684 let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
1685 let highest_static_file_block = highest_static_file_block.unwrap_or_default();
1686
1687 let stage_id = segment.to_stage_id();
1690 let checkpoint_block_number =
1691 provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
1692 debug!(target: "reth::providers::static_file", ?stage_id, checkpoint_block_number, "Retrieved stage checkpoint");
1693
1694 if checkpoint_block_number > highest_static_file_block {
1696 info!(
1697 target: "reth::providers::static_file",
1698 checkpoint_block_number,
1699 unwind_target = highest_static_file_block,
1700 "Setting unwind target."
1701 );
1702 return Ok(Some(highest_static_file_block));
1703 }
1704
1705 if checkpoint_block_number >= highest_static_file_block {
1707 debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
1708 return Ok(None);
1709 }
1710
1711 info!(
1717 target: "reth::providers",
1718 from = highest_static_file_block,
1719 to = checkpoint_block_number,
1720 "Unwinding static file segment."
1721 );
1722 let mut writer = self.latest_writer(segment)?;
1723
1724 match segment {
1725 StaticFileSegment::Headers => {
1726 let prune_count = highest_static_file_block - checkpoint_block_number;
1727 debug!(target: "reth::providers::static_file", prune_count, "Pruning headers");
1728 writer.prune_headers(prune_count)?;
1730 }
1731 StaticFileSegment::Transactions |
1732 StaticFileSegment::Receipts |
1733 StaticFileSegment::TransactionSenders => {
1734 if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
1735 let number = highest_static_file_entry - block.last_tx_num();
1736 debug!(target: "reth::providers::static_file", prune_count = number, checkpoint_block_number, "Pruning transaction based segment");
1737
1738 match segment {
1739 StaticFileSegment::Transactions => {
1740 writer.prune_transactions(number, checkpoint_block_number)?
1741 }
1742 StaticFileSegment::Receipts => {
1743 writer.prune_receipts(number, checkpoint_block_number)?
1744 }
1745 StaticFileSegment::TransactionSenders => {
1746 writer.prune_transaction_senders(number, checkpoint_block_number)?
1747 }
1748 StaticFileSegment::Headers |
1749 StaticFileSegment::AccountChangeSets |
1750 StaticFileSegment::StorageChangeSets => {
1751 unreachable!()
1752 }
1753 }
1754 } else {
1755 debug!(target: "reth::providers::static_file", checkpoint_block_number, "No block body indices found for checkpoint block");
1756 }
1757 }
1758 StaticFileSegment::AccountChangeSets => {
1759 writer.prune_account_changesets(checkpoint_block_number)?;
1760 }
1761 StaticFileSegment::StorageChangeSets => {
1762 writer.prune_storage_changesets(checkpoint_block_number)?;
1763 }
1764 }
1765
1766 debug!(target: "reth::providers::static_file", "Committing writer after pruning");
1767 writer.commit()?;
1768 debug!(target: "reth::providers::static_file", "Writer committed successfully");
1769
1770 debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
1771 Ok(None)
1772 }
1773
1774 fn ensure_changeset_invariants_by_block<Provider, T, F>(
1775 &self,
1776 provider: &Provider,
1777 segment: StaticFileSegment,
1778 highest_static_file_block: Option<BlockNumber>,
1779 block_from_key: F,
1780 ) -> ProviderResult<Option<BlockNumber>>
1781 where
1782 Provider: DBProvider + BlockReader + StageCheckpointReader,
1783 T: Table,
1784 F: Fn(&T::Key) -> BlockNumber,
1785 {
1786 debug!(
1787 target: "reth::providers::static_file",
1788 ?segment,
1789 ?highest_static_file_block,
1790 "Ensuring changeset invariants"
1791 );
1792 let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
1793
1794 if let Some((db_first_key, _)) = db_cursor.first()? {
1795 let db_first_block = block_from_key(&db_first_key);
1796 if let Some(highest_block) = highest_static_file_block &&
1797 !(db_first_block <= highest_block || highest_block + 1 == db_first_block)
1798 {
1799 info!(
1800 target: "reth::providers::static_file",
1801 ?db_first_block,
1802 ?highest_block,
1803 unwind_target = highest_block,
1804 ?segment,
1805 "Setting unwind target."
1806 );
1807 return Ok(Some(highest_block))
1808 }
1809
1810 if let Some((db_last_key, _)) = db_cursor.last()? &&
1811 highest_static_file_block
1812 .is_none_or(|highest_block| block_from_key(&db_last_key) > highest_block)
1813 {
1814 debug!(
1815 target: "reth::providers::static_file",
1816 ?segment,
1817 "Database has entries beyond static files, no unwind needed"
1818 );
1819 return Ok(None)
1820 }
1821 } else {
1822 debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
1823 }
1824
1825 let highest_static_file_block = highest_static_file_block.unwrap_or_default();
1826
1827 let stage_id = segment.to_stage_id();
1828 let checkpoint_block_number =
1829 provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
1830
1831 if checkpoint_block_number > highest_static_file_block {
1832 info!(
1833 target: "reth::providers::static_file",
1834 checkpoint_block_number,
1835 unwind_target = highest_static_file_block,
1836 ?segment,
1837 "Setting unwind target."
1838 );
1839 return Ok(Some(highest_static_file_block))
1840 }
1841
1842 if checkpoint_block_number < highest_static_file_block {
1843 info!(
1844 target: "reth::providers",
1845 ?segment,
1846 from = highest_static_file_block,
1847 to = checkpoint_block_number,
1848 "Unwinding static file segment."
1849 );
1850 let mut writer = self.latest_writer(segment)?;
1851 match segment {
1852 StaticFileSegment::AccountChangeSets => {
1853 writer.prune_account_changesets(checkpoint_block_number)?;
1854 }
1855 StaticFileSegment::StorageChangeSets => {
1856 writer.prune_storage_changesets(checkpoint_block_number)?;
1857 }
1858 _ => unreachable!("invalid segment for changeset invariants"),
1859 }
1860 writer.commit()?;
1861 }
1862
1863 Ok(None)
1864 }
1865
1866 pub fn earliest_history_height(&self) -> BlockNumber {
1874 self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1875 }
1876
1877 pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1881 self.indexes.read().get(segment).and_then(|index| index.min_block_range)
1882 }
1883
1884 pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1890 self.get_lowest_range(segment).map(|range| range.start())
1891 }
1892
1893 pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1899 self.get_lowest_range(segment).map(|range| range.end())
1900 }
1901
1902 pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1906 self.indexes.read().get(segment).map(|index| index.max_block)
1907 }
1908
1909 fn bound_range(
1916 &self,
1917 range: impl RangeBounds<BlockNumber>,
1918 segment: StaticFileSegment,
1919 ) -> RangeInclusive<BlockNumber> {
1920 let highest_block = self.get_highest_static_file_block(segment).unwrap_or(0);
1921
1922 let start = match range.start_bound() {
1923 Bound::Included(&n) => n,
1924 Bound::Excluded(&n) => n.saturating_add(1),
1925 Bound::Unbounded => 0,
1926 };
1927 let end = match range.end_bound() {
1928 Bound::Included(&n) => n.min(highest_block),
1929 Bound::Excluded(&n) => n.saturating_sub(1).min(highest_block),
1930 Bound::Unbounded => highest_block,
1931 };
1932
1933 start..=end
1934 }
1935
1936 pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1940 self.indexes
1941 .read()
1942 .get(segment)
1943 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1944 .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1945 }
1946
1947 pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1949 HighestStaticFiles {
1950 receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1951 }
1952 }
1953
1954 pub fn find_static_file<T>(
1957 &self,
1958 segment: StaticFileSegment,
1959 func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
1960 ) -> ProviderResult<Option<T>> {
1961 if let Some(ranges) =
1962 self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block)
1963 {
1964 for range in ranges.values().rev() {
1966 if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
1967 return Ok(Some(res));
1968 }
1969 }
1970 }
1971
1972 Ok(None)
1973 }
1974
1975 pub fn fetch_range_with_predicate<T, F, P>(
1981 &self,
1982 segment: StaticFileSegment,
1983 range: Range<u64>,
1984 mut get_fn: F,
1985 mut predicate: P,
1986 ) -> ProviderResult<Vec<T>>
1987 where
1988 F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1989 P: FnMut(&T) -> bool,
1990 {
1991 let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1992
1993 macro_rules! get_provider {
1997 ($number:expr) => {{
1998 match self.get_segment_provider(segment, $number) {
1999 Ok(provider) => provider,
2000 Err(
2001 ProviderError::MissingStaticFileBlock(_, _) |
2002 ProviderError::MissingStaticFileTx(_, _),
2003 ) => return Ok(result),
2004 Err(err) => return Err(err),
2005 }
2006 }};
2007 }
2008
2009 let mut provider = get_provider!(range.start);
2010 let mut cursor = provider.cursor()?;
2011
2012 'outer: for number in range {
2014 let mut retrying = false;
2018
2019 'inner: loop {
2021 match get_fn(&mut cursor, number)? {
2022 Some(res) => {
2023 if !predicate(&res) {
2024 break 'outer;
2025 }
2026 result.push(res);
2027 break 'inner;
2028 }
2029 None => {
2030 if retrying {
2031 return Ok(result);
2032 }
2033 drop(cursor);
2038 drop(provider);
2039 provider = get_provider!(number);
2040 cursor = provider.cursor()?;
2041 retrying = true;
2042 }
2043 }
2044 }
2045 }
2046
2047 result.shrink_to_fit();
2048
2049 Ok(result)
2050 }
2051
2052 pub fn fetch_range_iter<'a, T, F>(
2057 &'a self,
2058 segment: StaticFileSegment,
2059 range: Range<u64>,
2060 get_fn: F,
2061 ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
2062 where
2063 F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
2064 T: std::fmt::Debug,
2065 {
2066 let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
2067 Ok(range.map(move |number| {
2068 match provider
2069 .as_ref()
2070 .map(|provider| get_fn(&mut provider.cursor()?, number))
2071 .and_then(|result| result.transpose())
2072 {
2073 Some(result) => result.map(Some),
2074 None => {
2075 provider.take();
2079 provider = self.get_maybe_segment_provider(segment, number)?;
2080 provider
2081 .as_ref()
2082 .map(|provider| get_fn(&mut provider.cursor()?, number))
2083 .and_then(|result| result.transpose())
2084 .transpose()
2085 }
2086 }
2087 }))
2088 }
2089
2090 pub fn directory(&self) -> &Path {
2092 &self.path
2093 }
2094
2095 pub fn get_with_static_file_or_database<T, FS, FD>(
2105 &self,
2106 segment: StaticFileSegment,
2107 number: u64,
2108 fetch_from_static_file: FS,
2109 fetch_from_database: FD,
2110 ) -> ProviderResult<Option<T>>
2111 where
2112 FS: Fn(&Self) -> ProviderResult<Option<T>>,
2113 FD: Fn() -> ProviderResult<Option<T>>,
2114 {
2115 let static_file_upper_bound = if segment.is_block_or_change_based() {
2117 self.get_highest_static_file_block(segment)
2118 } else {
2119 self.get_highest_static_file_tx(segment)
2120 };
2121
2122 if static_file_upper_bound
2123 .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
2124 {
2125 return fetch_from_static_file(self);
2126 }
2127 fetch_from_database()
2128 }
2129
2130 pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
2142 &self,
2143 segment: StaticFileSegment,
2144 mut block_or_tx_range: Range<u64>,
2145 fetch_from_static_file: FS,
2146 mut fetch_from_database: FD,
2147 mut predicate: P,
2148 ) -> ProviderResult<Vec<T>>
2149 where
2150 FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
2151 FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
2152 P: FnMut(&T) -> bool,
2153 {
2154 let mut data = Vec::new();
2155
2156 if let Some(static_file_upper_bound) = if segment.is_block_or_change_based() {
2158 self.get_highest_static_file_block(segment)
2159 } else {
2160 self.get_highest_static_file_tx(segment)
2161 } && block_or_tx_range.start <= static_file_upper_bound
2162 {
2163 let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
2164 data.extend(fetch_from_static_file(
2165 self,
2166 block_or_tx_range.start..end,
2167 &mut predicate,
2168 )?);
2169 block_or_tx_range.start = end;
2170 }
2171
2172 if block_or_tx_range.end > block_or_tx_range.start {
2173 data.extend(fetch_from_database(block_or_tx_range, predicate)?)
2174 }
2175
2176 Ok(data)
2177 }
2178
2179 #[cfg(any(test, feature = "test-utils"))]
2181 pub fn path(&self) -> &Path {
2182 &self.path
2183 }
2184
2185 #[cfg(any(test, feature = "test-utils"))]
2187 pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2188 self.indexes
2189 .read()
2190 .get(segment)
2191 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
2192 .cloned()
2193 }
2194
2195 #[cfg(any(test, feature = "test-utils"))]
2197 pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2198 self.indexes
2199 .read()
2200 .get(segment)
2201 .map(|index| &index.expected_block_ranges_by_max_block)
2202 .cloned()
2203 }
2204}
2205
2206#[derive(Debug)]
2207struct StaticFileSegmentIndex {
2208 min_block_range: Option<SegmentRangeInclusive>,
2220 max_block: u64,
2222 expected_block_ranges_by_max_block: SegmentRanges,
2228 available_block_ranges_by_max_tx: Option<SegmentRanges>,
2235}
2236
2237pub trait StaticFileWriter {
2239 type Primitives: Send + Sync + 'static;
2241
2242 fn get_writer(
2244 &self,
2245 block: BlockNumber,
2246 segment: StaticFileSegment,
2247 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
2248
2249 fn latest_writer(
2252 &self,
2253 segment: StaticFileSegment,
2254 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
2255
2256 fn commit(&self) -> ProviderResult<()>;
2258
2259 fn has_unwind_queued(&self) -> bool;
2261
2262 fn finalize(&self) -> ProviderResult<()>;
2266}
2267
2268impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
2269 type Primitives = N;
2270
2271 fn get_writer(
2272 &self,
2273 block: BlockNumber,
2274 segment: StaticFileSegment,
2275 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2276 if self.access.is_read_only() {
2277 return Err(ProviderError::ReadOnlyStaticFileAccess);
2278 }
2279
2280 trace!(target: "providers::static_file", ?block, ?segment, "Getting static file writer.");
2281 self.writers.get_or_create(segment, || {
2282 StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
2283 })
2284 }
2285
2286 fn latest_writer(
2287 &self,
2288 segment: StaticFileSegment,
2289 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2290 let genesis_number = self.0.as_ref().genesis_block_number();
2291 self.get_writer(
2292 self.get_highest_static_file_block(segment).unwrap_or(genesis_number),
2293 segment,
2294 )
2295 }
2296
2297 fn commit(&self) -> ProviderResult<()> {
2298 self.writers.commit()
2299 }
2300
2301 fn has_unwind_queued(&self) -> bool {
2302 self.writers.has_unwind_queued()
2303 }
2304
2305 fn finalize(&self) -> ProviderResult<()> {
2306 self.writers.finalize()
2307 }
2308}
2309
2310impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
2311 fn account_block_changeset(
2312 &self,
2313 block_number: BlockNumber,
2314 ) -> ProviderResult<Vec<reth_db::models::AccountBeforeTx>> {
2315 let provider = match self.get_segment_provider_for_block(
2316 StaticFileSegment::AccountChangeSets,
2317 block_number,
2318 None,
2319 ) {
2320 Ok(provider) => provider,
2321 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
2322 Err(err) => return Err(err),
2323 };
2324
2325 if let Some(offset) = provider.read_changeset_offset(block_number)? {
2326 let mut cursor = provider.cursor()?;
2327 let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
2328
2329 for i in offset.changeset_range() {
2330 if let Some(change) =
2331 cursor.get_one::<reth_db::static_file::AccountChangesetMask>(i.into())?
2332 {
2333 changeset.push(change)
2334 }
2335 }
2336 Ok(changeset)
2337 } else {
2338 Ok(Vec::new())
2339 }
2340 }
2341
2342 fn get_account_before_block(
2343 &self,
2344 block_number: BlockNumber,
2345 address: Address,
2346 ) -> ProviderResult<Option<reth_db::models::AccountBeforeTx>> {
2347 let provider = match self.get_segment_provider_for_block(
2348 StaticFileSegment::AccountChangeSets,
2349 block_number,
2350 None,
2351 ) {
2352 Ok(provider) => provider,
2353 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
2354 Err(err) => return Err(err),
2355 };
2356
2357 let Some(offset) = provider.read_changeset_offset(block_number)? else {
2358 return Ok(None);
2359 };
2360
2361 let mut cursor = provider.cursor()?;
2362 let range = offset.changeset_range();
2363 let mut low = range.start;
2364 let mut high = range.end;
2365
2366 while low < high {
2367 let mid = low + (high - low) / 2;
2368 if let Some(change) =
2369 cursor.get_one::<reth_db::static_file::AccountChangesetMask>(mid.into())?
2370 {
2371 if change.address < address {
2372 low = mid + 1;
2373 } else {
2374 high = mid;
2375 }
2376 } else {
2377 debug!(
2380 target: "providers::static_file",
2381 ?low,
2382 ?mid,
2383 ?high,
2384 ?range,
2385 ?block_number,
2386 ?address,
2387 "Cannot continue binary search for account changeset fetch"
2388 );
2389 low = range.end;
2390 break;
2391 }
2392 }
2393
2394 if low < range.end &&
2395 let Some(change) = cursor
2396 .get_one::<reth_db::static_file::AccountChangesetMask>(low.into())?
2397 .filter(|change| change.address == address)
2398 {
2399 return Ok(Some(change));
2400 }
2401
2402 Ok(None)
2403 }
2404
2405 fn account_changesets_range(
2406 &self,
2407 range: impl core::ops::RangeBounds<BlockNumber>,
2408 ) -> ProviderResult<Vec<(BlockNumber, reth_db::models::AccountBeforeTx)>> {
2409 let range = self.bound_range(range, StaticFileSegment::AccountChangeSets);
2410 self.walk_account_changeset_range(range).collect()
2411 }
2412}
2413
2414impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
2415 fn storage_changeset(
2416 &self,
2417 block_number: BlockNumber,
2418 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
2419 let provider = match self.get_segment_provider_for_block(
2420 StaticFileSegment::StorageChangeSets,
2421 block_number,
2422 None,
2423 ) {
2424 Ok(provider) => provider,
2425 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
2426 Err(err) => return Err(err),
2427 };
2428
2429 if let Some(offset) = provider.read_changeset_offset(block_number)? {
2430 let mut cursor = provider.cursor()?;
2431 let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
2432
2433 for i in offset.changeset_range() {
2434 if let Some(change) = cursor.get_one::<StorageChangesetMask>(i.into())? {
2435 let block_address = BlockNumberAddress((block_number, change.address));
2436 let entry = StorageEntry { key: change.key, value: change.value };
2437 changeset.push((block_address, entry));
2438 }
2439 }
2440 Ok(changeset)
2441 } else {
2442 Ok(Vec::new())
2443 }
2444 }
2445
2446 fn get_storage_before_block(
2447 &self,
2448 block_number: BlockNumber,
2449 address: Address,
2450 storage_key: B256,
2451 ) -> ProviderResult<Option<StorageEntry>> {
2452 let provider = match self.get_segment_provider_for_block(
2453 StaticFileSegment::StorageChangeSets,
2454 block_number,
2455 None,
2456 ) {
2457 Ok(provider) => provider,
2458 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
2459 Err(err) => return Err(err),
2460 };
2461
2462 let Some(offset) = provider.read_changeset_offset(block_number)? else {
2463 return Ok(None);
2464 };
2465
2466 let mut cursor = provider.cursor()?;
2467 let range = offset.changeset_range();
2468 let mut low = range.start;
2469 let mut high = range.end;
2470
2471 while low < high {
2472 let mid = low + (high - low) / 2;
2473 if let Some(change) = cursor.get_one::<StorageChangesetMask>(mid.into())? {
2474 match (change.address, change.key).cmp(&(address, storage_key)) {
2475 std::cmp::Ordering::Less => low = mid + 1,
2476 _ => high = mid,
2477 }
2478 } else {
2479 debug!(
2480 target: "providers::static_file",
2481 ?low,
2482 ?mid,
2483 ?high,
2484 ?range,
2485 ?block_number,
2486 ?address,
2487 ?storage_key,
2488 "Cannot continue binary search for storage changeset fetch"
2489 );
2490 low = range.end;
2491 break;
2492 }
2493 }
2494
2495 if low < range.end &&
2496 let Some(change) = cursor
2497 .get_one::<StorageChangesetMask>(low.into())?
2498 .filter(|change| change.address == address && change.key == storage_key)
2499 {
2500 return Ok(Some(StorageEntry { key: change.key, value: change.value }));
2501 }
2502
2503 Ok(None)
2504 }
2505
2506 fn storage_changesets_range(
2507 &self,
2508 range: impl RangeBounds<BlockNumber>,
2509 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
2510 let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
2511 self.walk_storage_changeset_range(range).collect()
2512 }
2513}
2514
2515impl<N: NodePrimitives> StaticFileProvider<N> {
2516 pub fn walk_account_changeset_range(
2526 &self,
2527 range: impl RangeBounds<BlockNumber>,
2528 ) -> StaticFileAccountChangesetWalker<Self> {
2529 StaticFileAccountChangesetWalker::new(self.clone(), range)
2530 }
2531
2532 pub fn walk_storage_changeset_range(
2534 &self,
2535 range: impl RangeBounds<BlockNumber>,
2536 ) -> StaticFileStorageChangesetWalker<Self> {
2537 StaticFileStorageChangesetWalker::new(self.clone(), range)
2538 }
2539}
2540
2541impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
2542 type Header = N::BlockHeader;
2543
2544 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
2545 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
2546 Ok(jar_provider
2547 .cursor()?
2548 .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
2549 .and_then(|(header, hash)| {
2550 if hash == block_hash {
2551 return Some(header);
2552 }
2553 None
2554 }))
2555 })
2556 }
2557
2558 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
2559 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2560 .and_then(|provider| provider.header_by_number(num))
2561 .or_else(|err| {
2562 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2563 Ok(None)
2564 } else {
2565 Err(err)
2566 }
2567 })
2568 }
2569
2570 fn headers_range(
2571 &self,
2572 range: impl RangeBounds<BlockNumber>,
2573 ) -> ProviderResult<Vec<Self::Header>> {
2574 self.fetch_range_with_predicate(
2575 StaticFileSegment::Headers,
2576 to_range(range),
2577 |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
2578 |_| true,
2579 )
2580 }
2581
2582 fn sealed_header(
2583 &self,
2584 num: BlockNumber,
2585 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
2586 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2587 .and_then(|provider| provider.sealed_header(num))
2588 .or_else(|err| {
2589 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2590 Ok(None)
2591 } else {
2592 Err(err)
2593 }
2594 })
2595 }
2596
2597 fn sealed_headers_while(
2598 &self,
2599 range: impl RangeBounds<BlockNumber>,
2600 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
2601 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
2602 self.fetch_range_with_predicate(
2603 StaticFileSegment::Headers,
2604 to_range(range),
2605 |cursor, number| {
2606 Ok(cursor
2607 .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
2608 .map(|(header, hash)| SealedHeader::new(header, hash)))
2609 },
2610 predicate,
2611 )
2612 }
2613}
2614
2615impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
2616 fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
2617 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2618 .and_then(|provider| provider.block_hash(num))
2619 .or_else(|err| {
2620 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2621 Ok(None)
2622 } else {
2623 Err(err)
2624 }
2625 })
2626 }
2627
2628 fn canonical_hashes_range(
2629 &self,
2630 start: BlockNumber,
2631 end: BlockNumber,
2632 ) -> ProviderResult<Vec<B256>> {
2633 self.fetch_range_with_predicate(
2634 StaticFileSegment::Headers,
2635 start..end,
2636 |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
2637 |_| true,
2638 )
2639 }
2640}
2641
2642impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
2643 for StaticFileProvider<N>
2644{
2645 type Receipt = N::Receipt;
2646
2647 fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2648 self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
2649 .and_then(|provider| provider.receipt(num))
2650 .or_else(|err| {
2651 if let ProviderError::MissingStaticFileTx(_, _) = err {
2652 Ok(None)
2653 } else {
2654 Err(err)
2655 }
2656 })
2657 }
2658
2659 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2660 if let Some(num) = self.transaction_id(hash)? {
2661 return self.receipt(num);
2662 }
2663 Ok(None)
2664 }
2665
2666 fn receipts_by_block(
2667 &self,
2668 _block: BlockHashOrNumber,
2669 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2670 unreachable!()
2671 }
2672
2673 fn receipts_by_tx_range(
2674 &self,
2675 range: impl RangeBounds<TxNumber>,
2676 ) -> ProviderResult<Vec<Self::Receipt>> {
2677 self.fetch_range_with_predicate(
2678 StaticFileSegment::Receipts,
2679 to_range(range),
2680 |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
2681 |_| true,
2682 )
2683 }
2684
2685 fn receipts_by_block_range(
2686 &self,
2687 _block_range: RangeInclusive<BlockNumber>,
2688 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2689 Err(ProviderError::UnsupportedProvider)
2690 }
2691}
2692
2693impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
2694 for StaticFileProvider<N>
2695{
2696 fn transaction_hashes_by_range(
2697 &self,
2698 tx_range: Range<TxNumber>,
2699 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
2700 let tx_range_size = (tx_range.end - tx_range.start) as usize;
2701
2702 let chunk_size = 100;
2706
2707 let chunks = tx_range
2709 .clone()
2710 .step_by(chunk_size)
2711 .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
2712 let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
2713
2714 for chunk_range in chunks {
2715 let (channel_tx, channel_rx) = mpsc::channel();
2716 channels.push(channel_rx);
2717
2718 let manager = self.clone();
2719
2720 rayon::spawn(move || {
2723 let _ = manager.fetch_range_with_predicate(
2724 StaticFileSegment::Transactions,
2725 chunk_range,
2726 |cursor, number| {
2727 Ok(cursor
2728 .get_one::<TransactionMask<Self::Transaction>>(number.into())?
2729 .map(|transaction| {
2730 let _ = channel_tx.send(transaction_hash((number, transaction)));
2731 }))
2732 },
2733 |_| true,
2734 );
2735 });
2736 }
2737
2738 let mut tx_list = Vec::with_capacity(tx_range_size);
2739
2740 for channel in channels {
2742 while let Ok(tx) = channel.recv() {
2743 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
2744 tx_list.push((tx_hash, tx_id));
2745 }
2746 }
2747
2748 Ok(tx_list)
2749 }
2750}
2751
2752impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
2753 for StaticFileProvider<N>
2754{
2755 type Transaction = N::SignedTx;
2756
2757 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
2758 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2759 let mut cursor = jar_provider.cursor()?;
2760 if cursor
2761 .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
2762 .and_then(|tx| (*tx.tx_hash() == tx_hash).then_some(tx))
2763 .is_some()
2764 {
2765 Ok(cursor.number())
2766 } else {
2767 Ok(None)
2768 }
2769 })
2770 }
2771
2772 fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2773 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2774 .and_then(|provider| provider.transaction_by_id(num))
2775 .or_else(|err| {
2776 if let ProviderError::MissingStaticFileTx(_, _) = err {
2777 Ok(None)
2778 } else {
2779 Err(err)
2780 }
2781 })
2782 }
2783
2784 fn transaction_by_id_unhashed(
2785 &self,
2786 num: TxNumber,
2787 ) -> ProviderResult<Option<Self::Transaction>> {
2788 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2789 .and_then(|provider| provider.transaction_by_id_unhashed(num))
2790 .or_else(|err| {
2791 if let ProviderError::MissingStaticFileTx(_, _) = err {
2792 Ok(None)
2793 } else {
2794 Err(err)
2795 }
2796 })
2797 }
2798
2799 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2800 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2801 Ok(jar_provider
2802 .cursor()?
2803 .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2804 .and_then(|tx| (*tx.tx_hash() == hash).then_some(tx)))
2805 })
2806 }
2807
2808 fn transaction_by_hash_with_meta(
2809 &self,
2810 _hash: TxHash,
2811 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2812 Err(ProviderError::UnsupportedProvider)
2814 }
2815
2816 fn transactions_by_block(
2817 &self,
2818 _block_id: BlockHashOrNumber,
2819 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2820 Err(ProviderError::UnsupportedProvider)
2822 }
2823
2824 fn transactions_by_block_range(
2825 &self,
2826 _range: impl RangeBounds<BlockNumber>,
2827 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2828 Err(ProviderError::UnsupportedProvider)
2830 }
2831
2832 fn transactions_by_tx_range(
2833 &self,
2834 range: impl RangeBounds<TxNumber>,
2835 ) -> ProviderResult<Vec<Self::Transaction>> {
2836 self.fetch_range_with_predicate(
2837 StaticFileSegment::Transactions,
2838 to_range(range),
2839 |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2840 |_| true,
2841 )
2842 }
2843
2844 fn senders_by_tx_range(
2845 &self,
2846 range: impl RangeBounds<TxNumber>,
2847 ) -> ProviderResult<Vec<Address>> {
2848 self.fetch_range_with_predicate(
2849 StaticFileSegment::TransactionSenders,
2850 to_range(range),
2851 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2852 |_| true,
2853 )
2854 }
2855
2856 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2857 self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2858 .and_then(|provider| provider.transaction_sender(id))
2859 .or_else(|err| {
2860 if let ProviderError::MissingStaticFileTx(_, _) = err {
2861 Ok(None)
2862 } else {
2863 Err(err)
2864 }
2865 })
2866 }
2867}
2868
2869impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
2870 fn chain_info(&self) -> ProviderResult<ChainInfo> {
2871 Err(ProviderError::UnsupportedProvider)
2873 }
2874
2875 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
2876 Err(ProviderError::UnsupportedProvider)
2878 }
2879
2880 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
2881 Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
2882 }
2883
2884 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
2885 Err(ProviderError::UnsupportedProvider)
2887 }
2888}
2889
2890impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
2893 for StaticFileProvider<N>
2894{
2895 type Block = N::Block;
2896
2897 fn find_block_by_hash(
2898 &self,
2899 _hash: B256,
2900 _source: BlockSource,
2901 ) -> ProviderResult<Option<Self::Block>> {
2902 Err(ProviderError::UnsupportedProvider)
2904 }
2905
2906 fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
2907 Err(ProviderError::UnsupportedProvider)
2909 }
2910
2911 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2912 Err(ProviderError::UnsupportedProvider)
2914 }
2915
2916 fn pending_block_and_receipts(
2917 &self,
2918 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
2919 Err(ProviderError::UnsupportedProvider)
2921 }
2922
2923 fn recovered_block(
2924 &self,
2925 _id: BlockHashOrNumber,
2926 _transaction_kind: TransactionVariant,
2927 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2928 Err(ProviderError::UnsupportedProvider)
2930 }
2931
2932 fn sealed_block_with_senders(
2933 &self,
2934 _id: BlockHashOrNumber,
2935 _transaction_kind: TransactionVariant,
2936 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2937 Err(ProviderError::UnsupportedProvider)
2939 }
2940
2941 fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
2942 Err(ProviderError::UnsupportedProvider)
2944 }
2945
2946 fn block_with_senders_range(
2947 &self,
2948 _range: RangeInclusive<BlockNumber>,
2949 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
2950 Err(ProviderError::UnsupportedProvider)
2951 }
2952
2953 fn recovered_block_range(
2954 &self,
2955 _range: RangeInclusive<BlockNumber>,
2956 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
2957 Err(ProviderError::UnsupportedProvider)
2958 }
2959
2960 fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
2961 Err(ProviderError::UnsupportedProvider)
2962 }
2963}
2964
2965impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
2966 fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2967 Err(ProviderError::UnsupportedProvider)
2968 }
2969
2970 fn block_body_indices_range(
2971 &self,
2972 _range: RangeInclusive<BlockNumber>,
2973 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2974 Err(ProviderError::UnsupportedProvider)
2975 }
2976}
2977
2978impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2979 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2980 match T::NAME {
2981 tables::CanonicalHeaders::NAME |
2982 tables::Headers::<Header>::NAME |
2983 tables::HeaderTerminalDifficulties::NAME => Ok(self
2984 .get_highest_static_file_block(StaticFileSegment::Headers)
2985 .map(|block| block + 1)
2986 .unwrap_or_default()
2987 as usize),
2988 tables::Receipts::<Receipt>::NAME => Ok(self
2989 .get_highest_static_file_tx(StaticFileSegment::Receipts)
2990 .map(|receipts| receipts + 1)
2991 .unwrap_or_default() as usize),
2992 tables::Transactions::<TransactionSigned>::NAME => Ok(self
2993 .get_highest_static_file_tx(StaticFileSegment::Transactions)
2994 .map(|txs| txs + 1)
2995 .unwrap_or_default()
2996 as usize),
2997 tables::TransactionSenders::NAME => Ok(self
2998 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
2999 .map(|txs| txs + 1)
3000 .unwrap_or_default() as usize),
3001 _ => Err(ProviderError::UnsupportedProvider),
3002 }
3003 }
3004}
3005
3006#[inline]
3008fn transaction_hash<T>(entry: (TxNumber, T)) -> Result<(B256, TxNumber), Box<ProviderError>>
3009where
3010 T: TxHashRef,
3011{
3012 let (tx_id, tx) = entry;
3013 Ok((*tx.tx_hash(), tx_id))
3014}
3015
3016#[cfg(test)]
3017mod tests {
3018 use std::collections::BTreeMap;
3019
3020 use reth_chain_state::EthPrimitives;
3021 use reth_db::test_utils::create_test_static_files_dir;
3022 use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};
3023
3024 use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};
3025
3026 #[test]
3027 fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
3028 let (static_dir, _) = create_test_static_files_dir();
3029 let sf_rw: StaticFileProvider<EthPrimitives> =
3030 StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(100).build()?;
3031
3032 let segment = StaticFileSegment::Headers;
3033
3034 assert_eq!(
3036 sf_rw.find_fixed_range_with_block_index(segment, None, 0),
3037 SegmentRangeInclusive::new(0, 99)
3038 );
3039 assert_eq!(
3040 sf_rw.find_fixed_range_with_block_index(segment, None, 250),
3041 SegmentRangeInclusive::new(200, 299)
3042 );
3043
3044 assert_eq!(
3046 sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
3047 SegmentRangeInclusive::new(100, 199)
3048 );
3049
3050 let block_index = BTreeMap::from_iter([
3052 (99, SegmentRangeInclusive::new(0, 99)),
3053 (199, SegmentRangeInclusive::new(100, 199)),
3054 (299, SegmentRangeInclusive::new(200, 299)),
3055 ]);
3056
3057 assert_eq!(
3059 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 0),
3060 SegmentRangeInclusive::new(0, 99)
3061 );
3062 assert_eq!(
3063 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 50),
3064 SegmentRangeInclusive::new(0, 99)
3065 );
3066 assert_eq!(
3067 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 99),
3068 SegmentRangeInclusive::new(0, 99)
3069 );
3070 assert_eq!(
3071 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 100),
3072 SegmentRangeInclusive::new(100, 199)
3073 );
3074 assert_eq!(
3075 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 150),
3076 SegmentRangeInclusive::new(100, 199)
3077 );
3078 assert_eq!(
3079 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 199),
3080 SegmentRangeInclusive::new(100, 199)
3081 );
3082
3083 assert_eq!(
3086 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 300),
3087 SegmentRangeInclusive::new(300, 399)
3088 );
3089 assert_eq!(
3090 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 350),
3091 SegmentRangeInclusive::new(300, 399)
3092 );
3093
3094 assert_eq!(
3096 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 500),
3097 SegmentRangeInclusive::new(500, 599)
3098 );
3099
3100 assert_eq!(
3102 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 1000),
3103 SegmentRangeInclusive::new(1000, 1099)
3104 );
3105
3106 let mixed_size_index = BTreeMap::from_iter([
3109 (49, SegmentRangeInclusive::new(0, 49)), (149, SegmentRangeInclusive::new(50, 149)), (349, SegmentRangeInclusive::new(150, 349)), ]);
3113
3114 assert_eq!(
3116 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 25),
3117 SegmentRangeInclusive::new(0, 49)
3118 );
3119 assert_eq!(
3120 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 100),
3121 SegmentRangeInclusive::new(50, 149)
3122 );
3123 assert_eq!(
3124 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 200),
3125 SegmentRangeInclusive::new(150, 349)
3126 );
3127
3128 assert_eq!(
3131 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 350),
3132 SegmentRangeInclusive::new(350, 449)
3133 );
3134 assert_eq!(
3135 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 450),
3136 SegmentRangeInclusive::new(450, 549)
3137 );
3138 assert_eq!(
3139 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 550),
3140 SegmentRangeInclusive::new(550, 649)
3141 );
3142
3143 Ok(())
3144 }
3145}