1use crate::StreamBackfillJob;
2use reth_evm::ConfigureEvm;
3use std::{
4 collections::BTreeMap,
5 ops::RangeInclusive,
6 time::{Duration, Instant},
7};
8
9use alloy_consensus::BlockHeader;
10use alloy_primitives::BlockNumber;
11use reth_ethereum_primitives::Receipt;
12use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, Executor};
13use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
14use reth_primitives_traits::{format_gas_throughput, RecoveredBlock, SignedTransaction};
15use reth_provider::{
16 BlockReader, Chain, ExecutionOutcome, HeaderProvider, ProviderError, StateProviderFactory,
17 TransactionVariant,
18};
19use reth_prune_types::PruneModes;
20use reth_revm::database::StateProviderDatabase;
21use reth_stages_api::ExecutionStageThresholds;
22use reth_tracing::tracing::{debug, trace};
23
24pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
25
26#[derive(Debug)]
32pub struct BackfillJob<E, P> {
33 pub(crate) evm_config: E,
34 pub(crate) provider: P,
35 pub(crate) prune_modes: PruneModes,
36 pub(crate) thresholds: ExecutionStageThresholds,
37 pub(crate) range: RangeInclusive<BlockNumber>,
38 pub(crate) stream_parallelism: usize,
39}
40
41impl<E, P> Iterator for BackfillJob<E, P>
42where
43 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
44 P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
45{
46 type Item = BackfillJobResult<Chain<E::Primitives>>;
47
48 fn next(&mut self) -> Option<Self::Item> {
49 if self.range.is_empty() {
50 return None
51 }
52
53 Some(self.execute_range())
54 }
55}
56
57impl<E, P> BackfillJob<E, P>
58where
59 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
60 P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
61{
62 pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
64 self.into()
65 }
66
67 pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
69 self.into()
70 }
71
72 fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
73 debug!(
74 target: "exex::backfill",
75 range = ?self.range,
76 "Executing block range"
77 );
78
79 let mut executor = self.evm_config.batch_executor(StateProviderDatabase::new(
80 self.provider
81 .history_by_block_number(self.range.start().saturating_sub(1))
82 .map_err(BlockExecutionError::other)?,
83 ));
84
85 let mut fetch_block_duration = Duration::default();
86 let mut execution_duration = Duration::default();
87 let mut cumulative_gas = 0;
88 let batch_start = Instant::now();
89
90 let mut blocks = Vec::new();
91 let mut results = Vec::new();
92 for block_number in self.range.clone() {
93 let fetch_block_start = Instant::now();
95
96 let block = self
98 .provider
99 .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)
100 .map_err(BlockExecutionError::other)?
101 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
102 .map_err(BlockExecutionError::other)?;
103
104 fetch_block_duration += fetch_block_start.elapsed();
105
106 cumulative_gas += block.gas_used();
107
108 trace!(target: "exex::backfill", number = block_number, txs = block.body().transactions().len(), "Executing block");
110
111 let execute_start = Instant::now();
113
114 let (block, senders) = block.split_sealed();
116 let (header, body) = block.split_sealed_header_body();
117 let block = P::Block::new_sealed(header, body).with_senders(senders);
118
119 results.push(executor.execute_one(&block)?);
120 execution_duration += execute_start.elapsed();
121
122 blocks.push(block);
124 if self.thresholds.is_end_of_batch(
126 block_number - *self.range.start() + 1,
127 executor.size_hint() as u64,
128 cumulative_gas,
129 batch_start.elapsed(),
130 ) {
131 break
132 }
133 }
134
135 let first_block_number = blocks.first().expect("blocks should not be empty").number();
136 let last_block_number = blocks.last().expect("blocks should not be empty").number();
137 debug!(
138 target: "exex::backfill",
139 range = ?*self.range.start()..=last_block_number,
140 block_fetch = ?fetch_block_duration,
141 execution = ?execution_duration,
142 throughput = format_gas_throughput(cumulative_gas, execution_duration),
143 "Finished executing block range"
144 );
145 self.range = last_block_number + 1..=*self.range.end();
146
147 let outcome = ExecutionOutcome::from_blocks(
148 first_block_number,
149 executor.into_state().take_bundle(),
150 results,
151 );
152 let chain = Chain::new(blocks, outcome, BTreeMap::new());
153 Ok(chain)
154 }
155}
156
157#[derive(Debug, Clone)]
162pub struct SingleBlockBackfillJob<E, P> {
163 pub(crate) evm_config: E,
164 pub(crate) provider: P,
165 pub(crate) range: RangeInclusive<BlockNumber>,
166 pub(crate) stream_parallelism: usize,
167}
168
169impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
170where
171 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
172 P: HeaderProvider + BlockReader + StateProviderFactory,
173{
174 type Item = BackfillJobResult<(
175 RecoveredBlock<P::Block>,
176 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
177 )>;
178
179 fn next(&mut self) -> Option<Self::Item> {
180 self.range.next().map(|block_number| self.execute_block(block_number))
181 }
182}
183
184impl<E, P> SingleBlockBackfillJob<E, P>
185where
186 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
187 P: HeaderProvider + BlockReader + StateProviderFactory,
188{
189 pub fn into_stream(
191 self,
192 ) -> StreamBackfillJob<
193 E,
194 P,
195 (RecoveredBlock<reth_ethereum_primitives::Block>, BlockExecutionOutput<Receipt>),
196 > {
197 self.into()
198 }
199
200 #[expect(clippy::type_complexity)]
201 pub(crate) fn execute_block(
202 &self,
203 block_number: u64,
204 ) -> BackfillJobResult<(
205 RecoveredBlock<P::Block>,
206 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
207 )> {
208 let block_with_senders = self
210 .provider
211 .recovered_block(block_number.into(), TransactionVariant::WithHash)
212 .map_err(BlockExecutionError::other)?
213 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
214 .map_err(BlockExecutionError::other)?;
215
216 let executor = self.evm_config.batch_executor(StateProviderDatabase::new(
218 self.provider
219 .history_by_block_number(block_number.saturating_sub(1))
220 .map_err(BlockExecutionError::other)?,
221 ));
222
223 trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.body().transaction_count(), "Executing block");
224
225 let block_execution_output = executor.execute(&block_with_senders)?;
226
227 Ok((block_with_senders, block_execution_output))
228 }
229}
230
231impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
232 fn from(job: BackfillJob<E, P>) -> Self {
233 Self {
234 evm_config: job.evm_config,
235 provider: job.provider,
236 range: job.range,
237 stream_parallelism: job.stream_parallelism,
238 }
239 }
240}
241
#[cfg(test)]
mod tests {
    use crate::{
        backfill::{
            job::ExecutionStageThresholds,
            test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
        },
        BackfillJobFactory,
    };
    use alloy_consensus::BlockHeader;
    use reth_db_common::init::init_genesis;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
    use reth_provider::{
        providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
    };
    use reth_testing_utils::generators;

    /// A batch backfill over block 1 yields exactly one chain whose block and execution outcome
    /// match the ones produced by the test fixture.
    #[test]
    fn test_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        // Chain spec built around a freshly generated signer.
        let signer = generators::generate_key(&mut generators::rng());
        let signer_address = public_key_to_address(signer.public_key());
        let spec = chain_spec(signer_address);

        let evm_config = EthEvmConfig::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        // Reference blocks and outputs; only the first block is compared below.
        let reference = blocks_and_execution_outputs(provider_factory, spec, signer)?;
        let (expected_block, expected_output) = reference.first().unwrap();
        let expected_outcome = to_execution_outcome(expected_block.number, expected_output);

        let factory = BackfillJobFactory::new(evm_config, blockchain_db);
        let chains = factory.backfill(1..=1).collect::<Result<Vec<_>, _>>()?;

        assert_eq!(chains.len(), 1);
        let mut chain = chains.into_iter().next().unwrap();
        // Reverts are sorted before comparing against the expected outcome.
        chain.execution_outcome_mut().bundle.reverts.sort();
        assert_eq!(chain.blocks(), &[(1, expected_block.clone())].into());
        assert_eq!(chain.execution_outcome(), &expected_outcome);

        Ok(())
    }

    /// A single-block backfill over block 1 yields one result whose block and execution output
    /// match the ones produced by the test fixture.
    #[test]
    fn test_single_block_backfill() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let signer = generators::generate_key(&mut generators::rng());
        let signer_address = public_key_to_address(signer.public_key());
        let spec = chain_spec(signer_address);

        let evm_config = EthEvmConfig::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let reference = blocks_and_execution_outputs(provider_factory, spec, signer)?;

        let factory = BackfillJobFactory::new(evm_config, blockchain_db);
        let results: Vec<_> = factory.backfill(1..=1).into_single_blocks().collect();

        assert_eq!(results.len(), 1);

        for (index, result) in results.into_iter().enumerate() {
            let (block, mut execution_output) = result?;
            // Reverts are sorted before comparing against the expected output.
            execution_output.state.reverts.sort();

            let (expected_block, expected_output) = &reference[index];

            assert_eq!(&block, expected_block);
            assert_eq!(&execution_output, expected_output);
        }

        Ok(())
    }

    /// Executing blocks 1 and 2 one at a time through the backfill job produces the same blocks,
    /// receipts, gas usage and full execution output as the test fixture.
    #[test]
    fn test_backfill_state_provider_parity() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let signer = generators::generate_key(&mut generators::rng());
        let signer_address = public_key_to_address(signer.public_key());
        let spec = chain_spec(signer_address);

        let evm_config = EthEvmConfig::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let pipeline_results = blocks_and_execution_outputs(provider_factory, spec, signer)?;

        let factory = BackfillJobFactory::new(evm_config, blockchain_db);
        let backfill_results = factory
            .backfill(1..=2)
            .into_single_blocks()
            .collect::<Result<Vec<_>, _>>()?;

        assert_eq!(
            pipeline_results.len(),
            backfill_results.len(),
            "should produce same number of block results"
        );

        let paired = pipeline_results.iter().zip(backfill_results);
        for (i, ((pipeline_block, pipeline_output), (backfill_block, mut backfill_output))) in
            paired.enumerate()
        {
            // Reverts are sorted before comparing against the pipeline output.
            backfill_output.state.reverts.sort();

            assert_eq!(
                backfill_block, *pipeline_block,
                "block {i} mismatch between pipeline and backfill"
            );

            // Narrow checks come first so a failure points at the differing part.
            assert_eq!(
                backfill_output.receipts, pipeline_output.receipts,
                "block {i}: receipts differ — gas accounting divergence between \
                 LatestStateProvider and history_by_block_number"
            );

            assert_eq!(
                backfill_output.gas_used, pipeline_output.gas_used,
                "block {i}: gas_used differs"
            );

            assert_eq!(
                &backfill_output, pipeline_output,
                "block {i}: full execution output differs between pipeline and backfill"
            );
        }

        Ok(())
    }

    /// Executing blocks 1 and 2 as one batch produces a single chain whose blocks and per-block
    /// receipts match the test fixture.
    #[test]
    fn test_backfill_batch_state_provider_parity() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let signer = generators::generate_key(&mut generators::rng());
        let signer_address = public_key_to_address(signer.public_key());
        let spec = chain_spec(signer_address);

        let evm_config = EthEvmConfig::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let pipeline_results = blocks_and_execution_outputs(provider_factory, spec, signer)?;

        let factory = BackfillJobFactory::new(evm_config, blockchain_db);
        let chains = factory.backfill(1..=2).collect::<Result<Vec<_>, _>>()?;

        assert_eq!(chains.len(), 1, "two blocks without threshold should yield one chain");
        let chain = chains.into_iter().next().unwrap();
        let chain_blocks = chain.blocks();
        let chain_outcome = chain.execution_outcome();

        for (i, (pipeline_block, pipeline_output)) in pipeline_results.iter().enumerate() {
            let chain_block =
                chain_blocks.get(&pipeline_block.number()).expect("block should be in chain");
            assert_eq!(chain_block, pipeline_block, "block {i}: block mismatch in batch backfill");

            assert_eq!(
                &chain_outcome.receipts[i], &pipeline_output.receipts,
                "block {i}: receipts differ in batch backfill — potential gas accounting \
                 divergence between LatestStateProvider and history_by_block_number"
            );
        }

        Ok(())
    }

    /// With `max_blocks` set to 1, a backfill over blocks 1 and 2 yields two chains of one block
    /// each, matching the test fixture.
    #[test]
    fn test_backfill_with_batch_threshold() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let signer = generators::generate_key(&mut generators::rng());
        let signer_address = public_key_to_address(signer.public_key());
        let spec = chain_spec(signer_address);

        let evm_config = EthEvmConfig::ethereum(spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let reference = blocks_and_execution_outputs(provider_factory, spec, signer)?;
        // Expected (block number, (block, output)) for each chain, in the order they are yielded.
        let expected = [(1u64, reference[0].clone()), (2, reference[1].clone())];

        let factory = BackfillJobFactory::new(evm_config, blockchain_db).with_thresholds(
            ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
        );
        let chains = factory.backfill(1..=2).collect::<Result<Vec<_>, _>>()?;

        assert_eq!(chains.len(), 2);

        for (chain, (number, (block, output))) in chains.iter().zip(expected) {
            let mut chain = chain.clone();
            // Reverts are sorted before comparing against the expected outcome.
            chain.execution_outcome_mut().bundle.reverts.sort();
            assert_eq!(chain.blocks(), &[(number, block)].into());
            assert_eq!(chain.execution_outcome(), &to_execution_outcome(number, &output));
        }

        Ok(())
    }
}