1use super::{
2 metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3 StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6 changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker},
7 to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
8 EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
9 TransactionVariant, TransactionsProvider, TransactionsProviderExt,
10};
11use alloy_consensus::{transaction::TransactionMeta, Header};
12use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
13use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
14use notify::{RecommendedWatcher, RecursiveMode, Watcher};
15use parking_lot::RwLock;
16use reth_chain_state::ExecutedBlock;
17use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
18use reth_db::{
19 lockfile::StorageLock,
20 static_file::{
21 iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
22 StaticFileCursor, StorageChangesetMask, TransactionMask, TransactionSenderMask,
23 },
24};
25use reth_db_api::{
26 cursor::DbCursorRO,
27 models::{AccountBeforeTx, BlockNumberAddress, StorageBeforeTx, StoredBlockBodyIndices},
28 table::{Decompress, Table, Value},
29 tables,
30 transaction::DbTx,
31};
32use reth_ethereum_primitives::{Receipt, TransactionSigned};
33use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
34use reth_node_types::NodePrimitives;
35use reth_primitives_traits::{
36 dashmap::DashMap, AlloyBlockHeader as _, BlockBody as _, RecoveredBlock, SealedHeader,
37 SignedTransaction, StorageEntry,
38};
39use reth_prune_types::PruneSegment;
40use reth_stages_types::PipelineTarget;
41use reth_static_file_types::{
42 find_fixed_range, ChangesetOffsetReader, HighestStaticFiles, SegmentHeader,
43 SegmentRangeInclusive, StaticFileMap, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
44};
45use reth_storage_api::{
46 BlockBodyIndicesProvider, ChangeSetReader, DBProvider, PruneCheckpointReader,
47 StorageChangeSetReader, StorageSettingsCache,
48};
49use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
50use std::{
51 collections::BTreeMap,
52 fmt::Debug,
53 ops::{Bound, Deref, Range, RangeBounds, RangeInclusive},
54 path::{Path, PathBuf},
55 sync::{atomic::AtomicU64, mpsc, Arc},
56};
57use tracing::{debug, info, info_span, instrument, trace, warn};
58
59type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
62
63#[derive(Debug, Default, PartialEq, Eq)]
65pub enum StaticFileAccess {
66 #[default]
68 RO,
69 RW,
71}
72
73impl StaticFileAccess {
74 pub const fn is_read_only(&self) -> bool {
76 matches!(self, Self::RO)
77 }
78
79 pub const fn is_read_write(&self) -> bool {
81 matches!(self, Self::RW)
82 }
83}
84
85#[derive(Debug, Clone, Copy, Default)]
89pub struct StaticFileWriteCtx {
90 pub write_senders: bool,
92 pub write_receipts: bool,
94 pub write_account_changesets: bool,
96 pub write_storage_changesets: bool,
98 pub tip: BlockNumber,
100 pub receipts_prune_mode: Option<reth_prune_types::PruneMode>,
102 pub receipts_prunable: bool,
104}
105
106#[derive(Debug)]
115pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
116
117impl<N> Clone for StaticFileProvider<N> {
118 fn clone(&self) -> Self {
119 Self(self.0.clone())
120 }
121}
122
123#[derive(Debug)]
125pub struct StaticFileProviderBuilder<P> {
126 access: StaticFileAccess,
127 use_metrics: bool,
128 blocks_per_file: StaticFileMap<u64>,
129 path: P,
130 genesis_block_number: u64,
131}
132
133impl<P: AsRef<Path>> StaticFileProviderBuilder<P> {
134 pub fn read_write(path: P) -> Self {
136 Self {
137 path,
138 access: StaticFileAccess::RW,
139 blocks_per_file: Default::default(),
140 use_metrics: false,
141 genesis_block_number: 0,
142 }
143 }
144
145 pub fn read_only(path: P) -> Self {
147 Self {
148 path,
149 access: StaticFileAccess::RO,
150 blocks_per_file: Default::default(),
151 use_metrics: false,
152 genesis_block_number: 0,
153 }
154 }
155
156 pub fn with_blocks_per_file_for_segments(
168 mut self,
169 segments: &<StaticFileMap<u64> as Deref>::Target,
170 ) -> Self {
171 for (segment, &blocks_per_file) in segments {
172 self.blocks_per_file.insert(segment, blocks_per_file);
173 }
174 self
175 }
176
177 pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
179 for segment in StaticFileSegment::iter() {
180 self.blocks_per_file.insert(segment, blocks_per_file);
181 }
182 self
183 }
184
185 pub fn with_blocks_per_file_for_segment(
187 mut self,
188 segment: StaticFileSegment,
189 blocks_per_file: u64,
190 ) -> Self {
191 self.blocks_per_file.insert(segment, blocks_per_file);
192 self
193 }
194
195 pub const fn with_metrics(mut self) -> Self {
197 self.use_metrics = true;
198 self
199 }
200
201 pub const fn with_genesis_block_number(mut self, genesis_block_number: u64) -> Self {
214 self.genesis_block_number = genesis_block_number;
215 self
216 }
217
218 pub fn build<N: NodePrimitives>(self) -> ProviderResult<StaticFileProvider<N>> {
220 let mut provider = StaticFileProviderInner::new(self.path, self.access)?;
221 if self.use_metrics {
222 provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
223 }
224
225 for (segment, blocks_per_file) in *self.blocks_per_file {
226 provider.blocks_per_file.insert(segment, blocks_per_file);
227 }
228 provider.genesis_block_number = self.genesis_block_number;
229
230 let provider = StaticFileProvider(Arc::new(provider));
231 provider.initialize_index()?;
232 Ok(provider)
233 }
234}
235
236impl<N: NodePrimitives> StaticFileProvider<N> {
237 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
239 let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
240 provider.initialize_index()?;
241 Ok(provider)
242 }
243}
244
245impl<N: NodePrimitives> StaticFileProvider<N> {
246 pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
256 let provider = Self::new(path, StaticFileAccess::RO)?;
257
258 if watch_directory {
259 provider.watch_directory();
260 }
261
262 Ok(provider)
263 }
264
265 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
267 Self::new(path, StaticFileAccess::RW)
268 }
269
270 pub fn watch_directory(&self) {
276 let provider = self.clone();
277 reth_tasks::spawn_os_thread("sf-watch", move || {
278 let (tx, rx) = std::sync::mpsc::channel();
279 let mut watcher = RecommendedWatcher::new(
280 move |res| tx.send(res).unwrap(),
281 notify::Config::default(),
282 )
283 .expect("failed to create watcher");
284
285 watcher
286 .watch(&provider.path, RecursiveMode::NonRecursive)
287 .expect("failed to watch path");
288
289 let mut last_event_timestamp = None;
291
292 while let Ok(res) = rx.recv() {
293 match res {
294 Ok(event) => {
295 if !matches!(
297 event.kind,
298 notify::EventKind::Modify(_) |
299 notify::EventKind::Create(_) |
300 notify::EventKind::Remove(_)
301 ) {
302 continue;
303 }
304
305 for segment in event.paths {
310 if segment
312 .extension()
313 .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
314 {
315 continue;
316 }
317
318 if StaticFileSegment::parse_filename(
320 &segment.file_stem().expect("qed").to_string_lossy(),
321 )
322 .is_none()
323 {
324 continue;
325 }
326
327 if let Ok(current_modified_timestamp) =
330 std::fs::metadata(&segment).and_then(|m| m.modified())
331 {
332 if last_event_timestamp.is_some_and(|last_timestamp| {
333 last_timestamp >= current_modified_timestamp
334 }) {
335 continue;
336 }
337 last_event_timestamp = Some(current_modified_timestamp);
338 }
339
340 info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
341 if let Err(err) = provider.initialize_index() {
342 warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
343 }
344 break;
345 }
346 }
347
348 Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
349 }
350 }
351 });
352 }
353}
354
355impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
356 type Target = StaticFileProviderInner<N>;
357
358 fn deref(&self) -> &Self::Target {
359 &self.0
360 }
361}
362
363#[derive(Debug)]
365pub struct StaticFileProviderInner<N> {
366 map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
369 indexes: RwLock<StaticFileMap<StaticFileSegmentIndex>>,
371 earliest_history_height: AtomicU64,
382 path: PathBuf,
384 writers: StaticFileWriters<N>,
386 metrics: Option<Arc<StaticFileProviderMetrics>>,
388 access: StaticFileAccess,
390 blocks_per_file: StaticFileMap<u64>,
392 _lock_file: Option<StorageLock>,
394 genesis_block_number: u64,
396}
397
398impl<N: NodePrimitives> StaticFileProviderInner<N> {
399 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
401 let _lock_file = if access.is_read_write() {
402 StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
403 } else {
404 None
405 };
406
407 let mut blocks_per_file = StaticFileMap::default();
408 for segment in StaticFileSegment::iter() {
409 blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
410 }
411
412 let provider = Self {
413 map: Default::default(),
414 indexes: Default::default(),
415 writers: Default::default(),
416 earliest_history_height: Default::default(),
417 path: path.as_ref().to_path_buf(),
418 metrics: None,
419 access,
420 blocks_per_file,
421 _lock_file,
422 genesis_block_number: 0,
423 };
424
425 Ok(provider)
426 }
427
428 pub const fn is_read_only(&self) -> bool {
429 self.access.is_read_only()
430 }
431
432 pub fn find_fixed_range_with_block_index(
441 &self,
442 segment: StaticFileSegment,
443 block_index: Option<&SegmentRanges>,
444 block: BlockNumber,
445 ) -> SegmentRangeInclusive {
446 let blocks_per_file =
447 self.blocks_per_file.get(segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
448
449 if let Some(block_index) = block_index {
450 if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
452 {
453 return *range;
455 } else if let Some((_, range)) = block_index.last_key_value() {
456 let blocks_after_last_range = block - range.end();
462 let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
463 let start = range.end() + 1 + segments_to_skip * blocks_per_file;
464 return SegmentRangeInclusive::new(start, start + blocks_per_file - 1);
465 }
466 }
467 find_fixed_range(block, blocks_per_file)
470 }
471
472 pub fn find_fixed_range(
485 &self,
486 segment: StaticFileSegment,
487 block: BlockNumber,
488 ) -> SegmentRangeInclusive {
489 self.find_fixed_range_with_block_index(
490 segment,
491 self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block),
492 block,
493 )
494 }
495
496 pub const fn genesis_block_number(&self) -> u64 {
498 self.genesis_block_number
499 }
500}
501
502impl<N: NodePrimitives> StaticFileProvider<N> {
503 pub fn report_metrics(&self) -> ProviderResult<()> {
508 let Some(metrics) = &self.metrics else { return Ok(()) };
509
510 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
511 for (segment, headers) in &*static_files {
512 let mut entries = 0;
513 let mut size = 0;
514
515 for (block_range, _) in headers {
516 let fixed_block_range = self.find_fixed_range(segment, block_range.start());
517 let jar_provider = self
518 .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
519 .ok_or_else(|| {
520 ProviderError::MissingStaticFileBlock(segment, block_range.start())
521 })?;
522
523 entries += jar_provider.rows();
524 size += jar_provider.size() as u64;
525 }
526
527 metrics.record_segment(segment, size, headers.len(), entries);
528 }
529
530 Ok(())
531 }
532
533 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
535 fn write_headers(
536 w: &mut StaticFileProviderRWRefMut<'_, N>,
537 blocks: &[ExecutedBlock<N>],
538 ) -> ProviderResult<()> {
539 for block in blocks {
540 let b = block.recovered_block();
541 w.append_header(b.header(), &b.hash())?;
542 }
543 Ok(())
544 }
545
546 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
548 fn write_transactions(
549 w: &mut StaticFileProviderRWRefMut<'_, N>,
550 blocks: &[ExecutedBlock<N>],
551 tx_nums: &[TxNumber],
552 ) -> ProviderResult<()> {
553 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
554 let b = block.recovered_block();
555 w.increment_block(b.number())?;
556 for (i, tx) in b.body().transactions().iter().enumerate() {
557 w.append_transaction(first_tx + i as u64, tx)?;
558 }
559 }
560 Ok(())
561 }
562
563 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
565 fn write_transaction_senders(
566 w: &mut StaticFileProviderRWRefMut<'_, N>,
567 blocks: &[ExecutedBlock<N>],
568 tx_nums: &[TxNumber],
569 ) -> ProviderResult<()> {
570 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
571 let b = block.recovered_block();
572 w.increment_block(b.number())?;
573 for (i, sender) in b.senders_iter().enumerate() {
574 w.append_transaction_sender(first_tx + i as u64, sender)?;
575 }
576 }
577 Ok(())
578 }
579
580 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
582 fn write_receipts(
583 w: &mut StaticFileProviderRWRefMut<'_, N>,
584 blocks: &[ExecutedBlock<N>],
585 tx_nums: &[TxNumber],
586 ctx: &StaticFileWriteCtx,
587 ) -> ProviderResult<()> {
588 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
589 let block_number = block.recovered_block().number();
590 w.increment_block(block_number)?;
591
592 if ctx.receipts_prunable &&
594 ctx.receipts_prune_mode
595 .is_some_and(|mode| mode.should_prune(block_number, ctx.tip))
596 {
597 continue
598 }
599
600 for (i, receipt) in block.execution_outcome().receipts.iter().enumerate() {
601 w.append_receipt(first_tx + i as u64, receipt)?;
602 }
603 }
604 Ok(())
605 }
606
607 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
609 fn write_account_changesets(
610 w: &mut StaticFileProviderRWRefMut<'_, N>,
611 blocks: &[ExecutedBlock<N>],
612 ) -> ProviderResult<()> {
613 for block in blocks {
614 let block_number = block.recovered_block().number();
615 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
616
617 let changeset: Vec<_> = reverts
618 .accounts
619 .into_iter()
620 .flatten()
621 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
622 .collect();
623 w.append_account_changeset(changeset, block_number)?;
624 }
625 Ok(())
626 }
627
628 #[instrument(level = "debug", target = "providers::db", skip_all)]
630 fn write_storage_changesets(
631 w: &mut StaticFileProviderRWRefMut<'_, N>,
632 blocks: &[ExecutedBlock<N>],
633 ) -> ProviderResult<()> {
634 for block in blocks {
635 let block_number = block.recovered_block().number();
636 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
637
638 let changeset: Vec<_> = reverts
639 .storage
640 .into_iter()
641 .flatten()
642 .flat_map(|revert| {
643 revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
644 StorageBeforeTx {
645 address: revert.address,
646 key: B256::from(key.to_be_bytes()),
647 value: revert_to_slot.to_previous_value(),
648 }
649 })
650 })
651 .collect();
652 w.append_storage_changeset(changeset, block_number)?;
653 }
654 Ok(())
655 }
656
657 #[instrument(level = "debug", target = "providers::static_file", skip_all, fields(?segment))]
662 fn write_segment<F>(
663 &self,
664 segment: StaticFileSegment,
665 first_block_number: BlockNumber,
666 f: F,
667 ) -> ProviderResult<()>
668 where
669 F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()>,
670 {
671 let mut w = self.get_writer(first_block_number, segment)?;
672 f(&mut w)?;
673 w.sync_all()
674 }
675
676 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
681 pub fn write_blocks_data(
682 &self,
683 blocks: &[ExecutedBlock<N>],
684 tx_nums: &[TxNumber],
685 ctx: StaticFileWriteCtx,
686 runtime: &reth_tasks::Runtime,
687 ) -> ProviderResult<()> {
688 if blocks.is_empty() {
689 return Ok(());
690 }
691
692 let first_block_number = blocks[0].recovered_block().number();
693
694 let mut r_headers = None;
695 let mut r_txs = None;
696 let mut r_senders = None;
697 let mut r_receipts = None;
698 let mut r_account_changesets = None;
699 let mut r_storage_changesets = None;
700
701 let span = tracing::Span::current();
704 runtime.storage_pool().in_place_scope(|s| {
705 s.spawn(|_| {
706 let _guard = span.enter();
707 r_headers =
708 Some(self.write_segment(StaticFileSegment::Headers, first_block_number, |w| {
709 Self::write_headers(w, blocks)
710 }));
711 });
712
713 s.spawn(|_| {
714 let _guard = span.enter();
715 r_txs = Some(self.write_segment(
716 StaticFileSegment::Transactions,
717 first_block_number,
718 |w| Self::write_transactions(w, blocks, tx_nums),
719 ));
720 });
721
722 if ctx.write_senders {
723 s.spawn(|_| {
724 let _guard = span.enter();
725 r_senders = Some(self.write_segment(
726 StaticFileSegment::TransactionSenders,
727 first_block_number,
728 |w| Self::write_transaction_senders(w, blocks, tx_nums),
729 ));
730 });
731 }
732
733 if ctx.write_receipts {
734 s.spawn(|_| {
735 let _guard = span.enter();
736 r_receipts = Some(self.write_segment(
737 StaticFileSegment::Receipts,
738 first_block_number,
739 |w| Self::write_receipts(w, blocks, tx_nums, &ctx),
740 ));
741 });
742 }
743
744 if ctx.write_account_changesets {
745 s.spawn(|_| {
746 let _guard = span.enter();
747 r_account_changesets = Some(self.write_segment(
748 StaticFileSegment::AccountChangeSets,
749 first_block_number,
750 |w| Self::write_account_changesets(w, blocks),
751 ));
752 });
753 }
754
755 if ctx.write_storage_changesets {
756 s.spawn(|_| {
757 let _guard = span.enter();
758 r_storage_changesets = Some(self.write_segment(
759 StaticFileSegment::StorageChangeSets,
760 first_block_number,
761 |w| Self::write_storage_changesets(w, blocks),
762 ));
763 });
764 }
765 });
766
767 r_headers.ok_or(StaticFileWriterError::ThreadPanic("headers"))??;
768 r_txs.ok_or(StaticFileWriterError::ThreadPanic("transactions"))??;
769 if ctx.write_senders {
770 r_senders.ok_or(StaticFileWriterError::ThreadPanic("senders"))??;
771 }
772 if ctx.write_receipts {
773 r_receipts.ok_or(StaticFileWriterError::ThreadPanic("receipts"))??;
774 }
775 if ctx.write_account_changesets {
776 r_account_changesets
777 .ok_or(StaticFileWriterError::ThreadPanic("account_changesets"))??;
778 }
779 if ctx.write_storage_changesets {
780 r_storage_changesets
781 .ok_or(StaticFileWriterError::ThreadPanic("storage_changesets"))??;
782 }
783 Ok(())
784 }
785
786 pub fn get_segment_provider(
789 &self,
790 segment: StaticFileSegment,
791 number: u64,
792 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
793 if segment.is_block_or_change_based() {
794 self.get_segment_provider_for_block(segment, number, None)
795 } else {
796 self.get_segment_provider_for_transaction(segment, number, None)
797 }
798 }
799
800 pub fn get_maybe_segment_provider(
805 &self,
806 segment: StaticFileSegment,
807 number: u64,
808 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
809 let provider = if segment.is_block_or_change_based() {
810 self.get_segment_provider_for_block(segment, number, None)
811 } else {
812 self.get_segment_provider_for_transaction(segment, number, None)
813 };
814
815 match provider {
816 Ok(provider) => Ok(Some(provider)),
817 Err(
818 ProviderError::MissingStaticFileBlock(_, _) |
819 ProviderError::MissingStaticFileTx(_, _),
820 ) => Ok(None),
821 Err(err) => Err(err),
822 }
823 }
824
825 pub fn get_segment_provider_for_block(
827 &self,
828 segment: StaticFileSegment,
829 block: BlockNumber,
830 path: Option<&Path>,
831 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
832 self.get_segment_provider_for_range(
833 segment,
834 || self.get_segment_ranges_from_block(segment, block),
835 path,
836 )?
837 .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
838 }
839
840 pub fn get_segment_provider_for_transaction(
842 &self,
843 segment: StaticFileSegment,
844 tx: TxNumber,
845 path: Option<&Path>,
846 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
847 self.get_segment_provider_for_range(
848 segment,
849 || self.get_segment_ranges_from_transaction(segment, tx),
850 path,
851 )?
852 .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
853 }
854
855 pub fn get_segment_provider_for_range(
859 &self,
860 segment: StaticFileSegment,
861 fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
862 path: Option<&Path>,
863 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
864 let block_range = match path {
867 Some(path) => StaticFileSegment::parse_filename(
868 &path
869 .file_name()
870 .ok_or_else(|| {
871 ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
872 })?
873 .to_string_lossy(),
874 )
875 .and_then(|(parsed_segment, block_range)| {
876 if parsed_segment == segment {
877 return Some(block_range);
878 }
879 None
880 }),
881 None => fn_range(),
882 };
883
884 if let Some(block_range) = block_range {
886 return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?));
887 }
888
889 Ok(None)
890 }
891
892 pub fn get_segment_provider_for_path(
894 &self,
895 path: &Path,
896 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
897 StaticFileSegment::parse_filename(
898 &path
899 .file_name()
900 .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
901 .to_string_lossy(),
902 )
903 .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
904 .transpose()
905 }
906
907 pub fn remove_cached_provider(
911 &self,
912 segment: StaticFileSegment,
913 fixed_block_range_end: BlockNumber,
914 ) {
915 self.map.remove(&(fixed_block_range_end, segment));
916 }
917
918 pub fn delete_segment_below_block(
935 &self,
936 segment: StaticFileSegment,
937 block: BlockNumber,
938 ) -> ProviderResult<Vec<SegmentHeader>> {
939 if block == 0 {
941 return Ok(Vec::new());
942 }
943
944 let highest_block = self.get_highest_static_file_block(segment);
945 let mut deleted_headers = Vec::new();
946
947 loop {
948 let Some(block_height) = self.get_lowest_range_end(segment) else {
949 return Ok(deleted_headers);
950 };
951
952 if block_height >= block || Some(block_height) == highest_block {
954 return Ok(deleted_headers);
955 }
956
957 debug!(
958 target: "providers::static_file",
959 ?segment,
960 ?block_height,
961 "Deleting static file below block"
962 );
963
964 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
967 warn!( target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
968 })?;
969
970 deleted_headers.push(header);
971 }
972 }
973
974 pub fn delete_jar(
982 &self,
983 segment: StaticFileSegment,
984 block: BlockNumber,
985 ) -> ProviderResult<SegmentHeader> {
986 let fixed_block_range = self.find_fixed_range(segment, block);
987 let key = (fixed_block_range.end(), segment);
988 let file = self.path.join(segment.filename(&fixed_block_range));
989 let jar = if let Some((_, jar)) = self.map.remove(&key) {
990 jar.jar
991 } else {
992 debug!(
993 target: "providers::static_file",
994 ?file,
995 ?fixed_block_range,
996 ?block,
997 "Loading static file jar for deletion"
998 );
999 NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
1000 };
1001
1002 let header = jar.user_header().clone();
1003
1004 if segment.is_change_based() {
1006 let csoff_path = file.with_extension("csoff");
1007 if csoff_path.exists() {
1008 std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
1009 }
1010 }
1011
1012 jar.delete().map_err(ProviderError::other)?;
1013
1014 self.initialize_index()?;
1017
1018 Ok(header)
1019 }
1020
1021 pub fn delete_segment(&self, segment: StaticFileSegment) -> ProviderResult<Vec<SegmentHeader>> {
1029 let mut deleted_headers = Vec::new();
1030
1031 while let Some(block_height) = self.get_highest_static_file_block(segment) {
1032 debug!(
1033 target: "providers::static_file",
1034 ?segment,
1035 ?block_height,
1036 "Deleting static file jar"
1037 );
1038
1039 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
1040 warn!(target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file jar")
1041 })?;
1042
1043 deleted_headers.push(header);
1044 }
1045
1046 Ok(deleted_headers)
1047 }
1048
1049 fn get_or_create_jar_provider(
1053 &self,
1054 segment: StaticFileSegment,
1055 fixed_block_range: &SegmentRangeInclusive,
1056 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
1057 let key = (fixed_block_range.end(), segment);
1058
1059 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Getting provider");
1061 let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
1062 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
1063 jar.into()
1064 } else {
1065 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
1066 let path = self.path.join(segment.filename(fixed_block_range));
1067 let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
1068 self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
1069 };
1070
1071 if let Some(metrics) = &self.metrics {
1072 provider = provider.with_metrics(metrics.clone());
1073 }
1074 Ok(provider)
1075 }
1076
1077 fn get_segment_ranges_from_block(
1080 &self,
1081 segment: StaticFileSegment,
1082 block: u64,
1083 ) -> Option<SegmentRangeInclusive> {
1084 let indexes = self.indexes.read();
1085 let index = indexes.get(segment)?;
1086
1087 (index.max_block >= block).then(|| {
1088 self.find_fixed_range_with_block_index(
1089 segment,
1090 Some(&index.expected_block_ranges_by_max_block),
1091 block,
1092 )
1093 })
1094 }
1095
1096 fn get_segment_ranges_from_transaction(
1099 &self,
1100 segment: StaticFileSegment,
1101 tx: u64,
1102 ) -> Option<SegmentRangeInclusive> {
1103 let indexes = self.indexes.read();
1104 let index = indexes.get(segment)?;
1105 let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;
1106
1107 let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();
1110
1111 while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
1112 if tx > *tx_end {
1113 return None;
1115 }
1116 let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
1117 if tx_start <= tx {
1118 return Some(self.find_fixed_range_with_block_index(
1119 segment,
1120 Some(&index.expected_block_ranges_by_max_block),
1121 block_range.end(),
1122 ));
1123 }
1124 }
1125 None
1126 }
1127
1128 pub fn update_index(
1135 &self,
1136 segment: StaticFileSegment,
1137 segment_max_block: Option<BlockNumber>,
1138 ) -> ProviderResult<()> {
1139 debug!(
1140 target: "providers::static_file",
1141 ?segment,
1142 ?segment_max_block,
1143 "Updating provider index"
1144 );
1145 let mut indexes = self.indexes.write();
1146
1147 match segment_max_block {
1148 Some(segment_max_block) => {
1149 let fixed_range = self.find_fixed_range_with_block_index(
1150 segment,
1151 indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
1152 segment_max_block,
1153 );
1154
1155 let jar = NippyJar::<SegmentHeader>::load(
1156 &self.path.join(segment.filename(&fixed_range)),
1157 )
1158 .map_err(ProviderError::other)?;
1159
1160 let index = indexes
1161 .entry(segment)
1162 .and_modify(|index| {
1163 index.max_block = segment_max_block;
1165
1166 index
1170 .expected_block_ranges_by_max_block
1171 .retain(|_, block_range| block_range.start() < fixed_range.start());
1172 index
1174 .expected_block_ranges_by_max_block
1175 .insert(fixed_range.end(), fixed_range);
1176 })
1177 .or_insert_with(|| StaticFileSegmentIndex {
1178 min_block_range: None,
1179 max_block: segment_max_block,
1180 expected_block_ranges_by_max_block: BTreeMap::from([(
1181 fixed_range.end(),
1182 fixed_range,
1183 )]),
1184 available_block_ranges_by_max_tx: None,
1185 });
1186
1187 if let Some(current_block_range) = jar.user_header().block_range() {
1203 if let Some(min_block_range) = index.min_block_range.as_mut() {
1204 if current_block_range.start() == min_block_range.start() {
1207 *min_block_range = current_block_range;
1208 }
1209 } else {
1210 index.min_block_range = Some(current_block_range);
1211 }
1212 }
1213
1214 if let Some(tx_range) = jar.user_header().tx_range() {
1217 if let Some(current_block_range) = jar.user_header().block_range() {
1220 let tx_end = tx_range.end();
1221
1222 if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
1231 index
1232 .retain(|_, block_range| block_range.start() < fixed_range.start());
1233 index.insert(tx_end, current_block_range);
1234 } else {
1235 index.available_block_ranges_by_max_tx =
1236 Some(BTreeMap::from([(tx_end, current_block_range)]));
1237 }
1238 }
1239 } else if segment.is_tx_based() {
1240 if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
1244 index.retain(|_, block_range| block_range.start() < fixed_range.start());
1245 }
1246
1247 index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
1249 }
1250
1251 debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
1253 self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
1254
1255 debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
1257 self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
1258 }
1259 None => {
1260 debug!(target: "providers::static_file", ?segment, "Removing segment from index");
1261 indexes.remove(segment);
1262 }
1263 };
1264
1265 debug!(target: "providers::static_file", ?segment, "Updated provider index");
1266 Ok(())
1267 }
1268
1269 pub fn initialize_index(&self) -> ProviderResult<()> {
1271 let mut indexes = self.indexes.write();
1272 indexes.clear();
1273
1274 for (segment, headers) in &*iter_static_files(&self.path).map_err(ProviderError::other)? {
1275 let min_block_range = Some(headers.first().expect("headers are not empty").0);
1280 let max_block = headers.last().expect("headers are not empty").0.end();
1281
1282 let mut expected_block_ranges_by_max_block = BTreeMap::default();
1283 let mut available_block_ranges_by_max_tx = None;
1284
1285 for (block_range, header) in headers {
1286 expected_block_ranges_by_max_block
1288 .insert(header.expected_block_end(), header.expected_block_range());
1289
1290 if let Some(tx_range) = header.tx_range() {
1292 let tx_end = tx_range.end();
1293
1294 available_block_ranges_by_max_tx
1295 .get_or_insert_with(BTreeMap::default)
1296 .insert(tx_end, *block_range);
1297 }
1298 }
1299
1300 indexes.insert(
1301 segment,
1302 StaticFileSegmentIndex {
1303 min_block_range,
1304 max_block,
1305 expected_block_ranges_by_max_block,
1306 available_block_ranges_by_max_tx,
1307 },
1308 );
1309 }
1310
1311 self.map.clear();
1313
1314 if let Some(lowest_range) =
1316 indexes.get(StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
1317 {
1318 self.earliest_history_height
1320 .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
1321 }
1322
1323 Ok(())
1324 }
1325
1326 #[instrument(skip(self, provider), fields(read_only = self.is_read_only()))]
1350 pub fn check_consistency<Provider>(
1351 &self,
1352 provider: &Provider,
1353 ) -> ProviderResult<Option<PipelineTarget>>
1354 where
1355 Provider: DBProvider
1356 + BlockReader
1357 + StageCheckpointReader
1358 + PruneCheckpointReader
1359 + ChainSpecProvider
1360 + StorageSettingsCache,
1361 N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
1362 {
1363 if provider.chain_spec().is_optimism() &&
1370 reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
1371 {
1372 const OVM_HEADER_1_HASH: B256 =
1374 b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
1375 if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
1376 info!(target: "reth::cli",
1377 "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
1378 );
1379 return Ok(None);
1380 }
1381 }
1382
1383 info!(target: "reth::cli", "Verifying storage consistency.");
1384
1385 let mut unwind_target: Option<BlockNumber> = None;
1386
1387 let mut update_unwind_target = |new_target| {
1388 unwind_target =
1389 unwind_target.map(|current| current.min(new_target)).or(Some(new_target));
1390 };
1391
1392 for segment in self.segments_to_check(provider) {
1393 let span = info_span!(
1394 "Checking consistency for segment",
1395 ?segment,
1396 initial_highest_block = tracing::field::Empty,
1397 highest_block = tracing::field::Empty,
1398 highest_tx = tracing::field::Empty,
1399 );
1400 let _guard = span.enter();
1401
1402 debug!(target: "reth::providers::static_file", "Checking consistency for segment");
1403
1404 let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?;
1406 span.record("initial_highest_block", initial_highest_block);
1407 span.record("highest_block", highest_block);
1408
1409 if initial_highest_block != highest_block {
1414 info!(
1415 target: "reth::providers::static_file",
1416 unwind_target = highest_block,
1417 "Setting unwind target."
1418 );
1419 update_unwind_target(highest_block.unwrap_or_default());
1420 }
1421
1422 let highest_tx = self.get_highest_static_file_tx(segment);
1428 span.record("highest_tx", highest_tx);
1429 debug!(target: "reth::providers::static_file", "Checking tx index segment");
1430
1431 if let Some(highest_tx) = highest_tx {
1432 let mut last_block = highest_block.unwrap_or_default();
1433 debug!(target: "reth::providers::static_file", last_block, highest_tx, "Verifying last transaction matches last block indices");
1434 loop {
1435 let Some(indices) = provider.block_body_indices(last_block)? else {
1436 debug!(target: "reth::providers::static_file", last_block, "Block body indices not found, static files ahead of database");
1437 break
1441 };
1442
1443 debug!(target: "reth::providers::static_file", last_block, last_tx_num = indices.last_tx_num(), "Found block body indices");
1444
1445 if indices.last_tx_num() <= highest_tx {
1446 break
1447 }
1448
1449 if last_block == 0 {
1450 debug!(target: "reth::providers::static_file", "Reached block 0 in verification loop");
1451 break
1452 }
1453
1454 last_block -= 1;
1455
1456 info!(
1457 target: "reth::providers::static_file",
1458 highest_block = self.get_highest_static_file_block(segment),
1459 unwind_target = last_block,
1460 "Setting unwind target."
1461 );
1462 span.record("highest_block", last_block);
1463 highest_block = Some(last_block);
1464 update_unwind_target(last_block);
1465 }
1466 }
1467
1468 debug!(target: "reth::providers::static_file", "Ensuring invariants for segment");
1469
1470 match self.ensure_invariants_for(provider, segment, highest_tx, highest_block)? {
1471 Some(unwind) => {
1472 debug!(target: "reth::providers::static_file", unwind_target=unwind, "Invariants check returned unwind target");
1473 update_unwind_target(unwind);
1474 }
1475 None => {
1476 debug!(target: "reth::providers::static_file", "Invariants check completed, no unwind needed")
1477 }
1478 }
1479 }
1480
1481 Ok(unwind_target.map(PipelineTarget::Unwind))
1482 }
1483
1484 pub fn check_file_consistency<Provider>(&self, provider: &Provider) -> ProviderResult<()>
1493 where
1494 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1495 {
1496 info!(target: "reth::cli", "Healing static file inconsistencies.");
1497
1498 for segment in self.segments_to_check(provider) {
1499 let _guard = info_span!("healing_static_file_segment", ?segment).entered();
1500 let _ = self.maybe_heal_segment(segment)?;
1501 }
1502
1503 Ok(())
1504 }
1505
1506 fn segments_to_check<'a, Provider>(
1508 &'a self,
1509 provider: &'a Provider,
1510 ) -> impl Iterator<Item = StaticFileSegment> + 'a
1511 where
1512 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1513 {
1514 StaticFileSegment::iter()
1515 .filter(move |segment| self.should_check_segment(provider, *segment))
1516 }
1517
1518 fn should_check_segment<Provider>(
1520 &self,
1521 provider: &Provider,
1522 segment: StaticFileSegment,
1523 ) -> bool
1524 where
1525 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1526 {
1527 match segment {
1528 StaticFileSegment::Headers | StaticFileSegment::Transactions => true,
1529 StaticFileSegment::Receipts => {
1530 if EitherWriter::receipts_destination(provider).is_database() {
1531 debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database");
1534 return false;
1535 }
1536
1537 if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
1538 NamedChain::Chiado == provider.chain_spec().chain_id()
1539 {
1540 debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: broken historical import for gnosis/chiado");
1544 return false;
1545 }
1546
1547 true
1548 }
1549 StaticFileSegment::TransactionSenders => {
1550 if EitherWriterDestination::senders(provider).is_database() {
1551 debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: senders stored in database");
1552 return false;
1553 }
1554
1555 if Self::is_segment_fully_pruned(provider, PruneSegment::SenderRecovery) {
1556 debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: fully pruned");
1557 return false;
1558 }
1559
1560 true
1561 }
1562 StaticFileSegment::AccountChangeSets => {
1563 if EitherWriter::account_changesets_destination(provider).is_database() {
1564 debug!(target: "reth::providers::static_file", ?segment, "Skipping account changesets segment: changesets stored in database");
1565 return false;
1566 }
1567 true
1568 }
1569 StaticFileSegment::StorageChangeSets => {
1570 if EitherWriter::storage_changesets_destination(provider).is_database() {
1571 debug!(target: "reth::providers::static_file", ?segment, "Skipping storage changesets segment: changesets stored in database");
1572 return false
1573 }
1574 true
1575 }
1576 }
1577 }
1578
1579 fn is_segment_fully_pruned<Provider>(provider: &Provider, segment: PruneSegment) -> bool
1583 where
1584 Provider: PruneCheckpointReader,
1585 {
1586 provider
1587 .get_prune_checkpoint(segment)
1588 .ok()
1589 .flatten()
1590 .is_some_and(|checkpoint| checkpoint.prune_mode.is_full())
1591 }
1592
1593 fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1598 debug!(target: "reth::providers::static_file", "Checking segment consistency");
1599 if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1600 let file_path = self
1601 .directory()
1602 .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1603 debug!(target: "reth::providers::static_file", ?file_path, latest_block, "Loading NippyJar for consistency check");
1604
1605 let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1606 debug!(target: "reth::providers::static_file", "NippyJar loaded, checking consistency");
1607
1608 NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1609 debug!(target: "reth::providers::static_file", "NippyJar consistency check passed");
1610 } else {
1611 debug!(target: "reth::providers::static_file", "No static file block found, skipping consistency check");
1612 }
1613 Ok(())
1614 }
1615
1616 fn maybe_heal_segment(
1632 &self,
1633 segment: StaticFileSegment,
1634 ) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
1635 let initial_highest_block = self.get_highest_static_file_block(segment);
1636 debug!(target: "reth::providers::static_file", ?initial_highest_block, "Initial highest block for segment");
1637
1638 if self.access.is_read_only() {
1639 debug!(target: "reth::providers::static_file", "Checking segment consistency (read-only)");
1642 self.check_segment_consistency(segment)?;
1643 } else {
1644 debug!(target: "reth::providers::static_file", "Fetching latest writer which might heal any potential inconsistency");
1647 self.latest_writer(segment)?;
1648 }
1649
1650 let highest_block = self.get_highest_static_file_block(segment);
1653
1654 Ok((initial_highest_block, highest_block))
1655 }
1656
1657 fn ensure_invariants_for<Provider>(
1659 &self,
1660 provider: &Provider,
1661 segment: StaticFileSegment,
1662 highest_tx: Option<u64>,
1663 highest_block: Option<BlockNumber>,
1664 ) -> ProviderResult<Option<BlockNumber>>
1665 where
1666 Provider: DBProvider + BlockReader + StageCheckpointReader,
1667 N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
1668 {
1669 match segment {
1670 StaticFileSegment::Headers => self
1671 .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
1672 provider,
1673 segment,
1674 highest_block,
1675 highest_block,
1676 ),
1677 StaticFileSegment::Transactions => self
1678 .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
1679 provider,
1680 segment,
1681 highest_tx,
1682 highest_block,
1683 ),
1684 StaticFileSegment::Receipts => self
1685 .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
1686 provider,
1687 segment,
1688 highest_tx,
1689 highest_block,
1690 ),
1691 StaticFileSegment::TransactionSenders => self
1692 .ensure_invariants::<_, tables::TransactionSenders>(
1693 provider,
1694 segment,
1695 highest_tx,
1696 highest_block,
1697 ),
1698 StaticFileSegment::AccountChangeSets => self
1699 .ensure_invariants::<_, tables::AccountChangeSets>(
1700 provider,
1701 segment,
1702 highest_tx,
1703 highest_block,
1704 ),
1705 StaticFileSegment::StorageChangeSets => self
1706 .ensure_changeset_invariants_by_block::<_, tables::StorageChangeSets, _>(
1707 provider,
1708 segment,
1709 highest_block,
1710 |key| key.block_number(),
1711 ),
1712 }
1713 }
1714
1715 #[instrument(skip(self, provider, segment), fields(table = T::NAME))]
1730 fn ensure_invariants<Provider, T: Table<Key = u64>>(
1731 &self,
1732 provider: &Provider,
1733 segment: StaticFileSegment,
1734 highest_static_file_entry: Option<u64>,
1735 highest_static_file_block: Option<BlockNumber>,
1736 ) -> ProviderResult<Option<BlockNumber>>
1737 where
1738 Provider: DBProvider + BlockReader + StageCheckpointReader,
1739 {
1740 debug!(target: "reth::providers::static_file", "Ensuring invariants");
1741 let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
1742
1743 if let Some((db_first_entry, _)) = db_cursor.first()? {
1744 debug!(target: "reth::providers::static_file", db_first_entry, "Found first database entry");
1745 if let (Some(highest_entry), Some(highest_block)) =
1746 (highest_static_file_entry, highest_static_file_block)
1747 {
1748 if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
1752 info!(
1753 target: "reth::providers::static_file",
1754 ?db_first_entry,
1755 ?highest_entry,
1756 unwind_target = highest_block,
1757 "Setting unwind target."
1758 );
1759 return Ok(Some(highest_block));
1760 }
1761 }
1762
1763 if let Some((db_last_entry, _)) = db_cursor.last()? &&
1764 highest_static_file_entry
1765 .is_none_or(|highest_entry| db_last_entry > highest_entry)
1766 {
1767 debug!(target: "reth::providers::static_file", db_last_entry, "Database has entries beyond static files, no unwind needed");
1768 return Ok(None)
1769 }
1770 } else {
1771 debug!(target: "reth::providers::static_file", "No database entries found");
1772 }
1773
1774 let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
1775 let highest_static_file_block = highest_static_file_block.unwrap_or_default();
1776
1777 let stage_id = segment.to_stage_id();
1780 let checkpoint_block_number =
1781 provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
1782 debug!(target: "reth::providers::static_file", ?stage_id, checkpoint_block_number, "Retrieved stage checkpoint");
1783
1784 if checkpoint_block_number > highest_static_file_block {
1786 info!(
1787 target: "reth::providers::static_file",
1788 checkpoint_block_number,
1789 unwind_target = highest_static_file_block,
1790 "Setting unwind target."
1791 );
1792 return Ok(Some(highest_static_file_block));
1793 }
1794
1795 if checkpoint_block_number >= highest_static_file_block {
1797 debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
1798 return Ok(None);
1799 }
1800
1801 info!(
1807 target: "reth::providers",
1808 from = highest_static_file_block,
1809 to = checkpoint_block_number,
1810 "Unwinding static file segment."
1811 );
1812 let mut writer = self.latest_writer(segment)?;
1813
1814 match segment {
1815 StaticFileSegment::Headers => {
1816 let prune_count = highest_static_file_block - checkpoint_block_number;
1817 debug!(target: "reth::providers::static_file", prune_count, "Pruning headers");
1818 writer.prune_headers(prune_count)?;
1820 }
1821 StaticFileSegment::Transactions |
1822 StaticFileSegment::Receipts |
1823 StaticFileSegment::TransactionSenders => {
1824 if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
1825 let number = highest_static_file_entry - block.last_tx_num();
1826 debug!(target: "reth::providers::static_file", prune_count = number, checkpoint_block_number, "Pruning transaction based segment");
1827
1828 match segment {
1829 StaticFileSegment::Transactions => {
1830 writer.prune_transactions(number, checkpoint_block_number)?
1831 }
1832 StaticFileSegment::Receipts => {
1833 writer.prune_receipts(number, checkpoint_block_number)?
1834 }
1835 StaticFileSegment::TransactionSenders => {
1836 writer.prune_transaction_senders(number, checkpoint_block_number)?
1837 }
1838 StaticFileSegment::Headers |
1839 StaticFileSegment::AccountChangeSets |
1840 StaticFileSegment::StorageChangeSets => {
1841 unreachable!()
1842 }
1843 }
1844 } else {
1845 debug!(target: "reth::providers::static_file", checkpoint_block_number, "No block body indices found for checkpoint block");
1846 }
1847 }
1848 StaticFileSegment::AccountChangeSets => {
1849 writer.prune_account_changesets(checkpoint_block_number)?;
1850 }
1851 StaticFileSegment::StorageChangeSets => {
1852 writer.prune_storage_changesets(checkpoint_block_number)?;
1853 }
1854 }
1855
1856 debug!(target: "reth::providers::static_file", "Committing writer after pruning");
1857 writer.commit()?;
1858 debug!(target: "reth::providers::static_file", "Writer committed successfully");
1859
1860 debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
1861 Ok(None)
1862 }
1863
1864 fn ensure_changeset_invariants_by_block<Provider, T, F>(
1865 &self,
1866 provider: &Provider,
1867 segment: StaticFileSegment,
1868 highest_static_file_block: Option<BlockNumber>,
1869 block_from_key: F,
1870 ) -> ProviderResult<Option<BlockNumber>>
1871 where
1872 Provider: DBProvider + BlockReader + StageCheckpointReader,
1873 T: Table,
1874 F: Fn(&T::Key) -> BlockNumber,
1875 {
1876 debug!(
1877 target: "reth::providers::static_file",
1878 ?segment,
1879 ?highest_static_file_block,
1880 "Ensuring changeset invariants"
1881 );
1882 let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
1883
1884 if let Some((db_first_key, _)) = db_cursor.first()? {
1885 let db_first_block = block_from_key(&db_first_key);
1886 if let Some(highest_block) = highest_static_file_block &&
1887 !(db_first_block <= highest_block || highest_block + 1 == db_first_block)
1888 {
1889 info!(
1890 target: "reth::providers::static_file",
1891 ?db_first_block,
1892 ?highest_block,
1893 unwind_target = highest_block,
1894 ?segment,
1895 "Setting unwind target."
1896 );
1897 return Ok(Some(highest_block))
1898 }
1899
1900 if let Some((db_last_key, _)) = db_cursor.last()? &&
1901 highest_static_file_block
1902 .is_none_or(|highest_block| block_from_key(&db_last_key) > highest_block)
1903 {
1904 debug!(
1905 target: "reth::providers::static_file",
1906 ?segment,
1907 "Database has entries beyond static files, no unwind needed"
1908 );
1909 return Ok(None)
1910 }
1911 } else {
1912 debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
1913 }
1914
1915 let highest_static_file_block = highest_static_file_block.unwrap_or_default();
1916
1917 let stage_id = segment.to_stage_id();
1918 let checkpoint_block_number =
1919 provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
1920
1921 if checkpoint_block_number > highest_static_file_block {
1922 info!(
1923 target: "reth::providers::static_file",
1924 checkpoint_block_number,
1925 unwind_target = highest_static_file_block,
1926 ?segment,
1927 "Setting unwind target."
1928 );
1929 return Ok(Some(highest_static_file_block))
1930 }
1931
1932 if checkpoint_block_number < highest_static_file_block {
1933 info!(
1934 target: "reth::providers",
1935 ?segment,
1936 from = highest_static_file_block,
1937 to = checkpoint_block_number,
1938 "Unwinding static file segment."
1939 );
1940 let mut writer = self.latest_writer(segment)?;
1941 match segment {
1942 StaticFileSegment::AccountChangeSets => {
1943 writer.prune_account_changesets(checkpoint_block_number)?;
1944 }
1945 StaticFileSegment::StorageChangeSets => {
1946 writer.prune_storage_changesets(checkpoint_block_number)?;
1947 }
1948 _ => unreachable!("invalid segment for changeset invariants"),
1949 }
1950 writer.commit()?;
1951 }
1952
1953 Ok(None)
1954 }
1955
1956 pub fn earliest_history_height(&self) -> BlockNumber {
1964 self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1965 }
1966
1967 pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1971 self.indexes.read().get(segment).and_then(|index| index.min_block_range)
1972 }
1973
1974 pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1980 self.get_lowest_range(segment).map(|range| range.start())
1981 }
1982
1983 pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1989 self.get_lowest_range(segment).map(|range| range.end())
1990 }
1991
1992 pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1996 self.indexes.read().get(segment).map(|index| index.max_block)
1997 }
1998
1999 fn bound_range(
2006 &self,
2007 range: impl RangeBounds<BlockNumber>,
2008 segment: StaticFileSegment,
2009 ) -> RangeInclusive<BlockNumber> {
2010 let highest_block = self.get_highest_static_file_block(segment).unwrap_or(0);
2011
2012 let start = match range.start_bound() {
2013 Bound::Included(&n) => n,
2014 Bound::Excluded(&n) => n.saturating_add(1),
2015 Bound::Unbounded => 0,
2016 };
2017 let end = match range.end_bound() {
2018 Bound::Included(&n) => n.min(highest_block),
2019 Bound::Excluded(&n) => n.saturating_sub(1).min(highest_block),
2020 Bound::Unbounded => highest_block,
2021 };
2022
2023 start..=end
2024 }
2025
2026 pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
2030 self.indexes
2031 .read()
2032 .get(segment)
2033 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
2034 .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
2035 }
2036
2037 pub fn get_highest_static_files(&self) -> HighestStaticFiles {
2039 HighestStaticFiles {
2040 receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
2041 }
2042 }
2043
2044 pub fn find_static_file<T>(
2047 &self,
2048 segment: StaticFileSegment,
2049 func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
2050 ) -> ProviderResult<Option<T>> {
2051 if let Some(ranges) =
2052 self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block)
2053 {
2054 for range in ranges.values().rev() {
2056 if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
2057 return Ok(Some(res));
2058 }
2059 }
2060 }
2061
2062 Ok(None)
2063 }
2064
2065 pub fn fetch_range_with_predicate<T, F, P>(
2071 &self,
2072 segment: StaticFileSegment,
2073 range: Range<u64>,
2074 mut get_fn: F,
2075 mut predicate: P,
2076 ) -> ProviderResult<Vec<T>>
2077 where
2078 F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
2079 P: FnMut(&T) -> bool,
2080 {
2081 let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
2082
2083 macro_rules! get_provider {
2087 ($number:expr) => {{
2088 match self.get_segment_provider(segment, $number) {
2089 Ok(provider) => provider,
2090 Err(
2091 ProviderError::MissingStaticFileBlock(_, _) |
2092 ProviderError::MissingStaticFileTx(_, _),
2093 ) => return Ok(result),
2094 Err(err) => return Err(err),
2095 }
2096 }};
2097 }
2098
2099 let mut provider = get_provider!(range.start);
2100 let mut cursor = provider.cursor()?;
2101
2102 'outer: for number in range {
2104 let mut retrying = false;
2108
2109 'inner: loop {
2111 match get_fn(&mut cursor, number)? {
2112 Some(res) => {
2113 if !predicate(&res) {
2114 break 'outer;
2115 }
2116 result.push(res);
2117 break 'inner;
2118 }
2119 None => {
2120 if retrying {
2121 return Ok(result);
2122 }
2123 drop(cursor);
2128 drop(provider);
2129 provider = get_provider!(number);
2130 cursor = provider.cursor()?;
2131 retrying = true;
2132 }
2133 }
2134 }
2135 }
2136
2137 result.shrink_to_fit();
2138
2139 Ok(result)
2140 }
2141
2142 pub fn fetch_range_iter<'a, T, F>(
2147 &'a self,
2148 segment: StaticFileSegment,
2149 range: Range<u64>,
2150 get_fn: F,
2151 ) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
2152 where
2153 F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
2154 T: std::fmt::Debug,
2155 {
2156 let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
2157 Ok(range.map(move |number| {
2158 match provider
2159 .as_ref()
2160 .map(|provider| get_fn(&mut provider.cursor()?, number))
2161 .and_then(|result| result.transpose())
2162 {
2163 Some(result) => result.map(Some),
2164 None => {
2165 provider.take();
2169 provider = self.get_maybe_segment_provider(segment, number)?;
2170 provider
2171 .as_ref()
2172 .map(|provider| get_fn(&mut provider.cursor()?, number))
2173 .and_then(|result| result.transpose())
2174 .transpose()
2175 }
2176 }
2177 }))
2178 }
2179
2180 pub fn directory(&self) -> &Path {
2182 &self.path
2183 }
2184
2185 pub fn get_with_static_file_or_database<T, FS, FD>(
2195 &self,
2196 segment: StaticFileSegment,
2197 number: u64,
2198 fetch_from_static_file: FS,
2199 fetch_from_database: FD,
2200 ) -> ProviderResult<Option<T>>
2201 where
2202 FS: Fn(&Self) -> ProviderResult<Option<T>>,
2203 FD: Fn() -> ProviderResult<Option<T>>,
2204 {
2205 let static_file_upper_bound = if segment.is_block_or_change_based() {
2207 self.get_highest_static_file_block(segment)
2208 } else {
2209 self.get_highest_static_file_tx(segment)
2210 };
2211
2212 if static_file_upper_bound
2213 .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
2214 {
2215 return fetch_from_static_file(self);
2216 }
2217 fetch_from_database()
2218 }
2219
2220 pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
2232 &self,
2233 segment: StaticFileSegment,
2234 mut block_or_tx_range: Range<u64>,
2235 fetch_from_static_file: FS,
2236 mut fetch_from_database: FD,
2237 mut predicate: P,
2238 ) -> ProviderResult<Vec<T>>
2239 where
2240 FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
2241 FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
2242 P: FnMut(&T) -> bool,
2243 {
2244 let mut data = Vec::new();
2245
2246 if let Some(static_file_upper_bound) = if segment.is_block_or_change_based() {
2248 self.get_highest_static_file_block(segment)
2249 } else {
2250 self.get_highest_static_file_tx(segment)
2251 } && block_or_tx_range.start <= static_file_upper_bound
2252 {
2253 let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
2254 data.extend(fetch_from_static_file(
2255 self,
2256 block_or_tx_range.start..end,
2257 &mut predicate,
2258 )?);
2259 block_or_tx_range.start = end;
2260 }
2261
2262 if block_or_tx_range.end > block_or_tx_range.start {
2263 data.extend(fetch_from_database(block_or_tx_range, predicate)?)
2264 }
2265
2266 Ok(data)
2267 }
2268
2269 #[cfg(any(test, feature = "test-utils"))]
2271 pub fn path(&self) -> &Path {
2272 &self.path
2273 }
2274
2275 #[cfg(any(test, feature = "test-utils"))]
2277 pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2278 self.indexes
2279 .read()
2280 .get(segment)
2281 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
2282 .cloned()
2283 }
2284
2285 #[cfg(any(test, feature = "test-utils"))]
2287 pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
2288 self.indexes
2289 .read()
2290 .get(segment)
2291 .map(|index| &index.expected_block_ranges_by_max_block)
2292 .cloned()
2293 }
2294}
2295
2296#[derive(Debug)]
2297struct StaticFileSegmentIndex {
2298 min_block_range: Option<SegmentRangeInclusive>,
2310 max_block: u64,
2312 expected_block_ranges_by_max_block: SegmentRanges,
2318 available_block_ranges_by_max_tx: Option<SegmentRanges>,
2325}
2326
2327pub trait StaticFileWriter {
2329 type Primitives: Send + Sync + 'static;
2331
2332 fn get_writer(
2334 &self,
2335 block: BlockNumber,
2336 segment: StaticFileSegment,
2337 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
2338
2339 fn latest_writer(
2342 &self,
2343 segment: StaticFileSegment,
2344 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;
2345
2346 fn commit(&self) -> ProviderResult<()>;
2348
2349 fn has_unwind_queued(&self) -> bool;
2351
2352 fn finalize(&self) -> ProviderResult<()>;
2356}
2357
2358impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
2359 type Primitives = N;
2360
2361 fn get_writer(
2362 &self,
2363 block: BlockNumber,
2364 segment: StaticFileSegment,
2365 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2366 if self.access.is_read_only() {
2367 return Err(ProviderError::ReadOnlyStaticFileAccess);
2368 }
2369
2370 trace!(target: "providers::static_file", ?block, ?segment, "Getting static file writer.");
2371 self.writers.get_or_create(segment, || {
2372 StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
2373 })
2374 }
2375
2376 fn latest_writer(
2377 &self,
2378 segment: StaticFileSegment,
2379 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2380 let genesis_number = self.0.as_ref().genesis_block_number();
2381 self.get_writer(
2382 self.get_highest_static_file_block(segment).unwrap_or(genesis_number),
2383 segment,
2384 )
2385 }
2386
2387 fn commit(&self) -> ProviderResult<()> {
2388 self.writers.commit()
2389 }
2390
2391 fn has_unwind_queued(&self) -> bool {
2392 self.writers.has_unwind_queued()
2393 }
2394
2395 fn finalize(&self) -> ProviderResult<()> {
2396 self.writers.finalize()
2397 }
2398}
2399
2400impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
2401 fn account_block_changeset(
2402 &self,
2403 block_number: BlockNumber,
2404 ) -> ProviderResult<Vec<reth_db::models::AccountBeforeTx>> {
2405 let provider = match self.get_segment_provider_for_block(
2406 StaticFileSegment::AccountChangeSets,
2407 block_number,
2408 None,
2409 ) {
2410 Ok(provider) => provider,
2411 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
2412 Err(err) => return Err(err),
2413 };
2414
2415 if let Some(offset) = provider.read_changeset_offset(block_number)? {
2416 let mut cursor = provider.cursor()?;
2417 let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
2418
2419 for i in offset.changeset_range() {
2420 if let Some(change) =
2421 cursor.get_one::<reth_db::static_file::AccountChangesetMask>(i.into())?
2422 {
2423 changeset.push(change)
2424 }
2425 }
2426 Ok(changeset)
2427 } else {
2428 Ok(Vec::new())
2429 }
2430 }
2431
2432 fn get_account_before_block(
2433 &self,
2434 block_number: BlockNumber,
2435 address: Address,
2436 ) -> ProviderResult<Option<reth_db::models::AccountBeforeTx>> {
2437 let provider = match self.get_segment_provider_for_block(
2438 StaticFileSegment::AccountChangeSets,
2439 block_number,
2440 None,
2441 ) {
2442 Ok(provider) => provider,
2443 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
2444 Err(err) => return Err(err),
2445 };
2446
2447 let Some(offset) = provider.read_changeset_offset(block_number)? else {
2448 return Ok(None);
2449 };
2450
2451 let mut cursor = provider.cursor()?;
2452 let range = offset.changeset_range();
2453 let mut low = range.start;
2454 let mut high = range.end;
2455
2456 while low < high {
2457 let mid = low + (high - low) / 2;
2458 if let Some(change) =
2459 cursor.get_one::<reth_db::static_file::AccountChangesetMask>(mid.into())?
2460 {
2461 if change.address < address {
2462 low = mid + 1;
2463 } else {
2464 high = mid;
2465 }
2466 } else {
2467 debug!(
2470 target: "providers::static_file",
2471 ?low,
2472 ?mid,
2473 ?high,
2474 ?range,
2475 ?block_number,
2476 ?address,
2477 "Cannot continue binary search for account changeset fetch"
2478 );
2479 low = range.end;
2480 break;
2481 }
2482 }
2483
2484 if low < range.end &&
2485 let Some(change) = cursor
2486 .get_one::<reth_db::static_file::AccountChangesetMask>(low.into())?
2487 .filter(|change| change.address == address)
2488 {
2489 return Ok(Some(change));
2490 }
2491
2492 Ok(None)
2493 }
2494
2495 fn account_changesets_range(
2496 &self,
2497 range: impl core::ops::RangeBounds<BlockNumber>,
2498 ) -> ProviderResult<Vec<(BlockNumber, reth_db::models::AccountBeforeTx)>> {
2499 let range = self.bound_range(range, StaticFileSegment::AccountChangeSets);
2500 self.walk_account_changeset_range(range).collect()
2501 }
2502
2503 fn account_changeset_count(&self) -> ProviderResult<usize> {
2504 let mut count = 0;
2505
2506 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
2507 if let Some(changeset_segments) = static_files.get(StaticFileSegment::AccountChangeSets) {
2508 for (block_range, header) in changeset_segments {
2509 let csoff_path = self
2510 .path
2511 .join(StaticFileSegment::AccountChangeSets.filename(block_range))
2512 .with_extension("csoff");
2513 if csoff_path.exists() {
2514 let len = header.changeset_offsets_len();
2515 let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
2516 .map_err(ProviderError::other)?;
2517 let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
2518 for offset in offsets {
2519 count += offset.num_changes() as usize;
2520 }
2521 }
2522 }
2523 }
2524
2525 Ok(count)
2526 }
2527}
2528
2529impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
2530 fn storage_changeset(
2531 &self,
2532 block_number: BlockNumber,
2533 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
2534 let provider = match self.get_segment_provider_for_block(
2535 StaticFileSegment::StorageChangeSets,
2536 block_number,
2537 None,
2538 ) {
2539 Ok(provider) => provider,
2540 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
2541 Err(err) => return Err(err),
2542 };
2543
2544 if let Some(offset) = provider.read_changeset_offset(block_number)? {
2545 let mut cursor = provider.cursor()?;
2546 let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
2547
2548 for i in offset.changeset_range() {
2549 if let Some(change) = cursor.get_one::<StorageChangesetMask>(i.into())? {
2550 let block_address = BlockNumberAddress((block_number, change.address));
2551 let entry = StorageEntry { key: change.key, value: change.value };
2552 changeset.push((block_address, entry));
2553 }
2554 }
2555 Ok(changeset)
2556 } else {
2557 Ok(Vec::new())
2558 }
2559 }
2560
2561 fn get_storage_before_block(
2562 &self,
2563 block_number: BlockNumber,
2564 address: Address,
2565 storage_key: B256,
2566 ) -> ProviderResult<Option<StorageEntry>> {
2567 let provider = match self.get_segment_provider_for_block(
2568 StaticFileSegment::StorageChangeSets,
2569 block_number,
2570 None,
2571 ) {
2572 Ok(provider) => provider,
2573 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
2574 Err(err) => return Err(err),
2575 };
2576
2577 let Some(offset) = provider.read_changeset_offset(block_number)? else {
2578 return Ok(None);
2579 };
2580
2581 let mut cursor = provider.cursor()?;
2582 let range = offset.changeset_range();
2583 let mut low = range.start;
2584 let mut high = range.end;
2585
2586 while low < high {
2587 let mid = low + (high - low) / 2;
2588 if let Some(change) = cursor.get_one::<StorageChangesetMask>(mid.into())? {
2589 match (change.address, change.key).cmp(&(address, storage_key)) {
2590 std::cmp::Ordering::Less => low = mid + 1,
2591 _ => high = mid,
2592 }
2593 } else {
2594 debug!(
2595 target: "providers::static_file",
2596 ?low,
2597 ?mid,
2598 ?high,
2599 ?range,
2600 ?block_number,
2601 ?address,
2602 ?storage_key,
2603 "Cannot continue binary search for storage changeset fetch"
2604 );
2605 low = range.end;
2606 break;
2607 }
2608 }
2609
2610 if low < range.end &&
2611 let Some(change) = cursor
2612 .get_one::<StorageChangesetMask>(low.into())?
2613 .filter(|change| change.address == address && change.key == storage_key)
2614 {
2615 return Ok(Some(StorageEntry { key: change.key, value: change.value }));
2616 }
2617
2618 Ok(None)
2619 }
2620
2621 fn storage_changesets_range(
2622 &self,
2623 range: impl RangeBounds<BlockNumber>,
2624 ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
2625 let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
2626 self.walk_storage_changeset_range(range).collect()
2627 }
2628
2629 fn storage_changeset_count(&self) -> ProviderResult<usize> {
2630 let mut count = 0;
2631
2632 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
2633 if let Some(changeset_segments) = static_files.get(StaticFileSegment::StorageChangeSets) {
2634 for (block_range, header) in changeset_segments {
2635 let csoff_path = self
2636 .path
2637 .join(StaticFileSegment::StorageChangeSets.filename(block_range))
2638 .with_extension("csoff");
2639 if csoff_path.exists() {
2640 let len = header.changeset_offsets_len();
2641 let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
2642 .map_err(ProviderError::other)?;
2643 let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
2644 for offset in offsets {
2645 count += offset.num_changes() as usize;
2646 }
2647 }
2648 }
2649 }
2650
2651 Ok(count)
2652 }
2653}
2654
2655impl<N: NodePrimitives> StaticFileProvider<N> {
2656 pub fn walk_account_changeset_range(
2666 &self,
2667 range: impl RangeBounds<BlockNumber>,
2668 ) -> StaticFileAccountChangesetWalker<Self> {
2669 StaticFileAccountChangesetWalker::new(self.clone(), range)
2670 }
2671
2672 pub fn walk_storage_changeset_range(
2674 &self,
2675 range: impl RangeBounds<BlockNumber>,
2676 ) -> StaticFileStorageChangesetWalker<Self> {
2677 StaticFileStorageChangesetWalker::new(self.clone(), range)
2678 }
2679}
2680
2681impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
2682 type Header = N::BlockHeader;
2683
2684 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
2685 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
2686 Ok(jar_provider
2687 .cursor()?
2688 .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
2689 .and_then(|(header, hash)| {
2690 if hash == block_hash {
2691 return Some(header);
2692 }
2693 None
2694 }))
2695 })
2696 }
2697
2698 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
2699 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2700 .and_then(|provider| provider.header_by_number(num))
2701 .or_else(|err| {
2702 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2703 Ok(None)
2704 } else {
2705 Err(err)
2706 }
2707 })
2708 }
2709
2710 fn headers_range(
2711 &self,
2712 range: impl RangeBounds<BlockNumber>,
2713 ) -> ProviderResult<Vec<Self::Header>> {
2714 self.fetch_range_with_predicate(
2715 StaticFileSegment::Headers,
2716 to_range(range),
2717 |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
2718 |_| true,
2719 )
2720 }
2721
2722 fn sealed_header(
2723 &self,
2724 num: BlockNumber,
2725 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
2726 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2727 .and_then(|provider| provider.sealed_header(num))
2728 .or_else(|err| {
2729 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2730 Ok(None)
2731 } else {
2732 Err(err)
2733 }
2734 })
2735 }
2736
2737 fn sealed_headers_while(
2738 &self,
2739 range: impl RangeBounds<BlockNumber>,
2740 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
2741 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
2742 self.fetch_range_with_predicate(
2743 StaticFileSegment::Headers,
2744 to_range(range),
2745 |cursor, number| {
2746 Ok(cursor
2747 .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
2748 .map(|(header, hash)| SealedHeader::new(header, hash)))
2749 },
2750 predicate,
2751 )
2752 }
2753}
2754
2755impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
2756 fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
2757 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2758 .and_then(|provider| provider.block_hash(num))
2759 .or_else(|err| {
2760 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2761 Ok(None)
2762 } else {
2763 Err(err)
2764 }
2765 })
2766 }
2767
2768 fn canonical_hashes_range(
2769 &self,
2770 start: BlockNumber,
2771 end: BlockNumber,
2772 ) -> ProviderResult<Vec<B256>> {
2773 self.fetch_range_with_predicate(
2774 StaticFileSegment::Headers,
2775 start..end,
2776 |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
2777 |_| true,
2778 )
2779 }
2780}
2781
2782impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
2783 for StaticFileProvider<N>
2784{
2785 type Receipt = N::Receipt;
2786
2787 fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2788 self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
2789 .and_then(|provider| provider.receipt(num))
2790 .or_else(|err| {
2791 if let ProviderError::MissingStaticFileTx(_, _) = err {
2792 Ok(None)
2793 } else {
2794 Err(err)
2795 }
2796 })
2797 }
2798
2799 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2800 if let Some(num) = self.transaction_id(hash)? {
2801 return self.receipt(num);
2802 }
2803 Ok(None)
2804 }
2805
2806 fn receipts_by_block(
2807 &self,
2808 _block: BlockHashOrNumber,
2809 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2810 unreachable!()
2811 }
2812
2813 fn receipts_by_tx_range(
2814 &self,
2815 range: impl RangeBounds<TxNumber>,
2816 ) -> ProviderResult<Vec<Self::Receipt>> {
2817 self.fetch_range_with_predicate(
2818 StaticFileSegment::Receipts,
2819 to_range(range),
2820 |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
2821 |_| true,
2822 )
2823 }
2824
2825 fn receipts_by_block_range(
2826 &self,
2827 _block_range: RangeInclusive<BlockNumber>,
2828 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2829 Err(ProviderError::UnsupportedProvider)
2830 }
2831}
2832
2833impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
2834 for StaticFileProvider<N>
2835{
2836 fn transaction_hashes_by_range(
2837 &self,
2838 tx_range: Range<TxNumber>,
2839 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
2840 let tx_range_size = (tx_range.end - tx_range.start) as usize;
2841
2842 let chunk_size = 100;
2846
2847 let chunks = tx_range
2849 .clone()
2850 .step_by(chunk_size)
2851 .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
2852 let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
2853
2854 for chunk_range in chunks {
2855 let (channel_tx, channel_rx) = mpsc::channel();
2856 channels.push(channel_rx);
2857
2858 let manager = self.clone();
2859
2860 rayon::spawn(move || {
2864 let mut rlp_buf = Vec::with_capacity(128);
2865 let _ = manager.fetch_range_with_predicate(
2866 StaticFileSegment::Transactions,
2867 chunk_range,
2868 |cursor, number| {
2869 Ok(cursor
2870 .get_one::<TransactionMask<Self::Transaction>>(number.into())?
2871 .map(|transaction| {
2872 rlp_buf.clear();
2873 let _ = channel_tx
2874 .send(calculate_hash((number, transaction), &mut rlp_buf));
2875 }))
2876 },
2877 |_| true,
2878 );
2879 });
2880 }
2881
2882 let mut tx_list = Vec::with_capacity(tx_range_size);
2883
2884 for channel in channels {
2886 while let Ok(tx) = channel.recv() {
2887 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
2888 tx_list.push((tx_hash, tx_id));
2889 }
2890 }
2891
2892 Ok(tx_list)
2893 }
2894}
2895
2896impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
2897 for StaticFileProvider<N>
2898{
2899 type Transaction = N::SignedTx;
2900
2901 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
2902 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2903 let mut cursor = jar_provider.cursor()?;
2904 if cursor
2905 .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
2906 .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
2907 .is_some()
2908 {
2909 Ok(cursor.number())
2910 } else {
2911 Ok(None)
2912 }
2913 })
2914 }
2915
2916 fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2917 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2918 .and_then(|provider| provider.transaction_by_id(num))
2919 .or_else(|err| {
2920 if let ProviderError::MissingStaticFileTx(_, _) = err {
2921 Ok(None)
2922 } else {
2923 Err(err)
2924 }
2925 })
2926 }
2927
2928 fn transaction_by_id_unhashed(
2929 &self,
2930 num: TxNumber,
2931 ) -> ProviderResult<Option<Self::Transaction>> {
2932 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2933 .and_then(|provider| provider.transaction_by_id_unhashed(num))
2934 .or_else(|err| {
2935 if let ProviderError::MissingStaticFileTx(_, _) = err {
2936 Ok(None)
2937 } else {
2938 Err(err)
2939 }
2940 })
2941 }
2942
2943 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2944 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2945 Ok(jar_provider
2946 .cursor()?
2947 .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2948 .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2949 })
2950 }
2951
2952 fn transaction_by_hash_with_meta(
2953 &self,
2954 _hash: TxHash,
2955 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2956 Err(ProviderError::UnsupportedProvider)
2958 }
2959
2960 fn transactions_by_block(
2961 &self,
2962 _block_id: BlockHashOrNumber,
2963 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2964 Err(ProviderError::UnsupportedProvider)
2966 }
2967
2968 fn transactions_by_block_range(
2969 &self,
2970 _range: impl RangeBounds<BlockNumber>,
2971 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2972 Err(ProviderError::UnsupportedProvider)
2974 }
2975
2976 fn transactions_by_tx_range(
2977 &self,
2978 range: impl RangeBounds<TxNumber>,
2979 ) -> ProviderResult<Vec<Self::Transaction>> {
2980 self.fetch_range_with_predicate(
2981 StaticFileSegment::Transactions,
2982 to_range(range),
2983 |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2984 |_| true,
2985 )
2986 }
2987
2988 fn senders_by_tx_range(
2989 &self,
2990 range: impl RangeBounds<TxNumber>,
2991 ) -> ProviderResult<Vec<Address>> {
2992 self.fetch_range_with_predicate(
2993 StaticFileSegment::TransactionSenders,
2994 to_range(range),
2995 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2996 |_| true,
2997 )
2998 }
2999
3000 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
3001 self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
3002 .and_then(|provider| provider.transaction_sender(id))
3003 .or_else(|err| {
3004 if let ProviderError::MissingStaticFileTx(_, _) = err {
3005 Ok(None)
3006 } else {
3007 Err(err)
3008 }
3009 })
3010 }
3011}
3012
3013impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
3014 fn chain_info(&self) -> ProviderResult<ChainInfo> {
3015 Err(ProviderError::UnsupportedProvider)
3017 }
3018
3019 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
3020 Err(ProviderError::UnsupportedProvider)
3022 }
3023
3024 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
3025 Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
3026 }
3027
3028 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
3029 Err(ProviderError::UnsupportedProvider)
3031 }
3032}
3033
3034impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
3037 for StaticFileProvider<N>
3038{
3039 type Block = N::Block;
3040
3041 fn find_block_by_hash(
3042 &self,
3043 _hash: B256,
3044 _source: BlockSource,
3045 ) -> ProviderResult<Option<Self::Block>> {
3046 Err(ProviderError::UnsupportedProvider)
3048 }
3049
3050 fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
3051 Err(ProviderError::UnsupportedProvider)
3053 }
3054
3055 fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
3056 Err(ProviderError::UnsupportedProvider)
3058 }
3059
3060 fn pending_block_and_receipts(
3061 &self,
3062 ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
3063 Err(ProviderError::UnsupportedProvider)
3065 }
3066
3067 fn recovered_block(
3068 &self,
3069 _id: BlockHashOrNumber,
3070 _transaction_kind: TransactionVariant,
3071 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
3072 Err(ProviderError::UnsupportedProvider)
3074 }
3075
3076 fn sealed_block_with_senders(
3077 &self,
3078 _id: BlockHashOrNumber,
3079 _transaction_kind: TransactionVariant,
3080 ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
3081 Err(ProviderError::UnsupportedProvider)
3083 }
3084
3085 fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
3086 Err(ProviderError::UnsupportedProvider)
3088 }
3089
3090 fn block_with_senders_range(
3091 &self,
3092 _range: RangeInclusive<BlockNumber>,
3093 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
3094 Err(ProviderError::UnsupportedProvider)
3095 }
3096
3097 fn recovered_block_range(
3098 &self,
3099 _range: RangeInclusive<BlockNumber>,
3100 ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
3101 Err(ProviderError::UnsupportedProvider)
3102 }
3103
3104 fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
3105 Err(ProviderError::UnsupportedProvider)
3106 }
3107}
3108
3109impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
3110 fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
3111 Err(ProviderError::UnsupportedProvider)
3112 }
3113
3114 fn block_body_indices_range(
3115 &self,
3116 _range: RangeInclusive<BlockNumber>,
3117 ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
3118 Err(ProviderError::UnsupportedProvider)
3119 }
3120}
3121
3122impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
3123 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3124 match T::NAME {
3125 tables::CanonicalHeaders::NAME |
3126 tables::Headers::<Header>::NAME |
3127 tables::HeaderTerminalDifficulties::NAME => Ok(self
3128 .get_highest_static_file_block(StaticFileSegment::Headers)
3129 .map(|block| block + 1)
3130 .unwrap_or_default()
3131 as usize),
3132 tables::Receipts::<Receipt>::NAME => Ok(self
3133 .get_highest_static_file_tx(StaticFileSegment::Receipts)
3134 .map(|receipts| receipts + 1)
3135 .unwrap_or_default() as usize),
3136 tables::Transactions::<TransactionSigned>::NAME => Ok(self
3137 .get_highest_static_file_tx(StaticFileSegment::Transactions)
3138 .map(|txs| txs + 1)
3139 .unwrap_or_default()
3140 as usize),
3141 tables::TransactionSenders::NAME => Ok(self
3142 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
3143 .map(|txs| txs + 1)
3144 .unwrap_or_default() as usize),
3145 _ => Err(ProviderError::UnsupportedProvider),
3146 }
3147 }
3148}
3149
3150#[inline]
3152fn calculate_hash<T>(
3153 entry: (TxNumber, T),
3154 rlp_buf: &mut Vec<u8>,
3155) -> Result<(B256, TxNumber), Box<ProviderError>>
3156where
3157 T: Encodable2718,
3158{
3159 let (tx_id, tx) = entry;
3160 tx.encode_2718(rlp_buf);
3161 Ok((keccak256(rlp_buf), tx_id))
3162}
3163
3164#[cfg(test)]
3165mod tests {
3166 use std::collections::BTreeMap;
3167
3168 use reth_chain_state::EthPrimitives;
3169 use reth_db::test_utils::create_test_static_files_dir;
3170 use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};
3171
3172 use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};
3173
3174 #[test]
3175 fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
3176 let (static_dir, _) = create_test_static_files_dir();
3177 let sf_rw: StaticFileProvider<EthPrimitives> =
3178 StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(100).build()?;
3179
3180 let segment = StaticFileSegment::Headers;
3181
3182 assert_eq!(
3184 sf_rw.find_fixed_range_with_block_index(segment, None, 0),
3185 SegmentRangeInclusive::new(0, 99)
3186 );
3187 assert_eq!(
3188 sf_rw.find_fixed_range_with_block_index(segment, None, 250),
3189 SegmentRangeInclusive::new(200, 299)
3190 );
3191
3192 assert_eq!(
3194 sf_rw.find_fixed_range_with_block_index(segment, Some(&BTreeMap::new()), 150),
3195 SegmentRangeInclusive::new(100, 199)
3196 );
3197
3198 let block_index = BTreeMap::from_iter([
3200 (99, SegmentRangeInclusive::new(0, 99)),
3201 (199, SegmentRangeInclusive::new(100, 199)),
3202 (299, SegmentRangeInclusive::new(200, 299)),
3203 ]);
3204
3205 assert_eq!(
3207 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 0),
3208 SegmentRangeInclusive::new(0, 99)
3209 );
3210 assert_eq!(
3211 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 50),
3212 SegmentRangeInclusive::new(0, 99)
3213 );
3214 assert_eq!(
3215 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 99),
3216 SegmentRangeInclusive::new(0, 99)
3217 );
3218 assert_eq!(
3219 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 100),
3220 SegmentRangeInclusive::new(100, 199)
3221 );
3222 assert_eq!(
3223 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 150),
3224 SegmentRangeInclusive::new(100, 199)
3225 );
3226 assert_eq!(
3227 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 199),
3228 SegmentRangeInclusive::new(100, 199)
3229 );
3230
3231 assert_eq!(
3234 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 300),
3235 SegmentRangeInclusive::new(300, 399)
3236 );
3237 assert_eq!(
3238 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 350),
3239 SegmentRangeInclusive::new(300, 399)
3240 );
3241
3242 assert_eq!(
3244 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 500),
3245 SegmentRangeInclusive::new(500, 599)
3246 );
3247
3248 assert_eq!(
3250 sf_rw.find_fixed_range_with_block_index(segment, Some(&block_index), 1000),
3251 SegmentRangeInclusive::new(1000, 1099)
3252 );
3253
3254 let mixed_size_index = BTreeMap::from_iter([
3257 (49, SegmentRangeInclusive::new(0, 49)), (149, SegmentRangeInclusive::new(50, 149)), (349, SegmentRangeInclusive::new(150, 349)), ]);
3261
3262 assert_eq!(
3264 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 25),
3265 SegmentRangeInclusive::new(0, 49)
3266 );
3267 assert_eq!(
3268 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 100),
3269 SegmentRangeInclusive::new(50, 149)
3270 );
3271 assert_eq!(
3272 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 200),
3273 SegmentRangeInclusive::new(150, 349)
3274 );
3275
3276 assert_eq!(
3279 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 350),
3280 SegmentRangeInclusive::new(350, 449)
3281 );
3282 assert_eq!(
3283 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 450),
3284 SegmentRangeInclusive::new(450, 549)
3285 );
3286 assert_eq!(
3287 sf_rw.find_fixed_range_with_block_index(segment, Some(&mixed_size_index), 550),
3288 SegmentRangeInclusive::new(550, 649)
3289 );
3290
3291 Ok(())
3292 }
3293}