1use super::{
2 metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3 StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6 to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
7 EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
8 TransactionVariant, TransactionsProvider, TransactionsProviderExt,
9};
10use alloy_consensus::{transaction::TransactionMeta, Header};
11use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
12use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
13use dashmap::DashMap;
14use notify::{RecommendedWatcher, RecursiveMode, Watcher};
15use parking_lot::RwLock;
16use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
17use reth_db::{
18 lockfile::StorageLock,
19 static_file::{
20 iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
21 StaticFileCursor, TransactionMask, TransactionSenderMask,
22 },
23};
24use reth_db_api::{
25 cursor::DbCursorRO,
26 models::StoredBlockBodyIndices,
27 table::{Decompress, Table, Value},
28 tables,
29 transaction::DbTx,
30};
31use reth_ethereum_primitives::{Receipt, TransactionSigned};
32use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
33use reth_node_types::NodePrimitives;
34use reth_primitives_traits::{RecoveredBlock, SealedHeader, SignedTransaction};
35use reth_stages_types::{PipelineTarget, StageId};
36use reth_static_file_types::{
37 find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
38 DEFAULT_BLOCKS_PER_STATIC_FILE,
39};
40use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, StorageSettingsCache};
41use reth_storage_errors::provider::{ProviderError, ProviderResult};
42use std::{
43 collections::{BTreeMap, HashMap},
44 fmt::Debug,
45 ops::{Deref, Range, RangeBounds, RangeInclusive},
46 path::{Path, PathBuf},
47 sync::{atomic::AtomicU64, mpsc, Arc},
48};
49use tracing::{debug, info, trace, warn};
50
/// Ordered map from a block number to a static file block range.
///
/// In this file the key is the (expected) last block of the range stored as value, so iterating
/// the map yields the ranges in ascending block order.
type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
54
/// Access mode a static file provider is opened with.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access. This is the default mode.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` if this is [`StaticFileAccess::RO`].
    pub const fn is_read_only(&self) -> bool {
        match self {
            Self::RO => true,
            Self::RW => false,
        }
    }

    /// Returns `true` if this is [`StaticFileAccess::RW`].
    pub const fn is_read_write(&self) -> bool {
        match self {
            Self::RW => true,
            Self::RO => false,
        }
    }
}
76
/// Handle to the static file provider state.
///
/// The state itself lives in a [`StaticFileProviderInner`] behind an [`Arc`], so every clone of
/// this handle refers to the same shared instance.
#[derive(Debug)]
pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
87
88impl<N> Clone for StaticFileProvider<N> {
89 fn clone(&self) -> Self {
90 Self(self.0.clone())
91 }
92}
93
/// Builder for [`StaticFileProvider`].
///
/// Holds the not-yet-shared [`StaticFileProviderInner`] so it can still be configured before it
/// is wrapped by `build`.
#[derive(Debug)]
pub struct StaticFileProviderBuilder<N> {
    /// Inner provider state being configured.
    inner: StaticFileProviderInner<N>,
}
99
100impl<N: NodePrimitives> StaticFileProviderBuilder<N> {
101 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
103 StaticFileProviderInner::new(path, StaticFileAccess::RW).map(|inner| Self { inner })
104 }
105
106 pub fn read_only(path: impl AsRef<Path>) -> ProviderResult<Self> {
108 StaticFileProviderInner::new(path, StaticFileAccess::RO).map(|inner| Self { inner })
109 }
110
111 pub fn with_blocks_per_file_for_segments(
123 mut self,
124 segments: HashMap<StaticFileSegment, u64>,
125 ) -> Self {
126 self.inner.blocks_per_file.extend(segments);
127 self
128 }
129
130 pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
132 for segment in StaticFileSegment::iter() {
133 self.inner.blocks_per_file.insert(segment, blocks_per_file);
134 }
135 self
136 }
137
138 pub fn with_blocks_per_file_for_segment(
140 mut self,
141 segment: StaticFileSegment,
142 blocks_per_file: u64,
143 ) -> Self {
144 self.inner.blocks_per_file.insert(segment, blocks_per_file);
145 self
146 }
147
148 pub fn with_metrics(mut self) -> Self {
150 self.inner.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
151 self
152 }
153
154 pub fn build(self) -> ProviderResult<StaticFileProvider<N>> {
156 let provider = StaticFileProvider(Arc::new(self.inner));
157 provider.initialize_index()?;
158 Ok(provider)
159 }
160}
161
impl<N: NodePrimitives> StaticFileProvider<N> {
    /// Creates a read-only [`StaticFileProvider`] for the static files at `path`.
    ///
    /// When `watch_directory` is `true`, [`Self::watch_directory`] is started so the index gets
    /// re-initialized when static files in the directory change.
    ///
    /// # Errors
    ///
    /// Returns an error if the builder fails to create or build the provider.
    pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
        let provider = StaticFileProviderBuilder::read_only(path)?.build()?;

        if watch_directory {
            provider.watch_directory();
        }

        Ok(provider)
    }

    /// Creates a read-write [`StaticFileProvider`] for the static files at `path`.
    ///
    /// # Errors
    ///
    /// Returns an error if the builder fails to create or build the provider.
    pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
        StaticFileProviderBuilder::read_write(path)?.build()
    }

    /// Spawns a background thread that watches the static files directory and re-initializes
    /// the provider index when a static file configuration file changes.
    ///
    /// The spawned thread owns a clone of this provider and is never joined.
    ///
    /// # Panics
    ///
    /// The spawned thread panics if the file system watcher cannot be created or the directory
    /// cannot be watched.
    pub fn watch_directory(&self) {
        let provider = self.clone();
        std::thread::spawn(move || {
            // Watcher events are forwarded through a channel and handled in the loop below.
            let (tx, rx) = std::sync::mpsc::channel();
            let mut watcher = RecommendedWatcher::new(
                move |res| tx.send(res).unwrap(),
                notify::Config::default(),
            )
            .expect("failed to create watcher");

            watcher
                .watch(&provider.path, RecursiveMode::NonRecursive)
                .expect("failed to watch path");

            // Modification time of the last file that triggered a re-initialization. Used to
            // skip events that do not carry a newer modification time.
            let mut last_event_timestamp = None;

            while let Ok(res) = rx.recv() {
                match res {
                    Ok(event) => {
                        // Only modifications, creations and removals are relevant.
                        if !matches!(
                            event.kind,
                            notify::EventKind::Modify(_) |
                                notify::EventKind::Create(_) |
                                notify::EventKind::Remove(_)
                        ) {
                            continue
                        }

                        for segment in event.paths {
                            // Only react to files carrying the jar configuration extension.
                            if segment
                                .extension()
                                .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
                            {
                                continue
                            }

                            // Skip files whose stem is not a valid static file name. A path
                            // with an extension always has a file stem, hence the `expect`.
                            if StaticFileSegment::parse_filename(
                                &segment.file_stem().expect("qed").to_string_lossy(),
                            )
                            .is_none()
                            {
                                continue
                            }

                            // If the modification time can be read, ignore files that are not
                            // newer than the last processed one. If it cannot be read (e.g. the
                            // file was removed), fall through and re-initialize anyway.
                            if let Ok(current_modified_timestamp) =
                                std::fs::metadata(&segment).and_then(|m| m.modified())
                            {
                                if last_event_timestamp.is_some_and(|last_timestamp| {
                                    last_timestamp >= current_modified_timestamp
                                }) {
                                    continue
                                }
                                last_event_timestamp = Some(current_modified_timestamp);
                            }

                            info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
                            if let Err(err) = provider.initialize_index() {
                                warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
                            }
                            // The whole index was rebuilt, remaining paths of this event need
                            // no further handling.
                            break
                        }
                    }

                    Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
                }
            }
        });
    }
}
271
272impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
273 type Target = StaticFileProviderInner<N>;
274
275 fn deref(&self) -> &Self::Target {
276 &self.0
277 }
278}
279
/// Shared state behind a [`StaticFileProvider`].
#[derive(Debug)]
pub struct StaticFileProviderInner<N> {
    /// Cache of loaded jars, keyed by the end block of the file's fixed block range and the
    /// segment.
    map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
    /// Per-segment index of the static files found on disk.
    indexes: RwLock<HashMap<StaticFileSegment, StaticFileSegmentIndex>>,
    /// First block of the lowest block range of the transactions segment, as recorded when the
    /// index is initialized. Stays at `0` if no such range was found.
    earliest_history_height: AtomicU64,
    /// Directory where the static files are located.
    path: PathBuf,
    /// Static file writers.
    // NOTE(review): presumably one writer per segment; the type is defined outside this file.
    writers: StaticFileWriters<N>,
    /// Metrics, only set when enabled through the builder.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Access mode the provider was opened with.
    access: StaticFileAccess,
    /// Number of blocks per static file for each segment.
    blocks_per_file: HashMap<StaticFileSegment, u64>,
    /// Storage lock, only acquired (and held) for read-write access.
    _lock_file: Option<StorageLock>,
}
312
313impl<N: NodePrimitives> StaticFileProviderInner<N> {
314 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
316 let _lock_file = if access.is_read_write() {
317 StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
318 } else {
319 None
320 };
321
322 let mut blocks_per_file = HashMap::new();
323 for segment in StaticFileSegment::iter() {
324 blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
325 }
326
327 let provider = Self {
328 map: Default::default(),
329 indexes: Default::default(),
330 writers: Default::default(),
331 earliest_history_height: Default::default(),
332 path: path.as_ref().to_path_buf(),
333 metrics: None,
334 access,
335 blocks_per_file,
336 _lock_file,
337 };
338
339 Ok(provider)
340 }
341
342 pub const fn is_read_only(&self) -> bool {
343 self.access.is_read_only()
344 }
345
346 pub fn find_fixed_range_with_block_index(
355 &self,
356 segment: StaticFileSegment,
357 block_index: Option<&SegmentRanges>,
358 block: BlockNumber,
359 ) -> SegmentRangeInclusive {
360 let blocks_per_file =
361 self.blocks_per_file.get(&segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
362
363 if let Some(block_index) = block_index {
364 if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
366 {
367 return *range
369 } else if let Some((_, range)) = block_index.last_key_value() {
370 let blocks_after_last_range = block - range.end();
376 let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
377 let start = range.end() + 1 + segments_to_skip * blocks_per_file;
378 return SegmentRangeInclusive::new(start, start + blocks_per_file - 1)
379 }
380 }
381 find_fixed_range(block, blocks_per_file)
384 }
385
386 pub fn find_fixed_range(
399 &self,
400 segment: StaticFileSegment,
401 block: BlockNumber,
402 ) -> SegmentRangeInclusive {
403 self.find_fixed_range_with_block_index(
404 segment,
405 self.indexes
406 .read()
407 .get(&segment)
408 .map(|index| &index.expected_block_ranges_by_max_block),
409 block,
410 )
411 }
412}
413
414impl<N: NodePrimitives> StaticFileProvider<N> {
415 pub fn report_metrics(&self) -> ProviderResult<()> {
417 let Some(metrics) = &self.metrics else { return Ok(()) };
418
419 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
420 for (segment, headers) in static_files {
421 let mut entries = 0;
422 let mut size = 0;
423
424 for (block_range, _) in &headers {
425 let fixed_block_range = self.find_fixed_range(segment, block_range.start());
426 let jar_provider = self
427 .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
428 .ok_or_else(|| {
429 ProviderError::MissingStaticFileBlock(segment, block_range.start())
430 })?;
431
432 entries += jar_provider.rows();
433
434 let data_size = reth_fs_util::metadata(jar_provider.data_path())
435 .map(|metadata| metadata.len())
436 .unwrap_or_default();
437 let index_size = reth_fs_util::metadata(jar_provider.index_path())
438 .map(|metadata| metadata.len())
439 .unwrap_or_default();
440 let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
441 .map(|metadata| metadata.len())
442 .unwrap_or_default();
443 let config_size = reth_fs_util::metadata(jar_provider.config_path())
444 .map(|metadata| metadata.len())
445 .unwrap_or_default();
446
447 size += data_size + index_size + offsets_size + config_size;
448 }
449
450 metrics.record_segment(segment, size, headers.len(), entries);
451 }
452
453 Ok(())
454 }
455
456 pub fn get_segment_provider(
459 &self,
460 segment: StaticFileSegment,
461 number: u64,
462 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
463 if segment.is_block_based() {
464 self.get_segment_provider_for_block(segment, number, None)
465 } else {
466 self.get_segment_provider_for_transaction(segment, number, None)
467 }
468 }
469
470 pub fn get_maybe_segment_provider(
475 &self,
476 segment: StaticFileSegment,
477 number: u64,
478 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
479 let provider = if segment.is_block_based() {
480 self.get_segment_provider_for_block(segment, number, None)
481 } else {
482 self.get_segment_provider_for_transaction(segment, number, None)
483 };
484
485 match provider {
486 Ok(provider) => Ok(Some(provider)),
487 Err(
488 ProviderError::MissingStaticFileBlock(_, _) |
489 ProviderError::MissingStaticFileTx(_, _),
490 ) => Ok(None),
491 Err(err) => Err(err),
492 }
493 }
494
495 pub fn get_segment_provider_for_block(
497 &self,
498 segment: StaticFileSegment,
499 block: BlockNumber,
500 path: Option<&Path>,
501 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
502 self.get_segment_provider_for_range(
503 segment,
504 || self.get_segment_ranges_from_block(segment, block),
505 path,
506 )?
507 .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
508 }
509
510 pub fn get_segment_provider_for_transaction(
512 &self,
513 segment: StaticFileSegment,
514 tx: TxNumber,
515 path: Option<&Path>,
516 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
517 self.get_segment_provider_for_range(
518 segment,
519 || self.get_segment_ranges_from_transaction(segment, tx),
520 path,
521 )?
522 .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
523 }
524
525 pub fn get_segment_provider_for_range(
529 &self,
530 segment: StaticFileSegment,
531 fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
532 path: Option<&Path>,
533 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
534 let block_range = match path {
537 Some(path) => StaticFileSegment::parse_filename(
538 &path
539 .file_name()
540 .ok_or_else(|| {
541 ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
542 })?
543 .to_string_lossy(),
544 )
545 .and_then(|(parsed_segment, block_range)| {
546 if parsed_segment == segment {
547 return Some(block_range)
548 }
549 None
550 }),
551 None => fn_range(),
552 };
553
554 if let Some(block_range) = block_range {
556 return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
557 }
558
559 Ok(None)
560 }
561
562 pub fn get_segment_provider_for_path(
564 &self,
565 path: &Path,
566 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
567 StaticFileSegment::parse_filename(
568 &path
569 .file_name()
570 .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
571 .to_string_lossy(),
572 )
573 .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
574 .transpose()
575 }
576
577 pub fn remove_cached_provider(
581 &self,
582 segment: StaticFileSegment,
583 fixed_block_range_end: BlockNumber,
584 ) {
585 self.map.remove(&(fixed_block_range_end, segment));
586 }
587
588 pub fn delete_segment_below_block(
605 &self,
606 segment: StaticFileSegment,
607 block: BlockNumber,
608 ) -> ProviderResult<Vec<SegmentHeader>> {
609 if block == 0 {
611 return Ok(Vec::new())
612 }
613
614 let highest_block = self.get_highest_static_file_block(segment);
615 let mut deleted_headers = Vec::new();
616
617 loop {
618 let Some(block_height) = self.get_lowest_range_end(segment) else {
619 return Ok(deleted_headers)
620 };
621
622 if block_height >= block || Some(block_height) == highest_block {
624 return Ok(deleted_headers)
625 }
626
627 debug!(
628 target: "provider::static_file",
629 ?segment,
630 ?block_height,
631 "Deleting static file below block"
632 );
633
634 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
637 warn!( target: "provider::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
638 })?;
639
640 deleted_headers.push(header);
641 }
642 }
643
644 pub fn delete_jar(
652 &self,
653 segment: StaticFileSegment,
654 block: BlockNumber,
655 ) -> ProviderResult<SegmentHeader> {
656 let fixed_block_range = self.find_fixed_range(segment, block);
657 let key = (fixed_block_range.end(), segment);
658 let jar = if let Some((_, jar)) = self.map.remove(&key) {
659 jar.jar
660 } else {
661 let file = self.path.join(segment.filename(&fixed_block_range));
662 debug!(
663 target: "provider::static_file",
664 ?file,
665 ?fixed_block_range,
666 ?block,
667 "Loading static file jar for deletion"
668 );
669 NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
670 };
671
672 let header = *jar.user_header();
673 jar.delete().map_err(ProviderError::other)?;
674
675 self.initialize_index()?;
678
679 Ok(header)
680 }
681
682 fn get_or_create_jar_provider(
686 &self,
687 segment: StaticFileSegment,
688 fixed_block_range: &SegmentRangeInclusive,
689 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
690 let key = (fixed_block_range.end(), segment);
691
692 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
694 let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
695 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
696 jar.into()
697 } else {
698 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
699 let path = self.path.join(segment.filename(fixed_block_range));
700 let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
701 self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
702 };
703
704 if let Some(metrics) = &self.metrics {
705 provider = provider.with_metrics(metrics.clone());
706 }
707 Ok(provider)
708 }
709
710 fn get_segment_ranges_from_block(
713 &self,
714 segment: StaticFileSegment,
715 block: u64,
716 ) -> Option<SegmentRangeInclusive> {
717 let indexes = self.indexes.read();
718 let index = indexes.get(&segment)?;
719
720 (index.max_block >= block).then(|| {
721 self.find_fixed_range_with_block_index(
722 segment,
723 Some(&index.expected_block_ranges_by_max_block),
724 block,
725 )
726 })
727 }
728
729 fn get_segment_ranges_from_transaction(
732 &self,
733 segment: StaticFileSegment,
734 tx: u64,
735 ) -> Option<SegmentRangeInclusive> {
736 let indexes = self.indexes.read();
737 let index = indexes.get(&segment)?;
738 let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;
739
740 let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();
743
744 while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
745 if tx > *tx_end {
746 return None
748 }
749 let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
750 if tx_start <= tx {
751 return Some(self.find_fixed_range_with_block_index(
752 segment,
753 Some(&index.expected_block_ranges_by_max_block),
754 block_range.end(),
755 ))
756 }
757 }
758 None
759 }
760
/// Updates the index and the jar cache of `segment` after its highest block changed.
///
/// With `Some(segment_max_block)`, the jar containing that block is loaded from disk, the
/// segment index is updated (or created) to reflect it, and the jar cache is refreshed. With
/// `None`, the segment is removed from the index.
///
/// # Errors
///
/// Returns an error if the jar containing `segment_max_block` cannot be loaded.
pub fn update_index(
    &self,
    segment: StaticFileSegment,
    segment_max_block: Option<BlockNumber>,
) -> ProviderResult<()> {
    debug!(
        target: "provider::static_file",
        ?segment,
        ?segment_max_block,
        "Updating provider index"
    );
    // The write lock is held for the whole update.
    let mut indexes = self.indexes.write();

    match segment_max_block {
        Some(segment_max_block) => {
            let fixed_range = self.find_fixed_range_with_block_index(
                segment,
                indexes.get(&segment).map(|index| &index.expected_block_ranges_by_max_block),
                segment_max_block,
            );

            let jar = NippyJar::<SegmentHeader>::load(
                &self.path.join(segment.filename(&fixed_range)),
            )
            .map_err(ProviderError::other)?;

            let index = indexes
                .entry(segment)
                .and_modify(|index| {
                    index.max_block = segment_max_block;

                    // Drop every expected range starting at or after the new highest range,
                    // then (re-)insert the new highest range.
                    index
                        .expected_block_ranges_by_max_block
                        .retain(|_, block_range| block_range.start() < fixed_range.start());
                    index
                        .expected_block_ranges_by_max_block
                        .insert(fixed_range.end(), fixed_range);
                })
                .or_insert_with(|| StaticFileSegmentIndex {
                    min_block_range: None,
                    max_block: segment_max_block,
                    expected_block_ranges_by_max_block: BTreeMap::from([(
                        fixed_range.end(),
                        fixed_range,
                    )]),
                    available_block_ranges_by_max_tx: None,
                });

            // Keep the lowest block range in sync: it is only replaced when the updated jar
            // starts at the same block, or set when there was none yet.
            if let Some(current_block_range) = jar.user_header().block_range() {
                if let Some(min_block_range) = index.min_block_range.as_mut() {
                    if current_block_range.start() == min_block_range.start() {
                        *min_block_range = current_block_range;
                    }
                } else {
                    index.min_block_range = Some(current_block_range);
                }
            }

            if let Some(tx_range) = jar.user_header().tx_range() {
                if let Some(current_block_range) = jar.user_header().block_range() {
                    let tx_end = tx_range.end();

                    // Replace the transaction index entries of this and any higher file with
                    // the entry of the updated jar.
                    if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                        index
                            .retain(|_, block_range| block_range.start() < fixed_range.start());
                        index.insert(tx_end, current_block_range);
                    } else {
                        index.available_block_ranges_by_max_tx =
                            Some(BTreeMap::from([(tx_end, current_block_range)]));
                    }
                }
            } else if segment.is_tx_based() {
                // The jar holds no transactions: remove the stale entries of this and any
                // higher file, and drop the transaction index entirely if it became empty.
                if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
                    index.retain(|_, block_range| block_range.start() < fixed_range.start());
                }

                index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
            }

            debug!(target: "provider::static_file", ?segment, "Inserting updated jar into cache");
            self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);

            // Evict cached jars of this segment that lie above the new highest range.
            debug!(target: "provider::static_file", ?segment, "Cleaning up jar map");
            self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
        }
        None => {
            debug!(target: "provider::static_file", ?segment, "Removing segment from index");
            indexes.remove(&segment);
        }
    };

    debug!(target: "provider::static_file", ?segment, "Updated provider index");
    Ok(())
}
901
902 pub fn initialize_index(&self) -> ProviderResult<()> {
904 let mut indexes = self.indexes.write();
905 indexes.clear();
906
907 for (segment, headers) in iter_static_files(&self.path).map_err(ProviderError::other)? {
908 let min_block_range = Some(headers.first().expect("headers are not empty").0);
913 let max_block = headers.last().expect("headers are not empty").0.end();
914
915 let mut expected_block_ranges_by_max_block = BTreeMap::default();
916 let mut available_block_ranges_by_max_tx = None;
917
918 for (block_range, header) in headers {
919 expected_block_ranges_by_max_block
921 .insert(header.expected_block_end(), header.expected_block_range());
922
923 if let Some(tx_range) = header.tx_range() {
925 let tx_end = tx_range.end();
926
927 available_block_ranges_by_max_tx
928 .get_or_insert_with(BTreeMap::default)
929 .insert(tx_end, block_range);
930 }
931 }
932
933 indexes.insert(
934 segment,
935 StaticFileSegmentIndex {
936 min_block_range,
937 max_block,
938 expected_block_ranges_by_max_block,
939 available_block_ranges_by_max_tx,
940 },
941 );
942 }
943
944 self.map.clear();
946
947 if let Some(lowest_range) =
949 indexes.get(&StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
950 {
951 self.earliest_history_height
953 .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
954 }
955
956 Ok(())
957 }
958
/// Checks that the static files are consistent with the database accessed through `provider`.
///
/// For every segment that is stored in static files this:
/// 1. verifies the latest file (read-only access) or fetches the latest writer (read-write
///    access), and records an unwind target if the highest block changed as a result;
/// 2. for segments with transactions, walks back from the highest block until the block body
///    indices agree with the highest static file transaction;
/// 3. calls `ensure_invariants` to compare the static files with the database table and the
///    stage checkpoint.
///
/// Returns `Ok(Some(PipelineTarget::Unwind(block)))` with the lowest unwind target found, or
/// `Ok(None)` if no unwind is needed.
///
/// # Errors
///
/// Returns any error raised by the database provider or while accessing the static files.
pub fn check_consistency<Provider>(
    &self,
    provider: &Provider,
) -> ProviderResult<Option<PipelineTarget>>
where
    Provider: DBProvider
        + BlockReader
        + StageCheckpointReader
        + ChainSpecProvider
        + StorageSettingsCache,
    N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
{
    // OP mainnet: if the database knows the block with this hash, verification is skipped
    // because an inconsistency is expected (see the log message below).
    if provider.chain_spec().is_optimism() &&
        reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
    {
        const OVM_HEADER_1_HASH: B256 =
            b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
        if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
            info!(target: "reth::cli",
                "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
            );
            return Ok(None)
        }
    }

    info!(target: "reth::cli", "Verifying storage consistency.");

    // Lowest unwind target requested by any segment so far.
    let mut unwind_target: Option<BlockNumber> = None;
    let mut update_unwind_target = |new_target: BlockNumber| {
        if let Some(target) = unwind_target.as_mut() {
            *target = (*target).min(new_target);
        } else {
            unwind_target = Some(new_target);
        }
    };

    for segment in StaticFileSegment::iter() {
        debug!(target: "reth::providers::static_file", ?segment, "Checking consistency for segment");
        // Skip segments whose data is not kept in static files.
        match segment {
            StaticFileSegment::Headers | StaticFileSegment::Transactions => {}
            StaticFileSegment::Receipts => {
                if EitherWriter::receipts_destination(provider).is_database() {
                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts consistency check: receipts stored in database");
                    continue
                }

                if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
                    NamedChain::Chiado == provider.chain_spec().chain_id()
                {
                    debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts consistency check: broken historical import for gnosis/chiado");
                    continue;
                }
            }
            StaticFileSegment::TransactionSenders => {
                if EitherWriterDestination::senders(provider).is_database() {
                    continue
                }
            }
        }

        let initial_highest_block = self.get_highest_static_file_block(segment);
        debug!(target: "reth::providers::static_file", ?segment, ?initial_highest_block, "Initial highest block for segment");

        if self.access.is_read_only() {
            // Read-only access can only verify the latest file; an inconsistency is an error.
            debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency (read-only)");
            self.check_segment_consistency(segment)?;
        } else {
            debug!(target: "reth::providers::static_file", ?segment, "Fetching latest writer which might heal any potential inconsistency");
            self.latest_writer(segment)?;
        }

        // If the highest block changed in the step above, the database has to be unwound to
        // the new highest block.
        let mut highest_block = self.get_highest_static_file_block(segment);
        if initial_highest_block != highest_block {
            info!(
                target: "reth::providers::static_file",
                ?initial_highest_block,
                unwind_target = highest_block,
                ?segment,
                "Setting unwind target."
            );
            update_unwind_target(highest_block.unwrap_or_default());
        }

        // For segments holding transactions: walk back block by block until the last
        // transaction of the block (per the database) is not above the highest static file
        // transaction. Every step back lowers the unwind target.
        let highest_tx = self.get_highest_static_file_tx(segment);
        debug!(target: "reth::providers::static_file", ?segment, ?highest_tx, ?highest_block, "Highest transaction for segment");
        if let Some(highest_tx) = highest_tx {
            let mut last_block = highest_block.unwrap_or_default();
            debug!(target: "reth::providers::static_file", ?segment, last_block, highest_tx, "Verifying last transaction matches last block indices");
            loop {
                if let Some(indices) = provider.block_body_indices(last_block)? {
                    debug!(target: "reth::providers::static_file", ?segment, last_block, last_tx_num = indices.last_tx_num(), highest_tx, "Found block body indices");
                    if indices.last_tx_num() <= highest_tx {
                        break
                    }
                } else {
                    debug!(target: "reth::providers::static_file", ?segment, last_block, "Block body indices not found, static files ahead of database");
                    break
                }
                if last_block == 0 {
                    debug!(target: "reth::providers::static_file", ?segment, "Reached block 0 in verification loop");
                    break
                }
                last_block -= 1;

                info!(
                    target: "reth::providers::static_file",
                    highest_block = self.get_highest_static_file_block(segment),
                    unwind_target = last_block,
                    ?segment,
                    "Setting unwind target."
                );
                highest_block = Some(last_block);
                update_unwind_target(last_block);
            }
        }

        // Headers are indexed by block number, so the highest entry is the highest block; the
        // other segments are indexed by transaction number.
        debug!(target: "reth::providers::static_file", ?segment, "Ensuring invariants for segment");
        if let Some(unwind) = match segment {
            StaticFileSegment::Headers => self
                .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                    provider,
                    segment,
                    highest_block,
                    highest_block,
                )?,
            StaticFileSegment::Transactions => self
                .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                )?,
            StaticFileSegment::Receipts => self
                .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                )?,
            StaticFileSegment::TransactionSenders => self
                .ensure_invariants::<_, tables::TransactionSenders>(
                    provider,
                    segment,
                    highest_tx,
                    highest_block,
                )?,
        } {
            debug!(target: "reth::providers::static_file", ?segment, unwind_target=unwind, "Invariants check returned unwind target");
            update_unwind_target(unwind);
        } else {
            debug!(target: "reth::providers::static_file", ?segment, "Invariants check completed, no unwind needed");
        }
    }

    Ok(unwind_target.map(PipelineTarget::Unwind))
}
1171
1172 pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1175 debug!(target: "reth::providers::static_file", ?segment, "Checking segment consistency");
1176 if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1177 let file_path = self
1178 .directory()
1179 .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1180 debug!(target: "reth::providers::static_file", ?segment, ?file_path, latest_block, "Loading NippyJar for consistency check");
1181
1182 let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1183 debug!(target: "reth::providers::static_file", ?segment, "NippyJar loaded, checking consistency");
1184
1185 NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1186 debug!(target: "reth::providers::static_file", ?segment, "NippyJar consistency check passed");
1187 } else {
1188 debug!(target: "reth::providers::static_file", ?segment, "No static file block found, skipping consistency check");
1189 }
1190 Ok(())
1191 }
1192
/// Compares the static files of `segment` with the database table `T` and the checkpoint of
/// the stage that writes the segment.
///
/// * `highest_static_file_entry` - highest entry (key of `T`) present in the static files.
/// * `highest_static_file_block` - highest block present in the static files.
///
/// Returns `Ok(Some(block))` if the database has to be unwound to `block`:
/// * when there is a gap between the highest static file entry and the first database entry;
/// * when the stage checkpoint is above the highest static file block.
///
/// Returns `Ok(None)` otherwise. If the stage checkpoint is below the highest static file
/// block (and the database has no entries beyond the static files), the static files are
/// pruned down to the checkpoint and committed before returning.
///
/// # Errors
///
/// Returns any error raised by the database provider or the static file writer.
fn ensure_invariants<Provider, T: Table<Key = u64>>(
    &self,
    provider: &Provider,
    segment: StaticFileSegment,
    highest_static_file_entry: Option<u64>,
    highest_static_file_block: Option<BlockNumber>,
) -> ProviderResult<Option<BlockNumber>>
where
    Provider: DBProvider + BlockReader + StageCheckpointReader,
{
    debug!(target: "reth::providers::static_file", ?segment, ?highest_static_file_entry, ?highest_static_file_block, "Ensuring invariants");
    let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

    if let Some((db_first_entry, _)) = db_cursor.first()? {
        debug!(target: "reth::providers::static_file", ?segment, db_first_entry, "Found first database entry");
        if let (Some(highest_entry), Some(highest_block)) =
            (highest_static_file_entry, highest_static_file_block)
        {
            // The first database entry must overlap with, or directly follow, the highest
            // static file entry. Otherwise there is a gap and an unwind is required.
            if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                info!(
                    target: "reth::providers::static_file",
                    ?db_first_entry,
                    ?highest_entry,
                    unwind_target = highest_block,
                    ?segment,
                    "Setting unwind target."
                );
                return Ok(Some(highest_block))
            }
        }

        // The database holds entries beyond the static files (or there are no static file
        // entries at all): nothing to unwind or prune.
        if let Some((db_last_entry, _)) = db_cursor.last()? &&
            highest_static_file_entry
                .is_none_or(|highest_entry| db_last_entry > highest_entry)
        {
            debug!(target: "reth::providers::static_file", ?segment, db_last_entry, ?highest_static_file_entry, "Database has entries beyond static files, no unwind needed");
            return Ok(None)
        }
    } else {
        debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
    }

    let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
    let highest_static_file_block = highest_static_file_block.unwrap_or_default();

    // Stage whose checkpoint is compared against the static files of this segment.
    let stage_id = match segment {
        StaticFileSegment::Headers => StageId::Headers,
        StaticFileSegment::Transactions => StageId::Bodies,
        StaticFileSegment::Receipts => StageId::Execution,
        StaticFileSegment::TransactionSenders => StageId::SenderRecovery,
    };
    let checkpoint_block_number =
        provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
    debug!(target: "reth::providers::static_file", ?segment, ?stage_id, checkpoint_block_number, highest_static_file_block, "Retrieved stage checkpoint");

    // The checkpoint is ahead of the static files: unwind to the highest static file block.
    if checkpoint_block_number > highest_static_file_block {
        info!(
            target: "reth::providers::static_file",
            checkpoint_block_number,
            unwind_target = highest_static_file_block,
            ?segment,
            "Setting unwind target."
        );
        return Ok(Some(highest_static_file_block))
    }

    // The static files are ahead of the checkpoint: prune them down to the checkpoint.
    if checkpoint_block_number < highest_static_file_block {
        info!(
            target: "reth::providers",
            ?segment,
            from = highest_static_file_block,
            to = checkpoint_block_number,
            "Unwinding static file segment."
        );
        let mut writer = self.latest_writer(segment)?;
        match segment {
            StaticFileSegment::Headers => {
                let prune_count = highest_static_file_block - checkpoint_block_number;
                debug!(target: "reth::providers::static_file", ?segment, prune_count, "Pruning headers");
                writer.prune_headers(prune_count)?;
            }
            StaticFileSegment::Transactions |
            StaticFileSegment::Receipts |
            StaticFileSegment::TransactionSenders => {
                // The number of entries to prune is derived from the last transaction of the
                // checkpoint block. Nothing is pruned if its body indices are unknown.
                if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                    let number = highest_static_file_entry - block.last_tx_num();
                    debug!(target: "reth::providers::static_file", ?segment, prune_count = number, checkpoint_block_number, "Pruning transaction based segment");

                    match segment {
                        StaticFileSegment::Transactions => {
                            writer.prune_transactions(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::Receipts => {
                            writer.prune_receipts(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::TransactionSenders => {
                            writer.prune_transaction_senders(number, checkpoint_block_number)?
                        }
                        StaticFileSegment::Headers => unreachable!(),
                    }
                } else {
                    debug!(target: "reth::providers::static_file", ?segment, checkpoint_block_number, "No block body indices found for checkpoint block");
                }
            }
        }
        debug!(target: "reth::providers::static_file", ?segment, "Committing writer after pruning");
        writer.commit()?;
        debug!(target: "reth::providers::static_file", ?segment, "Writer committed successfully");
    }

    debug!(target: "reth::providers::static_file", ?segment, "Invariants ensured, returning None");
    Ok(None)
}
1330
1331 pub fn earliest_history_height(&self) -> BlockNumber {
1339 self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1340 }
1341
1342 pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1346 self.indexes.read().get(&segment).and_then(|index| index.min_block_range)
1347 }
1348
1349 pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1355 self.get_lowest_range(segment).map(|range| range.start())
1356 }
1357
1358 pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1364 self.get_lowest_range(segment).map(|range| range.end())
1365 }
1366
1367 pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1371 self.indexes.read().get(&segment).map(|index| index.max_block)
1372 }
1373
1374 pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
1378 self.indexes
1379 .read()
1380 .get(&segment)
1381 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
1382 .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
1383 }
1384
1385 pub fn get_highest_static_files(&self) -> HighestStaticFiles {
1387 HighestStaticFiles {
1388 receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
1389 }
1390 }
1391
1392 pub fn find_static_file<T>(
1395 &self,
1396 segment: StaticFileSegment,
1397 func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
1398 ) -> ProviderResult<Option<T>> {
1399 if let Some(ranges) =
1400 self.indexes.read().get(&segment).map(|index| &index.expected_block_ranges_by_max_block)
1401 {
1402 for range in ranges.values().rev() {
1404 if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
1405 return Ok(Some(res))
1406 }
1407 }
1408 }
1409
1410 Ok(None)
1411 }
1412
1413 pub fn fetch_range_with_predicate<T, F, P>(
1419 &self,
1420 segment: StaticFileSegment,
1421 range: Range<u64>,
1422 mut get_fn: F,
1423 mut predicate: P,
1424 ) -> ProviderResult<Vec<T>>
1425 where
1426 F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1427 P: FnMut(&T) -> bool,
1428 {
1429 let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1430
1431 macro_rules! get_provider {
1435 ($number:expr) => {{
1436 match self.get_segment_provider(segment, $number) {
1437 Ok(provider) => provider,
1438 Err(
1439 ProviderError::MissingStaticFileBlock(_, _) |
1440 ProviderError::MissingStaticFileTx(_, _),
1441 ) => return Ok(result),
1442 Err(err) => return Err(err),
1443 }
1444 }};
1445 }
1446
1447 let mut provider = get_provider!(range.start);
1448 let mut cursor = provider.cursor()?;
1449
1450 'outer: for number in range {
1452 let mut retrying = false;
1456
1457 'inner: loop {
1459 match get_fn(&mut cursor, number)? {
1460 Some(res) => {
1461 if !predicate(&res) {
1462 break 'outer
1463 }
1464 result.push(res);
1465 break 'inner
1466 }
1467 None => {
1468 if retrying {
1469 return Ok(result)
1470 }
1471 drop(cursor);
1476 drop(provider);
1477 provider = get_provider!(number);
1478 cursor = provider.cursor()?;
1479 retrying = true;
1480 }
1481 }
1482 }
1483 }
1484
1485 Ok(result)
1486 }
1487
1488 pub fn fetch_range_iter<'a, T, F>(
1493 &'a self,
1494 segment: StaticFileSegment,
1495 range: Range<u64>,
1496 get_fn: F,
1497 ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
1498 where
1499 F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1500 T: std::fmt::Debug,
1501 {
1502 let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
1503 Ok(range.map(move |number| {
1504 match provider
1505 .as_ref()
1506 .map(|provider| get_fn(&mut provider.cursor()?, number))
1507 .and_then(|result| result.transpose())
1508 {
1509 Some(result) => result.map(Some),
1510 None => {
1511 provider.take();
1515 provider = self.get_maybe_segment_provider(segment, number)?;
1516 provider
1517 .as_ref()
1518 .map(|provider| get_fn(&mut provider.cursor()?, number))
1519 .and_then(|result| result.transpose())
1520 .transpose()
1521 }
1522 }
1523 }))
1524 }
1525
1526 pub fn directory(&self) -> &Path {
1528 &self.path
1529 }
1530
1531 pub fn get_with_static_file_or_database<T, FS, FD>(
1541 &self,
1542 segment: StaticFileSegment,
1543 number: u64,
1544 fetch_from_static_file: FS,
1545 fetch_from_database: FD,
1546 ) -> ProviderResult<Option<T>>
1547 where
1548 FS: Fn(&Self) -> ProviderResult<Option<T>>,
1549 FD: Fn() -> ProviderResult<Option<T>>,
1550 {
1551 let static_file_upper_bound = if segment.is_block_based() {
1553 self.get_highest_static_file_block(segment)
1554 } else {
1555 self.get_highest_static_file_tx(segment)
1556 };
1557
1558 if static_file_upper_bound
1559 .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1560 {
1561 return fetch_from_static_file(self)
1562 }
1563 fetch_from_database()
1564 }
1565
1566 pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1578 &self,
1579 segment: StaticFileSegment,
1580 mut block_or_tx_range: Range<u64>,
1581 fetch_from_static_file: FS,
1582 mut fetch_from_database: FD,
1583 mut predicate: P,
1584 ) -> ProviderResult<Vec<T>>
1585 where
1586 FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1587 FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1588 P: FnMut(&T) -> bool,
1589 {
1590 let mut data = Vec::new();
1591
1592 if let Some(static_file_upper_bound) = if segment.is_block_based() {
1594 self.get_highest_static_file_block(segment)
1595 } else {
1596 self.get_highest_static_file_tx(segment)
1597 } && block_or_tx_range.start <= static_file_upper_bound
1598 {
1599 let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1600 data.extend(fetch_from_static_file(
1601 self,
1602 block_or_tx_range.start..end,
1603 &mut predicate,
1604 )?);
1605 block_or_tx_range.start = end;
1606 }
1607
1608 if block_or_tx_range.end > block_or_tx_range.start {
1609 data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1610 }
1611
1612 Ok(data)
1613 }
1614
/// Returns the path stored in this provider's `path` field (test builds only).
#[cfg(any(test, feature = "test-utils"))]
pub fn path(&self) -> &Path {
    let path: &Path = &self.path;
    path
}
1620
/// Returns a clone of the `available_block_ranges_by_max_tx` map of `segment`
/// (test builds only).
///
/// Yields `None` when the segment has no index entry or its index has no such map.
#[cfg(any(test, feature = "test-utils"))]
pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
    let indexes = self.indexes.read();
    indexes.get(&segment)?.available_block_ranges_by_max_tx.clone()
}
1630
/// Returns a clone of the `expected_block_ranges_by_max_block` map of `segment`
/// (test builds only), or `None` when the segment has no index entry.
#[cfg(any(test, feature = "test-utils"))]
pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
    let indexes = self.indexes.read();
    let index = indexes.get(&segment)?;
    Some(index.expected_block_ranges_by_max_block.clone())
}
1640}
1641
/// Index data kept per static file segment.
#[derive(Debug)]
struct StaticFileSegmentIndex {
    /// Lowest block range of the segment, if any. Exposed through `get_lowest_range`.
    min_block_range: Option<SegmentRangeInclusive>,
    /// Highest block of the segment. Exposed through `get_highest_static_file_block`.
    max_block: u64,
    /// Block ranges of the segment; going by the name, keyed by each range's max block —
    /// TODO confirm. `find_static_file` walks the values of this map in reverse order.
    expected_block_ranges_by_max_block: SegmentRanges,
    /// Optional map of block ranges; going by the name, keyed by the max transaction
    /// number of each range — TODO confirm. Its greatest key is what
    /// `get_highest_static_file_tx` reports.
    available_block_ranges_by_max_tx: Option<SegmentRanges>,
}
1672
/// Write-side access to static files.
pub trait StaticFileWriter {
    /// The primitives type the returned writers are parameterized with.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable handle to the writer of `segment` for the given `block`.
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable handle to the writer of `segment` positioned at the latest block.
    ///
    /// The `StaticFileProvider` implementation resolves this to the highest static file
    /// block of the segment, falling back to the default block number when there is none.
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits the writers.
    fn commit(&self) -> ProviderResult<()>;

    /// Reports whether an unwind is queued on the writers.
    fn has_unwind_queued(&self) -> bool;
}
1698
1699impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1700 type Primitives = N;
1701
1702 fn get_writer(
1703 &self,
1704 block: BlockNumber,
1705 segment: StaticFileSegment,
1706 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1707 if self.access.is_read_only() {
1708 return Err(ProviderError::ReadOnlyStaticFileAccess)
1709 }
1710
1711 trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1712 self.writers.get_or_create(segment, || {
1713 StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1714 })
1715 }
1716
1717 fn latest_writer(
1718 &self,
1719 segment: StaticFileSegment,
1720 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1721 self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1722 }
1723
1724 fn commit(&self) -> ProviderResult<()> {
1725 self.writers.commit()
1726 }
1727
1728 fn has_unwind_queued(&self) -> bool {
1729 self.writers.has_unwind_queued()
1730 }
1731}
1732
1733impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1734 type Header = N::BlockHeader;
1735
1736 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
1737 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1738 Ok(jar_provider
1739 .cursor()?
1740 .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
1741 .and_then(|(header, hash)| {
1742 if hash == block_hash {
1743 return Some(header)
1744 }
1745 None
1746 }))
1747 })
1748 }
1749
1750 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1751 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1752 .and_then(|provider| provider.header_by_number(num))
1753 .or_else(|err| {
1754 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1755 Ok(None)
1756 } else {
1757 Err(err)
1758 }
1759 })
1760 }
1761
1762 fn headers_range(
1763 &self,
1764 range: impl RangeBounds<BlockNumber>,
1765 ) -> ProviderResult<Vec<Self::Header>> {
1766 self.fetch_range_with_predicate(
1767 StaticFileSegment::Headers,
1768 to_range(range),
1769 |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1770 |_| true,
1771 )
1772 }
1773
1774 fn sealed_header(
1775 &self,
1776 num: BlockNumber,
1777 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1778 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1779 .and_then(|provider| provider.sealed_header(num))
1780 .or_else(|err| {
1781 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1782 Ok(None)
1783 } else {
1784 Err(err)
1785 }
1786 })
1787 }
1788
1789 fn sealed_headers_while(
1790 &self,
1791 range: impl RangeBounds<BlockNumber>,
1792 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1793 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1794 self.fetch_range_with_predicate(
1795 StaticFileSegment::Headers,
1796 to_range(range),
1797 |cursor, number| {
1798 Ok(cursor
1799 .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1800 .map(|(header, hash)| SealedHeader::new(header, hash)))
1801 },
1802 predicate,
1803 )
1804 }
1805}
1806
1807impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1808 fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1809 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
1810 .and_then(|provider| provider.block_hash(num))
1811 .or_else(|err| {
1812 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1813 Ok(None)
1814 } else {
1815 Err(err)
1816 }
1817 })
1818 }
1819
1820 fn canonical_hashes_range(
1821 &self,
1822 start: BlockNumber,
1823 end: BlockNumber,
1824 ) -> ProviderResult<Vec<B256>> {
1825 self.fetch_range_with_predicate(
1826 StaticFileSegment::Headers,
1827 start..end,
1828 |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1829 |_| true,
1830 )
1831 }
1832}
1833
1834impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1835 for StaticFileProvider<N>
1836{
1837 type Receipt = N::Receipt;
1838
1839 fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1840 self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
1841 .and_then(|provider| provider.receipt(num))
1842 .or_else(|err| {
1843 if let ProviderError::MissingStaticFileTx(_, _) = err {
1844 Ok(None)
1845 } else {
1846 Err(err)
1847 }
1848 })
1849 }
1850
1851 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1852 if let Some(num) = self.transaction_id(hash)? {
1853 return self.receipt(num)
1854 }
1855 Ok(None)
1856 }
1857
1858 fn receipts_by_block(
1859 &self,
1860 _block: BlockHashOrNumber,
1861 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1862 unreachable!()
1863 }
1864
1865 fn receipts_by_tx_range(
1866 &self,
1867 range: impl RangeBounds<TxNumber>,
1868 ) -> ProviderResult<Vec<Self::Receipt>> {
1869 self.fetch_range_with_predicate(
1870 StaticFileSegment::Receipts,
1871 to_range(range),
1872 |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1873 |_| true,
1874 )
1875 }
1876
1877 fn receipts_by_block_range(
1878 &self,
1879 _block_range: RangeInclusive<BlockNumber>,
1880 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
1881 Err(ProviderError::UnsupportedProvider)
1882 }
1883}
1884
1885impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
1886 for StaticFileProvider<N>
1887{
1888 fn transaction_hashes_by_range(
1889 &self,
1890 tx_range: Range<TxNumber>,
1891 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1892 let tx_range_size = (tx_range.end - tx_range.start) as usize;
1893
1894 let chunk_size = 100;
1898
1899 let chunks = tx_range
1901 .clone()
1902 .step_by(chunk_size)
1903 .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1904 let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1905
1906 for chunk_range in chunks {
1907 let (channel_tx, channel_rx) = mpsc::channel();
1908 channels.push(channel_rx);
1909
1910 let manager = self.clone();
1911
1912 rayon::spawn(move || {
1916 let mut rlp_buf = Vec::with_capacity(128);
1917 let _ = manager.fetch_range_with_predicate(
1918 StaticFileSegment::Transactions,
1919 chunk_range,
1920 |cursor, number| {
1921 Ok(cursor
1922 .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1923 .map(|transaction| {
1924 rlp_buf.clear();
1925 let _ = channel_tx
1926 .send(calculate_hash((number, transaction), &mut rlp_buf));
1927 }))
1928 },
1929 |_| true,
1930 );
1931 });
1932 }
1933
1934 let mut tx_list = Vec::with_capacity(tx_range_size);
1935
1936 for channel in channels {
1938 while let Ok(tx) = channel.recv() {
1939 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1940 tx_list.push((tx_hash, tx_id));
1941 }
1942 }
1943
1944 Ok(tx_list)
1945 }
1946}
1947
1948impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1949 for StaticFileProvider<N>
1950{
1951 type Transaction = N::SignedTx;
1952
1953 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1954 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1955 let mut cursor = jar_provider.cursor()?;
1956 if cursor
1957 .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1958 .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1959 .is_some()
1960 {
1961 Ok(cursor.number())
1962 } else {
1963 Ok(None)
1964 }
1965 })
1966 }
1967
1968 fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1969 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1970 .and_then(|provider| provider.transaction_by_id(num))
1971 .or_else(|err| {
1972 if let ProviderError::MissingStaticFileTx(_, _) = err {
1973 Ok(None)
1974 } else {
1975 Err(err)
1976 }
1977 })
1978 }
1979
1980 fn transaction_by_id_unhashed(
1981 &self,
1982 num: TxNumber,
1983 ) -> ProviderResult<Option<Self::Transaction>> {
1984 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
1985 .and_then(|provider| provider.transaction_by_id_unhashed(num))
1986 .or_else(|err| {
1987 if let ProviderError::MissingStaticFileTx(_, _) = err {
1988 Ok(None)
1989 } else {
1990 Err(err)
1991 }
1992 })
1993 }
1994
1995 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1996 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1997 Ok(jar_provider
1998 .cursor()?
1999 .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2000 .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2001 })
2002 }
2003
2004 fn transaction_by_hash_with_meta(
2005 &self,
2006 _hash: TxHash,
2007 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2008 Err(ProviderError::UnsupportedProvider)
2010 }
2011
2012 fn transactions_by_block(
2013 &self,
2014 _block_id: BlockHashOrNumber,
2015 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2016 Err(ProviderError::UnsupportedProvider)
2018 }
2019
2020 fn transactions_by_block_range(
2021 &self,
2022 _range: impl RangeBounds<BlockNumber>,
2023 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2024 Err(ProviderError::UnsupportedProvider)
2026 }
2027
2028 fn transactions_by_tx_range(
2029 &self,
2030 range: impl RangeBounds<TxNumber>,
2031 ) -> ProviderResult<Vec<Self::Transaction>> {
2032 self.fetch_range_with_predicate(
2033 StaticFileSegment::Transactions,
2034 to_range(range),
2035 |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2036 |_| true,
2037 )
2038 }
2039
2040 fn senders_by_tx_range(
2041 &self,
2042 range: impl RangeBounds<TxNumber>,
2043 ) -> ProviderResult<Vec<Address>> {
2044 self.fetch_range_with_predicate(
2045 StaticFileSegment::TransactionSenders,
2046 to_range(range),
2047 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2048 |_| true,
2049 )
2050 }
2051
2052 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2053 self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2054 .and_then(|provider| provider.transaction_sender(id))
2055 .or_else(|err| {
2056 if let ProviderError::MissingStaticFileTx(_, _) = err {
2057 Ok(None)
2058 } else {
2059 Err(err)
2060 }
2061 })
2062 }
2063}
2064
2065impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
2066 fn chain_info(&self) -> ProviderResult<ChainInfo> {
2067 Err(ProviderError::UnsupportedProvider)
2069 }
2070
2071 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
2072 Err(ProviderError::UnsupportedProvider)
2074 }
2075
2076 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
2077 Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
2078 }
2079
2080 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
2081 Err(ProviderError::UnsupportedProvider)
2083 }
2084}
2085
/// `BlockReader` implementation in which every method reports
/// `ProviderError::UnsupportedProvider`.
///
/// NOTE(review): presumably static files alone do not hold everything needed to assemble
/// full blocks — confirm the rationale with the callers.
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
2160
/// `BlockBodyIndicesProvider` implementation in which every method reports
/// `ProviderError::UnsupportedProvider`.
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns `ProviderError::UnsupportedProvider`.
    fn block_body_indices_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
2173
2174impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
2175 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
2176 match T::NAME {
2177 tables::CanonicalHeaders::NAME |
2178 tables::Headers::<Header>::NAME |
2179 tables::HeaderTerminalDifficulties::NAME => Ok(self
2180 .get_highest_static_file_block(StaticFileSegment::Headers)
2181 .map(|block| block + 1)
2182 .unwrap_or_default()
2183 as usize),
2184 tables::Receipts::<Receipt>::NAME => Ok(self
2185 .get_highest_static_file_tx(StaticFileSegment::Receipts)
2186 .map(|receipts| receipts + 1)
2187 .unwrap_or_default() as usize),
2188 tables::Transactions::<TransactionSigned>::NAME => Ok(self
2189 .get_highest_static_file_tx(StaticFileSegment::Transactions)
2190 .map(|txs| txs + 1)
2191 .unwrap_or_default()
2192 as usize),
2193 tables::TransactionSenders::NAME => Ok(self
2194 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
2195 .map(|txs| txs + 1)
2196 .unwrap_or_default() as usize),
2197 _ => Err(ProviderError::UnsupportedProvider),
2198 }
2199 }
2200}
2201
2202#[inline]
2204fn calculate_hash<T>(
2205 entry: (TxNumber, T),
2206 rlp_buf: &mut Vec<u8>,
2207) -> Result<(B256, TxNumber), Box<ProviderError>>
2208where
2209 T: Encodable2718,
2210{
2211 let (tx_id, tx) = entry;
2212 tx.encode_2718(rlp_buf);
2213 Ok((keccak256(rlp_buf), tx_id))
2214}
2215
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use reth_chain_state::EthPrimitives;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

    use crate::StaticFileProviderBuilder;

    /// Checks `find_fixed_range_with_block_index` with no index, an empty index, an index
    /// of uniform 100-block files, and an index of mixed-size files.
    #[test]
    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
        let (static_dir, _) = create_test_static_files_dir();
        let sf_rw = StaticFileProviderBuilder::<EthPrimitives>::read_write(&static_dir)?
            .with_blocks_per_file(100)
            .build()?;

        let segment = StaticFileSegment::Headers;

        // Without an index, ranges follow the configured 100 blocks per file.
        for (block, start, end) in [(0, 0, 99), (250, 200, 299)] {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, None, block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // An empty index gives the same result as no index.
        let empty_index = BTreeMap::new();
        assert_eq!(
            sf_rw.find_fixed_range_with_block_index(segment, Some(&empty_index), 150),
            SegmentRangeInclusive::new(100, 199)
        );

        let block_index = BTreeMap::from_iter([
            (99, SegmentRangeInclusive::new(0, 99)),
            (199, SegmentRangeInclusive::new(100, 199)),
            (299, SegmentRangeInclusive::new(200, 299)),
        ]);

        // Each row is (block, expected start, expected end).
        let uniform_cases = [
            // Blocks covered by an indexed range resolve to that range.
            (0, 0, 99),
            (50, 0, 99),
            (99, 0, 99),
            (100, 100, 199),
            (150, 100, 199),
            (199, 100, 199),
            // Blocks right after the last indexed range.
            (300, 300, 399),
            (350, 300, 399),
            // Blocks further past the last indexed range.
            (500, 500, 599),
            (1000, 1000, 1099),
        ];
        for (block, start, end) in uniform_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        let mixed_size_index = BTreeMap::from_iter([
            (49, SegmentRangeInclusive::new(0, 49)),
            (149, SegmentRangeInclusive::new(50, 149)),
            (349, SegmentRangeInclusive::new(150, 349)),
        ]);

        let mixed_cases = [
            // Blocks covered by an indexed range of non-default size.
            (25, 0, 49),
            (100, 50, 149),
            (200, 150, 349),
            // Past the last indexed range, ranges continue from its end in steps of 100.
            (350, 350, 449),
            (450, 450, 549),
            (550, 550, 649),
        ];
        for (block, start, end) in mixed_cases {
            assert_eq!(
                sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        Ok(())
    }
}