1use super::{
2 metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3 StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6 to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
7 HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader, TransactionVariant,
8 TransactionsProvider, TransactionsProviderExt,
9};
10use alloy_consensus::{
11 transaction::{SignerRecoverable, TransactionMeta},
12 Header,
13};
14use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
15use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
16use dashmap::DashMap;
17use notify::{RecommendedWatcher, RecursiveMode, Watcher};
18use parking_lot::RwLock;
19use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
20use reth_db::{
21 lockfile::StorageLock,
22 static_file::{
23 iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
24 StaticFileCursor, TransactionMask,
25 },
26};
27use reth_db_api::{
28 cursor::DbCursorRO,
29 models::StoredBlockBodyIndices,
30 table::{Decompress, Table, Value},
31 tables,
32 transaction::DbTx,
33};
34use reth_ethereum_primitives::{Receipt, TransactionSigned};
35use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
36use reth_node_types::NodePrimitives;
37use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
38use reth_stages_types::{PipelineTarget, StageId};
39use reth_static_file_types::{
40 find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
41 DEFAULT_BLOCKS_PER_STATIC_FILE,
42};
43use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, StorageSettingsCache};
44use reth_storage_errors::provider::{ProviderError, ProviderResult};
45use std::{
46 collections::{BTreeMap, HashMap},
47 fmt::Debug,
48 ops::{Deref, Range, RangeBounds, RangeInclusive},
49 path::{Path, PathBuf},
50 sync::{atomic::AtomicU64, mpsc, Arc},
51};
52use tracing::{debug, info, trace, warn};
53
/// Per-segment index of static file ranges.
///
/// The outer map is keyed by segment. The inner ordered map associates a `u64` key (the expected
/// block end or the transaction end of a file, depending on which index uses this alias) with a
/// block range.
type SegmentRanges = HashMap<StaticFileSegment, BTreeMap<u64, SegmentRangeInclusive>>;
57
/// Access mode a static file provider is opened with.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access. This is the default mode.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` if the mode is [`StaticFileAccess::RO`].
    pub const fn is_read_only(&self) -> bool {
        match self {
            Self::RO => true,
            Self::RW => false,
        }
    }

    /// Returns `true` if the mode is [`StaticFileAccess::RW`].
    pub const fn is_read_write(&self) -> bool {
        match self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
79
80#[derive(Debug)]
89pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
90
91impl<N> Clone for StaticFileProvider<N> {
92 fn clone(&self) -> Self {
93 Self(self.0.clone())
94 }
95}
96
97#[derive(Debug)]
99pub struct StaticFileProviderBuilder<N> {
100 inner: StaticFileProviderInner<N>,
101}
102
103impl<N: NodePrimitives> StaticFileProviderBuilder<N> {
104 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
106 StaticFileProviderInner::new(path, StaticFileAccess::RW).map(|inner| Self { inner })
107 }
108
109 pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
111 StaticFileProviderInner::new(path, StaticFileAccess::RO).map(|inner| Self { inner })
112 }
113
114 pub fn with_blocks_per_file_for_segments(
126 mut self,
127 segments: HashMap<StaticFileSegment, u64>,
128 ) -> Self {
129 self.inner.blocks_per_file.extend(segments);
130 self
131 }
132
133 pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
135 for segment in StaticFileSegment::iter() {
136 self.inner.blocks_per_file.insert(segment, blocks_per_file);
137 }
138 self
139 }
140
141 pub fn with_blocks_per_file_for_segment(
143 mut self,
144 segment: StaticFileSegment,
145 blocks_per_file: u64,
146 ) -> Self {
147 self.inner.blocks_per_file.insert(segment, blocks_per_file);
148 self
149 }
150
151 pub fn with_metrics(mut self) -> Self {
153 self.inner.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
154 self
155 }
156
157 pub fn build(self) -> ProviderResult<StaticFileProvider<N>> {
159 let provider = StaticFileProvider(Arc::new(self.inner));
160 provider.initialize_index()?;
161 Ok(provider)
162 }
163}
164
165impl<N: NodePrimitives> StaticFileProvider<N> {
166 pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
176 let provider = StaticFileProviderBuilder::read_only(path)?.build()?;
177
178 if watch_directory {
179 provider.watch_directory();
180 }
181
182 Ok(provider)
183 }
184
185 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
187 StaticFileProviderBuilder::read_write(path)?.build()
188 }
189
    /// Spawns a detached background thread that watches the static files directory and
    /// re-initializes the provider index whenever a relevant file changes.
    ///
    /// Only modify/create/remove events are considered, and only for paths whose extension equals
    /// `CONFIG_FILE_EXTENSION` and whose file stem parses as a static file segment name.
    ///
    /// # Panics
    ///
    /// The spawned thread (not the caller) panics if the watcher cannot be created, if the
    /// directory cannot be watched, or if an event cannot be forwarded over the channel.
    pub fn watch_directory(&self) {
        // The thread owns its own handle to the shared inner provider.
        let provider = self.clone();
        std::thread::spawn(move || {
            // File system events are forwarded from the watcher callback over this channel.
            let (tx, rx) = std::sync::mpsc::channel();
            let mut watcher = RecommendedWatcher::new(
                move |res| tx.send(res).unwrap(),
                notify::Config::default(),
            )
            .expect("failed to create watcher");

            watcher
                .watch(&provider.path, RecursiveMode::NonRecursive)
                .expect("failed to watch path");

            // Modification time of the file that most recently triggered a re-initialization;
            // used to skip events that do not carry a newer modification time.
            let mut last_event_timestamp = None;

            while let Ok(res) = rx.recv() {
                match res {
                    Ok(event) => {
                        // Ignore every event kind other than modify, create and remove.
                        if !matches!(
                            event.kind,
                            notify::EventKind::Modify(_) |
                                notify::EventKind::Create(_) |
                                notify::EventKind::Remove(_)
                        ) {
                            continue
                        }

                        for segment in event.paths {
                            // Only files with the configuration extension are of interest.
                            if segment
                                .extension()
                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
                            {
                                continue
                            }

                            // Skip files whose stem is not a valid static file name.
                            if StaticFileSegment::parse_filename(
                                &segment.file_stem().expect("qed").to_string_lossy(),
                            )
                            .is_none()
                            {
                                continue
                            }

                            // If the modification time can be read, skip the path unless it is
                            // strictly newer than the last one that was handled. If it cannot be
                            // read (e.g. the file was removed), fall through and re-initialize.
                            if let Ok(current_modified_timestamp) =
                                std::fs::metadata(&segment).and_then(|m| m.modified())
                            {
                                if last_event_timestamp.is_some_and(|last_timestamp| {
                                    last_timestamp >= current_modified_timestamp
                                }) {
                                    continue
                                }
                                last_event_timestamp = Some(current_modified_timestamp);
                            }

                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
                            if let Err(err) = provider.initialize_index() {
                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
                            }
                            // The whole index was rebuilt, so the remaining paths of this event
                            // do not need to be processed.
                            break
                        }
                    }

                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
                }
            }
        });
    }
273}
274
275impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
276 type Target = StaticFileProviderInner<N>;
277
278 fn deref(&self) -> &Self::Target {
279 &self.0
280 }
281}
282
/// Shared state behind a [`StaticFileProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Cache of loaded jars, keyed by the end of the file's fixed block range and its segment.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Block range of the lowest static file of each segment.
    static_files_min_block: RwLock<HashMap<StaticFileSegment, SegmentRangeInclusive>>,
    /// Start of the lowest block range of the `Transactions` segment, refreshed when the index is
    /// initialized.
    earliest_history_height: AtomicU64,
    /// Highest block number available in static files for each segment.
    static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
    /// Per segment: expected block range end -> expected block range of each known file.
    static_files_expected_block_index: RwLock<SegmentRanges>,
    /// Per segment: transaction range end -> block range of each file that has transactions.
    static_files_tx_index: RwLock<SegmentRanges>,
    /// Directory that holds the static files.
    path: PathBuf,
    /// Writers for the static file segments.
    // NOTE(review): how writers are created and cached is defined in the `writer` module, which
    // is not visible here.
    writers: StaticFileWriters<N>,
    /// Metrics, if enabled.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access mode (read-only or read-write) of this provider.
    access: StaticFileAccess,
    /// Number of blocks per file, configured per segment.
    blocks_per_file: HashMap<StaticFileSegment, u64>,
    /// Storage lock, acquired only in read-write mode and kept for the provider's lifetime.
    _lock_file: Option<StorageLock>,
}
338
339impl<N: NodePrimitives> StaticFileProviderInner<N> {
340 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
342 let _lock_file = if access.is_read_write() {
343 StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
344 } else {
345 None
346 };
347
348 let mut blocks_per_file = HashMap::new();
349 for segment in StaticFileSegment::iter() {
350 blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
351 }
352
353 let provider = Self {
354 map: Default::default(),
355 writers: Default::default(),
356 static_files_min_block: Default::default(),
357 earliest_history_height: Default::default(),
358 static_files_max_block: Default::default(),
359 static_files_expected_block_index: Default::default(),
360 static_files_tx_index: Default::default(),
361 path: path.as_ref().to_path_buf(),
362 metrics: None,
363 access,
364 blocks_per_file,
365 _lock_file,
366 };
367
368 Ok(provider)
369 }
370
371 pub const fn is_read_only(&self) -> bool {
372 self.access.is_read_only()
373 }
374
375 pub fn find_fixed_range_with_block_index(
384 &self,
385 segment: StaticFileSegment,
386 block_index: Option<&BTreeMap<u64, SegmentRangeInclusive>>,
387 block: BlockNumber,
388 ) -> SegmentRangeInclusive {
389 let blocks_per_file =
390 self.blocks_per_file.get(&segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
391
392 if let Some(block_index) = block_index {
393 if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
395 {
396 return *range
398 } else if let Some((_, range)) = block_index.last_key_value() {
399 let blocks_after_last_range = block - range.end();
405 let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
406 let start = range.end() + 1 + segments_to_skip * blocks_per_file;
407 return SegmentRangeInclusive::new(start, start + blocks_per_file - 1)
408 }
409 }
410 find_fixed_range(block, blocks_per_file)
413 }
414
415 pub fn find_fixed_range(
428 &self,
429 segment: StaticFileSegment,
430 block: BlockNumber,
431 ) -> SegmentRangeInclusive {
432 self.find_fixed_range_with_block_index(
433 segment,
434 self.static_files_expected_block_index.read().get(&segment),
435 block,
436 )
437 }
438}
439
440impl<N: NodePrimitives> StaticFileProvider<N> {
441 pub fn report_metrics(&self) -> ProviderResult<()> {
443 let Some(metrics) = &self.metrics else { return Ok(()) };
444
445 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
446 for (segment, headers) in static_files {
447 let mut entries = 0;
448 let mut size = 0;
449
450 for (block_range, _) in &headers {
451 let fixed_block_range = self.find_fixed_range(segment, block_range.start());
452 let jar_provider = self
453 .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
454 .ok_or_else(|| {
455 ProviderError::MissingStaticFileBlock(segment, block_range.start())
456 })?;
457
458 entries += jar_provider.rows();
459
460 let data_size = reth_fs_util::metadata(jar_provider.data_path())
461 .map(|metadata| metadata.len())
462 .unwrap_or_default();
463 let index_size = reth_fs_util::metadata(jar_provider.index_path())
464 .map(|metadata| metadata.len())
465 .unwrap_or_default();
466 let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
467 .map(|metadata| metadata.len())
468 .unwrap_or_default();
469 let config_size = reth_fs_util::metadata(jar_provider.config_path())
470 .map(|metadata| metadata.len())
471 .unwrap_or_default();
472
473 size += data_size + index_size + offsets_size + config_size;
474 }
475
476 metrics.record_segment(segment, size, headers.len(), entries);
477 }
478
479 Ok(())
480 }
481
482 pub fn get_segment_provider(
485 &self,
486 segment: StaticFileSegment,
487 start: u64,
488 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
489 if segment.is_block_based() {
490 self.get_segment_provider_for_block(segment, start, None)
491 } else {
492 self.get_segment_provider_for_transaction(segment, start, None)
493 }
494 }
495
496 pub fn get_segment_provider_for_block(
498 &self,
499 segment: StaticFileSegment,
500 block: BlockNumber,
501 path: Option<&Path>,
502 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
503 self.get_segment_provider_for_range(
504 segment,
505 || self.get_segment_ranges_from_block(segment, block),
506 path,
507 )?
508 .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
509 }
510
511 pub fn get_segment_provider_for_transaction(
513 &self,
514 segment: StaticFileSegment,
515 tx: TxNumber,
516 path: Option<&Path>,
517 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
518 self.get_segment_provider_for_range(
519 segment,
520 || self.get_segment_ranges_from_transaction(segment, tx),
521 path,
522 )?
523 .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
524 }
525
526 pub fn get_segment_provider_for_range(
530 &self,
531 segment: StaticFileSegment,
532 fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
533 path: Option<&Path>,
534 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
535 let block_range = match path {
538 Some(path) => StaticFileSegment::parse_filename(
539 &path
540 .file_name()
541 .ok_or_else(|| {
542 ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
543 })?
544 .to_string_lossy(),
545 )
546 .and_then(|(parsed_segment, block_range)| {
547 if parsed_segment == segment {
548 return Some(block_range)
549 }
550 None
551 }),
552 None => fn_range(),
553 };
554
555 if let Some(block_range) = block_range {
557 return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
558 }
559
560 Ok(None)
561 }
562
563 pub fn get_segment_provider_for_path(
565 &self,
566 path: &Path,
567 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
568 StaticFileSegment::parse_filename(
569 &path
570 .file_name()
571 .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
572 .to_string_lossy(),
573 )
574 .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
575 .transpose()
576 }
577
578 pub fn remove_cached_provider(
582 &self,
583 segment: StaticFileSegment,
584 fixed_block_range_end: BlockNumber,
585 ) {
586 self.map.remove(&(fixed_block_range_end, segment));
587 }
588
589 pub fn delete_segment_below_block(
606 &self,
607 segment: StaticFileSegment,
608 block: BlockNumber,
609 ) -> ProviderResult<Vec<SegmentHeader>> {
610 if block == 0 {
612 return Ok(Vec::new())
613 }
614
615 let highest_block = self.get_highest_static_file_block(segment);
616 let mut deleted_headers = Vec::new();
617
618 loop {
619 let Some(block_height) = self.get_lowest_range_end(segment) else {
620 return Ok(deleted_headers)
621 };
622
623 if block_height >= block || Some(block_height) == highest_block {
625 return Ok(deleted_headers)
626 }
627
628 debug!(
629 target: "provider::static_file",
630 ?segment,
631 ?block_height,
632 "Deleting static file below block"
633 );
634
635 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
638 warn!( target: "provider::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
639 })?;
640
641 deleted_headers.push(header);
642 }
643 }
644
645 pub fn delete_jar(
653 &self,
654 segment: StaticFileSegment,
655 block: BlockNumber,
656 ) -> ProviderResult<SegmentHeader> {
657 let fixed_block_range = self.find_fixed_range(segment, block);
658 let key = (fixed_block_range.end(), segment);
659 let jar = if let Some((_, jar)) = self.map.remove(&key) {
660 jar.jar
661 } else {
662 let file = self.path.join(segment.filename(&fixed_block_range));
663 debug!(
664 target: "provider::static_file",
665 ?file,
666 ?fixed_block_range,
667 ?block,
668 "Loading static file jar for deletion"
669 );
670 NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
671 };
672
673 let header = *jar.user_header();
674 jar.delete().map_err(ProviderError::other)?;
675
676 self.initialize_index()?;
679
680 Ok(header)
681 }
682
    /// Returns a jar provider for `segment` and `fixed_block_range`, loading the jar from disk
    /// and caching it if it is not cached yet.
    ///
    /// The cache key is the pair of the range's end block and the segment. If metrics are
    /// enabled, they are attached to the returned provider.
    ///
    /// # Errors
    ///
    /// Returns an error if the jar file cannot be loaded or wrapped into a `LoadedJar`.
    fn get_or_create_jar_provider(
        &self,
        segment: StaticFileSegment,
        fixed_block_range: &SegmentRangeInclusive,
    ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
        let key = (fixed_block_range.end(), segment);

        trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
        let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
            jar.into()
        } else {
            trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
            let path = self.path.join(segment.filename(fixed_block_range));
            let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
            // Insert (overwriting any entry added in the meantime) and turn the resulting map
            // reference into a provider.
            self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
        };

        if let Some(metrics) = &self.metrics {
            provider = provider.with_metrics(metrics.clone());
        }
        Ok(provider)
    }
710
711 fn get_segment_ranges_from_block(
714 &self,
715 segment: StaticFileSegment,
716 block: u64,
717 ) -> Option<SegmentRangeInclusive> {
718 self.static_files_max_block
719 .read()
720 .get(&segment)
721 .filter(|max| **max >= block)
722 .map(|_| self.find_fixed_range(segment, block))
723 }
724
725 fn get_segment_ranges_from_transaction(
728 &self,
729 segment: StaticFileSegment,
730 tx: u64,
731 ) -> Option<SegmentRangeInclusive> {
732 let static_files = self.static_files_tx_index.read();
733 let segment_static_files = static_files.get(&segment)?;
734
735 let mut static_files_rev_iter = segment_static_files.iter().rev().peekable();
738
739 while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
740 if tx > *tx_end {
741 return None
743 }
744 let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
745 if tx_start <= tx {
746 return Some(self.find_fixed_range(segment, block_range.end()))
747 }
748 }
749 None
750 }
751
    /// Updates the in-memory indexes of `segment` after its highest block changed.
    ///
    /// With `Some(segment_max_block)`, the jar containing that block is loaded from disk and the
    /// max-block, min-block, expected-block and transaction indexes as well as the jar cache are
    /// brought in line with it. With `None`, every index entry of the segment is removed.
    ///
    /// # Errors
    ///
    /// Returns an error if the jar for `segment_max_block` cannot be loaded or wrapped.
    pub fn update_index(
        &self,
        segment: StaticFileSegment,
        segment_max_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        // All four indexes are write-locked for the duration of the update.
        let mut min_block = self.static_files_min_block.write();
        let mut max_block = self.static_files_max_block.write();
        let mut expected_block_index = self.static_files_expected_block_index.write();
        let mut tx_index = self.static_files_tx_index.write();

        match segment_max_block {
            Some(segment_max_block) => {
                max_block.insert(segment, segment_max_block);
                // Resolve the range against the already locked index rather than calling
                // `find_fixed_range`, which would try to take the read lock again.
                let fixed_range = self.find_fixed_range_with_block_index(
                    segment,
                    expected_block_index.get(&segment),
                    segment_max_block,
                );

                let jar = NippyJar::<SegmentHeader>::load(
                    &self.path.join(segment.filename(&fixed_range)),
                )
                .map_err(ProviderError::other)?;

                if let Some(current_block_range) = jar.user_header().block_range() {
                    min_block
                        .entry(segment)
                        .and_modify(|current_min| {
                            // Only refresh the stored minimum when this jar is the lowest file
                            // (same start block); otherwise the existing minimum stays.
                            if current_block_range.start() == current_min.start() {
                                *current_min = current_block_range;
                            }
                        })
                        .or_insert(current_block_range);
                }

                expected_block_index
                    .entry(segment)
                    .and_modify(|index| {
                        // Drop every range that starts at or after this one, then (re-)insert it.
                        index.retain(|_, block_range| block_range.start() < fixed_range.start());

                        index.insert(fixed_range.end(), fixed_range);
                    })
                    .or_insert_with(|| BTreeMap::from([(fixed_range.end(), fixed_range)]));

                if let Some(tx_range) = jar.user_header().tx_range() {
                    // The transaction index is only updated when the jar also has a block range.
                    if let Some(current_block_range) = jar.user_header().block_range() {
                        let tx_end = tx_range.end();

                        tx_index
                            .entry(segment)
                            .and_modify(|index| {
                                index.retain(|_, block_range| {
                                    block_range.start() < fixed_range.start()
                                });
                                index.insert(tx_end, current_block_range);
                            })
                            .or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
                    }
                } else if segment.is_tx_based() {
                    // The jar has no transactions: drop the entries at or above this range and
                    // remove the segment from the index entirely if nothing is left.
                    tx_index.entry(segment).and_modify(|index| {
                        index.retain(|_, block_range| block_range.start() < fixed_range.start());
                    });

                    if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
                        tx_index.remove(&segment);
                    }
                }

                // Refresh the cached jar for this range.
                self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

                // Evict cached jars of this segment that lie above the new highest range.
                self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
            }
            None => {
                max_block.remove(&segment);
                min_block.remove(&segment);
                expected_block_index.remove(&segment);
                tx_index.remove(&segment);
            }
        };

        Ok(())
    }
877
878 pub fn initialize_index(&self) -> ProviderResult<()> {
880 let mut min_block = self.static_files_min_block.write();
881 let mut max_block = self.static_files_max_block.write();
882 let mut expected_block_index = self.static_files_expected_block_index.write();
883 let mut tx_index = self.static_files_tx_index.write();
884
885 min_block.clear();
886 max_block.clear();
887 tx_index.clear();
888
889 for (segment, headers) in iter_static_files(&self.path).map_err(ProviderError::other)? {
890 if let Some((block_range, _)) = headers.first() {
892 min_block.insert(segment, *block_range);
893 }
894 if let Some((block_range, _)) = headers.last() {
895 max_block.insert(segment, block_range.end());
896 }
897
898 for (block_range, header) in headers {
899 expected_block_index
901 .entry(segment)
902 .and_modify(|index| {
903 index.insert(header.expected_block_end(), header.expected_block_range());
904 })
905 .or_insert_with(|| {
906 BTreeMap::from([(
907 header.expected_block_end(),
908 header.expected_block_range(),
909 )])
910 });
911
912 if let Some(tx_range) = header.tx_range() {
914 let tx_end = tx_range.end();
915
916 tx_index
917 .entry(segment)
918 .and_modify(|index| {
919 index.insert(tx_end, block_range);
920 })
921 .or_insert_with(|| BTreeMap::from([(tx_end, block_range)]));
922 }
923 }
924 }
925
926 self.map.clear();
928
929 if let Some(lowest_range) = min_block.get(&StaticFileSegment::Transactions) {
931 self.earliest_history_height
933 .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
934 }
935
936 Ok(())
937 }
938
    /// Verifies that the static files are consistent with the database behind `provider` and
    /// returns a pipeline unwind target if the database has to be unwound.
    ///
    /// For every segment (receipts are skipped when they are stored in the database, and on the
    /// Gnosis and Chiado chains) the method:
    /// 1. checks the latest file in read-only mode, or opens the latest writer in read-write mode;
    /// 2. requests an unwind to the highest static file block if that block changed during
    ///    step 1;
    /// 3. walks back from the highest block until the block body indices no longer reference
    ///    transactions beyond the highest static file transaction;
    /// 4. runs [`Self::ensure_invariants`] against the matching database table.
    ///
    /// The lowest requested block wins. `Ok(None)` means no unwind is required. Verification is
    /// skipped entirely on OP mainnet when the database knows the hard-coded OVM header hash.
    ///
    /// # Errors
    ///
    /// Propagates errors from the database provider, the file checks and the writer.
    pub fn check_consistency<Provider>(
        &self,
        provider: &Provider,
    ) -> ProviderResult<Option<PipelineTarget>>
    where
        Provider: DBProvider
            + BlockReader
            + StageCheckpointReader
            + ChainSpecProvider
            + StorageSettingsCache,
        N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
    {
        // OP mainnet special case: skip verification if the known OVM header is in the database.
        if provider.chain_spec().is_optimism() &&
            reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
        {
            const OVM_HEADER_1_HASH: B256 =
                b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
            if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
                info!(target: "reth::cli",
                    "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
                );
                return Ok(None)
            }
        }

        info!(target: "reth::cli", "Verifying storage consistency.");

        // Lowest block any check asked to unwind to.
        let mut unwind_target: Option<BlockNumber> = None;
        let mut update_unwind_target = |new_target: BlockNumber| {
            if let Some(target) = unwind_target.as_mut() {
                *target = (*target).min(new_target);
            } else {
                unwind_target = Some(new_target);
            }
        };

        for segment in StaticFileSegment::iter() {
            match segment {
                StaticFileSegment::Headers | StaticFileSegment::Transactions => {}
                StaticFileSegment::Receipts => {
                    // Receipts written to the database have nothing to verify here.
                    if EitherWriter::receipts_destination(provider).is_database() {
                        continue
                    }

                    // Receipts are not verified on Gnosis and Chiado.
                    if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
                        NamedChain::Chiado == provider.chain_spec().chain_id()
                    {
                        continue;
                    }
                }
            }

            let initial_highest_block = self.get_highest_static_file_block(segment);

            if self.access.is_read_only() {
                self.check_segment_consistency(segment)?;
            } else {
                // NOTE(review): presumably opening the writer repairs an inconsistent file as a
                // side effect — confirm in the writer implementation.
                self.latest_writer(segment)?;
            }

            // If the highest block changed during the step above, the database has to be unwound
            // to the new highest block.
            let mut highest_block = self.get_highest_static_file_block(segment);
            if initial_highest_block != highest_block {
                info!(
                    target: "reth::providers::static_file",
                    ?initial_highest_block,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                update_unwind_target(highest_block.unwrap_or_default());
            }

            // Walk back block by block until the block's last transaction is covered by the
            // static files, the block has no body indices, or block 0 is reached.
            let highest_tx = self.get_highest_static_file_tx(segment);
            if let Some(highest_tx) = highest_tx {
                let mut last_block = highest_block.unwrap_or_default();
                loop {
                    if let Some(indices) = provider.block_body_indices(last_block)? {
                        if indices.last_tx_num() <= highest_tx {
                            break
                        }
                    } else {
                        break
                    }
                    if last_block == 0 {
                        break
                    }
                    last_block -= 1;

                    info!(
                        target: "reth::providers::static_file",
                        highest_block = self.get_highest_static_file_block(segment),
                        unwind_target = last_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    highest_block = Some(last_block);
                    update_unwind_target(last_block);
                }
            }

            // Headers are keyed by block number; transactions and receipts by transaction number.
            if let Some(unwind) = match segment {
                StaticFileSegment::Headers => self
                    .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                        provider,
                        segment,
                        highest_block,
                        highest_block,
                    )?,
                StaticFileSegment::Transactions => self
                    .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
                StaticFileSegment::Receipts => self
                    .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                        provider,
                        segment,
                        highest_tx,
                        highest_block,
                    )?,
            } {
                update_unwind_target(unwind);
            }
        }

        Ok(unwind_target.map(PipelineTarget::Unwind))
    }
1124
1125 pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1128 if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1129 let file_path = self
1130 .directory()
1131 .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1132
1133 let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1134
1135 NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1136 }
1137 Ok(())
1138 }
1139
    /// Checks the invariants between the static files of `segment` and database table `T`, and
    /// returns the block the database has to be unwound to, if any.
    ///
    /// * `highest_static_file_entry` is the highest key (block or transaction number) stored in
    ///   static files for the segment.
    /// * `highest_static_file_block` is the highest block stored in static files for the segment.
    ///
    /// Checks, in order:
    /// 1. If the first database entry leaves a gap after the highest static file entry, unwind to
    ///    the highest static file block.
    /// 2. If the last database entry is beyond the highest static file entry (or there is no
    ///    static file entry at all), nothing needs to be done.
    /// 3. Otherwise the stage checkpoint of the segment is compared with the highest static file
    ///    block: a checkpoint ahead of the static files requests an unwind to the highest static
    ///    file block; a checkpoint behind them prunes the static files down to the checkpoint.
    ///
    /// # Errors
    ///
    /// Propagates database, checkpoint and writer errors.
    fn ensure_invariants<Provider, T: Table<Key = u64>>(
        &self,
        provider: &Provider,
        segment: StaticFileSegment,
        highest_static_file_entry: Option<u64>,
        highest_static_file_block: Option<BlockNumber>,
    ) -> ProviderResult<Option<BlockNumber>>
    where
        Provider: DBProvider + BlockReader + StageCheckpointReader,
    {
        let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

        if let Some((db_first_entry, _)) = db_cursor.first()? {
            if let (Some(highest_entry), Some(highest_block)) =
                (highest_static_file_entry, highest_static_file_block)
            {
                // The database must either overlap the static files or continue right after them.
                if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                    info!(
                        target: "reth::providers::static_file",
                        ?db_first_entry,
                        ?highest_entry,
                        unwind_target = highest_block,
                        ?segment,
                        "Setting unwind target."
                    );
                    return Ok(Some(highest_block))
                }
            }

            // The database holds entries beyond the static files (or there are no static file
            // entries): no further check is performed.
            if let Some((db_last_entry, _)) = db_cursor.last()? &&
                highest_static_file_entry
                    .is_none_or(|highest_entry| db_last_entry > highest_entry)
            {
                return Ok(None)
            }
        }

        // From here on missing values are treated as 0.
        let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
        let highest_static_file_block = highest_static_file_block.unwrap_or_default();

        // Checkpoint of the stage associated with this segment.
        let checkpoint_block_number = provider
            .get_stage_checkpoint(match segment {
                StaticFileSegment::Headers => StageId::Headers,
                StaticFileSegment::Transactions => StageId::Bodies,
                StaticFileSegment::Receipts => StageId::Execution,
            })?
            .unwrap_or_default()
            .block_number;

        // The checkpoint is ahead of the static files: unwind to the highest static file block.
        if checkpoint_block_number > highest_static_file_block {
            info!(
                target: "reth::providers::static_file",
                checkpoint_block_number,
                unwind_target = highest_static_file_block,
                ?segment,
                "Setting unwind target."
            );
            return Ok(Some(highest_static_file_block))
        }

        // The static files are ahead of the checkpoint: prune them down to the checkpoint.
        if checkpoint_block_number < highest_static_file_block {
            info!(
                target: "reth::providers",
                ?segment,
                from = highest_static_file_block,
                to = checkpoint_block_number,
                "Unwinding static file segment."
            );
            let mut writer = self.latest_writer(segment)?;
            match segment {
                StaticFileSegment::Headers => {
                    // One header per block, so the count to prune is the block difference.
                    writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
                }
                StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
                    // Nothing is pruned if the checkpoint block has no body indices.
                    if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                        // Number of entries stored past the checkpoint block's last transaction.
                        let number = highest_static_file_entry - block.last_tx_num();

                        match segment {
                            StaticFileSegment::Transactions => {
                                writer.prune_transactions(number, checkpoint_block_number)?
                            }
                            StaticFileSegment::Receipts => {
                                writer.prune_receipts(number, checkpoint_block_number)?
                            }
                            StaticFileSegment::Headers => unreachable!(),
                        }
                    }
                }
            }
            writer.commit()?;
        }

        Ok(None)
    }
1258
1259 pub fn earliest_history_height(&self) -> BlockNumber {
1267 self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1268 }
1269
1270 pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1274 self.static_files_min_block.read().get(&segment).copied()
1275 }
1276
1277 pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1283 self.get_lowest_range(segment).map(|range| range.start())
1284 }
1285
1286 pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1292 self.get_lowest_range(segment).map(|range| range.end())
1293 }
1294
1295 pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1299 self.static_files_max_block.read().get(&segment).copied()
1300 }
1301
1302 pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1306 self.static_files_tx_index
1307 .read()
1308 .get(&segment)
1309 .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1310 }
1311
1312 pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1314 HighestStaticFiles {
1315 receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1316 }
1317 }
1318
1319 pub fn find_static_file<T>(
1322 &self,
1323 segment: StaticFileSegment,
1324 func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
1325 ) -> ProviderResult<Option<T>> {
1326 if let Some(ranges) = self.static_files_expected_block_index.read().get(&segment) {
1327 for range in ranges.values().rev() {
1329 if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
1330 return Ok(Some(res))
1331 }
1332 }
1333 }
1334
1335 Ok(None)
1336 }
1337
1338 pub fn fetch_range_with_predicate<T, F, P>(
1344 &self,
1345 segment: StaticFileSegment,
1346 range: Range<u64>,
1347 mut get_fn: F,
1348 mut predicate: P,
1349 ) -> ProviderResult<Vec<T>>
1350 where
1351 F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1352 P: FnMut(&T) -> bool,
1353 {
1354 let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1355
1356 macro_rules! get_provider {
1360 ($number:expr) => {{
1361 match self.get_segment_provider(segment, $number) {
1362 Ok(provider) => provider,
1363 Err(
1364 ProviderError::MissingStaticFileBlock(_, _) |
1365 ProviderError::MissingStaticFileTx(_, _),
1366 ) => return Ok(result),
1367 Err(err) => return Err(err),
1368 }
1369 }};
1370 }
1371
1372 let mut provider = get_provider!(range.start);
1373 let mut cursor = provider.cursor()?;
1374
1375 'outer: for number in range {
1377 let mut retrying = false;
1381
1382 'inner: loop {
1384 match get_fn(&mut cursor, number)? {
1385 Some(res) => {
1386 if !predicate(&res) {
1387 break 'outer
1388 }
1389 result.push(res);
1390 break 'inner
1391 }
1392 None => {
1393 if retrying {
1394 return Ok(result)
1395 }
1396 drop(cursor);
1401 drop(provider);
1402 provider = get_provider!(number);
1403 cursor = provider.cursor()?;
1404 retrying = true;
1405 }
1406 }
1407 }
1408 }
1409
1410 Ok(result)
1411 }
1412
    /// Returns an iterator that lazily fetches the entries of `segment` for every number in
    /// `range` using `get_fn`.
    ///
    /// A new cursor is created for every number. When the current file yields nothing for a
    /// number, the provider for that number is looked up and the read is attempted once more.
    /// Numbers for which a cursor or provider cannot be obtained, or for which nothing is found
    /// after the second attempt, are skipped silently; only errors returned by `get_fn` are
    /// yielded as `Err` items.
    ///
    /// # Errors
    ///
    /// Returns an error if the provider for `range.start` cannot be obtained.
    pub fn fetch_range_iter<'a, T, F>(
        &'a self,
        segment: StaticFileSegment,
        range: Range<u64>,
        get_fn: F,
    ) -> ProviderResult<impl Iterator<Item = ProviderResult<T>> + 'a>
    where
        F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
        T: std::fmt::Debug,
    {
        // Kept in an `Option` so the current provider can be released before the next is loaded.
        let mut provider = Some(self.get_segment_provider(segment, range.start)?);
        Ok(range.filter_map(move |number| {
            match get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose() {
                Some(result) => Some(result),
                None => {
                    // Nothing found in the current file: switch to the file for `number`.
                    provider.take();
                    provider = Some(self.get_segment_provider(segment, number).ok()?);
                    get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose()
                }
            }
        }))
    }
1442
    /// Returns the path stored in this provider's `path` field.
    pub fn directory(&self) -> &Path {
        &self.path
    }
1447
1448 pub fn get_with_static_file_or_database<T, FS, FD>(
1458 &self,
1459 segment: StaticFileSegment,
1460 number: u64,
1461 fetch_from_static_file: FS,
1462 fetch_from_database: FD,
1463 ) -> ProviderResult<Option<T>>
1464 where
1465 FS: Fn(&Self) -> ProviderResult<Option<T>>,
1466 FD: Fn() -> ProviderResult<Option<T>>,
1467 {
1468 let static_file_upper_bound = if segment.is_block_based() {
1470 self.get_highest_static_file_block(segment)
1471 } else {
1472 self.get_highest_static_file_tx(segment)
1473 };
1474
1475 if static_file_upper_bound
1476 .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1477 {
1478 return fetch_from_static_file(self)
1479 }
1480 fetch_from_database()
1481 }
1482
1483 pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1495 &self,
1496 segment: StaticFileSegment,
1497 mut block_or_tx_range: Range<u64>,
1498 fetch_from_static_file: FS,
1499 mut fetch_from_database: FD,
1500 mut predicate: P,
1501 ) -> ProviderResult<Vec<T>>
1502 where
1503 FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1504 FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1505 P: FnMut(&T) -> bool,
1506 {
1507 let mut data = Vec::new();
1508
1509 if let Some(static_file_upper_bound) = if segment.is_block_based() {
1511 self.get_highest_static_file_block(segment)
1512 } else {
1513 self.get_highest_static_file_tx(segment)
1514 } && block_or_tx_range.start <= static_file_upper_bound
1515 {
1516 let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1517 data.extend(fetch_from_static_file(
1518 self,
1519 block_or_tx_range.start..end,
1520 &mut predicate,
1521 )?);
1522 block_or_tx_range.start = end;
1523 }
1524
1525 if block_or_tx_range.end > block_or_tx_range.start {
1526 data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1527 }
1528
1529 Ok(data)
1530 }
1531
1532 #[cfg(any(test, feature = "test-utils"))]
1534 pub fn path(&self) -> &Path {
1535 &self.path
1536 }
1537
    /// Test-only accessor returning a reference to the `static_files_tx_index` lock.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
        &self.static_files_tx_index
    }
1543
    /// Test-only accessor returning a reference to the `static_files_expected_block_index` lock.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn expected_block_index(&self) -> &RwLock<SegmentRanges> {
        &self.static_files_expected_block_index
    }
1549}
1550
/// Access to static file writers, plus commit and unwind-state queries.
pub trait StaticFileWriter {
    /// Primitives type the returned writers are parameterized with.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable handle to the writer of `segment` for the given `block`.
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable handle to the latest writer of `segment`.
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits the writers.
    fn commit(&self) -> ProviderResult<()>;

    /// Returns `true` if an unwind is queued.
    fn has_unwind_queued(&self) -> bool;
}
1576
1577impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1578 type Primitives = N;
1579
1580 fn get_writer(
1581 &self,
1582 block: BlockNumber,
1583 segment: StaticFileSegment,
1584 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1585 if self.access.is_read_only() {
1586 return Err(ProviderError::ReadOnlyStaticFileAccess)
1587 }
1588
1589 trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1590 self.writers.get_or_create(segment, || {
1591 StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1592 })
1593 }
1594
1595 fn latest_writer(
1596 &self,
1597 segment: StaticFileSegment,
1598 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1599 self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1600 }
1601
1602 fn commit(&self) -> ProviderResult<()> {
1603 self.writers.commit()
1604 }
1605
1606 fn has_unwind_queued(&self) -> bool {
1607 self.writers.has_unwind_queued()
1608 }
1609}
1610
1611impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1612 type Header = N::BlockHeader;
1613
1614 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1615 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1616 Ok(jar_provider
1617 .cursor()?
1618 .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
1619 .and_then(|(header, hash)| {
1620 if hash == block_hash {
1621 return Some(header)
1622 }
1623 None
1624 }))
1625 })
1626 }
1627
1628 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1629 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1630 .and_then(|provider| provider.header_by_number(num))
1631 .or_else(|err| {
1632 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1633 Ok(None)
1634 } else {
1635 Err(err)
1636 }
1637 })
1638 }
1639
1640 fn headers_range(
1641 &self,
1642 range: impl RangeBounds<BlockNumber>,
1643 ) -> ProviderResult<Vec<Self::Header>> {
1644 self.fetch_range_with_predicate(
1645 StaticFileSegment::Headers,
1646 to_range(range),
1647 |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1648 |_| true,
1649 )
1650 }
1651
1652 fn sealed_header(
1653 &self,
1654 num: BlockNumber,
1655 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1656 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1657 .and_then(|provider| provider.sealed_header(num))
1658 .or_else(|err| {
1659 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1660 Ok(None)
1661 } else {
1662 Err(err)
1663 }
1664 })
1665 }
1666
1667 fn sealed_headers_while(
1668 &self,
1669 range: impl RangeBounds<BlockNumber>,
1670 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1671 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1672 self.fetch_range_with_predicate(
1673 StaticFileSegment::Headers,
1674 to_range(range),
1675 |cursor, number| {
1676 Ok(cursor
1677 .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1678 .map(|(header, hash)| SealedHeader::new(header, hash)))
1679 },
1680 predicate,
1681 )
1682 }
1683}
1684
1685impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1686 fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1687 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1688 .and_then(|provider| provider.block_hash(num))
1689 .or_else(|err| {
1690 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1691 Ok(None)
1692 } else {
1693 Err(err)
1694 }
1695 })
1696 }
1697
1698 fn canonical_hashes_range(
1699 &self,
1700 start: BlockNumber,
1701 end: BlockNumber,
1702 ) -> ProviderResult<Vec<B256>> {
1703 self.fetch_range_with_predicate(
1704 StaticFileSegment::Headers,
1705 start..end,
1706 |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1707 |_| true,
1708 )
1709 }
1710}
1711
1712impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1713 for StaticFileProvider<N>
1714{
1715 type Receipt = N::Receipt;
1716
1717 fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1718 self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
1719 .and_then(|provider| provider.receipt(num))
1720 .or_else(|err| {
1721 if let ProviderError::MissingStaticFileTx(_, _) = err {
1722 Ok(None)
1723 } else {
1724 Err(err)
1725 }
1726 })
1727 }
1728
1729 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1730 if let Some(num) = self.transaction_id(hash)? {
1731 return self.receipt(num)
1732 }
1733 Ok(None)
1734 }
1735
1736 fn receipts_by_block(
1737 &self,
1738 _block: BlockHashOrNumber,
1739 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1740 unreachable!()
1741 }
1742
1743 fn receipts_by_tx_range(
1744 &self,
1745 range: impl RangeBounds<TxNumber>,
1746 ) -> ProviderResult<Vec<Self::Receipt>> {
1747 self.fetch_range_with_predicate(
1748 StaticFileSegment::Receipts,
1749 to_range(range),
1750 |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1751 |_| true,
1752 )
1753 }
1754
1755 fn receipts_by_block_range(
1756 &self,
1757 _block_range: RangeInclusive<BlockNumber>,
1758 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1759 Err(ProviderError::UnsupportedProvider)
1760 }
1761}
1762
1763impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
1764 for StaticFileProvider<N>
1765{
1766 fn transaction_hashes_by_range(
1767 &self,
1768 tx_range: Range<TxNumber>,
1769 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1770 let tx_range_size = (tx_range.end - tx_range.start) as usize;
1771
1772 let chunk_size = 100;
1776
1777 let chunks = tx_range
1779 .clone()
1780 .step_by(chunk_size)
1781 .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1782 let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1783
1784 for chunk_range in chunks {
1785 let (channel_tx, channel_rx) = mpsc::channel();
1786 channels.push(channel_rx);
1787
1788 let manager = self.clone();
1789
1790 rayon::spawn(move || {
1794 let mut rlp_buf = Vec::with_capacity(128);
1795 let _ = manager.fetch_range_with_predicate(
1796 StaticFileSegment::Transactions,
1797 chunk_range,
1798 |cursor, number| {
1799 Ok(cursor
1800 .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1801 .map(|transaction| {
1802 rlp_buf.clear();
1803 let _ = channel_tx
1804 .send(calculate_hash((number, transaction), &mut rlp_buf));
1805 }))
1806 },
1807 |_| true,
1808 );
1809 });
1810 }
1811
1812 let mut tx_list = Vec::with_capacity(tx_range_size);
1813
1814 for channel in channels {
1816 while let Ok(tx) = channel.recv() {
1817 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1818 tx_list.push((tx_hash, tx_id));
1819 }
1820 }
1821
1822 Ok(tx_list)
1823 }
1824}
1825
1826impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1827 for StaticFileProvider<N>
1828{
1829 type Transaction = N::SignedTx;
1830
1831 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1832 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1833 let mut cursor = jar_provider.cursor()?;
1834 if cursor
1835 .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1836 .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1837 .is_some()
1838 {
1839 Ok(cursor.number())
1840 } else {
1841 Ok(None)
1842 }
1843 })
1844 }
1845
1846 fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1847 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1848 .and_then(|provider| provider.transaction_by_id(num))
1849 .or_else(|err| {
1850 if let ProviderError::MissingStaticFileTx(_, _) = err {
1851 Ok(None)
1852 } else {
1853 Err(err)
1854 }
1855 })
1856 }
1857
1858 fn transaction_by_id_unhashed(
1859 &self,
1860 num: TxNumber,
1861 ) -> ProviderResult<Option<Self::Transaction>> {
1862 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1863 .and_then(|provider| provider.transaction_by_id_unhashed(num))
1864 .or_else(|err| {
1865 if let ProviderError::MissingStaticFileTx(_, _) = err {
1866 Ok(None)
1867 } else {
1868 Err(err)
1869 }
1870 })
1871 }
1872
1873 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1874 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1875 Ok(jar_provider
1876 .cursor()?
1877 .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
1878 .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
1879 })
1880 }
1881
1882 fn transaction_by_hash_with_meta(
1883 &self,
1884 _hash: TxHash,
1885 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1886 Err(ProviderError::UnsupportedProvider)
1888 }
1889
1890 fn transactions_by_block(
1891 &self,
1892 _block_id: BlockHashOrNumber,
1893 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1894 Err(ProviderError::UnsupportedProvider)
1896 }
1897
1898 fn transactions_by_block_range(
1899 &self,
1900 _range: impl RangeBounds<BlockNumber>,
1901 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1902 Err(ProviderError::UnsupportedProvider)
1904 }
1905
1906 fn transactions_by_tx_range(
1907 &self,
1908 range: impl RangeBounds<TxNumber>,
1909 ) -> ProviderResult<Vec<Self::Transaction>> {
1910 self.fetch_range_with_predicate(
1911 StaticFileSegment::Transactions,
1912 to_range(range),
1913 |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
1914 |_| true,
1915 )
1916 }
1917
1918 fn senders_by_tx_range(
1919 &self,
1920 range: impl RangeBounds<TxNumber>,
1921 ) -> ProviderResult<Vec<Address>> {
1922 let txes = self.transactions_by_tx_range(range)?;
1923 Ok(reth_primitives_traits::transaction::recover::recover_signers(&txes)?)
1924 }
1925
1926 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1927 match self.transaction_by_id_unhashed(id)? {
1928 Some(tx) => Ok(tx.recover_signer().ok()),
1929 None => Ok(None),
1930 }
1931 }
1932}
1933
1934impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
1935 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1936 Err(ProviderError::UnsupportedProvider)
1938 }
1939
1940 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1941 Err(ProviderError::UnsupportedProvider)
1943 }
1944
1945 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1946 Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
1947 }
1948
1949 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1950 Err(ProviderError::UnsupportedProvider)
1952 }
1953}
1954
// None of the `BlockReader` operations are supported by the static file provider: every method
// below returns `ProviderError::UnsupportedProvider` without looking at its arguments.
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
2029
// Block body indices are not served by the static file provider: both methods below return
// `ProviderError::UnsupportedProvider` without looking at their arguments.
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Unsupported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_body_indices_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
2042
2043impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2044 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2045 match T::NAME {
2046 tables::CanonicalHeaders::NAME |
2047 tables::Headers::<Header>::NAME |
2048 tables::HeaderTerminalDifficulties::NAME => Ok(self
2049 .get_highest_static_file_block(StaticFileSegment::Headers)
2050 .map(|block| block + 1)
2051 .unwrap_or_default()
2052 as usize),
2053 tables::Receipts::<Receipt>::NAME => Ok(self
2054 .get_highest_static_file_tx(StaticFileSegment::Receipts)
2055 .map(|receipts| receipts + 1)
2056 .unwrap_or_default() as usize),
2057 tables::Transactions::<TransactionSigned>::NAME => Ok(self
2058 .get_highest_static_file_tx(StaticFileSegment::Transactions)
2059 .map(|txs| txs + 1)
2060 .unwrap_or_default()
2061 as usize),
2062 _ => Err(ProviderError::UnsupportedProvider),
2063 }
2064 }
2065}
2066
2067#[inline]
2069fn calculate_hash<T>(
2070 entry: (TxNumber, T),
2071 rlp_buf: &mut Vec<u8>,
2072) -> Result<(B256, TxNumber), Box<ProviderError>>
2073where
2074 T: Encodable2718,
2075{
2076 let (tx_id, tx) = entry;
2077 tx.encode_2718(rlp_buf);
2078 Ok((keccak256(rlp_buf), tx_id))
2079}
2080
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use reth_chain_state::EthPrimitives;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

    use crate::StaticFileProviderBuilder;

    /// Checks `find_fixed_range_with_block_index` on a provider configured with 100 blocks per
    /// file: without an index, with an empty index, with a uniform index and with an index whose
    /// ranges have different sizes.
    #[test]
    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
        let (static_dir, _) = create_test_static_files_dir();
        let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
            .with_blocks_per_file(100)
            .build()?;

        let segment = StaticFileSegment::Headers;

        // Each case is `(block, expected range start, expected range end)`.

        // No block index at all.
        for (block, start, end) in [(0, 0, 99), (250, 200, 299)] {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, None, block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // Empty block index.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
            SegmentRangeInclusive::new(100, 199)
        );

        // Index with three ranges of 100 blocks each.
        let block_index = BTreeMap::from_iter([
            (99, SegmentRangeInclusive::new(0, 99)),
            (199, SegmentRangeInclusive::new(100, 199)),
            (299, SegmentRangeInclusive::new(200, 299)),
        ]);

        let uniform_cases = [
            // Blocks inside the indexed ranges.
            (0, 0, 99),
            (50, 0, 99),
            (99, 0, 99),
            (100, 100, 199),
            (150, 100, 199),
            (199, 100, 199),
            // Blocks past the last indexed range.
            (300, 300, 399),
            (350, 300, 399),
            (500, 500, 599),
            (1000, 1000, 1099),
        ];
        for (block, start, end) in uniform_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // Index whose ranges span 50, 100 and 200 blocks.
        let mixed_size_index = BTreeMap::from_iter([
            (49, SegmentRangeInclusive::new(0, 49)),
            (149, SegmentRangeInclusive::new(50, 149)),
            (349, SegmentRangeInclusive::new(150, 349)),
        ]);

        let mixed_cases = [
            // Blocks inside the indexed ranges.
            (25, 0, 49),
            (100, 50, 149),
            (200, 150, 349),
            // Blocks past the last indexed range.
            (350, 350, 449),
            (450, 450, 549),
            (550, 550, 649),
        ];
        for (block, start, end) in mixed_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        Ok(())
    }
}