reth_exex/backfill/
job.rs

1use crate::StreamBackfillJob;
2use std::{
3    ops::RangeInclusive,
4    time::{Duration, Instant},
5};
6
7use alloy_consensus::BlockHeader;
8use alloy_primitives::BlockNumber;
9use reth_evm::execute::{
10    BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
11};
12use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
13use reth_primitives::{Receipt, RecoveredBlock};
14use reth_primitives_traits::{format_gas_throughput, SignedTransaction};
15use reth_provider::{
16    BlockReader, Chain, ExecutionOutcome, HeaderProvider, ProviderError, StateProviderFactory,
17    TransactionVariant,
18};
19use reth_prune_types::PruneModes;
20use reth_revm::database::StateProviderDatabase;
21use reth_stages_api::ExecutionStageThresholds;
22use reth_tracing::tracing::{debug, trace};
23
24pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
25
26/// Backfill job started for a specific range.
27///
28/// It implements [`Iterator`] that executes blocks in batches according to the provided thresholds
29/// and yields [`Chain`]. In other words, this iterator can yield multiple items for the given range
30/// depending on the configured thresholds.
31#[derive(Debug)]
32pub struct BackfillJob<E, P> {
33    pub(crate) executor: E,
34    pub(crate) provider: P,
35    pub(crate) prune_modes: PruneModes,
36    pub(crate) thresholds: ExecutionStageThresholds,
37    pub(crate) range: RangeInclusive<BlockNumber>,
38    pub(crate) stream_parallelism: usize,
39}
40
41impl<E, P> Iterator for BackfillJob<E, P>
42where
43    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
44    P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
45{
46    type Item = BackfillJobResult<Chain<E::Primitives>>;
47
48    fn next(&mut self) -> Option<Self::Item> {
49        if self.range.is_empty() {
50            return None
51        }
52
53        Some(self.execute_range())
54    }
55}
56
57impl<E, P> BackfillJob<E, P>
58where
59    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
60    P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
61{
62    /// Converts the backfill job into a single block backfill job.
63    pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
64        self.into()
65    }
66
67    /// Converts the backfill job into a stream.
68    pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
69        self.into()
70    }
71
72    fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
73        debug!(
74            target: "exex::backfill",
75            range = ?self.range,
76            "Executing block range"
77        );
78
79        let mut executor = self.executor.executor(StateProviderDatabase::new(
80            self.provider
81                .history_by_block_number(self.range.start().saturating_sub(1))
82                .map_err(BlockExecutionError::other)?,
83        ));
84
85        let mut fetch_block_duration = Duration::default();
86        let mut execution_duration = Duration::default();
87        let mut cumulative_gas = 0;
88        let batch_start = Instant::now();
89
90        let mut blocks = Vec::new();
91        let mut results = Vec::new();
92        for block_number in self.range.clone() {
93            // Fetch the block
94            let fetch_block_start = Instant::now();
95
96            // we need the block's transactions along with their hashes
97            let block = self
98                .provider
99                .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)
100                .map_err(BlockExecutionError::other)?
101                .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
102                .map_err(BlockExecutionError::other)?;
103
104            fetch_block_duration += fetch_block_start.elapsed();
105
106            cumulative_gas += block.gas_used();
107
108            // Configure the executor to use the current state.
109            trace!(target: "exex::backfill", number = block_number, txs = block.body().transactions().len(), "Executing block");
110
111            // Execute the block
112            let execute_start = Instant::now();
113
114            // Unseal the block for execution
115            let (block, senders) = block.split_sealed();
116            let (header, body) = block.split_sealed_header_body();
117            let block = P::Block::new_sealed(header, body).with_senders(senders);
118
119            results.push(executor.execute_one(&block)?);
120            execution_duration += execute_start.elapsed();
121
122            // TODO(alexey): report gas metrics using `block.header.gas_used`
123
124            // Seal the block back and save it
125            blocks.push(block);
126            // Check if we should commit now
127            if self.thresholds.is_end_of_batch(
128                block_number - *self.range.start(),
129                executor.size_hint() as u64,
130                cumulative_gas,
131                batch_start.elapsed(),
132            ) {
133                break
134            }
135        }
136
137        let first_block_number = blocks.first().expect("blocks should not be empty").number();
138        let last_block_number = blocks.last().expect("blocks should not be empty").number();
139        debug!(
140            target: "exex::backfill",
141            range = ?*self.range.start()..=last_block_number,
142            block_fetch = ?fetch_block_duration,
143            execution = ?execution_duration,
144            throughput = format_gas_throughput(cumulative_gas, execution_duration),
145            "Finished executing block range"
146        );
147        self.range = last_block_number + 1..=*self.range.end();
148
149        let outcome = ExecutionOutcome::from_blocks(
150            first_block_number,
151            executor.into_state().take_bundle(),
152            results,
153        );
154        let chain = Chain::new(blocks, outcome, None);
155        Ok(chain)
156    }
157}
158
159/// Single block Backfill job started for a specific range.
160///
161/// It implements [`Iterator`] which executes a block each time the
162/// iterator is advanced and yields ([`RecoveredBlock`], [`BlockExecutionOutput`])
163#[derive(Debug, Clone)]
164pub struct SingleBlockBackfillJob<E, P> {
165    pub(crate) executor: E,
166    pub(crate) provider: P,
167    pub(crate) range: RangeInclusive<BlockNumber>,
168    pub(crate) stream_parallelism: usize,
169}
170
171impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
172where
173    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
174    P: HeaderProvider + BlockReader + StateProviderFactory,
175{
176    type Item = BackfillJobResult<(
177        RecoveredBlock<P::Block>,
178        BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
179    )>;
180
181    fn next(&mut self) -> Option<Self::Item> {
182        self.range.next().map(|block_number| self.execute_block(block_number))
183    }
184}
185
186impl<E, P> SingleBlockBackfillJob<E, P>
187where
188    E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
189    P: HeaderProvider + BlockReader + StateProviderFactory,
190{
191    /// Converts the single block backfill job into a stream.
192    pub fn into_stream(
193        self,
194    ) -> StreamBackfillJob<
195        E,
196        P,
197        (RecoveredBlock<reth_primitives::Block>, BlockExecutionOutput<Receipt>),
198    > {
199        self.into()
200    }
201
202    #[expect(clippy::type_complexity)]
203    pub(crate) fn execute_block(
204        &self,
205        block_number: u64,
206    ) -> BackfillJobResult<(
207        RecoveredBlock<P::Block>,
208        BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
209    )> {
210        // Fetch the block with senders for execution.
211        let block_with_senders = self
212            .provider
213            .recovered_block(block_number.into(), TransactionVariant::WithHash)
214            .map_err(BlockExecutionError::other)?
215            .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
216            .map_err(BlockExecutionError::other)?;
217
218        // Configure the executor to use the previous block's state.
219        let executor = self.executor.executor(StateProviderDatabase::new(
220            self.provider
221                .history_by_block_number(block_number.saturating_sub(1))
222                .map_err(BlockExecutionError::other)?,
223        ));
224
225        trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.body().transaction_count(), "Executing block");
226
227        let block_execution_output = executor.execute(&block_with_senders)?;
228
229        Ok((block_with_senders, block_execution_output))
230    }
231}
232
233impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
234    fn from(job: BackfillJob<E, P>) -> Self {
235        Self {
236            executor: job.executor,
237            provider: job.provider,
238            range: job.range,
239            stream_parallelism: job.stream_parallelism,
240        }
241    }
242}
243
#[cfg(test)]
mod tests {
    use crate::{
        backfill::test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
        BackfillJobFactory,
    };
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::execute::EthExecutorProvider;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_testing_utils::generators;
    use secp256k1::Keypair;

    /// Batched backfill of block 1 must reproduce the block and execution outcome obtained by
    /// executing that block directly.
    #[test]
    fn test_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Key pair of the sender; its address is handed to the test chain spec.
        let sender = Keypair::new_global(&mut generators::rng());
        let chain_spec = chain_spec(public_key_to_address(sender.public_key()));

        let executor = EthExecutorProvider::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let executed = blocks_and_execution_outputs(provider_factory, chain_spec, sender)?;
        let (expected_block, expected_output) = executed.first().unwrap();
        let expected_outcome = to_execution_outcome(expected_block.number, expected_output);

        // Backfill only the first block.
        let factory = BackfillJobFactory::new(executor, blockchain_db);
        let chains = factory.backfill(1..=1).collect::<Result<Vec<_>, _>>()?;

        // One block yields one chain, which must match the direct execution of that block.
        assert_eq!(chains.len(), 1);
        let mut chain = chains.into_iter().next().unwrap();
        // Sort reverts so the comparison does not depend on their order.
        chain.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(chain.blocks(), &[(1, expected_block.clone())].into());
        assert_eq!(chain.execution_outcome(), &expected_outcome);

        Ok(())
    }

    /// The single-block job over `1..=1` must yield exactly one block, and its output must match
    /// executing that block directly.
    #[test]
    fn test_single_block_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Key pair of the sender; its address is handed to the test chain spec.
        let sender = Keypair::new_global(&mut generators::rng());
        let chain_spec = chain_spec(public_key_to_address(sender.public_key()));

        let executor = EthExecutorProvider::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let expected = blocks_and_execution_outputs(provider_factory, chain_spec, sender)?;

        // Backfill only the first block, one block per iterator step.
        let factory = BackfillJobFactory::new(executor, blockchain_db);
        let results = factory.backfill(1..=1).into_single_blocks().collect::<Vec<_>>();

        // The range holds a single block, so a single result is expected.
        assert_eq!(results.len(), 1);

        // Each produced block and output must equal the directly executed one at the same index.
        for (i, result) in results.into_iter().enumerate() {
            let (block, mut output) = result?;
            output.state.reverts.sort();

            let (expected_block, expected_output) = &expected[i];
            assert_eq!(&block, expected_block);
            assert_eq!(&output, expected_output);
        }

        Ok(())
    }
}