reth_provider/
changeset_walker.rs

1//! Account/storage changeset iteration support for walking through historical state changes in
2//! static files.
3
4use crate::ProviderResult;
5use alloy_primitives::BlockNumber;
6use reth_db::models::AccountBeforeTx;
7use reth_db_api::models::BlockNumberAddress;
8use reth_primitives_traits::StorageEntry;
9use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
10use std::ops::{Bound, RangeBounds};
11
12/// Iterator that walks account changesets from static files in a block range.
13///
14/// This iterator fetches changesets block by block to avoid loading everything into memory.
15#[derive(Debug)]
16pub struct StaticFileAccountChangesetWalker<P> {
17    /// Static file provider
18    provider: P,
19    /// End block (exclusive). `None` means iterate until exhausted.
20    end_block: Option<BlockNumber>,
21    /// Current block being processed
22    current_block: BlockNumber,
23    /// Changesets for current block
24    current_changesets: Vec<AccountBeforeTx>,
25    /// Index within current block's changesets
26    changeset_index: usize,
27}
28
29impl<P> StaticFileAccountChangesetWalker<P> {
30    /// Create a new static file changeset walker.
31    ///
32    /// Accepts any range type that implements `RangeBounds<BlockNumber>`, including:
33    /// - `Range<BlockNumber>` (e.g., `0..100`)
34    /// - `RangeInclusive<BlockNumber>` (e.g., `0..=99`)
35    /// - `RangeFrom<BlockNumber>` (e.g., `0..`) - iterates until exhausted
36    ///
37    /// If there is no start bound, 0 is used as the start block.
38    pub fn new(provider: P, range: impl RangeBounds<BlockNumber>) -> Self {
39        let start = match range.start_bound() {
40            Bound::Included(&n) => n,
41            Bound::Excluded(&n) => n + 1,
42            Bound::Unbounded => 0,
43        };
44
45        let end_block = match range.end_bound() {
46            Bound::Included(&n) => Some(n + 1),
47            Bound::Excluded(&n) => Some(n),
48            Bound::Unbounded => None,
49        };
50
51        Self {
52            provider,
53            end_block,
54            current_block: start,
55            current_changesets: Vec::new(),
56            changeset_index: 0,
57        }
58    }
59}
60
61impl<P> Iterator for StaticFileAccountChangesetWalker<P>
62where
63    P: ChangeSetReader,
64{
65    type Item = ProviderResult<(BlockNumber, AccountBeforeTx)>;
66
67    fn next(&mut self) -> Option<Self::Item> {
68        // Yield remaining changesets from current block
69        if let Some(changeset) = self.current_changesets.get(self.changeset_index).cloned() {
70            self.changeset_index += 1;
71            return Some(Ok((self.current_block, changeset)));
72        }
73
74        // Advance to next block if we exhausted the previous one
75        //
76        // If we do not return from the previous condition, but the current changesets are
77        // non-empty, then we have run past the current changeset and must fetch the next
78        // changeset.
79        if !self.current_changesets.is_empty() {
80            self.current_block += 1;
81        }
82
83        // Load next block with changesets
84        while self.end_block.is_none_or(|end| self.current_block < end) {
85            match self.provider.account_block_changeset(self.current_block) {
86                Ok(changesets) if !changesets.is_empty() => {
87                    self.current_changesets = changesets;
88                    self.changeset_index = 1;
89                    return Some(Ok((self.current_block, self.current_changesets[0].clone())));
90                }
91                Ok(_) => self.current_block += 1,
92                Err(e) => {
93                    self.current_block += 1;
94                    return Some(Err(e));
95                }
96            }
97        }
98
99        None
100    }
101}
102
103/// Iterator that walks storage changesets from static files in a block range.
104#[derive(Debug)]
105pub struct StaticFileStorageChangesetWalker<P> {
106    /// Static file provider
107    provider: P,
108    /// End block (exclusive). `None` means iterate until exhausted.
109    end_block: Option<BlockNumber>,
110    /// Current block being processed
111    current_block: BlockNumber,
112    /// Changesets for current block
113    current_changesets: Vec<(BlockNumberAddress, StorageEntry)>,
114    /// Index within current block's changesets
115    changeset_index: usize,
116}
117
118impl<P> StaticFileStorageChangesetWalker<P> {
119    /// Create a new static file storage changeset walker.
120    pub fn new(provider: P, range: impl RangeBounds<BlockNumber>) -> Self {
121        let start = match range.start_bound() {
122            Bound::Included(&n) => n,
123            Bound::Excluded(&n) => n + 1,
124            Bound::Unbounded => 0,
125        };
126
127        let end_block = match range.end_bound() {
128            Bound::Included(&n) => Some(n + 1),
129            Bound::Excluded(&n) => Some(n),
130            Bound::Unbounded => None,
131        };
132
133        Self {
134            provider,
135            end_block,
136            current_block: start,
137            current_changesets: Vec::new(),
138            changeset_index: 0,
139        }
140    }
141}
142
143impl<P> Iterator for StaticFileStorageChangesetWalker<P>
144where
145    P: StorageChangeSetReader,
146{
147    type Item = ProviderResult<(BlockNumberAddress, StorageEntry)>;
148
149    fn next(&mut self) -> Option<Self::Item> {
150        if let Some(changeset) = self.current_changesets.get(self.changeset_index).copied() {
151            self.changeset_index += 1;
152            return Some(Ok(changeset));
153        }
154
155        if !self.current_changesets.is_empty() {
156            self.current_block += 1;
157        }
158
159        while self.end_block.is_none_or(|end| self.current_block < end) {
160            match self.provider.storage_changeset(self.current_block) {
161                Ok(changesets) if !changesets.is_empty() => {
162                    self.current_changesets = changesets;
163                    self.changeset_index = 1;
164                    return Some(Ok(self.current_changesets[0]));
165                }
166                Ok(_) => self.current_block += 1,
167                Err(e) => {
168                    self.current_block += 1;
169                    return Some(Err(e));
170                }
171            }
172        }
173
174        None
175    }
176}