1use crate::StreamBackfillJob;
2use reth_evm::ConfigureEvm;
3use std::{
4 collections::BTreeMap,
5 ops::RangeInclusive,
6 time::{Duration, Instant},
7};
8
9use alloy_consensus::BlockHeader;
10use alloy_primitives::BlockNumber;
11use reth_ethereum_primitives::Receipt;
12use reth_evm::execute::{BlockExecutionError, BlockExecutionOutput, Executor};
13use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
14use reth_primitives_traits::{format_gas_throughput, RecoveredBlock, SignedTransaction};
15use reth_provider::{
16 BlockReader, Chain, ExecutionOutcome, HeaderProvider, ProviderError, StateProviderFactory,
17 TransactionVariant,
18};
19use reth_prune_types::PruneModes;
20use reth_revm::database::StateProviderDatabase;
21use reth_stages_api::ExecutionStageThresholds;
22use reth_tracing::tracing::{debug, trace};
23
24pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
25
26#[derive(Debug)]
32pub struct BackfillJob<E, P> {
33 pub(crate) evm_config: E,
34 pub(crate) provider: P,
35 pub(crate) prune_modes: PruneModes,
36 pub(crate) thresholds: ExecutionStageThresholds,
37 pub(crate) range: RangeInclusive<BlockNumber>,
38 pub(crate) stream_parallelism: usize,
39}
40
41impl<E, P> Iterator for BackfillJob<E, P>
42where
43 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
44 P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
45{
46 type Item = BackfillJobResult<Chain<E::Primitives>>;
47
48 fn next(&mut self) -> Option<Self::Item> {
49 if self.range.is_empty() {
50 return None
51 }
52
53 Some(self.execute_range())
54 }
55}
56
57impl<E, P> BackfillJob<E, P>
58where
59 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
60 P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
61{
62 pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
64 self.into()
65 }
66
67 pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
69 self.into()
70 }
71
72 fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
73 debug!(
74 target: "exex::backfill",
75 range = ?self.range,
76 "Executing block range"
77 );
78
79 let mut executor = self.evm_config.batch_executor(StateProviderDatabase::new(
80 self.provider
81 .history_by_block_number(self.range.start().saturating_sub(1))
82 .map_err(BlockExecutionError::other)?,
83 ));
84
85 let mut fetch_block_duration = Duration::default();
86 let mut execution_duration = Duration::default();
87 let mut cumulative_gas = 0;
88 let batch_start = Instant::now();
89
90 let mut blocks = Vec::new();
91 let mut results = Vec::new();
92 for block_number in self.range.clone() {
93 let fetch_block_start = Instant::now();
95
96 let block = self
98 .provider
99 .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)
100 .map_err(BlockExecutionError::other)?
101 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
102 .map_err(BlockExecutionError::other)?;
103
104 fetch_block_duration += fetch_block_start.elapsed();
105
106 cumulative_gas += block.gas_used();
107
108 trace!(target: "exex::backfill", number = block_number, txs = block.body().transactions().len(), "Executing block");
110
111 let execute_start = Instant::now();
113
114 let (block, senders) = block.split_sealed();
116 let (header, body) = block.split_sealed_header_body();
117 let block = P::Block::new_sealed(header, body).with_senders(senders);
118
119 results.push(executor.execute_one(&block)?);
120 execution_duration += execute_start.elapsed();
121
122 blocks.push(block);
124 if self.thresholds.is_end_of_batch(
126 block_number - *self.range.start() + 1,
127 executor.size_hint() as u64,
128 cumulative_gas,
129 batch_start.elapsed(),
130 ) {
131 break
132 }
133 }
134
135 let first_block_number = blocks.first().expect("blocks should not be empty").number();
136 let last_block_number = blocks.last().expect("blocks should not be empty").number();
137 debug!(
138 target: "exex::backfill",
139 range = ?*self.range.start()..=last_block_number,
140 block_fetch = ?fetch_block_duration,
141 execution = ?execution_duration,
142 throughput = format_gas_throughput(cumulative_gas, execution_duration),
143 "Finished executing block range"
144 );
145 self.range = last_block_number + 1..=*self.range.end();
146
147 let outcome = ExecutionOutcome::from_blocks(
148 first_block_number,
149 executor.into_state().take_bundle(),
150 results,
151 );
152 let chain = Chain::new(blocks, outcome, BTreeMap::new());
153 Ok(chain)
154 }
155}
156
157#[derive(Debug, Clone)]
162pub struct SingleBlockBackfillJob<E, P> {
163 pub(crate) evm_config: E,
164 pub(crate) provider: P,
165 pub(crate) range: RangeInclusive<BlockNumber>,
166 pub(crate) stream_parallelism: usize,
167}
168
169impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
170where
171 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
172 P: HeaderProvider + BlockReader + StateProviderFactory,
173{
174 type Item = BackfillJobResult<(
175 RecoveredBlock<P::Block>,
176 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
177 )>;
178
179 fn next(&mut self) -> Option<Self::Item> {
180 self.range.next().map(|block_number| self.execute_block(block_number))
181 }
182}
183
184impl<E, P> SingleBlockBackfillJob<E, P>
185where
186 E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
187 P: HeaderProvider + BlockReader + StateProviderFactory,
188{
189 pub fn into_stream(
191 self,
192 ) -> StreamBackfillJob<
193 E,
194 P,
195 (RecoveredBlock<reth_ethereum_primitives::Block>, BlockExecutionOutput<Receipt>),
196 > {
197 self.into()
198 }
199
200 #[expect(clippy::type_complexity)]
201 pub(crate) fn execute_block(
202 &self,
203 block_number: u64,
204 ) -> BackfillJobResult<(
205 RecoveredBlock<P::Block>,
206 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
207 )> {
208 let block_with_senders = self
210 .provider
211 .recovered_block(block_number.into(), TransactionVariant::WithHash)
212 .map_err(BlockExecutionError::other)?
213 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
214 .map_err(BlockExecutionError::other)?;
215
216 let executor = self.evm_config.batch_executor(StateProviderDatabase::new(
218 self.provider
219 .history_by_block_number(block_number.saturating_sub(1))
220 .map_err(BlockExecutionError::other)?,
221 ));
222
223 trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.body().transaction_count(), "Executing block");
224
225 let block_execution_output = executor.execute(&block_with_senders)?;
226
227 Ok((block_with_senders, block_execution_output))
228 }
229}
230
231impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
232 fn from(job: BackfillJob<E, P>) -> Self {
233 Self {
234 evm_config: job.evm_config,
235 provider: job.provider,
236 range: job.range,
237 stream_parallelism: job.stream_parallelism,
238 }
239 }
240}
241
242#[cfg(test)]
243mod tests {
244 use crate::{
245 backfill::{
246 job::ExecutionStageThresholds,
247 test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
248 },
249 BackfillJobFactory,
250 };
251 use reth_db_common::init::init_genesis;
252 use reth_evm_ethereum::EthEvmConfig;
253 use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
254 use reth_provider::{
255 providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
256 };
257 use reth_testing_utils::generators;
258
259 #[test]
260 fn test_backfill() -> eyre::Result<()> {
261 reth_tracing::init_test_tracing();
262
263 let key_pair = generators::generate_key(&mut generators::rng());
265 let address = public_key_to_address(key_pair.public_key());
266
267 let chain_spec = chain_spec(address);
268
269 let executor = EthEvmConfig::ethereum(chain_spec.clone());
270 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
271 init_genesis(&provider_factory)?;
272 let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;
273
274 let blocks_and_execution_outputs =
275 blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
276 let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap();
277 let execution_outcome = to_execution_outcome(block.number, block_execution_output);
278
279 let factory = BackfillJobFactory::new(executor, blockchain_db);
281 let job = factory.backfill(1..=1);
282 let chains = job.collect::<Result<Vec<_>, _>>()?;
283
284 assert_eq!(chains.len(), 1);
287 let mut chain = chains.into_iter().next().unwrap();
288 chain.execution_outcome_mut().bundle.reverts.sort();
289 assert_eq!(chain.blocks(), &[(1, block.clone())].into());
290 assert_eq!(chain.execution_outcome(), &execution_outcome);
291
292 Ok(())
293 }
294
295 #[test]
296 fn test_single_block_backfill() -> eyre::Result<()> {
297 reth_tracing::init_test_tracing();
298
299 let key_pair = generators::generate_key(&mut generators::rng());
301 let address = public_key_to_address(key_pair.public_key());
302
303 let chain_spec = chain_spec(address);
304
305 let executor = EthEvmConfig::ethereum(chain_spec.clone());
306 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
307 init_genesis(&provider_factory)?;
308 let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;
309
310 let blocks_and_execution_outcomes =
311 blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
312
313 let factory = BackfillJobFactory::new(executor, blockchain_db);
315 let job = factory.backfill(1..=1);
316 let single_job = job.into_single_blocks();
317 let block_execution_it = single_job.into_iter();
318
319 let blocks_and_outcomes = block_execution_it.collect::<Vec<_>>();
321 assert_eq!(blocks_and_outcomes.len(), 1);
322
323 for (i, res) in blocks_and_outcomes.into_iter().enumerate() {
326 let (block, mut execution_output) = res?;
327 execution_output.state.reverts.sort();
328
329 let expected_block = blocks_and_execution_outcomes[i].0.clone();
330 let expected_output = &blocks_and_execution_outcomes[i].1;
331
332 assert_eq!(block, expected_block);
333 assert_eq!(&execution_output, expected_output);
334 }
335
336 Ok(())
337 }
338
339 #[test]
340 fn test_backfill_with_batch_threshold() -> eyre::Result<()> {
341 reth_tracing::init_test_tracing();
342
343 let key_pair = generators::generate_key(&mut generators::rng());
345 let address = public_key_to_address(key_pair.public_key());
346
347 let chain_spec = chain_spec(address);
348
349 let executor = EthEvmConfig::ethereum(chain_spec.clone());
350 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
351 init_genesis(&provider_factory)?;
352 let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;
353
354 let blocks_and_execution_outputs =
355 blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
356 let (block1, output1) = blocks_and_execution_outputs[0].clone();
357 let (block2, output2) = blocks_and_execution_outputs[1].clone();
358
359 let factory = BackfillJobFactory::new(executor, blockchain_db).with_thresholds(
361 ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
362 );
363 let job = factory.backfill(1..=2);
364 let chains = job.collect::<Result<Vec<_>, _>>()?;
365
366 assert_eq!(chains.len(), 2);
368
369 let mut chain1 = chains[0].clone();
370 chain1.execution_outcome_mut().bundle.reverts.sort();
371 assert_eq!(chain1.blocks(), &[(1, block1)].into());
372 assert_eq!(chain1.execution_outcome(), &to_execution_outcome(1, &output1));
373
374 let mut chain2 = chains[1].clone();
375 chain2.execution_outcome_mut().bundle.reverts.sort();
376 assert_eq!(chain2.blocks(), &[(2, block2)].into());
377 assert_eq!(chain2.execution_outcome(), &to_execution_outcome(2, &output2));
378
379 Ok(())
380 }
381}