1use crate::StreamBackfillJob;
2use std::{
3 ops::RangeInclusive,
4 time::{Duration, Instant},
5};
6
7use alloy_consensus::BlockHeader;
8use alloy_primitives::BlockNumber;
9use reth_evm::execute::{
10 BlockExecutionError, BlockExecutionOutput, BlockExecutorProvider, Executor,
11};
12use reth_node_api::{Block as _, BlockBody as _, NodePrimitives};
13use reth_primitives::{Receipt, RecoveredBlock};
14use reth_primitives_traits::{format_gas_throughput, SignedTransaction};
15use reth_provider::{
16 BlockReader, Chain, ExecutionOutcome, HeaderProvider, ProviderError, StateProviderFactory,
17 TransactionVariant,
18};
19use reth_prune_types::PruneModes;
20use reth_revm::database::StateProviderDatabase;
21use reth_stages_api::ExecutionStageThresholds;
22use reth_tracing::tracing::{debug, trace};
23
24pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
25
26#[derive(Debug)]
32pub struct BackfillJob<E, P> {
33 pub(crate) executor: E,
34 pub(crate) provider: P,
35 pub(crate) prune_modes: PruneModes,
36 pub(crate) thresholds: ExecutionStageThresholds,
37 pub(crate) range: RangeInclusive<BlockNumber>,
38 pub(crate) stream_parallelism: usize,
39}
40
41impl<E, P> Iterator for BackfillJob<E, P>
42where
43 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
44 P: HeaderProvider + BlockReader<Transaction: SignedTransaction> + StateProviderFactory,
45{
46 type Item = BackfillJobResult<Chain<E::Primitives>>;
47
48 fn next(&mut self) -> Option<Self::Item> {
49 if self.range.is_empty() {
50 return None
51 }
52
53 Some(self.execute_range())
54 }
55}
56
57impl<E, P> BackfillJob<E, P>
58where
59 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
60 P: BlockReader<Transaction: SignedTransaction> + HeaderProvider + StateProviderFactory,
61{
62 pub fn into_single_blocks(self) -> SingleBlockBackfillJob<E, P> {
64 self.into()
65 }
66
67 pub fn into_stream(self) -> StreamBackfillJob<E, P, Chain<E::Primitives>> {
69 self.into()
70 }
71
72 fn execute_range(&mut self) -> BackfillJobResult<Chain<E::Primitives>> {
73 debug!(
74 target: "exex::backfill",
75 range = ?self.range,
76 "Executing block range"
77 );
78
79 let mut executor = self.executor.executor(StateProviderDatabase::new(
80 self.provider
81 .history_by_block_number(self.range.start().saturating_sub(1))
82 .map_err(BlockExecutionError::other)?,
83 ));
84
85 let mut fetch_block_duration = Duration::default();
86 let mut execution_duration = Duration::default();
87 let mut cumulative_gas = 0;
88 let batch_start = Instant::now();
89
90 let mut blocks = Vec::new();
91 let mut results = Vec::new();
92 for block_number in self.range.clone() {
93 let fetch_block_start = Instant::now();
95
96 let block = self
98 .provider
99 .sealed_block_with_senders(block_number.into(), TransactionVariant::WithHash)
100 .map_err(BlockExecutionError::other)?
101 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
102 .map_err(BlockExecutionError::other)?;
103
104 fetch_block_duration += fetch_block_start.elapsed();
105
106 cumulative_gas += block.gas_used();
107
108 trace!(target: "exex::backfill", number = block_number, txs = block.body().transactions().len(), "Executing block");
110
111 let execute_start = Instant::now();
113
114 let (block, senders) = block.split_sealed();
116 let (header, body) = block.split_sealed_header_body();
117 let block = P::Block::new_sealed(header, body).with_senders(senders);
118
119 results.push(executor.execute_one(&block)?);
120 execution_duration += execute_start.elapsed();
121
122 blocks.push(block);
126 if self.thresholds.is_end_of_batch(
128 block_number - *self.range.start(),
129 executor.size_hint() as u64,
130 cumulative_gas,
131 batch_start.elapsed(),
132 ) {
133 break
134 }
135 }
136
137 let first_block_number = blocks.first().expect("blocks should not be empty").number();
138 let last_block_number = blocks.last().expect("blocks should not be empty").number();
139 debug!(
140 target: "exex::backfill",
141 range = ?*self.range.start()..=last_block_number,
142 block_fetch = ?fetch_block_duration,
143 execution = ?execution_duration,
144 throughput = format_gas_throughput(cumulative_gas, execution_duration),
145 "Finished executing block range"
146 );
147 self.range = last_block_number + 1..=*self.range.end();
148
149 let outcome = ExecutionOutcome::from_blocks(
150 first_block_number,
151 executor.into_state().take_bundle(),
152 results,
153 );
154 let chain = Chain::new(blocks, outcome, None);
155 Ok(chain)
156 }
157}
158
159#[derive(Debug, Clone)]
164pub struct SingleBlockBackfillJob<E, P> {
165 pub(crate) executor: E,
166 pub(crate) provider: P,
167 pub(crate) range: RangeInclusive<BlockNumber>,
168 pub(crate) stream_parallelism: usize,
169}
170
171impl<E, P> Iterator for SingleBlockBackfillJob<E, P>
172where
173 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
174 P: HeaderProvider + BlockReader + StateProviderFactory,
175{
176 type Item = BackfillJobResult<(
177 RecoveredBlock<P::Block>,
178 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
179 )>;
180
181 fn next(&mut self) -> Option<Self::Item> {
182 self.range.next().map(|block_number| self.execute_block(block_number))
183 }
184}
185
186impl<E, P> SingleBlockBackfillJob<E, P>
187where
188 E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>,
189 P: HeaderProvider + BlockReader + StateProviderFactory,
190{
191 pub fn into_stream(
193 self,
194 ) -> StreamBackfillJob<
195 E,
196 P,
197 (RecoveredBlock<reth_primitives::Block>, BlockExecutionOutput<Receipt>),
198 > {
199 self.into()
200 }
201
202 #[expect(clippy::type_complexity)]
203 pub(crate) fn execute_block(
204 &self,
205 block_number: u64,
206 ) -> BackfillJobResult<(
207 RecoveredBlock<P::Block>,
208 BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
209 )> {
210 let block_with_senders = self
212 .provider
213 .recovered_block(block_number.into(), TransactionVariant::WithHash)
214 .map_err(BlockExecutionError::other)?
215 .ok_or_else(|| ProviderError::HeaderNotFound(block_number.into()))
216 .map_err(BlockExecutionError::other)?;
217
218 let executor = self.executor.executor(StateProviderDatabase::new(
220 self.provider
221 .history_by_block_number(block_number.saturating_sub(1))
222 .map_err(BlockExecutionError::other)?,
223 ));
224
225 trace!(target: "exex::backfill", number = block_number, txs = block_with_senders.body().transaction_count(), "Executing block");
226
227 let block_execution_output = executor.execute(&block_with_senders)?;
228
229 Ok((block_with_senders, block_execution_output))
230 }
231}
232
233impl<E, P> From<BackfillJob<E, P>> for SingleBlockBackfillJob<E, P> {
234 fn from(job: BackfillJob<E, P>) -> Self {
235 Self {
236 executor: job.executor,
237 provider: job.provider,
238 range: job.range,
239 stream_parallelism: job.stream_parallelism,
240 }
241 }
242}
243
244#[cfg(test)]
245mod tests {
246 use crate::{
247 backfill::test_utils::{blocks_and_execution_outputs, chain_spec, to_execution_outcome},
248 BackfillJobFactory,
249 };
250 use reth_db_common::init::init_genesis;
251 use reth_evm_ethereum::execute::EthExecutorProvider;
252 use reth_primitives_traits::crypto::secp256k1::public_key_to_address;
253 use reth_provider::{
254 providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
255 };
256 use reth_testing_utils::generators;
257 use secp256k1::Keypair;
258
259 #[test]
260 fn test_backfill() -> eyre::Result<()> {
261 reth_tracing::init_test_tracing();
262
263 let key_pair = Keypair::new_global(&mut generators::rng());
265 let address = public_key_to_address(key_pair.public_key());
266
267 let chain_spec = chain_spec(address);
268
269 let executor = EthExecutorProvider::ethereum(chain_spec.clone());
270 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
271 init_genesis(&provider_factory)?;
272 let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;
273
274 let blocks_and_execution_outputs =
275 blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
276 let (block, block_execution_output) = blocks_and_execution_outputs.first().unwrap();
277 let execution_outcome = to_execution_outcome(block.number, block_execution_output);
278
279 let factory = BackfillJobFactory::new(executor, blockchain_db);
281 let job = factory.backfill(1..=1);
282 let chains = job.collect::<Result<Vec<_>, _>>()?;
283
284 assert_eq!(chains.len(), 1);
287 let mut chain = chains.into_iter().next().unwrap();
288 chain.execution_outcome_mut().bundle.reverts.sort();
289 assert_eq!(chain.blocks(), &[(1, block.clone())].into());
290 assert_eq!(chain.execution_outcome(), &execution_outcome);
291
292 Ok(())
293 }
294
295 #[test]
296 fn test_single_block_backfill() -> eyre::Result<()> {
297 reth_tracing::init_test_tracing();
298
299 let key_pair = Keypair::new_global(&mut generators::rng());
301 let address = public_key_to_address(key_pair.public_key());
302
303 let chain_spec = chain_spec(address);
304
305 let executor = EthExecutorProvider::ethereum(chain_spec.clone());
306 let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
307 init_genesis(&provider_factory)?;
308 let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;
309
310 let blocks_and_execution_outcomes =
311 blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;
312
313 let factory = BackfillJobFactory::new(executor, blockchain_db);
315 let job = factory.backfill(1..=1);
316 let single_job = job.into_single_blocks();
317 let block_execution_it = single_job.into_iter();
318
319 let blocks_and_outcomes = block_execution_it.collect::<Vec<_>>();
321 assert_eq!(blocks_and_outcomes.len(), 1);
322
323 for (i, res) in blocks_and_outcomes.into_iter().enumerate() {
326 let (block, mut execution_output) = res?;
327 execution_output.state.reverts.sort();
328
329 let expected_block = blocks_and_execution_outcomes[i].0.clone();
330 let expected_output = &blocks_and_execution_outcomes[i].1;
331
332 assert_eq!(block, expected_block);
333 assert_eq!(&execution_output, expected_output);
334 }
335
336 Ok(())
337 }
338}