1use super::job::BackfillJobResult;
2use crate::{BackfillJob, SingleBlockBackfillJob};
3use alloy_primitives::BlockNumber;
4use futures::{
5 stream::{FuturesOrdered, Stream},
6 StreamExt,
7};
8use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider};
9use reth_node_api::NodePrimitives;
10use reth_primitives::{EthPrimitives, RecoveredBlock};
11use reth_provider::{BlockReader, Chain, StateProviderFactory};
12use reth_prune_types::PruneModes;
13use reth_stages_api::ExecutionStageThresholds;
14use reth_tracing::tracing::debug;
15use std::{
16 ops::RangeInclusive,
17 pin::Pin,
18 task::{ready, Context, Poll},
19};
20use tokio::task::JoinHandle;
21
/// Default number of backfill tasks a stream keeps in flight.
///
/// Not referenced in this file; presumably used where backfill jobs are configured — verify
/// against the callers in the crate.
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
/// Batch size (in blocks) used when converting a [`BackfillJob`] whose
/// `thresholds.max_blocks` is `None` into a [`StreamBackfillJob`].
const DEFAULT_BATCH_SIZE: usize = 100;
26
/// Boxed, type-erased backfill job: an iterator yielding [`BackfillJobResult`]s.
///
/// The `Send + 'static` bounds let the iterator be moved into a blocking task, where it is
/// advanced one `next()` call at a time.
type BackfillTaskIterator<T> =
    Box<dyn Iterator<Item = BackfillJobResult<T>> + Send + Sync + 'static>;
30
/// Value produced by one spawned backfill task.
struct BackfillTaskOutput<T> {
    /// The job iterator, handed back so it can be advanced again by a follow-up task.
    job: BackfillTaskIterator<T>,
    /// What the job's `next()` call returned; `None` means the job is exhausted.
    result: Option<BackfillJobResult<T>>,
}
36
/// Ordered queue of join handles for the spawned backfill tasks.
type BackfillTasks<T> = FuturesOrdered<JoinHandle<BackfillTaskOutput<T>>>;
39
/// Item yielded by the single-block stream: a recovered block paired with its execution output.
///
/// `N` defaults to [`EthPrimitives`].
type SingleBlockStreamItem<N = EthPrimitives> = (
    RecoveredBlock<<N as NodePrimitives>::Block>,
    BlockExecutionOutput<<N as NodePrimitives>::Receipt>,
);
/// Item yielded by the batch stream: a [`Chain`] over the primitives `N`
/// (defaulting to [`EthPrimitives`]).
type BatchBlockStreamItem<N = EthPrimitives> = Chain<N>;
45
/// Stream that backfills a block range by running jobs as blocking tasks.
///
/// The `Stream` implementations in this file carve `range` into jobs, keep up to `parallelism`
/// of them queued in `tasks`, and yield items of type `T` taken from that ordered queue.
#[derive(Debug)]
pub struct StreamBackfillJob<E, P, T> {
    /// Executor, cloned into every job that is spawned.
    executor: E,
    /// Provider, cloned into every job that is spawned.
    provider: P,
    /// Prune modes forwarded to batch jobs; single-block jobs are built without them.
    prune_modes: PruneModes,
    /// Blocks that still have to be handed out to jobs; consumed from the front.
    range: RangeInclusive<BlockNumber>,
    /// Ordered queue of in-flight tasks.
    tasks: BackfillTasks<T>,
    /// Upper bound on the number of queued tasks.
    parallelism: usize,
    /// Number of blocks taken from `range` for each batch job.
    batch_size: usize,
    /// Execution thresholds forwarded to batch jobs.
    thresholds: ExecutionStageThresholds,
}
61
62impl<E, P, T> StreamBackfillJob<E, P, T>
63where
64 T: Send + Sync + 'static,
65{
66 pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
68 self.parallelism = parallelism;
69 self
70 }
71
72 pub const fn with_batch_size(mut self, batch_size: usize) -> Self {
74 self.batch_size = batch_size;
75 self
76 }
77
78 fn push_back(&mut self, mut job: BackfillTaskIterator<T>) {
81 self.tasks.push_back(tokio::task::spawn_blocking(move || BackfillTaskOutput {
82 result: job.next(),
83 job,
84 }));
85 }
86
87 fn push_front(&mut self, mut job: BackfillTaskIterator<T>) {
90 self.tasks.push_front(tokio::task::spawn_blocking(move || BackfillTaskOutput {
91 result: job.next(),
92 job,
93 }));
94 }
95
96 fn poll_next_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<BackfillJobResult<T>>> {
98 while let Some(res) = ready!(self.tasks.poll_next_unpin(cx)) {
99 let task_result = res.map_err(BlockExecutionError::other)?;
100
101 if let BackfillTaskOutput { result: Some(job_result), job } = task_result {
102 self.push_front(job);
106
107 return Poll::Ready(Some(job_result))
108 };
109 }
110
111 Poll::Ready(None)
112 }
113}
114
115impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem<E::Primitives>>
116where
117 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>> + Clone + 'static,
118 P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
119{
120 type Item = BackfillJobResult<SingleBlockStreamItem<E::Primitives>>;
121
122 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123 let this = self.get_mut();
124
125 while this.tasks.len() < this.parallelism {
127 let Some(block_number) = this.range.next() else {
129 debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more single blocks to backfill");
130 break;
131 };
132
133 debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?block_number, "Spawning new single block backfill task");
135 let job = Box::new(SingleBlockBackfillJob {
136 executor: this.executor.clone(),
137 provider: this.provider.clone(),
138 range: block_number..=block_number,
139 stream_parallelism: this.parallelism,
140 }) as BackfillTaskIterator<_>;
141 this.push_back(job);
142 }
143
144 this.poll_next_task(cx)
145 }
146}
147
148impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
149where
150 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>> + Clone + 'static,
151 P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
152{
153 type Item = BackfillJobResult<BatchBlockStreamItem<E::Primitives>>;
154
155 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
156 let this = self.get_mut();
157
158 loop {
159 while this.tasks.len() < this.parallelism {
161 let mut range = this.range.by_ref().take(this.batch_size);
163 let start = range.next();
164 let range_bounds = start.zip(range.last().or(start));
165
166 let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
168 debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
169 break;
170 };
171
172 debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
174 let job = Box::new(BackfillJob {
175 executor: this.executor.clone(),
176 provider: this.provider.clone(),
177 prune_modes: this.prune_modes.clone(),
178 thresholds: this.thresholds.clone(),
179 range,
180 stream_parallelism: this.parallelism,
181 }) as BackfillTaskIterator<_>;
182 this.push_back(job);
183 }
184
185 let res = ready!(this.poll_next_task(cx));
186
187 if res.is_some() {
188 return Poll::Ready(res);
189 }
190
191 if this.range.is_empty() {
192 return Poll::Ready(None);
194 }
195 }
196 }
197}
198
199impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, SingleBlockStreamItem> {
200 fn from(job: SingleBlockBackfillJob<E, P>) -> Self {
201 Self {
202 executor: job.executor,
203 provider: job.provider,
204 prune_modes: PruneModes::default(),
205 range: job.range,
206 tasks: FuturesOrdered::new(),
207 parallelism: job.stream_parallelism,
208 batch_size: 1,
209 thresholds: ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
210 }
211 }
212}
213
214impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
215where
216 E: BlockExecutorProvider,
217{
218 fn from(job: BackfillJob<E, P>) -> Self {
219 let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
220 Self {
221 executor: job.executor,
222 provider: job.provider,
223 prune_modes: job.prune_modes,
224 range: job.range,
225 tasks: FuturesOrdered::new(),
226 parallelism: job.stream_parallelism,
227 batch_size,
228 thresholds: ExecutionStageThresholds {
229 max_blocks: Some(batch_size as u64),
230 ..job.thresholds
231 },
232 }
233 }
234}
235
#[cfg(test)]
mod tests {
    use crate::{
        backfill::test_utils::{
            blocks_and_execution_outcome, blocks_and_execution_outputs, chain_spec,
        },
        BackfillJobFactory,
    };
    use futures::StreamExt;
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_stages_api::ExecutionStageThresholds;
    use reth_testing_utils::generators;
    use secp256k1::Keypair;

    /// Streams block 1 as a single-block job and compares the yielded block and execution
    /// output with the expected fixture, then checks that the stream is finished.
    #[tokio::test]
    async fn test_single_blocks() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender.
        let signer = Keypair::new_global(&mut generators::rng());
        let signer_address = public_key_to_address(signer.public_key());

        let spec = chain_spec(signer_address);

        let executor = EthExecutorProvider::ethereum(spec.clone());
        let db_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&db_factory)?;
        let provider = BlockchainProvider::new(db_factory.clone())?;

        // Create the first two blocks together with their execution outputs.
        let expected_outputs = blocks_and_execution_outputs(db_factory, spec, signer)?;

        // Backfill the first block.
        let job_factory = BackfillJobFactory::new(executor.clone(), provider.clone());
        let single_block_job = job_factory.backfill(1..=1).into_single_blocks();
        let mut stream = single_block_job.into_stream();

        // Execute the first block and check its output.
        let (block, mut execution_output) = stream.next().await.unwrap().unwrap();
        // Normalize the order of the reverts before comparing against the fixture.
        execution_output.state.reverts.sort();
        let expected = &expected_outputs[0];
        assert_eq!(block, expected.0);
        assert_eq!(execution_output, expected.1);

        // The stream is done.
        assert!(stream.next().await.is_none());

        Ok(())
    }

    /// Streams blocks 1..=2 as one batch (batch limit 2, parallelism 1) and compares the
    /// yielded chain with the expected blocks and execution outcome, then checks that the
    /// stream is finished.
    #[tokio::test]
    async fn test_batch() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender.
        let signer = Keypair::new_global(&mut generators::rng());
        let signer_address = public_key_to_address(signer.public_key());

        let spec = chain_spec(signer_address);

        let executor = EthExecutorProvider::ethereum(spec.clone());
        let db_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&db_factory)?;
        let provider = BlockchainProvider::new(db_factory.clone())?;

        // Create the first two blocks together with their combined execution outcome.
        let (expected_blocks, expected_outcome) =
            blocks_and_execution_outcome(db_factory, spec, signer)?;

        // Backfill both blocks as a single batch.
        let batch_limits = ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() };
        let job_factory = BackfillJobFactory::new(executor.clone(), provider.clone())
            .with_thresholds(batch_limits)
            .with_stream_parallelism(1);
        let mut stream = job_factory.backfill(1..=2).into_stream();
        let mut chain = stream.next().await.unwrap().unwrap();
        // Normalize the order of the reverts before comparing against the fixture.
        chain.execution_outcome_mut().state_mut().reverts.sort();

        assert!(chain.blocks_iter().eq(&expected_blocks));
        assert_eq!(chain.execution_outcome(), &expected_outcome);

        // The stream is done.
        assert!(stream.next().await.is_none());

        Ok(())
    }
}