1use super::{
2 metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3 StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6 to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
7 EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
8 TransactionVariant, TransactionsProvider, TransactionsProviderExt,
9};
10use alloy_consensus::{transaction::TransactionMeta, Header};
11use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
12use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
13use dashmap::DashMap;
14use notify::{RecommendedWatcher, RecursiveMode, Watcher};
15use parking_lot::RwLock;
16use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
17use reth_db::{
18 lockfile::StorageLock,
19 static_file::{
20 iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
21 StaticFileCursor, TransactionMask, TransactionSenderMask,
22 },
23};
24use reth_db_api::{
25 cursor::DbCursorRO,
26 models::StoredBlockBodyIndices,
27 table::{Decompress, Table, Value},
28 tables,
29 transaction::DbTx,
30};
31use reth_ethereum_primitives::{Receipt, TransactionSigned};
32use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
33use reth_node_types::NodePrimitives;
34use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
35use reth_stages_types::{PipelineTarget, StageId};
36use reth_static_file_types::{
37 find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
38 DEFAULT_BLOCKS_PER_STATIC_FILE,
39};
40use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, StorageSettingsCache};
41use reth_storage_errors::provider::{ProviderError, ProviderResult};
42use std::{
43 collections::{BTreeMap, HashMap},
44 fmt::Debug,
45 ops::{Deref, Range, RangeBounds, RangeInclusive},
46 path::{Path, PathBuf},
47 sync::{atomic::AtomicU64, mpsc, Arc},
48};
49use tracing::{debug, info, trace, warn};
50
/// Ordered map from a `u64` key to a static file block range.
///
/// Where this alias is consumed in this file (`find_fixed_range_with_block_index`), the key is
/// treated as the highest block of the range stored under it.
type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
54
/// Access mode a static file provider is opened with.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access. This is the default mode.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` if the mode is [`Self::RO`].
    pub const fn is_read_only(&self) -> bool {
        match self {
            Self::RO => true,
            Self::RW => false,
        }
    }

    /// Returns `true` if the mode is [`Self::RW`].
    pub const fn is_read_write(&self) -> bool {
        match self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
76
/// Handle to a shared [`StaticFileProviderInner`].
///
/// The inner state is stored behind an [`Arc`], so every clone of the handle refers to the same
/// provider state.
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
87
88impl<N> Clone for StaticFileProvider<N> {
89 fn clone(&self) -> Self {
90 Self(self.0.clone())
91 }
92}
93
/// Builder for [`StaticFileProvider`].
#[derive(Debug)]
pub struct StaticFileProviderBuilder<N> {
    /// Provider state being configured; `build` wraps it into a [`StaticFileProvider`].
    inner: StaticFileProviderInner<N>,
}
99
100impl<N: NodePrimitives> StaticFileProviderBuilder<N> {
101 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
103 StaticFileProviderInner::new(path, StaticFileAccess::RW).map(|inner| Self { inner })
104 }
105
106 pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
108 StaticFileProviderInner::new(path, StaticFileAccess::RO).map(|inner| Self { inner })
109 }
110
111 pub fn with_blocks_per_file_for_segments(
123 mut self,
124 segments: HashMap<StaticFileSegment, u64>,
125 ) -> Self {
126 self.inner.blocks_per_file.extend(segments);
127 self
128 }
129
130 pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
132 for segment in StaticFileSegment::iter() {
133 self.inner.blocks_per_file.insert(segment, blocks_per_file);
134 }
135 self
136 }
137
138 pub fn with_blocks_per_file_for_segment(
140 mut self,
141 segment: StaticFileSegment,
142 blocks_per_file: u64,
143 ) -> Self {
144 self.inner.blocks_per_file.insert(segment, blocks_per_file);
145 self
146 }
147
148 pub fn with_metrics(mut self) -> Self {
150 self.inner.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
151 self
152 }
153
154 pub fn build(self) -> ProviderResult<StaticFileProvider<N>> {
156 let provider = StaticFileProvider(Arc::new(self.inner));
157 provider.initialize_index()?;
158 Ok(provider)
159 }
160}
161
162impl<N: NodePrimitives> StaticFileProvider<N> {
163 pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
173 let provider = StaticFileProviderBuilder::read_only(path)?.build()?;
174
175 if watch_directory {
176 provider.watch_directory();
177 }
178
179 Ok(provider)
180 }
181
182 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
184 StaticFileProviderBuilder::read_write(path)?.build()
185 }
186
187 pub fn watch_directory(&self) {
193 let provider = self.clone();
194 std::thread::spawn(move || {
195 let (tx, rx) = std::sync::mpsc::channel();
196 let mut watcher = RecommendedWatcher::new(
197 move |res| tx.send(res).unwrap(),
198 notify::Config::default(),
199 )
200 .expect("failed to create watcher");
201
202 watcher
203 .watch(&provider.path, RecursiveMode::NonRecursive)
204 .expect("failed to watch path");
205
206 let mut last_event_timestamp = None;
208
209 while let Ok(res) = rx.recv() {
210 match res {
211 Ok(event) => {
212 if !matches!(
214 event.kind,
215 notify::EventKind::Modify(_) |
216 notify::EventKind::Create(_) |
217 notify::EventKind::Remove(_)
218 ) {
219 continue
220 }
221
222 for segment in event.paths {
227 if segment
229 .extension()
230 .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
231 {
232 continue
233 }
234
235 if StaticFileSegment::parse_filename(
237 &segment.file_stem().expect("qed").to_string_lossy(),
238 )
239 .is_none()
240 {
241 continue
242 }
243
244 if let Ok(current_modified_timestamp) =
247 std::fs::metadata(&segment).and_then(|m| m.modified())
248 {
249 if last_event_timestamp.is_some_and(|last_timestamp| {
250 last_timestamp >= current_modified_timestamp
251 }) {
252 continue
253 }
254 last_event_timestamp = Some(current_modified_timestamp);
255 }
256
257 info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
258 if let Err(err) = provider.initialize_index() {
259 warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
260 }
261 break
262 }
263 }
264
265 Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
266 }
267 }
268 });
269 }
270}
271
272impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
273 type Target = StaticFileProviderInner<N>;
274
275 fn deref(&self) -> &Self::Target {
276 &self.0
277 }
278}
279
/// Shared state behind a [`StaticFileProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Cache of loaded jars, keyed by the end block of the file's fixed range and its segment.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Per-segment index of the ranges covered by the static files.
    indexes: RwLock<HashMap<StaticFileSegment, StaticFileSegmentIndex>>,
    /// Start block of the lowest `Transactions` static file range, written by
    /// `initialize_index` when such a range exists. Starts out as `0`.
    earliest_history_height: AtomicU64,
    /// Directory that holds the static files.
    path: PathBuf,
    /// Writers for the static file segments.
    writers: StaticFileWriters<N>,
    /// Metrics handle; `None` unless metrics were enabled on the builder.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access mode the provider was created with.
    access: StaticFileAccess,
    /// Number of blocks per static file, configured per segment.
    blocks_per_file: HashMap<StaticFileSegment, u64>,
    /// Storage lock, acquired only for read-write access and held as long as this value lives.
    _lock_file: Option<StorageLock>,
}
312
313impl<N: NodePrimitives> StaticFileProviderInner<N> {
314 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
316 let _lock_file = if access.is_read_write() {
317 StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
318 } else {
319 None
320 };
321
322 let mut blocks_per_file = HashMap::new();
323 for segment in StaticFileSegment::iter() {
324 blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
325 }
326
327 let provider = Self {
328 map: Default::default(),
329 indexes: Default::default(),
330 writers: Default::default(),
331 earliest_history_height: Default::default(),
332 path: path.as_ref().to_path_buf(),
333 metrics: None,
334 access,
335 blocks_per_file,
336 _lock_file,
337 };
338
339 Ok(provider)
340 }
341
342 pub const fn is_read_only(&self) -> bool {
343 self.access.is_read_only()
344 }
345
346 pub fn find_fixed_range_with_block_index(
355 &self,
356 segment: StaticFileSegment,
357 block_index: Option<&SegmentRanges>,
358 block: BlockNumber,
359 ) -> SegmentRangeInclusive {
360 let blocks_per_file =
361 self.blocks_per_file.get(&segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
362
363 if let Some(block_index) = block_index {
364 if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
366 {
367 return *range
369 } else if let Some((_, range)) = block_index.last_key_value() {
370 let blocks_after_last_range = block - range.end();
376 let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
377 let start = range.end() + 1 + segments_to_skip * blocks_per_file;
378 return SegmentRangeInclusive::new(start, start + blocks_per_file - 1)
379 }
380 }
381 find_fixed_range(block, blocks_per_file)
384 }
385
386 pub fn find_fixed_range(
399 &self,
400 segment: StaticFileSegment,
401 block: BlockNumber,
402 ) -> SegmentRangeInclusive {
403 self.find_fixed_range_with_block_index(
404 segment,
405 self.indexes
406 .read()
407 .get(&segment)
408 .map(|index| &index.expected_block_ranges_by_max_block),
409 block,
410 )
411 }
412}
413
414impl<N: NodePrimitives> StaticFileProvider<N> {
415 pub fn report_metrics(&self) -> ProviderResult<()> {
417 let Some(metrics) = &self.metrics else { return Ok(()) };
418
419 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
420 for (segment, headers) in static_files {
421 let mut entries = 0;
422 let mut size = 0;
423
424 for (block_range, _) in &headers {
425 let fixed_block_range = self.find_fixed_range(segment, block_range.start());
426 let jar_provider = self
427 .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
428 .ok_or_else(|| {
429 ProviderError::MissingStaticFileBlock(segment, block_range.start())
430 })?;
431
432 entries += jar_provider.rows();
433
434 let data_path = jar_provider.data_path().to_path_buf();
435 let index_path = jar_provider.index_path();
436 let offsets_path = jar_provider.offsets_path();
437 let config_path = jar_provider.config_path();
438
439 drop(jar_provider);
441
442 let data_size = reth_fs_util::metadata(data_path)
443 .map(|metadata| metadata.len())
444 .unwrap_or_default();
445 let index_size = reth_fs_util::metadata(index_path)
446 .map(|metadata| metadata.len())
447 .unwrap_or_default();
448 let offsets_size = reth_fs_util::metadata(offsets_path)
449 .map(|metadata| metadata.len())
450 .unwrap_or_default();
451 let config_size = reth_fs_util::metadata(config_path)
452 .map(|metadata| metadata.len())
453 .unwrap_or_default();
454
455 size += data_size + index_size + offsets_size + config_size;
456 }
457
458 metrics.record_segment(segment, size, headers.len(), entries);
459 }
460
461 Ok(())
462 }
463
464 pub fn get_segment_provider(
467 &self,
468 segment: StaticFileSegment,
469 number: u64,
470 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
471 if segment.is_block_based() {
472 self.get_segment_provider_for_block(segment, number, None)
473 } else {
474 self.get_segment_provider_for_transaction(segment, number, None)
475 }
476 }
477
478 pub fn get_maybe_segment_provider(
483 &self,
484 segment: StaticFileSegment,
485 number: u64,
486 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
487 let provider = if segment.is_block_based() {
488 self.get_segment_provider_for_block(segment, number, None)
489 } else {
490 self.get_segment_provider_for_transaction(segment, number, None)
491 };
492
493 match provider {
494 Ok(provider) => Ok(Some(provider)),
495 Err(
496 ProviderError::MissingStaticFileBlock(_, _) |
497 ProviderError::MissingStaticFileTx(_, _),
498 ) => Ok(None),
499 Err(err) => Err(err),
500 }
501 }
502
503 pub fn get_segment_provider_for_block(
505 &self,
506 segment: StaticFileSegment,
507 block: BlockNumber,
508 path: Option<&Path>,
509 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
510 self.get_segment_provider_for_range(
511 segment,
512 || self.get_segment_ranges_from_block(segment, block),
513 path,
514 )?
515 .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
516 }
517
518 pub fn get_segment_provider_for_transaction(
520 &self,
521 segment: StaticFileSegment,
522 tx: TxNumber,
523 path: Option<&Path>,
524 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
525 self.get_segment_provider_for_range(
526 segment,
527 || self.get_segment_ranges_from_transaction(segment, tx),
528 path,
529 )?
530 .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
531 }
532
533 pub fn get_segment_provider_for_range(
537 &self,
538 segment: StaticFileSegment,
539 fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
540 path: Option<&Path>,
541 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
542 let block_range = match path {
545 Some(path) => StaticFileSegment::parse_filename(
546 &path
547 .file_name()
548 .ok_or_else(|| {
549 ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
550 })?
551 .to_string_lossy(),
552 )
553 .and_then(|(parsed_segment, block_range)| {
554 if parsed_segment == segment {
555 return Some(block_range)
556 }
557 None
558 }),
559 None => fn_range(),
560 };
561
562 if let Some(block_range) = block_range {
564 return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
565 }
566
567 Ok(None)
568 }
569
570 pub fn get_segment_provider_for_path(
572 &self,
573 path: &Path,
574 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
575 StaticFileSegment::parse_filename(
576 &path
577 .file_name()
578 .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
579 .to_string_lossy(),
580 )
581 .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
582 .transpose()
583 }
584
585 pub fn remove_cached_provider(
589 &self,
590 segment: StaticFileSegment,
591 fixed_block_range_end: BlockNumber,
592 ) {
593 self.map.remove(&(fixed_block_range_end, segment));
594 }
595
596 pub fn delete_segment_below_block(
613 &self,
614 segment: StaticFileSegment,
615 block: BlockNumber,
616 ) -> ProviderResult<Vec<SegmentHeader>> {
617 if block == 0 {
619 return Ok(Vec::new())
620 }
621
622 let highest_block = self.get_highest_static_file_block(segment);
623 let mut deleted_headers = Vec::new();
624
625 loop {
626 let Some(block_height) = self.get_lowest_range_end(segment) else {
627 return Ok(deleted_headers)
628 };
629
630 if block_height >= block || Some(block_height) == highest_block {
632 return Ok(deleted_headers)
633 }
634
635 debug!(
636 target: "provider::static_file",
637 ?segment,
638 ?block_height,
639 "Deleting static file below block"
640 );
641
642 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
645 warn!( target: "provider::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
646 })?;
647
648 deleted_headers.push(header);
649 }
650 }
651
652 pub fn delete_jar(
660 &self,
661 segment: StaticFileSegment,
662 block: BlockNumber,
663 ) -> ProviderResult<SegmentHeader> {
664 let fixed_block_range = self.find_fixed_range(segment, block);
665 let key = (fixed_block_range.end(), segment);
666 let jar = if let Some((_, jar)) = self.map.remove(&key) {
667 jar.jar
668 } else {
669 let file = self.path.join(segment.filename(&fixed_block_range));
670 debug!(
671 target: "provider::static_file",
672 ?file,
673 ?fixed_block_range,
674 ?block,
675 "Loading static file jar for deletion"
676 );
677 NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
678 };
679
680 let header = *jar.user_header();
681 jar.delete().map_err(ProviderError::other)?;
682
683 self.initialize_index()?;
686
687 Ok(header)
688 }
689
690 fn get_or_create_jar_provider(
694 &self,
695 segment: StaticFileSegment,
696 fixed_block_range: &SegmentRangeInclusive,
697 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
698 let key = (fixed_block_range.end(), segment);
699
700 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
702 let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
703 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
704 jar.into()
705 } else {
706 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
707 let path = self.path.join(segment.filename(fixed_block_range));
708 let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
709 self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
710 };
711
712 if let Some(metrics) = &self.metrics {
713 provider = provider.with_metrics(metrics.clone());
714 }
715 Ok(provider)
716 }
717
718 fn get_segment_ranges_from_block(
721 &self,
722 segment: StaticFileSegment,
723 block: u64,
724 ) -> Option<SegmentRangeInclusive> {
725 let indexes = self.indexes.read();
726 let index = indexes.get(&segment)?;
727
728 (index.max_block >= block).then(|| {
729 self.find_fixed_range_with_block_index(
730 segment,
731 Some(&index.expected_block_ranges_by_max_block),
732 block,
733 )
734 })
735 }
736
737 fn get_segment_ranges_from_transaction(
740 &self,
741 segment: StaticFileSegment,
742 tx: u64,
743 ) -> Option<SegmentRangeInclusive> {
744 let indexes = self.indexes.read();
745 let index = indexes.get(&segment)?;
746 let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;
747
748 let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();
751
752 while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
753 if tx > *tx_end {
754 return None
756 }
757 let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
758 if tx_start <= tx {
759 return Some(self.find_fixed_range_with_block_index(
760 segment,
761 Some(&index.expected_block_ranges_by_max_block),
762 block_range.end(),
763 ))
764 }
765 }
766 None
767 }
768
/// Updates the in-memory index of `segment` after its highest static file changed.
///
/// With `Some(segment_max_block)`, the jar whose fixed range contains that block is loaded
/// from disk, the block and transaction indexes of the segment are adjusted, the loaded jar is
/// put into the jar cache and cached jars of the segment with a higher range end are evicted.
/// With `None`, the segment is removed from the index.
///
/// # Errors
///
/// Returns an error if the jar cannot be loaded from disk or wrapped in a [`LoadedJar`].
pub fn update_index(
    &self,
    segment: StaticFileSegment,
    segment_max_block: Option<BlockNumber>,
) -> ProviderResult<()> {
    debug!(
        target: "provider::static_file",
        ?segment,
        ?segment_max_block,
        "Updating provider index"
    );
    // The write guard is held until the method returns.
    let mut indexes = self.indexes.write();

    match segment_max_block {
        Some(segment_max_block) => {
            // Fixed range of the file that contains the new highest block, resolved against
            // the ranges currently indexed for this segment (if any).
            let fixed_range = self.find_fixed_range_with_block_index(
                segment,
                indexes.get(&segment).map(|index| &index.expected_block_ranges_by_max_block),
                segment_max_block,
            );

            let jar = NippyJar::<SegmentHeader>::load(
                &self.path.join(segment.filename(&fixed_range)),
            )
            .map_err(ProviderError::other)?;

            let index = indexes
                .entry(segment)
                .and_modify(|index| {
                    index.max_block = segment_max_block;

                    // Drop every expected range that starts at or after the updated file,
                    // then record the updated file's range.
                    index
                        .expected_block_ranges_by_max_block
                        .retain(|_, block_range| block_range.start() < fixed_range.start());
                    index
                        .expected_block_ranges_by_max_block
                        .insert(fixed_range.end(), fixed_range);
                })
                .or_insert_with(|| StaticFileSegmentIndex {
                    min_block_range: None,
                    max_block: segment_max_block,
                    expected_block_ranges_by_max_block: BTreeMap::from([(
                        fixed_range.end(),
                        fixed_range,
                    )]),
                    available_block_ranges_by_max_tx: None,
                });

            // The lowest range is refreshed only when the updated jar starts at the same
            // block as the recorded lowest range, or when no lowest range is recorded yet.
            if let Some(current_block_range) = jar.user_header().block_range() {
                if let Some(min_block_range) = index.min_block_range.as_mut() {
                    if current_block_range.start() == min_block_range.start() {
                        *min_block_range = current_block_range;
                    }
                } else {
                    index.min_block_range = Some(current_block_range);
                }
            }

            if let Some(tx_range) = jar.user_header().tx_range() {
                // The transaction index is only touched when the jar also reports a block
                // range.
                if let Some(current_block_range) = jar.user_header().block_range() {
                    let tx_end = tx_range.end();

                    if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                        // Drop entries whose block range starts at or after the updated
                        // file, then map the jar's highest tx to its block range.
                        index
                            .retain(|_, block_range| block_range.start() < fixed_range.start());
                        index.insert(tx_end, current_block_range);
                    } else {
                        index.available_block_ranges_by_max_tx =
                            Some(BTreeMap::from([(tx_end, current_block_range)]));
                    }
                }
            } else if segment.is_tx_based() {
                // Tx-based segment whose jar reports no tx range: drop the entries that
                // start at or after the updated file.
                if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                    index.retain(|_, block_range| block_range.start() < fixed_range.start());
                }

                // Reset the transaction index to `None` once it has become empty.
                index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
            }

            debug!(target: "provider::static_file", ?segment, "Inserting updated jar into cache");
            self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

            debug!(target: "provider::static_file", ?segment, "Cleaning up jar map");
            // Evict cached jars of this segment whose range ends above the updated file.
            self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
        }
        None => {
            debug!(target: "provider::static_file", ?segment, "Removing segment from index");
            indexes.remove(&segment);
        }
    };

    debug!(target: "provider::static_file", ?segment, "Updated provider index");
    Ok(())
}
909
910 pub fn initialize_index(&self) -> ProviderResult<()> {
912 let mut indexes = self.indexes.write();
913 indexes.clear();
914
915 for (segment, headers) in iter_static_files(&self.path).map_err(ProviderError::other)? {
916 let min_block_range = Some(headers.first().expect("headers are not empty").0);
921 let max_block = headers.last().expect("headers are not empty").0.end();
922
923 let mut expected_block_ranges_by_max_block = BTreeMap::default();
924 let mut available_block_ranges_by_max_tx = None;
925
926 for (block_range, header) in headers {
927 expected_block_ranges_by_max_block
929 .insert(header.expected_block_end(), header.expected_block_range());
930
931 if let Some(tx_range) = header.tx_range() {
933 let tx_end = tx_range.end();
934
935 available_block_ranges_by_max_tx
936 .get_or_insert_with(BTreeMap::default)
937 .insert(tx_end, block_range);
938 }
939 }
940
941 indexes.insert(
942 segment,
943 StaticFileSegmentIndex {
944 min_block_range,
945 max_block,
946 expected_block_ranges_by_max_block,
947 available_block_ranges_by_max_tx,
948 },
949 );
950 }
951
952 self.map.clear();
954
955 if let Some(lowest_range) =
957 indexes.get(&StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
958 {
959 self.earliest_history_height
961 .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
962 }
963
964 Ok(())
965 }
966
/// Checks every static file segment against itself and against the database.
///
/// For each segment that is not skipped, the segment's files are checked (read-only access)
/// or its latest writer is fetched (read-write access), the highest static file transaction
/// is compared with the database block body indices, and `ensure_invariants` is run.
///
/// Returns `Ok(Some(PipelineTarget::Unwind(block)))` with the lowest unwind target collected
/// across all segments, or `Ok(None)` if no unwind is needed.
///
/// # Errors
///
/// Propagates errors from the database provider, from the static file checks and from the
/// writer.
pub fn check_consistency<Provider>(
    &self,
    provider: &Provider,
) -> ProviderResult<Option<PipelineTarget>>
where
    Provider: DBProvider
        + BlockReader
        + StageCheckpointReader
        + ChainSpecProvider
        + StorageSettingsCache,
    N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
{
    // OP mainnet: the whole verification is skipped when the block with this hash is known to
    // the provider.
    if provider.chain_spec().is_optimism() &&
        reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
    {
        const OVM_HEADER_1_HASH: B256 =
            b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
        if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
            info!(target: "reth::cli",
                "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
            );
            return Ok(None)
        }
    }

    info!(target: "reth::cli", "Verifying storage consistency.");

    // Lowest unwind target seen so far; every new candidate can only lower it.
    let mut unwind_target: Option<BlockNumber> = None;
    let mut update_unwind_target = |new_target: BlockNumber| {
        if let Some(target) = unwind_target.as_mut() {
            *target = (*target).min(new_target);
        } else {
            unwind_target = Some(new_target);
        }
    };

    for segment in StaticFileSegment::iter() {
        debug!(target: "reth::providers::static_file", ?segment, "Checking consistency for segment");
        // Decide whether the segment is checked at all.
        match segment {
            StaticFileSegment::Headers | StaticFileSegment::Transactions => {}
            StaticFileSegment::Receipts => {
                if EitherWriter::receipts_destination(provider).is_database() {
                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts consistency check: receipts stored in database");
                    continue
                }

                if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
                    NamedChain::Chiado == provider.chain_spec().chain_id()
                {
                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts consistency check: broken historical import for gnosis/chiado");
                    continue;
                }
            }
            StaticFileSegment::TransactionSenders => {
                if EitherWriterDestination::senders(provider).is_database() {
                    continue
                }
            }
        }

        let initial_highest_block = self.get_highest_static_file_block(segment);
        debug!(target: "reth::providers::static_file", ?segment, ?initial_highest_block, "Initial highest block for segment");

        if self.access.is_read_only() {
            debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency (read-only)");
            self.check_segment_consistency(segment)?;
        } else {
            debug!(target: "reth::providers::static_file", ?segment, "Fetching latest writer which might heal any potential inconsistency");
            // Only the side effects of fetching the writer matter here; the writer itself is
            // discarded.
            self.latest_writer(segment)?;
        }

        // If the highest block changed during the step above, unwind to the new highest block
        // (or to 0 if there is none).
        let mut highest_block = self.get_highest_static_file_block(segment);
        if initial_highest_block != highest_block {
            info!(
                target: "reth::providers::static_file",
                ?initial_highest_block,
                unwind_target = highest_block,
                ?segment,
                "Setting unwind target."
            );
            update_unwind_target(highest_block.unwrap_or_default());
        }

        let highest_tx = self.get_highest_static_file_tx(segment);
        debug!(target: "reth::providers::static_file", ?segment, ?highest_tx, ?highest_block, "Highest transaction for segment");
        if let Some(highest_tx) = highest_tx {
            let mut last_block = highest_block.unwrap_or_default();
            debug!(target: "reth::providers::static_file", ?segment, last_block, highest_tx, "Verifying last transaction matches last block indices");
            // Walk back one block at a time until the database's last transaction number of
            // the block is not above the highest static file transaction. Every step back
            // lowers both `highest_block` and the unwind target.
            loop {
                if let Some(indices) = provider.block_body_indices(last_block)? {
                    debug!(target: "reth::providers::static_file", ?segment, last_block, last_tx_num = indices.last_tx_num(), highest_tx, "Found block body indices");
                    if indices.last_tx_num() <= highest_tx {
                        break
                    }
                } else {
                    // The database has no body indices for this block; stop walking back.
                    debug!(target: "reth::providers::static_file", ?segment, last_block, "Block body indices not found, static files ahead of database");
                    break
                }
                if last_block == 0 {
                    debug!(target: "reth::providers::static_file", ?segment, "Reached block 0 in verification loop");
                    break
                }
                last_block -= 1;

                info!(
                    target: "reth::providers::static_file",
                    highest_block = self.get_highest_static_file_block(segment),
                    unwind_target = last_block,
                    ?segment,
                    "Setting unwind target."
                );
                highest_block = Some(last_block);
                update_unwind_target(last_block);
            }
        }

        debug!(target: "reth::providers::static_file", ?segment, "Ensuring invariants for segment");
        // The highest entry is a block number for headers and a transaction number for the
        // other segments; the table type is chosen per segment.
        if let Some(unwind) = match segment {
            StaticFileSegment::Headers => self
                .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                    provider,
                    segment,
                    highest_block,
                    highest_block,
                )?,
            StaticFileSegment::Transactions => self
                .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                )?,
            StaticFileSegment::Receipts => self
                .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                )?,
            StaticFileSegment::TransactionSenders => self
                .ensure_invariants::<_, tables::TransactionSenders>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                )?,
        } {
            debug!(target: "reth::providers::static_file", ?segment, unwind_target=unwind, "Invariants check returned unwind target");
            update_unwind_target(unwind);
        } else {
            debug!(target: "reth::providers::static_file", ?segment, "Invariants check completed, no unwind needed");
        }
    }

    Ok(unwind_target.map(PipelineTarget::Unwind))
}
1179
1180 pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1183 debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency");
1184 if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1185 let file_path = self
1186 .directory()
1187 .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1188 debug!(target: "reth::providers::static_file", ?segment, ?file_path, latest_block, "Loading NippyJar for consistency check");
1189
1190 let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1191 debug!(target: "reth::providers::static_file", ?segment, "NippyJar loaded, checking consistency");
1192
1193 NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1194 debug!(target: "reth::providers::static_file", ?segment, "NippyJar consistency check passed");
1195 } else {
1196 debug!(target: "reth::providers::static_file", ?segment, "No static file block found, skipping consistency check");
1197 }
1198 Ok(())
1199 }
1200
/// Compares the static files of `segment` with database table `T` and with the checkpoint of
/// the stage that corresponds to the segment.
///
/// `highest_static_file_entry` is the highest key stored in the static files (a block or
/// transaction number, depending on the caller) and `highest_static_file_block` the highest
/// block.
///
/// Returns `Ok(Some(block))` when the caller should unwind to `block`, and `Ok(None)`
/// otherwise. When the stage checkpoint is below the highest static file block, the static
/// files are pruned down to the checkpoint through the segment's latest writer.
///
/// # Errors
///
/// Propagates errors from the database cursor, the checkpoint lookup and the writer.
fn ensure_invariants<Provider, T: Table<Key = u64>>(
    &self,
    provider: &Provider,
    segment: StaticFileSegment,
    highest_static_file_entry: Option<u64>,
    highest_static_file_block: Option<BlockNumber>,
) -> ProviderResult<Option<BlockNumber>>
where
    Provider: DBProvider + BlockReader + StageCheckpointReader,
{
    debug!(target: "reth::providers::static_file", ?segment, ?highest_static_file_entry, ?highest_static_file_block, "Ensuring invariants");
    let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

    if let Some((db_first_entry, _)) = db_cursor.first()? {
        debug!(target: "reth::providers::static_file", ?segment, db_first_entry, "Found first database entry");
        if let (Some(highest_entry), Some(highest_block)) =
            (highest_static_file_entry, highest_static_file_block)
        {
            // The first database entry has to overlap with, or directly follow, the highest
            // static file entry. A gap between the two requests an unwind to the highest
            // static file block.
            if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                info!(
                    target: "reth::providers::static_file",
                    ?db_first_entry,
                    ?highest_entry,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                return Ok(Some(highest_block))
            }
        }

        // If the database holds entries beyond the static files (or there are no static file
        // entries at all), the checkpoint comparison below is skipped.
        if let Some((db_last_entry, _)) = db_cursor.last()? &&
            highest_static_file_entry
                .is_none_or(|highest_entry| db_last_entry > highest_entry)
        {
            debug!(target: "reth::providers::static_file", ?segment, db_last_entry, ?highest_static_file_entry, "Database has entries beyond static files, no unwind needed");
            return Ok(None)
        }
    } else {
        debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
    }

    // From here on a missing entry or block is treated as 0.
    let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
    let highest_static_file_block = highest_static_file_block.unwrap_or_default();

    // Stage whose checkpoint is compared with the static files of this segment.
    let stage_id = match segment {
        StaticFileSegment::Headers => StageId::Headers,
        StaticFileSegment::Transactions => StageId::Bodies,
        StaticFileSegment::Receipts => StageId::Execution,
        StaticFileSegment::TransactionSenders => StageId::SenderRecovery,
    };
    let checkpoint_block_number =
        provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
    debug!(target: "reth::providers::static_file", ?segment, ?stage_id, checkpoint_block_number, highest_static_file_block, "Retrieved stage checkpoint");

    // Checkpoint ahead of the static files: request an unwind to the highest static file
    // block.
    if checkpoint_block_number > highest_static_file_block {
        info!(
            target: "reth::providers::static_file",
            checkpoint_block_number,
            unwind_target = highest_static_file_block,
            ?segment,
            "Setting unwind target."
        );
        return Ok(Some(highest_static_file_block))
    }

    // Static files ahead of the checkpoint: prune them down to the checkpoint block.
    if checkpoint_block_number < highest_static_file_block {
        info!(
            target: "reth::providers",
            ?segment,
            from = highest_static_file_block,
            to = checkpoint_block_number,
            "Unwinding static file segment."
        );
        let mut writer = self.latest_writer(segment)?;
        match segment {
            StaticFileSegment::Headers => {
                let prune_count = highest_static_file_block - checkpoint_block_number;
                debug!(target: "reth::providers::static_file", ?segment, prune_count, "Pruning headers");
                writer.prune_headers(prune_count)?;
            }
            StaticFileSegment::Transactions |
            StaticFileSegment::Receipts |
            StaticFileSegment::TransactionSenders => {
                // The number of entries to prune is derived from the last transaction number
                // of the checkpoint block. Nothing is pruned if the database has no body
                // indices for that block.
                if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                    let number = highest_static_file_entry - block.last_tx_num();
                    debug!(target: "reth::providers::static_file", ?segment, prune_count = number, checkpoint_block_number, "Pruning transaction based segment");

                    match segment {
                        StaticFileSegment::Transactions => {
                            writer.prune_transactions(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::Receipts => {
                            writer.prune_receipts(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::TransactionSenders => {
                            writer.prune_transaction_senders(number, checkpoint_block_number)?
                        }
                        // Headers are handled by the outer match arm above.
                        StaticFileSegment::Headers => unreachable!(),
                    }
                } else {
                    debug!(target: "reth::providers::static_file", ?segment, checkpoint_block_number, "No block body indices found for checkpoint block");
                }
            }
        }
        debug!(target: "reth::providers::static_file", ?segment, "Committing writer after pruning");
        writer.commit()?;
        debug!(target: "reth::providers::static_file", ?segment, "Writer committed successfully");
    }

    debug!(target: "reth::providers::static_file", ?segment, "Invariants ensured, returning None");
    Ok(None)
}
1338
1339 pub fn earliest_history_height(&self) -> BlockNumber {
1347 self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1348 }
1349
1350 pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1354 self.indexes.read().get(&segment).and_then(|index| index.min_block_range)
1355 }
1356
1357 pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1363 self.get_lowest_range(segment).map(|range| range.start())
1364 }
1365
1366 pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1372 self.get_lowest_range(segment).map(|range| range.end())
1373 }
1374
1375 pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1379 self.indexes.read().get(&segment).map(|index| index.max_block)
1380 }
1381
1382 pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1386 self.indexes
1387 .read()
1388 .get(&segment)
1389 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1390 .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1391 }
1392
1393 pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1395 HighestStaticFiles {
1396 receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1397 }
1398 }
1399
1400 pub fn find_static_file<T>(
1403 &self,
1404 segment: StaticFileSegment,
1405 func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
1406 ) -> ProviderResult<Option<T>> {
1407 if let Some(ranges) =
1408 self.indexes.read().get(&segment).map(|index| &index.expected_block_ranges_by_max_block)
1409 {
1410 for range in ranges.values().rev() {
1412 if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
1413 return Ok(Some(res))
1414 }
1415 }
1416 }
1417
1418 Ok(None)
1419 }
1420
1421 pub fn fetch_range_with_predicate<T, F, P>(
1427 &self,
1428 segment: StaticFileSegment,
1429 range: Range<u64>,
1430 mut get_fn: F,
1431 mut predicate: P,
1432 ) -> ProviderResult<Vec<T>>
1433 where
1434 F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1435 P: FnMut(&T) -> bool,
1436 {
1437 let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1438
1439 macro_rules! get_provider {
1443 ($number:expr) => {{
1444 match self.get_segment_provider(segment, $number) {
1445 Ok(provider) => provider,
1446 Err(
1447 ProviderError::MissingStaticFileBlock(_, _) |
1448 ProviderError::MissingStaticFileTx(_, _),
1449 ) => return Ok(result),
1450 Err(err) => return Err(err),
1451 }
1452 }};
1453 }
1454
1455 let mut provider = get_provider!(range.start);
1456 let mut cursor = provider.cursor()?;
1457
1458 'outer: for number in range {
1460 let mut retrying = false;
1464
1465 'inner: loop {
1467 match get_fn(&mut cursor, number)? {
1468 Some(res) => {
1469 if !predicate(&res) {
1470 break 'outer
1471 }
1472 result.push(res);
1473 break 'inner
1474 }
1475 None => {
1476 if retrying {
1477 return Ok(result)
1478 }
1479 drop(cursor);
1484 drop(provider);
1485 provider = get_provider!(number);
1486 cursor = provider.cursor()?;
1487 retrying = true;
1488 }
1489 }
1490 }
1491 }
1492
1493 Ok(result)
1494 }
1495
1496 pub fn fetch_range_iter<'a, T, F>(
1501 &'a self,
1502 segment: StaticFileSegment,
1503 range: Range<u64>,
1504 get_fn: F,
1505 ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
1506 where
1507 F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1508 T: std::fmt::Debug,
1509 {
1510 let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
1511 Ok(range.map(move |number| {
1512 match provider
1513 .as_ref()
1514 .map(|provider| get_fn(&mut provider.cursor()?, number))
1515 .and_then(|result| result.transpose())
1516 {
1517 Some(result) => result.map(Some),
1518 None => {
1519 provider.take();
1523 provider = self.get_maybe_segment_provider(segment, number)?;
1524 provider
1525 .as_ref()
1526 .map(|provider| get_fn(&mut provider.cursor()?, number))
1527 .and_then(|result| result.transpose())
1528 .transpose()
1529 }
1530 }
1531 }))
1532 }
1533
1534 pub fn directory(&self) -> &Path {
1536 &self.path
1537 }
1538
1539 pub fn get_with_static_file_or_database<T, FS, FD>(
1549 &self,
1550 segment: StaticFileSegment,
1551 number: u64,
1552 fetch_from_static_file: FS,
1553 fetch_from_database: FD,
1554 ) -> ProviderResult<Option<T>>
1555 where
1556 FS: Fn(&Self) -> ProviderResult<Option<T>>,
1557 FD: Fn() -> ProviderResult<Option<T>>,
1558 {
1559 let static_file_upper_bound = if segment.is_block_based() {
1561 self.get_highest_static_file_block(segment)
1562 } else {
1563 self.get_highest_static_file_tx(segment)
1564 };
1565
1566 if static_file_upper_bound
1567 .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1568 {
1569 return fetch_from_static_file(self)
1570 }
1571 fetch_from_database()
1572 }
1573
1574 pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1586 &self,
1587 segment: StaticFileSegment,
1588 mut block_or_tx_range: Range<u64>,
1589 fetch_from_static_file: FS,
1590 mut fetch_from_database: FD,
1591 mut predicate: P,
1592 ) -> ProviderResult<Vec<T>>
1593 where
1594 FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1595 FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1596 P: FnMut(&T) -> bool,
1597 {
1598 let mut data = Vec::new();
1599
1600 if let Some(static_file_upper_bound) = if segment.is_block_based() {
1602 self.get_highest_static_file_block(segment)
1603 } else {
1604 self.get_highest_static_file_tx(segment)
1605 } && block_or_tx_range.start <= static_file_upper_bound
1606 {
1607 let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1608 data.extend(fetch_from_static_file(
1609 self,
1610 block_or_tx_range.start..end,
1611 &mut predicate,
1612 )?);
1613 block_or_tx_range.start = end;
1614 }
1615
1616 if block_or_tx_range.end > block_or_tx_range.start {
1617 data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1618 }
1619
1620 Ok(data)
1621 }
1622
/// Returns the path stored in this provider's `path` field (test builds only).
#[cfg(any(test, feature = "test-utils"))]
pub fn path(&self) -> &Path {
    let dir: &Path = &self.path;
    dir
}
1628
/// Returns a clone of the segment's `available_block_ranges_by_max_tx` map, if the segment
/// has an index entry and tracks transaction ranges (test builds only).
#[cfg(any(test, feature = "test-utils"))]
pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
    let indexes = self.indexes.read();
    let index = indexes.get(&segment)?;
    index.available_block_ranges_by_max_tx.clone()
}
1638
/// Returns a clone of the segment's `expected_block_ranges_by_max_block` map, if the segment
/// has an index entry (test builds only).
#[cfg(any(test, feature = "test-utils"))]
pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
    let indexes = self.indexes.read();
    let index = indexes.get(&segment)?;
    Some(index.expected_block_ranges_by_max_block.clone())
}
1648}
1649
/// Index data the provider keeps for a single static file segment.
#[derive(Debug)]
struct StaticFileSegmentIndex {
    /// Lowest block range of the segment, if one is recorded. Exposed through
    /// `get_lowest_range`.
    min_block_range: Option<SegmentRangeInclusive>,
    /// Highest block of the segment. Exposed through `get_highest_static_file_block`.
    max_block: u64,
    /// Block ranges of the segment; `find_static_file` walks its values from the highest key
    /// down. NOTE(review): the name suggests keys are each range's maximum block — confirm.
    expected_block_ranges_by_max_block: SegmentRanges,
    /// Block ranges tracked by transaction number, when present; the largest key is what
    /// `get_highest_static_file_tx` reports. NOTE(review): presumably only set for
    /// transaction-based segments — confirm against where the index is built.
    available_block_ranges_by_max_tx: Option<SegmentRanges>,
}
1680
/// Write-side access to static file segments.
pub trait StaticFileWriter {
    /// Primitive types the returned writers are parameterized with.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable handle to the writer of `segment`, using `block` to set it up if it
    /// has to be created.
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable handle to the writer of `segment` without the caller naming a block.
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits pending writer state.
    // NOTE(review): which writers are covered is decided by the implementation — confirm.
    fn commit(&self) -> ProviderResult<()>;

    /// Reports whether an unwind is queued.
    fn has_unwind_queued(&self) -> bool;
}
1706
1707impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1708 type Primitives = N;
1709
1710 fn get_writer(
1711 &self,
1712 block: BlockNumber,
1713 segment: StaticFileSegment,
1714 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1715 if self.access.is_read_only() {
1716 return Err(ProviderError::ReadOnlyStaticFileAccess)
1717 }
1718
1719 trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1720 self.writers.get_or_create(segment, || {
1721 StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1722 })
1723 }
1724
1725 fn latest_writer(
1726 &self,
1727 segment: StaticFileSegment,
1728 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1729 self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1730 }
1731
1732 fn commit(&self) -> ProviderResult<()> {
1733 self.writers.commit()
1734 }
1735
1736 fn has_unwind_queued(&self) -> bool {
1737 self.writers.has_unwind_queued()
1738 }
1739}
1740
1741impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1742 type Header = N::BlockHeader;
1743
1744 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1745 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1746 Ok(jar_provider
1747 .cursor()?
1748 .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
1749 .and_then(|(header, hash)| {
1750 if hash == block_hash {
1751 return Some(header)
1752 }
1753 None
1754 }))
1755 })
1756 }
1757
1758 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1759 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1760 .and_then(|provider| provider.header_by_number(num))
1761 .or_else(|err| {
1762 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1763 Ok(None)
1764 } else {
1765 Err(err)
1766 }
1767 })
1768 }
1769
1770 fn headers_range(
1771 &self,
1772 range: impl RangeBounds<BlockNumber>,
1773 ) -> ProviderResult<Vec<Self::Header>> {
1774 self.fetch_range_with_predicate(
1775 StaticFileSegment::Headers,
1776 to_range(range),
1777 |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1778 |_| true,
1779 )
1780 }
1781
1782 fn sealed_header(
1783 &self,
1784 num: BlockNumber,
1785 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1786 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1787 .and_then(|provider| provider.sealed_header(num))
1788 .or_else(|err| {
1789 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1790 Ok(None)
1791 } else {
1792 Err(err)
1793 }
1794 })
1795 }
1796
1797 fn sealed_headers_while(
1798 &self,
1799 range: impl RangeBounds<BlockNumber>,
1800 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1801 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1802 self.fetch_range_with_predicate(
1803 StaticFileSegment::Headers,
1804 to_range(range),
1805 |cursor, number| {
1806 Ok(cursor
1807 .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1808 .map(|(header, hash)| SealedHeader::new(header, hash)))
1809 },
1810 predicate,
1811 )
1812 }
1813}
1814
1815impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1816 fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1817 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1818 .and_then(|provider| provider.block_hash(num))
1819 .or_else(|err| {
1820 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1821 Ok(None)
1822 } else {
1823 Err(err)
1824 }
1825 })
1826 }
1827
1828 fn canonical_hashes_range(
1829 &self,
1830 start: BlockNumber,
1831 end: BlockNumber,
1832 ) -> ProviderResult<Vec<B256>> {
1833 self.fetch_range_with_predicate(
1834 StaticFileSegment::Headers,
1835 start..end,
1836 |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1837 |_| true,
1838 )
1839 }
1840}
1841
1842impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1843 for StaticFileProvider<N>
1844{
1845 type Receipt = N::Receipt;
1846
1847 fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1848 self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
1849 .and_then(|provider| provider.receipt(num))
1850 .or_else(|err| {
1851 if let ProviderError::MissingStaticFileTx(_, _) = err {
1852 Ok(None)
1853 } else {
1854 Err(err)
1855 }
1856 })
1857 }
1858
1859 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1860 if let Some(num) = self.transaction_id(hash)? {
1861 return self.receipt(num)
1862 }
1863 Ok(None)
1864 }
1865
1866 fn receipts_by_block(
1867 &self,
1868 _block: BlockHashOrNumber,
1869 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1870 unreachable!()
1871 }
1872
1873 fn receipts_by_tx_range(
1874 &self,
1875 range: impl RangeBounds<TxNumber>,
1876 ) -> ProviderResult<Vec<Self::Receipt>> {
1877 self.fetch_range_with_predicate(
1878 StaticFileSegment::Receipts,
1879 to_range(range),
1880 |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1881 |_| true,
1882 )
1883 }
1884
1885 fn receipts_by_block_range(
1886 &self,
1887 _block_range: RangeInclusive<BlockNumber>,
1888 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1889 Err(ProviderError::UnsupportedProvider)
1890 }
1891}
1892
1893impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
1894 for StaticFileProvider<N>
1895{
1896 fn transaction_hashes_by_range(
1897 &self,
1898 tx_range: Range<TxNumber>,
1899 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1900 let tx_range_size = (tx_range.end - tx_range.start) as usize;
1901
1902 let chunk_size = 100;
1906
1907 let chunks = tx_range
1909 .clone()
1910 .step_by(chunk_size)
1911 .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1912 let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1913
1914 for chunk_range in chunks {
1915 let (channel_tx, channel_rx) = mpsc::channel();
1916 channels.push(channel_rx);
1917
1918 let manager = self.clone();
1919
1920 rayon::spawn(move || {
1924 let mut rlp_buf = Vec::with_capacity(128);
1925 let _ = manager.fetch_range_with_predicate(
1926 StaticFileSegment::Transactions,
1927 chunk_range,
1928 |cursor, number| {
1929 Ok(cursor
1930 .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1931 .map(|transaction| {
1932 rlp_buf.clear();
1933 let _ = channel_tx
1934 .send(calculate_hash((number, transaction), &mut rlp_buf));
1935 }))
1936 },
1937 |_| true,
1938 );
1939 });
1940 }
1941
1942 let mut tx_list = Vec::with_capacity(tx_range_size);
1943
1944 for channel in channels {
1946 while let Ok(tx) = channel.recv() {
1947 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1948 tx_list.push((tx_hash, tx_id));
1949 }
1950 }
1951
1952 Ok(tx_list)
1953 }
1954}
1955
1956impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1957 for StaticFileProvider<N>
1958{
1959 type Transaction = N::SignedTx;
1960
1961 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1962 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1963 let mut cursor = jar_provider.cursor()?;
1964 if cursor
1965 .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1966 .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1967 .is_some()
1968 {
1969 Ok(cursor.number())
1970 } else {
1971 Ok(None)
1972 }
1973 })
1974 }
1975
1976 fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1977 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1978 .and_then(|provider| provider.transaction_by_id(num))
1979 .or_else(|err| {
1980 if let ProviderError::MissingStaticFileTx(_, _) = err {
1981 Ok(None)
1982 } else {
1983 Err(err)
1984 }
1985 })
1986 }
1987
1988 fn transaction_by_id_unhashed(
1989 &self,
1990 num: TxNumber,
1991 ) -> ProviderResult<Option<Self::Transaction>> {
1992 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1993 .and_then(|provider| provider.transaction_by_id_unhashed(num))
1994 .or_else(|err| {
1995 if let ProviderError::MissingStaticFileTx(_, _) = err {
1996 Ok(None)
1997 } else {
1998 Err(err)
1999 }
2000 })
2001 }
2002
2003 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2004 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2005 Ok(jar_provider
2006 .cursor()?
2007 .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2008 .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2009 })
2010 }
2011
2012 fn transaction_by_hash_with_meta(
2013 &self,
2014 _hash: TxHash,
2015 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2016 Err(ProviderError::UnsupportedProvider)
2018 }
2019
2020 fn transactions_by_block(
2021 &self,
2022 _block_id: BlockHashOrNumber,
2023 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2024 Err(ProviderError::UnsupportedProvider)
2026 }
2027
2028 fn transactions_by_block_range(
2029 &self,
2030 _range: impl RangeBounds<BlockNumber>,
2031 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2032 Err(ProviderError::UnsupportedProvider)
2034 }
2035
2036 fn transactions_by_tx_range(
2037 &self,
2038 range: impl RangeBounds<TxNumber>,
2039 ) -> ProviderResult<Vec<Self::Transaction>> {
2040 self.fetch_range_with_predicate(
2041 StaticFileSegment::Transactions,
2042 to_range(range),
2043 |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2044 |_| true,
2045 )
2046 }
2047
2048 fn senders_by_tx_range(
2049 &self,
2050 range: impl RangeBounds<TxNumber>,
2051 ) -> ProviderResult<Vec<Address>> {
2052 self.fetch_range_with_predicate(
2053 StaticFileSegment::TransactionSenders,
2054 to_range(range),
2055 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2056 |_| true,
2057 )
2058 }
2059
2060 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2061 self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2062 .and_then(|provider| provider.transaction_sender(id))
2063 .or_else(|err| {
2064 if let ProviderError::MissingStaticFileTx(_, _) = err {
2065 Ok(None)
2066 } else {
2067 Err(err)
2068 }
2069 })
2070 }
2071}
2072
2073impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
2074 fn chain_info(&self) -> ProviderResult<ChainInfo> {
2075 Err(ProviderError::UnsupportedProvider)
2077 }
2078
2079 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
2080 Err(ProviderError::UnsupportedProvider)
2082 }
2083
2084 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
2085 Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
2086 }
2087
2088 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
2089 Err(ProviderError::UnsupportedProvider)
2091 }
2092}
2093
// None of the `BlockReader` queries are answered by this provider: every method below ignores
// its arguments and returns `ProviderError::UnsupportedProvider`.
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    // Unsupported.
    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    // Unsupported.
    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    // Unsupported.
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    // Unsupported.
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Err(ProviderError::UnsupportedProvider)
    }

    // Unsupported.
    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    // Unsupported.
    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    // Unsupported.
    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    // Unsupported.
    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    // Unsupported.
    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    // Unsupported.
    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
2168
// Block body indices are not answered by this provider: both methods ignore their arguments
// and return `ProviderError::UnsupportedProvider`.
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
    // Unsupported.
    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        Err(ProviderError::UnsupportedProvider)
    }

    // Unsupported.
    fn block_body_indices_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
2181
2182impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2183 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2184 match T::NAME {
2185 tables::CanonicalHeaders::NAME |
2186 tables::Headers::<Header>::NAME |
2187 tables::HeaderTerminalDifficulties::NAME => Ok(self
2188 .get_highest_static_file_block(StaticFileSegment::Headers)
2189 .map(|block| block + 1)
2190 .unwrap_or_default()
2191 as usize),
2192 tables::Receipts::<Receipt>::NAME => Ok(self
2193 .get_highest_static_file_tx(StaticFileSegment::Receipts)
2194 .map(|receipts| receipts + 1)
2195 .unwrap_or_default() as usize),
2196 tables::Transactions::<TransactionSigned>::NAME => Ok(self
2197 .get_highest_static_file_tx(StaticFileSegment::Transactions)
2198 .map(|txs| txs + 1)
2199 .unwrap_or_default()
2200 as usize),
2201 tables::TransactionSenders::NAME => Ok(self
2202 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
2203 .map(|txs| txs + 1)
2204 .unwrap_or_default() as usize),
2205 _ => Err(ProviderError::UnsupportedProvider),
2206 }
2207 }
2208}
2209
2210#[inline]
2212fn calculate_hash<T>(
2213 entry: (TxNumber, T),
2214 rlp_buf: &mut Vec<u8>,
2215) -> Result<(B256, TxNumber), Box<ProviderError>>
2216where
2217 T: Encodable2718,
2218{
2219 let (tx_id, tx) = entry;
2220 tx.encode_2718(rlp_buf);
2221 Ok((keccak256(rlp_buf), tx_id))
2222}
2223
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use reth_chain_state::EthPrimitives;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

    use crate::StaticFileProviderBuilder;

    /// Checks `find_fixed_range_with_block_index` with no index, an empty index, an index of
    /// uniform 100-block ranges and an index of differently sized ranges.
    #[test]
    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
        let (static_dir, _) = create_test_static_files_dir();
        let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
            .with_blocks_per_file(100)
            .build()?;

        let segment = StaticFileSegment::Headers;

        // No index at all.
        for (block, (start, end)) in [(0, (0, 99)), (250, (200, 299))] {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, None, block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // Empty index.
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
            SegmentRangeInclusive::new(100, 199)
        );

        // Index of uniform 100-block ranges.
        let block_index = BTreeMap::from_iter([
            (99, SegmentRangeInclusive::new(0, 99)),
            (199, SegmentRangeInclusive::new(100, 199)),
            (299, SegmentRangeInclusive::new(200, 299)),
        ]);

        let uniform_cases = [
            // Blocks covered by an indexed range.
            (0, (0, 99)),
            (50, (0, 99)),
            (99, (0, 99)),
            (100, (100, 199)),
            (150, (100, 199)),
            (199, (100, 199)),
            // Blocks past the last indexed range.
            (300, (300, 399)),
            (350, (300, 399)),
            (500, (500, 599)),
            (1000, (1000, 1099)),
        ];
        for (block, (start, end)) in uniform_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // Index whose ranges have different sizes.
        let mixed_size_index = BTreeMap::from_iter([
            (49, SegmentRangeInclusive::new(0, 49)),
            (149, SegmentRangeInclusive::new(50, 149)),
            (349, SegmentRangeInclusive::new(150, 349)),
        ]);

        let mixed_cases = [
            // Blocks covered by an indexed range.
            (25, (0, 49)),
            (100, (50, 149)),
            (200, (150, 349)),
            // Blocks past the last indexed range: 100-block windows that continue from the
            // end of that range.
            (350, (350, 449)),
            (450, (450, 549)),
            (550, (550, 649)),
        ];
        for (block, (start, end)) in mixed_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        Ok(())
    }
}