reth_provider/
changeset_walker.rs1use crate::ProviderResult;
5use alloy_primitives::BlockNumber;
6use reth_db::models::AccountBeforeTx;
7use reth_db_api::models::BlockNumberAddress;
8use reth_storage_api::{ChangeSetReader, ChangesetEntry, StorageChangeSetReader};
9use std::ops::{Bound, RangeBounds};
10
11#[derive(Debug)]
15pub struct StaticFileAccountChangesetWalker<P> {
16 provider: P,
18 end_block: Option<BlockNumber>,
20 current_block: BlockNumber,
22 current_changesets: Vec<AccountBeforeTx>,
24 changeset_index: usize,
26}
27
28impl<P> StaticFileAccountChangesetWalker<P> {
29 pub fn new(provider: P, range: impl RangeBounds<BlockNumber>) -> Self {
38 let start = match range.start_bound() {
39 Bound::Included(&n) => n,
40 Bound::Excluded(&n) => n + 1,
41 Bound::Unbounded => 0,
42 };
43
44 let end_block = match range.end_bound() {
45 Bound::Included(&n) => Some(n + 1),
46 Bound::Excluded(&n) => Some(n),
47 Bound::Unbounded => None,
48 };
49
50 Self {
51 provider,
52 end_block,
53 current_block: start,
54 current_changesets: Vec::new(),
55 changeset_index: 0,
56 }
57 }
58}
59
60impl<P> Iterator for StaticFileAccountChangesetWalker<P>
61where
62 P: ChangeSetReader,
63{
64 type Item = ProviderResult<(BlockNumber, AccountBeforeTx)>;
65
66 fn next(&mut self) -> Option<Self::Item> {
67 if let Some(changeset) = self.current_changesets.get(self.changeset_index).cloned() {
69 self.changeset_index += 1;
70 return Some(Ok((self.current_block, changeset)));
71 }
72
73 if !self.current_changesets.is_empty() {
79 self.current_block += 1;
80 }
81
82 while self.end_block.is_none_or(|end| self.current_block < end) {
84 match self.provider.account_block_changeset(self.current_block) {
85 Ok(changesets) if !changesets.is_empty() => {
86 self.current_changesets = changesets;
87 self.changeset_index = 1;
88 return Some(Ok((self.current_block, self.current_changesets[0].clone())));
89 }
90 Ok(_) => self.current_block += 1,
91 Err(e) => {
92 self.current_block += 1;
93 return Some(Err(e));
94 }
95 }
96 }
97
98 None
99 }
100}
101
102#[derive(Debug)]
104pub struct StaticFileStorageChangesetWalker<P> {
105 provider: P,
107 end_block: Option<BlockNumber>,
109 current_block: BlockNumber,
111 current_changesets: Vec<(BlockNumberAddress, ChangesetEntry)>,
113 changeset_index: usize,
115}
116
117impl<P> StaticFileStorageChangesetWalker<P> {
118 pub fn new(provider: P, range: impl RangeBounds<BlockNumber>) -> Self {
120 let start = match range.start_bound() {
121 Bound::Included(&n) => n,
122 Bound::Excluded(&n) => n + 1,
123 Bound::Unbounded => 0,
124 };
125
126 let end_block = match range.end_bound() {
127 Bound::Included(&n) => Some(n + 1),
128 Bound::Excluded(&n) => Some(n),
129 Bound::Unbounded => None,
130 };
131
132 Self {
133 provider,
134 end_block,
135 current_block: start,
136 current_changesets: Vec::new(),
137 changeset_index: 0,
138 }
139 }
140}
141
142impl<P> Iterator for StaticFileStorageChangesetWalker<P>
143where
144 P: StorageChangeSetReader,
145{
146 type Item = ProviderResult<(BlockNumberAddress, ChangesetEntry)>;
147
148 fn next(&mut self) -> Option<Self::Item> {
149 if let Some(changeset) = self.current_changesets.get(self.changeset_index).copied() {
150 self.changeset_index += 1;
151 return Some(Ok(changeset));
152 }
153
154 if !self.current_changesets.is_empty() {
155 self.current_block += 1;
156 }
157
158 while self.end_block.is_none_or(|end| self.current_block < end) {
159 match self.provider.storage_changeset(self.current_block) {
160 Ok(changesets) if !changesets.is_empty() => {
161 self.current_changesets = changesets;
162 self.changeset_index = 1;
163 return Some(Ok(self.current_changesets[0]));
164 }
165 Ok(_) => self.current_block += 1,
166 Err(e) => {
167 self.current_block += 1;
168 return Some(Err(e));
169 }
170 }
171 }
172
173 None
174 }
175}