1use super::{
2 metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3 StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6 changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker},
7 to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
8 EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
9 TransactionVariant, TransactionsProvider, TransactionsProviderExt,
10};
11use alloy_consensus::{transaction::TransactionMeta, Header};
12use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
13use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
14
15use parking_lot::RwLock;
16use reth_chain_state::ExecutedBlock;
17use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
18use reth_db::{
19 lockfile::StorageLock,
20 static_file::{
21 iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
22 StaticFileCursor, StorageChangesetMask, TransactionMask, TransactionSenderMask,
23 },
24};
25use reth_db_api::{
26 cursor::DbCursorRO,
27 models::{AccountBeforeTx, BlockNumberAddress, StorageBeforeTx, StoredBlockBodyIndices},
28 table::{Decompress, Table, Value},
29 tables,
30 transaction::DbTx,
31};
32use reth_ethereum_primitives::{Receipt, TransactionSigned};
33use reth_nippy_jar::{NippyJar, NippyJarChecker};
34use reth_node_types::NodePrimitives;
35use reth_primitives_traits::{
36 dashmap::DashMap, AlloyBlockHeader as _, BlockBody as _, RecoveredBlock, SealedHeader,
37 SignedTransaction, StorageEntry,
38};
39use reth_prune_types::PruneSegment;
40use reth_stages_types::PipelineTarget;
41use reth_static_file_types::{
42 find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileMap,
43 StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
44};
45use reth_storage_api::{
46 BlockBodyIndicesProvider, ChangeSetReader, DBProvider, PruneCheckpointReader,
47 StorageChangeSetReader, StorageSettingsCache,
48};
49use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
50use std::{
51 collections::BTreeMap,
52 fmt::Debug,
53 ops::{Bound, Deref, Range, RangeBounds, RangeInclusive},
54 path::{Path, PathBuf},
55 sync::{atomic::AtomicU64, mpsc, Arc},
56};
57use tracing::{debug, info, info_span, instrument, trace, warn};
58
59type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
62
63#[derive(Debug, Default, PartialEq, Eq)]
65pub enum StaticFileAccess {
66 #[default]
68 RO,
69 RW,
71}
72
73impl StaticFileAccess {
74 pub const fn is_read_only(&self) -> bool {
76 matches!(self, Self::RO)
77 }
78
79 pub const fn is_read_write(&self) -> bool {
81 matches!(self, Self::RW)
82 }
83}
84
85#[derive(Debug, Clone, Copy, Default)]
89pub struct StaticFileWriteCtx {
90 pub write_senders: bool,
92 pub write_receipts: bool,
94 pub write_account_changesets: bool,
96 pub write_storage_changesets: bool,
98 pub tip: BlockNumber,
100 pub receipts_prune_mode: Option<reth_prune_types::PruneMode>,
102 pub receipts_prunable: bool,
104}
105
106#[derive(Debug)]
115pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
116
117impl<N> Clone for StaticFileProvider<N> {
118 fn clone(&self) -> Self {
119 Self(self.0.clone())
120 }
121}
122
123#[derive(Debug)]
125pub struct StaticFileProviderBuilder<P> {
126 access: StaticFileAccess,
127 use_metrics: bool,
128 blocks_per_file: StaticFileMap<u64>,
129 path: P,
130 genesis_block_number: u64,
131}
132
133impl<P: AsRef<Path>> StaticFileProviderBuilder<P> {
134 pub fn read_write(path: P) -> Self {
136 Self {
137 path,
138 access: StaticFileAccess::RW,
139 blocks_per_file: Default::default(),
140 use_metrics: false,
141 genesis_block_number: 0,
142 }
143 }
144
145 pub fn read_only(path: P) -> Self {
147 Self {
148 path,
149 access: StaticFileAccess::RO,
150 blocks_per_file: Default::default(),
151 use_metrics: false,
152 genesis_block_number: 0,
153 }
154 }
155
156 pub fn with_blocks_per_file_for_segments(
168 mut self,
169 segments: &<StaticFileMap<u64> as Deref>::Target,
170 ) -> Self {
171 for (segment, &blocks_per_file) in segments {
172 self.blocks_per_file.insert(segment, blocks_per_file);
173 }
174 self
175 }
176
177 pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
179 for segment in StaticFileSegment::iter() {
180 self.blocks_per_file.insert(segment, blocks_per_file);
181 }
182 self
183 }
184
185 pub fn with_blocks_per_file_for_segment(
187 mut self,
188 segment: StaticFileSegment,
189 blocks_per_file: u64,
190 ) -> Self {
191 self.blocks_per_file.insert(segment, blocks_per_file);
192 self
193 }
194
195 pub const fn with_metrics(mut self) -> Self {
197 self.use_metrics = true;
198 self
199 }
200
201 pub const fn with_genesis_block_number(mut self, genesis_block_number: u64) -> Self {
214 self.genesis_block_number = genesis_block_number;
215 self
216 }
217
218 pub fn build<N: NodePrimitives>(self) -> ProviderResult<StaticFileProvider<N>> {
220 let mut provider = StaticFileProviderInner::new(self.path, self.access)?;
221 if self.use_metrics {
222 provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
223 }
224
225 for (segment, blocks_per_file) in *self.blocks_per_file {
226 provider.blocks_per_file.insert(segment, blocks_per_file);
227 }
228 provider.genesis_block_number = self.genesis_block_number;
229
230 let provider = StaticFileProvider(Arc::new(provider));
231 provider.initialize_index()?;
232 Ok(provider)
233 }
234}
235
236impl<N: NodePrimitives> StaticFileProvider<N> {
237 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
239 let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
240 provider.initialize_index()?;
241 Ok(provider)
242 }
243}
244
245impl<N: NodePrimitives> StaticFileProvider<N> {
246 pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
251 Self::new(path, StaticFileAccess::RO)
252 }
253
254 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
256 Self::new(path, StaticFileAccess::RW)
257 }
258}
259
260impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
261 type Target = StaticFileProviderInner<N>;
262
263 fn deref(&self) -> &Self::Target {
264 &self.0
265 }
266}
267
268#[derive(Debug)]
270pub struct StaticFileProviderInner<N> {
271 map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
274 indexes: RwLock<StaticFileMap<StaticFileSegmentIndex>>,
276 earliest_history_height: AtomicU64,
287 path: PathBuf,
289 writers: StaticFileWriters<N>,
291 metrics: Option<Arc<StaticFileProviderMetrics>>,
293 access: StaticFileAccess,
295 blocks_per_file: StaticFileMap<u64>,
297 _lock_file: Option<StorageLock>,
299 genesis_block_number: u64,
301}
302
303impl<N: NodePrimitives> StaticFileProviderInner<N> {
304 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
306 let _lock_file = if access.is_read_write() {
307 StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
308 } else {
309 None
310 };
311
312 let mut blocks_per_file = StaticFileMap::default();
313 for segment in StaticFileSegment::iter() {
314 blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
315 }
316
317 let provider = Self {
318 map: Default::default(),
319 indexes: Default::default(),
320 writers: Default::default(),
321 earliest_history_height: Default::default(),
322 path: path.as_ref().to_path_buf(),
323 metrics: None,
324 access,
325 blocks_per_file,
326 _lock_file,
327 genesis_block_number: 0,
328 };
329
330 Ok(provider)
331 }
332
333 pub const fn is_read_only(&self) -> bool {
334 self.access.is_read_only()
335 }
336
337 pub fn find_fixed_range_with_block_index(
346 &self,
347 segment: StaticFileSegment,
348 block_index: Option<&SegmentRanges>,
349 block: BlockNumber,
350 ) -> SegmentRangeInclusive {
351 let blocks_per_file =
352 self.blocks_per_file.get(segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
353
354 if let Some(block_index) = block_index {
355 if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
357 {
358 return *range;
360 } else if let Some((_, range)) = block_index.last_key_value() {
361 let blocks_after_last_range = block - range.end();
367 let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
368 let start = range.end() + 1 + segments_to_skip * blocks_per_file;
369 return SegmentRangeInclusive::new(start, start + blocks_per_file - 1);
370 }
371 }
372 find_fixed_range(block, blocks_per_file)
375 }
376
377 pub fn find_fixed_range(
390 &self,
391 segment: StaticFileSegment,
392 block: BlockNumber,
393 ) -> SegmentRangeInclusive {
394 self.find_fixed_range_with_block_index(
395 segment,
396 self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block),
397 block,
398 )
399 }
400
401 pub const fn genesis_block_number(&self) -> u64 {
403 self.genesis_block_number
404 }
405}
406
407impl<N: NodePrimitives> StaticFileProvider<N> {
408 pub fn report_metrics(&self) -> ProviderResult<()> {
413 let Some(metrics) = &self.metrics else { return Ok(()) };
414
415 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
416 for (segment, headers) in &*static_files {
417 let mut entries = 0;
418 let mut size = 0;
419
420 for (block_range, _) in headers {
421 let fixed_block_range = self.find_fixed_range(segment, block_range.start());
422 let jar_provider = self
423 .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
424 .ok_or_else(|| {
425 ProviderError::MissingStaticFileBlock(segment, block_range.start())
426 })?;
427
428 entries += jar_provider.rows();
429 size += jar_provider.size() as u64;
430 }
431
432 metrics.record_segment(segment, size, headers.len(), entries);
433 }
434
435 Ok(())
436 }
437
438 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
440 fn write_headers(
441 w: &mut StaticFileProviderRWRefMut<'_, N>,
442 blocks: &[ExecutedBlock<N>],
443 ) -> ProviderResult<()> {
444 for block in blocks {
445 let b = block.recovered_block();
446 w.append_header(b.header(), &b.hash())?;
447 }
448 Ok(())
449 }
450
451 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
453 fn write_transactions(
454 w: &mut StaticFileProviderRWRefMut<'_, N>,
455 blocks: &[ExecutedBlock<N>],
456 tx_nums: &[TxNumber],
457 ) -> ProviderResult<()> {
458 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
459 let b = block.recovered_block();
460 w.increment_block(b.number())?;
461 for (i, tx) in b.body().transactions().iter().enumerate() {
462 w.append_transaction(first_tx + i as u64, tx)?;
463 }
464 }
465 Ok(())
466 }
467
468 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
470 fn write_transaction_senders(
471 w: &mut StaticFileProviderRWRefMut<'_, N>,
472 blocks: &[ExecutedBlock<N>],
473 tx_nums: &[TxNumber],
474 ) -> ProviderResult<()> {
475 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
476 let b = block.recovered_block();
477 w.increment_block(b.number())?;
478 for (i, sender) in b.senders_iter().enumerate() {
479 w.append_transaction_sender(first_tx + i as u64, sender)?;
480 }
481 }
482 Ok(())
483 }
484
485 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
487 fn write_receipts(
488 w: &mut StaticFileProviderRWRefMut<'_, N>,
489 blocks: &[ExecutedBlock<N>],
490 tx_nums: &[TxNumber],
491 ctx: &StaticFileWriteCtx,
492 ) -> ProviderResult<()> {
493 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
494 let block_number = block.recovered_block().number();
495 w.increment_block(block_number)?;
496
497 if ctx.receipts_prunable &&
499 ctx.receipts_prune_mode
500 .is_some_and(|mode| mode.should_prune(block_number, ctx.tip))
501 {
502 continue
503 }
504
505 for (i, receipt) in block.execution_outcome().receipts.iter().enumerate() {
506 w.append_receipt(first_tx + i as u64, receipt)?;
507 }
508 }
509 Ok(())
510 }
511
512 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
514 fn write_account_changesets(
515 w: &mut StaticFileProviderRWRefMut<'_, N>,
516 blocks: &[ExecutedBlock<N>],
517 ) -> ProviderResult<()> {
518 for block in blocks {
519 let block_number = block.recovered_block().number();
520 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
521
522 let changeset: Vec<_> = reverts
523 .accounts
524 .into_iter()
525 .flatten()
526 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
527 .collect();
528 w.append_account_changeset(changeset, block_number)?;
529 }
530 Ok(())
531 }
532
533 #[instrument(level = "debug", target = "providers::db", skip_all)]
535 fn write_storage_changesets(
536 w: &mut StaticFileProviderRWRefMut<'_, N>,
537 blocks: &[ExecutedBlock<N>],
538 ) -> ProviderResult<()> {
539 for block in blocks {
540 let block_number = block.recovered_block().number();
541 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
542
543 let changeset: Vec<_> = reverts
544 .storage
545 .into_iter()
546 .flatten()
547 .flat_map(|revert| {
548 revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
549 StorageBeforeTx {
550 address: revert.address,
551 key: B256::from(key.to_be_bytes()),
552 value: revert_to_slot.to_previous_value(),
553 }
554 })
555 })
556 .collect();
557 w.append_storage_changeset(changeset, block_number)?;
558 }
559 Ok(())
560 }
561
562 #[instrument(level = "debug", target = "providers::static_file", skip_all, fields(?segment))]
567 fn write_segment<F>(
568 &self,
569 segment: StaticFileSegment,
570 first_block_number: BlockNumber,
571 f: F,
572 ) -> ProviderResult<()>
573 where
574 F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()>,
575 {
576 let mut w = self.get_writer(first_block_number, segment)?;
577 f(&mut w)?;
578 w.sync_all()
579 }
580
581 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
586 pub fn write_blocks_data(
587 &self,
588 blocks: &[ExecutedBlock<N>],
589 tx_nums: &[TxNumber],
590 ctx: StaticFileWriteCtx,
591 runtime: &reth_tasks::Runtime,
592 ) -> ProviderResult<()> {
593 if blocks.is_empty() {
594 return Ok(());
595 }
596
597 let first_block_number = blocks[0].recovered_block().number();
598
599 let mut r_headers = None;
600 let mut r_txs = None;
601 let mut r_senders = None;
602 let mut r_receipts = None;
603 let mut r_account_changesets = None;
604 let mut r_storage_changesets = None;
605
606 let span = tracing::Span::current();
609 runtime.storage_pool().in_place_scope(|s| {
610 s.spawn(|_| {
611 let _guard = span.enter();
612 r_headers =
613 Some(self.write_segment(StaticFileSegment::Headers, first_block_number, |w| {
614 Self::write_headers(w, blocks)
615 }));
616 });
617
618 s.spawn(|_| {
619 let _guard = span.enter();
620 r_txs = Some(self.write_segment(
621 StaticFileSegment::Transactions,
622 first_block_number,
623 |w| Self::write_transactions(w, blocks, tx_nums),
624 ));
625 });
626
627 if ctx.write_senders {
628 s.spawn(|_| {
629 let _guard = span.enter();
630 r_senders = Some(self.write_segment(
631 StaticFileSegment::TransactionSenders,
632 first_block_number,
633 |w| Self::write_transaction_senders(w, blocks, tx_nums),
634 ));
635 });
636 }
637
638 if ctx.write_receipts {
639 s.spawn(|_| {
640 let _guard = span.enter();
641 r_receipts = Some(self.write_segment(
642 StaticFileSegment::Receipts,
643 first_block_number,
644 |w| Self::write_receipts(w, blocks, tx_nums, &ctx),
645 ));
646 });
647 }
648
649 if ctx.write_account_changesets {
650 s.spawn(|_| {
651 let _guard = span.enter();
652 r_account_changesets = Some(self.write_segment(
653 StaticFileSegment::AccountChangeSets,
654 first_block_number,
655 |w| Self::write_account_changesets(w, blocks),
656 ));
657 });
658 }
659
660 if ctx.write_storage_changesets {
661 s.spawn(|_| {
662 let _guard = span.enter();
663 r_storage_changesets = Some(self.write_segment(
664 StaticFileSegment::StorageChangeSets,
665 first_block_number,
666 |w| Self::write_storage_changesets(w, blocks),
667 ));
668 });
669 }
670 });
671
672 r_headers.ok_or(StaticFileWriterError::ThreadPanic("headers"))??;
673 r_txs.ok_or(StaticFileWriterError::ThreadPanic("transactions"))??;
674 if ctx.write_senders {
675 r_senders.ok_or(StaticFileWriterError::ThreadPanic("senders"))??;
676 }
677 if ctx.write_receipts {
678 r_receipts.ok_or(StaticFileWriterError::ThreadPanic("receipts"))??;
679 }
680 if ctx.write_account_changesets {
681 r_account_changesets
682 .ok_or(StaticFileWriterError::ThreadPanic("account_changesets"))??;
683 }
684 if ctx.write_storage_changesets {
685 r_storage_changesets
686 .ok_or(StaticFileWriterError::ThreadPanic("storage_changesets"))??;
687 }
688 Ok(())
689 }
690
691 pub fn get_segment_provider(
694 &self,
695 segment: StaticFileSegment,
696 number: u64,
697 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
698 if segment.is_block_or_change_based() {
699 self.get_segment_provider_for_block(segment, number, None)
700 } else {
701 self.get_segment_provider_for_transaction(segment, number, None)
702 }
703 }
704
705 pub fn get_maybe_segment_provider(
710 &self,
711 segment: StaticFileSegment,
712 number: u64,
713 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
714 let provider = if segment.is_block_or_change_based() {
715 self.get_segment_provider_for_block(segment, number, None)
716 } else {
717 self.get_segment_provider_for_transaction(segment, number, None)
718 };
719
720 match provider {
721 Ok(provider) => Ok(Some(provider)),
722 Err(
723 ProviderError::MissingStaticFileBlock(_, _) |
724 ProviderError::MissingStaticFileTx(_, _),
725 ) => Ok(None),
726 Err(err) => Err(err),
727 }
728 }
729
730 pub fn get_segment_provider_for_block(
732 &self,
733 segment: StaticFileSegment,
734 block: BlockNumber,
735 path: Option<&Path>,
736 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
737 self.get_segment_provider_for_range(
738 segment,
739 || self.get_segment_ranges_from_block(segment, block),
740 path,
741 )?
742 .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
743 }
744
745 pub fn get_segment_provider_for_transaction(
747 &self,
748 segment: StaticFileSegment,
749 tx: TxNumber,
750 path: Option<&Path>,
751 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
752 self.get_segment_provider_for_range(
753 segment,
754 || self.get_segment_ranges_from_transaction(segment, tx),
755 path,
756 )?
757 .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
758 }
759
760 pub fn get_segment_provider_for_range(
764 &self,
765 segment: StaticFileSegment,
766 fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
767 path: Option<&Path>,
768 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
769 let block_range = match path {
772 Some(path) => StaticFileSegment::parse_filename(
773 &path
774 .file_name()
775 .ok_or_else(|| {
776 ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
777 })?
778 .to_string_lossy(),
779 )
780 .and_then(|(parsed_segment, block_range)| {
781 if parsed_segment == segment {
782 return Some(block_range);
783 }
784 None
785 }),
786 None => fn_range(),
787 };
788
789 if let Some(block_range) = block_range {
791 return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?));
792 }
793
794 Ok(None)
795 }
796
797 pub fn get_segment_provider_for_path(
799 &self,
800 path: &Path,
801 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
802 StaticFileSegment::parse_filename(
803 &path
804 .file_name()
805 .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
806 .to_string_lossy(),
807 )
808 .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
809 .transpose()
810 }
811
812 pub fn remove_cached_provider(
816 &self,
817 segment: StaticFileSegment,
818 fixed_block_range_end: BlockNumber,
819 ) {
820 self.map.remove(&(fixed_block_range_end, segment));
821 }
822
823 pub fn delete_segment_below_block(
840 &self,
841 segment: StaticFileSegment,
842 block: BlockNumber,
843 ) -> ProviderResult<Vec<SegmentHeader>> {
844 if block == 0 {
846 return Ok(Vec::new());
847 }
848
849 let highest_block = self.get_highest_static_file_block(segment);
850 let mut deleted_headers = Vec::new();
851
852 loop {
853 let Some(block_height) = self.get_lowest_range_end(segment) else {
854 return Ok(deleted_headers);
855 };
856
857 if block_height >= block || Some(block_height) == highest_block {
859 return Ok(deleted_headers);
860 }
861
862 debug!(
863 target: "providers::static_file",
864 ?segment,
865 ?block_height,
866 "Deleting static file below block"
867 );
868
869 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
872 warn!( target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
873 })?;
874
875 deleted_headers.push(header);
876 }
877 }
878
879 pub fn delete_jar(
887 &self,
888 segment: StaticFileSegment,
889 block: BlockNumber,
890 ) -> ProviderResult<SegmentHeader> {
891 let fixed_block_range = self.find_fixed_range(segment, block);
892 let key = (fixed_block_range.end(), segment);
893 let file = self.path.join(segment.filename(&fixed_block_range));
894 let jar = if let Some((_, jar)) = self.map.remove(&key) {
895 jar.jar
896 } else {
897 debug!(
898 target: "providers::static_file",
899 ?file,
900 ?fixed_block_range,
901 ?block,
902 "Loading static file jar for deletion"
903 );
904 NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
905 };
906
907 let header = jar.user_header().clone();
908
909 if segment.is_change_based() {
911 let csoff_path = file.with_extension("csoff");
912 if csoff_path.exists() {
913 std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
914 }
915 }
916
917 jar.delete().map_err(ProviderError::other)?;
918
919 self.initialize_index()?;
922
923 Ok(header)
924 }
925
926 pub fn delete_segment(&self, segment: StaticFileSegment) -> ProviderResult<Vec<SegmentHeader>> {
934 let mut deleted_headers = Vec::new();
935
936 while let Some(block_height) = self.get_highest_static_file_block(segment) {
937 debug!(
938 target: "providers::static_file",
939 ?segment,
940 ?block_height,
941 "Deleting static file jar"
942 );
943
944 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
945 warn!(target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file jar")
946 })?;
947
948 deleted_headers.push(header);
949 }
950
951 Ok(deleted_headers)
952 }
953
954 fn get_or_create_jar_provider(
958 &self,
959 segment: StaticFileSegment,
960 fixed_block_range: &SegmentRangeInclusive,
961 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
962 let key = (fixed_block_range.end(), segment);
963
964 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Getting provider");
966 let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
967 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
968 jar.into()
969 } else {
970 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
971 let path = self.path.join(segment.filename(fixed_block_range));
972 let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
973 self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
974 };
975
976 if let Some(metrics) = &self.metrics {
977 provider = provider.with_metrics(metrics.clone());
978 }
979 Ok(provider)
980 }
981
982 fn get_segment_ranges_from_block(
985 &self,
986 segment: StaticFileSegment,
987 block: u64,
988 ) -> Option<SegmentRangeInclusive> {
989 let indexes = self.indexes.read();
990 let index = indexes.get(segment)?;
991
992 (index.max_block >= block).then(|| {
993 self.find_fixed_range_with_block_index(
994 segment,
995 Some(&index.expected_block_ranges_by_max_block),
996 block,
997 )
998 })
999 }
1000
1001 fn get_segment_ranges_from_transaction(
1004 &self,
1005 segment: StaticFileSegment,
1006 tx: u64,
1007 ) -> Option<SegmentRangeInclusive> {
1008 let indexes = self.indexes.read();
1009 let index = indexes.get(segment)?;
1010 let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;
1011
1012 let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();
1015
1016 while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
1017 if tx > *tx_end {
1018 return None;
1020 }
1021 let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
1022 if tx_start <= tx {
1023 return Some(self.find_fixed_range_with_block_index(
1024 segment,
1025 Some(&index.expected_block_ranges_by_max_block),
1026 block_range.end(),
1027 ));
1028 }
1029 }
1030 None
1031 }
1032
1033 pub fn update_index(
1040 &self,
1041 segment: StaticFileSegment,
1042 segment_max_block: Option<BlockNumber>,
1043 ) -> ProviderResult<()> {
1044 debug!(
1045 target: "providers::static_file",
1046 ?segment,
1047 ?segment_max_block,
1048 "Updating provider index"
1049 );
1050 let mut indexes = self.indexes.write();
1051
1052 match segment_max_block {
1053 Some(segment_max_block) => {
1054 let fixed_range = self.find_fixed_range_with_block_index(
1055 segment,
1056 indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
1057 segment_max_block,
1058 );
1059
1060 let jar = NippyJar::<SegmentHeader>::load(
1061 &self.path.join(segment.filename(&fixed_range)),
1062 )
1063 .map_err(ProviderError::other)?;
1064
1065 let index = indexes
1066 .entry(segment)
1067 .and_modify(|index| {
1068 index.max_block = segment_max_block;
1070
1071 index
1075 .expected_block_ranges_by_max_block
1076 .retain(|_, block_range| block_range.start() < fixed_range.start());
1077 index
1079 .expected_block_ranges_by_max_block
1080 .insert(fixed_range.end(), fixed_range);
1081 })
1082 .or_insert_with(|| StaticFileSegmentIndex {
1083 min_block_range: None,
1084 max_block: segment_max_block,
1085 expected_block_ranges_by_max_block: BTreeMap::from([(
1086 fixed_range.end(),
1087 fixed_range,
1088 )]),
1089 available_block_ranges_by_max_tx: None,
1090 });
1091
1092 if let Some(current_block_range) = jar.user_header().block_range() {
1108 if let Some(min_block_range) = index.min_block_range.as_mut() {
1109 if current_block_range.start() == min_block_range.start() {
1112 *min_block_range = current_block_range;
1113 }
1114 } else {
1115 index.min_block_range = Some(current_block_range);
1116 }
1117 }
1118
1119 if let Some(tx_range) = jar.user_header().tx_range() {
1122 if let Some(current_block_range) = jar.user_header().block_range() {
1125 let tx_end = tx_range.end();
1126
1127 if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
1136 index
1137 .retain(|_, block_range| block_range.start() < fixed_range.start());
1138 index.insert(tx_end, current_block_range);
1139 } else {
1140 index.available_block_ranges_by_max_tx =
1141 Some(BTreeMap::from([(tx_end, current_block_range)]));
1142 }
1143 }
1144 } else if segment.is_tx_based() {
1145 if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
1149 index.retain(|_, block_range| block_range.start() < fixed_range.start());
1150 }
1151
1152 index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
1154 }
1155
1156 debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
1158 self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
1159
1160 debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
1162 self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
1163 }
1164 None => {
1165 debug!(target: "providers::static_file", ?segment, "Removing segment from index");
1166 indexes.remove(segment);
1167 }
1168 };
1169
1170 debug!(target: "providers::static_file", ?segment, "Updated provider index");
1171 Ok(())
1172 }
1173
1174 pub fn initialize_index(&self) -> ProviderResult<()> {
1176 let mut indexes = self.indexes.write();
1177 indexes.clear();
1178
1179 for (segment, headers) in &*iter_static_files(&self.path).map_err(ProviderError::other)? {
1180 let min_block_range = Some(headers.first().expect("headers are not empty").0);
1185 let max_block = headers.last().expect("headers are not empty").0.end();
1186
1187 let mut expected_block_ranges_by_max_block = BTreeMap::default();
1188 let mut available_block_ranges_by_max_tx = None;
1189
1190 for (block_range, header) in headers {
1191 expected_block_ranges_by_max_block
1193 .insert(header.expected_block_end(), header.expected_block_range());
1194
1195 if let Some(tx_range) = header.tx_range() {
1197 let tx_end = tx_range.end();
1198
1199 available_block_ranges_by_max_tx
1200 .get_or_insert_with(BTreeMap::default)
1201 .insert(tx_end, *block_range);
1202 }
1203 }
1204
1205 indexes.insert(
1206 segment,
1207 StaticFileSegmentIndex {
1208 min_block_range,
1209 max_block,
1210 expected_block_ranges_by_max_block,
1211 available_block_ranges_by_max_tx,
1212 },
1213 );
1214 }
1215
1216 self.map.clear();
1218
1219 if let Some(lowest_range) =
1221 indexes.get(StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
1222 {
1223 self.earliest_history_height
1225 .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
1226 }
1227
1228 Ok(())
1229 }
1230
1231 #[instrument(skip(self, provider), fields(read_only = self.is_read_only()))]
1255 pub fn check_consistency<Provider>(
1256 &self,
1257 provider: &Provider,
1258 ) -> ProviderResult<Option<PipelineTarget>>
1259 where
1260 Provider: DBProvider
1261 + BlockReader
1262 + StageCheckpointReader
1263 + PruneCheckpointReader
1264 + ChainSpecProvider
1265 + StorageSettingsCache,
1266 N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
1267 {
1268 if provider.chain_spec().is_optimism() &&
1275 reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
1276 {
1277 const OVM_HEADER_1_HASH: B256 =
1279 b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
1280 if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
1281 info!(target: "reth::cli",
1282 "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
1283 );
1284 return Ok(None);
1285 }
1286 }
1287
1288 info!(target: "reth::cli", "Verifying storage consistency.");
1289
1290 let mut unwind_target: Option<BlockNumber> = None;
1291
1292 let mut update_unwind_target = |new_target| {
1293 unwind_target =
1294 unwind_target.map(|current| current.min(new_target)).or(Some(new_target));
1295 };
1296
1297 for segment in self.segments_to_check(provider) {
1298 let span = info_span!(
1299 "Checking consistency for segment",
1300 ?segment,
1301 initial_highest_block = tracing::field::Empty,
1302 highest_block = tracing::field::Empty,
1303 highest_tx = tracing::field::Empty,
1304 );
1305 let _guard = span.enter();
1306
1307 debug!(target: "reth::providers::static_file", "Checking consistency for segment");
1308
1309 let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?;
1311 span.record("initial_highest_block", initial_highest_block);
1312 span.record("highest_block", highest_block);
1313
1314 if initial_highest_block != highest_block {
1319 info!(
1320 target: "reth::providers::static_file",
1321 unwind_target = highest_block,
1322 "Setting unwind target."
1323 );
1324 update_unwind_target(highest_block.unwrap_or_default());
1325 }
1326
1327 let highest_tx = self.get_highest_static_file_tx(segment);
1333 span.record("highest_tx", highest_tx);
1334 debug!(target: "reth::providers::static_file", "Checking tx index segment");
1335
1336 if let Some(highest_tx) = highest_tx {
1337 let mut last_block = highest_block.unwrap_or_default();
1338 debug!(target: "reth::providers::static_file", last_block, highest_tx, "Verifying last transaction matches last block indices");
1339 loop {
1340 let Some(indices) = provider.block_body_indices(last_block)? else {
1341 debug!(target: "reth::providers::static_file", last_block, "Block body indices not found, static files ahead of database");
1342 break
1346 };
1347
1348 debug!(target: "reth::providers::static_file", last_block, last_tx_num = indices.last_tx_num(), "Found block body indices");
1349
1350 if indices.last_tx_num() <= highest_tx {
1351 break
1352 }
1353
1354 if last_block == 0 {
1355 debug!(target: "reth::providers::static_file", "Reached block 0 in verification loop");
1356 break
1357 }
1358
1359 last_block -= 1;
1360
1361 info!(
1362 target: "reth::providers::static_file",
1363 highest_block = self.get_highest_static_file_block(segment),
1364 unwind_target = last_block,
1365 "Setting unwind target."
1366 );
1367 span.record("highest_block", last_block);
1368 highest_block = Some(last_block);
1369 update_unwind_target(last_block);
1370 }
1371 }
1372
1373 debug!(target: "reth::providers::static_file", "Ensuring invariants for segment");
1374
1375 match self.ensure_invariants_for(provider, segment, highest_tx, highest_block)? {
1376 Some(unwind) => {
1377 debug!(target: "reth::providers::static_file", unwind_target=unwind, "Invariants check returned unwind target");
1378 update_unwind_target(unwind);
1379 }
1380 None => {
1381 debug!(target: "reth::providers::static_file", "Invariants check completed, no unwind needed")
1382 }
1383 }
1384 }
1385
1386 Ok(unwind_target.map(PipelineTarget::Unwind))
1387 }
1388
1389 pub fn check_file_consistency<Provider>(&self, provider: &Provider) -> ProviderResult<()>
1398 where
1399 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1400 {
1401 info!(target: "reth::cli", "Healing static file inconsistencies.");
1402
1403 for segment in self.segments_to_check(provider) {
1404 let _guard = info_span!("healing_static_file_segment", ?segment).entered();
1405 let _ = self.maybe_heal_segment(segment)?;
1406 }
1407
1408 Ok(())
1409 }
1410
1411 fn segments_to_check<'a, Provider>(
1413 &'a self,
1414 provider: &'a Provider,
1415 ) -> impl Iterator<Item = StaticFileSegment> + 'a
1416 where
1417 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1418 {
1419 StaticFileSegment::iter()
1420 .filter(move |segment| self.should_check_segment(provider, *segment))
1421 }
1422
1423 fn should_check_segment<Provider>(
1425 &self,
1426 provider: &Provider,
1427 segment: StaticFileSegment,
1428 ) -> bool
1429 where
1430 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1431 {
1432 match segment {
1433 StaticFileSegment::Headers | StaticFileSegment::Transactions => true,
1434 StaticFileSegment::Receipts => {
1435 if EitherWriter::receipts_destination(provider).is_database() {
1436 debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database");
1439 return false;
1440 }
1441
1442 if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
1443 NamedChain::Chiado == provider.chain_spec().chain_id()
1444 {
1445 debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: broken historical import for gnosis/chiado");
1449 return false;
1450 }
1451
1452 true
1453 }
1454 StaticFileSegment::TransactionSenders => {
1455 if EitherWriterDestination::senders(provider).is_database() {
1456 debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: senders stored in database");
1457 return false;
1458 }
1459
1460 if Self::is_segment_fully_pruned(provider, PruneSegment::SenderRecovery) {
1461 debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: fully pruned");
1462 return false;
1463 }
1464
1465 true
1466 }
1467 StaticFileSegment::AccountChangeSets => {
1468 if EitherWriter::account_changesets_destination(provider).is_database() {
1469 debug!(target: "reth::providers::static_file", ?segment, "Skipping account changesets segment: changesets stored in database");
1470 return false;
1471 }
1472 true
1473 }
1474 StaticFileSegment::StorageChangeSets => {
1475 if EitherWriter::storage_changesets_destination(provider).is_database() {
1476 debug!(target: "reth::providers::static_file", ?segment, "Skipping storage changesets segment: changesets stored in database");
1477 return false
1478 }
1479 true
1480 }
1481 }
1482 }
1483
1484 fn is_segment_fully_pruned<Provider>(provider: &Provider, segment: PruneSegment) -> bool
1488 where
1489 Provider: PruneCheckpointReader,
1490 {
1491 provider
1492 .get_prune_checkpoint(segment)
1493 .ok()
1494 .flatten()
1495 .is_some_and(|checkpoint| checkpoint.prune_mode.is_full())
1496 }
1497
1498 fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1503 debug!(target: "reth::providers::static_file", "Checking segment consistency");
1504 if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1505 let file_path = self
1506 .directory()
1507 .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1508 debug!(target: "reth::providers::static_file", ?file_path, latest_block, "Loading NippyJar for consistency check");
1509
1510 let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1511 debug!(target: "reth::providers::static_file", "NippyJar loaded, checking consistency");
1512
1513 NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1514 debug!(target: "reth::providers::static_file", "NippyJar consistency check passed");
1515 } else {
1516 debug!(target: "reth::providers::static_file", "No static file block found, skipping consistency check");
1517 }
1518 Ok(())
1519 }
1520
1521 fn maybe_heal_segment(
1537 &self,
1538 segment: StaticFileSegment,
1539 ) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
1540 let initial_highest_block = self.get_highest_static_file_block(segment);
1541 debug!(target: "reth::providers::static_file", ?initial_highest_block, "Initial highest block for segment");
1542
1543 if self.access.is_read_only() {
1544 debug!(target: "reth::providers::static_file", "Checking segment consistency (read-only)");
1547 self.check_segment_consistency(segment)?;
1548 } else {
1549 debug!(target: "reth::providers::static_file", "Fetching latest writer which might heal any potential inconsistency");
1552 self.latest_writer(segment)?;
1553 }
1554
1555 let highest_block = self.get_highest_static_file_block(segment);
1558
1559 Ok((initial_highest_block, highest_block))
1560 }
1561
1562 fn ensure_invariants_for<Provider>(
1564 &self,
1565 provider: &Provider,
1566 segment: StaticFileSegment,
1567 highest_tx: Option<u64>,
1568 highest_block: Option<BlockNumber>,
1569 ) -> ProviderResult<Option<BlockNumber>>
1570 where
1571 Provider: DBProvider + BlockReader + StageCheckpointReader,
1572 N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
1573 {
1574 match segment {
1575 StaticFileSegment::Headers => self
1576 .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
1577 provider,
1578 segment,
1579 highest_block,
1580 highest_block,
1581 ),
1582 StaticFileSegment::Transactions => self
1583 .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
1584 provider,
1585 segment,
1586 highest_tx,
1587 highest_block,
1588 ),
1589 StaticFileSegment::Receipts => self
1590 .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
1591 provider,
1592 segment,
1593 highest_tx,
1594 highest_block,
1595 ),
1596 StaticFileSegment::TransactionSenders => self
1597 .ensure_invariants::<_, tables::TransactionSenders>(
1598 provider,
1599 segment,
1600 highest_tx,
1601 highest_block,
1602 ),
1603 StaticFileSegment::AccountChangeSets => self
1604 .ensure_invariants::<_, tables::AccountChangeSets>(
1605 provider,
1606 segment,
1607 highest_tx,
1608 highest_block,
1609 ),
1610 StaticFileSegment::StorageChangeSets => self
1611 .ensure_changeset_invariants_by_block::<_, tables::StorageChangeSets, _>(
1612 provider,
1613 segment,
1614 highest_block,
1615 |key| key.block_number(),
1616 ),
1617 }
1618 }
1619
1620 #[instrument(skip(self, provider, segment), fields(table = T::NAME))]
1635 fn ensure_invariants<Provider, T: Table<Key = u64>>(
1636 &self,
1637 provider: &Provider,
1638 segment: StaticFileSegment,
1639 highest_static_file_entry: Option<u64>,
1640 highest_static_file_block: Option<BlockNumber>,
1641 ) -> ProviderResult<Option<BlockNumber>>
1642 where
1643 Provider: DBProvider + BlockReader + StageCheckpointReader,
1644 {
1645 debug!(target: "reth::providers::static_file", "Ensuring invariants");
1646 let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
1647
1648 if let Some((db_first_entry, _)) = db_cursor.first()? {
1649 debug!(target: "reth::providers::static_file", db_first_entry, "Found first database entry");
1650 if let (Some(highest_entry), Some(highest_block)) =
1651 (highest_static_file_entry, highest_static_file_block)
1652 {
1653 if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
1657 info!(
1658 target: "reth::providers::static_file",
1659 ?db_first_entry,
1660 ?highest_entry,
1661 unwind_target = highest_block,
1662 "Setting unwind target."
1663 );
1664 return Ok(Some(highest_block));
1665 }
1666 }
1667
1668 if let Some((db_last_entry, _)) = db_cursor.last()? &&
1669 highest_static_file_entry
1670 .is_none_or(|highest_entry| db_last_entry > highest_entry)
1671 {
1672 debug!(target: "reth::providers::static_file", db_last_entry, "Database has entries beyond static files, no unwind needed");
1673 return Ok(None)
1674 }
1675 } else {
1676 debug!(target: "reth::providers::static_file", "No database entries found");
1677 }
1678
1679 let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
1680 let highest_static_file_block = highest_static_file_block.unwrap_or_default();
1681
1682 let stage_id = segment.to_stage_id();
1685 let checkpoint_block_number =
1686 provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
1687 debug!(target: "reth::providers::static_file", ?stage_id, checkpoint_block_number, "Retrieved stage checkpoint");
1688
1689 if checkpoint_block_number > highest_static_file_block {
1691 info!(
1692 target: "reth::providers::static_file",
1693 checkpoint_block_number,
1694 unwind_target = highest_static_file_block,
1695 "Setting unwind target."
1696 );
1697 return Ok(Some(highest_static_file_block));
1698 }
1699
1700 if checkpoint_block_number >= highest_static_file_block {
1702 debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
1703 return Ok(None);
1704 }
1705
1706 info!(
1712 target: "reth::providers",
1713 from = highest_static_file_block,
1714 to = checkpoint_block_number,
1715 "Unwinding static file segment."
1716 );
1717 let mut writer = self.latest_writer(segment)?;
1718
1719 match segment {
1720 StaticFileSegment::Headers => {
1721 let prune_count = highest_static_file_block - checkpoint_block_number;
1722 debug!(target: "reth::providers::static_file", prune_count, "Pruning headers");
1723 writer.prune_headers(prune_count)?;
1725 }
1726 StaticFileSegment::Transactions |
1727 StaticFileSegment::Receipts |
1728 StaticFileSegment::TransactionSenders => {
1729 if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
1730 let number = highest_static_file_entry - block.last_tx_num();
1731 debug!(target: "reth::providers::static_file", prune_count = number, checkpoint_block_number, "Pruning transaction based segment");
1732
1733 match segment {
1734 StaticFileSegment::Transactions => {
1735 writer.prune_transactions(number, checkpoint_block_number)?
1736 }
1737 StaticFileSegment::Receipts => {
1738 writer.prune_receipts(number, checkpoint_block_number)?
1739 }
1740 StaticFileSegment::TransactionSenders => {
1741 writer.prune_transaction_senders(number, checkpoint_block_number)?
1742 }
1743 StaticFileSegment::Headers |
1744 StaticFileSegment::AccountChangeSets |
1745 StaticFileSegment::StorageChangeSets => {
1746 unreachable!()
1747 }
1748 }
1749 } else {
1750 debug!(target: "reth::providers::static_file", checkpoint_block_number, "No block body indices found for checkpoint block");
1751 }
1752 }
1753 StaticFileSegment::AccountChangeSets => {
1754 writer.prune_account_changesets(checkpoint_block_number)?;
1755 }
1756 StaticFileSegment::StorageChangeSets => {
1757 writer.prune_storage_changesets(checkpoint_block_number)?;
1758 }
1759 }
1760
1761 debug!(target: "reth::providers::static_file", "Committing writer after pruning");
1762 writer.commit()?;
1763 debug!(target: "reth::providers::static_file", "Writer committed successfully");
1764
1765 debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
1766 Ok(None)
1767 }
1768
1769 fn ensure_changeset_invariants_by_block<Provider, T, F>(
1770 &self,
1771 provider: &Provider,
1772 segment: StaticFileSegment,
1773 highest_static_file_block: Option<BlockNumber>,
1774 block_from_key: F,
1775 ) -> ProviderResult<Option<BlockNumber>>
1776 where
1777 Provider: DBProvider + BlockReader + StageCheckpointReader,
1778 T: Table,
1779 F: Fn(&T::Key) -> BlockNumber,
1780 {
1781 debug!(
1782 target: "reth::providers::static_file",
1783 ?segment,
1784 ?highest_static_file_block,
1785 "Ensuring changeset invariants"
1786 );
1787 let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
1788
1789 if let Some((db_first_key, _)) = db_cursor.first()? {
1790 let db_first_block = block_from_key(&db_first_key);
1791 if let Some(highest_block) = highest_static_file_block &&
1792 !(db_first_block <= highest_block || highest_block + 1 == db_first_block)
1793 {
1794 info!(
1795 target: "reth::providers::static_file",
1796 ?db_first_block,
1797 ?highest_block,
1798 unwind_target = highest_block,
1799 ?segment,
1800 "Setting unwind target."
1801 );
1802 return Ok(Some(highest_block))
1803 }
1804
1805 if let Some((db_last_key, _)) = db_cursor.last()? &&
1806 highest_static_file_block
1807 .is_none_or(|highest_block| block_from_key(&db_last_key) > highest_block)
1808 {
1809 debug!(
1810 target: "reth::providers::static_file",
1811 ?segment,
1812 "Database has entries beyond static files, no unwind needed"
1813 );
1814 return Ok(None)
1815 }
1816 } else {
1817 debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
1818 }
1819
1820 let highest_static_file_block = highest_static_file_block.unwrap_or_default();
1821
1822 let stage_id = segment.to_stage_id();
1823 let checkpoint_block_number =
1824 provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
1825
1826 if checkpoint_block_number > highest_static_file_block {
1827 info!(
1828 target: "reth::providers::static_file",
1829 checkpoint_block_number,
1830 unwind_target = highest_static_file_block,
1831 ?segment,
1832 "Setting unwind target."
1833 );
1834 return Ok(Some(highest_static_file_block))
1835 }
1836
1837 if checkpoint_block_number < highest_static_file_block {
1838 info!(
1839 target: "reth::providers",
1840 ?segment,
1841 from = highest_static_file_block,
1842 to = checkpoint_block_number,
1843 "Unwinding static file segment."
1844 );
1845 let mut writer = self.latest_writer(segment)?;
1846 match segment {
1847 StaticFileSegment::AccountChangeSets => {
1848 writer.prune_account_changesets(checkpoint_block_number)?;
1849 }
1850 StaticFileSegment::StorageChangeSets => {
1851 writer.prune_storage_changesets(checkpoint_block_number)?;
1852 }
1853 _ => unreachable!("invalid segment for changeset invariants"),
1854 }
1855 writer.commit()?;
1856 }
1857
1858 Ok(None)
1859 }
1860
1861 pub fn earliest_history_height(&self) -> BlockNumber {
1869 self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1870 }
1871
1872 pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1876 self.indexes.read().get(segment).and_then(|index| index.min_block_range)
1877 }
1878
1879 pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1885 self.get_lowest_range(segment).map(|range| range.start())
1886 }
1887
1888 pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1894 self.get_lowest_range(segment).map(|range| range.end())
1895 }
1896
1897 pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1901 self.indexes.read().get(segment).map(|index| index.max_block)
1902 }
1903
1904 fn bound_range(
1911 &self,
1912 range: impl RangeBounds<BlockNumber>,
1913 segment: StaticFileSegment,
1914 ) -> RangeInclusive<BlockNumber> {
1915 let highest_block = self.get_highest_static_file_block(segment).unwrap_or(0);
1916
1917 let start = match range.start_bound() {
1918 Bound::Included(&n) => n,
1919 Bound::Excluded(&n) => n.saturating_add(1),
1920 Bound::Unbounded => 0,
1921 };
1922 let end = match range.end_bound() {
1923 Bound::Included(&n) => n.min(highest_block),
1924 Bound::Excluded(&n) => n.saturating_sub(1).min(highest_block),
1925 Bound::Unbounded => highest_block,
1926 };
1927
1928 start..=end
1929 }
1930
1931 pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1935 self.indexes
1936 .read()
1937 .get(segment)
1938 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1939 .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1940 }
1941
1942 pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1944 HighestStaticFiles {
1945 receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1946 }
1947 }
1948
1949 pub fn find_static_file<T>(
1952 &self,
1953 segment: StaticFileSegment,
1954 func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
1955 ) -> ProviderResult<Option<T>> {
1956 if let Some(ranges) =
1957 self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block)
1958 {
1959 for range in ranges.values().rev() {
1961 if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
1962 return Ok(Some(res));
1963 }
1964 }
1965 }
1966
1967 Ok(None)
1968 }
1969
1970 pub fn fetch_range_with_predicate<T, F, P>(
1976 &self,
1977 segment: StaticFileSegment,
1978 range: Range<u64>,
1979 mut get_fn: F,
1980 mut predicate: P,
1981 ) -> ProviderResult<Vec<T>>
1982 where
1983 F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1984 P: FnMut(&T) -> bool,
1985 {
1986 let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1987
1988 macro_rules! get_provider {
1992 ($number:expr) => {{
1993 match self.get_segment_provider(segment, $number) {
1994 Ok(provider) => provider,
1995 Err(
1996 ProviderError::MissingStaticFileBlock(_, _) |
1997 ProviderError::MissingStaticFileTx(_, _),
1998 ) => return Ok(result),
1999 Err(err) => return Err(err),
2000 }
2001 }};
2002 }
2003
2004 let mut provider = get_provider!(range.start);
2005 let mut cursor = provider.cursor()?;
2006
2007 'outer: for number in range {
2009 let mut retrying = false;
2013
2014 'inner: loop {
2016 match get_fn(&mut cursor, number)? {
2017 Some(res) => {
2018 if !predicate(&res) {
2019 break 'outer;
2020 }
2021 result.push(res);
2022 break 'inner;
2023 }
2024 None => {
2025 if retrying {
2026 return Ok(result);
2027 }
2028 drop(cursor);
2033 drop(provider);
2034 provider = get_provider!(number);
2035 cursor = provider.cursor()?;
2036 retrying = true;
2037 }
2038 }
2039 }
2040 }
2041
2042 result.shrink_to_fit();
2043
2044 Ok(result)
2045 }
2046
2047 pub fn fetch_range_iter<'a, T, F>(
2052 &'a self,
2053 segment: StaticFileSegment,
2054 range: Range<u64>,
2055 get_fn: F,
2056 ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
2057 where
2058 F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
2059 T: std::fmt::Debug,
2060 {
2061 let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
2062 Ok(range.map(move |number| {
2063 match provider
2064 .as_ref()
2065 .map(|provider| get_fn(&mut provider.cursor()?, number))
2066 .and_then(|result| result.transpose())
2067 {
2068 Some(result) => result.map(Some),
2069 None => {
2070 provider.take();
2074 provider = self.get_maybe_segment_provider(segment, number)?;
2075 provider
2076 .as_ref()
2077 .map(|provider| get_fn(&mut provider.cursor()?, number))
2078 .and_then(|result| result.transpose())
2079 .transpose()
2080 }
2081 }
2082 }))
2083 }
2084
2085 pub fn directory(&self) -> &Path {
2087 &self.path
2088 }
2089
2090 pub fn get_with_static_file_or_database<T, FS, FD>(
2100 &self,
2101 segment: StaticFileSegment,
2102 number: u64,
2103 fetch_from_static_file: FS,
2104 fetch_from_database: FD,
2105 ) -> ProviderResult<Option<T>>
2106 where
2107 FS: Fn(&Self) -> ProviderResult<Option<T>>,
2108 FD: Fn() -> ProviderResult<Option<T>>,
2109 {
2110 let static_file_upper_bound = if segment.is_block_or_change_based() {
2112 self.get_highest_static_file_block(segment)
2113 } else {
2114 self.get_highest_static_file_tx(segment)
2115 };
2116
2117 if static_file_upper_bound
2118 .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
2119 {
2120 return fetch_from_static_file(self);
2121 }
2122 fetch_from_database()
2123 }
2124
2125 pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
2137 &self,
2138 segment: StaticFileSegment,
2139 mut block_or_tx_range: Range<u64>,
2140 fetch_from_static_file: FS,
2141 mut fetch_from_database: FD,
2142 mut predicate: P,
2143 ) -> ProviderResult<Vec<T>>
2144 where
2145 FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
2146 FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
2147 P: FnMut(&T) -> bool,
2148 {
2149 let mut data = Vec::new();
2150
2151 if let Some(static_file_upper_bound) = if segment.is_block_or_change_based() {
2153 self.get_highest_static_file_block(segment)
2154 } else {
2155 self.get_highest_static_file_tx(segment)
2156 } && block_or_tx_range.start <= static_file_upper_bound
2157 {
2158 let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
2159 data.extend(fetch_from_static_file(
2160 self,
2161 block_or_tx_range.start..end,
2162 &mut predicate,
2163 )?);
2164 block_or_tx_range.start = end;
2165 }
2166
2167 if block_or_tx_range.end > block_or_tx_range.start {
2168 data.extend(fetch_from_database(block_or_tx_range, predicate)?)
2169 }
2170
2171 Ok(data)
2172 }
2173
2174 #[cfg(any(test, feature = "test-utils"))]
2176 pub fn path(&self) -> &Path {
2177 &self.path
2178 }
2179
2180 #[cfg(any(test, feature = "test-utils"))]
2182 pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2183 self.indexes
2184 .read()
2185 .get(segment)
2186 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
2187 .cloned()
2188 }
2189
2190 #[cfg(any(test, feature = "test-utils"))]
2192 pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2193 self.indexes
2194 .read()
2195 .get(segment)
2196 .map(|index| &index.expected_block_ranges_by_max_block)
2197 .cloned()
2198 }
2199}
2200
2201#[derive(Debug)]
2202struct StaticFileSegmentIndex {
2203 min_block_range: Option<SegmentRangeInclusive>,
2215 max_block: u64,
2217 expected_block_ranges_by_max_block: SegmentRanges,
2223 available_block_ranges_by_max_tx: Option<SegmentRanges>,
2230}
2231
2232pub trait StaticFileWriter {
2234 type Primitives: Send + Sync + 'static;
2236
2237 fn get_writer(
2239 &self,
2240 block: BlockNumber,
2241 segment: StaticFileSegment,
2242 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
2243
2244 fn latest_writer(
2247 &self,
2248 segment: StaticFileSegment,
2249 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
2250
2251 fn commit(&self) -> ProviderResult<()>;
2253
2254 fn has_unwind_queued(&self) -> bool;
2256
2257 fn finalize(&self) -> ProviderResult<()>;
2261}
2262
2263impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
2264 type Primitives = N;
2265
2266 fn get_writer(
2267 &self,
2268 block: BlockNumber,
2269 segment: StaticFileSegment,
2270 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2271 if self.access.is_read_only() {
2272 return Err(ProviderError::ReadOnlyStaticFileAccess);
2273 }
2274
2275 trace!(target: "providers::static_file", ?block, ?segment, "Getting static file writer.");
2276 self.writers.get_or_create(segment, || {
2277 StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
2278 })
2279 }
2280
2281 fn latest_writer(
2282 &self,
2283 segment: StaticFileSegment,
2284 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2285 let genesis_number = self.0.as_ref().genesis_block_number();
2286 self.get_writer(
2287 self.get_highest_static_file_block(segment).unwrap_or(genesis_number),
2288 segment,
2289 )
2290 }
2291
2292 fn commit(&self) -> ProviderResult<()> {
2293 self.writers.commit()
2294 }
2295
2296 fn has_unwind_queued(&self) -> bool {
2297 self.writers.has_unwind_queued()
2298 }
2299
2300 fn finalize(&self) -> ProviderResult<()> {
2301 self.writers.finalize()
2302 }
2303}
2304
2305impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
2306 fn account_block_changeset(
2307 &self,
2308 block_number: BlockNumber,
2309 ) -> ProviderResult<Vec<reth_db::models::AccountBeforeTx>> {
2310 let provider = match self.get_segment_provider_for_block(
2311 StaticFileSegment::AccountChangeSets,
2312 block_number,
2313 None,
2314 ) {
2315 Ok(provider) => provider,
2316 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
2317 Err(err) => return Err(err),
2318 };
2319
2320 if let Some(offset) = provider.read_changeset_offset(block_number)? {
2321 let mut cursor = provider.cursor()?;
2322 let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
2323
2324 for i in offset.changeset_range() {
2325 if let Some(change) =
2326 cursor.get_one::<reth_db::static_file::AccountChangesetMask>(i.into())?
2327 {
2328 changeset.push(change)
2329 }
2330 }
2331 Ok(changeset)
2332 } else {
2333 Ok(Vec::new())
2334 }
2335 }
2336
2337 fn get_account_before_block(
2338 &self,
2339 block_number: BlockNumber,
2340 address: Address,
2341 ) -> ProviderResult<Option<reth_db::models::AccountBeforeTx>> {
2342 let provider = match self.get_segment_provider_for_block(
2343 StaticFileSegment::AccountChangeSets,
2344 block_number,
2345 None,
2346 ) {
2347 Ok(provider) => provider,
2348 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
2349 Err(err) => return Err(err),
2350 };
2351
2352 let Some(offset) = provider.read_changeset_offset(block_number)? else {
2353 return Ok(None);
2354 };
2355
2356 let mut cursor = provider.cursor()?;
2357 let range = offset.changeset_range();
2358 let mut low = range.start;
2359 let mut high = range.end;
2360
2361 while low < high {
2362 let mid = low + (high - low) / 2;
2363 if let Some(change) =
2364 cursor.get_one::<reth_db::static_file::AccountChangesetMask>(mid.into())?
2365 {
2366 if change.address < address {
2367 low = mid + 1;
2368 } else {
2369 high = mid;
2370 }
2371 } else {
2372 debug!(
2375 target: "providers::static_file",
2376 ?low,
2377 ?mid,
2378 ?high,
2379 ?range,
2380 ?block_number,
2381 ?address,
2382 "Cannot continue binary search for account changeset fetch"
2383 );
2384 low = range.end;
2385 break;
2386 }
2387 }
2388
2389 if low < range.end &&
2390 let Some(change) = cursor
2391 .get_one::<reth_db::static_file::AccountChangesetMask>(low.into())?
2392 .filter(|change| change.address == address)
2393 {
2394 return Ok(Some(change));
2395 }
2396
2397 Ok(None)
2398 }
2399
2400 fn account_changesets_range(
2401 &self,
2402 range: impl core::ops::RangeBounds<BlockNumber>,
2403 ) -> ProviderResult<Vec<(BlockNumber, reth_db::models::AccountBeforeTx)>> {
2404 let range = self.bound_range(range, StaticFileSegment::AccountChangeSets);
2405 self.walk_account_changeset_range(range).collect()
2406 }
2407}
2408
2409impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
2410 fn storage_changeset(
2411 &self,
2412 block_number: BlockNumber,
2413 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
2414 let provider = match self.get_segment_provider_for_block(
2415 StaticFileSegment::StorageChangeSets,
2416 block_number,
2417 None,
2418 ) {
2419 Ok(provider) => provider,
2420 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
2421 Err(err) => return Err(err),
2422 };
2423
2424 if let Some(offset) = provider.read_changeset_offset(block_number)? {
2425 let mut cursor = provider.cursor()?;
2426 let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
2427
2428 for i in offset.changeset_range() {
2429 if let Some(change) = cursor.get_one::<StorageChangesetMask>(i.into())? {
2430 let block_address = BlockNumberAddress((block_number, change.address));
2431 let entry = StorageEntry { key: change.key, value: change.value };
2432 changeset.push((block_address, entry));
2433 }
2434 }
2435 Ok(changeset)
2436 } else {
2437 Ok(Vec::new())
2438 }
2439 }
2440
2441 fn get_storage_before_block(
2442 &self,
2443 block_number: BlockNumber,
2444 address: Address,
2445 storage_key: B256,
2446 ) -> ProviderResult<Option<StorageEntry>> {
2447 let provider = match self.get_segment_provider_for_block(
2448 StaticFileSegment::StorageChangeSets,
2449 block_number,
2450 None,
2451 ) {
2452 Ok(provider) => provider,
2453 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
2454 Err(err) => return Err(err),
2455 };
2456
2457 let Some(offset) = provider.read_changeset_offset(block_number)? else {
2458 return Ok(None);
2459 };
2460
2461 let mut cursor = provider.cursor()?;
2462 let range = offset.changeset_range();
2463 let mut low = range.start;
2464 let mut high = range.end;
2465
2466 while low < high {
2467 let mid = low + (high - low) / 2;
2468 if let Some(change) = cursor.get_one::<StorageChangesetMask>(mid.into())? {
2469 match (change.address, change.key).cmp(&(address, storage_key)) {
2470 std::cmp::Ordering::Less => low = mid + 1,
2471 _ => high = mid,
2472 }
2473 } else {
2474 debug!(
2475 target: "providers::static_file",
2476 ?low,
2477 ?mid,
2478 ?high,
2479 ?range,
2480 ?block_number,
2481 ?address,
2482 ?storage_key,
2483 "Cannot continue binary search for storage changeset fetch"
2484 );
2485 low = range.end;
2486 break;
2487 }
2488 }
2489
2490 if low < range.end &&
2491 let Some(change) = cursor
2492 .get_one::<StorageChangesetMask>(low.into())?
2493 .filter(|change| change.address == address && change.key == storage_key)
2494 {
2495 return Ok(Some(StorageEntry { key: change.key, value: change.value }));
2496 }
2497
2498 Ok(None)
2499 }
2500
2501 fn storage_changesets_range(
2502 &self,
2503 range: impl RangeBounds<BlockNumber>,
2504 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
2505 let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
2506 self.walk_storage_changeset_range(range).collect()
2507 }
2508}
2509
2510impl<N: NodePrimitives> StaticFileProvider<N> {
2511 pub fn walk_account_changeset_range(
2521 &self,
2522 range: impl RangeBounds<BlockNumber>,
2523 ) -> StaticFileAccountChangesetWalker<Self> {
2524 StaticFileAccountChangesetWalker::new(self.clone(), range)
2525 }
2526
2527 pub fn walk_storage_changeset_range(
2529 &self,
2530 range: impl RangeBounds<BlockNumber>,
2531 ) -> StaticFileStorageChangesetWalker<Self> {
2532 StaticFileStorageChangesetWalker::new(self.clone(), range)
2533 }
2534}
2535
2536impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
2537 type Header = N::BlockHeader;
2538
2539 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
2540 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
2541 Ok(jar_provider
2542 .cursor()?
2543 .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
2544 .and_then(|(header, hash)| {
2545 if hash == block_hash {
2546 return Some(header);
2547 }
2548 None
2549 }))
2550 })
2551 }
2552
2553 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
2554 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2555 .and_then(|provider| provider.header_by_number(num))
2556 .or_else(|err| {
2557 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2558 Ok(None)
2559 } else {
2560 Err(err)
2561 }
2562 })
2563 }
2564
2565 fn headers_range(
2566 &self,
2567 range: impl RangeBounds<BlockNumber>,
2568 ) -> ProviderResult<Vec<Self::Header>> {
2569 self.fetch_range_with_predicate(
2570 StaticFileSegment::Headers,
2571 to_range(range),
2572 |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
2573 |_| true,
2574 )
2575 }
2576
2577 fn sealed_header(
2578 &self,
2579 num: BlockNumber,
2580 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
2581 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2582 .and_then(|provider| provider.sealed_header(num))
2583 .or_else(|err| {
2584 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2585 Ok(None)
2586 } else {
2587 Err(err)
2588 }
2589 })
2590 }
2591
2592 fn sealed_headers_while(
2593 &self,
2594 range: impl RangeBounds<BlockNumber>,
2595 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
2596 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
2597 self.fetch_range_with_predicate(
2598 StaticFileSegment::Headers,
2599 to_range(range),
2600 |cursor, number| {
2601 Ok(cursor
2602 .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
2603 .map(|(header, hash)| SealedHeader::new(header, hash)))
2604 },
2605 predicate,
2606 )
2607 }
2608}
2609
2610impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
2611 fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
2612 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2613 .and_then(|provider| provider.block_hash(num))
2614 .or_else(|err| {
2615 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2616 Ok(None)
2617 } else {
2618 Err(err)
2619 }
2620 })
2621 }
2622
2623 fn canonical_hashes_range(
2624 &self,
2625 start: BlockNumber,
2626 end: BlockNumber,
2627 ) -> ProviderResult<Vec<B256>> {
2628 self.fetch_range_with_predicate(
2629 StaticFileSegment::Headers,
2630 start..end,
2631 |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
2632 |_| true,
2633 )
2634 }
2635}
2636
2637impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
2638 for StaticFileProvider<N>
2639{
2640 type Receipt = N::Receipt;
2641
2642 fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2643 self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
2644 .and_then(|provider| provider.receipt(num))
2645 .or_else(|err| {
2646 if let ProviderError::MissingStaticFileTx(_, _) = err {
2647 Ok(None)
2648 } else {
2649 Err(err)
2650 }
2651 })
2652 }
2653
2654 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2655 if let Some(num) = self.transaction_id(hash)? {
2656 return self.receipt(num);
2657 }
2658 Ok(None)
2659 }
2660
2661 fn receipts_by_block(
2662 &self,
2663 _block: BlockHashOrNumber,
2664 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2665 unreachable!()
2666 }
2667
2668 fn receipts_by_tx_range(
2669 &self,
2670 range: impl RangeBounds<TxNumber>,
2671 ) -> ProviderResult<Vec<Self::Receipt>> {
2672 self.fetch_range_with_predicate(
2673 StaticFileSegment::Receipts,
2674 to_range(range),
2675 |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
2676 |_| true,
2677 )
2678 }
2679
2680 fn receipts_by_block_range(
2681 &self,
2682 _block_range: RangeInclusive<BlockNumber>,
2683 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2684 Err(ProviderError::UnsupportedProvider)
2685 }
2686}
2687
2688impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
2689 for StaticFileProvider<N>
2690{
2691 fn transaction_hashes_by_range(
2692 &self,
2693 tx_range: Range<TxNumber>,
2694 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
2695 let tx_range_size = (tx_range.end - tx_range.start) as usize;
2696
2697 let chunk_size = 100;
2701
2702 let chunks = tx_range
2704 .clone()
2705 .step_by(chunk_size)
2706 .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
2707 let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
2708
2709 for chunk_range in chunks {
2710 let (channel_tx, channel_rx) = mpsc::channel();
2711 channels.push(channel_rx);
2712
2713 let manager = self.clone();
2714
2715 rayon::spawn(move || {
2719 let mut rlp_buf = Vec::with_capacity(128);
2720 let _ = manager.fetch_range_with_predicate(
2721 StaticFileSegment::Transactions,
2722 chunk_range,
2723 |cursor, number| {
2724 Ok(cursor
2725 .get_one::<TransactionMask<Self::Transaction>>(number.into())?
2726 .map(|transaction| {
2727 rlp_buf.clear();
2728 let _ = channel_tx
2729 .send(calculate_hash((number, transaction), &mut rlp_buf));
2730 }))
2731 },
2732 |_| true,
2733 );
2734 });
2735 }
2736
2737 let mut tx_list = Vec::with_capacity(tx_range_size);
2738
2739 for channel in channels {
2741 while let Ok(tx) = channel.recv() {
2742 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
2743 tx_list.push((tx_hash, tx_id));
2744 }
2745 }
2746
2747 Ok(tx_list)
2748 }
2749}
2750
2751impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
2752 for StaticFileProvider<N>
2753{
2754 type Transaction = N::SignedTx;
2755
2756 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
2757 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2758 let mut cursor = jar_provider.cursor()?;
2759 if cursor
2760 .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
2761 .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
2762 .is_some()
2763 {
2764 Ok(cursor.number())
2765 } else {
2766 Ok(None)
2767 }
2768 })
2769 }
2770
2771 fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2772 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2773 .and_then(|provider| provider.transaction_by_id(num))
2774 .or_else(|err| {
2775 if let ProviderError::MissingStaticFileTx(_, _) = err {
2776 Ok(None)
2777 } else {
2778 Err(err)
2779 }
2780 })
2781 }
2782
2783 fn transaction_by_id_unhashed(
2784 &self,
2785 num: TxNumber,
2786 ) -> ProviderResult<Option<Self::Transaction>> {
2787 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2788 .and_then(|provider| provider.transaction_by_id_unhashed(num))
2789 .or_else(|err| {
2790 if let ProviderError::MissingStaticFileTx(_, _) = err {
2791 Ok(None)
2792 } else {
2793 Err(err)
2794 }
2795 })
2796 }
2797
2798 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2799 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2800 Ok(jar_provider
2801 .cursor()?
2802 .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2803 .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2804 })
2805 }
2806
2807 fn transaction_by_hash_with_meta(
2808 &self,
2809 _hash: TxHash,
2810 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2811 Err(ProviderError::UnsupportedProvider)
2813 }
2814
2815 fn transactions_by_block(
2816 &self,
2817 _block_id: BlockHashOrNumber,
2818 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2819 Err(ProviderError::UnsupportedProvider)
2821 }
2822
2823 fn transactions_by_block_range(
2824 &self,
2825 _range: impl RangeBounds<BlockNumber>,
2826 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2827 Err(ProviderError::UnsupportedProvider)
2829 }
2830
2831 fn transactions_by_tx_range(
2832 &self,
2833 range: impl RangeBounds<TxNumber>,
2834 ) -> ProviderResult<Vec<Self::Transaction>> {
2835 self.fetch_range_with_predicate(
2836 StaticFileSegment::Transactions,
2837 to_range(range),
2838 |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2839 |_| true,
2840 )
2841 }
2842
2843 fn senders_by_tx_range(
2844 &self,
2845 range: impl RangeBounds<TxNumber>,
2846 ) -> ProviderResult<Vec<Address>> {
2847 self.fetch_range_with_predicate(
2848 StaticFileSegment::TransactionSenders,
2849 to_range(range),
2850 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2851 |_| true,
2852 )
2853 }
2854
2855 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2856 self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2857 .and_then(|provider| provider.transaction_sender(id))
2858 .or_else(|err| {
2859 if let ProviderError::MissingStaticFileTx(_, _) = err {
2860 Ok(None)
2861 } else {
2862 Err(err)
2863 }
2864 })
2865 }
2866}
2867
2868impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
2869 fn chain_info(&self) -> ProviderResult<ChainInfo> {
2870 Err(ProviderError::UnsupportedProvider)
2872 }
2873
2874 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
2875 Err(ProviderError::UnsupportedProvider)
2877 }
2878
2879 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
2880 Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
2881 }
2882
2883 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
2884 Err(ProviderError::UnsupportedProvider)
2886 }
2887}
2888
2889impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
2892 for StaticFileProvider<N>
2893{
2894 type Block = N::Block;
2895
2896 fn find_block_by_hash(
2897 &self,
2898 _hash: B256,
2899 _source: BlockSource,
2900 ) -> ProviderResult<Option<Self::Block>> {
2901 Err(ProviderError::UnsupportedProvider)
2903 }
2904
2905 fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
2906 Err(ProviderError::UnsupportedProvider)
2908 }
2909
2910 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2911 Err(ProviderError::UnsupportedProvider)
2913 }
2914
2915 fn pending_block_and_receipts(
2916 &self,
2917 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
2918 Err(ProviderError::UnsupportedProvider)
2920 }
2921
2922 fn recovered_block(
2923 &self,
2924 _id: BlockHashOrNumber,
2925 _transaction_kind: TransactionVariant,
2926 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2927 Err(ProviderError::UnsupportedProvider)
2929 }
2930
2931 fn sealed_block_with_senders(
2932 &self,
2933 _id: BlockHashOrNumber,
2934 _transaction_kind: TransactionVariant,
2935 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
2936 Err(ProviderError::UnsupportedProvider)
2938 }
2939
2940 fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
2941 Err(ProviderError::UnsupportedProvider)
2943 }
2944
2945 fn block_with_senders_range(
2946 &self,
2947 _range: RangeInclusive<BlockNumber>,
2948 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
2949 Err(ProviderError::UnsupportedProvider)
2950 }
2951
2952 fn recovered_block_range(
2953 &self,
2954 _range: RangeInclusive<BlockNumber>,
2955 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
2956 Err(ProviderError::UnsupportedProvider)
2957 }
2958
2959 fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
2960 Err(ProviderError::UnsupportedProvider)
2961 }
2962}
2963
2964impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
2965 fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
2966 Err(ProviderError::UnsupportedProvider)
2967 }
2968
2969 fn block_body_indices_range(
2970 &self,
2971 _range: RangeInclusive<BlockNumber>,
2972 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
2973 Err(ProviderError::UnsupportedProvider)
2974 }
2975}
2976
2977impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2978 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2979 match T::NAME {
2980 tables::CanonicalHeaders::NAME |
2981 tables::Headers::<Header>::NAME |
2982 tables::HeaderTerminalDifficulties::NAME => Ok(self
2983 .get_highest_static_file_block(StaticFileSegment::Headers)
2984 .map(|block| block + 1)
2985 .unwrap_or_default()
2986 as usize),
2987 tables::Receipts::<Receipt>::NAME => Ok(self
2988 .get_highest_static_file_tx(StaticFileSegment::Receipts)
2989 .map(|receipts| receipts + 1)
2990 .unwrap_or_default() as usize),
2991 tables::Transactions::<TransactionSigned>::NAME => Ok(self
2992 .get_highest_static_file_tx(StaticFileSegment::Transactions)
2993 .map(|txs| txs + 1)
2994 .unwrap_or_default()
2995 as usize),
2996 tables::TransactionSenders::NAME => Ok(self
2997 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
2998 .map(|txs| txs + 1)
2999 .unwrap_or_default() as usize),
3000 _ => Err(ProviderError::UnsupportedProvider),
3001 }
3002 }
3003}
3004
3005#[inline]
3007fn calculate_hash<T>(
3008 entry: (TxNumber, T),
3009 rlp_buf: &mut Vec<u8>,
3010) -> Result<(B256, TxNumber), Box<ProviderError>>
3011where
3012 T: Encodable2718,
3013{
3014 let (tx_id, tx) = entry;
3015 tx.encode_2718(rlp_buf);
3016 Ok((keccak256(rlp_buf), tx_id))
3017}
3018
3019#[cfg(test)]
3020mod tests {
3021 use std::collections::BTreeMap;
3022
3023 use reth_chain_state::EthPrimitives;
3024 use reth_db::test_utils::create_test_static_files_dir;
3025 use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};
3026
3027 use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};
3028
3029 #[test]
3030 fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
3031 let (static_dir, _) = create_test_static_files_dir();
3032 let sf_rw: StaticFileProvider<EthPrimitives> =
3033 StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(100).build()?;
3034
3035 let segment = StaticFileSegment::Headers;
3036
3037 assert_eq!(
3039 sf_rw.find_fixed_range_with_block_index(segment, None, 0),
3040 SegmentRangeInclusive::new(0, 99)
3041 );
3042 assert_eq!(
3043 sf_rw.find_fixed_range_with_block_index(segment, None, 250),
3044 SegmentRangeInclusive::new(200, 299)
3045 );
3046
3047 assert_eq!(
3049 sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
3050 SegmentRangeInclusive::new(100, 199)
3051 );
3052
3053 let block_index = BTreeMap::from_iter([
3055 (99, SegmentRangeInclusive::new(0, 99)),
3056 (199, SegmentRangeInclusive::new(100, 199)),
3057 (299, SegmentRangeInclusive::new(200, 299)),
3058 ]);
3059
3060 assert_eq!(
3062 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 0),
3063 SegmentRangeInclusive::new(0, 99)
3064 );
3065 assert_eq!(
3066 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 50),
3067 SegmentRangeInclusive::new(0, 99)
3068 );
3069 assert_eq!(
3070 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 99),
3071 SegmentRangeInclusive::new(0, 99)
3072 );
3073 assert_eq!(
3074 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 100),
3075 SegmentRangeInclusive::new(100, 199)
3076 );
3077 assert_eq!(
3078 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 150),
3079 SegmentRangeInclusive::new(100, 199)
3080 );
3081 assert_eq!(
3082 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 199),
3083 SegmentRangeInclusive::new(100, 199)
3084 );
3085
3086 assert_eq!(
3089 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 300),
3090 SegmentRangeInclusive::new(300, 399)
3091 );
3092 assert_eq!(
3093 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 350),
3094 SegmentRangeInclusive::new(300, 399)
3095 );
3096
3097 assert_eq!(
3099 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 500),
3100 SegmentRangeInclusive::new(500, 599)
3101 );
3102
3103 assert_eq!(
3105 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 1000),
3106 SegmentRangeInclusive::new(1000, 1099)
3107 );
3108
3109 let mixed_size_index = BTreeMap::from_iter([
3112 (49, SegmentRangeInclusive::new(0, 49)), (149, SegmentRangeInclusive::new(50, 149)), (349, SegmentRangeInclusive::new(150, 349)), ]);
3116
3117 assert_eq!(
3119 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 25),
3120 SegmentRangeInclusive::new(0, 49)
3121 );
3122 assert_eq!(
3123 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 100),
3124 SegmentRangeInclusive::new(50, 149)
3125 );
3126 assert_eq!(
3127 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 200),
3128 SegmentRangeInclusive::new(150, 349)
3129 );
3130
3131 assert_eq!(
3134 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 350),
3135 SegmentRangeInclusive::new(350, 449)
3136 );
3137 assert_eq!(
3138 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 450),
3139 SegmentRangeInclusive::new(450, 549)
3140 );
3141 assert_eq!(
3142 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 550),
3143 SegmentRangeInclusive::new(550, 649)
3144 );
3145
3146 Ok(())
3147 }
3148}