1use super::{
2 metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3 StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6 to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, HeaderProvider,
7 ReceiptProvider, StageCheckpointReader, StatsReader, TransactionVariant, TransactionsProvider,
8 TransactionsProviderExt, WithdrawalsProvider,
9};
10use alloy_consensus::{transaction::TransactionMeta, Header};
11use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
12use alloy_primitives::{
13 b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256,
14};
15use dashmap::DashMap;
16use notify::{RecommendedWatcher, RecursiveMode, Watcher};
17use parking_lot::RwLock;
18use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec};
19use reth_db::{
20 lockfile::StorageLock,
21 static_file::{
22 iter_static_files, BlockHashMask, BodyIndicesMask, HeaderMask, HeaderWithHashMask,
23 ReceiptMask, StaticFileCursor, TDWithHashMask, TransactionMask,
24 },
25};
26use reth_db_api::{
27 cursor::DbCursorRO,
28 models::StoredBlockBodyIndices,
29 table::{Decompress, Table, Value},
30 tables,
31 transaction::DbTx,
32};
33use reth_ethereum_primitives::{Receipt, TransactionSigned};
34use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
35use reth_node_types::{FullNodePrimitives, NodePrimitives};
36use reth_primitives_traits::{RecoveredBlock, SealedBlock, SealedHeader, SignedTransaction};
37use reth_stages_types::{PipelineTarget, StageId};
38use reth_static_file_types::{
39 find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, StaticFileSegment,
40 DEFAULT_BLOCKS_PER_STATIC_FILE,
41};
42use reth_storage_api::{BlockBodyIndicesProvider, DBProvider, OmmersProvider};
43use reth_storage_errors::provider::{ProviderError, ProviderResult};
44use std::{
45 collections::{hash_map::Entry, BTreeMap, HashMap},
46 fmt::Debug,
47 marker::PhantomData,
48 ops::{Deref, Range, RangeBounds, RangeInclusive},
49 path::{Path, PathBuf},
50 sync::{mpsc, Arc},
51};
52use tracing::{info, trace, warn};
53
54type SegmentRanges = HashMap<StaticFileSegment, BTreeMap<TxNumber, SegmentRangeInclusive>>;
58
59#[derive(Debug, Default, PartialEq, Eq)]
61pub enum StaticFileAccess {
62 #[default]
64 RO,
65 RW,
67}
68
69impl StaticFileAccess {
70 pub const fn is_read_only(&self) -> bool {
72 matches!(self, Self::RO)
73 }
74
75 pub const fn is_read_write(&self) -> bool {
77 matches!(self, Self::RW)
78 }
79}
80
81#[derive(Debug)]
90pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
91
92impl<N> Clone for StaticFileProvider<N> {
93 fn clone(&self) -> Self {
94 Self(self.0.clone())
95 }
96}
97
98impl<N: NodePrimitives> StaticFileProvider<N> {
99 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
101 let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
102 provider.initialize_index()?;
103 Ok(provider)
104 }
105
106 pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
116 let provider = Self::new(path, StaticFileAccess::RO)?;
117
118 if watch_directory {
119 provider.watch_directory();
120 }
121
122 Ok(provider)
123 }
124
125 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
127 Self::new(path, StaticFileAccess::RW)
128 }
129
130 pub fn watch_directory(&self) {
136 let provider = self.clone();
137 std::thread::spawn(move || {
138 let (tx, rx) = std::sync::mpsc::channel();
139 let mut watcher = RecommendedWatcher::new(
140 move |res| tx.send(res).unwrap(),
141 notify::Config::default(),
142 )
143 .expect("failed to create watcher");
144
145 watcher
146 .watch(&provider.path, RecursiveMode::NonRecursive)
147 .expect("failed to watch path");
148
149 let mut last_event_timestamp = None;
151
152 while let Ok(res) = rx.recv() {
153 match res {
154 Ok(event) => {
155 if !matches!(
157 event.kind,
158 notify::EventKind::Modify(_) |
159 notify::EventKind::Create(_) |
160 notify::EventKind::Remove(_)
161 ) {
162 continue
163 }
164
165 for segment in event.paths {
170 if segment
172 .extension()
173 .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
174 {
175 continue
176 }
177
178 if StaticFileSegment::parse_filename(
180 &segment.file_stem().expect("qed").to_string_lossy(),
181 )
182 .is_none()
183 {
184 continue
185 }
186
187 if let Ok(current_modified_timestamp) =
190 std::fs::metadata(&segment).and_then(|m| m.modified())
191 {
192 if last_event_timestamp.is_some_and(|last_timestamp| {
193 last_timestamp >= current_modified_timestamp
194 }) {
195 continue
196 }
197 last_event_timestamp = Some(current_modified_timestamp);
198 }
199
200 info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
201 if let Err(err) = provider.initialize_index() {
202 warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
203 }
204 break
205 }
206 }
207
208 Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
209 }
210 }
211 });
212 }
213}
214
215impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
216 type Target = StaticFileProviderInner<N>;
217
218 fn deref(&self) -> &Self::Target {
219 &self.0
220 }
221}
222
223#[derive(Debug)]
225pub struct StaticFileProviderInner<N> {
226 map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
229 static_files_max_block: RwLock<HashMap<StaticFileSegment, u64>>,
231 static_files_tx_index: RwLock<SegmentRanges>,
233 path: PathBuf,
235 writers: StaticFileWriters<N>,
237 metrics: Option<Arc<StaticFileProviderMetrics>>,
239 access: StaticFileAccess,
241 blocks_per_file: u64,
243 _lock_file: Option<StorageLock>,
245 _pd: PhantomData<N>,
247}
248
249impl<N: NodePrimitives> StaticFileProviderInner<N> {
250 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
252 let _lock_file = if access.is_read_write() {
253 StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
254 } else {
255 None
256 };
257
258 let provider = Self {
259 map: Default::default(),
260 writers: Default::default(),
261 static_files_max_block: Default::default(),
262 static_files_tx_index: Default::default(),
263 path: path.as_ref().to_path_buf(),
264 metrics: None,
265 access,
266 blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE,
267 _lock_file,
268 _pd: Default::default(),
269 };
270
271 Ok(provider)
272 }
273
274 pub const fn is_read_only(&self) -> bool {
275 self.access.is_read_only()
276 }
277
278 pub const fn find_fixed_range(&self, block: BlockNumber) -> SegmentRangeInclusive {
281 find_fixed_range(block, self.blocks_per_file)
282 }
283}
284
285impl<N: NodePrimitives> StaticFileProvider<N> {
286 #[cfg(any(test, feature = "test-utils"))]
288 pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self {
289 let mut provider =
290 Arc::try_unwrap(self.0).expect("should be called when initializing only");
291 provider.blocks_per_file = blocks_per_file;
292 Self(Arc::new(provider))
293 }
294
295 pub fn with_metrics(self) -> Self {
297 let mut provider =
298 Arc::try_unwrap(self.0).expect("should be called when initializing only");
299 provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
300 Self(Arc::new(provider))
301 }
302
303 pub fn report_metrics(&self) -> ProviderResult<()> {
305 let Some(metrics) = &self.metrics else { return Ok(()) };
306
307 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
308 for (segment, ranges) in static_files {
309 let mut entries = 0;
310 let mut size = 0;
311
312 for (block_range, _) in &ranges {
313 let fixed_block_range = self.find_fixed_range(block_range.start());
314 let jar_provider = self
315 .get_segment_provider(segment, || Some(fixed_block_range), None)?
316 .ok_or_else(|| {
317 ProviderError::MissingStaticFileBlock(segment, block_range.start())
318 })?;
319
320 entries += jar_provider.rows();
321
322 let data_size = reth_fs_util::metadata(jar_provider.data_path())
323 .map(|metadata| metadata.len())
324 .unwrap_or_default();
325 let index_size = reth_fs_util::metadata(jar_provider.index_path())
326 .map(|metadata| metadata.len())
327 .unwrap_or_default();
328 let offsets_size = reth_fs_util::metadata(jar_provider.offsets_path())
329 .map(|metadata| metadata.len())
330 .unwrap_or_default();
331 let config_size = reth_fs_util::metadata(jar_provider.config_path())
332 .map(|metadata| metadata.len())
333 .unwrap_or_default();
334
335 size += data_size + index_size + offsets_size + config_size;
336 }
337
338 metrics.record_segment(segment, size, ranges.len(), entries);
339 }
340
341 Ok(())
342 }
343
344 pub fn get_segment_provider_from_block(
346 &self,
347 segment: StaticFileSegment,
348 block: BlockNumber,
349 path: Option<&Path>,
350 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
351 self.get_segment_provider(
352 segment,
353 || self.get_segment_ranges_from_block(segment, block),
354 path,
355 )?
356 .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
357 }
358
359 pub fn get_segment_provider_from_transaction(
361 &self,
362 segment: StaticFileSegment,
363 tx: TxNumber,
364 path: Option<&Path>,
365 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
366 self.get_segment_provider(
367 segment,
368 || self.get_segment_ranges_from_transaction(segment, tx),
369 path,
370 )?
371 .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
372 }
373
374 pub fn get_segment_provider(
378 &self,
379 segment: StaticFileSegment,
380 fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
381 path: Option<&Path>,
382 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
383 let block_range = match path {
386 Some(path) => StaticFileSegment::parse_filename(
387 &path
388 .file_name()
389 .ok_or_else(|| {
390 ProviderError::MissingStaticFilePath(segment, path.to_path_buf())
391 })?
392 .to_string_lossy(),
393 )
394 .and_then(|(parsed_segment, block_range)| {
395 if parsed_segment == segment {
396 return Some(block_range)
397 }
398 None
399 }),
400 None => fn_range(),
401 };
402
403 if let Some(block_range) = block_range {
405 return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?))
406 }
407
408 Ok(None)
409 }
410
411 pub fn remove_cached_provider(
415 &self,
416 segment: StaticFileSegment,
417 fixed_block_range_end: BlockNumber,
418 ) {
419 self.map.remove(&(fixed_block_range_end, segment));
420 }
421
422 pub fn delete_jar(&self, segment: StaticFileSegment, block: BlockNumber) -> ProviderResult<()> {
426 let fixed_block_range = self.find_fixed_range(block);
427 let key = (fixed_block_range.end(), segment);
428 let jar = if let Some((_, jar)) = self.map.remove(&key) {
429 jar.jar
430 } else {
431 NippyJar::<SegmentHeader>::load(&self.path.join(segment.filename(&fixed_block_range)))
432 .map_err(ProviderError::other)?
433 };
434
435 jar.delete().map_err(ProviderError::other)?;
436
437 let mut segment_max_block = None;
438 if fixed_block_range.start() > 0 {
439 segment_max_block = Some(fixed_block_range.start() - 1)
440 };
441 self.update_index(segment, segment_max_block)?;
442
443 Ok(())
444 }
445
446 fn get_or_create_jar_provider(
450 &self,
451 segment: StaticFileSegment,
452 fixed_block_range: &SegmentRangeInclusive,
453 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
454 let key = (fixed_block_range.end(), segment);
455
456 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider");
458 let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
459 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
460 jar.into()
461 } else {
462 trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
463 let path = self.path.join(segment.filename(fixed_block_range));
464 let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
465 self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
466 };
467
468 if let Some(metrics) = &self.metrics {
469 provider = provider.with_metrics(metrics.clone());
470 }
471 Ok(provider)
472 }
473
474 fn get_segment_ranges_from_block(
477 &self,
478 segment: StaticFileSegment,
479 block: u64,
480 ) -> Option<SegmentRangeInclusive> {
481 self.static_files_max_block
482 .read()
483 .get(&segment)
484 .filter(|max| **max >= block)
485 .map(|_| self.find_fixed_range(block))
486 }
487
488 fn get_segment_ranges_from_transaction(
491 &self,
492 segment: StaticFileSegment,
493 tx: u64,
494 ) -> Option<SegmentRangeInclusive> {
495 let static_files = self.static_files_tx_index.read();
496 let segment_static_files = static_files.get(&segment)?;
497
498 let mut static_files_rev_iter = segment_static_files.iter().rev().peekable();
501
502 while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
503 if tx > *tx_end {
504 return None
506 }
507 let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
508 if tx_start <= tx {
509 return Some(self.find_fixed_range(block_range.end()))
510 }
511 }
512 None
513 }
514
515 pub fn update_index(
522 &self,
523 segment: StaticFileSegment,
524 segment_max_block: Option<BlockNumber>,
525 ) -> ProviderResult<()> {
526 let mut max_block = self.static_files_max_block.write();
527 let mut tx_index = self.static_files_tx_index.write();
528
529 match segment_max_block {
530 Some(segment_max_block) => {
531 max_block.insert(segment, segment_max_block);
533 let fixed_range = self.find_fixed_range(segment_max_block);
534
535 let jar = NippyJar::<SegmentHeader>::load(
536 &self.path.join(segment.filename(&fixed_range)),
537 )
538 .map_err(ProviderError::other)?;
539
540 if let Some(tx_range) = jar.user_header().tx_range() {
543 let tx_end = tx_range.end();
544
545 if let Some(current_block_range) = jar.user_header().block_range().copied() {
548 tx_index
557 .entry(segment)
558 .and_modify(|index| {
559 index.retain(|_, block_range| {
560 block_range.start() < fixed_range.start()
561 });
562 index.insert(tx_end, current_block_range);
563 })
564 .or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
565 }
566 } else if segment.is_tx_based() {
567 tx_index.entry(segment).and_modify(|index| {
571 index.retain(|_, block_range| block_range.start() < fixed_range.start());
572 });
573
574 if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
576 tx_index.remove(&segment);
577 }
578 }
579
580 self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
582
583 self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
585 }
586 None => {
587 tx_index.remove(&segment);
588 max_block.remove(&segment);
589 }
590 };
591
592 Ok(())
593 }
594
595 pub fn initialize_index(&self) -> ProviderResult<()> {
597 let mut max_block = self.static_files_max_block.write();
598 let mut tx_index = self.static_files_tx_index.write();
599
600 max_block.clear();
601 tx_index.clear();
602
603 for (segment, ranges) in iter_static_files(&self.path).map_err(ProviderError::other)? {
604 if let Some((block_range, _)) = ranges.last() {
606 max_block.insert(segment, block_range.end());
607 }
608
609 for (block_range, tx_range) in ranges {
611 if let Some(tx_range) = tx_range {
612 let tx_end = tx_range.end();
613
614 match tx_index.entry(segment) {
615 Entry::Occupied(mut index) => {
616 index.get_mut().insert(tx_end, block_range);
617 }
618 Entry::Vacant(index) => {
619 index.insert(BTreeMap::from([(tx_end, block_range)]));
620 }
621 };
622 }
623 }
624 }
625
626 self.map.clear();
628
629 Ok(())
630 }
631
632 pub fn check_consistency<Provider>(
656 &self,
657 provider: &Provider,
658 has_receipt_pruning: bool,
659 ) -> ProviderResult<Option<PipelineTarget>>
660 where
661 Provider: DBProvider + BlockReader + StageCheckpointReader + ChainSpecProvider,
662 N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
663 {
664 if provider.chain_spec().is_optimism() &&
671 reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
672 {
673 const OVM_HEADER_1_HASH: B256 =
675 b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
676 if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
677 info!(target: "reth::cli",
678 "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
679 );
680 return Ok(None)
681 }
682 }
683
684 info!(target: "reth::cli", "Verifying storage consistency.");
685
686 let mut unwind_target: Option<BlockNumber> = None;
687 let mut update_unwind_target = |new_target: BlockNumber| {
688 if let Some(target) = unwind_target.as_mut() {
689 *target = (*target).min(new_target);
690 } else {
691 unwind_target = Some(new_target);
692 }
693 };
694
695 for segment in StaticFileSegment::iter() {
696 if segment.is_block_meta() {
698 continue
699 }
700
701 if has_receipt_pruning && segment.is_receipts() {
702 continue
704 }
705
706 let initial_highest_block = self.get_highest_static_file_block(segment);
707
708 if self.access.is_read_only() {
717 self.check_segment_consistency(segment)?;
718 } else {
719 self.latest_writer(segment)?;
721 }
722
723 let mut highest_block = self.get_highest_static_file_block(segment);
728 if initial_highest_block != highest_block {
729 info!(
730 target: "reth::providers::static_file",
731 ?initial_highest_block,
732 unwind_target = highest_block,
733 ?segment,
734 "Setting unwind target."
735 );
736 update_unwind_target(highest_block.unwrap_or_default());
737 }
738
739 let highest_tx = self.get_highest_static_file_tx(segment);
745 if let Some(highest_tx) = highest_tx {
746 let mut last_block = highest_block.unwrap_or_default();
747 loop {
748 if let Some(indices) = provider.block_body_indices(last_block)? {
749 if indices.last_tx_num() <= highest_tx {
750 break
751 }
752 } else {
753 break
757 }
758 if last_block == 0 {
759 break
760 }
761 last_block -= 1;
762
763 info!(
764 target: "reth::providers::static_file",
765 highest_block = self.get_highest_static_file_block(segment),
766 unwind_target = last_block,
767 ?segment,
768 "Setting unwind target."
769 );
770 highest_block = Some(last_block);
771 update_unwind_target(last_block);
772 }
773 }
774
775 if let Some(unwind) = match segment {
776 StaticFileSegment::Headers => self
777 .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
778 provider,
779 segment,
780 highest_block,
781 highest_block,
782 )?,
783 StaticFileSegment::Transactions => self
784 .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
785 provider,
786 segment,
787 highest_tx,
788 highest_block,
789 )?,
790 StaticFileSegment::Receipts => self
791 .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
792 provider,
793 segment,
794 highest_tx,
795 highest_block,
796 )?,
797 StaticFileSegment::BlockMeta => self
798 .ensure_invariants::<_, tables::BlockBodyIndices>(
799 provider,
800 segment,
801 highest_block,
802 highest_block,
803 )?,
804 } {
805 update_unwind_target(unwind);
806 }
807 }
808
809 Ok(unwind_target.map(PipelineTarget::Unwind))
810 }
811
812 pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
815 if let Some(latest_block) = self.get_highest_static_file_block(segment) {
816 let file_path =
817 self.directory().join(segment.filename(&self.find_fixed_range(latest_block)));
818
819 let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
820
821 NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
822 }
823 Ok(())
824 }
825
826 fn ensure_invariants<Provider, T: Table<Key = u64>>(
841 &self,
842 provider: &Provider,
843 segment: StaticFileSegment,
844 highest_static_file_entry: Option<u64>,
845 highest_static_file_block: Option<BlockNumber>,
846 ) -> ProviderResult<Option<BlockNumber>>
847 where
848 Provider: DBProvider + BlockReader + StageCheckpointReader,
849 {
850 let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
851
852 if let Some((db_first_entry, _)) = db_cursor.first()? {
853 if let (Some(highest_entry), Some(highest_block)) =
854 (highest_static_file_entry, highest_static_file_block)
855 {
856 if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
860 info!(
861 target: "reth::providers::static_file",
862 ?db_first_entry,
863 ?highest_entry,
864 unwind_target = highest_block,
865 ?segment,
866 "Setting unwind target."
867 );
868 return Ok(Some(highest_block))
869 }
870 }
871
872 if let Some((db_last_entry, _)) = db_cursor.last()? {
873 if highest_static_file_entry
874 .is_none_or(|highest_entry| db_last_entry > highest_entry)
875 {
876 return Ok(None)
877 }
878 }
879 }
880
881 let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
882 let highest_static_file_block = highest_static_file_block.unwrap_or_default();
883
884 let checkpoint_block_number = provider
887 .get_stage_checkpoint(match segment {
888 StaticFileSegment::Headers => StageId::Headers,
889 StaticFileSegment::Transactions | StaticFileSegment::BlockMeta => StageId::Bodies,
890 StaticFileSegment::Receipts => StageId::Execution,
891 })?
892 .unwrap_or_default()
893 .block_number;
894
895 if checkpoint_block_number > highest_static_file_block {
897 info!(
898 target: "reth::providers::static_file",
899 checkpoint_block_number,
900 unwind_target = highest_static_file_block,
901 ?segment,
902 "Setting unwind target."
903 );
904 return Ok(Some(highest_static_file_block))
905 }
906
907 if checkpoint_block_number < highest_static_file_block {
911 info!(
912 target: "reth::providers",
913 ?segment,
914 from = highest_static_file_block,
915 to = checkpoint_block_number,
916 "Unwinding static file segment."
917 );
918 let mut writer = self.latest_writer(segment)?;
919 if segment.is_headers() {
920 writer.prune_headers(highest_static_file_block - checkpoint_block_number)?;
922 } else if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
923 let number = highest_static_file_entry - block.last_tx_num();
926 if segment.is_receipts() {
927 writer.prune_receipts(number, checkpoint_block_number)?;
928 } else {
929 writer.prune_transactions(number, checkpoint_block_number)?;
930 }
931 }
932 writer.commit()?;
933 }
934
935 Ok(None)
936 }
937
938 pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
942 self.static_files_max_block.read().get(&segment).copied()
943 }
944
945 pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
949 self.static_files_tx_index
950 .read()
951 .get(&segment)
952 .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
953 }
954
955 pub fn get_highest_static_files(&self) -> HighestStaticFiles {
957 HighestStaticFiles {
958 headers: self.get_highest_static_file_block(StaticFileSegment::Headers),
959 receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
960 transactions: self.get_highest_static_file_block(StaticFileSegment::Transactions),
961 block_meta: self.get_highest_static_file_block(StaticFileSegment::BlockMeta),
962 }
963 }
964
965 pub fn find_static_file<T>(
968 &self,
969 segment: StaticFileSegment,
970 func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
971 ) -> ProviderResult<Option<T>> {
972 if let Some(highest_block) = self.get_highest_static_file_block(segment) {
973 let mut range = self.find_fixed_range(highest_block);
974 while range.end() > 0 {
975 if let Some(res) = func(self.get_or_create_jar_provider(segment, &range)?)? {
976 return Ok(Some(res))
977 }
978 range = SegmentRangeInclusive::new(
979 range.start().saturating_sub(self.blocks_per_file),
980 range.end().saturating_sub(self.blocks_per_file),
981 );
982 }
983 }
984
985 Ok(None)
986 }
987
988 pub fn fetch_range_with_predicate<T, F, P>(
994 &self,
995 segment: StaticFileSegment,
996 range: Range<u64>,
997 mut get_fn: F,
998 mut predicate: P,
999 ) -> ProviderResult<Vec<T>>
1000 where
1001 F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
1002 P: FnMut(&T) -> bool,
1003 {
1004 let get_provider = |start: u64| {
1005 if segment.is_block_based() {
1006 self.get_segment_provider_from_block(segment, start, None)
1007 } else {
1008 self.get_segment_provider_from_transaction(segment, start, None)
1009 }
1010 };
1011
1012 let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
1013 let mut provider = get_provider(range.start)?;
1014 let mut cursor = provider.cursor()?;
1015
1016 'outer: for number in range {
1018 let mut retrying = false;
1022
1023 'inner: loop {
1025 match get_fn(&mut cursor, number)? {
1026 Some(res) => {
1027 if !predicate(&res) {
1028 break 'outer
1029 }
1030 result.push(res);
1031 break 'inner
1032 }
1033 None => {
1034 if retrying {
1035 warn!(
1036 target: "provider::static_file",
1037 ?segment,
1038 ?number,
1039 "Could not find block or tx number on a range request"
1040 );
1041
1042 let err = if segment.is_block_based() {
1043 ProviderError::MissingStaticFileBlock(segment, number)
1044 } else {
1045 ProviderError::MissingStaticFileTx(segment, number)
1046 };
1047 return Err(err)
1048 }
1049 drop(cursor);
1054 drop(provider);
1055 provider = get_provider(number)?;
1056 cursor = provider.cursor()?;
1057 retrying = true;
1058 }
1059 }
1060 }
1061 }
1062
1063 Ok(result)
1064 }
1065
1066 pub fn fetch_range_iter<'a, T, F>(
1070 &'a self,
1071 segment: StaticFileSegment,
1072 range: Range<u64>,
1073 get_fn: F,
1074 ) -> ProviderResult<impl Iterator<Item = ProviderResult<T>> + 'a>
1075 where
1076 F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
1077 T: std::fmt::Debug,
1078 {
1079 let get_provider = move |start: u64| {
1080 if segment.is_block_based() {
1081 self.get_segment_provider_from_block(segment, start, None)
1082 } else {
1083 self.get_segment_provider_from_transaction(segment, start, None)
1084 }
1085 };
1086
1087 let mut provider = Some(get_provider(range.start)?);
1088 Ok(range.filter_map(move |number| {
1089 match get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose() {
1090 Some(result) => Some(result),
1091 None => {
1092 provider.take();
1097 provider = Some(get_provider(number).ok()?);
1098 get_fn(&mut provider.as_ref().expect("qed").cursor().ok()?, number).transpose()
1099 }
1100 }
1101 }))
1102 }
1103
1104 pub fn directory(&self) -> &Path {
1106 &self.path
1107 }
1108
1109 pub fn get_with_static_file_or_database<T, FS, FD>(
1119 &self,
1120 segment: StaticFileSegment,
1121 number: u64,
1122 fetch_from_static_file: FS,
1123 fetch_from_database: FD,
1124 ) -> ProviderResult<Option<T>>
1125 where
1126 FS: Fn(&Self) -> ProviderResult<Option<T>>,
1127 FD: Fn() -> ProviderResult<Option<T>>,
1128 {
1129 let static_file_upper_bound = if segment.is_block_based() {
1131 self.get_highest_static_file_block(segment)
1132 } else {
1133 self.get_highest_static_file_tx(segment)
1134 };
1135
1136 if static_file_upper_bound
1137 .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
1138 {
1139 return fetch_from_static_file(self)
1140 }
1141 fetch_from_database()
1142 }
1143
1144 pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
1156 &self,
1157 segment: StaticFileSegment,
1158 mut block_or_tx_range: Range<u64>,
1159 fetch_from_static_file: FS,
1160 mut fetch_from_database: FD,
1161 mut predicate: P,
1162 ) -> ProviderResult<Vec<T>>
1163 where
1164 FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
1165 FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
1166 P: FnMut(&T) -> bool,
1167 {
1168 let mut data = Vec::new();
1169
1170 if let Some(static_file_upper_bound) = if segment.is_block_based() {
1172 self.get_highest_static_file_block(segment)
1173 } else {
1174 self.get_highest_static_file_tx(segment)
1175 } {
1176 if block_or_tx_range.start <= static_file_upper_bound {
1177 let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
1178 data.extend(fetch_from_static_file(
1179 self,
1180 block_or_tx_range.start..end,
1181 &mut predicate,
1182 )?);
1183 block_or_tx_range.start = end;
1184 }
1185 }
1186
1187 if block_or_tx_range.end > block_or_tx_range.start {
1188 data.extend(fetch_from_database(block_or_tx_range, predicate)?)
1189 }
1190
1191 Ok(data)
1192 }
1193
1194 #[cfg(any(test, feature = "test-utils"))]
1196 pub fn path(&self) -> &Path {
1197 &self.path
1198 }
1199
1200 #[cfg(any(test, feature = "test-utils"))]
1202 pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
1203 &self.static_files_tx_index
1204 }
1205}
1206
1207pub trait StaticFileWriter {
1209 type Primitives: Send + Sync + 'static;
1211
1212 fn get_writer(
1214 &self,
1215 block: BlockNumber,
1216 segment: StaticFileSegment,
1217 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1218
1219 fn latest_writer(
1222 &self,
1223 segment: StaticFileSegment,
1224 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
1225
1226 fn commit(&self) -> ProviderResult<()>;
1228}
1229
1230impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
1231 type Primitives = N;
1232
1233 fn get_writer(
1234 &self,
1235 block: BlockNumber,
1236 segment: StaticFileSegment,
1237 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1238 if self.access.is_read_only() {
1239 return Err(ProviderError::ReadOnlyStaticFileAccess)
1240 }
1241
1242 trace!(target: "provider::static_file", ?block, ?segment, "Getting static file writer.");
1243 self.writers.get_or_create(segment, || {
1244 StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
1245 })
1246 }
1247
1248 fn latest_writer(
1249 &self,
1250 segment: StaticFileSegment,
1251 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
1252 self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment)
1253 }
1254
1255 fn commit(&self) -> ProviderResult<()> {
1256 self.writers.commit()
1257 }
1258}
1259
1260impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
1261 type Header = N::BlockHeader;
1262
1263 fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Self::Header>> {
1264 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1265 Ok(jar_provider
1266 .cursor()?
1267 .get_two::<HeaderWithHashMask<Self::Header>>(block_hash.into())?
1268 .and_then(|(header, hash)| {
1269 if &hash == block_hash {
1270 return Some(header)
1271 }
1272 None
1273 }))
1274 })
1275 }
1276
1277 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
1278 self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1279 .and_then(|provider| provider.header_by_number(num))
1280 .or_else(|err| {
1281 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1282 Ok(None)
1283 } else {
1284 Err(err)
1285 }
1286 })
1287 }
1288
1289 fn header_td(&self, block_hash: &BlockHash) -> ProviderResult<Option<U256>> {
1290 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
1291 Ok(jar_provider
1292 .cursor()?
1293 .get_two::<TDWithHashMask>(block_hash.into())?
1294 .and_then(|(td, hash)| (&hash == block_hash).then_some(td.0)))
1295 })
1296 }
1297
1298 fn header_td_by_number(&self, num: BlockNumber) -> ProviderResult<Option<U256>> {
1299 self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1300 .and_then(|provider| provider.header_td_by_number(num))
1301 .or_else(|err| {
1302 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1303 Ok(None)
1304 } else {
1305 Err(err)
1306 }
1307 })
1308 }
1309
1310 fn headers_range(
1311 &self,
1312 range: impl RangeBounds<BlockNumber>,
1313 ) -> ProviderResult<Vec<Self::Header>> {
1314 self.fetch_range_with_predicate(
1315 StaticFileSegment::Headers,
1316 to_range(range),
1317 |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
1318 |_| true,
1319 )
1320 }
1321
1322 fn sealed_header(
1323 &self,
1324 num: BlockNumber,
1325 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
1326 self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)
1327 .and_then(|provider| provider.sealed_header(num))
1328 .or_else(|err| {
1329 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1330 Ok(None)
1331 } else {
1332 Err(err)
1333 }
1334 })
1335 }
1336
1337 fn sealed_headers_while(
1338 &self,
1339 range: impl RangeBounds<BlockNumber>,
1340 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
1341 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
1342 self.fetch_range_with_predicate(
1343 StaticFileSegment::Headers,
1344 to_range(range),
1345 |cursor, number| {
1346 Ok(cursor
1347 .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
1348 .map(|(header, hash)| SealedHeader::new(header, hash)))
1349 },
1350 predicate,
1351 )
1352 }
1353}
1354
1355impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
1356 fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
1357 self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)?.block_hash(num)
1358 }
1359
1360 fn canonical_hashes_range(
1361 &self,
1362 start: BlockNumber,
1363 end: BlockNumber,
1364 ) -> ProviderResult<Vec<B256>> {
1365 self.fetch_range_with_predicate(
1366 StaticFileSegment::Headers,
1367 start..end,
1368 |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
1369 |_| true,
1370 )
1371 }
1372}
1373
1374impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
1375 for StaticFileProvider<N>
1376{
1377 type Receipt = N::Receipt;
1378
1379 fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
1380 self.get_segment_provider_from_transaction(StaticFileSegment::Receipts, num, None)
1381 .and_then(|provider| provider.receipt(num))
1382 .or_else(|err| {
1383 if let ProviderError::MissingStaticFileTx(_, _) = err {
1384 Ok(None)
1385 } else {
1386 Err(err)
1387 }
1388 })
1389 }
1390
1391 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
1392 if let Some(num) = self.transaction_id(hash)? {
1393 return self.receipt(num)
1394 }
1395 Ok(None)
1396 }
1397
1398 fn receipts_by_block(
1399 &self,
1400 _block: BlockHashOrNumber,
1401 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
1402 unreachable!()
1403 }
1404
1405 fn receipts_by_tx_range(
1406 &self,
1407 range: impl RangeBounds<TxNumber>,
1408 ) -> ProviderResult<Vec<Self::Receipt>> {
1409 self.fetch_range_with_predicate(
1410 StaticFileSegment::Receipts,
1411 to_range(range),
1412 |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
1413 |_| true,
1414 )
1415 }
1416}
1417
1418impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>>
1419 TransactionsProviderExt for StaticFileProvider<N>
1420{
1421 fn transaction_hashes_by_range(
1422 &self,
1423 tx_range: Range<TxNumber>,
1424 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
1425 let tx_range_size = (tx_range.end - tx_range.start) as usize;
1426
1427 let chunk_size = 100;
1431
1432 let chunks = tx_range
1434 .clone()
1435 .step_by(chunk_size)
1436 .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
1437 let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
1438
1439 for chunk_range in chunks {
1440 let (channel_tx, channel_rx) = mpsc::channel();
1441 channels.push(channel_rx);
1442
1443 let manager = self.clone();
1444
1445 rayon::spawn(move || {
1449 let mut rlp_buf = Vec::with_capacity(128);
1450 let _ = manager.fetch_range_with_predicate(
1451 StaticFileSegment::Transactions,
1452 chunk_range,
1453 |cursor, number| {
1454 Ok(cursor
1455 .get_one::<TransactionMask<Self::Transaction>>(number.into())?
1456 .map(|transaction| {
1457 rlp_buf.clear();
1458 let _ = channel_tx
1459 .send(calculate_hash((number, transaction), &mut rlp_buf));
1460 }))
1461 },
1462 |_| true,
1463 );
1464 });
1465 }
1466
1467 let mut tx_list = Vec::with_capacity(tx_range_size);
1468
1469 for channel in channels {
1471 while let Ok(tx) = channel.recv() {
1472 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
1473 tx_list.push((tx_hash, tx_id));
1474 }
1475 }
1476
1477 Ok(tx_list)
1478 }
1479}
1480
1481impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
1482 for StaticFileProvider<N>
1483{
1484 type Transaction = N::SignedTx;
1485
1486 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
1487 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1488 let mut cursor = jar_provider.cursor()?;
1489 if cursor
1490 .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
1491 .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
1492 .is_some()
1493 {
1494 Ok(cursor.number())
1495 } else {
1496 Ok(None)
1497 }
1498 })
1499 }
1500
1501 fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
1502 self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1503 .and_then(|provider| provider.transaction_by_id(num))
1504 .or_else(|err| {
1505 if let ProviderError::MissingStaticFileTx(_, _) = err {
1506 Ok(None)
1507 } else {
1508 Err(err)
1509 }
1510 })
1511 }
1512
1513 fn transaction_by_id_unhashed(
1514 &self,
1515 num: TxNumber,
1516 ) -> ProviderResult<Option<Self::Transaction>> {
1517 self.get_segment_provider_from_transaction(StaticFileSegment::Transactions, num, None)
1518 .and_then(|provider| provider.transaction_by_id_unhashed(num))
1519 .or_else(|err| {
1520 if let ProviderError::MissingStaticFileTx(_, _) = err {
1521 Ok(None)
1522 } else {
1523 Err(err)
1524 }
1525 })
1526 }
1527
1528 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
1529 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
1530 Ok(jar_provider
1531 .cursor()?
1532 .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
1533 .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
1534 })
1535 }
1536
1537 fn transaction_by_hash_with_meta(
1538 &self,
1539 _hash: TxHash,
1540 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
1541 Err(ProviderError::UnsupportedProvider)
1543 }
1544
1545 fn transaction_block(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
1546 Err(ProviderError::UnsupportedProvider)
1548 }
1549
1550 fn transactions_by_block(
1551 &self,
1552 _block_id: BlockHashOrNumber,
1553 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
1554 Err(ProviderError::UnsupportedProvider)
1556 }
1557
1558 fn transactions_by_block_range(
1559 &self,
1560 _range: impl RangeBounds<BlockNumber>,
1561 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
1562 Err(ProviderError::UnsupportedProvider)
1564 }
1565
1566 fn transactions_by_tx_range(
1567 &self,
1568 range: impl RangeBounds<TxNumber>,
1569 ) -> ProviderResult<Vec<Self::Transaction>> {
1570 self.fetch_range_with_predicate(
1571 StaticFileSegment::Transactions,
1572 to_range(range),
1573 |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
1574 |_| true,
1575 )
1576 }
1577
1578 fn senders_by_tx_range(
1579 &self,
1580 range: impl RangeBounds<TxNumber>,
1581 ) -> ProviderResult<Vec<Address>> {
1582 let txes = self.transactions_by_tx_range(range)?;
1583 Ok(reth_primitives_traits::transaction::recover::recover_signers(&txes)?)
1584 }
1585
1586 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
1587 match self.transaction_by_id_unhashed(id)? {
1588 Some(tx) => Ok(tx.recover_signer().ok()),
1589 None => Ok(None),
1590 }
1591 }
1592}
1593
1594impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
1597 fn chain_info(&self) -> ProviderResult<ChainInfo> {
1598 Err(ProviderError::UnsupportedProvider)
1600 }
1601
1602 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
1603 Err(ProviderError::UnsupportedProvider)
1605 }
1606
1607 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
1608 Err(ProviderError::UnsupportedProvider)
1610 }
1611
1612 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
1613 Err(ProviderError::UnsupportedProvider)
1615 }
1616}
1617
1618impl<N: FullNodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
1619 for StaticFileProvider<N>
1620{
1621 type Block = N::Block;
1622
1623 fn find_block_by_hash(
1624 &self,
1625 _hash: B256,
1626 _source: BlockSource,
1627 ) -> ProviderResult<Option<Self::Block>> {
1628 Err(ProviderError::UnsupportedProvider)
1630 }
1631
1632 fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
1633 Err(ProviderError::UnsupportedProvider)
1635 }
1636
1637 fn pending_block(&self) -> ProviderResult<Option<SealedBlock<Self::Block>>> {
1638 Err(ProviderError::UnsupportedProvider)
1640 }
1641
1642 fn pending_block_with_senders(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1643 Err(ProviderError::UnsupportedProvider)
1645 }
1646
1647 fn pending_block_and_receipts(
1648 &self,
1649 ) -> ProviderResult<Option<(SealedBlock<Self::Block>, Vec<Self::Receipt>)>> {
1650 Err(ProviderError::UnsupportedProvider)
1652 }
1653
1654 fn recovered_block(
1655 &self,
1656 _id: BlockHashOrNumber,
1657 _transaction_kind: TransactionVariant,
1658 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1659 Err(ProviderError::UnsupportedProvider)
1661 }
1662
1663 fn sealed_block_with_senders(
1664 &self,
1665 _id: BlockHashOrNumber,
1666 _transaction_kind: TransactionVariant,
1667 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
1668 Err(ProviderError::UnsupportedProvider)
1670 }
1671
1672 fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
1673 Err(ProviderError::UnsupportedProvider)
1675 }
1676
1677 fn block_with_senders_range(
1678 &self,
1679 _range: RangeInclusive<BlockNumber>,
1680 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1681 Err(ProviderError::UnsupportedProvider)
1682 }
1683
1684 fn recovered_block_range(
1685 &self,
1686 _range: RangeInclusive<BlockNumber>,
1687 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
1688 Err(ProviderError::UnsupportedProvider)
1689 }
1690}
1691
1692impl<N: NodePrimitives> WithdrawalsProvider for StaticFileProvider<N> {
1693 fn withdrawals_by_block(
1694 &self,
1695 id: BlockHashOrNumber,
1696 timestamp: u64,
1697 ) -> ProviderResult<Option<Withdrawals>> {
1698 if let Some(num) = id.as_number() {
1699 return self
1700 .get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1701 .and_then(|provider| provider.withdrawals_by_block(id, timestamp))
1702 .or_else(|err| {
1703 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1704 Ok(None)
1705 } else {
1706 Err(err)
1707 }
1708 })
1709 }
1710 Err(ProviderError::UnsupportedProvider)
1712 }
1713}
1714
1715impl<N: FullNodePrimitives<BlockHeader: Value>> OmmersProvider for StaticFileProvider<N> {
1716 fn ommers(&self, id: BlockHashOrNumber) -> ProviderResult<Option<Vec<Self::Header>>> {
1717 if let Some(num) = id.as_number() {
1718 return self
1719 .get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1720 .and_then(|provider| provider.ommers(id))
1721 .or_else(|err| {
1722 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1723 Ok(None)
1724 } else {
1725 Err(err)
1726 }
1727 })
1728 }
1729 Err(ProviderError::UnsupportedProvider)
1731 }
1732}
1733
1734impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
1735 fn block_body_indices(&self, num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
1736 self.get_segment_provider_from_block(StaticFileSegment::BlockMeta, num, None)
1737 .and_then(|provider| provider.block_body_indices(num))
1738 .or_else(|err| {
1739 if let ProviderError::MissingStaticFileBlock(_, _) = err {
1740 Ok(None)
1741 } else {
1742 Err(err)
1743 }
1744 })
1745 }
1746
1747 fn block_body_indices_range(
1748 &self,
1749 range: RangeInclusive<BlockNumber>,
1750 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
1751 self.fetch_range_with_predicate(
1752 StaticFileSegment::BlockMeta,
1753 *range.start()..*range.end() + 1,
1754 |cursor, number| cursor.get_one::<BodyIndicesMask>(number.into()),
1755 |_| true,
1756 )
1757 }
1758}
1759
1760impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
1761 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
1762 match T::NAME {
1763 tables::CanonicalHeaders::NAME |
1764 tables::Headers::<Header>::NAME |
1765 tables::HeaderTerminalDifficulties::NAME => Ok(self
1766 .get_highest_static_file_block(StaticFileSegment::Headers)
1767 .map(|block| block + 1)
1768 .unwrap_or_default()
1769 as usize),
1770 tables::Receipts::<Receipt>::NAME => Ok(self
1771 .get_highest_static_file_tx(StaticFileSegment::Receipts)
1772 .map(|receipts| receipts + 1)
1773 .unwrap_or_default() as usize),
1774 tables::Transactions::<TransactionSigned>::NAME => Ok(self
1775 .get_highest_static_file_tx(StaticFileSegment::Transactions)
1776 .map(|txs| txs + 1)
1777 .unwrap_or_default()
1778 as usize),
1779 _ => Err(ProviderError::UnsupportedProvider),
1780 }
1781 }
1782}
1783
1784#[inline]
1786fn calculate_hash<T>(
1787 entry: (TxNumber, T),
1788 rlp_buf: &mut Vec<u8>,
1789) -> Result<(B256, TxNumber), Box<ProviderError>>
1790where
1791 T: Encodable2718,
1792{
1793 let (tx_id, tx) = entry;
1794 tx.encode_2718(rlp_buf);
1795 Ok((keccak256(rlp_buf), tx_id))
1796}