Skip to main content

reth_exex/backfill/
job.rs

1use crate::StreamBackfillJob;
2use reth_evm::ConfigureEvm;
3use std::{
4    collections::BTreeMap,
5    ops::RangeInclusive,
6    time::{Duration, Instant},
7};
8
9use alloy_consensus::BlockHeader;
10use alloy_primitives::BlockNumber;
11use reth_ethereum_primitives::Receipt;
12use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, Executor};
13use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
14use reth_primitives_traits::{format_gas_throughput, RecoveredBlock, SignedTransaction};
15use reth_provider::{
16    BlockReader, Chain, ExecutionOutcome, HeaderProvider, ProviderError, StateProviderFactory,
17    TransactionVariant,
18};
19use reth_prune_types::PruneModes;
20use reth_revm::database::StateProviderDatabase;
21use reth_stages_api::ExecutionStageThresholds;
22use reth_tracing::tracing::{debug, trace};
23
24pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
25
26/// Backfill job started for a specific range.
27///
28/// It implements [`Iterator`] that executes blocks in batches according to the provided thresholds
29/// and yields [`Chain`]. In other words, this iterator can yield multiple items for the given range
30/// depending on the configured thresholds.
31#[derive(Debug)]
32pub struct BackfillJob<E, P> {
33    pub(crate) evm_config: E,
34    pub(crate) provider: P,
35    pub(crate) prune_modes: PruneModes,
36    pub(crate) thresholds: ExecutionStageThresholds,
37    pub(crate) range: RangeInclusive<BlockNumber>,
38    pub(crate) stream_parallelism: usize,
39}
40
41impl<E, P> Iterator for BackfillJob<E, P>
42where
43    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
44    P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
45{
46    type Item = BackfillJobResult<Chain<E::Primitives>>;
47
48    fn next(&mut self) -> Option<Self::Item> {
49        if self.range.is_empty() {
50            return None
51        }
52
53        Some(self.execute_range())
54    }
55}
56
57impl<E, P> BackfillJob<E, P>
58where
59    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
60    P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
61{
62    /// Converts the backfill job into a single block backfill job.
63    pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
64        self.into()
65    }
66
67    /// Converts the backfill job into a stream.
68    pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
69        self.into()
70    }
71
72    fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
73        debug!(
74            target: "exex::backfill",
75            range = ?self.range,
76            "Executing block range"
77        );
78
79        let mut executor = self.evm_config.batch_executor(StateProviderDatabase::new(
80            self.provider
81                .history_by_block_number(self.range.start().saturating_sub(1))
82                .map_err(BlockExecutionError::other)?,
83        ));
84
85        let mut fetch_block_duration = Duration::default();
86        let mut execution_duration = Duration::default();
87        let mut cumulative_gas = 0;
88        let batch_start = Instant::now();
89
90        let mut blocks = Vec::new();
91        let mut results = Vec::new();
92        for block_number in self.range.clone() {
93            // Fetch the block
94            let fetch_block_start = Instant::now();
95
96            // we need the block's transactions along with their hashes
97            let block = self
98                .provider
99                .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)
100                .map_err(BlockExecutionError::other)?
101                .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
102                .map_err(BlockExecutionError::other)?;
103
104            fetch_block_duration += fetch_block_start.elapsed();
105
106            cumulative_gas += block.gas_used();
107
108            // Configure the executor to use the current state.
109            trace!(target: "exex::backfill", number = block_number, txs = block.body().transactions().len(), "Executing block");
110
111            // Execute the block
112            let execute_start = Instant::now();
113
114            // Unseal the block for execution
115            let (block, senders) = block.split_sealed();
116            let (header, body) = block.split_sealed_header_body();
117            let block = P::Block::new_sealed(header, body).with_senders(senders);
118
119            results.push(executor.execute_one(&block)?);
120            execution_duration += execute_start.elapsed();
121
122            // Seal the block back and save it
123            blocks.push(block);
124            // Check if we should commit now
125            if self.thresholds.is_end_of_batch(
126                block_number - *self.range.start() + 1,
127                executor.size_hint() as u64,
128                cumulative_gas,
129                batch_start.elapsed(),
130            ) {
131                break
132            }
133        }
134
135        let first_block_number = blocks.first().expect("blocks should not be empty").number();
136        let last_block_number = blocks.last().expect("blocks should not be empty").number();
137        debug!(
138            target: "exex::backfill",
139            range = ?*self.range.start()..=last_block_number,
140            block_fetch = ?fetch_block_duration,
141            execution = ?execution_duration,
142            throughput = format_gas_throughput(cumulative_gas, execution_duration),
143            "Finished executing block range"
144        );
145        self.range = last_block_number + 1..=*self.range.end();
146
147        let outcome = ExecutionOutcome::from_blocks(
148            first_block_number,
149            executor.into_state().take_bundle(),
150            results,
151        );
152        let chain = Chain::new(blocks, outcome, BTreeMap::new());
153        Ok(chain)
154    }
155}
156
157/// Single block Backfill job started for a specific range.
158///
159/// It implements [`Iterator`] which executes a block each time the
160/// iterator is advanced and yields ([`RecoveredBlock`], [`BlockExecutionOutput`])
161#[derive(Debug, Clone)]
162pub struct SingleBlockBackfillJob<E, P> {
163    pub(crate) evm_config: E,
164    pub(crate) provider: P,
165    pub(crate) range: RangeInclusive<BlockNumber>,
166    pub(crate) stream_parallelism: usize,
167}
168
169impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
170where
171    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
172    P: HeaderProvider + BlockReader + StateProviderFactory,
173{
174    type Item = BackfillJobResult<(
175        RecoveredBlock<P::Block>,
176        BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
177    )>;
178
179    fn next(&mut self) -> Option<Self::Item> {
180        self.range.next().map(|block_number| self.execute_block(block_number))
181    }
182}
183
184impl<E, P> SingleBlockBackfillJob<E, P>
185where
186    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
187    P: HeaderProvider + BlockReader + StateProviderFactory,
188{
189    /// Converts the single block backfill job into a stream.
190    pub fn into_stream(
191        self,
192    ) -> StreamBackfillJob<
193        E,
194        P,
195        (RecoveredBlock<reth_ethereum_primitives::Block>, BlockExecutionOutput<Receipt>),
196    > {
197        self.into()
198    }
199
200    #[expect(clippy::type_complexity)]
201    pub(crate) fn execute_block(
202        &self,
203        block_number: u64,
204    ) -> BackfillJobResult<(
205        RecoveredBlock<P::Block>,
206        BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
207    )> {
208        // Fetch the block with senders for execution.
209        let block_with_senders = self
210            .provider
211            .recovered_block(block_number.into(), TransactionVariant::WithHash)
212            .map_err(BlockExecutionError::other)?
213            .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
214            .map_err(BlockExecutionError::other)?;
215
216        // Configure the executor to use the previous block's state.
217        let executor = self.evm_config.batch_executor(StateProviderDatabase::new(
218            self.provider
219                .history_by_block_number(block_number.saturating_sub(1))
220                .map_err(BlockExecutionError::other)?,
221        ));
222
223        trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.body().transaction_count(), "Executing block");
224
225        let block_execution_output = executor.execute(&block_with_senders)?;
226
227        Ok((block_with_senders, block_execution_output))
228    }
229}
230
231impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
232    fn from(job: BackfillJob<E, P>) -> Self {
233        Self {
234            evm_config: job.evm_config,
235            provider: job.provider,
236            range: job.range,
237            stream_parallelism: job.stream_parallelism,
238        }
239    }
240}
241
#[cfg(test)]
mod tests {
    use crate::{
        backfill::{
            job::ExecutionStageThresholds,
            test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
        },
        BackfillJobFactory,
    };
    use alloy_consensus::BlockHeader;
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_testing_utils::generators;

    /// Backfilling block 1 with the batch job yields exactly one chain whose block and execution
    /// outcome match what `blocks_and_execution_outputs` produced for that block.
    #[test]
    fn test_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender
        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let blocks_and_execution_outputs =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
        let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap();
        let execution_outcome = to_execution_outcome(block.number, block_execution_output);

        // Backfill the first block
        let factory = BackfillJobFactory::new(executor, blockchain_db);
        let job = factory.backfill(1..=1);
        let chains = job.collect::<Result<Vec<_>, _>>()?;

        // Assert that the backfill job produced the same chain as we got before when we were
        // executing only the first block
        assert_eq!(chains.len(), 1);
        let mut chain = chains.into_iter().next().unwrap();
        chain.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(chain.blocks(), &[(1, block.clone())].into());
        assert_eq!(chain.execution_outcome(), &execution_outcome);

        Ok(())
    }

    /// The single-block job over `1..=1` yields exactly one item, and that item's block and
    /// execution output match the fixture.
    #[test]
    fn test_single_block_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender
        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let blocks_and_execution_outcomes =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

        // Backfill the first block
        let factory = BackfillJobFactory::new(executor, blockchain_db);
        let job = factory.backfill(1..=1);
        let single_job = job.into_single_blocks();
        let block_execution_it = single_job.into_iter();

        // Assert that the backfill job only produces a single block
        let blocks_and_outcomes = block_execution_it.collect::<Vec<_>>();
        assert_eq!(blocks_and_outcomes.len(), 1);

        // Assert that the backfill job single block iterator produces the expected output for each
        // block
        for (i, res) in blocks_and_outcomes.into_iter().enumerate() {
            let (block, mut execution_output) = res?;
            execution_output.state.reverts.sort();

            let expected_block = blocks_and_execution_outcomes[i].0.clone();
            let expected_output = &blocks_and_execution_outcomes[i].1;

            assert_eq!(block, expected_block);
            assert_eq!(&execution_output, expected_output);
        }

        Ok(())
    }

    /// Verify that ExEx backfill (using `history_by_block_number`) produces identical execution
    /// results to the pipeline path (using `LatestStateProvider`).
    ///
    /// This is a regression test for an issue reported on mainnet where backfill fails around
    /// blocks 1.7M-3.8M with "transaction gas limit X is more than blocks available gas Y",
    /// suggesting the state provider used during backfill may return different state than what the
    /// pipeline used during initial sync.
    #[test]
    fn test_backfill_state_provider_parity() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Execute blocks via LatestStateProvider (pipeline-style) and commit to DB.
        // This mirrors what the pipeline's ExecutionStage does.
        let pipeline_results =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

        // Now re-execute via SingleBlockBackfillJob which uses history_by_block_number.
        let factory = BackfillJobFactory::new(executor, blockchain_db);
        let job = factory.backfill(1..=2);
        let single_job = job.into_single_blocks();
        let backfill_results: Vec<_> = single_job.into_iter().collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            pipeline_results.len(),
            backfill_results.len(),
            "should produce same number of block results"
        );

        for (i, ((pipeline_block, pipeline_output), (backfill_block, mut backfill_output))) in
            pipeline_results.iter().zip(backfill_results.into_iter()).enumerate()
        {
            backfill_output.state.reverts.sort();

            assert_eq!(
                backfill_block, *pipeline_block,
                "block {i} mismatch between pipeline and backfill"
            );

            assert_eq!(
                backfill_output.receipts, pipeline_output.receipts,
                "block {i}: receipts differ — gas accounting divergence between \
                 LatestStateProvider and history_by_block_number"
            );

            assert_eq!(
                backfill_output.gas_used, pipeline_output.gas_used,
                "block {i}: gas_used differs"
            );

            assert_eq!(
                &backfill_output, pipeline_output,
                "block {i}: full execution output differs between pipeline and backfill"
            );
        }

        Ok(())
    }

    /// Batch counterpart of `test_backfill_state_provider_parity`.
    ///
    /// The batch `BackfillJob` path (`execute_range`) also uses `history_by_block_number`. This
    /// verifies that the batch chain contains exactly the blocks the pipeline produced, with
    /// matching receipts for each block.
    #[test]
    fn test_backfill_batch_state_provider_parity() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let pipeline_results =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

        // Re-execute via batch BackfillJob (execute_range) using history_by_block_number
        let factory = BackfillJobFactory::new(executor, blockchain_db);
        let job = factory.backfill(1..=2);
        let chains = job.collect::<Result<Vec<_>, _>>()?;

        assert_eq!(chains.len(), 1, "two blocks without threshold should yield one chain");
        let chain = chains.into_iter().next().unwrap();

        // The per-block loop below only looks at blocks the pipeline produced, so also make
        // sure the batch chain has no extra blocks or receipt sets.
        assert_eq!(
            chain.blocks().len(),
            pipeline_results.len(),
            "batch backfill produced a different number of blocks than the pipeline"
        );
        assert_eq!(
            chain.execution_outcome().receipts.len(),
            pipeline_results.len(),
            "batch backfill produced a different number of receipt sets than the pipeline"
        );

        // Compare each block's receipts from the chain against the pipeline outputs
        for (i, (pipeline_block, pipeline_output)) in pipeline_results.iter().enumerate() {
            let block_number = pipeline_block.number();
            let chain_block = chain.blocks().get(&block_number).expect("block should be in chain");
            assert_eq!(chain_block, pipeline_block, "block {i}: block mismatch in batch backfill");

            let chain_receipts = &chain.execution_outcome().receipts[i];
            assert_eq!(
                chain_receipts, &pipeline_output.receipts,
                "block {i}: receipts differ in batch backfill — potential gas accounting \
                 divergence between LatestStateProvider and history_by_block_number"
            );
        }

        Ok(())
    }

    /// With `max_blocks = 1`, backfilling `1..=2` yields two single-block chains. Each chain
    /// matches the corresponding fixture block and execution outcome.
    #[test]
    fn test_backfill_with_batch_threshold() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Create a key pair for the sender
        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let blocks_and_execution_outputs =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
        let (block1, output1) = blocks_and_execution_outputs[0].clone();
        let (block2, output2) = blocks_and_execution_outputs[1].clone();

        // Backfill with max_blocks=1, expect two separate chains
        let factory = BackfillJobFactory::new(executor, blockchain_db).with_thresholds(
            ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
        );
        let job = factory.backfill(1..=2);
        let chains = job.collect::<Result<Vec<_>, _>>()?;

        // Assert two chains, each with one block
        assert_eq!(chains.len(), 2);

        let mut chain1 = chains[0].clone();
        chain1.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(chain1.blocks(), &[(1, block1)].into());
        assert_eq!(chain1.execution_outcome(), &to_execution_outcome(1, &output1));

        let mut chain2 = chains[1].clone();
        chain2.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(chain2.blocks(), &[(2, block2)].into());
        assert_eq!(chain2.execution_outcome(), &to_execution_outcome(2, &output2));

        Ok(())
    }
}