reth_exex/backfill/
job.rs

1use crate::StreamBackfillJob;
2use reth_evm::ConfigureEvm;
3use std::{
4    ops::RangeInclusive,
5    time::{Duration, Instant},
6};
7
8use alloy_consensus::BlockHeader;
9use alloy_primitives::BlockNumber;
10use reth_ethereum_primitives::Receipt;
11use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, Executor};
12use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
13use reth_primitives_traits::{format_gas_throughput, RecoveredBlock, SignedTransaction};
14use reth_provider::{
15    BlockReader, Chain, ExecutionOutcome, HeaderProvider, ProviderError, StateProviderFactory,
16    TransactionVariant,
17};
18use reth_prune_types::PruneModes;
19use reth_revm::database::StateProviderDatabase;
20use reth_stages_api::ExecutionStageThresholds;
21use reth_tracing::tracing::{debug, trace};
22
23pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
24
25/// Backfill job started for a specific range.
26///
27/// It implements [`Iterator`] that executes blocks in batches according to the provided thresholds
28/// and yields [`Chain`]. In other words, this iterator can yield multiple items for the given range
29/// depending on the configured thresholds.
30#[derive(Debug)]
31pub struct BackfillJob<E, P> {
32    pub(crate) evm_config: E,
33    pub(crate) provider: P,
34    pub(crate) prune_modes: PruneModes,
35    pub(crate) thresholds: ExecutionStageThresholds,
36    pub(crate) range: RangeInclusive<BlockNumber>,
37    pub(crate) stream_parallelism: usize,
38}
39
40impl<E, P> Iterator for BackfillJob<E, P>
41where
42    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
43    P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
44{
45    type Item = BackfillJobResult<Chain<E::Primitives>>;
46
47    fn next(&mut self) -> Option<Self::Item> {
48        if self.range.is_empty() {
49            return None
50        }
51
52        Some(self.execute_range())
53    }
54}
55
56impl<E, P> BackfillJob<E, P>
57where
58    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
59    P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
60{
61    /// Converts the backfill job into a single block backfill job.
62    pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
63        self.into()
64    }
65
66    /// Converts the backfill job into a stream.
67    pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
68        self.into()
69    }
70
71    fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
72        debug!(
73            target: "exex::backfill",
74            range = ?self.range,
75            "Executing block range"
76        );
77
78        let mut executor = self.evm_config.batch_executor(StateProviderDatabase::new(
79            self.provider
80                .history_by_block_number(self.range.start().saturating_sub(1))
81                .map_err(BlockExecutionError::other)?,
82        ));
83
84        let mut fetch_block_duration = Duration::default();
85        let mut execution_duration = Duration::default();
86        let mut cumulative_gas = 0;
87        let batch_start = Instant::now();
88
89        let mut blocks = Vec::new();
90        let mut results = Vec::new();
91        for block_number in self.range.clone() {
92            // Fetch the block
93            let fetch_block_start = Instant::now();
94
95            // we need the block's transactions along with their hashes
96            let block = self
97                .provider
98                .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)
99                .map_err(BlockExecutionError::other)?
100                .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
101                .map_err(BlockExecutionError::other)?;
102
103            fetch_block_duration += fetch_block_start.elapsed();
104
105            cumulative_gas += block.gas_used();
106
107            // Configure the executor to use the current state.
108            trace!(target: "exex::backfill", number = block_number, txs = block.body().transactions().len(), "Executing block");
109
110            // Execute the block
111            let execute_start = Instant::now();
112
113            // Unseal the block for execution
114            let (block, senders) = block.split_sealed();
115            let (header, body) = block.split_sealed_header_body();
116            let block = P::Block::new_sealed(header, body).with_senders(senders);
117
118            results.push(executor.execute_one(&block)?);
119            execution_duration += execute_start.elapsed();
120
121            // Seal the block back and save it
122            blocks.push(block);
123            // Check if we should commit now
124            if self.thresholds.is_end_of_batch(
125                block_number - *self.range.start() + 1,
126                executor.size_hint() as u64,
127                cumulative_gas,
128                batch_start.elapsed(),
129            ) {
130                break
131            }
132        }
133
134        let first_block_number = blocks.first().expect("blocks should not be empty").number();
135        let last_block_number = blocks.last().expect("blocks should not be empty").number();
136        debug!(
137            target: "exex::backfill",
138            range = ?*self.range.start()..=last_block_number,
139            block_fetch = ?fetch_block_duration,
140            execution = ?execution_duration,
141            throughput = format_gas_throughput(cumulative_gas, execution_duration),
142            "Finished executing block range"
143        );
144        self.range = last_block_number + 1..=*self.range.end();
145
146        let outcome = ExecutionOutcome::from_blocks(
147            first_block_number,
148            executor.into_state().take_bundle(),
149            results,
150        );
151        let chain = Chain::new(blocks, outcome, None);
152        Ok(chain)
153    }
154}
155
156/// Single block Backfill job started for a specific range.
157///
158/// It implements [`Iterator`] which executes a block each time the
159/// iterator is advanced and yields ([`RecoveredBlock`], [`BlockExecutionOutput`])
160#[derive(Debug, Clone)]
161pub struct SingleBlockBackfillJob<E, P> {
162    pub(crate) evm_config: E,
163    pub(crate) provider: P,
164    pub(crate) range: RangeInclusive<BlockNumber>,
165    pub(crate) stream_parallelism: usize,
166}
167
168impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
169where
170    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
171    P: HeaderProvider + BlockReader + StateProviderFactory,
172{
173    type Item = BackfillJobResult<(
174        RecoveredBlock<P::Block>,
175        BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
176    )>;
177
178    fn next(&mut self) -> Option<Self::Item> {
179        self.range.next().map(|block_number| self.execute_block(block_number))
180    }
181}
182
183impl<E, P> SingleBlockBackfillJob<E, P>
184where
185    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
186    P: HeaderProvider + BlockReader + StateProviderFactory,
187{
188    /// Converts the single block backfill job into a stream.
189    pub fn into_stream(
190        self,
191    ) -> StreamBackfillJob<
192        E,
193        P,
194        (RecoveredBlock<reth_ethereum_primitives::Block>, BlockExecutionOutput<Receipt>),
195    > {
196        self.into()
197    }
198
199    #[expect(clippy::type_complexity)]
200    pub(crate) fn execute_block(
201        &self,
202        block_number: u64,
203    ) -> BackfillJobResult<(
204        RecoveredBlock<P::Block>,
205        BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
206    )> {
207        // Fetch the block with senders for execution.
208        let block_with_senders = self
209            .provider
210            .recovered_block(block_number.into(), TransactionVariant::WithHash)
211            .map_err(BlockExecutionError::other)?
212            .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
213            .map_err(BlockExecutionError::other)?;
214
215        // Configure the executor to use the previous block's state.
216        let executor = self.evm_config.batch_executor(StateProviderDatabase::new(
217            self.provider
218                .history_by_block_number(block_number.saturating_sub(1))
219                .map_err(BlockExecutionError::other)?,
220        ));
221
222        trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.body().transaction_count(), "Executing block");
223
224        let block_execution_output = executor.execute(&block_with_senders)?;
225
226        Ok((block_with_senders, block_execution_output))
227    }
228}
229
230impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
231    fn from(job: BackfillJob<E, P>) -> Self {
232        Self {
233            evm_config: job.evm_config,
234            provider: job.provider,
235            range: job.range,
236            stream_parallelism: job.stream_parallelism,
237        }
238    }
239}
240
#[cfg(test)]
mod tests {
    use crate::{
        backfill::{
            job::ExecutionStageThresholds,
            test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
        },
        BackfillJobFactory,
    };
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_testing_utils::generators;

    /// A batched job over a single block yields exactly one chain that matches the reference
    /// execution of that block.
    #[test]
    fn test_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Sender key pair; the derived address is handed to the test chain spec.
        let key_pair = generators::generate_key(&mut generators::rng());
        let sender = public_key_to_address(key_pair.public_key());
        let spec = chain_spec(sender);

        let evm_config = EthEvmConfig::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Reference blocks and outputs to compare the backfill against.
        let reference = blocks_and_execution_outputs(provider_factory, spec, key_pair)?;
        let (expected_block, expected_output) = reference.first().unwrap();
        let expected_outcome = to_execution_outcome(expected_block.number, expected_output);

        // Run the backfill for block 1 only.
        let chains = BackfillJobFactory::new(evm_config, blockchain_db)
            .backfill(1..=1)
            .collect::<Result<Vec<_>, _>>()?;

        assert_eq!(chains.len(), 1);
        let mut chain = chains.into_iter().next().unwrap();
        // Reverts are sorted before comparing (presumably to make the comparison independent of
        // their order).
        chain.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(chain.blocks(), &[(1, expected_block.clone())].into());
        assert_eq!(chain.execution_outcome(), &expected_outcome);

        Ok(())
    }

    /// The single-block variant yields one (block, output) pair per block in the range, each
    /// matching the reference execution.
    #[test]
    fn test_single_block_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Sender key pair; the derived address is handed to the test chain spec.
        let key_pair = generators::generate_key(&mut generators::rng());
        let sender = public_key_to_address(key_pair.public_key());
        let spec = chain_spec(sender);

        let evm_config = EthEvmConfig::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let reference = blocks_and_execution_outputs(provider_factory, spec, key_pair)?;

        // Run the single-block backfill for block 1 only.
        let single_job = BackfillJobFactory::new(evm_config, blockchain_db)
            .backfill(1..=1)
            .into_single_blocks();
        let results = single_job.into_iter().collect::<Vec<_>>();

        // Exactly one block must have been produced.
        assert_eq!(results.len(), 1);

        // Every produced pair must equal the reference entry at the same position.
        for (i, res) in results.into_iter().enumerate() {
            let (block, mut execution_output) = res?;
            execution_output.state.reverts.sort();

            let (expected_block, expected_output) = &reference[i];

            assert_eq!(&block, expected_block);
            assert_eq!(&execution_output, expected_output);
        }

        Ok(())
    }

    /// With `max_blocks = 1`, a two-block range is split into two single-block chains.
    #[test]
    fn test_backfill_with_batch_threshold() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Sender key pair; the derived address is handed to the test chain spec.
        let key_pair = generators::generate_key(&mut generators::rng());
        let sender = public_key_to_address(key_pair.public_key());
        let spec = chain_spec(sender);

        let evm_config = EthEvmConfig::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let reference = blocks_and_execution_outputs(provider_factory, spec, key_pair)?;
        let (block1, output1) = reference[0].clone();
        let (block2, output2) = reference[1].clone();

        // One block per batch, so the range 1..=2 must come back as two chains.
        let thresholds = ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() };
        let chains = BackfillJobFactory::new(evm_config, blockchain_db)
            .with_thresholds(thresholds)
            .backfill(1..=2)
            .collect::<Result<Vec<_>, _>>()?;

        assert_eq!(chains.len(), 2);

        let mut first_chain = chains[0].clone();
        first_chain.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(first_chain.blocks(), &[(1, block1)].into());
        assert_eq!(first_chain.execution_outcome(), &to_execution_outcome(1, &output1));

        let mut second_chain = chains[1].clone();
        second_chain.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(second_chain.blocks(), &[(2, block2)].into());
        assert_eq!(second_chain.execution_outcome(), &to_execution_outcome(2, &output2));

        Ok(())
    }
}