1use super::{
2 metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3 StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6 to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider,
7 ReceiptProvider, StageCheckpointReader, StatsReader, TransactionVariant, TransactionsProvider,
8 TransactionsProviderExt, WithdrawalsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, Header};
11use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
12use alloy_primitives::{
13 b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
14};
15use dashmap::DashMap;
16use notify::{RecommendedWatcher, RecursiveMode, Watcher};
17use parking_lot::RwLock;
18use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
19use reth_db::{
20 lockfile::StorageLock,
21 static_file::{
22 iter_static_files, BlockHashMask, BodyIndicesMask, HeaderMask, HeaderWithHashMask,
23 ReceiptMask, StaticFileCursor, TDWithHashMask, TransactionMask,
24 },
25};
26use reth_db_api::{
27 cursor::DbCursorRO,
28 models::StoredBlockBodyIndices,
29 table::{Decompress, Table, Value},
30 tables,
31 transaction::DbTx,
32};
33use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
34use reth_node_types::{FullNodePrimitives, NodePrimitives};
35use reth_primitives::{
36 static_file::{
37 find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive,
38 DEFAULT_BLOCKS_PER_STATIC_FILE,
39 },
40 Receipt, RecoveredBlock, SealedBlock, SealedHeader, StaticFileSegment, TransactionSigned,
41};
42use reth_primitives_traits::SignedTransaction;
43use reth_stages_types::{PipelineTarget, StageId};
44use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, OmmersProvider};
45use reth_storage_errors::provider::{ProviderError, ProviderResult};
46use std::{
47 collections::{hash_map::Entry, BTreeMap, HashMap},
48 fmt::Debug,
49 marker::PhantomData,
50 ops::{Deref, Range, RangeBounds, RangeInclusive},
51 path::{Path, PathBuf},
52 sync::{mpsc, Arc},
53};
54use tracing::{info, trace, warn};
55
56type SegmentRanges = HashMap<StaticFileSegment, BTreeMap<TxNumber, SegmentRangeInclusive>>;
60
61#[derive(Debug, Default, PartialEq, Eq)]
63pub enum StaticFileAccess {
64 #[default]
66 RO,
67 RW,
69}
70
71impl StaticFileAccess {
72 pub const fn is_read_only(&self) -> bool {
74 matches!(self, Self::RO)
75 }
76
77 pub const fn is_read_write(&self) -> bool {
79 matches!(self, Self::RW)
80 }
81}
82
83#[derive(Debug)]
92pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
93
94impl<N> Clone for StaticFileProvider<N> {
95 fn clone(&self) -> Self {
96 Self(self.0.clone())
97 }
98}
99
100impl<N: NodePrimitives> StaticFileProvider<N> {
101 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
103 let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
104 provider.initialize_index()?;
105 Ok(provider)
106 }
107
108 pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
118 let provider = Self::new(path, StaticFileAccess::RO)?;
119
120 if watch_directory {
121 provider.watch_directory();
122 }
123
124 Ok(provider)
125 }
126
127 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
129 Self::new(path, StaticFileAccess::RW)
130 }
131
132 pub fn watch_directory(&self) {
138 let provider = self.clone();
139 std::thread::spawn(move || {
140 let (tx, rx) = std::sync::mpsc::channel();
141 let mut watcher = RecommendedWatcher::new(
142 move |res| tx.send(res).unwrap(),
143 notify::Config::default(),
144 )
145 .expect("failed to create watcher");
146
147 watcher
148 .watch(&provider.path, RecursiveMode::NonRecursive)
149 .expect("failed to watch path");
150
151 let mut last_event_timestamp = None;
153
154 while let Ok(res) = rx.recv() {
155 match res {
156 Ok(event) => {
157 if !matches!(
159 event.kind,
160 notify::EventKind::Modify(_) |
161 notify::EventKind::Create(_) |
162 notify::EventKind::Remove(_)
163 ) {
164 continue
165 }
166
167 for segment in event.paths {
172 #[allow(clippy::nonminimal_bool)]
174 if !segment
175 .extension()
176 .is_some_and(|s| s.to_str() == Some(CONFIG_FILE_EXTENSION))
177 {
178 continue
179 }
180
181 if StaticFileSegment::parse_filename(
183 &segment.file_stem().expect("qed").to_string_lossy(),
184 )
185 .is_none()
186 {
187 continue
188 }
189
190 if let Ok(current_modified_timestamp) =
193 std::fs::metadata(&segment).and_then(|m| m.modified())
194 {
195 if last_event_timestamp.is_some_and(|last_timestamp| {
196 last_timestamp >= current_modified_timestamp
197 }) {
198 continue
199 }
200 last_event_timestamp = Some(current_modified_timestamp);
201 }
202
203 info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
204 if let Err(err) = provider.initialize_index() {
205 warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
206 }
207 break
208 }
209 }
210
211 Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
212 }
213 }
214 });
215 }
216}
217
218impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
219 type Target = StaticFileProviderInner<N>;
220
221 fn deref(&self) -> &Self::Target {
222 &self.0
223 }
224}
225
226#[derive(Debug)]
228pub struct StaticFileProviderInner<N> {
229 map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
232 static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
234 static_files_tx_index: RwLock<SegmentRanges>,
236 path: PathBuf,
238 writers: StaticFileWriters<N>,
240 metrics: Option<Arc<StaticFileProviderMetrics>>,
242 access: StaticFileAccess,
244 blocks_per_file: u64,
246 _lock_file: Option<StorageLock>,
248 _pd: PhantomData<N>,
250}
251
252impl<N: NodePrimitives> StaticFileProviderInner<N> {
253 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
255 let _lock_file = if access.is_read_write() {
256 StorageLock::try_acquire(path.as_ref())?.into()
257 } else {
258 None
259 };
260
261 let provider = Self {
262 map: Default::default(),
263 writers: Default::default(),
264 static_files_max_block: Default::default(),
265 static_files_tx_index: Default::default(),
266 path: path.as_ref().to_path_buf(),
267 metrics: None,
268 access,
269 blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE,
270 _lock_file,
271 _pd: Default::default(),
272 };
273
274 Ok(provider)
275 }
276
277 pub const fn is_read_only(&self) -> bool {
278 self.access.is_read_only()
279 }
280
281 pub const fn find_fixed_range(&self, block: BlockNumber) -> SegmentRangeInclusive {
284 find_fixed_range(block, self.blocks_per_file)
285 }
286}
287
288impl<N: NodePrimitives> StaticFileProvider<N> {
289 #[cfg(any(test, feature = "test-utils"))]
291 pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self {
292 let mut provider =
293 Arc::try_unwrap(self.0).expect("should be called when initializing only");
294 provider.blocks_per_file = blocks_per_file;
295 Self(Arc::new(provider))
296 }
297
298 pub fn with_metrics(self) -> Self {
300 let mut provider =
301 Arc::try_unwrap(self.0).expect("should be called when initializing only");
302 provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
303 Self(Arc::new(provider))
304 }
305
306 pub fn report_metrics(&self) -> ProviderResult<()> {
308 let Some(metrics) = &self.metrics else { return Ok(()) };
309
310 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
311 for (segment, ranges) in static_files {
312 let mut entries = 0;
313 let mut size = 0;
314
315 for (block_range, _) in &ranges {
316 let fixed_block_range = self.find_fixed_range(block_range.start());
317 let jar_provider = self
318 .get_segment_provider(segment, || Some(fixed_block_range), None)?
319 .ok_or_else(|| {
320 ProviderError::MissingStaticFileBlock(segment, block_range.start())
321 })?;
322
323 entries += jar_provider.rows();
324
325 let data_size = reth_fs_util::metadata(jar_provider.data_path())
326 .map(|metadata| metadata.len())
327 .unwrap_or_default();
328 let index_size = reth_fs_util::metadata(jar_provider.index_path())
329 .map(|metadata| metadata.len())
330 .unwrap_or_default();
331 let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
332 .map(|metadata| metadata.len())
333 .unwrap_or_default();
334 let config_size = reth_fs_util::metadata(jar_provider.config_path())
335 .map(|metadata| metadata.len())
336 .unwrap_or_default();
337
338 size += data_size + index_size + offsets_size + config_size;
339 }
340
341 metrics.record_segment(segment, size, ranges.len(), entries);
342 }
343
344 Ok(())
345 }
346
347 pub fn get_segment_provider_from_block(
349 &self,
350 segment: StaticFileSegment,
351 block: BlockNumber,
352 path: Option<&Path>,
353 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
354 self.get_segment_provider(
355 segment,
356 || self.get_segment_ranges_from_block(segment, block),
357 path,
358 )?
359 .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
360 }
361
362 pub fn get_segment_provider_from_transaction(
364 &self,
365 segment: StaticFileSegment,
366 tx: TxNumber,
367 path: Option<&Path>,
368 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
369 self.get_segment_provider(
370 segment,
371 || self.get_segment_ranges_from_transaction(segment, tx),
372 path,
373 )?
374 .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
375 }
376
377 pub fn get_segment_provider(
381 &self,
382 segment: StaticFileSegment,
383 fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
384 path: Option<&Path>,
385 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
386 let block_range = match path {
389 Some(path) => StaticFileSegment::parse_filename(
390 &path
391 .file_name()
392 .ok_or_else(|| {
393 ProviderError::MissingStaticFilePath(segment, path.to_path_buf())
394 })?
395 .to_string_lossy(),
396 )
397 .and_then(|(parsed_segment, block_range)| {
398 if parsed_segment == segment {
399 return Some(block_range)
400 }
401 None
402 }),
403 None => fn_range(),
404 };
405
406 if let Some(block_range) = block_range {
408 return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
409 }
410
411 Ok(None)
412 }
413
414 pub fn remove_cached_provider(
418 &self,
419 segment: StaticFileSegment,
420 fixed_block_range_end: BlockNumber,
421 ) {
422 self.map.remove(&(fixed_block_range_end, segment));
423 }
424
425 pub fn delete_jar(&self, segment: StaticFileSegment, block: BlockNumber) -> ProviderResult<()> {
429 let fixed_block_range = self.find_fixed_range(block);
430 let key = (fixed_block_range.end(), segment);
431 let jar = if let Some((_, jar)) = self.map.remove(&key) {
432 jar.jar
433 } else {
434 NippyJar::<SegmentHeader>::load(&self.path.join(segment.filename(&fixed_block_range)))
435 .map_err(ProviderError::other)?
436 };
437
438 jar.delete().map_err(ProviderError::other)?;
439
440 let mut segment_max_block = None;
441 if fixed_block_range.start() > 0 {
442 segment_max_block = Some(fixed_block_range.start() - 1)
443 };
444 self.update_index(segment, segment_max_block)?;
445
446 Ok(())
447 }
448
449 fn get_or_create_jar_provider(
453 &self,
454 segment: StaticFileSegment,
455 fixed_block_range: &SegmentRangeInclusive,
456 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
457 let key = (fixed_block_range.end(), segment);
458
459 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
461 let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
462 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
463 jar.into()
464 } else {
465 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
466 let path = self.path.join(segment.filename(fixed_block_range));
467 let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
468 self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
469 };
470
471 if let Some(metrics) = &self.metrics {
472 provider = provider.with_metrics(metrics.clone());
473 }
474 Ok(provider)
475 }
476
477 fn get_segment_ranges_from_block(
480 &self,
481 segment: StaticFileSegment,
482 block: u64,
483 ) -> Option<SegmentRangeInclusive> {
484 self.static_files_max_block
485 .read()
486 .get(&segment)
487 .filter(|max| **max >= block)
488 .map(|_| self.find_fixed_range(block))
489 }
490
491 fn get_segment_ranges_from_transaction(
494 &self,
495 segment: StaticFileSegment,
496 tx: u64,
497 ) -> Option<SegmentRangeInclusive> {
498 let static_files = self.static_files_tx_index.read();
499 let segment_static_files = static_files.get(&segment)?;
500
501 let mut static_files_rev_iter = segment_static_files.iter().rev().peekable();
504
505 while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
506 if tx > *tx_end {
507 return None
509 }
510 let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
511 if tx_start <= tx {
512 return Some(self.find_fixed_range(block_range.end()))
513 }
514 }
515 None
516 }
517
518 pub fn update_index(
525 &self,
526 segment: StaticFileSegment,
527 segment_max_block: Option<BlockNumber>,
528 ) -> ProviderResult<()> {
529 let mut max_block = self.static_files_max_block.write();
530 let mut tx_index = self.static_files_tx_index.write();
531
532 match segment_max_block {
533 Some(segment_max_block) => {
534 max_block.insert(segment, segment_max_block);
536 let fixed_range = self.find_fixed_range(segment_max_block);
537
538 let jar = NippyJar::<SegmentHeader>::load(
539 &self.path.join(segment.filename(&fixed_range)),
540 )
541 .map_err(ProviderError::other)?;
542
543 if let Some(tx_range) = jar.user_header().tx_range() {
546 let tx_end = tx_range.end();
547
548 if let Some(current_block_range) = jar.user_header().block_range().copied() {
551 tx_index
560 .entry(segment)
561 .and_modify(|index| {
562 index.retain(|_, block_range| {
563 block_range.start() < fixed_range.start()
564 });
565 index.insert(tx_end, current_block_range);
566 })
567 .or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
568 }
569 } else if segment.is_tx_based() {
570 tx_index.entry(segment).and_modify(|index| {
574 index.retain(|_, block_range| block_range.start() < fixed_range.start());
575 });
576
577 if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
579 tx_index.remove(&segment);
580 }
581 }
582
583 self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
585
586 self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
588 }
589 None => {
590 tx_index.remove(&segment);
591 max_block.remove(&segment);
592 }
593 };
594
595 Ok(())
596 }
597
598 pub fn initialize_index(&self) -> ProviderResult<()> {
600 let mut max_block = self.static_files_max_block.write();
601 let mut tx_index = self.static_files_tx_index.write();
602
603 max_block.clear();
604 tx_index.clear();
605
606 for (segment, ranges) in iter_static_files(&self.path).map_err(ProviderError::other)? {
607 if let Some((block_range, _)) = ranges.last() {
609 max_block.insert(segment, block_range.end());
610 }
611
612 for (block_range, tx_range) in ranges {
614 if let Some(tx_range) = tx_range {
615 let tx_end = tx_range.end();
616
617 match tx_index.entry(segment) {
618 Entry::Occupied(mut index) => {
619 index.get_mut().insert(tx_end, block_range);
620 }
621 Entry::Vacant(index) => {
622 index.insert(BTreeMap::from([(tx_end, block_range)]));
623 }
624 };
625 }
626 }
627 }
628
629 self.map.clear();
631
632 Ok(())
633 }
634
635 #[allow(clippy::while_let_loop)]
659 pub fn check_consistency<Provider>(
660 &self,
661 provider: &Provider,
662 has_receipt_pruning: bool,
663 ) -> ProviderResult<Option<PipelineTarget>>
664 where
665 Provider: DBProvider + BlockReader + StageCheckpointReader + ChainSpecProvider,
666 N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
667 {
668 if provider.chain_spec().is_optimism() &&
675 reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
676 {
677 const OVM_HEADER_1_HASH: B256 =
679 b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
680 if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
681 info!(target: "reth::cli",
682 "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
683 );
684 return Ok(None)
685 }
686 }
687
688 info!(target: "reth::cli", "Verifying storage consistency.");
689
690 let mut unwind_target: Option<BlockNumber> = None;
691 let mut update_unwind_target = |new_target: BlockNumber| {
692 if let Some(target) = unwind_target.as_mut() {
693 *target = (*target).min(new_target);
694 } else {
695 unwind_target = Some(new_target);
696 }
697 };
698
699 for segment in StaticFileSegment::iter() {
700 if segment.is_block_meta() {
702 continue
703 }
704
705 if has_receipt_pruning && segment.is_receipts() {
706 continue
708 }
709
710 let initial_highest_block = self.get_highest_static_file_block(segment);
711
712 if self.access.is_read_only() {
721 self.check_segment_consistency(segment)?;
722 } else {
723 self.latest_writer(segment)?;
725 }
726
727 let mut highest_block = self.get_highest_static_file_block(segment);
732 if initial_highest_block != highest_block {
733 info!(
734 target: "reth::providers::static_file",
735 ?initial_highest_block,
736 unwind_target = highest_block,
737 ?segment,
738 "Setting unwind target."
739 );
740 update_unwind_target(highest_block.unwrap_or_default());
741 }
742
743 let highest_tx = self.get_highest_static_file_tx(segment);
749 if let Some(highest_tx) = highest_tx {
750 let mut last_block = highest_block.unwrap_or_default();
751 loop {
752 if let Some(indices) = provider.block_body_indices(last_block)? {
753 if indices.last_tx_num() <= highest_tx {
754 break
755 }
756 } else {
757 break
761 }
762 if last_block == 0 {
763 break
764 }
765 last_block -= 1;
766
767 info!(
768 target: "reth::providers::static_file",
769 highest_block = self.get_highest_static_file_block(segment),
770 unwind_target = last_block,
771 ?segment,
772 "Setting unwind target."
773 );
774 highest_block = Some(last_block);
775 update_unwind_target(last_block);
776 }
777 }
778
779 if let Some(unwind) = match segment {
780 StaticFileSegment::Headers => self
781 .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
782 provider,
783 segment,
784 highest_block,
785 highest_block,
786 )?,
787 StaticFileSegment::Transactions => self
788 .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
789 provider,
790 segment,
791 highest_tx,
792 highest_block,
793 )?,
794 StaticFileSegment::Receipts => self
795 .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
796 provider,
797 segment,
798 highest_tx,
799 highest_block,
800 )?,
801 StaticFileSegment::BlockMeta => self
802 .ensure_invariants::<_, tables::BlockBodyIndices>(
803 provider,
804 segment,
805 highest_block,
806 highest_block,
807 )?,
808 } {
809 update_unwind_target(unwind);
810 }
811 }
812
813 Ok(unwind_target.map(PipelineTarget::Unwind))
814 }
815
816 pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
819 if let Some(latest_block) = self.get_highest_static_file_block(segment) {
820 let file_path =
821 self.directory().join(segment.filename(&self.find_fixed_range(latest_block)));
822
823 let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
824
825 NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
826 }
827 Ok(())
828 }
829
830 fn ensure_invariants<Provider, T: Table<Key = u64>>(
845 &self,
846 provider: &Provider,
847 segment: StaticFileSegment,
848 highest_static_file_entry: Option<u64>,
849 highest_static_file_block: Option<BlockNumber>,
850 ) -> ProviderResult<Option<BlockNumber>>
851 where
852 Provider: DBProvider + BlockReader + StageCheckpointReader,
853 {
854 let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
855
856 if let Some((db_first_entry, _)) = db_cursor.first()? {
857 if let (Some(highest_entry), Some(highest_block)) =
858 (highest_static_file_entry, highest_static_file_block)
859 {
860 if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
864 info!(
865 target: "reth::providers::static_file",
866 ?db_first_entry,
867 ?highest_entry,
868 unwind_target = highest_block,
869 ?segment,
870 "Setting unwind target."
871 );
872 return Ok(Some(highest_block))
873 }
874 }
875
876 if let Some((db_last_entry, _)) = db_cursor.last()? {
877 if highest_static_file_entry
878 .is_none_or(|highest_entry| db_last_entry > highest_entry)
879 {
880 return Ok(None)
881 }
882 }
883 }
884
885 let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
886 let highest_static_file_block = highest_static_file_block.unwrap_or_default();
887
888 let checkpoint_block_number = provider
891 .get_stage_checkpoint(match segment {
892 StaticFileSegment::Headers => StageId::Headers,
893 StaticFileSegment::Transactions | StaticFileSegment::BlockMeta => StageId::Bodies,
894 StaticFileSegment::Receipts => StageId::Execution,
895 })?
896 .unwrap_or_default()
897 .block_number;
898
899 if checkpoint_block_number > highest_static_file_block {
901 info!(
902 target: "reth::providers::static_file",
903 checkpoint_block_number,
904 unwind_target = highest_static_file_block,
905 ?segment,
906 "Setting unwind target."
907 );
908 return Ok(Some(highest_static_file_block))
909 }
910
911 if checkpoint_block_number < highest_static_file_block {
915 info!(
916 target: "reth::providers",
917 ?segment,
918 from = highest_static_file_block,
919 to = checkpoint_block_number,
920 "Unwinding static file segment."
921 );
922 let mut writer = self.latest_writer(segment)?;
923 if segment.is_headers() {
924 writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
926 } else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
927 let number = highest_static_file_entry - block.last_tx_num();
930 if segment.is_receipts() {
931 writer.prune_receipts(number, checkpoint_block_number)?;
932 } else {
933 writer.prune_transactions(number, checkpoint_block_number)?;
934 }
935 }
936 writer.commit()?;
937 }
938
939 Ok(None)
940 }
941
942 pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
946 self.static_files_max_block.read().get(&segment).copied()
947 }
948
949 pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
953 self.static_files_tx_index
954 .read()
955 .get(&segment)
956 .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
957 }
958
959 pub fn get_highest_static_files(&self) -> HighestStaticFiles {
961 HighestStaticFiles {
962 headers: self.get_highest_static_file_block(StaticFileSegment::Headers),
963 receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
964 transactions: self.get_highest_static_file_block(StaticFileSegment::Transactions),
965 block_meta: self.get_highest_static_file_block(StaticFileSegment::BlockMeta),
966 }
967 }
968
969 pub fn find_static_file<T>(
972 &self,
973 segment: StaticFileSegment,
974 func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
975 ) -> ProviderResult<Option<T>> {
976 if let Some(highest_block) = self.get_highest_static_file_block(segment) {
977 let mut range = self.find_fixed_range(highest_block);
978 while range.end() > 0 {
979 if let Some(res) = func(self.get_or_create_jar_provider(segment, &range)?)? {
980 return Ok(Some(res))
981 }
982 range = SegmentRangeInclusive::new(
983 range.start().saturating_sub(self.blocks_per_file),
984 range.end().saturating_sub(self.blocks_per_file),
985 );
986 }
987 }
988
989 Ok(None)
990 }
991
992 pub fn fetch_range_with_predicate<T, F, P>(
998 &self,
999 segment: StaticFileSegment,
1000 range: Range<u64>,
1001 mut get_fn: F,
1002 mut predicate: P,
1003 ) -> ProviderResult<Vec<T>>
1004 where
1005 F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1006 P: FnMut(&T) -> bool,
1007 {
1008 let get_provider = |start: u64| {
1009 if segment.is_block_based() {
1010 self.get_segment_provider_from_block(segment, start, None)
1011 } else {
1012 self.get_segment_provider_from_transaction(segment, start, None)
1013 }
1014 };
1015
1016 let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1017 let mut provider = get_provider(range.start)?;
1018 let mut cursor = provider.cursor()?;
1019
1020 'outer: for number in range {
1022 let mut retrying = false;
1026
1027 'inner: loop {
1029 match get_fn(&mut cursor, number)? {
1030 Some(res) => {
1031 if !predicate(&res) {
1032 break 'outer
1033 }
1034 result.push(res);
1035 break 'inner
1036 }
1037 None => {
1038 if retrying {
1039 warn!(
1040 target: "provider::static_file",
1041 ?segment,
1042 ?number,
1043 "Could not find block or tx number on a range request"
1044 );
1045
1046 let err = if segment.is_block_based() {
1047 ProviderError::MissingStaticFileBlock(segment, number)
1048 } else {
1049 ProviderError::MissingStaticFileTx(segment, number)
1050 };
1051 return Err(err)
1052 }
1053 drop(cursor);
1058 drop(provider);
1059 provider = get_provider(number)?;
1060 cursor = provider.cursor()?;
1061 retrying = true;
1062 }
1063 }
1064 }
1065 }
1066
1067 Ok(result)
1068 }
1069
1070 pub fn fetch_range_iter<'a, T, F>(
1074 &'a self,
1075 segment: StaticFileSegment,
1076 range: Range<u64>,
1077 get_fn: F,
1078 ) -> ProviderResult<impl Iterator<Item = ProviderResult<T>> + 'a>
1079 where
1080 F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1081 T: std::fmt::Debug,
1082 {
1083 let get_provider = move |start: u64| {
1084 if segment.is_block_based() {
1085 self.get_segment_provider_from_block(segment, start, None)
1086 } else {
1087 self.get_segment_provider_from_transaction(segment, start, None)
1088 }
1089 };
1090
1091 let mut provider = Some(get_provider(range.start)?);
1092 Ok(range.filter_map(move |number| {
1093 match get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose() {
1094 Some(result) => Some(result),
1095 None => {
1096 provider.take();
1101 provider = Some(get_provider(number).ok()?);
1102 get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose()
1103 }
1104 }
1105 }))
1106 }
1107
1108 pub fn directory(&self) -> &Path {
1110 &self.path
1111 }
1112
1113 pub fn get_with_static_file_or_database<T, FS, FD>(
1123 &self,
1124 segment: StaticFileSegment,
1125 number: u64,
1126 fetch_from_static_file: FS,
1127 fetch_from_database: FD,
1128 ) -> ProviderResult<Option<T>>
1129 where
1130 FS: Fn(&Self) -> ProviderResult<Option<T>>,
1131 FD: Fn() -> ProviderResult<Option<T>>,
1132 {
1133 let static_file_upper_bound = if segment.is_block_based() {
1135 self.get_highest_static_file_block(segment)
1136 } else {
1137 self.get_highest_static_file_tx(segment)
1138 };
1139
1140 if static_file_upper_bound
1141 .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1142 {
1143 return fetch_from_static_file(self)
1144 }
1145 fetch_from_database()
1146 }
1147
1148 pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1160 &self,
1161 segment: StaticFileSegment,
1162 mut block_or_tx_range: Range<u64>,
1163 fetch_from_static_file: FS,
1164 mut fetch_from_database: FD,
1165 mut predicate: P,
1166 ) -> ProviderResult<Vec<T>>
1167 where
1168 FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1169 FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1170 P: FnMut(&T) -> bool,
1171 {
1172 let mut data = Vec::new();
1173
1174 if let Some(static_file_upper_bound) = if segment.is_block_based() {
1176 self.get_highest_static_file_block(segment)
1177 } else {
1178 self.get_highest_static_file_tx(segment)
1179 } {
1180 if block_or_tx_range.start <= static_file_upper_bound {
1181 let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1182 data.extend(fetch_from_static_file(
1183 self,
1184 block_or_tx_range.start..end,
1185 &mut predicate,
1186 )?);
1187 block_or_tx_range.start = end;
1188 }
1189 }
1190
1191 if block_or_tx_range.end > block_or_tx_range.start {
1192 data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1193 }
1194
1195 Ok(data)
1196 }
1197
1198 #[cfg(any(test, feature = "test-utils"))]
1200 pub fn path(&self) -> &Path {
1201 &self.path
1202 }
1203
1204 #[cfg(any(test, feature = "test-utils"))]
1206 pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
1207 &self.static_files_tx_index
1208 }
1209}
1210
1211pub trait StaticFileWriter {
1213 type Primitives: Send + Sync + 'static;
1215
1216 fn get_writer(
1218 &self,
1219 block: BlockNumber,
1220 segment: StaticFileSegment,
1221 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1222
1223 fn latest_writer(
1226 &self,
1227 segment: StaticFileSegment,
1228 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1229
1230 fn commit(&self) -> ProviderResult<()>;
1232}
1233
1234impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1235 type Primitives = N;
1236
1237 fn get_writer(
1238 &self,
1239 block: BlockNumber,
1240 segment: StaticFileSegment,
1241 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1242 if self.access.is_read_only() {
1243 return Err(ProviderError::ReadOnlyStaticFileAccess)
1244 }
1245
1246 trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1247 self.writers.get_or_create(segment, || {
1248 StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1249 })
1250 }
1251
1252 fn latest_writer(
1253 &self,
1254 segment: StaticFileSegment,
1255 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1256 self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1257 }
1258
1259 fn commit(&self) -> ProviderResult<()> {
1260 self.writers.commit()
1261 }
1262}
1263
1264impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1265 type Header = N::BlockHeader;
1266
1267 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
1268 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1269 Ok(jar_provider
1270 .cursor()?
1271 .get_two::<HeaderWithHashMask<Self::Header>>(block_hash.into())?
1272 .and_then(|(header, hash)| {
1273 if &hash == block_hash {
1274 return Some(header)
1275 }
1276 None
1277 }))
1278 })
1279 }
1280
1281 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1282 self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1283 .and_then(|provider| provider.header_by_number(num))
1284 .or_else(|err| {
1285 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1286 Ok(None)
1287 } else {
1288 Err(err)
1289 }
1290 })
1291 }
1292
1293 fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1294 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1295 Ok(jar_provider
1296 .cursor()?
1297 .get_two::<TDWithHashMask>(block_hash.into())?
1298 .and_then(|(td, hash)| (&hash == block_hash).then_some(td.0)))
1299 })
1300 }
1301
1302 fn header_td_by_number(&self, num: BlockNumber) -> ProviderResult<Option<U256>> {
1303 self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1304 .and_then(|provider| provider.header_td_by_number(num))
1305 .or_else(|err| {
1306 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1307 Ok(None)
1308 } else {
1309 Err(err)
1310 }
1311 })
1312 }
1313
1314 fn headers_range(
1315 &self,
1316 range: impl RangeBounds<BlockNumber>,
1317 ) -> ProviderResult<Vec<Self::Header>> {
1318 self.fetch_range_with_predicate(
1319 StaticFileSegment::Headers,
1320 to_range(range),
1321 |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1322 |_| true,
1323 )
1324 }
1325
1326 fn sealed_header(
1327 &self,
1328 num: BlockNumber,
1329 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1330 self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1331 .and_then(|provider| provider.sealed_header(num))
1332 .or_else(|err| {
1333 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1334 Ok(None)
1335 } else {
1336 Err(err)
1337 }
1338 })
1339 }
1340
1341 fn sealed_headers_while(
1342 &self,
1343 range: impl RangeBounds<BlockNumber>,
1344 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1345 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1346 self.fetch_range_with_predicate(
1347 StaticFileSegment::Headers,
1348 to_range(range),
1349 |cursor, number| {
1350 Ok(cursor
1351 .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1352 .map(|(header, hash)| SealedHeader::new(header, hash)))
1353 },
1354 predicate,
1355 )
1356 }
1357}
1358
1359impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1360 fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1361 self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)?.block_hash(num)
1362 }
1363
1364 fn canonical_hashes_range(
1365 &self,
1366 start: BlockNumber,
1367 end: BlockNumber,
1368 ) -> ProviderResult<Vec<B256>> {
1369 self.fetch_range_with_predicate(
1370 StaticFileSegment::Headers,
1371 start..end,
1372 |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1373 |_| true,
1374 )
1375 }
1376}
1377
1378impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1379 for StaticFileProvider<N>
1380{
1381 type Receipt = N::Receipt;
1382
1383 fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1384 self.get_segment_provider_from_transaction(StaticFileSegment::Receipts, num, None)
1385 .and_then(|provider| provider.receipt(num))
1386 .or_else(|err| {
1387 if let ProviderError::MissingStaticFileTx(_, _) = err {
1388 Ok(None)
1389 } else {
1390 Err(err)
1391 }
1392 })
1393 }
1394
1395 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1396 if let Some(num) = self.transaction_id(hash)? {
1397 return self.receipt(num)
1398 }
1399 Ok(None)
1400 }
1401
1402 fn receipts_by_block(
1403 &self,
1404 _block: BlockHashOrNumber,
1405 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1406 unreachable!()
1407 }
1408
1409 fn receipts_by_tx_range(
1410 &self,
1411 range: impl RangeBounds<TxNumber>,
1412 ) -> ProviderResult<Vec<Self::Receipt>> {
1413 self.fetch_range_with_predicate(
1414 StaticFileSegment::Receipts,
1415 to_range(range),
1416 |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1417 |_| true,
1418 )
1419 }
1420}
1421
1422impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>>
1423 TransactionsProviderExt for StaticFileProvider<N>
1424{
1425 fn transaction_hashes_by_range(
1426 &self,
1427 tx_range: Range<TxNumber>,
1428 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1429 let tx_range_size = (tx_range.end - tx_range.start) as usize;
1430
1431 let chunk_size = 100;
1435
1436 let chunks = tx_range
1438 .clone()
1439 .step_by(chunk_size)
1440 .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1441 let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1442
1443 for chunk_range in chunks {
1444 let (channel_tx, channel_rx) = mpsc::channel();
1445 channels.push(channel_rx);
1446
1447 let manager = self.clone();
1448
1449 rayon::spawn(move || {
1453 let mut rlp_buf = Vec::with_capacity(128);
1454 let _ = manager.fetch_range_with_predicate(
1455 StaticFileSegment::Transactions,
1456 chunk_range,
1457 |cursor, number| {
1458 Ok(cursor
1459 .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1460 .map(|transaction| {
1461 rlp_buf.clear();
1462 let _ = channel_tx
1463 .send(calculate_hash((number, transaction), &mut rlp_buf));
1464 }))
1465 },
1466 |_| true,
1467 );
1468 });
1469 }
1470
1471 let mut tx_list = Vec::with_capacity(tx_range_size);
1472
1473 for channel in channels {
1475 while let Ok(tx) = channel.recv() {
1476 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1477 tx_list.push((tx_hash, tx_id));
1478 }
1479 }
1480
1481 Ok(tx_list)
1482 }
1483}
1484
1485impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1486 for StaticFileProvider<N>
1487{
1488 type Transaction = N::SignedTx;
1489
1490 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1491 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1492 let mut cursor = jar_provider.cursor()?;
1493 if cursor
1494 .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1495 .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1496 .is_some()
1497 {
1498 Ok(cursor.number())
1499 } else {
1500 Ok(None)
1501 }
1502 })
1503 }
1504
1505 fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1506 self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1507 .and_then(|provider| provider.transaction_by_id(num))
1508 .or_else(|err| {
1509 if let ProviderError::MissingStaticFileTx(_, _) = err {
1510 Ok(None)
1511 } else {
1512 Err(err)
1513 }
1514 })
1515 }
1516
1517 fn transaction_by_id_unhashed(
1518 &self,
1519 num: TxNumber,
1520 ) -> ProviderResult<Option<Self::Transaction>> {
1521 self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1522 .and_then(|provider| provider.transaction_by_id_unhashed(num))
1523 .or_else(|err| {
1524 if let ProviderError::MissingStaticFileTx(_, _) = err {
1525 Ok(None)
1526 } else {
1527 Err(err)
1528 }
1529 })
1530 }
1531
1532 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1533 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1534 Ok(jar_provider
1535 .cursor()?
1536 .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
1537 .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
1538 })
1539 }
1540
1541 fn transaction_by_hash_with_meta(
1542 &self,
1543 _hash: TxHash,
1544 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1545 Err(ProviderError::UnsupportedProvider)
1547 }
1548
1549 fn transaction_block(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1550 Err(ProviderError::UnsupportedProvider)
1552 }
1553
1554 fn transactions_by_block(
1555 &self,
1556 _block_id: BlockHashOrNumber,
1557 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1558 Err(ProviderError::UnsupportedProvider)
1560 }
1561
1562 fn transactions_by_block_range(
1563 &self,
1564 _range: impl RangeBounds<BlockNumber>,
1565 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1566 Err(ProviderError::UnsupportedProvider)
1568 }
1569
1570 fn transactions_by_tx_range(
1571 &self,
1572 range: impl RangeBounds<TxNumber>,
1573 ) -> ProviderResult<Vec<Self::Transaction>> {
1574 self.fetch_range_with_predicate(
1575 StaticFileSegment::Transactions,
1576 to_range(range),
1577 |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
1578 |_| true,
1579 )
1580 }
1581
1582 fn senders_by_tx_range(
1583 &self,
1584 range: impl RangeBounds<TxNumber>,
1585 ) -> ProviderResult<Vec<Address>> {
1586 let txes = self.transactions_by_tx_range(range)?;
1587 Ok(reth_primitives_traits::transaction::recover::recover_signers(&txes)?)
1588 }
1589
1590 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1591 match self.transaction_by_id_unhashed(id)? {
1592 Some(tx) => Ok(tx.recover_signer().ok()),
1593 None => Ok(None),
1594 }
1595 }
1596}
1597
1598impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
1601 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1602 Err(ProviderError::UnsupportedProvider)
1604 }
1605
1606 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1607 Err(ProviderError::UnsupportedProvider)
1609 }
1610
1611 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1612 Err(ProviderError::UnsupportedProvider)
1614 }
1615
1616 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1617 Err(ProviderError::UnsupportedProvider)
1619 }
1620}
1621
1622impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
1623 for StaticFileProvider<N>
1624{
1625 type Block = N::Block;
1626
1627 fn find_block_by_hash(
1628 &self,
1629 _hash: B256,
1630 _source: BlockSource,
1631 ) -> ProviderResult<Option<Self::Block>> {
1632 Err(ProviderError::UnsupportedProvider)
1634 }
1635
1636 fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1637 Err(ProviderError::UnsupportedProvider)
1639 }
1640
1641 fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
1642 Err(ProviderError::UnsupportedProvider)
1644 }
1645
1646 fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1647 Err(ProviderError::UnsupportedProvider)
1649 }
1650
1651 fn pending_block_and_receipts(
1652 &self,
1653 ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
1654 Err(ProviderError::UnsupportedProvider)
1656 }
1657
1658 fn recovered_block(
1659 &self,
1660 _id: BlockHashOrNumber,
1661 _transaction_kind: TransactionVariant,
1662 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1663 Err(ProviderError::UnsupportedProvider)
1665 }
1666
1667 fn sealed_block_with_senders(
1668 &self,
1669 _id: BlockHashOrNumber,
1670 _transaction_kind: TransactionVariant,
1671 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1672 Err(ProviderError::UnsupportedProvider)
1674 }
1675
1676 fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1677 Err(ProviderError::UnsupportedProvider)
1679 }
1680
1681 fn block_with_senders_range(
1682 &self,
1683 _range: RangeInclusive<BlockNumber>,
1684 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1685 Err(ProviderError::UnsupportedProvider)
1686 }
1687
1688 fn recovered_block_range(
1689 &self,
1690 _range: RangeInclusive<BlockNumber>,
1691 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1692 Err(ProviderError::UnsupportedProvider)
1693 }
1694}
1695
1696impl<N: NodePrimitives> WithdrawalsProvider for StaticFileProvider<N> {
1697 fn withdrawals_by_block(
1698 &self,
1699 id: BlockHashOrNumber,
1700 timestamp: u64,
1701 ) -> ProviderResult<Option<Withdrawals>> {
1702 if let Some(num) = id.as_number() {
1703 return self
1704 .get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1705 .and_then(|provider| provider.withdrawals_by_block(id, timestamp))
1706 .or_else(|err| {
1707 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1708 Ok(None)
1709 } else {
1710 Err(err)
1711 }
1712 })
1713 }
1714 Err(ProviderError::UnsupportedProvider)
1716 }
1717}
1718
1719impl<N: FullNodePrimitives<BlockHeader: Value>> OmmersProvider for StaticFileProvider<N> {
1720 fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1721 if let Some(num) = id.as_number() {
1722 return self
1723 .get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1724 .and_then(|provider| provider.ommers(id))
1725 .or_else(|err| {
1726 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1727 Ok(None)
1728 } else {
1729 Err(err)
1730 }
1731 })
1732 }
1733 Err(ProviderError::UnsupportedProvider)
1735 }
1736}
1737
1738impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
1739 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1740 self.get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1741 .and_then(|provider| provider.block_body_indices(num))
1742 .or_else(|err| {
1743 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1744 Ok(None)
1745 } else {
1746 Err(err)
1747 }
1748 })
1749 }
1750
1751 fn block_body_indices_range(
1752 &self,
1753 range: RangeInclusive<BlockNumber>,
1754 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1755 self.fetch_range_with_predicate(
1756 StaticFileSegment::BlockMeta,
1757 *range.start()..*range.end() + 1,
1758 |cursor, number| cursor.get_one::<BodyIndicesMask>(number.into()),
1759 |_| true,
1760 )
1761 }
1762}
1763
1764impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
1765 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
1766 match T::NAME {
1767 tables::CanonicalHeaders::NAME |
1768 tables::Headers::<Header>::NAME |
1769 tables::HeaderTerminalDifficulties::NAME => Ok(self
1770 .get_highest_static_file_block(StaticFileSegment::Headers)
1771 .map(|block| block + 1)
1772 .unwrap_or_default()
1773 as usize),
1774 tables::Receipts::<Receipt>::NAME => Ok(self
1775 .get_highest_static_file_tx(StaticFileSegment::Receipts)
1776 .map(|receipts| receipts + 1)
1777 .unwrap_or_default() as usize),
1778 tables::Transactions::<TransactionSigned>::NAME => Ok(self
1779 .get_highest_static_file_tx(StaticFileSegment::Transactions)
1780 .map(|txs| txs + 1)
1781 .unwrap_or_default()
1782 as usize),
1783 _ => Err(ProviderError::UnsupportedProvider),
1784 }
1785 }
1786}
1787
1788#[inline]
1790fn calculate_hash<T>(
1791 entry: (TxNumber, T),
1792 rlp_buf: &mut Vec<u8>,
1793) -> Result<(B256, TxNumber), Box<ProviderError>>
1794where
1795 T: Encodable2718,
1796{
1797 let (tx_id, tx) = entry;
1798 tx.encode_2718(rlp_buf);
1799 Ok((keccak256(rlp_buf), tx_id))
1800}