reth_provider/
changeset_walker.rs1use crate::ProviderResult;
5use alloy_primitives::BlockNumber;
6use reth_db::models::AccountBeforeTx;
7use reth_db_api::models::BlockNumberAddress;
8use reth_primitives_traits::StorageEntry;
9use reth_storage_api::{ChangeSetReader, StorageChangeSetReader};
10use std::ops::{Bound, RangeBounds};
11
12#[derive(Debug)]
16pub struct StaticFileAccountChangesetWalker<P> {
17 provider: P,
19 end_block: Option<BlockNumber>,
21 current_block: BlockNumber,
23 current_changesets: Vec<AccountBeforeTx>,
25 changeset_index: usize,
27}
28
29impl<P> StaticFileAccountChangesetWalker<P> {
30 pub fn new(provider: P, range: impl RangeBounds<BlockNumber>) -> Self {
39 let start = match range.start_bound() {
40 Bound::Included(&n) => n,
41 Bound::Excluded(&n) => n + 1,
42 Bound::Unbounded => 0,
43 };
44
45 let end_block = match range.end_bound() {
46 Bound::Included(&n) => Some(n + 1),
47 Bound::Excluded(&n) => Some(n),
48 Bound::Unbounded => None,
49 };
50
51 Self {
52 provider,
53 end_block,
54 current_block: start,
55 current_changesets: Vec::new(),
56 changeset_index: 0,
57 }
58 }
59}
60
61impl<P> Iterator for StaticFileAccountChangesetWalker<P>
62where
63 P: ChangeSetReader,
64{
65 type Item = ProviderResult<(BlockNumber, AccountBeforeTx)>;
66
67 fn next(&mut self) -> Option<Self::Item> {
68 if let Some(changeset) = self.current_changesets.get(self.changeset_index).cloned() {
70 self.changeset_index += 1;
71 return Some(Ok((self.current_block, changeset)));
72 }
73
74 if !self.current_changesets.is_empty() {
80 self.current_block += 1;
81 }
82
83 while self.end_block.is_none_or(|end| self.current_block < end) {
85 match self.provider.account_block_changeset(self.current_block) {
86 Ok(changesets) if !changesets.is_empty() => {
87 self.current_changesets = changesets;
88 self.changeset_index = 1;
89 return Some(Ok((self.current_block, self.current_changesets[0].clone())));
90 }
91 Ok(_) => self.current_block += 1,
92 Err(e) => {
93 self.current_block += 1;
94 return Some(Err(e));
95 }
96 }
97 }
98
99 None
100 }
101}
102
103#[derive(Debug)]
105pub struct StaticFileStorageChangesetWalker<P> {
106 provider: P,
108 end_block: Option<BlockNumber>,
110 current_block: BlockNumber,
112 current_changesets: Vec<(BlockNumberAddress, StorageEntry)>,
114 changeset_index: usize,
116}
117
118impl<P> StaticFileStorageChangesetWalker<P> {
119 pub fn new(provider: P, range: impl RangeBounds<BlockNumber>) -> Self {
121 let start = match range.start_bound() {
122 Bound::Included(&n) => n,
123 Bound::Excluded(&n) => n + 1,
124 Bound::Unbounded => 0,
125 };
126
127 let end_block = match range.end_bound() {
128 Bound::Included(&n) => Some(n + 1),
129 Bound::Excluded(&n) => Some(n),
130 Bound::Unbounded => None,
131 };
132
133 Self {
134 provider,
135 end_block,
136 current_block: start,
137 current_changesets: Vec::new(),
138 changeset_index: 0,
139 }
140 }
141}
142
143impl<P> Iterator for StaticFileStorageChangesetWalker<P>
144where
145 P: StorageChangeSetReader,
146{
147 type Item = ProviderResult<(BlockNumberAddress, StorageEntry)>;
148
149 fn next(&mut self) -> Option<Self::Item> {
150 if let Some(changeset) = self.current_changesets.get(self.changeset_index).copied() {
151 self.changeset_index += 1;
152 return Some(Ok(changeset));
153 }
154
155 if !self.current_changesets.is_empty() {
156 self.current_block += 1;
157 }
158
159 while self.end_block.is_none_or(|end| self.current_block < end) {
160 match self.provider.storage_changeset(self.current_block) {
161 Ok(changesets) if !changesets.is_empty() => {
162 self.current_changesets = changesets;
163 self.changeset_index = 1;
164 return Some(Ok(self.current_changesets[0]));
165 }
166 Ok(_) => self.current_block += 1,
167 Err(e) => {
168 self.current_block += 1;
169 return Some(Err(e));
170 }
171 }
172 }
173
174 None
175 }
176}