1use super::{
2 metrics::StaticFileProviderMetrics, writer::StaticFileWriters, LoadedJar,
3 StaticFileJarProvider, StaticFileProviderRW, StaticFileProviderRWRefMut,
4};
5use crate::{
6 changeset_walker::{StaticFileAccountChangesetWalker, StaticFileStorageChangesetWalker},
7 to_range, BlockHashReader, BlockNumReader, BlockReader, BlockSource, EitherWriter,
8 EitherWriterDestination, HeaderProvider, ReceiptProvider, StageCheckpointReader, StatsReader,
9 TransactionVariant, TransactionsProvider, TransactionsProviderExt,
10};
11use alloy_consensus::{transaction::TransactionMeta, Header};
12use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber};
13use alloy_primitives::{b256, keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256};
14use notify::{RecommendedWatcher, RecursiveMode, Watcher};
15use parking_lot::RwLock;
16use reth_chain_state::ExecutedBlock;
17use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, NamedChain};
18use reth_db::{
19 lockfile::StorageLock,
20 static_file::{
21 iter_static_files, BlockHashMask, HeaderMask, HeaderWithHashMask, ReceiptMask,
22 StaticFileCursor, StorageChangesetMask, TransactionMask, TransactionSenderMask,
23 },
24};
25use reth_db_api::{
26 cursor::DbCursorRO,
27 models::{AccountBeforeTx, BlockNumberAddress, StorageBeforeTx, StoredBlockBodyIndices},
28 table::{Decompress, Table, Value},
29 tables,
30 transaction::DbTx,
31};
32use reth_ethereum_primitives::{Receipt, TransactionSigned};
33use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION};
34use reth_node_types::NodePrimitives;
35use reth_primitives_traits::{
36 dashmap::DashMap, AlloyBlockHeader as _, BlockBody as _, RecoveredBlock, SealedHeader,
37 SignedTransaction, StorageSlotKey,
38};
39use reth_prune_types::PruneSegment;
40use reth_stages_types::PipelineTarget;
41use reth_static_file_types::{
42 find_fixed_range, ChangesetOffsetReader, HighestStaticFiles, SegmentHeader,
43 SegmentRangeInclusive, StaticFileMap, StaticFileSegment, DEFAULT_BLOCKS_PER_STATIC_FILE,
44};
45use reth_storage_api::{
46 BlockBodyIndicesProvider, ChangeSetReader, ChangesetEntry, DBProvider, PruneCheckpointReader,
47 StorageChangeSetReader, StorageSettingsCache,
48};
49use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
50use std::{
51 collections::BTreeMap,
52 fmt::Debug,
53 ops::{Bound, Deref, Range, RangeBounds, RangeInclusive},
54 path::{Path, PathBuf},
55 sync::{atomic::AtomicU64, mpsc, Arc},
56};
57use tracing::{debug, info, info_span, instrument, trace, warn};
58
59type SegmentRanges = BTreeMap<u64, SegmentRangeInclusive>;
62
/// Access mode a static file provider is opened with.
///
/// The type is a plain two-variant marker, so it is `Copy` and can be passed around freely.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum StaticFileAccess {
    /// Read-only access. This is the default mode.
    #[default]
    RO,
    /// Read-write access.
    RW,
}

impl StaticFileAccess {
    /// Returns `true` if the mode is [`StaticFileAccess::RO`].
    pub const fn is_read_only(&self) -> bool {
        matches!(self, Self::RO)
    }

    /// Returns `true` if the mode is [`StaticFileAccess::RW`].
    pub const fn is_read_write(&self) -> bool {
        matches!(self, Self::RW)
    }
}
84
85#[derive(Debug, Clone, Copy, Default)]
89pub struct StaticFileWriteCtx {
90 pub write_senders: bool,
92 pub write_receipts: bool,
94 pub write_account_changesets: bool,
96 pub write_storage_changesets: bool,
98 pub tip: BlockNumber,
100 pub receipts_prune_mode: Option<reth_prune_types::PruneMode>,
102 pub receipts_prunable: bool,
104}
105
106#[derive(Debug)]
115pub struct StaticFileProvider<N>(pub(crate) Arc<StaticFileProviderInner<N>>);
116
117impl<N> Clone for StaticFileProvider<N> {
118 fn clone(&self) -> Self {
119 Self(self.0.clone())
120 }
121}
122
123#[derive(Debug)]
125pub struct StaticFileProviderBuilder<P> {
126 access: StaticFileAccess,
127 use_metrics: bool,
128 blocks_per_file: StaticFileMap<u64>,
129 path: P,
130 genesis_block_number: u64,
131}
132
133impl<P: AsRef<Path>> StaticFileProviderBuilder<P> {
134 pub fn read_write(path: P) -> Self {
136 Self {
137 path,
138 access: StaticFileAccess::RW,
139 blocks_per_file: Default::default(),
140 use_metrics: false,
141 genesis_block_number: 0,
142 }
143 }
144
145 pub fn read_only(path: P) -> Self {
147 Self {
148 path,
149 access: StaticFileAccess::RO,
150 blocks_per_file: Default::default(),
151 use_metrics: false,
152 genesis_block_number: 0,
153 }
154 }
155
156 pub fn with_blocks_per_file_for_segments(
168 mut self,
169 segments: &<StaticFileMap<u64> as Deref>::Target,
170 ) -> Self {
171 for (segment, &blocks_per_file) in segments {
172 self.blocks_per_file.insert(segment, blocks_per_file);
173 }
174 self
175 }
176
177 pub fn with_blocks_per_file(mut self, blocks_per_file: u64) -> Self {
179 for segment in StaticFileSegment::iter() {
180 self.blocks_per_file.insert(segment, blocks_per_file);
181 }
182 self
183 }
184
185 pub fn with_blocks_per_file_for_segment(
187 mut self,
188 segment: StaticFileSegment,
189 blocks_per_file: u64,
190 ) -> Self {
191 self.blocks_per_file.insert(segment, blocks_per_file);
192 self
193 }
194
195 pub const fn with_metrics(mut self) -> Self {
197 self.use_metrics = true;
198 self
199 }
200
201 pub const fn with_genesis_block_number(mut self, genesis_block_number: u64) -> Self {
214 self.genesis_block_number = genesis_block_number;
215 self
216 }
217
218 pub fn build<N: NodePrimitives>(self) -> ProviderResult<StaticFileProvider<N>> {
220 let mut provider = StaticFileProviderInner::new(self.path, self.access)?;
221 if self.use_metrics {
222 provider.metrics = Some(Arc::new(StaticFileProviderMetrics::default()));
223 }
224
225 for (segment, blocks_per_file) in *self.blocks_per_file {
226 provider.blocks_per_file.insert(segment, blocks_per_file);
227 }
228 provider.genesis_block_number = self.genesis_block_number;
229
230 let provider = StaticFileProvider(Arc::new(provider));
231 provider.initialize_index()?;
232 Ok(provider)
233 }
234}
235
236impl<N: NodePrimitives> StaticFileProvider<N> {
237 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
239 let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?));
240 provider.initialize_index()?;
241 Ok(provider)
242 }
243}
244
245impl<N: NodePrimitives> StaticFileProvider<N> {
246 pub fn read_only(path: impl AsRef<Path>, watch_directory: bool) -> ProviderResult<Self> {
256 let provider = Self::new(path, StaticFileAccess::RO)?;
257
258 if watch_directory {
259 provider.watch_directory();
260 }
261
262 Ok(provider)
263 }
264
265 pub fn read_write(path: impl AsRef<Path>) -> ProviderResult<Self> {
267 Self::new(path, StaticFileAccess::RW)
268 }
269
270 pub fn watch_directory(&self) {
276 let provider = self.clone();
277 reth_tasks::spawn_os_thread("sf-watch", move || {
278 let (tx, rx) = std::sync::mpsc::channel();
279 let mut watcher = RecommendedWatcher::new(
280 move |res| tx.send(res).unwrap(),
281 notify::Config::default(),
282 )
283 .expect("failed to create watcher");
284
285 watcher
286 .watch(&provider.path, RecursiveMode::NonRecursive)
287 .expect("failed to watch path");
288
289 let mut last_event_timestamp = None;
291
292 while let Ok(res) = rx.recv() {
293 match res {
294 Ok(event) => {
295 if !matches!(
297 event.kind,
298 notify::EventKind::Modify(_) |
299 notify::EventKind::Create(_) |
300 notify::EventKind::Remove(_)
301 ) {
302 continue;
303 }
304
305 for segment in event.paths {
310 if segment
312 .extension()
313 .is_none_or(|s| s.to_str() != Some(CONFIG_FILE_EXTENSION))
314 {
315 continue;
316 }
317
318 if StaticFileSegment::parse_filename(
320 &segment.file_stem().expect("qed").to_string_lossy(),
321 )
322 .is_none()
323 {
324 continue;
325 }
326
327 if let Ok(current_modified_timestamp) =
330 std::fs::metadata(&segment).and_then(|m| m.modified())
331 {
332 if last_event_timestamp.is_some_and(|last_timestamp| {
333 last_timestamp >= current_modified_timestamp
334 }) {
335 continue;
336 }
337 last_event_timestamp = Some(current_modified_timestamp);
338 }
339
340 info!(target: "providers::static_file", updated_file = ?segment.file_stem(), "re-initializing static file provider index");
341 if let Err(err) = provider.initialize_index() {
342 warn!(target: "providers::static_file", "failed to re-initialize index: {err}");
343 }
344 break;
345 }
346 }
347
348 Err(err) => warn!(target: "providers::watcher", "watch error: {err:?}"),
349 }
350 }
351 });
352 }
353}
354
355impl<N: NodePrimitives> Deref for StaticFileProvider<N> {
356 type Target = StaticFileProviderInner<N>;
357
358 fn deref(&self) -> &Self::Target {
359 &self.0
360 }
361}
362
363#[derive(Debug)]
365pub struct StaticFileProviderInner<N> {
366 map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>,
369 indexes: RwLock<StaticFileMap<StaticFileSegmentIndex>>,
371 earliest_history_height: AtomicU64,
382 path: PathBuf,
384 writers: StaticFileWriters<N>,
386 metrics: Option<Arc<StaticFileProviderMetrics>>,
388 access: StaticFileAccess,
390 blocks_per_file: StaticFileMap<u64>,
392 _lock_file: Option<StorageLock>,
394 genesis_block_number: u64,
396}
397
398impl<N: NodePrimitives> StaticFileProviderInner<N> {
399 fn new(path: impl AsRef<Path>, access: StaticFileAccess) -> ProviderResult<Self> {
401 let _lock_file = if access.is_read_write() {
402 StorageLock::try_acquire(path.as_ref()).map_err(ProviderError::other)?.into()
403 } else {
404 None
405 };
406
407 let mut blocks_per_file = StaticFileMap::default();
408 for segment in StaticFileSegment::iter() {
409 blocks_per_file.insert(segment, DEFAULT_BLOCKS_PER_STATIC_FILE);
410 }
411
412 let provider = Self {
413 map: Default::default(),
414 indexes: Default::default(),
415 writers: Default::default(),
416 earliest_history_height: Default::default(),
417 path: path.as_ref().to_path_buf(),
418 metrics: None,
419 access,
420 blocks_per_file,
421 _lock_file,
422 genesis_block_number: 0,
423 };
424
425 Ok(provider)
426 }
427
428 pub const fn is_read_only(&self) -> bool {
429 self.access.is_read_only()
430 }
431
432 pub fn find_fixed_range_with_block_index(
441 &self,
442 segment: StaticFileSegment,
443 block_index: Option<&SegmentRanges>,
444 block: BlockNumber,
445 ) -> SegmentRangeInclusive {
446 let blocks_per_file =
447 self.blocks_per_file.get(segment).copied().unwrap_or(DEFAULT_BLOCKS_PER_STATIC_FILE);
448
449 if let Some(block_index) = block_index {
450 if let Some((_, range)) = block_index.iter().find(|(max_block, _)| block <= **max_block)
452 {
453 return *range;
455 } else if let Some((_, range)) = block_index.last_key_value() {
456 let blocks_after_last_range = block - range.end();
462 let segments_to_skip = (blocks_after_last_range - 1) / blocks_per_file;
463 let start = range.end() + 1 + segments_to_skip * blocks_per_file;
464 return SegmentRangeInclusive::new(start, start + blocks_per_file - 1);
465 }
466 }
467 find_fixed_range(block, blocks_per_file)
470 }
471
472 pub fn find_fixed_range(
485 &self,
486 segment: StaticFileSegment,
487 block: BlockNumber,
488 ) -> SegmentRangeInclusive {
489 self.find_fixed_range_with_block_index(
490 segment,
491 self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block),
492 block,
493 )
494 }
495
496 pub const fn genesis_block_number(&self) -> u64 {
498 self.genesis_block_number
499 }
500}
501
502impl<N: NodePrimitives> StaticFileProvider<N> {
503 pub fn report_metrics(&self) -> ProviderResult<()> {
508 let Some(metrics) = &self.metrics else { return Ok(()) };
509
510 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
511 for (segment, headers) in &*static_files {
512 let mut entries = 0;
513 let mut size = 0;
514
515 for (block_range, _) in headers {
516 let fixed_block_range = self.find_fixed_range(segment, block_range.start());
517 let jar_provider = self
518 .get_segment_provider_for_range(segment, || Some(fixed_block_range), None)?
519 .ok_or_else(|| {
520 ProviderError::MissingStaticFileBlock(segment, block_range.start())
521 })?;
522
523 entries += jar_provider.rows();
524 size += jar_provider.size() as u64;
525 }
526
527 metrics.record_segment(segment, size, headers.len(), entries);
528 }
529
530 Ok(())
531 }
532
533 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
535 fn write_headers(
536 w: &mut StaticFileProviderRWRefMut<'_, N>,
537 blocks: &[ExecutedBlock<N>],
538 ) -> ProviderResult<()> {
539 for block in blocks {
540 let b = block.recovered_block();
541 w.append_header(b.header(), &b.hash())?;
542 }
543 Ok(())
544 }
545
546 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
548 fn write_transactions(
549 w: &mut StaticFileProviderRWRefMut<'_, N>,
550 blocks: &[ExecutedBlock<N>],
551 tx_nums: &[TxNumber],
552 ) -> ProviderResult<()> {
553 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
554 let b = block.recovered_block();
555 w.increment_block(b.number())?;
556 for (i, tx) in b.body().transactions().iter().enumerate() {
557 w.append_transaction(first_tx + i as u64, tx)?;
558 }
559 }
560 Ok(())
561 }
562
563 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
565 fn write_transaction_senders(
566 w: &mut StaticFileProviderRWRefMut<'_, N>,
567 blocks: &[ExecutedBlock<N>],
568 tx_nums: &[TxNumber],
569 ) -> ProviderResult<()> {
570 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
571 let b = block.recovered_block();
572 w.increment_block(b.number())?;
573 for (i, sender) in b.senders_iter().enumerate() {
574 w.append_transaction_sender(first_tx + i as u64, sender)?;
575 }
576 }
577 Ok(())
578 }
579
580 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
582 fn write_receipts(
583 w: &mut StaticFileProviderRWRefMut<'_, N>,
584 blocks: &[ExecutedBlock<N>],
585 tx_nums: &[TxNumber],
586 ctx: &StaticFileWriteCtx,
587 ) -> ProviderResult<()> {
588 for (block, &first_tx) in blocks.iter().zip(tx_nums) {
589 let block_number = block.recovered_block().number();
590 w.increment_block(block_number)?;
591
592 if ctx.receipts_prunable &&
594 ctx.receipts_prune_mode
595 .is_some_and(|mode| mode.should_prune(block_number, ctx.tip))
596 {
597 continue
598 }
599
600 for (i, receipt) in block.execution_outcome().receipts.iter().enumerate() {
601 w.append_receipt(first_tx + i as u64, receipt)?;
602 }
603 }
604 Ok(())
605 }
606
607 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
609 fn write_account_changesets(
610 w: &mut StaticFileProviderRWRefMut<'_, N>,
611 blocks: &[ExecutedBlock<N>],
612 ) -> ProviderResult<()> {
613 for block in blocks {
614 let block_number = block.recovered_block().number();
615 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
616
617 let changeset: Vec<_> = reverts
618 .accounts
619 .into_iter()
620 .flatten()
621 .map(|(address, info)| AccountBeforeTx { address, info: info.map(Into::into) })
622 .collect();
623 w.append_account_changeset(changeset, block_number)?;
624 }
625 Ok(())
626 }
627
628 #[instrument(level = "debug", target = "providers::db", skip_all)]
630 fn write_storage_changesets(
631 w: &mut StaticFileProviderRWRefMut<'_, N>,
632 blocks: &[ExecutedBlock<N>],
633 ) -> ProviderResult<()> {
634 for block in blocks {
635 let block_number = block.recovered_block().number();
636 let reverts = block.execution_outcome().state.reverts.to_plain_state_reverts();
637
638 let changeset: Vec<_> = reverts
639 .storage
640 .into_iter()
641 .flatten()
642 .flat_map(|revert| {
643 revert.storage_revert.into_iter().map(move |(key, revert_to_slot)| {
644 StorageBeforeTx {
645 address: revert.address,
646 key: StorageSlotKey::from_u256(key).to_hashed(),
647 value: revert_to_slot.to_previous_value(),
648 }
649 })
650 })
651 .collect();
652 w.append_storage_changeset(changeset, block_number)?;
653 }
654 Ok(())
655 }
656
657 fn write_segment<F>(
662 &self,
663 segment: StaticFileSegment,
664 first_block_number: BlockNumber,
665 f: F,
666 ) -> ProviderResult<()>
667 where
668 F: FnOnce(&mut StaticFileProviderRWRefMut<'_, N>) -> ProviderResult<()>,
669 {
670 let mut w = self.get_writer(first_block_number, segment)?;
671 f(&mut w)?;
672 w.sync_all()
673 }
674
675 #[instrument(level = "debug", target = "providers::static_file", skip_all)]
680 pub fn write_blocks_data(
681 &self,
682 blocks: &[ExecutedBlock<N>],
683 tx_nums: &[TxNumber],
684 ctx: StaticFileWriteCtx,
685 runtime: &reth_tasks::Runtime,
686 ) -> ProviderResult<()> {
687 if blocks.is_empty() {
688 return Ok(());
689 }
690
691 let first_block_number = blocks[0].recovered_block().number();
692
693 let mut r_headers = None;
694 let mut r_txs = None;
695 let mut r_senders = None;
696 let mut r_receipts = None;
697 let mut r_account_changesets = None;
698 let mut r_storage_changesets = None;
699
700 runtime.storage_pool().in_place_scope(|s| {
701 s.spawn(|_| {
702 r_headers =
703 Some(self.write_segment(StaticFileSegment::Headers, first_block_number, |w| {
704 Self::write_headers(w, blocks)
705 }));
706 });
707
708 s.spawn(|_| {
709 r_txs = Some(self.write_segment(
710 StaticFileSegment::Transactions,
711 first_block_number,
712 |w| Self::write_transactions(w, blocks, tx_nums),
713 ));
714 });
715
716 if ctx.write_senders {
717 s.spawn(|_| {
718 r_senders = Some(self.write_segment(
719 StaticFileSegment::TransactionSenders,
720 first_block_number,
721 |w| Self::write_transaction_senders(w, blocks, tx_nums),
722 ));
723 });
724 }
725
726 if ctx.write_receipts {
727 s.spawn(|_| {
728 r_receipts = Some(self.write_segment(
729 StaticFileSegment::Receipts,
730 first_block_number,
731 |w| Self::write_receipts(w, blocks, tx_nums, &ctx),
732 ));
733 });
734 }
735
736 if ctx.write_account_changesets {
737 s.spawn(|_| {
738 r_account_changesets = Some(self.write_segment(
739 StaticFileSegment::AccountChangeSets,
740 first_block_number,
741 |w| Self::write_account_changesets(w, blocks),
742 ));
743 });
744 }
745
746 if ctx.write_storage_changesets {
747 s.spawn(|_| {
748 r_storage_changesets = Some(self.write_segment(
749 StaticFileSegment::StorageChangeSets,
750 first_block_number,
751 |w| Self::write_storage_changesets(w, blocks),
752 ));
753 });
754 }
755 });
756
757 r_headers.ok_or(StaticFileWriterError::ThreadPanic("headers"))??;
758 r_txs.ok_or(StaticFileWriterError::ThreadPanic("transactions"))??;
759 if ctx.write_senders {
760 r_senders.ok_or(StaticFileWriterError::ThreadPanic("senders"))??;
761 }
762 if ctx.write_receipts {
763 r_receipts.ok_or(StaticFileWriterError::ThreadPanic("receipts"))??;
764 }
765 if ctx.write_account_changesets {
766 r_account_changesets
767 .ok_or(StaticFileWriterError::ThreadPanic("account_changesets"))??;
768 }
769 if ctx.write_storage_changesets {
770 r_storage_changesets
771 .ok_or(StaticFileWriterError::ThreadPanic("storage_changesets"))??;
772 }
773 Ok(())
774 }
775
776 pub fn get_segment_provider(
779 &self,
780 segment: StaticFileSegment,
781 number: u64,
782 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
783 if segment.is_block_or_change_based() {
784 self.get_segment_provider_for_block(segment, number, None)
785 } else {
786 self.get_segment_provider_for_transaction(segment, number, None)
787 }
788 }
789
790 pub fn get_maybe_segment_provider(
795 &self,
796 segment: StaticFileSegment,
797 number: u64,
798 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
799 let provider = if segment.is_block_or_change_based() {
800 self.get_segment_provider_for_block(segment, number, None)
801 } else {
802 self.get_segment_provider_for_transaction(segment, number, None)
803 };
804
805 match provider {
806 Ok(provider) => Ok(Some(provider)),
807 Err(
808 ProviderError::MissingStaticFileBlock(_, _) |
809 ProviderError::MissingStaticFileTx(_, _),
810 ) => Ok(None),
811 Err(err) => Err(err),
812 }
813 }
814
815 pub fn get_segment_provider_for_block(
817 &self,
818 segment: StaticFileSegment,
819 block: BlockNumber,
820 path: Option<&Path>,
821 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
822 self.get_segment_provider_for_range(
823 segment,
824 || self.get_segment_ranges_from_block(segment, block),
825 path,
826 )?
827 .ok_or(ProviderError::MissingStaticFileBlock(segment, block))
828 }
829
830 pub fn get_segment_provider_for_transaction(
832 &self,
833 segment: StaticFileSegment,
834 tx: TxNumber,
835 path: Option<&Path>,
836 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
837 self.get_segment_provider_for_range(
838 segment,
839 || self.get_segment_ranges_from_transaction(segment, tx),
840 path,
841 )?
842 .ok_or(ProviderError::MissingStaticFileTx(segment, tx))
843 }
844
845 pub fn get_segment_provider_for_range(
849 &self,
850 segment: StaticFileSegment,
851 fn_range: impl Fn() -> Option<SegmentRangeInclusive>,
852 path: Option<&Path>,
853 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
854 let block_range = match path {
857 Some(path) => StaticFileSegment::parse_filename(
858 &path
859 .file_name()
860 .ok_or_else(|| {
861 ProviderError::MissingStaticFileSegmentPath(segment, path.to_path_buf())
862 })?
863 .to_string_lossy(),
864 )
865 .and_then(|(parsed_segment, block_range)| {
866 if parsed_segment == segment {
867 return Some(block_range);
868 }
869 None
870 }),
871 None => fn_range(),
872 };
873
874 if let Some(block_range) = block_range {
876 return Ok(Some(self.get_or_create_jar_provider(segment, &block_range)?));
877 }
878
879 Ok(None)
880 }
881
882 pub fn get_segment_provider_for_path(
884 &self,
885 path: &Path,
886 ) -> ProviderResult<Option<StaticFileJarProvider<'_, N>>> {
887 StaticFileSegment::parse_filename(
888 &path
889 .file_name()
890 .ok_or_else(|| ProviderError::MissingStaticFilePath(path.to_path_buf()))?
891 .to_string_lossy(),
892 )
893 .map(|(segment, block_range)| self.get_or_create_jar_provider(segment, &block_range))
894 .transpose()
895 }
896
897 pub fn remove_cached_provider(
901 &self,
902 segment: StaticFileSegment,
903 fixed_block_range_end: BlockNumber,
904 ) {
905 self.map.remove(&(fixed_block_range_end, segment));
906 }
907
908 pub fn delete_segment_below_block(
925 &self,
926 segment: StaticFileSegment,
927 block: BlockNumber,
928 ) -> ProviderResult<Vec<SegmentHeader>> {
929 if block == 0 {
931 return Ok(Vec::new());
932 }
933
934 let highest_block = self.get_highest_static_file_block(segment);
935 let mut deleted_headers = Vec::new();
936
937 loop {
938 let Some(block_height) = self.get_lowest_range_end(segment) else {
939 return Ok(deleted_headers);
940 };
941
942 if block_height >= block || Some(block_height) == highest_block {
944 return Ok(deleted_headers);
945 }
946
947 debug!(
948 target: "providers::static_file",
949 ?segment,
950 ?block_height,
951 "Deleting static file below block"
952 );
953
954 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
957 warn!( target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file below block")
958 })?;
959
960 deleted_headers.push(header);
961 }
962 }
963
964 pub fn delete_jar(
972 &self,
973 segment: StaticFileSegment,
974 block: BlockNumber,
975 ) -> ProviderResult<SegmentHeader> {
976 let fixed_block_range = self.find_fixed_range(segment, block);
977 let key = (fixed_block_range.end(), segment);
978 let file = self.path.join(segment.filename(&fixed_block_range));
979 let jar = if let Some((_, jar)) = self.map.remove(&key) {
980 jar.jar
981 } else {
982 debug!(
983 target: "providers::static_file",
984 ?file,
985 ?fixed_block_range,
986 ?block,
987 "Loading static file jar for deletion"
988 );
989 NippyJar::<SegmentHeader>::load(&file).map_err(ProviderError::other)?
990 };
991
992 let header = jar.user_header().clone();
993
994 if segment.is_change_based() {
996 let csoff_path = file.with_extension("csoff");
997 if csoff_path.exists() {
998 std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
999 }
1000 }
1001
1002 jar.delete().map_err(ProviderError::other)?;
1003
1004 self.initialize_index()?;
1007
1008 Ok(header)
1009 }
1010
1011 pub fn delete_segment(&self, segment: StaticFileSegment) -> ProviderResult<Vec<SegmentHeader>> {
1019 let mut deleted_headers = Vec::new();
1020
1021 while let Some(block_height) = self.get_highest_static_file_block(segment) {
1022 debug!(
1023 target: "providers::static_file",
1024 ?segment,
1025 ?block_height,
1026 "Deleting static file jar"
1027 );
1028
1029 let header = self.delete_jar(segment, block_height).inspect_err(|err| {
1030 warn!(target: "providers::static_file", ?segment, %block_height, ?err, "Failed to delete static file jar")
1031 })?;
1032
1033 deleted_headers.push(header);
1034 }
1035
1036 Ok(deleted_headers)
1037 }
1038
1039 fn get_or_create_jar_provider(
1043 &self,
1044 segment: StaticFileSegment,
1045 fixed_block_range: &SegmentRangeInclusive,
1046 ) -> ProviderResult<StaticFileJarProvider<'_, N>> {
1047 let key = (fixed_block_range.end(), segment);
1048
1049 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Getting provider");
1051 let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) {
1052 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Jar found in cache");
1053 jar.into()
1054 } else {
1055 trace!(target: "providers::static_file", ?segment, ?fixed_block_range, "Creating jar from scratch");
1056 let path = self.path.join(segment.filename(fixed_block_range));
1057 let jar = NippyJar::load(&path).map_err(ProviderError::other)?;
1058 self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
1059 };
1060
1061 if let Some(metrics) = &self.metrics {
1062 provider = provider.with_metrics(metrics.clone());
1063 }
1064 Ok(provider)
1065 }
1066
1067 fn get_segment_ranges_from_block(
1070 &self,
1071 segment: StaticFileSegment,
1072 block: u64,
1073 ) -> Option<SegmentRangeInclusive> {
1074 let indexes = self.indexes.read();
1075 let index = indexes.get(segment)?;
1076
1077 (index.max_block >= block).then(|| {
1078 self.find_fixed_range_with_block_index(
1079 segment,
1080 Some(&index.expected_block_ranges_by_max_block),
1081 block,
1082 )
1083 })
1084 }
1085
1086 fn get_segment_ranges_from_transaction(
1089 &self,
1090 segment: StaticFileSegment,
1091 tx: u64,
1092 ) -> Option<SegmentRangeInclusive> {
1093 let indexes = self.indexes.read();
1094 let index = indexes.get(segment)?;
1095 let available_block_ranges_by_max_tx = index.available_block_ranges_by_max_tx.as_ref()?;
1096
1097 let mut static_files_rev_iter = available_block_ranges_by_max_tx.iter().rev().peekable();
1100
1101 while let Some((tx_end, block_range)) = static_files_rev_iter.next() {
1102 if tx > *tx_end {
1103 return None;
1105 }
1106 let tx_start = static_files_rev_iter.peek().map(|(tx_end, _)| *tx_end + 1).unwrap_or(0);
1107 if tx_start <= tx {
1108 return Some(self.find_fixed_range_with_block_index(
1109 segment,
1110 Some(&index.expected_block_ranges_by_max_block),
1111 block_range.end(),
1112 ));
1113 }
1114 }
1115 None
1116 }
1117
1118 pub fn update_index(
1125 &self,
1126 segment: StaticFileSegment,
1127 segment_max_block: Option<BlockNumber>,
1128 ) -> ProviderResult<()> {
1129 debug!(
1130 target: "providers::static_file",
1131 ?segment,
1132 ?segment_max_block,
1133 "Updating provider index"
1134 );
1135 let mut indexes = self.indexes.write();
1136
1137 match segment_max_block {
1138 Some(segment_max_block) => {
1139 let fixed_range = self.find_fixed_range_with_block_index(
1140 segment,
1141 indexes.get(segment).map(|index| &index.expected_block_ranges_by_max_block),
1142 segment_max_block,
1143 );
1144
1145 let jar = NippyJar::<SegmentHeader>::load(
1146 &self.path.join(segment.filename(&fixed_range)),
1147 )
1148 .map_err(ProviderError::other)?;
1149
1150 let index = indexes
1151 .entry(segment)
1152 .and_modify(|index| {
1153 index.max_block = segment_max_block;
1155
1156 index
1160 .expected_block_ranges_by_max_block
1161 .retain(|_, block_range| block_range.start() < fixed_range.start());
1162 index
1164 .expected_block_ranges_by_max_block
1165 .insert(fixed_range.end(), fixed_range);
1166 })
1167 .or_insert_with(|| StaticFileSegmentIndex {
1168 min_block_range: None,
1169 max_block: segment_max_block,
1170 expected_block_ranges_by_max_block: BTreeMap::from([(
1171 fixed_range.end(),
1172 fixed_range,
1173 )]),
1174 available_block_ranges_by_max_tx: None,
1175 });
1176
1177 if let Some(current_block_range) = jar.user_header().block_range() {
1193 if let Some(min_block_range) = index.min_block_range.as_mut() {
1194 if current_block_range.start() == min_block_range.start() {
1197 *min_block_range = current_block_range;
1198 }
1199 } else {
1200 index.min_block_range = Some(current_block_range);
1201 }
1202 }
1203
1204 if let Some(tx_range) = jar.user_header().tx_range() {
1207 if let Some(current_block_range) = jar.user_header().block_range() {
1210 let tx_end = tx_range.end();
1211
1212 if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
1221 index
1222 .retain(|_, block_range| block_range.start() < fixed_range.start());
1223 index.insert(tx_end, current_block_range);
1224 } else {
1225 index.available_block_ranges_by_max_tx =
1226 Some(BTreeMap::from([(tx_end, current_block_range)]));
1227 }
1228 }
1229 } else if segment.is_tx_based() {
1230 if let Some(index) = index.available_block_ranges_by_max_tx.as_mut() {
1234 index.retain(|_, block_range| block_range.start() < fixed_range.start());
1235 }
1236
1237 index.available_block_ranges_by_max_tx.take_if(|index| index.is_empty());
1239 }
1240
1241 debug!(target: "providers::static_file", ?segment, "Inserting updated jar into cache");
1243 self.map.insert((fixed_range.end(), segment), LoadedJar::new(jar)?);
1244
1245 debug!(target: "providers::static_file", ?segment, "Cleaning up jar map");
1247 self.map.retain(|(end, seg), _| !(*seg == segment && *end > fixed_range.end()));
1248 }
1249 None => {
1250 debug!(target: "providers::static_file", ?segment, "Removing segment from index");
1251 indexes.remove(segment);
1252 }
1253 };
1254
1255 debug!(target: "providers::static_file", ?segment, "Updated provider index");
1256 Ok(())
1257 }
1258
1259 pub fn initialize_index(&self) -> ProviderResult<()> {
1261 let mut indexes = self.indexes.write();
1262 indexes.clear();
1263
1264 for (segment, headers) in &*iter_static_files(&self.path).map_err(ProviderError::other)? {
1265 let min_block_range = Some(headers.first().expect("headers are not empty").0);
1270 let max_block = headers.last().expect("headers are not empty").0.end();
1271
1272 let mut expected_block_ranges_by_max_block = BTreeMap::default();
1273 let mut available_block_ranges_by_max_tx = None;
1274
1275 for (block_range, header) in headers {
1276 expected_block_ranges_by_max_block
1278 .insert(header.expected_block_end(), header.expected_block_range());
1279
1280 if let Some(tx_range) = header.tx_range() {
1282 let tx_end = tx_range.end();
1283
1284 available_block_ranges_by_max_tx
1285 .get_or_insert_with(BTreeMap::default)
1286 .insert(tx_end, *block_range);
1287 }
1288 }
1289
1290 indexes.insert(
1291 segment,
1292 StaticFileSegmentIndex {
1293 min_block_range,
1294 max_block,
1295 expected_block_ranges_by_max_block,
1296 available_block_ranges_by_max_tx,
1297 },
1298 );
1299 }
1300
1301 self.map.clear();
1303
1304 if let Some(lowest_range) =
1306 indexes.get(StaticFileSegment::Transactions).and_then(|index| index.min_block_range)
1307 {
1308 self.earliest_history_height
1310 .store(lowest_range.start(), std::sync::atomic::Ordering::Relaxed);
1311 }
1312
1313 Ok(())
1314 }
1315
/// Checks static files against the database and reports the block to unwind to, if any.
///
/// For every segment yielded by `segments_to_check` this:
/// 1. Runs `maybe_heal_segment`; if the highest block changed, the new highest block (or `0`
///    when there is none) is recorded as an unwind target.
/// 2. If the segment has a highest transaction number, walks backwards from the highest block
///    until the database block body indices report a last transaction number that does not
///    exceed it. Every step back records an unwind target.
/// 3. Runs `ensure_invariants_for` and records any unwind target it returns.
///
/// # Returns
///
/// `Ok(Some(PipelineTarget::Unwind(block)))` with the lowest recorded target, or `Ok(None)` if
/// no target was recorded. Verification is skipped (returning `Ok(None)`) on OP mainnet when
/// the provider knows the block with hash `OVM_HEADER_1_HASH`.
///
/// # Errors
///
/// Propagates any error returned by the provider or by the per-segment helpers.
#[instrument(skip(self, provider), fields(read_only = self.is_read_only()))]
pub fn check_consistency<Provider>(
    &self,
    provider: &Provider,
) -> ProviderResult<Option<PipelineTarget>>
where
    Provider: DBProvider
        + BlockReader
        + StageCheckpointReader
        + PruneCheckpointReader
        + ChainSpecProvider
        + StorageSettingsCache,
    N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
{
    // OP mainnet special case: if the first OVM block is present, skip verification. The log
    // message below describes the resulting inconsistency as expected.
    if provider.chain_spec().is_optimism() &&
        reth_chainspec::Chain::optimism_mainnet() == provider.chain_spec().chain_id()
    {
        const OVM_HEADER_1_HASH: B256 =
            b256!("0xbee7192e575af30420cae0c7776304ac196077ee72b048970549e4f08e875453");
        if provider.block_number(OVM_HEADER_1_HASH)?.is_some() {
            info!(target: "reth::cli",
                "Skipping storage verification for OP mainnet, expected inconsistency in OVM chain"
            );
            return Ok(None);
        }
    }

    info!(target: "reth::cli", "Verifying storage consistency.");

    let mut unwind_target: Option<BlockNumber> = None;

    // Keeps the minimum of all targets seen so far.
    let mut update_unwind_target = |new_target| {
        unwind_target =
            unwind_target.map(|current| current.min(new_target)).or(Some(new_target));
    };

    for segment in self.segments_to_check(provider) {
        // The `Empty` fields are filled in via `span.record` as the values become known.
        let span = info_span!(
            "Checking consistency for segment",
            ?segment,
            initial_highest_block = tracing::field::Empty,
            highest_block = tracing::field::Empty,
            highest_tx = tracing::field::Empty,
        );
        let _guard = span.enter();

        debug!(target: "reth::providers::static_file", "Checking consistency for segment");

        let (initial_highest_block, mut highest_block) = self.maybe_heal_segment(segment)?;
        span.record("initial_highest_block", initial_highest_block);
        span.record("highest_block", highest_block);

        // The highest block moved while healing, so the database must be unwound to it.
        if initial_highest_block != highest_block {
            info!(
                target: "reth::providers::static_file",
                unwind_target = highest_block,
                "Setting unwind target."
            );
            update_unwind_target(highest_block.unwrap_or_default());
        }

        let highest_tx = self.get_highest_static_file_tx(segment);
        span.record("highest_tx", highest_tx);
        debug!(target: "reth::providers::static_file", "Checking tx index segment");

        // Only segments that report a highest transaction number enter this loop.
        if let Some(highest_tx) = highest_tx {
            let mut last_block = highest_block.unwrap_or_default();
            debug!(target: "reth::providers::static_file", last_block, highest_tx, "Verifying last transaction matches last block indices");
            loop {
                // Missing indices end the walk; the invariants check below still runs.
                let Some(indices) = provider.block_body_indices(last_block)? else {
                    debug!(target: "reth::providers::static_file", last_block, "Block body indices not found, static files ahead of database");
                    break
                };

                debug!(target: "reth::providers::static_file", last_block, last_tx_num = indices.last_tx_num(), "Found block body indices");

                // This block's last transaction is covered by the static files: done.
                if indices.last_tx_num() <= highest_tx {
                    break
                }

                // Cannot step back below block 0.
                if last_block == 0 {
                    debug!(target: "reth::providers::static_file", "Reached block 0 in verification loop");
                    break
                }

                last_block -= 1;

                info!(
                    target: "reth::providers::static_file",
                    highest_block = self.get_highest_static_file_block(segment),
                    unwind_target = last_block,
                    "Setting unwind target."
                );
                span.record("highest_block", last_block);
                highest_block = Some(last_block);
                update_unwind_target(last_block);
            }
        }

        debug!(target: "reth::providers::static_file", "Ensuring invariants for segment");

        match self.ensure_invariants_for(provider, segment, highest_tx, highest_block)? {
            Some(unwind) => {
                debug!(target: "reth::providers::static_file", unwind_target=unwind, "Invariants check returned unwind target");
                update_unwind_target(unwind);
            }
            None => {
                debug!(target: "reth::providers::static_file", "Invariants check completed, no unwind needed")
            }
        }
    }

    Ok(unwind_target.map(PipelineTarget::Unwind))
}
1473
1474 pub fn check_file_consistency<Provider>(&self, provider: &Provider) -> ProviderResult<()>
1483 where
1484 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1485 {
1486 info!(target: "reth::cli", "Healing static file inconsistencies.");
1487
1488 for segment in self.segments_to_check(provider) {
1489 let _guard = info_span!("Healing static file segment", ?segment).entered();
1490 let _ = self.maybe_heal_segment(segment)?;
1491 }
1492
1493 Ok(())
1494 }
1495
1496 fn segments_to_check<'a, Provider>(
1498 &'a self,
1499 provider: &'a Provider,
1500 ) -> impl Iterator<Item = StaticFileSegment> + 'a
1501 where
1502 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1503 {
1504 StaticFileSegment::iter()
1505 .filter(move |segment| self.should_check_segment(provider, *segment))
1506 }
1507
1508 fn should_check_segment<Provider>(
1510 &self,
1511 provider: &Provider,
1512 segment: StaticFileSegment,
1513 ) -> bool
1514 where
1515 Provider: DBProvider + ChainSpecProvider + StorageSettingsCache + PruneCheckpointReader,
1516 {
1517 match segment {
1518 StaticFileSegment::Headers | StaticFileSegment::Transactions => true,
1519 StaticFileSegment::Receipts => {
1520 if EitherWriter::receipts_destination(provider).is_database() {
1521 debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: receipts stored in database");
1524 return false;
1525 }
1526
1527 if NamedChain::Gnosis == provider.chain_spec().chain_id() ||
1528 NamedChain::Chiado == provider.chain_spec().chain_id()
1529 {
1530 debug!(target: "reth::providers::static_file", ?segment, "Skipping receipts segment: broken historical import for gnosis/chiado");
1534 return false;
1535 }
1536
1537 true
1538 }
1539 StaticFileSegment::TransactionSenders => {
1540 if EitherWriterDestination::senders(provider).is_database() {
1541 debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: senders stored in database");
1542 return false;
1543 }
1544
1545 if Self::is_segment_fully_pruned(provider, PruneSegment::SenderRecovery) {
1546 debug!(target: "reth::providers::static_file", ?segment, "Skipping senders segment: fully pruned");
1547 return false;
1548 }
1549
1550 true
1551 }
1552 StaticFileSegment::AccountChangeSets => {
1553 if EitherWriter::account_changesets_destination(provider).is_database() {
1554 debug!(target: "reth::providers::static_file", ?segment, "Skipping account changesets segment: changesets stored in database");
1555 return false;
1556 }
1557 true
1558 }
1559 StaticFileSegment::StorageChangeSets => {
1560 if EitherWriter::storage_changesets_destination(provider).is_database() {
1561 debug!(target: "reth::providers::static_file", ?segment, "Skipping storage changesets segment: changesets stored in database");
1562 return false
1563 }
1564 true
1565 }
1566 }
1567 }
1568
1569 fn is_segment_fully_pruned<Provider>(provider: &Provider, segment: PruneSegment) -> bool
1573 where
1574 Provider: PruneCheckpointReader,
1575 {
1576 provider
1577 .get_prune_checkpoint(segment)
1578 .ok()
1579 .flatten()
1580 .is_some_and(|checkpoint| checkpoint.prune_mode.is_full())
1581 }
1582
1583 fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> {
1588 debug!(target: "reth::providers::static_file", "Checking segment consistency");
1589 if let Some(latest_block) = self.get_highest_static_file_block(segment) {
1590 let file_path = self
1591 .directory()
1592 .join(segment.filename(&self.find_fixed_range(segment, latest_block)));
1593 debug!(target: "reth::providers::static_file", ?file_path, latest_block, "Loading NippyJar for consistency check");
1594
1595 let jar = NippyJar::<SegmentHeader>::load(&file_path).map_err(ProviderError::other)?;
1596 debug!(target: "reth::providers::static_file", "NippyJar loaded, checking consistency");
1597
1598 NippyJarChecker::new(jar).check_consistency().map_err(ProviderError::other)?;
1599 debug!(target: "reth::providers::static_file", "NippyJar consistency check passed");
1600 } else {
1601 debug!(target: "reth::providers::static_file", "No static file block found, skipping consistency check");
1602 }
1603 Ok(())
1604 }
1605
1606 fn maybe_heal_segment(
1622 &self,
1623 segment: StaticFileSegment,
1624 ) -> ProviderResult<(Option<BlockNumber>, Option<BlockNumber>)> {
1625 let initial_highest_block = self.get_highest_static_file_block(segment);
1626 debug!(target: "reth::providers::static_file", ?initial_highest_block, "Initial highest block for segment");
1627
1628 if self.access.is_read_only() {
1629 debug!(target: "reth::providers::static_file", "Checking segment consistency (read-only)");
1632 self.check_segment_consistency(segment)?;
1633 } else {
1634 debug!(target: "reth::providers::static_file", "Fetching latest writer which might heal any potential inconsistency");
1637 self.latest_writer(segment)?;
1638 }
1639
1640 let highest_block = self.get_highest_static_file_block(segment);
1643
1644 Ok((initial_highest_block, highest_block))
1645 }
1646
/// Dispatches the invariants check for `segment` to the helper matching its database table.
///
/// - Headers: `ensure_invariants` over the headers table, with `highest_block` used both as
///   the highest entry and as the highest block.
/// - Transactions, receipts, transaction senders and account changesets: `ensure_invariants`
///   over the respective table, with `highest_tx` as the highest entry.
/// - Storage changesets: `ensure_changeset_invariants_by_block`, deriving the block number
///   from each key via `key.block_number()`.
///
/// Returns whatever the selected helper returns: an optional block number to unwind to.
fn ensure_invariants_for<Provider>(
    &self,
    provider: &Provider,
    segment: StaticFileSegment,
    highest_tx: Option<u64>,
    highest_block: Option<BlockNumber>,
) -> ProviderResult<Option<BlockNumber>>
where
    Provider: DBProvider + BlockReader + StageCheckpointReader,
    N: NodePrimitives<Receipt: Value, BlockHeader: Value, SignedTx: Value>,
{
    match segment {
        // Header rows are compared by block number, so the entry is the block.
        StaticFileSegment::Headers => self
            .ensure_invariants::<_, tables::Headers<N::BlockHeader>>(
                provider,
                segment,
                highest_block,
                highest_block,
            ),
        StaticFileSegment::Transactions => self
            .ensure_invariants::<_, tables::Transactions<N::SignedTx>>(
                provider,
                segment,
                highest_tx,
                highest_block,
            ),
        StaticFileSegment::Receipts => self
            .ensure_invariants::<_, tables::Receipts<N::Receipt>>(
                provider,
                segment,
                highest_tx,
                highest_block,
            ),
        StaticFileSegment::TransactionSenders => self
            .ensure_invariants::<_, tables::TransactionSenders>(
                provider,
                segment,
                highest_tx,
                highest_block,
            ),
        // NOTE(review): this arm passes `highest_tx` as the highest entry, while
        // `ensure_changeset_invariants_by_block` also has an `AccountChangeSets` prune arm.
        // Presumably this table is keyed by block number; confirm the routing is intended.
        StaticFileSegment::AccountChangeSets => self
            .ensure_invariants::<_, tables::AccountChangeSets>(
                provider,
                segment,
                highest_tx,
                highest_block,
            ),
        StaticFileSegment::StorageChangeSets => self
            .ensure_changeset_invariants_by_block::<_, tables::StorageChangeSets, _>(
                provider,
                segment,
                highest_block,
                |key| key.block_number(),
            ),
    }
}
1704
/// Ensures the invariants between table `T` in the database and the static files of `segment`.
///
/// Steps, in order:
/// 1. If the first database key is neither covered by the static files nor directly adjacent
///    to the highest static file entry, returns the highest static file block as unwind
///    target.
/// 2. If the last database key lies beyond the highest static file entry (or there is no such
///    entry), returns `Ok(None)`.
/// 3. Otherwise compares the stage checkpoint of the segment's stage with the highest static
///    file block:
///    - checkpoint ahead: returns the highest static file block as unwind target;
///    - equal: returns `Ok(None)`;
///    - checkpoint behind: prunes the static file segment back to the checkpoint block,
///      commits the writer and returns `Ok(None)`.
///
/// # Parameters
///
/// - `highest_static_file_entry`: highest key of the segment in static files, compared against
///   the `u64` keys of `T`.
/// - `highest_static_file_block`: highest block of the segment in static files.
///
/// # Errors
///
/// Propagates errors from the database cursor, the stage checkpoint lookup, the block body
/// indices lookup and the static file writer.
#[instrument(skip(self, provider, segment), fields(table = T::NAME))]
fn ensure_invariants<Provider, T: Table<Key = u64>>(
    &self,
    provider: &Provider,
    segment: StaticFileSegment,
    highest_static_file_entry: Option<u64>,
    highest_static_file_block: Option<BlockNumber>,
) -> ProviderResult<Option<BlockNumber>>
where
    Provider: DBProvider + BlockReader + StageCheckpointReader,
{
    debug!(target: "reth::providers::static_file", "Ensuring invariants");
    let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;

    if let Some((db_first_entry, _)) = db_cursor.first()? {
        debug!(target: "reth::providers::static_file", db_first_entry, "Found first database entry");
        if let (Some(highest_entry), Some(highest_block)) =
            (highest_static_file_entry, highest_static_file_block)
        {
            // The database must start inside the static file range or right after it;
            // anything else leaves a gap between the two stores.
            if !(db_first_entry <= highest_entry || highest_entry + 1 == db_first_entry) {
                info!(
                    target: "reth::providers::static_file",
                    ?db_first_entry,
                    ?highest_entry,
                    unwind_target = highest_block,
                    "Setting unwind target."
                );
                return Ok(Some(highest_block));
            }
        }

        // The database reaches past the static files (or there are no static file entries).
        if let Some((db_last_entry, _)) = db_cursor.last()? &&
            highest_static_file_entry
                .is_none_or(|highest_entry| db_last_entry > highest_entry)
        {
            debug!(target: "reth::providers::static_file", db_last_entry, "Database has entries beyond static files, no unwind needed");
            return Ok(None)
        }
    } else {
        debug!(target: "reth::providers::static_file", "No database entries found");
    }

    // From here on a missing entry/block is treated as 0.
    let highest_static_file_entry = highest_static_file_entry.unwrap_or_default();
    let highest_static_file_block = highest_static_file_block.unwrap_or_default();

    let stage_id = segment.to_stage_id();
    let checkpoint_block_number =
        provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
    debug!(target: "reth::providers::static_file", ?stage_id, checkpoint_block_number, "Retrieved stage checkpoint");

    // Checkpoint ahead of the static files: request an unwind to the static file height.
    if checkpoint_block_number > highest_static_file_block {
        info!(
            target: "reth::providers::static_file",
            checkpoint_block_number,
            unwind_target = highest_static_file_block,
            "Setting unwind target."
        );
        return Ok(Some(highest_static_file_block));
    }

    // The `>` case returned above, so this only matches when both heights are equal.
    if checkpoint_block_number >= highest_static_file_block {
        debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
        return Ok(None);
    }

    // Checkpoint behind the static files: prune the static files back to the checkpoint.
    info!(
        target: "reth::providers",
        from = highest_static_file_block,
        to = checkpoint_block_number,
        "Unwinding static file segment."
    );
    let mut writer = self.latest_writer(segment)?;

    match segment {
        StaticFileSegment::Headers => {
            // One header per block, so the row count equals the block difference.
            let prune_count = highest_static_file_block - checkpoint_block_number;
            debug!(target: "reth::providers::static_file", prune_count, "Pruning headers");
            writer.prune_headers(prune_count)?;
        }
        StaticFileSegment::Transactions |
        StaticFileSegment::Receipts |
        StaticFileSegment::TransactionSenders => {
            // Rows to prune: everything after the checkpoint block's last transaction.
            // Nothing is pruned when the checkpoint block has no body indices.
            if let Some(block) = provider.block_body_indices(checkpoint_block_number)? {
                let number = highest_static_file_entry - block.last_tx_num();
                debug!(target: "reth::providers::static_file", prune_count = number, checkpoint_block_number, "Pruning transaction based segment");

                match segment {
                    StaticFileSegment::Transactions => {
                        writer.prune_transactions(number, checkpoint_block_number)?
                    }
                    StaticFileSegment::Receipts => {
                        writer.prune_receipts(number, checkpoint_block_number)?
                    }
                    StaticFileSegment::TransactionSenders => {
                        writer.prune_transaction_senders(number, checkpoint_block_number)?
                    }
                    // Excluded by the outer match arm.
                    StaticFileSegment::Headers |
                    StaticFileSegment::AccountChangeSets |
                    StaticFileSegment::StorageChangeSets => {
                        unreachable!()
                    }
                }
            } else {
                debug!(target: "reth::providers::static_file", checkpoint_block_number, "No block body indices found for checkpoint block");
            }
        }
        StaticFileSegment::AccountChangeSets => {
            writer.prune_account_changesets(checkpoint_block_number)?;
        }
        StaticFileSegment::StorageChangeSets => {
            writer.prune_storage_changesets(checkpoint_block_number)?;
        }
    }

    debug!(target: "reth::providers::static_file", "Committing writer after pruning");
    writer.commit()?;
    debug!(target: "reth::providers::static_file", "Writer committed successfully");

    debug!(target: "reth::providers::static_file", "Invariants ensured, returning None");
    Ok(None)
}
1853
1854 fn ensure_changeset_invariants_by_block<Provider, T, F>(
1855 &self,
1856 provider: &Provider,
1857 segment: StaticFileSegment,
1858 highest_static_file_block: Option<BlockNumber>,
1859 block_from_key: F,
1860 ) -> ProviderResult<Option<BlockNumber>>
1861 where
1862 Provider: DBProvider + BlockReader + StageCheckpointReader,
1863 T: Table,
1864 F: Fn(&T::Key) -> BlockNumber,
1865 {
1866 debug!(
1867 target: "reth::providers::static_file",
1868 ?segment,
1869 ?highest_static_file_block,
1870 "Ensuring changeset invariants"
1871 );
1872 let mut db_cursor = provider.tx_ref().cursor_read::<T>()?;
1873
1874 if let Some((db_first_key, _)) = db_cursor.first()? {
1875 let db_first_block = block_from_key(&db_first_key);
1876 if let Some(highest_block) = highest_static_file_block &&
1877 !(db_first_block <= highest_block || highest_block + 1 == db_first_block)
1878 {
1879 info!(
1880 target: "reth::providers::static_file",
1881 ?db_first_block,
1882 ?highest_block,
1883 unwind_target = highest_block,
1884 ?segment,
1885 "Setting unwind target."
1886 );
1887 return Ok(Some(highest_block))
1888 }
1889
1890 if let Some((db_last_key, _)) = db_cursor.last()? &&
1891 highest_static_file_block
1892 .is_none_or(|highest_block| block_from_key(&db_last_key) > highest_block)
1893 {
1894 debug!(
1895 target: "reth::providers::static_file",
1896 ?segment,
1897 "Database has entries beyond static files, no unwind needed"
1898 );
1899 return Ok(None)
1900 }
1901 } else {
1902 debug!(target: "reth::providers::static_file", ?segment, "No database entries found");
1903 }
1904
1905 let highest_static_file_block = highest_static_file_block.unwrap_or_default();
1906
1907 let stage_id = segment.to_stage_id();
1908 let checkpoint_block_number =
1909 provider.get_stage_checkpoint(stage_id)?.unwrap_or_default().block_number;
1910
1911 if checkpoint_block_number > highest_static_file_block {
1912 info!(
1913 target: "reth::providers::static_file",
1914 checkpoint_block_number,
1915 unwind_target = highest_static_file_block,
1916 ?segment,
1917 "Setting unwind target."
1918 );
1919 return Ok(Some(highest_static_file_block))
1920 }
1921
1922 if checkpoint_block_number < highest_static_file_block {
1923 info!(
1924 target: "reth::providers",
1925 ?segment,
1926 from = highest_static_file_block,
1927 to = checkpoint_block_number,
1928 "Unwinding static file segment."
1929 );
1930 let mut writer = self.latest_writer(segment)?;
1931 match segment {
1932 StaticFileSegment::AccountChangeSets => {
1933 writer.prune_account_changesets(checkpoint_block_number)?;
1934 }
1935 StaticFileSegment::StorageChangeSets => {
1936 writer.prune_storage_changesets(checkpoint_block_number)?;
1937 }
1938 _ => unreachable!("invalid segment for changeset invariants"),
1939 }
1940 writer.commit()?;
1941 }
1942
1943 Ok(None)
1944 }
1945
1946 pub fn earliest_history_height(&self) -> BlockNumber {
1954 self.earliest_history_height.load(std::sync::atomic::Ordering::Relaxed)
1955 }
1956
1957 pub fn get_lowest_range(&self, segment: StaticFileSegment) -> Option<SegmentRangeInclusive> {
1961 self.indexes.read().get(segment).and_then(|index| index.min_block_range)
1962 }
1963
1964 pub fn get_lowest_range_start(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1970 self.get_lowest_range(segment).map(|range| range.start())
1971 }
1972
1973 pub fn get_lowest_range_end(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1979 self.get_lowest_range(segment).map(|range| range.end())
1980 }
1981
1982 pub fn get_highest_static_file_block(&self, segment: StaticFileSegment) -> Option<BlockNumber> {
1986 self.indexes.read().get(segment).map(|index| index.max_block)
1987 }
1988
1989 fn bound_range(
1996 &self,
1997 range: impl RangeBounds<BlockNumber>,
1998 segment: StaticFileSegment,
1999 ) -> RangeInclusive<BlockNumber> {
2000 let highest_block = self.get_highest_static_file_block(segment).unwrap_or(0);
2001
2002 let start = match range.start_bound() {
2003 Bound::Included(&n) => n,
2004 Bound::Excluded(&n) => n.saturating_add(1),
2005 Bound::Unbounded => 0,
2006 };
2007 let end = match range.end_bound() {
2008 Bound::Included(&n) => n.min(highest_block),
2009 Bound::Excluded(&n) => n.saturating_sub(1).min(highest_block),
2010 Bound::Unbounded => highest_block,
2011 };
2012
2013 start..=end
2014 }
2015
2016 pub fn get_highest_static_file_tx(&self, segment: StaticFileSegment) -> Option<TxNumber> {
2020 self.indexes
2021 .read()
2022 .get(segment)
2023 .and_then(|index| index.available_block_ranges_by_max_tx.as_ref())
2024 .and_then(|index| index.last_key_value().map(|(last_tx, _)| *last_tx))
2025 }
2026
2027 pub fn get_highest_static_files(&self) -> HighestStaticFiles {
2029 HighestStaticFiles {
2030 receipts: self.get_highest_static_file_block(StaticFileSegment::Receipts),
2031 }
2032 }
2033
2034 pub fn find_static_file<T>(
2037 &self,
2038 segment: StaticFileSegment,
2039 func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult<Option<T>>,
2040 ) -> ProviderResult<Option<T>> {
2041 if let Some(ranges) =
2042 self.indexes.read().get(segment).map(|index| &index.expected_block_ranges_by_max_block)
2043 {
2044 for range in ranges.values().rev() {
2046 if let Some(res) = func(self.get_or_create_jar_provider(segment, range)?)? {
2047 return Ok(Some(res));
2048 }
2049 }
2050 }
2051
2052 Ok(None)
2053 }
2054
2055 pub fn fetch_range_with_predicate<T, F, P>(
2061 &self,
2062 segment: StaticFileSegment,
2063 range: Range<u64>,
2064 mut get_fn: F,
2065 mut predicate: P,
2066 ) -> ProviderResult<Vec<T>>
2067 where
2068 F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>>,
2069 P: FnMut(&T) -> bool,
2070 {
2071 let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize);
2072
2073 macro_rules! get_provider {
2077 ($number:expr) => {{
2078 match self.get_segment_provider(segment, $number) {
2079 Ok(provider) => provider,
2080 Err(
2081 ProviderError::MissingStaticFileBlock(_, _) |
2082 ProviderError::MissingStaticFileTx(_, _),
2083 ) => return Ok(result),
2084 Err(err) => return Err(err),
2085 }
2086 }};
2087 }
2088
2089 let mut provider = get_provider!(range.start);
2090 let mut cursor = provider.cursor()?;
2091
2092 'outer: for number in range {
2094 let mut retrying = false;
2098
2099 'inner: loop {
2101 match get_fn(&mut cursor, number)? {
2102 Some(res) => {
2103 if !predicate(&res) {
2104 break 'outer;
2105 }
2106 result.push(res);
2107 break 'inner;
2108 }
2109 None => {
2110 if retrying {
2111 return Ok(result);
2112 }
2113 drop(cursor);
2118 drop(provider);
2119 provider = get_provider!(number);
2120 cursor = provider.cursor()?;
2121 retrying = true;
2122 }
2123 }
2124 }
2125 }
2126
2127 result.shrink_to_fit();
2128
2129 Ok(result)
2130 }
2131
/// Returns a lazy iterator that fetches the entries of `range` from the static files of
/// `segment`.
///
/// For every number in `range` the iterator calls `get_fn` with a fresh cursor over the
/// current provider. If there is no current provider, or `get_fn` finds nothing, the provider
/// is re-acquired for that number via `get_maybe_segment_provider` and the lookup is repeated
/// once.
///
/// Each item is:
/// - `Ok(Some(value))` when the entry was found,
/// - `Ok(None)` when it was not found even after re-acquiring the provider,
/// - `Err(_)` when creating a cursor, calling `get_fn` or acquiring the provider failed.
///
/// # Errors
///
/// Returns an error up front if the provider for `range.start` cannot be acquired.
pub fn fetch_range_iter<'a, T, F>(
    &'a self,
    segment: StaticFileSegment,
    range: Range<u64>,
    get_fn: F,
) -> ProviderResult<impl Iterator<Item = ProviderResult<Option<T>>> + 'a>
where
    F: Fn(&mut StaticFileCursor<'_>, u64) -> ProviderResult<Option<T>> + 'a,
    T: std::fmt::Debug,
{
    let mut provider = self.get_maybe_segment_provider(segment, range.start)?;
    Ok(range.map(move |number| {
        // `Option<ProviderResult<Option<T>>>` is flattened into `Option<ProviderResult<T>>`:
        // `None` covers both "no provider" and "provider has no entry for `number`".
        match provider
            .as_ref()
            .map(|provider| get_fn(&mut provider.cursor()?, number))
            .and_then(|result| result.transpose())
        {
            Some(result) => result.map(Some),
            None => {
                // Release the current provider, then look for one that covers `number`.
                provider.take();
                provider = self.get_maybe_segment_provider(segment, number)?;
                provider
                    .as_ref()
                    .map(|provider| get_fn(&mut provider.cursor()?, number))
                    .and_then(|result| result.transpose())
                    .transpose()
            }
        }
    }))
}
2169
2170 pub fn directory(&self) -> &Path {
2172 &self.path
2173 }
2174
2175 pub fn get_with_static_file_or_database<T, FS, FD>(
2185 &self,
2186 segment: StaticFileSegment,
2187 number: u64,
2188 fetch_from_static_file: FS,
2189 fetch_from_database: FD,
2190 ) -> ProviderResult<Option<T>>
2191 where
2192 FS: Fn(&Self) -> ProviderResult<Option<T>>,
2193 FD: Fn() -> ProviderResult<Option<T>>,
2194 {
2195 let static_file_upper_bound = if segment.is_block_or_change_based() {
2197 self.get_highest_static_file_block(segment)
2198 } else {
2199 self.get_highest_static_file_tx(segment)
2200 };
2201
2202 if static_file_upper_bound
2203 .is_some_and(|static_file_upper_bound| static_file_upper_bound >= number)
2204 {
2205 return fetch_from_static_file(self);
2206 }
2207 fetch_from_database()
2208 }
2209
2210 pub fn get_range_with_static_file_or_database<T, P, FS, FD>(
2222 &self,
2223 segment: StaticFileSegment,
2224 mut block_or_tx_range: Range<u64>,
2225 fetch_from_static_file: FS,
2226 mut fetch_from_database: FD,
2227 mut predicate: P,
2228 ) -> ProviderResult<Vec<T>>
2229 where
2230 FS: Fn(&Self, Range<u64>, &mut P) -> ProviderResult<Vec<T>>,
2231 FD: FnMut(Range<u64>, P) -> ProviderResult<Vec<T>>,
2232 P: FnMut(&T) -> bool,
2233 {
2234 let mut data = Vec::new();
2235
2236 if let Some(static_file_upper_bound) = if segment.is_block_or_change_based() {
2238 self.get_highest_static_file_block(segment)
2239 } else {
2240 self.get_highest_static_file_tx(segment)
2241 } && block_or_tx_range.start <= static_file_upper_bound
2242 {
2243 let end = block_or_tx_range.end.min(static_file_upper_bound + 1);
2244 data.extend(fetch_from_static_file(
2245 self,
2246 block_or_tx_range.start..end,
2247 &mut predicate,
2248 )?);
2249 block_or_tx_range.start = end;
2250 }
2251
2252 if block_or_tx_range.end > block_or_tx_range.start {
2253 data.extend(fetch_from_database(block_or_tx_range, predicate)?)
2254 }
2255
2256 Ok(data)
2257 }
2258
/// Returns the path this provider was configured with (test helper).
#[cfg(any(test, feature = "test-utils"))]
pub fn path(&self) -> &Path {
    self.directory()
}
2264
/// Returns a copy of the transaction index of `segment` (test helper).
///
/// Returns `None` if the segment has no index entry or the entry has no transaction index.
#[cfg(any(test, feature = "test-utils"))]
pub fn tx_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
    let indexes = self.indexes.read();
    indexes.get(segment)?.available_block_ranges_by_max_tx.clone()
}
2274
/// Returns a copy of the expected block index of `segment` (test helper).
///
/// Returns `None` if the segment has no index entry.
#[cfg(any(test, feature = "test-utils"))]
pub fn expected_block_index(&self, segment: StaticFileSegment) -> Option<SegmentRanges> {
    let indexes = self.indexes.read();
    let index = indexes.get(segment)?;
    Some(index.expected_block_ranges_by_max_block.clone())
}
2284}
2285
/// Per-segment index over the static files found on disk.
#[derive(Debug)]
struct StaticFileSegmentIndex {
    /// Block range of the first static file listed for the segment.
    ///
    /// NOTE(review): presumably the lowest range, assuming the files are listed in ascending
    /// order — confirm against `iter_static_files`.
    min_block_range: Option<SegmentRangeInclusive>,
    /// End block of the block range of the last static file listed for the segment.
    max_block: u64,
    /// Expected block range of every static file, keyed by its expected end block.
    expected_block_ranges_by_max_block: SegmentRanges,
    /// Block range of every static file that has a transaction range, keyed by the end of that
    /// transaction range. `None` if no file of the segment has a transaction range.
    available_block_ranges_by_max_tx: Option<SegmentRanges>,
}
2316
/// Access to static file writers and their commit lifecycle.
pub trait StaticFileWriter {
    /// The primitives type the returned writers are parameterized with.
    type Primitives: Send + Sync + 'static;

    /// Returns a mutable reference to the writer of `segment` for the given `block`.
    fn get_writer(
        &self,
        block: BlockNumber,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Returns a mutable reference to the writer of `segment` for its latest block.
    fn latest_writer(
        &self,
        segment: StaticFileSegment,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>>;

    /// Commits the static file writers.
    fn commit(&self) -> ProviderResult<()>;

    /// Returns `true` if a writer has an unwind queued.
    fn has_unwind_queued(&self) -> bool;

    /// Finalizes the static file writers.
    ///
    /// NOTE(review): the exact contract relative to `commit` is not visible here — confirm
    /// against the writers implementation.
    fn finalize(&self) -> ProviderResult<()>;
}
2347
2348impl<N: NodePrimitives> StaticFileWriter for StaticFileProvider<N> {
2349 type Primitives = N;
2350
2351 fn get_writer(
2352 &self,
2353 block: BlockNumber,
2354 segment: StaticFileSegment,
2355 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2356 if self.access.is_read_only() {
2357 return Err(ProviderError::ReadOnlyStaticFileAccess);
2358 }
2359
2360 trace!(target: "providers::static_file", ?block, ?segment, "Getting static file writer.");
2361 self.writers.get_or_create(segment, || {
2362 StaticFileProviderRW::new(segment, block, Arc::downgrade(&self.0), self.metrics.clone())
2363 })
2364 }
2365
2366 fn latest_writer(
2367 &self,
2368 segment: StaticFileSegment,
2369 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, Self::Primitives>> {
2370 let genesis_number = self.0.as_ref().genesis_block_number();
2371 self.get_writer(
2372 self.get_highest_static_file_block(segment).unwrap_or(genesis_number),
2373 segment,
2374 )
2375 }
2376
2377 fn commit(&self) -> ProviderResult<()> {
2378 self.writers.commit()
2379 }
2380
2381 fn has_unwind_queued(&self) -> bool {
2382 self.writers.has_unwind_queued()
2383 }
2384
2385 fn finalize(&self) -> ProviderResult<()> {
2386 self.writers.finalize()
2387 }
2388}
2389
2390impl<N: NodePrimitives> ChangeSetReader for StaticFileProvider<N> {
2391 fn account_block_changeset(
2392 &self,
2393 block_number: BlockNumber,
2394 ) -> ProviderResult<Vec<reth_db::models::AccountBeforeTx>> {
2395 let provider = match self.get_segment_provider_for_block(
2396 StaticFileSegment::AccountChangeSets,
2397 block_number,
2398 None,
2399 ) {
2400 Ok(provider) => provider,
2401 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
2402 Err(err) => return Err(err),
2403 };
2404
2405 if let Some(offset) = provider.read_changeset_offset(block_number)? {
2406 let mut cursor = provider.cursor()?;
2407 let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
2408
2409 for i in offset.changeset_range() {
2410 if let Some(change) =
2411 cursor.get_one::<reth_db::static_file::AccountChangesetMask>(i.into())?
2412 {
2413 changeset.push(change)
2414 }
2415 }
2416 Ok(changeset)
2417 } else {
2418 Ok(Vec::new())
2419 }
2420 }
2421
2422 fn get_account_before_block(
2423 &self,
2424 block_number: BlockNumber,
2425 address: Address,
2426 ) -> ProviderResult<Option<reth_db::models::AccountBeforeTx>> {
2427 let provider = match self.get_segment_provider_for_block(
2428 StaticFileSegment::AccountChangeSets,
2429 block_number,
2430 None,
2431 ) {
2432 Ok(provider) => provider,
2433 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
2434 Err(err) => return Err(err),
2435 };
2436
2437 let Some(offset) = provider.read_changeset_offset(block_number)? else {
2438 return Ok(None);
2439 };
2440
2441 let mut cursor = provider.cursor()?;
2442 let range = offset.changeset_range();
2443 let mut low = range.start;
2444 let mut high = range.end;
2445
2446 while low < high {
2447 let mid = low + (high - low) / 2;
2448 if let Some(change) =
2449 cursor.get_one::<reth_db::static_file::AccountChangesetMask>(mid.into())?
2450 {
2451 if change.address < address {
2452 low = mid + 1;
2453 } else {
2454 high = mid;
2455 }
2456 } else {
2457 debug!(
2460 target: "providers::static_file",
2461 ?low,
2462 ?mid,
2463 ?high,
2464 ?range,
2465 ?block_number,
2466 ?address,
2467 "Cannot continue binary search for account changeset fetch"
2468 );
2469 low = range.end;
2470 break;
2471 }
2472 }
2473
2474 if low < range.end &&
2475 let Some(change) = cursor
2476 .get_one::<reth_db::static_file::AccountChangesetMask>(low.into())?
2477 .filter(|change| change.address == address)
2478 {
2479 return Ok(Some(change));
2480 }
2481
2482 Ok(None)
2483 }
2484
2485 fn account_changesets_range(
2486 &self,
2487 range: impl core::ops::RangeBounds<BlockNumber>,
2488 ) -> ProviderResult<Vec<(BlockNumber, reth_db::models::AccountBeforeTx)>> {
2489 let range = self.bound_range(range, StaticFileSegment::AccountChangeSets);
2490 self.walk_account_changeset_range(range).collect()
2491 }
2492
2493 fn account_changeset_count(&self) -> ProviderResult<usize> {
2494 let mut count = 0;
2495
2496 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
2497 if let Some(changeset_segments) = static_files.get(StaticFileSegment::AccountChangeSets) {
2498 for (block_range, header) in changeset_segments {
2499 let csoff_path = self
2500 .path
2501 .join(StaticFileSegment::AccountChangeSets.filename(block_range))
2502 .with_extension("csoff");
2503 if csoff_path.exists() {
2504 let len = header.changeset_offsets_len();
2505 let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
2506 .map_err(ProviderError::other)?;
2507 let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
2508 for offset in offsets {
2509 count += offset.num_changes() as usize;
2510 }
2511 }
2512 }
2513 }
2514
2515 Ok(count)
2516 }
2517}
2518
2519impl<N: NodePrimitives> StorageChangeSetReader for StaticFileProvider<N> {
2520 fn storage_changeset(
2521 &self,
2522 block_number: BlockNumber,
2523 ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
2524 let provider = match self.get_segment_provider_for_block(
2525 StaticFileSegment::StorageChangeSets,
2526 block_number,
2527 None,
2528 ) {
2529 Ok(provider) => provider,
2530 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(Vec::new()),
2531 Err(err) => return Err(err),
2532 };
2533
2534 if let Some(offset) = provider.read_changeset_offset(block_number)? {
2535 let mut cursor = provider.cursor()?;
2536 let mut changeset = Vec::with_capacity(offset.num_changes() as usize);
2537
2538 for i in offset.changeset_range() {
2539 if let Some(change) = cursor.get_one::<StorageChangesetMask>(i.into())? {
2540 let block_address = BlockNumberAddress((block_number, change.address));
2541 let entry = ChangesetEntry {
2542 key: StorageSlotKey::hashed(change.key),
2543 value: change.value,
2544 };
2545 changeset.push((block_address, entry));
2546 }
2547 }
2548 Ok(changeset)
2549 } else {
2550 Ok(Vec::new())
2551 }
2552 }
2553
2554 fn get_storage_before_block(
2555 &self,
2556 block_number: BlockNumber,
2557 address: Address,
2558 storage_key: B256,
2559 ) -> ProviderResult<Option<ChangesetEntry>> {
2560 let provider = match self.get_segment_provider_for_block(
2561 StaticFileSegment::StorageChangeSets,
2562 block_number,
2563 None,
2564 ) {
2565 Ok(provider) => provider,
2566 Err(ProviderError::MissingStaticFileBlock(_, _)) => return Ok(None),
2567 Err(err) => return Err(err),
2568 };
2569
2570 let Some(offset) = provider.read_changeset_offset(block_number)? else {
2571 return Ok(None);
2572 };
2573
2574 let mut cursor = provider.cursor()?;
2575 let range = offset.changeset_range();
2576 let mut low = range.start;
2577 let mut high = range.end;
2578
2579 while low < high {
2580 let mid = low + (high - low) / 2;
2581 if let Some(change) = cursor.get_one::<StorageChangesetMask>(mid.into())? {
2582 match (change.address, change.key).cmp(&(address, storage_key)) {
2583 std::cmp::Ordering::Less => low = mid + 1,
2584 _ => high = mid,
2585 }
2586 } else {
2587 debug!(
2588 target: "providers::static_file",
2589 ?low,
2590 ?mid,
2591 ?high,
2592 ?range,
2593 ?block_number,
2594 ?address,
2595 ?storage_key,
2596 "Cannot continue binary search for storage changeset fetch"
2597 );
2598 low = range.end;
2599 break;
2600 }
2601 }
2602
2603 if low < range.end &&
2604 let Some(change) = cursor
2605 .get_one::<StorageChangesetMask>(low.into())?
2606 .filter(|change| change.address == address && change.key == storage_key)
2607 {
2608 return Ok(Some(ChangesetEntry {
2609 key: StorageSlotKey::hashed(change.key),
2610 value: change.value,
2611 }));
2612 }
2613
2614 Ok(None)
2615 }
2616
2617 fn storage_changesets_range(
2618 &self,
2619 range: impl RangeBounds<BlockNumber>,
2620 ) -> ProviderResult<Vec<(BlockNumberAddress, ChangesetEntry)>> {
2621 let range = self.bound_range(range, StaticFileSegment::StorageChangeSets);
2622 self.walk_storage_changeset_range(range).collect()
2623 }
2624
2625 fn storage_changeset_count(&self) -> ProviderResult<usize> {
2626 let mut count = 0;
2627
2628 let static_files = iter_static_files(&self.path).map_err(ProviderError::other)?;
2629 if let Some(changeset_segments) = static_files.get(StaticFileSegment::StorageChangeSets) {
2630 for (block_range, header) in changeset_segments {
2631 let csoff_path = self
2632 .path
2633 .join(StaticFileSegment::StorageChangeSets.filename(block_range))
2634 .with_extension("csoff");
2635 if csoff_path.exists() {
2636 let len = header.changeset_offsets_len();
2637 let mut reader = ChangesetOffsetReader::new(&csoff_path, len)
2638 .map_err(ProviderError::other)?;
2639 let offsets = reader.get_range(0, len).map_err(ProviderError::other)?;
2640 for offset in offsets {
2641 count += offset.num_changes() as usize;
2642 }
2643 }
2644 }
2645 }
2646
2647 Ok(count)
2648 }
2649}
2650
2651impl<N: NodePrimitives> StaticFileProvider<N> {
2652 pub fn walk_account_changeset_range(
2662 &self,
2663 range: impl RangeBounds<BlockNumber>,
2664 ) -> StaticFileAccountChangesetWalker<Self> {
2665 StaticFileAccountChangesetWalker::new(self.clone(), range)
2666 }
2667
2668 pub fn walk_storage_changeset_range(
2670 &self,
2671 range: impl RangeBounds<BlockNumber>,
2672 ) -> StaticFileStorageChangesetWalker<Self> {
2673 StaticFileStorageChangesetWalker::new(self.clone(), range)
2674 }
2675}
2676
2677impl<N: NodePrimitives<BlockHeader: Value>> HeaderProvider for StaticFileProvider<N> {
2678 type Header = N::BlockHeader;
2679
2680 fn header(&self, block_hash: BlockHash) -> ProviderResult<Option<Self::Header>> {
2681 self.find_static_file(StaticFileSegment::Headers, |jar_provider| {
2682 Ok(jar_provider
2683 .cursor()?
2684 .get_two::<HeaderWithHashMask<Self::Header>>((&block_hash).into())?
2685 .and_then(|(header, hash)| {
2686 if hash == block_hash {
2687 return Some(header);
2688 }
2689 None
2690 }))
2691 })
2692 }
2693
2694 fn header_by_number(&self, num: BlockNumber) -> ProviderResult<Option<Self::Header>> {
2695 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2696 .and_then(|provider| provider.header_by_number(num))
2697 .or_else(|err| {
2698 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2699 Ok(None)
2700 } else {
2701 Err(err)
2702 }
2703 })
2704 }
2705
2706 fn headers_range(
2707 &self,
2708 range: impl RangeBounds<BlockNumber>,
2709 ) -> ProviderResult<Vec<Self::Header>> {
2710 self.fetch_range_with_predicate(
2711 StaticFileSegment::Headers,
2712 to_range(range),
2713 |cursor, number| cursor.get_one::<HeaderMask<Self::Header>>(number.into()),
2714 |_| true,
2715 )
2716 }
2717
2718 fn sealed_header(
2719 &self,
2720 num: BlockNumber,
2721 ) -> ProviderResult<Option<SealedHeader<Self::Header>>> {
2722 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2723 .and_then(|provider| provider.sealed_header(num))
2724 .or_else(|err| {
2725 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2726 Ok(None)
2727 } else {
2728 Err(err)
2729 }
2730 })
2731 }
2732
2733 fn sealed_headers_while(
2734 &self,
2735 range: impl RangeBounds<BlockNumber>,
2736 predicate: impl FnMut(&SealedHeader<Self::Header>) -> bool,
2737 ) -> ProviderResult<Vec<SealedHeader<Self::Header>>> {
2738 self.fetch_range_with_predicate(
2739 StaticFileSegment::Headers,
2740 to_range(range),
2741 |cursor, number| {
2742 Ok(cursor
2743 .get_two::<HeaderWithHashMask<Self::Header>>(number.into())?
2744 .map(|(header, hash)| SealedHeader::new(header, hash)))
2745 },
2746 predicate,
2747 )
2748 }
2749}
2750
2751impl<N: NodePrimitives> BlockHashReader for StaticFileProvider<N> {
2752 fn block_hash(&self, num: u64) -> ProviderResult<Option<B256>> {
2753 self.get_segment_provider_for_block(StaticFileSegment::Headers, num, None)
2754 .and_then(|provider| provider.block_hash(num))
2755 .or_else(|err| {
2756 if let ProviderError::MissingStaticFileBlock(_, _) = err {
2757 Ok(None)
2758 } else {
2759 Err(err)
2760 }
2761 })
2762 }
2763
2764 fn canonical_hashes_range(
2765 &self,
2766 start: BlockNumber,
2767 end: BlockNumber,
2768 ) -> ProviderResult<Vec<B256>> {
2769 self.fetch_range_with_predicate(
2770 StaticFileSegment::Headers,
2771 start..end,
2772 |cursor, number| cursor.get_one::<BlockHashMask>(number.into()),
2773 |_| true,
2774 )
2775 }
2776}
2777
2778impl<N: NodePrimitives<SignedTx: Value + SignedTransaction, Receipt: Value>> ReceiptProvider
2779 for StaticFileProvider<N>
2780{
2781 type Receipt = N::Receipt;
2782
2783 fn receipt(&self, num: TxNumber) -> ProviderResult<Option<Self::Receipt>> {
2784 self.get_segment_provider_for_transaction(StaticFileSegment::Receipts, num, None)
2785 .and_then(|provider| provider.receipt(num))
2786 .or_else(|err| {
2787 if let ProviderError::MissingStaticFileTx(_, _) = err {
2788 Ok(None)
2789 } else {
2790 Err(err)
2791 }
2792 })
2793 }
2794
2795 fn receipt_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Receipt>> {
2796 if let Some(num) = self.transaction_id(hash)? {
2797 return self.receipt(num);
2798 }
2799 Ok(None)
2800 }
2801
2802 fn receipts_by_block(
2803 &self,
2804 _block: BlockHashOrNumber,
2805 ) -> ProviderResult<Option<Vec<Self::Receipt>>> {
2806 unreachable!()
2807 }
2808
2809 fn receipts_by_tx_range(
2810 &self,
2811 range: impl RangeBounds<TxNumber>,
2812 ) -> ProviderResult<Vec<Self::Receipt>> {
2813 self.fetch_range_with_predicate(
2814 StaticFileSegment::Receipts,
2815 to_range(range),
2816 |cursor, number| cursor.get_one::<ReceiptMask<Self::Receipt>>(number.into()),
2817 |_| true,
2818 )
2819 }
2820
2821 fn receipts_by_block_range(
2822 &self,
2823 _block_range: RangeInclusive<BlockNumber>,
2824 ) -> ProviderResult<Vec<Vec<Self::Receipt>>> {
2825 Err(ProviderError::UnsupportedProvider)
2826 }
2827}
2828
2829impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> TransactionsProviderExt
2830 for StaticFileProvider<N>
2831{
2832 fn transaction_hashes_by_range(
2833 &self,
2834 tx_range: Range<TxNumber>,
2835 ) -> ProviderResult<Vec<(TxHash, TxNumber)>> {
2836 let tx_range_size = (tx_range.end - tx_range.start) as usize;
2837
2838 let chunk_size = 100;
2842
2843 let chunks = tx_range
2845 .clone()
2846 .step_by(chunk_size)
2847 .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end));
2848 let mut channels = Vec::with_capacity(tx_range_size.div_ceil(chunk_size));
2849
2850 for chunk_range in chunks {
2851 let (channel_tx, channel_rx) = mpsc::channel();
2852 channels.push(channel_rx);
2853
2854 let manager = self.clone();
2855
2856 rayon::spawn(move || {
2860 let mut rlp_buf = Vec::with_capacity(128);
2861 let _ = manager.fetch_range_with_predicate(
2862 StaticFileSegment::Transactions,
2863 chunk_range,
2864 |cursor, number| {
2865 Ok(cursor
2866 .get_one::<TransactionMask<Self::Transaction>>(number.into())?
2867 .map(|transaction| {
2868 rlp_buf.clear();
2869 let _ = channel_tx
2870 .send(calculate_hash((number, transaction), &mut rlp_buf));
2871 }))
2872 },
2873 |_| true,
2874 );
2875 });
2876 }
2877
2878 let mut tx_list = Vec::with_capacity(tx_range_size);
2879
2880 for channel in channels {
2882 while let Ok(tx) = channel.recv() {
2883 let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?;
2884 tx_list.push((tx_hash, tx_id));
2885 }
2886 }
2887
2888 Ok(tx_list)
2889 }
2890}
2891
2892impl<N: NodePrimitives<SignedTx: Decompress + SignedTransaction>> TransactionsProvider
2893 for StaticFileProvider<N>
2894{
2895 type Transaction = N::SignedTx;
2896
2897 fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
2898 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2899 let mut cursor = jar_provider.cursor()?;
2900 if cursor
2901 .get_one::<TransactionMask<Self::Transaction>>((&tx_hash).into())?
2902 .and_then(|tx| (tx.trie_hash() == tx_hash).then_some(tx))
2903 .is_some()
2904 {
2905 Ok(cursor.number())
2906 } else {
2907 Ok(None)
2908 }
2909 })
2910 }
2911
2912 fn transaction_by_id(&self, num: TxNumber) -> ProviderResult<Option<Self::Transaction>> {
2913 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2914 .and_then(|provider| provider.transaction_by_id(num))
2915 .or_else(|err| {
2916 if let ProviderError::MissingStaticFileTx(_, _) = err {
2917 Ok(None)
2918 } else {
2919 Err(err)
2920 }
2921 })
2922 }
2923
2924 fn transaction_by_id_unhashed(
2925 &self,
2926 num: TxNumber,
2927 ) -> ProviderResult<Option<Self::Transaction>> {
2928 self.get_segment_provider_for_transaction(StaticFileSegment::Transactions, num, None)
2929 .and_then(|provider| provider.transaction_by_id_unhashed(num))
2930 .or_else(|err| {
2931 if let ProviderError::MissingStaticFileTx(_, _) = err {
2932 Ok(None)
2933 } else {
2934 Err(err)
2935 }
2936 })
2937 }
2938
2939 fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult<Option<Self::Transaction>> {
2940 self.find_static_file(StaticFileSegment::Transactions, |jar_provider| {
2941 Ok(jar_provider
2942 .cursor()?
2943 .get_one::<TransactionMask<Self::Transaction>>((&hash).into())?
2944 .and_then(|tx| (tx.trie_hash() == hash).then_some(tx)))
2945 })
2946 }
2947
2948 fn transaction_by_hash_with_meta(
2949 &self,
2950 _hash: TxHash,
2951 ) -> ProviderResult<Option<(Self::Transaction, TransactionMeta)>> {
2952 Err(ProviderError::UnsupportedProvider)
2954 }
2955
2956 fn transactions_by_block(
2957 &self,
2958 _block_id: BlockHashOrNumber,
2959 ) -> ProviderResult<Option<Vec<Self::Transaction>>> {
2960 Err(ProviderError::UnsupportedProvider)
2962 }
2963
2964 fn transactions_by_block_range(
2965 &self,
2966 _range: impl RangeBounds<BlockNumber>,
2967 ) -> ProviderResult<Vec<Vec<Self::Transaction>>> {
2968 Err(ProviderError::UnsupportedProvider)
2970 }
2971
2972 fn transactions_by_tx_range(
2973 &self,
2974 range: impl RangeBounds<TxNumber>,
2975 ) -> ProviderResult<Vec<Self::Transaction>> {
2976 self.fetch_range_with_predicate(
2977 StaticFileSegment::Transactions,
2978 to_range(range),
2979 |cursor, number| cursor.get_one::<TransactionMask<Self::Transaction>>(number.into()),
2980 |_| true,
2981 )
2982 }
2983
2984 fn senders_by_tx_range(
2985 &self,
2986 range: impl RangeBounds<TxNumber>,
2987 ) -> ProviderResult<Vec<Address>> {
2988 self.fetch_range_with_predicate(
2989 StaticFileSegment::TransactionSenders,
2990 to_range(range),
2991 |cursor, number| cursor.get_one::<TransactionSenderMask>(number.into()),
2992 |_| true,
2993 )
2994 }
2995
2996 fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
2997 self.get_segment_provider_for_transaction(StaticFileSegment::TransactionSenders, id, None)
2998 .and_then(|provider| provider.transaction_sender(id))
2999 .or_else(|err| {
3000 if let ProviderError::MissingStaticFileTx(_, _) = err {
3001 Ok(None)
3002 } else {
3003 Err(err)
3004 }
3005 })
3006 }
3007}
3008
3009impl<N: NodePrimitives> BlockNumReader for StaticFileProvider<N> {
3010 fn chain_info(&self) -> ProviderResult<ChainInfo> {
3011 Err(ProviderError::UnsupportedProvider)
3013 }
3014
3015 fn best_block_number(&self) -> ProviderResult<BlockNumber> {
3016 Err(ProviderError::UnsupportedProvider)
3018 }
3019
3020 fn last_block_number(&self) -> ProviderResult<BlockNumber> {
3021 Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default())
3022 }
3023
3024 fn block_number(&self, _hash: B256) -> ProviderResult<Option<BlockNumber>> {
3025 Err(ProviderError::UnsupportedProvider)
3027 }
3028}
3029
// Every method of this impl is a stub: none of them reads static files, and each one returns
// `ProviderError::UnsupportedProvider` unconditionally.
impl<N: NodePrimitives<SignedTx: Value, Receipt: Value, BlockHeader: Value>> BlockReader
    for StaticFileProvider<N>
{
    type Block = N::Block;

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn find_block_by_hash(
        &self,
        _hash: B256,
        _source: BlockSource,
    ) -> ProviderResult<Option<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block(&self, _id: BlockHashOrNumber) -> ProviderResult<Option<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn pending_block(&self) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn pending_block_and_receipts(
        &self,
    ) -> ProviderResult<Option<(RecoveredBlock<Self::Block>, Vec<Self::Receipt>)>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn recovered_block(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn sealed_block_with_senders(
        &self,
        _id: BlockHashOrNumber,
        _transaction_kind: TransactionVariant,
    ) -> ProviderResult<Option<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Self::Block>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_with_senders_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn recovered_block_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<RecoveredBlock<Self::Block>>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
3104
// Both methods are stubs that return `ProviderError::UnsupportedProvider` unconditionally.
impl<N: NodePrimitives> BlockBodyIndicesProvider for StaticFileProvider<N> {
    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_body_indices(&self, _num: u64) -> ProviderResult<Option<StoredBlockBodyIndices>> {
        Err(ProviderError::UnsupportedProvider)
    }

    /// Not supported; always returns [`ProviderError::UnsupportedProvider`].
    fn block_body_indices_range(
        &self,
        _range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<Vec<StoredBlockBodyIndices>> {
        Err(ProviderError::UnsupportedProvider)
    }
}
3117
3118impl<N: NodePrimitives> StatsReader for StaticFileProvider<N> {
3119 fn count_entries<T: Table>(&self) -> ProviderResult<usize> {
3120 match T::NAME {
3121 tables::CanonicalHeaders::NAME |
3122 tables::Headers::<Header>::NAME |
3123 tables::HeaderTerminalDifficulties::NAME => Ok(self
3124 .get_highest_static_file_block(StaticFileSegment::Headers)
3125 .map(|block| block + 1)
3126 .unwrap_or_default()
3127 as usize),
3128 tables::Receipts::<Receipt>::NAME => Ok(self
3129 .get_highest_static_file_tx(StaticFileSegment::Receipts)
3130 .map(|receipts| receipts + 1)
3131 .unwrap_or_default() as usize),
3132 tables::Transactions::<TransactionSigned>::NAME => Ok(self
3133 .get_highest_static_file_tx(StaticFileSegment::Transactions)
3134 .map(|txs| txs + 1)
3135 .unwrap_or_default()
3136 as usize),
3137 tables::TransactionSenders::NAME => Ok(self
3138 .get_highest_static_file_tx(StaticFileSegment::TransactionSenders)
3139 .map(|txs| txs + 1)
3140 .unwrap_or_default() as usize),
3141 _ => Err(ProviderError::UnsupportedProvider),
3142 }
3143 }
3144}
3145
3146#[inline]
3148fn calculate_hash<T>(
3149 entry: (TxNumber, T),
3150 rlp_buf: &mut Vec<u8>,
3151) -> Result<(B256, TxNumber), Box<ProviderError>>
3152where
3153 T: Encodable2718,
3154{
3155 let (tx_id, tx) = entry;
3156 tx.encode_2718(rlp_buf);
3157 Ok((keccak256(rlp_buf), tx_id))
3158}
3159
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use reth_chain_state::EthPrimitives;
    use reth_db::test_utils::create_test_static_files_dir;
    use reth_static_file_types::{SegmentRangeInclusive, StaticFileSegment};

    use crate::{providers::StaticFileProvider, StaticFileProviderBuilder};

    /// Checks `find_fixed_range_with_block_index` on a provider built with 100 blocks per file:
    /// without an index, with an empty index, with an index of uniform 100-block ranges and
    /// with an index of differently sized ranges.
    #[test]
    fn test_find_fixed_range_with_block_index() -> eyre::Result<()> {
        let (static_dir, _) = create_test_static_files_dir();
        let provider: StaticFileProvider<EthPrimitives> =
            StaticFileProviderBuilder::read_write(&static_dir).with_blocks_per_file(100).build()?;

        let segment = StaticFileSegment::Headers;

        // Each case is `(block, expected range start, expected range end)`.

        // No index at all.
        for (block, start, end) in [(0, 0, 99), (250, 200, 299)] {
            assert_eq!(
                provider.find_fixed_range_with_block_index(segment, None, block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // An index that exists but is empty.
        let empty_index = BTreeMap::new();
        assert_eq!(
            provider.find_fixed_range_with_block_index(segment, Some(&empty_index), 150),
            SegmentRangeInclusive::new(100, 199)
        );

        // An index of three uniform 100-block ranges, keyed by the end block of each range.
        let block_index = BTreeMap::from_iter([
            (99, SegmentRangeInclusive::new(0, 99)),
            (199, SegmentRangeInclusive::new(100, 199)),
            (299, SegmentRangeInclusive::new(200, 299)),
        ]);

        let uniform_cases = [
            // Blocks inside the indexed ranges, including both edges of each range.
            (0, 0, 99),
            (50, 0, 99),
            (99, 0, 99),
            (100, 100, 199),
            (150, 100, 199),
            (199, 100, 199),
            // Blocks past the last indexed range.
            (300, 300, 399),
            (350, 300, 399),
            (500, 500, 599),
            (1000, 1000, 1099),
        ];
        for (block, start, end) in uniform_cases {
            assert_eq!(
                provider.find_fixed_range_with_block_index(segment, Some(&block_index), block),
                SegmentRangeInclusive::new(start, end)
            );
        }

        // An index whose ranges span 50, 100 and 200 blocks.
        let mixed_size_index = BTreeMap::from_iter([
            (49, SegmentRangeInclusive::new(0, 49)),
            (149, SegmentRangeInclusive::new(50, 149)),
            (349, SegmentRangeInclusive::new(150, 349)),
        ]);

        let mixed_cases = [
            // Blocks inside the indexed ranges.
            (25, 0, 49),
            (100, 50, 149),
            (200, 150, 349),
            // Blocks past the last indexed range: expected ranges continue in 100-block steps
            // from block 350.
            (350, 350, 449),
            (450, 450, 549),
            (550, 550, 649),
        ];
        for (block, start, end) in mixed_cases {
            assert_eq!(
                provider.find_fixed_range_with_block_index(
                    segment,
                    Some(&mixed_size_index),
                    block
                ),
                SegmentRangeInclusive::new(start, end)
            );
        }

        Ok(())
    }
}