reth_exex/backfill/
stream.rs

1use super::job::BackfillJobResult;
2use crate::{BackfillJob, SingleBlockBackfillJob};
3use alloy_primitives::BlockNumber;
4use futures::{
5    stream::{FuturesOrdered, Stream},
6    StreamExt,
7};
8use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider};
9use reth_node_api::NodePrimitives;
10use reth_primitives::{EthPrimitives, RecoveredBlock};
11use reth_provider::{BlockReader, Chain, StateProviderFactory};
12use reth_prune_types::PruneModes;
13use reth_stages_api::ExecutionStageThresholds;
14use reth_tracing::tracing::debug;
15use std::{
16    ops::RangeInclusive,
17    pin::Pin,
18    task::{ready, Context, Poll},
19};
20use tokio::task::JoinHandle;
21
/// The default parallelism for active tasks in [`StreamBackfillJob`].
///
/// Crate-visible but not referenced anywhere else in this file; presumably consumed by the code
/// that constructs backfill jobs (TODO confirm against the job factory).
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
/// The default batch size for active tasks in [`StreamBackfillJob`].
///
/// Used as the number of blocks per batch when a [`BackfillJob`] is converted into a stream and
/// its thresholds do not specify `max_blocks`.
const DEFAULT_BATCH_SIZE: usize = 100;
26
/// Boxed thread-safe iterator that yields [`BackfillJobResult`]s.
///
/// Both [`SingleBlockBackfillJob`] and [`BackfillJob`] are boxed into this type, so the stream can
/// drive either kind of job through the same task queue.
type BackfillTaskIterator<T> =
    Box<dyn Iterator<Item = BackfillJobResult<T>> + Send + Sync + 'static>;
30
/// Backfill task output.
///
/// Produced by every spawned blocking task: the job iterator is handed back together with the
/// item it yielded, so the stream can keep advancing the same job afterwards.
struct BackfillTaskOutput<T> {
    /// The job iterator, returned after `next` has been called on it once inside the task.
    job: BackfillTaskIterator<T>,
    /// The value returned by that `next` call; `None` means the job had nothing (more) to yield.
    result: Option<BackfillJobResult<T>>,
}
36
/// Ordered queue of [`JoinHandle`]s that yield [`BackfillTaskOutput`]s.
///
/// [`FuturesOrdered`] yields outputs in queue order rather than completion order, which is what
/// keeps the streamed results in the order the tasks were queued.
type BackfillTasks<T> = FuturesOrdered<JoinHandle<BackfillTaskOutput<T>>>;
39
/// Item yielded by the single-block stream: a recovered block paired with the output of executing
/// it. `N` defaults to [`EthPrimitives`].
type SingleBlockStreamItem<N = EthPrimitives> = (
    RecoveredBlock<<N as NodePrimitives>::Block>,
    BlockExecutionOutput<<N as NodePrimitives>::Receipt>,
);
/// Item yielded by the batch stream: a [`Chain`] as produced by a [`BackfillJob`]. `N` defaults to
/// [`EthPrimitives`].
type BatchBlockStreamItem<N = EthPrimitives> = Chain<N>;
45
/// Stream for processing backfill jobs asynchronously.
///
/// This struct manages the execution of [`SingleBlockBackfillJob`] or [`BackfillJob`] tasks,
/// allowing blocks to be processed asynchronously but in order within a specified range.
///
/// `E` is the executor, `P` the provider and `T` the item type the stream yields; which `Stream`
/// implementation applies (single blocks or batches) is selected by `T`.
#[derive(Debug)]
pub struct StreamBackfillJob<E, P, T> {
    /// Executor, cloned into every spawned job.
    executor: E,
    /// Provider, cloned into every spawned job.
    provider: P,
    /// Prune modes forwarded to each spawned [`BackfillJob`] (batch mode).
    prune_modes: PruneModes,
    /// Blocks that have not been handed to a task yet; consumed from the front as tasks are
    /// spawned.
    range: RangeInclusive<BlockNumber>,
    /// Queue of spawned tasks, polled in order.
    tasks: BackfillTasks<T>,
    /// Upper bound on the number of tasks kept in `tasks`.
    parallelism: usize,
    /// Number of blocks taken from `range` for each batch task.
    batch_size: usize,
    /// Execution thresholds forwarded to each spawned [`BackfillJob`] (batch mode).
    thresholds: ExecutionStageThresholds,
}
61
62impl<E, P, T> StreamBackfillJob<E, P, T>
63where
64    T: Send + Sync + 'static,
65{
66    /// Configures the parallelism of the [`StreamBackfillJob`] to handle active tasks.
67    pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
68        self.parallelism = parallelism;
69        self
70    }
71
72    /// Configures the batch size for the [`StreamBackfillJob`].
73    pub const fn with_batch_size(mut self, batch_size: usize) -> Self {
74        self.batch_size = batch_size;
75        self
76    }
77
78    /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the end
79    /// of the [`BackfillTasks`] queue.
80    fn push_back(&mut self, mut job: BackfillTaskIterator<T>) {
81        self.tasks.push_back(tokio::task::spawn_blocking(move || BackfillTaskOutput {
82            result: job.next(),
83            job,
84        }));
85    }
86
87    /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the
88    /// front of the  [`BackfillTasks`] queue.
89    fn push_front(&mut self, mut job: BackfillTaskIterator<T>) {
90        self.tasks.push_front(tokio::task::spawn_blocking(move || BackfillTaskOutput {
91            result: job.next(),
92            job,
93        }));
94    }
95
96    /// Polls the next task in the [`BackfillTasks`] queue until it returns a non-empty result.
97    fn poll_next_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<BackfillJobResult<T>>> {
98        while let Some(res) = ready!(self.tasks.poll_next_unpin(cx)) {
99            let task_result = res.map_err(BlockExecutionError::other)?;
100
101            if let BackfillTaskOutput { result: Some(job_result), job } = task_result {
102                // If the task returned a non-empty result, a new task advancing the job is created
103                // and pushed to the __front__ of the queue, so that the next item of this returned
104                // next.
105                self.push_front(job);
106
107                return Poll::Ready(Some(job_result))
108            };
109        }
110
111        Poll::Ready(None)
112    }
113}
114
115impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem<E::Primitives>>
116where
117    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>> + Clone + 'static,
118    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
119{
120    type Item = BackfillJobResult<SingleBlockStreamItem<E::Primitives>>;
121
122    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123        let this = self.get_mut();
124
125        // Spawn new tasks only if we are below the parallelism configured.
126        while this.tasks.len() < this.parallelism {
127            // Get the next block number from the range. If it is empty, we are done.
128            let Some(block_number) = this.range.next() else {
129                debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more single blocks to backfill");
130                break;
131            };
132
133            // Spawn a new task for that block
134            debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?block_number, "Spawning new single block backfill task");
135            let job = Box::new(SingleBlockBackfillJob {
136                executor: this.executor.clone(),
137                provider: this.provider.clone(),
138                range: block_number..=block_number,
139                stream_parallelism: this.parallelism,
140            }) as BackfillTaskIterator<_>;
141            this.push_back(job);
142        }
143
144        this.poll_next_task(cx)
145    }
146}
147
148impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
149where
150    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>> + Clone + 'static,
151    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
152{
153    type Item = BackfillJobResult<BatchBlockStreamItem<E::Primitives>>;
154
155    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
156        let this = self.get_mut();
157
158        loop {
159            // Spawn new tasks only if we are below the parallelism configured.
160            while this.tasks.len() < this.parallelism {
161                // Take the next `batch_size` blocks from the range and calculate the range bounds
162                let mut range = this.range.by_ref().take(this.batch_size);
163                let start = range.next();
164                let range_bounds = start.zip(range.last().or(start));
165
166                // Create the range from the range bounds. If it is empty, we are done.
167                let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
168                    debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
169                    break;
170                };
171
172                // Spawn a new task for that range
173                debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
174                let job = Box::new(BackfillJob {
175                    executor: this.executor.clone(),
176                    provider: this.provider.clone(),
177                    prune_modes: this.prune_modes.clone(),
178                    thresholds: this.thresholds.clone(),
179                    range,
180                    stream_parallelism: this.parallelism,
181                }) as BackfillTaskIterator<_>;
182                this.push_back(job);
183            }
184
185            let res = ready!(this.poll_next_task(cx));
186
187            if res.is_some() {
188                return Poll::Ready(res);
189            }
190
191            if this.range.is_empty() {
192                // only terminate the stream if there are no more blocks to process
193                return Poll::Ready(None);
194            }
195        }
196    }
197}
198
199impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, SingleBlockStreamItem> {
200    fn from(job: SingleBlockBackfillJob<E, P>) -> Self {
201        Self {
202            executor: job.executor,
203            provider: job.provider,
204            prune_modes: PruneModes::default(),
205            range: job.range,
206            tasks: FuturesOrdered::new(),
207            parallelism: job.stream_parallelism,
208            batch_size: 1,
209            thresholds: ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
210        }
211    }
212}
213
214impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
215where
216    E: BlockExecutorProvider,
217{
218    fn from(job: BackfillJob<E, P>) -> Self {
219        let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
220        Self {
221            executor: job.executor,
222            provider: job.provider,
223            prune_modes: job.prune_modes,
224            range: job.range,
225            tasks: FuturesOrdered::new(),
226            parallelism: job.stream_parallelism,
227            batch_size,
228            thresholds: ExecutionStageThresholds {
229                max_blocks: Some(batch_size as u64),
230                ..job.thresholds
231            },
232        }
233    }
234}
235
#[cfg(test)]
mod tests {
    use crate::{
        backfill::test_utils::{
            blocks_and_execution_outcome, blocks_and_execution_outputs, chain_spec,
        },
        BackfillJobFactory,
    };
    use futures::StreamExt;
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_stages_api::ExecutionStageThresholds;
    use reth_testing_utils::generators;
    use secp256k1::Keypair;

    /// Streams block 1 as a single block and compares it with the block and execution output
    /// produced by the test utilities, then checks that the stream ends.
    #[tokio::test]
    async fn test_single_blocks() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Key pair of the sender and the address derived from it
        let key_pair = Keypair::new_global(&mut generators::rng());
        let sender = public_key_to_address(key_pair.public_key());

        let spec = chain_spec(sender);

        let executor = EthExecutorProvider::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Expected blocks together with their execution outputs
        let expected = blocks_and_execution_outputs(provider_factory, spec, key_pair)?;

        // Stream only the first block
        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone());
        let mut stream = factory.backfill(1..=1).into_single_blocks().into_stream();

        // The first item is block 1 with its execution output
        let first_item = stream.next().await.unwrap();
        let (block, mut output) = first_item.unwrap();
        output.state.reverts.sort();
        let (expected_block, expected_output) = &expected[0];
        assert_eq!(&block, expected_block);
        assert_eq!(&output, expected_output);

        // Nothing else is streamed
        assert!(stream.next().await.is_none());

        Ok(())
    }

    /// Streams blocks 1..=2 as one batch with a parallelism of 1 and compares the resulting chain
    /// with the blocks and execution outcome produced by the test utilities, then checks that the
    /// stream ends.
    #[tokio::test]
    async fn test_batch() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Key pair of the sender and the address derived from it
        let key_pair = Keypair::new_global(&mut generators::rng());
        let sender = public_key_to_address(key_pair.public_key());

        let spec = chain_spec(sender);

        let executor = EthExecutorProvider::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Expected blocks and their combined execution outcome
        let (expected_blocks, expected_outcome) =
            blocks_and_execution_outcome(provider_factory, spec, key_pair)?;

        // Stream the same range as a single two-block batch
        let thresholds = ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() };
        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone())
            .with_thresholds(thresholds)
            .with_stream_parallelism(1);
        let mut stream = factory.backfill(1..=2).into_stream();

        let first_item = stream.next().await.unwrap();
        let mut chain = first_item.unwrap();
        chain.execution_outcome_mut().state_mut().reverts.sort();

        assert!(chain.blocks_iter().eq(&expected_blocks));
        assert_eq!(chain.execution_outcome(), &expected_outcome);

        // Nothing else is streamed
        assert!(stream.next().await.is_none());

        Ok(())
    }
}
327}