1use super::{
2 metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3 StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6 to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
7 EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
8 TransactionVariant, TransactionsProvider, TransactionsProviderExt,
9};
10use alloy_consensus::{transaction::TransactionMeta, Header};
11use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
12use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
13use dashmap::DashMap;
14use notify::{RecommendedWatcher, RecursiveMode, Watcher};
15use parking_lot::RwLock;
16use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
17use reth_db::{
18 lockfile::StorageLock,
19 static_file::{
20 iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
21 StaticFileCursor, TransactionMask, TransactionSenderMask,
22 },
23};
24use reth_db_api::{
25 cursor::DbCursorRO,
26 models::StoredBlockBodyIndices,
27 table::{Decompress, Table, Value},
28 tables,
29 transaction::DbTx,
30};
31use reth_ethereum_primitives::{Receipt, TransactionSigned};
32use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
33use reth_node_types::NodePrimitives;
34use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
35use reth_stages_types::{PipelineTarget, StageId};
36use reth_static_file_types::{
37 find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
38 DEFAULT_BLOCKS_PER_STATIC_FILE,
39};
40use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, StorageSettingsCache};
41use reth_storage_errors::provider::{ProviderError, ProviderResult};
42use std::{
43 collections::{BTreeMap, HashMap},
44 fmt::Debug,
45 ops::{Deref, Range, RangeBounds, RangeInclusive},
46 path::{Path, PathBuf},
47 sync::{atomic::AtomicU64, mpsc, Arc},
48};
49use tracing::{debug, info, trace, warn};
50
/// Ordered map from a `u64` key to the [`SegmentRangeInclusive`] stored under it.
///
/// In this file it is passed around as the per-segment "block index", where each key is the
/// maximum block of the range it maps to (see `find_fixed_range_with_block_index`).
type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
54
/// Access mode of a static file provider: read-only or read-write.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access (the default).
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` when the access mode is [`StaticFileAccess::RO`].
    pub const fn is_read_only(&self) -> bool {
        match self {
            Self::RO => true,
            Self::RW => false,
        }
    }

    /// Returns `true` when the access mode is [`StaticFileAccess::RW`].
    pub const fn is_read_write(&self) -> bool {
        match self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
76
/// Cheaply cloneable handle to the static file provider state.
///
/// This is a thin wrapper around an [`Arc`] of [`StaticFileProviderInner`]; all state lives in the
/// inner type, which this handle dereferences to.
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
87
88impl<N> Clone for StaticFileProvider<N> {
89 fn clone(&self) -> Self {
90 Self(self.0.clone())
91 }
92}
93
/// Builder for [`StaticFileProvider`].
///
/// Holds a not-yet-shared [`StaticFileProviderInner`] that the `with_*` methods configure before
/// `build` wraps it and initializes the index.
#[derive(Debug)]
pub struct StaticFileProviderBuilder<N> {
    /// Provider state under construction.
    inner: StaticFileProviderInner<N>,
}
99
100impl<N: NodePrimitives> StaticFileProviderBuilder<N> {
101 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
103 StaticFileProviderInner::new(path, StaticFileAccess::RW).map(|inner| Self { inner })
104 }
105
106 pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
108 StaticFileProviderInner::new(path, StaticFileAccess::RO).map(|inner| Self { inner })
109 }
110
111 pub fn with_blocks_per_file_for_segments(
123 mut self,
124 segments: HashMap<StaticFileSegment, u64>,
125 ) -> Self {
126 self.inner.blocks_per_file.extend(segments);
127 self
128 }
129
130 pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
132 for segment in StaticFileSegment::iter() {
133 self.inner.blocks_per_file.insert(segment, blocks_per_file);
134 }
135 self
136 }
137
138 pub fn with_blocks_per_file_for_segment(
140 mut self,
141 segment: StaticFileSegment,
142 blocks_per_file: u64,
143 ) -> Self {
144 self.inner.blocks_per_file.insert(segment, blocks_per_file);
145 self
146 }
147
148 pub fn with_metrics(mut self) -> Self {
150 self.inner.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
151 self
152 }
153
154 pub const fn with_genesis_block_number(mut self, genesis_block_number: u64) -> Self {
167 self.inner.genesis_block_number = genesis_block_number;
168 self
169 }
170
171 pub fn build(self) -> ProviderResult<StaticFileProvider<N>> {
173 let provider = StaticFileProvider(Arc::new(self.inner));
174 provider.initialize_index()?;
175 Ok(provider)
176 }
177}
178
impl<N: NodePrimitives> StaticFileProvider<N> {
    /// Creates a new [`StaticFileProvider`] with read-only access.
    ///
    /// When `watch_directory` is `true`, a background thread is started (see
    /// [`Self::watch_directory`]) that re-initializes the index whenever a static file
    /// configuration file in the directory changes.
    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
        let provider = StaticFileProviderBuilder::read_only(path)?.build()?;

        if watch_directory {
            provider.watch_directory();
        }

        Ok(provider)
    }

    /// Creates a new [`StaticFileProvider`] with read-write access.
    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
        StaticFileProviderBuilder::read_write(path)?.build()
    }

    /// Spawns a thread that watches the static file directory (non-recursively) and calls
    /// `initialize_index` when a well-formed static file configuration file is created, modified
    /// or removed.
    ///
    /// # Panics
    ///
    /// The spawned thread panics if the watcher cannot be created or the path cannot be watched.
    pub fn watch_directory(&self) {
        // The thread owns its own handle to the shared provider state.
        let provider = self.clone();
        std::thread::spawn(move || {
            // Filesystem events are forwarded from the watcher callback over this channel.
            let (tx, rx) = std::sync::mpsc::channel();
            let mut watcher = RecommendedWatcher::new(
                move |res| tx.send(res).unwrap(),
                notify::Config::default(),
            )
            .expect("failed to create watcher");

            watcher
                .watch(&provider.path, RecursiveMode::NonRecursive)
                .expect("failed to watch path");

            // Modification time of the last file that triggered a re-initialization; used to skip
            // events that are not newer than one already handled.
            let mut last_event_timestamp = None;

            // Runs until the sending side (owned by the watcher callback) is dropped.
            while let Ok(res) = rx.recv() {
                match res {
                    Ok(event) => {
                        // Only create/modify/remove events are relevant.
                        if !matches!(
                            event.kind,
                            notify::EventKind::Modify(_) |
                                notify::EventKind::Create(_) |
                                notify::EventKind::Remove(_)
                        ) {
                            continue
                        }

                        for segment in event.paths {
                            // Skip anything that is not a configuration file.
                            if segment
                                .extension()
                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
                            {
                                continue
                            }

                            // Skip files whose stem is not a well-formed static file name.
                            // A path with an extension always has a file stem, hence the `expect`.
                            if StaticFileSegment::parse_filename(
                                &segment.file_stem().expect("qed").to_string_lossy(),
                            )
                            .is_none()
                            {
                                continue
                            }

                            // If the modification time is readable, ignore events that are not
                            // strictly newer than the last handled one. If it is not readable
                            // (e.g. the file was removed), fall through and re-initialize.
                            if let Ok(current_modified_timestamp) =
                                std::fs::metadata(&segment).and_then(|m| m.modified())
                            {
                                if last_event_timestamp.is_some_and(|last_timestamp| {
                                    last_timestamp >= current_modified_timestamp
                                }) {
                                    continue
                                }
                                last_event_timestamp = Some(current_modified_timestamp);
                            }

                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
                            if let Err(err) = provider.initialize_index() {
                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
                            }
                            // One re-initialization rescans the whole directory, so the remaining
                            // paths of this event need no further handling.
                            break
                        }
                    }

                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
                }
            }
        });
    }
}
288
289impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
290 type Target = StaticFileProviderInner<N>;
291
292 fn deref(&self) -> &Self::Target {
293 &self.0
294 }
295}
296
/// Shared state behind a [`StaticFileProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Cache of loaded jars, keyed by `(end block of the fixed block range, segment)`.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Per-segment index, rebuilt by `initialize_index` and maintained by `update_index`.
    indexes: RwLock<HashMap<StaticFileSegment, StaticFileSegmentIndex>>,
    /// Set by `initialize_index` to the start of the lowest block range of the transactions
    /// segment, when that segment has files.
    earliest_history_height: AtomicU64,
    /// Directory that contains the static files.
    path: PathBuf,
    /// Writers for the segments.
    writers: StaticFileWriters<N>,
    /// Metrics sink; `None` unless enabled through the builder.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access mode of this provider.
    access: StaticFileAccess,
    /// Number of blocks per file, per segment.
    blocks_per_file: HashMap<StaticFileSegment, u64>,
    /// Storage lock, acquired only when the access mode is read-write.
    _lock_file: Option<StorageLock>,
    /// Genesis block number; `0` unless overridden through the builder.
    genesis_block_number: u64,
}
331
332impl<N: NodePrimitives> StaticFileProviderInner<N> {
333 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
335 let _lock_file = if access.is_read_write() {
336 StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
337 } else {
338 None
339 };
340
341 let mut blocks_per_file = HashMap::new();
342 for segment in StaticFileSegment::iter() {
343 blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
344 }
345
346 let provider = Self {
347 map: Default::default(),
348 indexes: Default::default(),
349 writers: Default::default(),
350 earliest_history_height: Default::default(),
351 path: path.as_ref().to_path_buf(),
352 metrics: None,
353 access,
354 blocks_per_file,
355 _lock_file,
356 genesis_block_number: 0,
357 };
358
359 Ok(provider)
360 }
361
362 pub const fn is_read_only(&self) -> bool {
363 self.access.is_read_only()
364 }
365
366 pub fn find_fixed_range_with_block_index(
375 &self,
376 segment: StaticFileSegment,
377 block_index: Option<&SegmentRanges>,
378 block: BlockNumber,
379 ) -> SegmentRangeInclusive {
380 let blocks_per_file =
381 self.blocks_per_file.get(&segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
382
383 if let Some(block_index) = block_index {
384 if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
386 {
387 return *range
389 } else if let Some((_, range)) = block_index.last_key_value() {
390 let blocks_after_last_range = block - range.end();
396 let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
397 let start = range.end() + 1 + segments_to_skip * blocks_per_file;
398 return SegmentRangeInclusive::new(start, start + blocks_per_file - 1)
399 }
400 }
401 find_fixed_range(block, blocks_per_file)
404 }
405
406 pub fn find_fixed_range(
419 &self,
420 segment: StaticFileSegment,
421 block: BlockNumber,
422 ) -> SegmentRangeInclusive {
423 self.find_fixed_range_with_block_index(
424 segment,
425 self.indexes
426 .read()
427 .get(&segment)
428 .map(|index| &index.expected_block_ranges_by_max_block),
429 block,
430 )
431 }
432
433 pub const fn genesis_block_number(&self) -> u64 {
435 self.genesis_block_number
436 }
437}
438
439impl<N: NodePrimitives> StaticFileProvider<N> {
440 pub fn report_metrics(&self) -> ProviderResult<()> {
442 let Some(metrics) = &self.metrics else { return Ok(()) };
443
444 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
445 for (segment, headers) in static_files {
446 let mut entries = 0;
447 let mut size = 0;
448
449 for (block_range, _) in &headers {
450 let fixed_block_range = self.find_fixed_range(segment, block_range.start());
451 let jar_provider = self
452 .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
453 .ok_or_else(|| {
454 ProviderError::MissingStaticFileBlock(segment, block_range.start())
455 })?;
456
457 entries += jar_provider.rows();
458
459 let data_path = jar_provider.data_path().to_path_buf();
460 let index_path = jar_provider.index_path();
461 let offsets_path = jar_provider.offsets_path();
462 let config_path = jar_provider.config_path();
463
464 drop(jar_provider);
466
467 let data_size = reth_fs_util::metadata(data_path)
468 .map(|metadata| metadata.len())
469 .unwrap_or_default();
470 let index_size = reth_fs_util::metadata(index_path)
471 .map(|metadata| metadata.len())
472 .unwrap_or_default();
473 let offsets_size = reth_fs_util::metadata(offsets_path)
474 .map(|metadata| metadata.len())
475 .unwrap_or_default();
476 let config_size = reth_fs_util::metadata(config_path)
477 .map(|metadata| metadata.len())
478 .unwrap_or_default();
479
480 size += data_size + index_size + offsets_size + config_size;
481 }
482
483 metrics.record_segment(segment, size, headers.len(), entries);
484 }
485
486 Ok(())
487 }
488
489 pub fn get_segment_provider(
492 &self,
493 segment: StaticFileSegment,
494 number: u64,
495 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
496 if segment.is_block_based() {
497 self.get_segment_provider_for_block(segment, number, None)
498 } else {
499 self.get_segment_provider_for_transaction(segment, number, None)
500 }
501 }
502
503 pub fn get_maybe_segment_provider(
508 &self,
509 segment: StaticFileSegment,
510 number: u64,
511 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
512 let provider = if segment.is_block_based() {
513 self.get_segment_provider_for_block(segment, number, None)
514 } else {
515 self.get_segment_provider_for_transaction(segment, number, None)
516 };
517
518 match provider {
519 Ok(provider) => Ok(Some(provider)),
520 Err(
521 ProviderError::MissingStaticFileBlock(_, _) |
522 ProviderError::MissingStaticFileTx(_, _),
523 ) => Ok(None),
524 Err(err) => Err(err),
525 }
526 }
527
528 pub fn get_segment_provider_for_block(
530 &self,
531 segment: StaticFileSegment,
532 block: BlockNumber,
533 path: Option<&Path>,
534 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
535 self.get_segment_provider_for_range(
536 segment,
537 || self.get_segment_ranges_from_block(segment, block),
538 path,
539 )?
540 .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
541 }
542
543 pub fn get_segment_provider_for_transaction(
545 &self,
546 segment: StaticFileSegment,
547 tx: TxNumber,
548 path: Option<&Path>,
549 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
550 self.get_segment_provider_for_range(
551 segment,
552 || self.get_segment_ranges_from_transaction(segment, tx),
553 path,
554 )?
555 .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
556 }
557
558 pub fn get_segment_provider_for_range(
562 &self,
563 segment: StaticFileSegment,
564 fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
565 path: Option<&Path>,
566 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
567 let block_range = match path {
570 Some(path) => StaticFileSegment::parse_filename(
571 &path
572 .file_name()
573 .ok_or_else(|| {
574 ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
575 })?
576 .to_string_lossy(),
577 )
578 .and_then(|(parsed_segment, block_range)| {
579 if parsed_segment == segment {
580 return Some(block_range)
581 }
582 None
583 }),
584 None => fn_range(),
585 };
586
587 if let Some(block_range) = block_range {
589 return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
590 }
591
592 Ok(None)
593 }
594
595 pub fn get_segment_provider_for_path(
597 &self,
598 path: &Path,
599 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
600 StaticFileSegment::parse_filename(
601 &path
602 .file_name()
603 .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
604 .to_string_lossy(),
605 )
606 .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
607 .transpose()
608 }
609
610 pub fn remove_cached_provider(
614 &self,
615 segment: StaticFileSegment,
616 fixed_block_range_end: BlockNumber,
617 ) {
618 self.map.remove(&(fixed_block_range_end, segment));
619 }
620
621 pub fn delete_segment_below_block(
638 &self,
639 segment: StaticFileSegment,
640 block: BlockNumber,
641 ) -> ProviderResult<Vec<SegmentHeader>> {
642 if block == 0 {
644 return Ok(Vec::new())
645 }
646
647 let highest_block = self.get_highest_static_file_block(segment);
648 let mut deleted_headers = Vec::new();
649
650 loop {
651 let Some(block_height) = self.get_lowest_range_end(segment) else {
652 return Ok(deleted_headers)
653 };
654
655 if block_height >= block || Some(block_height) == highest_block {
657 return Ok(deleted_headers)
658 }
659
660 debug!(
661 target: "provider::static_file",
662 ?segment,
663 ?block_height,
664 "Deleting static file below block"
665 );
666
667 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
670 warn!( target: "provider::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
671 })?;
672
673 deleted_headers.push(header);
674 }
675 }
676
677 pub fn delete_jar(
685 &self,
686 segment: StaticFileSegment,
687 block: BlockNumber,
688 ) -> ProviderResult<SegmentHeader> {
689 let fixed_block_range = self.find_fixed_range(segment, block);
690 let key = (fixed_block_range.end(), segment);
691 let jar = if let Some((_, jar)) = self.map.remove(&key) {
692 jar.jar
693 } else {
694 let file = self.path.join(segment.filename(&fixed_block_range));
695 debug!(
696 target: "provider::static_file",
697 ?file,
698 ?fixed_block_range,
699 ?block,
700 "Loading static file jar for deletion"
701 );
702 NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
703 };
704
705 let header = *jar.user_header();
706 jar.delete().map_err(ProviderError::other)?;
707
708 self.initialize_index()?;
711
712 Ok(header)
713 }
714
/// Returns a jar provider for `segment` over `fixed_block_range`.
///
/// The jar is served from the cache when present; otherwise it is loaded from the file named
/// after the segment and range, inserted into the cache and then returned. The provider's
/// metrics handle, if any, is attached to the result.
fn get_or_create_jar_provider(
    &self,
    segment: StaticFileSegment,
    fixed_block_range: &SegmentRangeInclusive,
) -> ProviderResult<StaticFileJarProvider<'_, N>> {
    // Cache entries are keyed by the end of the fixed block range plus the segment.
    let key = (fixed_block_range.end(), segment);

    trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
    let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
        jar.into()
    } else {
        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
        let path = self.path.join(segment.filename(fixed_block_range));
        let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
        // Insert the freshly loaded jar and turn the entry's write handle into a read handle
        // before converting it into the provider.
        self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
    };

    if let Some(metrics) = &self.metrics {
        provider = provider.with_metrics(metrics.clone());
    }
    Ok(provider)
}
742
743 fn get_segment_ranges_from_block(
746 &self,
747 segment: StaticFileSegment,
748 block: u64,
749 ) -> Option<SegmentRangeInclusive> {
750 let indexes = self.indexes.read();
751 let index = indexes.get(&segment)?;
752
753 (index.max_block >= block).then(|| {
754 self.find_fixed_range_with_block_index(
755 segment,
756 Some(&index.expected_block_ranges_by_max_block),
757 block,
758 )
759 })
760 }
761
762 fn get_segment_ranges_from_transaction(
765 &self,
766 segment: StaticFileSegment,
767 tx: u64,
768 ) -> Option<SegmentRangeInclusive> {
769 let indexes = self.indexes.read();
770 let index = indexes.get(&segment)?;
771 let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;
772
773 let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();
776
777 while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
778 if tx > *tx_end {
779 return None
781 }
782 let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
783 if tx_start <= tx {
784 return Some(self.find_fixed_range_with_block_index(
785 segment,
786 Some(&index.expected_block_ranges_by_max_block),
787 block_range.end(),
788 ))
789 }
790 }
791 None
792 }
793
/// Updates the index and the jar cache for `segment` after its highest block changed.
///
/// With `Some(segment_max_block)`, the jar whose fixed range contains that block is reloaded
/// from disk, the segment's index entries are adjusted to it, and cached jars of that segment
/// with a higher range end are evicted. With `None`, the segment is removed from the index.
///
/// The index write lock is held for the whole call.
pub fn update_index(
    &self,
    segment: StaticFileSegment,
    segment_max_block: Option<BlockNumber>,
) -> ProviderResult<()> {
    debug!(
        target: "provider::static_file",
        ?segment,
        ?segment_max_block,
        "Updating provider index"
    );
    let mut indexes = self.indexes.write();

    match segment_max_block {
        Some(segment_max_block) => {
            // Resolve the fixed range of the file that holds the new highest block, using the
            // segment's current block index when it exists.
            let fixed_range = self.find_fixed_range_with_block_index(
                segment,
                indexes.get(&segment).map(|index| &index.expected_block_ranges_by_max_block),
                segment_max_block,
            );

            let jar = NippyJar::<SegmentHeader>::load(
                &self.path.join(segment.filename(&fixed_range)),
            )
            .map_err(ProviderError::other)?;

            let index = indexes
                .entry(segment)
                .and_modify(|index| {
                    index.max_block = segment_max_block;

                    // Keep only expected ranges that start before the current file, then
                    // (re-)insert the current file's range.
                    index
                        .expected_block_ranges_by_max_block
                        .retain(|_, block_range| block_range.start() < fixed_range.start());
                    index
                        .expected_block_ranges_by_max_block
                        .insert(fixed_range.end(), fixed_range);
                })
                .or_insert_with(|| StaticFileSegmentIndex {
                    min_block_range: None,
                    max_block: segment_max_block,
                    expected_block_ranges_by_max_block: BTreeMap::from([(
                        fixed_range.end(),
                        fixed_range,
                    )]),
                    available_block_ranges_by_max_tx: None,
                });

            // Refresh the lowest block range: set it when missing, or replace it when the
            // reloaded jar starts at the same block (i.e. it is the lowest file and its end
            // may have moved).
            if let Some(current_block_range) = jar.user_header().block_range() {
                if let Some(min_block_range) = index.min_block_range.as_mut() {
                    if current_block_range.start() == min_block_range.start() {
                        *min_block_range = current_block_range;
                    }
                } else {
                    index.min_block_range = Some(current_block_range);
                }
            }

            // Maintain the transaction-end -> block-range index.
            if let Some(tx_range) = jar.user_header().tx_range() {
                if let Some(current_block_range) = jar.user_header().block_range() {
                    let tx_end = tx_range.end();

                    if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                        // Drop entries of files that start at or after the current one, then
                        // record the current file under its transaction end.
                        index
                            .retain(|_, block_range| block_range.start() < fixed_range.start());
                        index.insert(tx_end, current_block_range);
                    } else {
                        index.available_block_ranges_by_max_tx =
                            Some(BTreeMap::from([(tx_end, current_block_range)]));
                    }
                }
            } else if segment.is_tx_based() {
                // The jar has no transaction range: only entries of files that start before
                // it remain valid.
                if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                    index.retain(|_, block_range| block_range.start() < fixed_range.start());
                }

                // An empty transaction index is stored as `None`.
                index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
            }

            // Replace the cached jar with the reloaded one.
            debug!(target: "provider::static_file", ?segment, "Inserting updated jar into cache");
            self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

            // Evict cached jars of this segment that lie beyond the current file.
            debug!(target: "provider::static_file", ?segment, "Cleaning up jar map");
            self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
        }
        None => {
            debug!(target: "provider::static_file", ?segment, "Removing segment from index");
            indexes.remove(&segment);
        }
    };

    debug!(target: "provider::static_file", ?segment, "Updated provider index");
    Ok(())
}
934
935 pub fn initialize_index(&self) -> ProviderResult<()> {
937 let mut indexes = self.indexes.write();
938 indexes.clear();
939
940 for (segment, headers) in iter_static_files(&self.path).map_err(ProviderError::other)? {
941 let min_block_range = Some(headers.first().expect("headers are not empty").0);
946 let max_block = headers.last().expect("headers are not empty").0.end();
947
948 let mut expected_block_ranges_by_max_block = BTreeMap::default();
949 let mut available_block_ranges_by_max_tx = None;
950
951 for (block_range, header) in headers {
952 expected_block_ranges_by_max_block
954 .insert(header.expected_block_end(), header.expected_block_range());
955
956 if let Some(tx_range) = header.tx_range() {
958 let tx_end = tx_range.end();
959
960 available_block_ranges_by_max_tx
961 .get_or_insert_with(BTreeMap::default)
962 .insert(tx_end, block_range);
963 }
964 }
965
966 indexes.insert(
967 segment,
968 StaticFileSegmentIndex {
969 min_block_range,
970 max_block,
971 expected_block_ranges_by_max_block,
972 available_block_ranges_by_max_tx,
973 },
974 );
975 }
976
977 self.map.clear();
979
980 if let Some(lowest_range) =
982 indexes.get(&StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
983 {
984 self.earliest_history_height
986 .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
987 }
988
989 Ok(())
990 }
991
/// Checks the static files against the database and returns the pipeline unwind target needed
/// to reconcile them, if any.
///
/// For every segment selected by `segments_to_check` this:
/// 1. heals file-level inconsistencies (`maybe_heal_segment`) and records an unwind target if
///    the highest block changed;
/// 2. for segments with a highest transaction, walks the highest block backwards until the
///    block's last transaction number is not above the highest static file transaction;
/// 3. runs `ensure_invariants` against the matching database table.
///
/// The lowest unwind target seen across all segments is returned as
/// [`PipelineTarget::Unwind`]; `Ok(None)` means no unwind is needed.
pub fn check_consistency<Provider>(
    &self,
    provider: &Provider,
) -> ProviderResult<Option<PipelineTarget>>
where
    Provider: DBProvider
        + BlockReader
        + StageCheckpointReader
        + ChainSpecProvider
        + StorageSettingsCache,
    N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
{
    // On OP mainnet, skip the whole check when the database knows the block with this hash
    // (the log message below refers to it as the OVM chain).
    if provider.chain_spec().is_optimism() &&
        reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
    {
        const OVM_HEADER_1_HASH: B256 =
            b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
        if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
            info!(target: "reth::cli",
                "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
            );
            return Ok(None)
        }
    }

    info!(target: "reth::cli", "Verifying storage consistency.");

    // Tracks the minimum of all unwind targets reported below.
    let mut unwind_target: Option<BlockNumber> = None;
    let mut update_unwind_target = |new_target: BlockNumber| {
        if let Some(target) = unwind_target.as_mut() {
            *target = (*target).min(new_target);
        } else {
            unwind_target = Some(new_target);
        }
    };

    for segment in self.segments_to_check(provider) {
        debug!(target: "reth::providers::static_file", ?segment, "Checking consistency for segment");

        // Highest block before and after healing.
        let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?;

        // Healing changed the highest block: unwind to the healed height.
        if initial_highest_block != highest_block {
            info!(
                target: "reth::providers::static_file",
                ?initial_highest_block,
                unwind_target = highest_block,
                ?segment,
                "Setting unwind target."
            );
            update_unwind_target(highest_block.unwrap_or_default());
        }

        let highest_tx = self.get_highest_static_file_tx(segment);
        debug!(target: "reth::providers::static_file", ?segment, ?highest_tx, ?highest_block, "Highest transaction for segment");
        if let Some(highest_tx) = highest_tx {
            // Step back block by block until the block's last transaction is covered by the
            // static file, the indices are missing, or block 0 is reached.
            let mut last_block = highest_block.unwrap_or_default();
            debug!(target: "reth::providers::static_file", ?segment, last_block, highest_tx, "Verifying last transaction matches last block indices");
            loop {
                if let Some(indices) = provider.block_body_indices(last_block)? {
                    debug!(target: "reth::providers::static_file", ?segment, last_block, last_tx_num = indices.last_tx_num(), highest_tx, "Found block body indices");
                    if indices.last_tx_num() <= highest_tx {
                        break
                    }
                } else {
                    // No indices in the database for this block; left to `ensure_invariants`.
                    debug!(target: "reth::providers::static_file", ?segment, last_block, "Block body indices not found, static files ahead of database");
                    break
                }
                if last_block == 0 {
                    debug!(target: "reth::providers::static_file", ?segment, "Reached block 0 in verification loop");
                    break
                }
                last_block -= 1;

                info!(
                    target: "reth::providers::static_file",
                    highest_block = self.get_highest_static_file_block(segment),
                    unwind_target = last_block,
                    ?segment,
                    "Setting unwind target."
                );
                highest_block = Some(last_block);
                update_unwind_target(last_block);
            }
        }

        // Headers are checked by block number; the other segments by transaction number.
        debug!(target: "reth::providers::static_file", ?segment, "Ensuring invariants for segment");
        if let Some(unwind) = match segment {
            StaticFileSegment::Headers => self
                .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                    provider,
                    segment,
                    highest_block,
                    highest_block,
                )?,
            StaticFileSegment::Transactions => self
                .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                )?,
            StaticFileSegment::Receipts => self
                .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                )?,
            StaticFileSegment::TransactionSenders => self
                .ensure_invariants::<_, tables::TransactionSenders>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                )?,
        } {
            debug!(target: "reth::providers::static_file", ?segment, unwind_target=unwind, "Invariants check returned unwind target");
            update_unwind_target(unwind);
        } else {
            debug!(target: "reth::providers::static_file", ?segment, "Invariants check completed, no unwind needed");
        }
    }

    Ok(unwind_target.map(PipelineTarget::Unwind))
}
1160
1161 pub fn check_file_consistency<Provider>(&self, provider: &Provider) -> ProviderResult<()>
1167 where
1168 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache,
1169 {
1170 info!(target: "reth::cli", "Healing static file inconsistencies.");
1171
1172 for segment in self.segments_to_check(provider) {
1173 let _ = self.maybe_heal_segment(segment)?;
1174 }
1175
1176 Ok(())
1177 }
1178
1179 fn segments_to_check<'a, Provider>(
1181 &'a self,
1182 provider: &'a Provider,
1183 ) -> impl Iterator<Item = StaticFileSegment> + 'a
1184 where
1185 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache,
1186 {
1187 StaticFileSegment::iter()
1188 .filter(move |segment| self.should_check_segment(provider, *segment))
1189 }
1190
1191 fn should_check_segment<Provider>(
1192 &self,
1193 provider: &Provider,
1194 segment: StaticFileSegment,
1195 ) -> bool
1196 where
1197 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache,
1198 {
1199 match segment {
1200 StaticFileSegment::Headers | StaticFileSegment::Transactions => true,
1201 StaticFileSegment::Receipts => {
1202 if EitherWriter::receipts_destination(provider).is_database() {
1203 debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database");
1206 return false
1207 }
1208
1209 if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
1210 NamedChain::Chiado == provider.chain_spec().chain_id()
1211 {
1212 debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: broken historical import for gnosis/chiado");
1216 return false;
1217 }
1218
1219 true
1220 }
1221 StaticFileSegment::TransactionSenders => {
1222 !EitherWriterDestination::senders(provider).is_database()
1223 }
1224 }
1225 }
1226
1227 pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1230 debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency");
1231 if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1232 let file_path = self
1233 .directory()
1234 .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1235 debug!(target: "reth::providers::static_file", ?segment, ?file_path, latest_block, "Loading NippyJar for consistency check");
1236
1237 let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1238 debug!(target: "reth::providers::static_file", ?segment, "NippyJar loaded, checking consistency");
1239
1240 NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1241 debug!(target: "reth::providers::static_file", ?segment, "NippyJar consistency check passed");
1242 } else {
1243 debug!(target: "reth::providers::static_file", ?segment, "No static file block found, skipping consistency check");
1244 }
1245 Ok(())
1246 }
1247
1248 fn maybe_heal_segment(
1264 &self,
1265 segment: StaticFileSegment,
1266 ) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
1267 let initial_highest_block = self.get_highest_static_file_block(segment);
1268 debug!(target: "reth::providers::static_file", ?segment, ?initial_highest_block, "Initial highest block for segment");
1269
1270 if self.access.is_read_only() {
1271 debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency (read-only)");
1274 self.check_segment_consistency(segment)?;
1275 } else {
1276 debug!(target: "reth::providers::static_file", ?segment, "Fetching latest writer which might heal any potential inconsistency");
1279 self.latest_writer(segment)?;
1280 }
1281
1282 let highest_block = self.get_highest_static_file_block(segment);
1285
1286 Ok((initial_highest_block, highest_block))
1287 }
1288
    /// Checks a static file `segment` against database table `T` and the stage checkpoint that
    /// corresponds to the segment.
    ///
    /// * `provider` - database access: table cursor, block body indices and stage checkpoints.
    /// * `segment` - the static file segment being checked.
    /// * `highest_static_file_entry` - highest entry known for the segment, compared against the
    ///   keys of `T`; `None` when there is none.
    /// * `highest_static_file_block` - highest block known for the segment; `None` when there is
    ///   none.
    ///
    /// Returns `Ok(Some(block))` when an unwind to `block` is requested, and `Ok(None)` otherwise.
    /// When the stage checkpoint is lower than the highest static file block, the segment is
    /// pruned through its latest writer and committed before returning `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the database cursor, the checkpoint lookup, the block body
    /// indices lookup, or the static file writer.
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        debug!(target: "reth::providers::static_file", ?segment, ?highest_static_file_entry, ?highest_static_file_block, "Ensuring invariants");
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_entry, _)) = db_cursor.first()? {
            debug!(target: "reth::providers::static_file", ?segment, db_first_entry, "Found first database entry");
            if let (Some(highest_entry), Some(highest_block)) =
                (highest_static_file_entry, highest_static_file_block)
            {
                // The condition holds only when the first database key is neither covered by
                // the static files nor the direct successor of their highest entry, i.e. there
                // is a gap between the two stores.
                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                    info!(
                        target: "reth::providers::static_file",
                        ?db_first_entry,
                        ?highest_entry,
                        unwind_target = highest_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    return Ok(Some(highest_block))
                }
            }

            // The database reaches past the static files (or there are no static file entries
            // at all): nothing to reconcile here.
            if let Some((db_last_entry, _)) = db_cursor.last()? &&
                highest_static_file_entry
                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
            {
                debug!(target: "reth::providers::static_file", ?segment, db_last_entry, ?highest_static_file_entry, "Database has entries beyond static files, no unwind needed");
                return Ok(None)
            }
        } else {
            debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
        }

        // From here on a missing entry/block is treated as 0.
        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // Each segment is compared against the checkpoint of one specific stage.
        let stage_id = match segment {
            StaticFileSegment::Headers => StageId::Headers,
            StaticFileSegment::Transactions => StageId::Bodies,
            StaticFileSegment::Receipts => StageId::Execution,
            StaticFileSegment::TransactionSenders => StageId::SenderRecovery,
        };
        let checkpoint_block_number =
            provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
        debug!(target: "reth::providers::static_file", ?segment, ?stage_id, checkpoint_block_number, highest_static_file_block, "Retrieved stage checkpoint");

        // The checkpoint is ahead of the static files: request an unwind down to what the
        // static files actually contain.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // The static files are ahead of the checkpoint: prune them back to the checkpoint.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            match segment {
                StaticFileSegment::Headers => {
                    // One header per block, so the prune count is the block difference.
                    let prune_count = highest_static_file_block - checkpoint_block_number;
                    debug!(target: "reth::providers::static_file", ?segment, prune_count, "Pruning headers");
                    writer.prune_headers(prune_count)?;
                }
                StaticFileSegment::Transactions |
                StaticFileSegment::Receipts |
                StaticFileSegment::TransactionSenders => {
                    if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                        // Entries above the last transaction number of the checkpoint block are
                        // the ones handed to the prune call.
                        let number = highest_static_file_entry - block.last_tx_num();
                        debug!(target: "reth::providers::static_file", ?segment, prune_count = number, checkpoint_block_number, "Pruning transaction based segment");

                        match segment {
                            StaticFileSegment::Transactions => {
                                writer.prune_transactions(number, checkpoint_block_number)?
                            }
                            StaticFileSegment::Receipts => {
                                writer.prune_receipts(number, checkpoint_block_number)?
                            }
                            StaticFileSegment::TransactionSenders => {
                                writer.prune_transaction_senders(number, checkpoint_block_number)?
                            }
                            // Excluded by the outer match arm.
                            StaticFileSegment::Headers => unreachable!(),
                        }
                    } else {
                        // Nothing is pruned in this case; the writer is still committed below.
                        debug!(target: "reth::providers::static_file", ?segment, checkpoint_block_number, "No block body indices found for checkpoint block");
                    }
                }
            }
            debug!(target: "reth::providers::static_file", ?segment, "Committing writer after pruning");
            writer.commit()?;
            debug!(target: "reth::providers::static_file", ?segment, "Writer committed successfully");
        }

        debug!(target: "reth::providers::static_file", ?segment, "Invariants ensured, returning None");
        Ok(None)
    }
1426
1427 pub fn earliest_history_height(&self) -> BlockNumber {
1435 self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1436 }
1437
1438 pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1442 self.indexes.read().get(&segment).and_then(|index| index.min_block_range)
1443 }
1444
1445 pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1451 self.get_lowest_range(segment).map(|range| range.start())
1452 }
1453
1454 pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1460 self.get_lowest_range(segment).map(|range| range.end())
1461 }
1462
1463 pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1467 self.indexes.read().get(&segment).map(|index| index.max_block)
1468 }
1469
1470 pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1474 self.indexes
1475 .read()
1476 .get(&segment)
1477 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1478 .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1479 }
1480
1481 pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1483 HighestStaticFiles {
1484 receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1485 }
1486 }
1487
1488 pub fn find_static_file<T>(
1491 &self,
1492 segment: StaticFileSegment,
1493 func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
1494 ) -> ProviderResult<Option<T>> {
1495 if let Some(ranges) =
1496 self.indexes.read().get(&segment).map(|index| &index.expected_block_ranges_by_max_block)
1497 {
1498 for range in ranges.values().rev() {
1500 if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
1501 return Ok(Some(res))
1502 }
1503 }
1504 }
1505
1506 Ok(None)
1507 }
1508
1509 pub fn fetch_range_with_predicate<T, F, P>(
1515 &self,
1516 segment: StaticFileSegment,
1517 range: Range<u64>,
1518 mut get_fn: F,
1519 mut predicate: P,
1520 ) -> ProviderResult<Vec<T>>
1521 where
1522 F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1523 P: FnMut(&T) -> bool,
1524 {
1525 let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1526
1527 macro_rules! get_provider {
1531 ($number:expr) => {{
1532 match self.get_segment_provider(segment, $number) {
1533 Ok(provider) => provider,
1534 Err(
1535 ProviderError::MissingStaticFileBlock(_, _) |
1536 ProviderError::MissingStaticFileTx(_, _),
1537 ) => return Ok(result),
1538 Err(err) => return Err(err),
1539 }
1540 }};
1541 }
1542
1543 let mut provider = get_provider!(range.start);
1544 let mut cursor = provider.cursor()?;
1545
1546 'outer: for number in range {
1548 let mut retrying = false;
1552
1553 'inner: loop {
1555 match get_fn(&mut cursor, number)? {
1556 Some(res) => {
1557 if !predicate(&res) {
1558 break 'outer
1559 }
1560 result.push(res);
1561 break 'inner
1562 }
1563 None => {
1564 if retrying {
1565 return Ok(result)
1566 }
1567 drop(cursor);
1572 drop(provider);
1573 provider = get_provider!(number);
1574 cursor = provider.cursor()?;
1575 retrying = true;
1576 }
1577 }
1578 }
1579 }
1580
1581 result.shrink_to_fit();
1582
1583 Ok(result)
1584 }
1585
    /// Returns a lazy iterator that reads the value of `segment` for each number in `range`
    /// through `get_fn`.
    ///
    /// Each item is `Ok(Some(value))` when a value was read, `Ok(None)` when neither the current
    /// provider nor a freshly fetched one produced a value, and `Err(_)` on failure.
    ///
    /// # Errors
    ///
    /// The call itself fails if the provider lookup for `range.start` fails. Individual items
    /// carry errors from cursor creation, `get_fn`, and provider lookups.
    pub fn fetch_range_iter<'a, T, F>(
        &'a self,
        segment: StaticFileSegment,
        range: Range<u64>,
        get_fn: F,
    ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
    where
        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
        T: std::fmt::Debug,
    {
        let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
        Ok(range.map(move |number| {
            // First attempt with the provider currently held (if any). A new cursor is created
            // for every read. `transpose` turns `Ok(None)` into `None`, so both "no provider" and
            // "no value" take the fallback arm below.
            match provider
                .as_ref()
                .map(|provider| get_fn(&mut provider.cursor()?, number))
                .and_then(|result| result.transpose())
            {
                Some(result) => result.map(Some),
                None => {
                    // Drop the current provider, look up the one for `number`, and try once more.
                    provider.take();
                    provider = self.get_maybe_segment_provider(segment, number)?;
                    provider
                        .as_ref()
                        .map(|provider| get_fn(&mut provider.cursor()?, number))
                        .and_then(|result| result.transpose())
                        .transpose()
                }
            }
        }))
    }
1623
1624 pub fn directory(&self) -> &Path {
1626 &self.path
1627 }
1628
1629 pub fn get_with_static_file_or_database<T, FS, FD>(
1639 &self,
1640 segment: StaticFileSegment,
1641 number: u64,
1642 fetch_from_static_file: FS,
1643 fetch_from_database: FD,
1644 ) -> ProviderResult<Option<T>>
1645 where
1646 FS: Fn(&Self) -> ProviderResult<Option<T>>,
1647 FD: Fn() -> ProviderResult<Option<T>>,
1648 {
1649 let static_file_upper_bound = if segment.is_block_based() {
1651 self.get_highest_static_file_block(segment)
1652 } else {
1653 self.get_highest_static_file_tx(segment)
1654 };
1655
1656 if static_file_upper_bound
1657 .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1658 {
1659 return fetch_from_static_file(self)
1660 }
1661 fetch_from_database()
1662 }
1663
1664 pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1676 &self,
1677 segment: StaticFileSegment,
1678 mut block_or_tx_range: Range<u64>,
1679 fetch_from_static_file: FS,
1680 mut fetch_from_database: FD,
1681 mut predicate: P,
1682 ) -> ProviderResult<Vec<T>>
1683 where
1684 FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1685 FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1686 P: FnMut(&T) -> bool,
1687 {
1688 let mut data = Vec::new();
1689
1690 if let Some(static_file_upper_bound) = if segment.is_block_based() {
1692 self.get_highest_static_file_block(segment)
1693 } else {
1694 self.get_highest_static_file_tx(segment)
1695 } && block_or_tx_range.start <= static_file_upper_bound
1696 {
1697 let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1698 data.extend(fetch_from_static_file(
1699 self,
1700 block_or_tx_range.start..end,
1701 &mut predicate,
1702 )?);
1703 block_or_tx_range.start = end;
1704 }
1705
1706 if block_or_tx_range.end > block_or_tx_range.start {
1707 data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1708 }
1709
1710 Ok(data)
1711 }
1712
1713 #[cfg(any(test, feature = "test-utils"))]
1715 pub fn path(&self) -> &Path {
1716 &self.path
1717 }
1718
1719 #[cfg(any(test, feature = "test-utils"))]
1721 pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
1722 self.indexes
1723 .read()
1724 .get(&segment)
1725 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1726 .cloned()
1727 }
1728
1729 #[cfg(any(test, feature = "test-utils"))]
1731 pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
1732 self.indexes
1733 .read()
1734 .get(&segment)
1735 .map(|index| &index.expected_block_ranges_by_max_block)
1736 .cloned()
1737 }
1738}
1739
/// Index data the provider keeps for a single static file segment.
#[derive(Debug)]
struct StaticFileSegmentIndex {
    /// Lowest block range of the segment, if any; exposed through `get_lowest_range`.
    min_block_range: Option<SegmentRangeInclusive>,
    /// Highest block of the segment; exposed through `get_highest_static_file_block`.
    max_block: u64,
    /// Block ranges of the segment. `find_static_file` walks the values from the highest key
    /// to the lowest.
    // NOTE(review): going by the name, keys are presumably the maximum block of each range —
    // confirm where the map is populated.
    expected_block_ranges_by_max_block: SegmentRanges,
    /// Optional map whose largest key is reported by `get_highest_static_file_tx`.
    // NOTE(review): presumably keyed by the highest transaction number of each file and absent
    // for segments without transaction data — confirm where the map is populated.
    available_block_ranges_by_max_tx: Option<SegmentRanges>,
}
1770
/// Write-side access to static files: obtaining segment writers and committing them.
pub trait StaticFileWriter {
    /// Primitive types the returned writers are parameterized over.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable reference to the writer of `segment` for the given `block`.
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable reference to the writer of `segment` positioned at the latest block of
    /// that segment.
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits the pending changes of the writers.
    fn commit(&self) -> ProviderResult<()>;

    /// Returns `true` if an unwind is queued on the writers.
    fn has_unwind_queued(&self) -> bool;
}
1796
1797impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1798 type Primitives = N;
1799
1800 fn get_writer(
1801 &self,
1802 block: BlockNumber,
1803 segment: StaticFileSegment,
1804 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1805 if self.access.is_read_only() {
1806 return Err(ProviderError::ReadOnlyStaticFileAccess)
1807 }
1808
1809 trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1810 self.writers.get_or_create(segment, || {
1811 StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1812 })
1813 }
1814
1815 fn latest_writer(
1816 &self,
1817 segment: StaticFileSegment,
1818 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1819 let genesis_number = self.0.as_ref().genesis_block_number();
1820 self.get_writer(
1821 self.get_highest_static_file_block(segment).unwrap_or(genesis_number),
1822 segment,
1823 )
1824 }
1825
1826 fn commit(&self) -> ProviderResult<()> {
1827 self.writers.commit()
1828 }
1829
1830 fn has_unwind_queued(&self) -> bool {
1831 self.writers.has_unwind_queued()
1832 }
1833}
1834
1835impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1836 type Header = N::BlockHeader;
1837
1838 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1839 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1840 Ok(jar_provider
1841 .cursor()?
1842 .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
1843 .and_then(|(header, hash)| {
1844 if hash == block_hash {
1845 return Some(header)
1846 }
1847 None
1848 }))
1849 })
1850 }
1851
1852 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1853 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1854 .and_then(|provider| provider.header_by_number(num))
1855 .or_else(|err| {
1856 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1857 Ok(None)
1858 } else {
1859 Err(err)
1860 }
1861 })
1862 }
1863
1864 fn headers_range(
1865 &self,
1866 range: impl RangeBounds<BlockNumber>,
1867 ) -> ProviderResult<Vec<Self::Header>> {
1868 self.fetch_range_with_predicate(
1869 StaticFileSegment::Headers,
1870 to_range(range),
1871 |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1872 |_| true,
1873 )
1874 }
1875
1876 fn sealed_header(
1877 &self,
1878 num: BlockNumber,
1879 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1880 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1881 .and_then(|provider| provider.sealed_header(num))
1882 .or_else(|err| {
1883 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1884 Ok(None)
1885 } else {
1886 Err(err)
1887 }
1888 })
1889 }
1890
1891 fn sealed_headers_while(
1892 &self,
1893 range: impl RangeBounds<BlockNumber>,
1894 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1895 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1896 self.fetch_range_with_predicate(
1897 StaticFileSegment::Headers,
1898 to_range(range),
1899 |cursor, number| {
1900 Ok(cursor
1901 .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1902 .map(|(header, hash)| SealedHeader::new(header, hash)))
1903 },
1904 predicate,
1905 )
1906 }
1907}
1908
1909impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1910 fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1911 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1912 .and_then(|provider| provider.block_hash(num))
1913 .or_else(|err| {
1914 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1915 Ok(None)
1916 } else {
1917 Err(err)
1918 }
1919 })
1920 }
1921
1922 fn canonical_hashes_range(
1923 &self,
1924 start: BlockNumber,
1925 end: BlockNumber,
1926 ) -> ProviderResult<Vec<B256>> {
1927 self.fetch_range_with_predicate(
1928 StaticFileSegment::Headers,
1929 start..end,
1930 |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1931 |_| true,
1932 )
1933 }
1934}
1935
1936impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1937 for StaticFileProvider<N>
1938{
1939 type Receipt = N::Receipt;
1940
1941 fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1942 self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
1943 .and_then(|provider| provider.receipt(num))
1944 .or_else(|err| {
1945 if let ProviderError::MissingStaticFileTx(_, _) = err {
1946 Ok(None)
1947 } else {
1948 Err(err)
1949 }
1950 })
1951 }
1952
1953 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1954 if let Some(num) = self.transaction_id(hash)? {
1955 return self.receipt(num)
1956 }
1957 Ok(None)
1958 }
1959
1960 fn receipts_by_block(
1961 &self,
1962 _block: BlockHashOrNumber,
1963 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1964 unreachable!()
1965 }
1966
1967 fn receipts_by_tx_range(
1968 &self,
1969 range: impl RangeBounds<TxNumber>,
1970 ) -> ProviderResult<Vec<Self::Receipt>> {
1971 self.fetch_range_with_predicate(
1972 StaticFileSegment::Receipts,
1973 to_range(range),
1974 |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1975 |_| true,
1976 )
1977 }
1978
1979 fn receipts_by_block_range(
1980 &self,
1981 _block_range: RangeInclusive<BlockNumber>,
1982 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1983 Err(ProviderError::UnsupportedProvider)
1984 }
1985}
1986
1987impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
1988 for StaticFileProvider<N>
1989{
1990 fn transaction_hashes_by_range(
1991 &self,
1992 tx_range: Range<TxNumber>,
1993 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1994 let tx_range_size = (tx_range.end - tx_range.start) as usize;
1995
1996 let chunk_size = 100;
2000
2001 let chunks = tx_range
2003 .clone()
2004 .step_by(chunk_size)
2005 .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
2006 let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
2007
2008 for chunk_range in chunks {
2009 let (channel_tx, channel_rx) = mpsc::channel();
2010 channels.push(channel_rx);
2011
2012 let manager = self.clone();
2013
2014 rayon::spawn(move || {
2018 let mut rlp_buf = Vec::with_capacity(128);
2019 let _ = manager.fetch_range_with_predicate(
2020 StaticFileSegment::Transactions,
2021 chunk_range,
2022 |cursor, number| {
2023 Ok(cursor
2024 .get_one::<TransactionMask<Self::Transaction>>(number.into())?
2025 .map(|transaction| {
2026 rlp_buf.clear();
2027 let _ = channel_tx
2028 .send(calculate_hash((number, transaction), &mut rlp_buf));
2029 }))
2030 },
2031 |_| true,
2032 );
2033 });
2034 }
2035
2036 let mut tx_list = Vec::with_capacity(tx_range_size);
2037
2038 for channel in channels {
2040 while let Ok(tx) = channel.recv() {
2041 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
2042 tx_list.push((tx_hash, tx_id));
2043 }
2044 }
2045
2046 Ok(tx_list)
2047 }
2048}
2049
2050impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
2051 for StaticFileProvider<N>
2052{
2053 type Transaction = N::SignedTx;
2054
2055 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
2056 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2057 let mut cursor = jar_provider.cursor()?;
2058 if cursor
2059 .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
2060 .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
2061 .is_some()
2062 {
2063 Ok(cursor.number())
2064 } else {
2065 Ok(None)
2066 }
2067 })
2068 }
2069
2070 fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2071 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2072 .and_then(|provider| provider.transaction_by_id(num))
2073 .or_else(|err| {
2074 if let ProviderError::MissingStaticFileTx(_, _) = err {
2075 Ok(None)
2076 } else {
2077 Err(err)
2078 }
2079 })
2080 }
2081
2082 fn transaction_by_id_unhashed(
2083 &self,
2084 num: TxNumber,
2085 ) -> ProviderResult<Option<Self::Transaction>> {
2086 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2087 .and_then(|provider| provider.transaction_by_id_unhashed(num))
2088 .or_else(|err| {
2089 if let ProviderError::MissingStaticFileTx(_, _) = err {
2090 Ok(None)
2091 } else {
2092 Err(err)
2093 }
2094 })
2095 }
2096
2097 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2098 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2099 Ok(jar_provider
2100 .cursor()?
2101 .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2102 .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2103 })
2104 }
2105
2106 fn transaction_by_hash_with_meta(
2107 &self,
2108 _hash: TxHash,
2109 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2110 Err(ProviderError::UnsupportedProvider)
2112 }
2113
2114 fn transactions_by_block(
2115 &self,
2116 _block_id: BlockHashOrNumber,
2117 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2118 Err(ProviderError::UnsupportedProvider)
2120 }
2121
2122 fn transactions_by_block_range(
2123 &self,
2124 _range: impl RangeBounds<BlockNumber>,
2125 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2126 Err(ProviderError::UnsupportedProvider)
2128 }
2129
2130 fn transactions_by_tx_range(
2131 &self,
2132 range: impl RangeBounds<TxNumber>,
2133 ) -> ProviderResult<Vec<Self::Transaction>> {
2134 self.fetch_range_with_predicate(
2135 StaticFileSegment::Transactions,
2136 to_range(range),
2137 |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2138 |_| true,
2139 )
2140 }
2141
2142 fn senders_by_tx_range(
2143 &self,
2144 range: impl RangeBounds<TxNumber>,
2145 ) -> ProviderResult<Vec<Address>> {
2146 self.fetch_range_with_predicate(
2147 StaticFileSegment::TransactionSenders,
2148 to_range(range),
2149 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2150 |_| true,
2151 )
2152 }
2153
2154 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2155 self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2156 .and_then(|provider| provider.transaction_sender(id))
2157 .or_else(|err| {
2158 if let ProviderError::MissingStaticFileTx(_, _) = err {
2159 Ok(None)
2160 } else {
2161 Err(err)
2162 }
2163 })
2164 }
2165}
2166
2167impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
2168 fn chain_info(&self) -> ProviderResult<ChainInfo> {
2169 Err(ProviderError::UnsupportedProvider)
2171 }
2172
2173 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
2174 Err(ProviderError::UnsupportedProvider)
2176 }
2177
2178 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
2179 Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
2180 }
2181
2182 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
2183 Err(ProviderError::UnsupportedProvider)
2185 }
2186}
2187
/// [`BlockReader`] implementation in which every method returns
/// [`ProviderError::UnsupportedProvider`].
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
2262
/// [`BlockBodyIndicesProvider`] implementation in which every method returns
/// [`ProviderError::UnsupportedProvider`].
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_body_indices_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
2275
2276impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2277 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2278 match T::NAME {
2279 tables::CanonicalHeaders::NAME |
2280 tables::Headers::<Header>::NAME |
2281 tables::HeaderTerminalDifficulties::NAME => Ok(self
2282 .get_highest_static_file_block(StaticFileSegment::Headers)
2283 .map(|block| block + 1)
2284 .unwrap_or_default()
2285 as usize),
2286 tables::Receipts::<Receipt>::NAME => Ok(self
2287 .get_highest_static_file_tx(StaticFileSegment::Receipts)
2288 .map(|receipts| receipts + 1)
2289 .unwrap_or_default() as usize),
2290 tables::Transactions::<TransactionSigned>::NAME => Ok(self
2291 .get_highest_static_file_tx(StaticFileSegment::Transactions)
2292 .map(|txs| txs + 1)
2293 .unwrap_or_default()
2294 as usize),
2295 tables::TransactionSenders::NAME => Ok(self
2296 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
2297 .map(|txs| txs + 1)
2298 .unwrap_or_default() as usize),
2299 _ => Err(ProviderError::UnsupportedProvider),
2300 }
2301 }
2302}
2303
2304#[inline]
2306fn calculate_hash<T>(
2307 entry: (TxNumber, T),
2308 rlp_buf: &mut Vec<u8>,
2309) -> Result<(B256, TxNumber), Box<ProviderError>>
2310where
2311 T: Encodable2718,
2312{
2313 let (tx_id, tx) = entry;
2314 tx.encode_2718(rlp_buf);
2315 Ok((keccak256(rlp_buf), tx_id))
2316}
2317
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use reth_chain_state::EthPrimitives;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

    use crate::StaticFileProviderBuilder;

    /// Exercises `find_fixed_range_with_block_index` on a provider configured with 100 blocks
    /// per file: without an index, with an empty index, with an index of uniform 100-block
    /// ranges, and with an index of differently sized ranges.
    #[test]
    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
        let (static_dir, _) = create_test_static_files_dir();
        let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
            .with_blocks_per_file(100)
            .build()?;

        let segment = StaticFileSegment::Headers;

        // No index: the expected range is the 100-block window containing the block.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, None, 0),
            SegmentRangeInclusive::new(0, 99)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, None, 250),
            SegmentRangeInclusive::new(200, 299)
        );

        // Empty index: same expectation as with no index.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
            SegmentRangeInclusive::new(100, 199)
        );

        // Index with three uniform ranges, keyed by the end of each range.
        let block_index = BTreeMap::from_iter([
            (99, SegmentRangeInclusive::new(0, 99)),
            (199, SegmentRangeInclusive::new(100, 199)),
            (299, SegmentRangeInclusive::new(200, 299)),
        ]);

        // Blocks inside an indexed range resolve to that range, including both boundaries.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 0),
            SegmentRangeInclusive::new(0, 99)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 50),
            SegmentRangeInclusive::new(0, 99)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 99),
            SegmentRangeInclusive::new(0, 99)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 100),
            SegmentRangeInclusive::new(100, 199)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 150),
            SegmentRangeInclusive::new(100, 199)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 199),
            SegmentRangeInclusive::new(100, 199)
        );

        // Blocks right after the last indexed range.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 300),
            SegmentRangeInclusive::new(300, 399)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 350),
            SegmentRangeInclusive::new(300, 399)
        );

        // Block further past the last indexed range.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 500),
            SegmentRangeInclusive::new(500, 599)
        );

        // Block far past the last indexed range.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 1000),
            SegmentRangeInclusive::new(1000, 1099)
        );

        // Index whose ranges have different sizes (50, 100 and 200 blocks).
        let mixed_size_index = BTreeMap::from_iter([
            (49, SegmentRangeInclusive::new(0, 49)),
            (149, SegmentRangeInclusive::new(50, 149)),
            (349, SegmentRangeInclusive::new(150, 349)),
        ]);

        // Blocks inside an indexed range resolve to that range regardless of its size.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 25),
            SegmentRangeInclusive::new(0, 49)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 100),
            SegmentRangeInclusive::new(50, 149)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 200),
            SegmentRangeInclusive::new(150, 349)
        );

        // Past the last indexed range (which ends at 349), the expected ranges are 100-block
        // windows that start right after it: 350, 450, 550, ...
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 350),
            SegmentRangeInclusive::new(350, 449)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 450),
            SegmentRangeInclusive::new(450, 549)
        );
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 550),
            SegmentRangeInclusive::new(550, 649)
        );

        Ok(())
    }
}