reth_exex/backfill/stream.rs

use super::job::BackfillJobResult;
use crate::{BackfillJob, SingleBlockBackfillJob};
use alloy_primitives::BlockNumber;
use futures::{
    stream::{FuturesOrdered, Stream},
    StreamExt,
};
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::{
    execute::{BlockExecutionError, BlockExecutionOutput},
    ConfigureEvm,
};
use reth_node_api::NodePrimitives;
use reth_primitives_traits::RecoveredBlock;
use reth_provider::{BlockReader, Chain, StateProviderFactory};
use reth_prune_types::PruneModes;
use reth_stages_api::ExecutionStageThresholds;
use reth_tracing::tracing::debug;
use std::{
    ops::RangeInclusive,
    pin::Pin,
    task::{ready, Context, Poll},
};
use tokio::task::JoinHandle;

/// The default parallelism for active tasks in [`StreamBackfillJob`].
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
/// The default batch size for active tasks in [`StreamBackfillJob`].
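///
/// Used as a fallback when the job's [`ExecutionStageThresholds::max_blocks`] is not set.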
const DEFAULT_BATCH_SIZE: usize = 100;

/// Boxed thread-safe iterator that yields [`BackfillJobResult`]s.
type BackfillTaskIterator<T> =
    Box<dyn Iterator<Item = BackfillJobResult<T>> + Send + Sync + 'static>;

/// Backfill task output.
struct BackfillTaskOutput<T> {
    job: BackfillTaskIterator<T>,
    result: Option<BackfillJobResult<T>>,
}

/// Ordered queue of [`JoinHandle`]s that yield [`BackfillTaskOutput`]s.
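///
/// [`FuturesOrdered`] yields task outputs in the order the tasks were pushed, so results are
/// emitted in block order even when multiple jobs execute concurrently.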
type BackfillTasks<T> = FuturesOrdered<JoinHandle<BackfillTaskOutput<T>>>;

type SingleBlockStreamItem<N = EthPrimitives> = (
    RecoveredBlock<<N as NodePrimitives>::Block>,
    BlockExecutionOutput<<N as NodePrimitives>::Receipt>,
);
type BatchBlockStreamItem<N = EthPrimitives> = Chain<N>;

/// Stream for processing backfill jobs asynchronously.
///
/// This struct manages the execution of backfill tasks ([`SingleBlockBackfillJob`] or
/// [`BackfillJob`]), allowing blocks to be processed asynchronously but in order within a
/// specified range.
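///
/// # Example
///
/// A minimal usage sketch (assuming the `BackfillJobFactory` API exercised in the tests below;
/// `evm_config` and `provider` stand in for a configured EVM config and provider):
///
/// ```ignore
/// let factory = BackfillJobFactory::new(evm_config, provider);
/// // Each item of the batched stream is an executed chain of up to `batch_size` blocks.
/// let mut stream = factory.backfill(1..=100).into_stream();
/// while let Some(chain) = stream.next().await {
///     let chain = chain?;
///     // process the executed chain here
/// }
/// ```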
#[derive(Debug)]
pub struct StreamBackfillJob<E, P, T> {
    /// EVM configuration used to execute blocks.
    evm_config: E,
    /// Provider used to read blocks and create state providers.
    provider: P,
    /// Pruning configuration forwarded to the spawned batch jobs.
    prune_modes: PruneModes,
    /// Remaining range of block numbers to backfill.
    range: RangeInclusive<BlockNumber>,
    /// Ordered queue of in-flight backfill tasks.
    tasks: BackfillTasks<T>,
    /// Maximum number of tasks that may run concurrently.
    parallelism: usize,
    /// Number of blocks executed per batch task.
    batch_size: usize,
    /// Execution thresholds forwarded to the spawned batch jobs.
    thresholds: ExecutionStageThresholds,
}

impl<E, P, T> StreamBackfillJob<E, P, T>
where
    T: Send + Sync + 'static,
{
    /// Configures the parallelism of the [`StreamBackfillJob`] to handle active tasks.
    pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
        self.parallelism = parallelism;
        self
    }

    /// Configures the batch size for the [`StreamBackfillJob`].
    pub const fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the
    /// end of the [`BackfillTasks`] queue.
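    ///
    /// The task runs on the Tokio blocking thread pool via [`tokio::task::spawn_blocking`],
    /// since advancing the job executes blocks, which is synchronous, CPU-bound work.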
    fn push_back(&mut self, mut job: BackfillTaskIterator<T>) {
        self.tasks.push_back(tokio::task::spawn_blocking(move || BackfillTaskOutput {
            result: job.next(),
            job,
        }));
    }

    /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the
    /// front of the [`BackfillTasks`] queue.
    fn push_front(&mut self, mut job: BackfillTaskIterator<T>) {
        self.tasks.push_front(tokio::task::spawn_blocking(move || BackfillTaskOutput {
            result: job.next(),
            job,
        }));
    }

    /// Polls the next task in the [`BackfillTasks`] queue until it returns a non-empty result.
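    ///
    /// Returns `Poll::Ready(None)` once all queued tasks have been drained.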
    fn poll_next_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<BackfillJobResult<T>>> {
        while let Some(res) = ready!(self.tasks.poll_next_unpin(cx)) {
            let task_result = res.map_err(BlockExecutionError::other)?;

            if let BackfillTaskOutput { result: Some(job_result), job } = task_result {
                // If the task returned a non-empty result, a new task advancing the job is created
                // and pushed to the __front__ of the queue, so that the next item of this job is
                // returned next.
                self.push_front(job);

                return Poll::Ready(Some(job_result))
            };
        }

        Poll::Ready(None)
    }
}

impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem<E::Primitives>>
where
    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
{
    type Item = BackfillJobResult<SingleBlockStreamItem<E::Primitives>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Spawn new tasks only if we are below the parallelism configured.
        while this.tasks.len() < this.parallelism {
            // Get the next block number from the range. If it is empty, we are done.
            let Some(block_number) = this.range.next() else {
                debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more single blocks to backfill");
                break;
            };

            // Spawn a new task for that block
            debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?block_number, "Spawning new single block backfill task");
            let job = Box::new(SingleBlockBackfillJob {
                evm_config: this.evm_config.clone(),
                provider: this.provider.clone(),
                range: block_number..=block_number,
                stream_parallelism: this.parallelism,
            }) as BackfillTaskIterator<_>;
            this.push_back(job);
        }

        this.poll_next_task(cx)
    }
}

impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
where
    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
{
    type Item = BackfillJobResult<BatchBlockStreamItem<E::Primitives>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        loop {
            // Spawn new tasks only if we are below the parallelism configured.
            while this.tasks.len() < this.parallelism {
                // Take the next `batch_size` blocks from the range and calculate the range bounds
                let mut range = this.range.by_ref().take(this.batch_size);
                let start = range.next();
                let range_bounds = start.zip(range.last().or(start));
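                // For a single-block batch `range.last()` is `None`, so the upper bound falls
                // back to `start`, producing the one-block range `start..=start`.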

                // Create the range from the range bounds. If it is empty, we are done.
                let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
                    debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
                    break;
                };

                // Spawn a new task for that range
                debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
                let job = Box::new(BackfillJob {
                    evm_config: this.evm_config.clone(),
                    provider: this.provider.clone(),
                    prune_modes: this.prune_modes.clone(),
                    thresholds: this.thresholds.clone(),
                    range,
                    stream_parallelism: this.parallelism,
                }) as BackfillTaskIterator<_>;
                this.push_back(job);
            }

            let res = ready!(this.poll_next_task(cx));

            if res.is_some() {
                return Poll::Ready(res);
            }

            if this.range.is_empty() {
                // only terminate the stream if there are no more blocks to process
                return Poll::Ready(None);
            }
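
            // All in-flight tasks are drained but the range still has blocks left; loop back
            // and spawn the next round of batch tasks.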
        }
    }
}

impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, SingleBlockStreamItem> {
    fn from(job: SingleBlockBackfillJob<E, P>) -> Self {
        Self {
            evm_config: job.evm_config,
            provider: job.provider,
            prune_modes: PruneModes::default(),
            range: job.range,
            tasks: FuturesOrdered::new(),
            parallelism: job.stream_parallelism,
            batch_size: 1,
            thresholds: ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
        }
    }
}

impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
where
    E: ConfigureEvm,
{
    fn from(job: BackfillJob<E, P>) -> Self {
        let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
        Self {
            evm_config: job.evm_config,
            provider: job.provider,
            prune_modes: job.prune_modes,
            range: job.range,
            tasks: FuturesOrdered::new(),
            parallelism: job.stream_parallelism,
            batch_size,
            thresholds: ExecutionStageThresholds {
                max_blocks: Some(batch_size as u64),
                ..job.thresholds
            },
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        backfill::test_utils::{
            blocks_and_execution_outcome, blocks_and_execution_outputs, chain_spec,
            execute_block_and_commit_to_database,
        },
        BackfillJobFactory,
    };
    use alloy_consensus::{constants::ETH_TO_WEI, Header, TxEip2930};
    use alloy_primitives::{b256, Address, TxKind, U256};
    use eyre::Result;
    use futures::StreamExt;
    use reth_chainspec::{ChainSpec, EthereumHardfork, MIN_TRANSACTION_GAS};
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::{Block, BlockBody, Transaction};
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{
        crypto::secp256k1::public_key_to_address, Block as _, FullNodePrimitives,
    };
    use reth_provider::{
        providers::{BlockchainProvider, ProviderNodeTypes},
        test_utils::create_test_provider_factory_with_chain_spec,
        ProviderFactory,
    };
    use reth_stages_api::ExecutionStageThresholds;
    use reth_testing_utils::{generators, generators::sign_tx_with_key_pair};
    use secp256k1::Keypair;
    use std::sync::Arc;

    #[tokio::test]
    async fn test_single_blocks() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender
        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Create first 2 blocks
        let blocks_and_execution_outcomes =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

        // Backfill the first block
        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone());
        let mut backfill_stream = factory.backfill(1..=1).into_single_blocks().into_stream();

        // execute first block
        let (block, mut execution_output) = backfill_stream.next().await.unwrap().unwrap();
        execution_output.state.reverts.sort();
        let expected_block = blocks_and_execution_outcomes[0].0.clone();
        let expected_output = &blocks_and_execution_outcomes[0].1;
        assert_eq!(block, expected_block);
        assert_eq!(&execution_output, expected_output);

        // expect no more blocks
        assert!(backfill_stream.next().await.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_batch() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender
        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Create first 2 blocks
        let (blocks, execution_outcome) =
            blocks_and_execution_outcome(provider_factory, chain_spec, key_pair)?;

        // Backfill the same range
        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone())
            .with_thresholds(ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() })
            .with_stream_parallelism(1);
        let mut backfill_stream = factory.backfill(1..=2).into_stream();
        let mut chain = backfill_stream.next().await.unwrap().unwrap();
        chain.execution_outcome_mut().state_mut().reverts.sort();

        assert!(chain.blocks_iter().eq(&blocks));
        assert_eq!(chain.execution_outcome(), &execution_outcome);

        // expect no more blocks
        assert!(backfill_stream.next().await.is_none());

        Ok(())
    }

    fn create_blocks(
        chain_spec: &Arc<ChainSpec>,
        key_pair: Keypair,
        n: u64,
    ) -> Result<Vec<RecoveredBlock<reth_ethereum_primitives::Block>>> {
        let mut blocks = Vec::with_capacity(n as usize);
        let mut parent_hash = chain_spec.genesis_hash();

        for (i, nonce) in (1..=n).zip(0..n) {
            let block = Block {
                header: Header {
                    parent_hash,
                    // Hardcoded receipts_root matching the original test (same tx in each block)
                    receipts_root: b256!(
                        "0xd3a6acf9a244d78b33831df95d472c4128ea85bf079a1d41e32ed0b7d2244c9e"
                    ),
                    difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"),
                    number: i,
                    gas_limit: MIN_TRANSACTION_GAS,
                    gas_used: MIN_TRANSACTION_GAS,
                    ..Default::default()
                },
                body: BlockBody {
                    transactions: vec![sign_tx_with_key_pair(
                        key_pair,
                        Transaction::Eip2930(TxEip2930 {
                            chain_id: chain_spec.chain.id(),
                            nonce,
                            gas_limit: MIN_TRANSACTION_GAS,
                            gas_price: 1_500_000_000,
                            to: TxKind::Call(Address::ZERO),
                            value: U256::from(0.1 * ETH_TO_WEI as f64),
                            ..Default::default()
                        }),
                    )],
                    ..Default::default()
                },
            }
            .try_into_recovered()?;

            parent_hash = block.hash();
            blocks.push(block);
        }

        Ok(blocks)
    }

    fn execute_and_commit_blocks<N>(
        provider_factory: &ProviderFactory<N>,
        chain_spec: &Arc<ChainSpec>,
        blocks: &[RecoveredBlock<reth_ethereum_primitives::Block>],
    ) -> Result<()>
    where
        N: ProviderNodeTypes<
            Primitives: FullNodePrimitives<
                Block = reth_ethereum_primitives::Block,
                BlockBody = reth_ethereum_primitives::BlockBody,
                Receipt = reth_ethereum_primitives::Receipt,
            >,
        >,
    {
        for block in blocks {
            execute_block_and_commit_to_database(provider_factory, chain_spec.clone(), block)?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_batch_parallel_range_advance() -> Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender
        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Create and commit 4 blocks
        let blocks = create_blocks(&chain_spec, key_pair, 4)?;
        execute_and_commit_blocks(&provider_factory, &chain_spec, &blocks)?;

        // Create factory with batch size 2 (via thresholds max_blocks=2) and parallelism=2
        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone())
            .with_thresholds(ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() })
            .with_stream_parallelism(2);

        // Stream backfill for range 1..=4
        let mut backfill_stream = factory.backfill(1..=4).into_stream();

        // Collect the two expected chains from the stream
        let mut chain1 = backfill_stream.next().await.unwrap()?;
        let mut chain2 = backfill_stream.next().await.unwrap()?;
        assert!(backfill_stream.next().await.is_none());

        // Sort reverts for comparison
        chain1.execution_outcome_mut().state_mut().reverts.sort();
        chain2.execution_outcome_mut().state_mut().reverts.sort();

        // Compute expected chains using non-stream BackfillJob (sequential)
        let factory_seq =
            BackfillJobFactory::new(executor.clone(), blockchain_db.clone()).with_thresholds(
                ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() },
            );

        let mut expected_chain1 =
            factory_seq.backfill(1..=2).collect::<Result<Vec<_>, _>>()?.into_iter().next().unwrap();
        let mut expected_chain2 =
            factory_seq.backfill(3..=4).collect::<Result<Vec<_>, _>>()?.into_iter().next().unwrap();

        // Sort reverts for expected
        expected_chain1.execution_outcome_mut().state_mut().reverts.sort();
        expected_chain2.execution_outcome_mut().state_mut().reverts.sort();

        // Assert the streamed chains match the expected sequential ones
        assert_eq!(chain1.blocks(), expected_chain1.blocks());
        assert_eq!(chain1.execution_outcome(), expected_chain1.execution_outcome());
        assert_eq!(chain2.blocks(), expected_chain2.blocks());
        assert_eq!(chain2.execution_outcome(), expected_chain2.execution_outcome());

        Ok(())
    }
}