Skip to main content

reth_provider/
changeset_walker.rs

1//! Account/storage changeset iteration support for walking through historical state changes in
2//! static files.
3
4use crate::ProviderResult;
5use alloy_primitives::BlockNumber;
6use reth_db::models::AccountBeforeTx;
7use reth_db_api::models::BlockNumberAddress;
8use reth_storage_api::{ChangeSetReader, ChangesetEntry, StorageChangeSetReader};
9use std::ops::{Bound, RangeBounds};
10
11/// Iterator that walks account changesets from static files in a block range.
12///
13/// This iterator fetches changesets block by block to avoid loading everything into memory.
14#[derive(Debug)]
15pub struct StaticFileAccountChangesetWalker<P> {
16    /// Static file provider
17    provider: P,
18    /// End block (exclusive). `None` means iterate until exhausted.
19    end_block: Option<BlockNumber>,
20    /// Current block being processed
21    current_block: BlockNumber,
22    /// Changesets for current block
23    current_changesets: Vec<AccountBeforeTx>,
24    /// Index within current block's changesets
25    changeset_index: usize,
26}
27
28impl<P> StaticFileAccountChangesetWalker<P> {
29    /// Create a new static file changeset walker.
30    ///
31    /// Accepts any range type that implements `RangeBounds<BlockNumber>`, including:
32    /// - `Range<BlockNumber>` (e.g., `0..100`)
33    /// - `RangeInclusive<BlockNumber>` (e.g., `0..=99`)
34    /// - `RangeFrom<BlockNumber>` (e.g., `0..`) - iterates until exhausted
35    ///
36    /// If there is no start bound, 0 is used as the start block.
37    pub fn new(provider: P, range: impl RangeBounds<BlockNumber>) -> Self {
38        let start = match range.start_bound() {
39            Bound::Included(&n) => n,
40            Bound::Excluded(&n) => n + 1,
41            Bound::Unbounded => 0,
42        };
43
44        let end_block = match range.end_bound() {
45            Bound::Included(&n) => Some(n + 1),
46            Bound::Excluded(&n) => Some(n),
47            Bound::Unbounded => None,
48        };
49
50        Self {
51            provider,
52            end_block,
53            current_block: start,
54            current_changesets: Vec::new(),
55            changeset_index: 0,
56        }
57    }
58}
59
60impl<P> Iterator for StaticFileAccountChangesetWalker<P>
61where
62    P: ChangeSetReader,
63{
64    type Item = ProviderResult<(BlockNumber, AccountBeforeTx)>;
65
66    fn next(&mut self) -> Option<Self::Item> {
67        // Yield remaining changesets from current block
68        if let Some(changeset) = self.current_changesets.get(self.changeset_index).cloned() {
69            self.changeset_index += 1;
70            return Some(Ok((self.current_block, changeset)));
71        }
72
73        // Advance to next block if we exhausted the previous one
74        //
75        // If we do not return from the previous condition, but the current changesets are
76        // non-empty, then we have run past the current changeset and must fetch the next
77        // changeset.
78        if !self.current_changesets.is_empty() {
79            self.current_block += 1;
80        }
81
82        // Load next block with changesets
83        while self.end_block.is_none_or(|end| self.current_block < end) {
84            match self.provider.account_block_changeset(self.current_block) {
85                Ok(changesets) if !changesets.is_empty() => {
86                    self.current_changesets = changesets;
87                    self.changeset_index = 1;
88                    return Some(Ok((self.current_block, self.current_changesets[0].clone())));
89                }
90                Ok(_) => self.current_block += 1,
91                Err(e) => {
92                    self.current_block += 1;
93                    return Some(Err(e));
94                }
95            }
96        }
97
98        None
99    }
100}
101
102/// Iterator that walks storage changesets from static files in a block range.
103#[derive(Debug)]
104pub struct StaticFileStorageChangesetWalker<P> {
105    /// Static file provider
106    provider: P,
107    /// End block (exclusive). `None` means iterate until exhausted.
108    end_block: Option<BlockNumber>,
109    /// Current block being processed
110    current_block: BlockNumber,
111    /// Changesets for current block
112    current_changesets: Vec<(BlockNumberAddress, ChangesetEntry)>,
113    /// Index within current block's changesets
114    changeset_index: usize,
115}
116
117impl<P> StaticFileStorageChangesetWalker<P> {
118    /// Create a new static file storage changeset walker.
119    pub fn new(provider: P, range: impl RangeBounds<BlockNumber>) -> Self {
120        let start = match range.start_bound() {
121            Bound::Included(&n) => n,
122            Bound::Excluded(&n) => n + 1,
123            Bound::Unbounded => 0,
124        };
125
126        let end_block = match range.end_bound() {
127            Bound::Included(&n) => Some(n + 1),
128            Bound::Excluded(&n) => Some(n),
129            Bound::Unbounded => None,
130        };
131
132        Self {
133            provider,
134            end_block,
135            current_block: start,
136            current_changesets: Vec::new(),
137            changeset_index: 0,
138        }
139    }
140}
141
142impl<P> Iterator for StaticFileStorageChangesetWalker<P>
143where
144    P: StorageChangeSetReader,
145{
146    type Item = ProviderResult<(BlockNumberAddress, ChangesetEntry)>;
147
148    fn next(&mut self) -> Option<Self::Item> {
149        if let Some(changeset) = self.current_changesets.get(self.changeset_index).copied() {
150            self.changeset_index += 1;
151            return Some(Ok(changeset));
152        }
153
154        if !self.current_changesets.is_empty() {
155            self.current_block += 1;
156        }
157
158        while self.end_block.is_none_or(|end| self.current_block < end) {
159            match self.provider.storage_changeset(self.current_block) {
160                Ok(changesets) if !changesets.is_empty() => {
161                    self.current_changesets = changesets;
162                    self.changeset_index = 1;
163                    return Some(Ok(self.current_changesets[0]));
164                }
165                Ok(_) => self.current_block += 1,
166                Err(e) => {
167                    self.current_block += 1;
168                    return Some(Err(e));
169                }
170            }
171        }
172
173        None
174    }
175}