reth_exex/backfill/stream.rs

1use super::job::BackfillJobResult;
2use crate::{BackfillJob, SingleBlockBackfillJob};
3use alloy_primitives::BlockNumber;
4use futures::{
5    stream::{FuturesOrdered, Stream},
6    StreamExt,
7};
8use reth_ethereum_primitives::EthPrimitives;
9use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider};
10use reth_node_api::NodePrimitives;
11use reth_primitives_traits::RecoveredBlock;
12use reth_provider::{BlockReader, Chain, StateProviderFactory};
13use reth_prune_types::PruneModes;
14use reth_stages_api::ExecutionStageThresholds;
15use reth_tracing::tracing::debug;
16use std::{
17    ops::RangeInclusive,
18    pin::Pin,
19    task::{ready, Context, Poll},
20};
21use tokio::task::JoinHandle;
22
/// Default parallelism of a [`StreamBackfillJob`]: the maximum number of backfill tasks it keeps
/// in flight at the same time.
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
/// Default number of blocks covered by each batch task of a [`StreamBackfillJob`], used when the
/// batch backfill job's execution thresholds do not set `max_blocks`.
const DEFAULT_BATCH_SIZE: usize = 100;
27
28/// Boxed thread-safe iterator that yields [`BackfillJobResult`]s.
29type BackfillTaskIterator<T> =
30    Box<dyn Iterator<Item = BackfillJobResult<T>> + Send + Sync + 'static>;
31
32/// Backfill task output.
33struct BackfillTaskOutput<T> {
34    job: BackfillTaskIterator<T>,
35    result: Option<BackfillJobResult<T>>,
36}
37
38/// Ordered queue of [`JoinHandle`]s that yield [`BackfillTaskOutput`]s.
39type BackfillTasks<T> = FuturesOrdered<JoinHandle<BackfillTaskOutput<T>>>;
40
41type SingleBlockStreamItem<N = EthPrimitives> = (
42    RecoveredBlock<<N as NodePrimitives>::Block>,
43    BlockExecutionOutput<<N as NodePrimitives>::Receipt>,
44);
45type BatchBlockStreamItem<N = EthPrimitives> = Chain<N>;
46
47/// Stream for processing backfill jobs asynchronously.
48///
49/// This struct manages the execution of [`SingleBlockBackfillJob`] tasks, allowing blocks to be
50/// processed asynchronously but in order within a specified range.
51#[derive(Debug)]
52pub struct StreamBackfillJob<E, P, T> {
53    executor: E,
54    provider: P,
55    prune_modes: PruneModes,
56    range: RangeInclusive<BlockNumber>,
57    tasks: BackfillTasks<T>,
58    parallelism: usize,
59    batch_size: usize,
60    thresholds: ExecutionStageThresholds,
61}
62
63impl<E, P, T> StreamBackfillJob<E, P, T>
64where
65    T: Send + Sync + 'static,
66{
67    /// Configures the parallelism of the [`StreamBackfillJob`] to handle active tasks.
68    pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
69        self.parallelism = parallelism;
70        self
71    }
72
73    /// Configures the batch size for the [`StreamBackfillJob`].
74    pub const fn with_batch_size(mut self, batch_size: usize) -> Self {
75        self.batch_size = batch_size;
76        self
77    }
78
79    /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the end
80    /// of the [`BackfillTasks`] queue.
81    fn push_back(&mut self, mut job: BackfillTaskIterator<T>) {
82        self.tasks.push_back(tokio::task::spawn_blocking(move || BackfillTaskOutput {
83            result: job.next(),
84            job,
85        }));
86    }
87
88    /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the
89    /// front of the  [`BackfillTasks`] queue.
90    fn push_front(&mut self, mut job: BackfillTaskIterator<T>) {
91        self.tasks.push_front(tokio::task::spawn_blocking(move || BackfillTaskOutput {
92            result: job.next(),
93            job,
94        }));
95    }
96
97    /// Polls the next task in the [`BackfillTasks`] queue until it returns a non-empty result.
98    fn poll_next_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<BackfillJobResult<T>>> {
99        while let Some(res) = ready!(self.tasks.poll_next_unpin(cx)) {
100            let task_result = res.map_err(BlockExecutionError::other)?;
101
102            if let BackfillTaskOutput { result: Some(job_result), job } = task_result {
103                // If the task returned a non-empty result, a new task advancing the job is created
104                // and pushed to the __front__ of the queue, so that the next item of this returned
105                // next.
106                self.push_front(job);
107
108                return Poll::Ready(Some(job_result))
109            };
110        }
111
112        Poll::Ready(None)
113    }
114}
115
116impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem<E::Primitives>>
117where
118    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>> + Clone + 'static,
119    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
120{
121    type Item = BackfillJobResult<SingleBlockStreamItem<E::Primitives>>;
122
123    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124        let this = self.get_mut();
125
126        // Spawn new tasks only if we are below the parallelism configured.
127        while this.tasks.len() < this.parallelism {
128            // Get the next block number from the range. If it is empty, we are done.
129            let Some(block_number) = this.range.next() else {
130                debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more single blocks to backfill");
131                break;
132            };
133
134            // Spawn a new task for that block
135            debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?block_number, "Spawning new single block backfill task");
136            let job = Box::new(SingleBlockBackfillJob {
137                executor: this.executor.clone(),
138                provider: this.provider.clone(),
139                range: block_number..=block_number,
140                stream_parallelism: this.parallelism,
141            }) as BackfillTaskIterator<_>;
142            this.push_back(job);
143        }
144
145        this.poll_next_task(cx)
146    }
147}
148
149impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
150where
151    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>> + Clone + 'static,
152    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
153{
154    type Item = BackfillJobResult<BatchBlockStreamItem<E::Primitives>>;
155
156    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157        let this = self.get_mut();
158
159        loop {
160            // Spawn new tasks only if we are below the parallelism configured.
161            while this.tasks.len() < this.parallelism {
162                // Take the next `batch_size` blocks from the range and calculate the range bounds
163                let mut range = this.range.by_ref().take(this.batch_size);
164                let start = range.next();
165                let range_bounds = start.zip(range.last().or(start));
166
167                // Create the range from the range bounds. If it is empty, we are done.
168                let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
169                    debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
170                    break;
171                };
172
173                // Spawn a new task for that range
174                debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
175                let job = Box::new(BackfillJob {
176                    executor: this.executor.clone(),
177                    provider: this.provider.clone(),
178                    prune_modes: this.prune_modes.clone(),
179                    thresholds: this.thresholds.clone(),
180                    range,
181                    stream_parallelism: this.parallelism,
182                }) as BackfillTaskIterator<_>;
183                this.push_back(job);
184            }
185
186            let res = ready!(this.poll_next_task(cx));
187
188            if res.is_some() {
189                return Poll::Ready(res);
190            }
191
192            if this.range.is_empty() {
193                // only terminate the stream if there are no more blocks to process
194                return Poll::Ready(None);
195            }
196        }
197    }
198}
199
200impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, SingleBlockStreamItem> {
201    fn from(job: SingleBlockBackfillJob<E, P>) -> Self {
202        Self {
203            executor: job.executor,
204            provider: job.provider,
205            prune_modes: PruneModes::default(),
206            range: job.range,
207            tasks: FuturesOrdered::new(),
208            parallelism: job.stream_parallelism,
209            batch_size: 1,
210            thresholds: ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
211        }
212    }
213}
214
215impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
216where
217    E: BlockExecutorProvider,
218{
219    fn from(job: BackfillJob<E, P>) -> Self {
220        let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
221        Self {
222            executor: job.executor,
223            provider: job.provider,
224            prune_modes: job.prune_modes,
225            range: job.range,
226            tasks: FuturesOrdered::new(),
227            parallelism: job.stream_parallelism,
228            batch_size,
229            thresholds: ExecutionStageThresholds {
230                max_blocks: Some(batch_size as u64),
231                ..job.thresholds
232            },
233        }
234    }
235}
236
#[cfg(test)]
mod tests {
    use crate::{
        backfill::test_utils::{
            blocks_and_execution_outcome, blocks_and_execution_outputs, chain_spec,
        },
        BackfillJobFactory,
    };
    use futures::StreamExt;
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_stages_api::ExecutionStageThresholds;
    use reth_testing_utils::generators;

    /// Streaming block 1 as single blocks yields exactly that block with its expected execution
    /// output, and then ends.
    #[tokio::test]
    async fn test_single_blocks() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Key pair of the account that sends the transactions in the generated blocks.
        let sender_key = generators::generate_key(&mut generators::rng());
        let spec = chain_spec(public_key_to_address(sender_key.public_key()));

        let executor = EthExecutorProvider::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Generate the first two blocks along with the execution output expected for each.
        let expected = blocks_and_execution_outputs(provider_factory, spec, sender_key)?;

        // Stream block 1 on its own.
        let mut stream = BackfillJobFactory::new(executor, blockchain_db)
            .backfill(1..=1)
            .into_single_blocks()
            .into_stream();

        let (block, mut output) = stream.next().await.unwrap().unwrap();
        // Sort the reverts so the comparison does not depend on their order.
        output.state.reverts.sort();
        assert_eq!(block, expected[0].0);
        assert_eq!(output, expected[0].1);

        // The range held a single block, so the stream must now be exhausted.
        assert!(stream.next().await.is_none());

        Ok(())
    }

    /// Streaming blocks 1..=2 with two blocks per batch and a single task yields one chain with
    /// both blocks and the expected execution outcome, and then ends.
    #[tokio::test]
    async fn test_batch() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Key pair of the account that sends the transactions in the generated blocks.
        let sender_key = generators::generate_key(&mut generators::rng());
        let spec = chain_spec(public_key_to_address(sender_key.public_key()));

        let executor = EthExecutorProvider::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Generate the first two blocks along with their combined expected execution outcome.
        let (blocks, execution_outcome) =
            blocks_and_execution_outcome(provider_factory, spec, sender_key)?;

        // Backfill the same range in a single batch of two blocks, one task at a time.
        let thresholds = ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() };
        let mut stream = BackfillJobFactory::new(executor, blockchain_db)
            .with_thresholds(thresholds)
            .with_stream_parallelism(1)
            .backfill(1..=2)
            .into_stream();

        let mut chain = stream.next().await.unwrap().unwrap();
        // Sort the reverts so the comparison does not depend on their order.
        chain.execution_outcome_mut().state_mut().reverts.sort();

        assert!(chain.blocks_iter().eq(&blocks));
        assert_eq!(chain.execution_outcome(), &execution_outcome);

        // Both blocks were in the first batch, so the stream must now be exhausted.
        assert!(stream.next().await.is_none());

        Ok(())
    }
}