1use super::job::BackfillJobResult;
2use crate::{BackfillJob, SingleBlockBackfillJob};
3use alloy_primitives::BlockNumber;
4use futures::{
5 stream::{FuturesOrdered, Stream},
6 StreamExt,
7};
8use reth_ethereum_primitives::EthPrimitives;
9use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider};
10use reth_node_api::NodePrimitives;
11use reth_primitives_traits::RecoveredBlock;
12use reth_provider::{BlockReader, Chain, StateProviderFactory};
13use reth_prune_types::PruneModes;
14use reth_stages_api::ExecutionStageThresholds;
15use reth_tracing::tracing::debug;
16use std::{
17 ops::RangeInclusive,
18 pin::Pin,
19 task::{ready, Context, Poll},
20};
21use tokio::task::JoinHandle;
22
/// Default parallelism value for a [`StreamBackfillJob`].
///
/// NOTE(review): not referenced in this file; presumably used elsewhere in the crate as the
/// default for a job's `stream_parallelism` — confirm against the job factory.
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
/// Number of blocks per batch used when converting a [`BackfillJob`] into a stream whose
/// thresholds do not set `max_blocks`.
const DEFAULT_BATCH_SIZE: usize = 100;
27
/// Boxed, `Send + Sync` iterator that yields [`BackfillJobResult`]s.
///
/// Backfill jobs are type-erased into this form before being handed to a blocking task.
type BackfillTaskIterator<T> =
    Box<dyn Iterator<Item = BackfillJobResult<T>> + Send + Sync + 'static>;
31
/// Output of one step of a backfill task.
struct BackfillTaskOutput<T> {
    /// The job iterator, handed back so it can be polled for its next item.
    job: BackfillTaskIterator<T>,
    /// The value returned by a single `next()` call on `job`; `None` once the job is exhausted.
    result: Option<BackfillJobResult<T>>,
}
37
/// Ordered queue of [`JoinHandle`]s, each resolving to a [`BackfillTaskOutput`].
type BackfillTasks<T> = FuturesOrdered<JoinHandle<BackfillTaskOutput<T>>>;
40
/// Item yielded by the single-block stream: a recovered block paired with its execution output.
///
/// Defaults to [`EthPrimitives`] when no primitives type is given.
type SingleBlockStreamItem<N = EthPrimitives> = (
    RecoveredBlock<<N as NodePrimitives>::Block>,
    BlockExecutionOutput<<N as NodePrimitives>::Receipt>,
);
/// Item yielded by the batch stream: a [`Chain`] over the node primitives `N`.
type BatchBlockStreamItem<N = EthPrimitives> = Chain<N>;
46
/// Stream that drives backfill jobs on blocking tasks and yields their results in order.
///
/// `T` is the item type produced by the underlying job; this file implements [`Stream`] for the
/// single-block item and for the batch item.
#[derive(Debug)]
pub struct StreamBackfillJob<E, P, T> {
    /// Executor provider, cloned into every spawned job.
    executor: E,
    /// Provider, cloned into every spawned job.
    provider: P,
    /// Prune modes, cloned into every spawned batch job.
    prune_modes: PruneModes,
    /// Remaining block numbers that have not been assigned to a task yet.
    range: RangeInclusive<BlockNumber>,
    /// Ordered queue of in-flight blocking tasks.
    tasks: BackfillTasks<T>,
    /// Upper bound on the number of queued tasks before new ones stop being spawned.
    parallelism: usize,
    /// Number of block numbers taken from `range` for each batch job.
    batch_size: usize,
    /// Execution thresholds, cloned into every spawned batch job.
    thresholds: ExecutionStageThresholds,
}
62
63impl<E, P, T> StreamBackfillJob<E, P, T>
64where
65 T: Send + Sync + 'static,
66{
67 pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
69 self.parallelism = parallelism;
70 self
71 }
72
73 pub const fn with_batch_size(mut self, batch_size: usize) -> Self {
75 self.batch_size = batch_size;
76 self
77 }
78
79 fn push_back(&mut self, mut job: BackfillTaskIterator<T>) {
82 self.tasks.push_back(tokio::task::spawn_blocking(move || BackfillTaskOutput {
83 result: job.next(),
84 job,
85 }));
86 }
87
88 fn push_front(&mut self, mut job: BackfillTaskIterator<T>) {
91 self.tasks.push_front(tokio::task::spawn_blocking(move || BackfillTaskOutput {
92 result: job.next(),
93 job,
94 }));
95 }
96
97 fn poll_next_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<BackfillJobResult<T>>> {
99 while let Some(res) = ready!(self.tasks.poll_next_unpin(cx)) {
100 let task_result = res.map_err(BlockExecutionError::other)?;
101
102 if let BackfillTaskOutput { result: Some(job_result), job } = task_result {
103 self.push_front(job);
107
108 return Poll::Ready(Some(job_result))
109 };
110 }
111
112 Poll::Ready(None)
113 }
114}
115
116impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem<E::Primitives>>
117where
118 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>> + Clone + 'static,
119 P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
120{
121 type Item = BackfillJobResult<SingleBlockStreamItem<E::Primitives>>;
122
123 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124 let this = self.get_mut();
125
126 while this.tasks.len() < this.parallelism {
128 let Some(block_number) = this.range.next() else {
130 debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more single blocks to backfill");
131 break;
132 };
133
134 debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?block_number, "Spawning new single block backfill task");
136 let job = Box::new(SingleBlockBackfillJob {
137 executor: this.executor.clone(),
138 provider: this.provider.clone(),
139 range: block_number..=block_number,
140 stream_parallelism: this.parallelism,
141 }) as BackfillTaskIterator<_>;
142 this.push_back(job);
143 }
144
145 this.poll_next_task(cx)
146 }
147}
148
149impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
150where
151 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>> + Clone + 'static,
152 P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
153{
154 type Item = BackfillJobResult<BatchBlockStreamItem<E::Primitives>>;
155
156 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157 let this = self.get_mut();
158
159 loop {
160 while this.tasks.len() < this.parallelism {
162 let mut range = this.range.by_ref().take(this.batch_size);
164 let start = range.next();
165 let range_bounds = start.zip(range.last().or(start));
166
167 let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
169 debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
170 break;
171 };
172
173 debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
175 let job = Box::new(BackfillJob {
176 executor: this.executor.clone(),
177 provider: this.provider.clone(),
178 prune_modes: this.prune_modes.clone(),
179 thresholds: this.thresholds.clone(),
180 range,
181 stream_parallelism: this.parallelism,
182 }) as BackfillTaskIterator<_>;
183 this.push_back(job);
184 }
185
186 let res = ready!(this.poll_next_task(cx));
187
188 if res.is_some() {
189 return Poll::Ready(res);
190 }
191
192 if this.range.is_empty() {
193 return Poll::Ready(None);
195 }
196 }
197 }
198}
199
200impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, SingleBlockStreamItem> {
201 fn from(job: SingleBlockBackfillJob<E, P>) -> Self {
202 Self {
203 executor: job.executor,
204 provider: job.provider,
205 prune_modes: PruneModes::default(),
206 range: job.range,
207 tasks: FuturesOrdered::new(),
208 parallelism: job.stream_parallelism,
209 batch_size: 1,
210 thresholds: ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
211 }
212 }
213}
214
215impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
216where
217 E: BlockExecutorProvider,
218{
219 fn from(job: BackfillJob<E, P>) -> Self {
220 let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
221 Self {
222 executor: job.executor,
223 provider: job.provider,
224 prune_modes: job.prune_modes,
225 range: job.range,
226 tasks: FuturesOrdered::new(),
227 parallelism: job.stream_parallelism,
228 batch_size,
229 thresholds: ExecutionStageThresholds {
230 max_blocks: Some(batch_size as u64),
231 ..job.thresholds
232 },
233 }
234 }
235}
236
#[cfg(test)]
mod tests {
    use crate::{
        backfill::test_utils::{
            blocks_and_execution_outcome, blocks_and_execution_outputs, chain_spec,
        },
        BackfillJobFactory,
    };
    use futures::StreamExt;
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_stages_api::ExecutionStageThresholds;
    use reth_testing_utils::generators;

    /// The single-block stream over `1..=1` yields exactly the first block with its execution
    /// output, and then ends.
    #[tokio::test]
    async fn test_single_blocks() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Key pair whose address is passed to the test chain spec.
        let signer = generators::generate_key(&mut generators::rng());
        let signer_address = public_key_to_address(signer.public_key());

        let spec = chain_spec(signer_address);

        let executor = EthExecutorProvider::ethereum(spec.clone());
        let db_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&db_factory)?;
        let blockchain_db = BlockchainProvider::new(db_factory.clone())?;

        // Build the test blocks together with their expected execution outputs.
        let expected = blocks_and_execution_outputs(db_factory, spec, signer)?;

        // Backfill only the first block, one block per stream item.
        let job_factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone());
        let mut stream = job_factory.backfill(1..=1).into_single_blocks().into_stream();

        // The first item must match the first expected block and output. The reverts are
        // sorted before comparing.
        let (block, mut output) = stream.next().await.unwrap().unwrap();
        output.state.reverts.sort();
        let first_expected = &expected[0];
        assert_eq!(block, first_expected.0);
        assert_eq!(output, first_expected.1);

        // Nothing else may follow.
        assert!(stream.next().await.is_none());

        Ok(())
    }

    /// The batch stream over `1..=2` with `max_blocks = 2` yields both blocks as a single
    /// chain, and then ends.
    #[tokio::test]
    async fn test_batch() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Key pair whose address is passed to the test chain spec.
        let signer = generators::generate_key(&mut generators::rng());
        let signer_address = public_key_to_address(signer.public_key());

        let spec = chain_spec(signer_address);

        let executor = EthExecutorProvider::ethereum(spec.clone());
        let db_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&db_factory)?;
        let blockchain_db = BlockchainProvider::new(db_factory.clone())?;

        // Build the test blocks together with their combined execution outcome.
        let (expected_blocks, expected_outcome) =
            blocks_and_execution_outcome(db_factory, spec, signer)?;

        // Backfill both blocks in one batch, with a single task at a time.
        let batch_thresholds =
            ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() };
        let job_factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone())
            .with_thresholds(batch_thresholds)
            .with_stream_parallelism(1);
        let mut stream = job_factory.backfill(1..=2).into_stream();

        // The reverts are sorted before comparing.
        let mut chain = stream.next().await.unwrap().unwrap();
        chain.execution_outcome_mut().state_mut().reverts.sort();

        assert!(chain.blocks_iter().eq(&expected_blocks));
        assert_eq!(chain.execution_outcome(), &expected_outcome);

        // Nothing else may follow.
        assert!(stream.next().await.is_none());

        Ok(())
    }
}