1use crate::StreamBackfillJob;
2use reth_evm::ConfigureEvm;
3use std::{
4 ops::RangeInclusive,
5 time::{Duration, Instant},
6};
7
8use alloy_consensus::BlockHeader;
9use alloy_primitives::BlockNumber;
10use reth_ethereum_primitives::Receipt;
11use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, Executor};
12use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
13use reth_primitives_traits::{format_gas_throughput, RecoveredBlock, SignedTransaction};
14use reth_provider::{
15 BlockReader, Chain, ExecutionOutcome, HeaderProvider, ProviderError, StateProviderFactory,
16 TransactionVariant,
17};
18use reth_prune_types::PruneModes;
19use reth_revm::database::StateProviderDatabase;
20use reth_stages_api::ExecutionStageThresholds;
21use reth_tracing::tracing::{debug, trace};
22
23pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
24
25#[derive(Debug)]
31pub struct BackfillJob<E, P> {
32 pub(crate) evm_config: E,
33 pub(crate) provider: P,
34 pub(crate) prune_modes: PruneModes,
35 pub(crate) thresholds: ExecutionStageThresholds,
36 pub(crate) range: RangeInclusive<BlockNumber>,
37 pub(crate) stream_parallelism: usize,
38}
39
40impl<E, P> Iterator for BackfillJob<E, P>
41where
42 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
43 P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
44{
45 type Item = BackfillJobResult<Chain<E::Primitives>>;
46
47 fn next(&mut self) -> Option<Self::Item> {
48 if self.range.is_empty() {
49 return None
50 }
51
52 Some(self.execute_range())
53 }
54}
55
56impl<E, P> BackfillJob<E, P>
57where
58 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
59 P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
60{
61 pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
63 self.into()
64 }
65
66 pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
68 self.into()
69 }
70
71 fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
72 debug!(
73 target: "exex::backfill",
74 range = ?self.range,
75 "Executing block range"
76 );
77
78 let mut executor = self.evm_config.batch_executor(StateProviderDatabase::new(
79 self.provider
80 .history_by_block_number(self.range.start().saturating_sub(1))
81 .map_err(BlockExecutionError::other)?,
82 ));
83
84 let mut fetch_block_duration = Duration::default();
85 let mut execution_duration = Duration::default();
86 let mut cumulative_gas = 0;
87 let batch_start = Instant::now();
88
89 let mut blocks = Vec::new();
90 let mut results = Vec::new();
91 for block_number in self.range.clone() {
92 let fetch_block_start = Instant::now();
94
95 let block = self
97 .provider
98 .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)
99 .map_err(BlockExecutionError::other)?
100 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
101 .map_err(BlockExecutionError::other)?;
102
103 fetch_block_duration += fetch_block_start.elapsed();
104
105 cumulative_gas += block.gas_used();
106
107 trace!(target: "exex::backfill", number = block_number, txs = block.body().transactions().len(), "Executing block");
109
110 let execute_start = Instant::now();
112
113 let (block, senders) = block.split_sealed();
115 let (header, body) = block.split_sealed_header_body();
116 let block = P::Block::new_sealed(header, body).with_senders(senders);
117
118 results.push(executor.execute_one(&block)?);
119 execution_duration += execute_start.elapsed();
120
121 blocks.push(block);
123 if self.thresholds.is_end_of_batch(
125 block_number - *self.range.start() + 1,
126 executor.size_hint() as u64,
127 cumulative_gas,
128 batch_start.elapsed(),
129 ) {
130 break
131 }
132 }
133
134 let first_block_number = blocks.first().expect("blocks should not be empty").number();
135 let last_block_number = blocks.last().expect("blocks should not be empty").number();
136 debug!(
137 target: "exex::backfill",
138 range = ?*self.range.start()..=last_block_number,
139 block_fetch = ?fetch_block_duration,
140 execution = ?execution_duration,
141 throughput = format_gas_throughput(cumulative_gas, execution_duration),
142 "Finished executing block range"
143 );
144 self.range = last_block_number + 1..=*self.range.end();
145
146 let outcome = ExecutionOutcome::from_blocks(
147 first_block_number,
148 executor.into_state().take_bundle(),
149 results,
150 );
151 let chain = Chain::new(blocks, outcome, None);
152 Ok(chain)
153 }
154}
155
156#[derive(Debug, Clone)]
161pub struct SingleBlockBackfillJob<E, P> {
162 pub(crate) evm_config: E,
163 pub(crate) provider: P,
164 pub(crate) range: RangeInclusive<BlockNumber>,
165 pub(crate) stream_parallelism: usize,
166}
167
168impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
169where
170 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
171 P: HeaderProvider + BlockReader + StateProviderFactory,
172{
173 type Item = BackfillJobResult<(
174 RecoveredBlock<P::Block>,
175 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
176 )>;
177
178 fn next(&mut self) -> Option<Self::Item> {
179 self.range.next().map(|block_number| self.execute_block(block_number))
180 }
181}
182
183impl<E, P> SingleBlockBackfillJob<E, P>
184where
185 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
186 P: HeaderProvider + BlockReader + StateProviderFactory,
187{
188 pub fn into_stream(
190 self,
191 ) -> StreamBackfillJob<
192 E,
193 P,
194 (RecoveredBlock<reth_ethereum_primitives::Block>, BlockExecutionOutput<Receipt>),
195 > {
196 self.into()
197 }
198
199 #[expect(clippy::type_complexity)]
200 pub(crate) fn execute_block(
201 &self,
202 block_number: u64,
203 ) -> BackfillJobResult<(
204 RecoveredBlock<P::Block>,
205 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
206 )> {
207 let block_with_senders = self
209 .provider
210 .recovered_block(block_number.into(), TransactionVariant::WithHash)
211 .map_err(BlockExecutionError::other)?
212 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
213 .map_err(BlockExecutionError::other)?;
214
215 let executor = self.evm_config.batch_executor(StateProviderDatabase::new(
217 self.provider
218 .history_by_block_number(block_number.saturating_sub(1))
219 .map_err(BlockExecutionError::other)?,
220 ));
221
222 trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.body().transaction_count(), "Executing block");
223
224 let block_execution_output = executor.execute(&block_with_senders)?;
225
226 Ok((block_with_senders, block_execution_output))
227 }
228}
229
230impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
231 fn from(job: BackfillJob<E, P>) -> Self {
232 Self {
233 evm_config: job.evm_config,
234 provider: job.provider,
235 range: job.range,
236 stream_parallelism: job.stream_parallelism,
237 }
238 }
239}
240
241#[cfg(test)]
242mod tests {
243 use crate::{
244 backfill::{
245 job::ExecutionStageThresholds,
246 test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
247 },
248 BackfillJobFactory,
249 };
250 use reth_db_common::init::init_genesis;
251 use reth_evm_ethereum::EthEvmConfig;
252 use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
253 use reth_provider::{
254 providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
255 };
256 use reth_testing_utils::generators;
257
258 #[test]
259 fn test_backfill() -> eyre::Result<()> {
260 reth_tracing::init_test_tracing();
261
262 let key_pair = generators::generate_key(&mut generators::rng());
264 let address = public_key_to_address(key_pair.public_key());
265
266 let chain_spec = chain_spec(address);
267
268 let executor = EthEvmConfig::ethereum(chain_spec.clone());
269 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
270 init_genesis(&provider_factory)?;
271 let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;
272
273 let blocks_and_execution_outputs =
274 blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
275 let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap();
276 let execution_outcome = to_execution_outcome(block.number, block_execution_output);
277
278 let factory = BackfillJobFactory::new(executor, blockchain_db);
280 let job = factory.backfill(1..=1);
281 let chains = job.collect::<Result<Vec<_>, _>>()?;
282
283 assert_eq!(chains.len(), 1);
286 let mut chain = chains.into_iter().next().unwrap();
287 chain.execution_outcome_mut().bundle.reverts.sort();
288 assert_eq!(chain.blocks(), &[(1, block.clone())].into());
289 assert_eq!(chain.execution_outcome(), &execution_outcome);
290
291 Ok(())
292 }
293
294 #[test]
295 fn test_single_block_backfill() -> eyre::Result<()> {
296 reth_tracing::init_test_tracing();
297
298 let key_pair = generators::generate_key(&mut generators::rng());
300 let address = public_key_to_address(key_pair.public_key());
301
302 let chain_spec = chain_spec(address);
303
304 let executor = EthEvmConfig::ethereum(chain_spec.clone());
305 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
306 init_genesis(&provider_factory)?;
307 let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;
308
309 let blocks_and_execution_outcomes =
310 blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
311
312 let factory = BackfillJobFactory::new(executor, blockchain_db);
314 let job = factory.backfill(1..=1);
315 let single_job = job.into_single_blocks();
316 let block_execution_it = single_job.into_iter();
317
318 let blocks_and_outcomes = block_execution_it.collect::<Vec<_>>();
320 assert_eq!(blocks_and_outcomes.len(), 1);
321
322 for (i, res) in blocks_and_outcomes.into_iter().enumerate() {
325 let (block, mut execution_output) = res?;
326 execution_output.state.reverts.sort();
327
328 let expected_block = blocks_and_execution_outcomes[i].0.clone();
329 let expected_output = &blocks_and_execution_outcomes[i].1;
330
331 assert_eq!(block, expected_block);
332 assert_eq!(&execution_output, expected_output);
333 }
334
335 Ok(())
336 }
337
338 #[test]
339 fn test_backfill_with_batch_threshold() -> eyre::Result<()> {
340 reth_tracing::init_test_tracing();
341
342 let key_pair = generators::generate_key(&mut generators::rng());
344 let address = public_key_to_address(key_pair.public_key());
345
346 let chain_spec = chain_spec(address);
347
348 let executor = EthEvmConfig::ethereum(chain_spec.clone());
349 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
350 init_genesis(&provider_factory)?;
351 let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;
352
353 let blocks_and_execution_outputs =
354 blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
355 let (block1, output1) = blocks_and_execution_outputs[0].clone();
356 let (block2, output2) = blocks_and_execution_outputs[1].clone();
357
358 let factory = BackfillJobFactory::new(executor, blockchain_db).with_thresholds(
360 ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
361 );
362 let job = factory.backfill(1..=2);
363 let chains = job.collect::<Result<Vec<_>, _>>()?;
364
365 assert_eq!(chains.len(), 2);
367
368 let mut chain1 = chains[0].clone();
369 chain1.execution_outcome_mut().bundle.reverts.sort();
370 assert_eq!(chain1.blocks(), &[(1, block1)].into());
371 assert_eq!(chain1.execution_outcome(), &to_execution_outcome(1, &output1));
372
373 let mut chain2 = chains[1].clone();
374 chain2.execution_outcome_mut().bundle.reverts.sort();
375 assert_eq!(chain2.blocks(), &[(2, block2)].into());
376 assert_eq!(chain2.execution_outcome(), &to_execution_outcome(2, &output2));
377
378 Ok(())
379 }
380}