use super::job::BackfillJobResult;
use crate::{BackfillJob, SingleBlockBackfillJob};
use alloy_primitives::BlockNumber;
use futures::{
    stream::{FuturesOrdered, Stream},
    StreamExt,
};
use reth_ethereum_primitives::EthPrimitives;
use reth_evm::{
    execute::{BlockExecutionError, BlockExecutionOutput},
    ConfigureEvm,
};
use reth_node_api::NodePrimitives;
use reth_primitives_traits::RecoveredBlock;
use reth_provider::{BlockReader, Chain, StateProviderFactory};
use reth_prune_types::PruneModes;
use reth_stages_api::ExecutionStageThresholds;
use reth_tracing::tracing::debug;
use std::{
    ops::RangeInclusive,
    pin::Pin,
    task::{ready, Context, Poll},
};
use tokio::task::JoinHandle;

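/// The default number of backfill tasks to run in parallel.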
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
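/// The default number of blocks per batch backfill job.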
const DEFAULT_BATCH_SIZE: usize = 100;

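/// Boxed iterator of [`BackfillJobResult`]s that can be moved onto a blocking task.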
type BackfillTaskIterator<T> =
    Box<dyn Iterator<Item = BackfillJobResult<T>> + Send + Sync + 'static>;

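/// Output of a spawned backfill task: the job iterator handed back for reuse, along with the next
/// result it produced, if any.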
struct BackfillTaskOutput<T> {
    job: BackfillTaskIterator<T>,
    result: Option<BackfillJobResult<T>>,
}

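/// Ordered queue of spawned backfill tasks; completed tasks are polled in the order they were
/// queued.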
type BackfillTasks<T> = FuturesOrdered<JoinHandle<BackfillTaskOutput<T>>>;

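/// Item yielded by the single-block stream: a recovered block together with its execution output.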
type SingleBlockStreamItem<N = EthPrimitives> = (
    RecoveredBlock<<N as NodePrimitives>::Block>,
    BlockExecutionOutput<<N as NodePrimitives>::Receipt>,
);
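/// Item yielded by the batch stream: a [`Chain`] of executed blocks.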
type BatchBlockStreamItem<N = EthPrimitives> = Chain<N>;

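/// Stream that executes backfill jobs on blocking tasks and yields their results in block order.
///
/// Blocks in `range` are executed either one by one (single-block mode) or in batches of
/// `batch_size` blocks (batch mode), with at most `parallelism` tasks in flight at once.
///
/// A rough usage sketch of the batch mode, based on the tests below (setup of `evm_config` and
/// `provider` is elided, so treat this as illustrative rather than exact):
///
/// ```ignore
/// let factory = BackfillJobFactory::new(evm_config, provider)
///     .with_thresholds(ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() })
///     .with_stream_parallelism(2);
/// let mut stream = factory.backfill(1..=4).into_stream();
/// while let Some(chain) = stream.next().await {
///     // each item is a `Chain` of up to `max_blocks` executed blocks
///     let chain = chain?;
/// }
/// ```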
#[derive(Debug)]
pub struct StreamBackfillJob<E, P, T> {
    evm_config: E,
    provider: P,
    prune_modes: PruneModes,
    range: RangeInclusive<BlockNumber>,
    tasks: BackfillTasks<T>,
    parallelism: usize,
    batch_size: usize,
    thresholds: ExecutionStageThresholds,
}

impl<E, P, T> StreamBackfillJob<E, P, T>
where
    T: Send + Sync + 'static,
{
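    /// Sets the maximum number of backfill tasks to run in parallel.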
    pub const fn with_parallelism(mut self, parallelism: usize) -> Self {
        self.parallelism = parallelism;
        self
    }

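    /// Sets the number of blocks to execute per batch job.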
    pub const fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

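    /// Spawns the job on a blocking task, advancing it by one result, and queues the handle at
    /// the back of the task queue.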
    fn push_back(&mut self, mut job: BackfillTaskIterator<T>) {
        self.tasks.push_back(tokio::task::spawn_blocking(move || BackfillTaskOutput {
            result: job.next(),
            job,
        }));
    }

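    /// Spawns the job on a blocking task, advancing it by one result, and queues the handle at
    /// the front of the task queue so it is polled before any other task.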
    fn push_front(&mut self, mut job: BackfillTaskIterator<T>) {
        self.tasks.push_front(tokio::task::spawn_blocking(move || BackfillTaskOutput {
            result: job.next(),
            job,
        }));
    }

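    /// Polls the next completed task and returns its result.
    ///
    /// Exhausted jobs are dropped; jobs that still produce results are re-queued at the front so
    /// that output follows the block order. Returns `Poll::Ready(None)` once all tasks have
    /// finished.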
    fn poll_next_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<BackfillJobResult<T>>> {
        while let Some(res) = ready!(self.tasks.poll_next_unpin(cx)) {
            let task_result = res.map_err(BlockExecutionError::other)?;

            if let BackfillTaskOutput { result: Some(job_result), job } = task_result {
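                // Re-queue the job at the front of the queue, so that the next result is
                // produced by the same job before any newer task is polled.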
                self.push_front(job);

                return Poll::Ready(Some(job_result))
            };
        }

        Poll::Ready(None)
    }
}

impl<E, P> Stream for StreamBackfillJob<E, P, SingleBlockStreamItem<E::Primitives>>
where
    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
{
    type Item = BackfillJobResult<SingleBlockStreamItem<E::Primitives>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

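        // Spawn new single-block tasks until the configured parallelism is reached or the range
        // is exhausted.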
        while this.tasks.len() < this.parallelism {
            let Some(block_number) = this.range.next() else {
                debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more single blocks to backfill");
                break;
            };

            debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?block_number, "Spawning new single block backfill task");
            let job = Box::new(SingleBlockBackfillJob {
                evm_config: this.evm_config.clone(),
                provider: this.provider.clone(),
                range: block_number..=block_number,
                stream_parallelism: this.parallelism,
            }) as BackfillTaskIterator<_>;
            this.push_back(job);
        }

        this.poll_next_task(cx)
    }
}

impl<E, P> Stream for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
where
    E: ConfigureEvm<Primitives: NodePrimitives<Block = P::Block>> + 'static,
    P: BlockReader + StateProviderFactory + Clone + Unpin + 'static,
{
    type Item = BackfillJobResult<BatchBlockStreamItem<E::Primitives>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        loop {
            while this.tasks.len() < this.parallelism {
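                // Take up to `batch_size` block numbers from the front of the remaining range and
                // turn them into an inclusive sub-range for the next batch job.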
                let mut range = this.range.by_ref().take(this.batch_size);
                let start = range.next();
                let range_bounds = start.zip(range.last().or(start));

                let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
                    debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
                    break;
                };

                debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
                let job = Box::new(BackfillJob {
                    evm_config: this.evm_config.clone(),
                    provider: this.provider.clone(),
                    prune_modes: this.prune_modes.clone(),
                    thresholds: this.thresholds.clone(),
                    range,
                    stream_parallelism: this.parallelism,
                }) as BackfillTaskIterator<_>;
                this.push_back(job);
            }

            let res = ready!(this.poll_next_task(cx));

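            // Yield a completed batch if there is one. Otherwise, if the range is exhausted, the
            // stream is done; any remaining range triggers another round of task spawning.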
            if res.is_some() {
                return Poll::Ready(res);
            }

            if this.range.is_empty() {
                return Poll::Ready(None);
            }
        }
    }
}

impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, SingleBlockStreamItem> {
    fn from(job: SingleBlockBackfillJob<E, P>) -> Self {
        Self {
            evm_config: job.evm_config,
            provider: job.provider,
            prune_modes: PruneModes::default(),
            range: job.range,
            tasks: FuturesOrdered::new(),
            parallelism: job.stream_parallelism,
            batch_size: 1,
            thresholds: ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
        }
    }
}

impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem<E::Primitives>>
where
    E: ConfigureEvm,
{
    fn from(job: BackfillJob<E, P>) -> Self {
        let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
        Self {
            evm_config: job.evm_config,
            provider: job.provider,
            prune_modes: job.prune_modes,
            range: job.range,
            tasks: FuturesOrdered::new(),
            parallelism: job.stream_parallelism,
            batch_size,
            thresholds: ExecutionStageThresholds {
                max_blocks: Some(batch_size as u64),
                ..job.thresholds
            },
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        backfill::test_utils::{
            blocks_and_execution_outcome, blocks_and_execution_outputs, chain_spec,
            execute_block_and_commit_to_database,
        },
        BackfillJobFactory,
    };
    use alloy_consensus::{constants::ETH_TO_WEI, Header, TxEip2930};
    use alloy_primitives::{b256, Address, TxKind, U256};
    use eyre::Result;
    use futures::StreamExt;
    use reth_chainspec::{ChainSpec, EthereumHardfork, MIN_TRANSACTION_GAS};
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::{Block, BlockBody, Transaction};
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{
        crypto::secp256k1::public_key_to_address, Block as _, FullNodePrimitives,
    };
    use reth_provider::{
        providers::{BlockchainProvider, ProviderNodeTypes},
        test_utils::create_test_provider_factory_with_chain_spec,
        ProviderFactory,
    };
    use reth_stages_api::ExecutionStageThresholds;
    use reth_testing_utils::{generators, generators::sign_tx_with_key_pair};
    use secp256k1::Keypair;
    use std::sync::Arc;

    #[tokio::test]
    async fn test_single_blocks() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let blocks_and_execution_outcomes =
            blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?;

        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone());
        let mut backfill_stream = factory.backfill(1..=1).into_single_blocks().into_stream();

        let (block, mut execution_output) = backfill_stream.next().await.unwrap().unwrap();
        execution_output.state.reverts.sort();
        let expected_block = blocks_and_execution_outcomes[0].0.clone();
        let expected_output = &blocks_and_execution_outcomes[0].1;
        assert_eq!(block, expected_block);
        assert_eq!(&execution_output, expected_output);

        assert!(backfill_stream.next().await.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn test_batch() -> eyre::Result<()> {
        reth_tracing::init_test_tracing();

        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let (blocks, execution_outcome) =
            blocks_and_execution_outcome(provider_factory, chain_spec, key_pair)?;

        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone())
            .with_thresholds(ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() })
            .with_stream_parallelism(1);
        let mut backfill_stream = factory.backfill(1..=2).into_stream();
        let mut chain = backfill_stream.next().await.unwrap().unwrap();
        chain.execution_outcome_mut().state_mut().reverts.sort();

        assert!(chain.blocks_iter().eq(&blocks));
        assert_eq!(chain.execution_outcome(), &execution_outcome);

        assert!(backfill_stream.next().await.is_none());

        Ok(())
    }

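    /// Creates `n` sequential blocks on top of genesis, each carrying a single signed EIP-2930
    /// transfer.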
    fn create_blocks(
        chain_spec: &Arc<ChainSpec>,
        key_pair: Keypair,
        n: u64,
    ) -> Result<Vec<RecoveredBlock<reth_ethereum_primitives::Block>>> {
        let mut blocks = Vec::with_capacity(n as usize);
        let mut parent_hash = chain_spec.genesis_hash();

        for (i, nonce) in (1..=n).zip(0..n) {
            let block = Block {
                header: Header {
                    parent_hash,
                    receipts_root: b256!(
                        "0xd3a6acf9a244d78b33831df95d472c4128ea85bf079a1d41e32ed0b7d2244c9e"
                    ),
                    difficulty: chain_spec.fork(EthereumHardfork::Paris).ttd().expect("Paris TTD"),
                    number: i,
                    gas_limit: MIN_TRANSACTION_GAS,
                    gas_used: MIN_TRANSACTION_GAS,
                    ..Default::default()
                },
                body: BlockBody {
                    transactions: vec![sign_tx_with_key_pair(
                        key_pair,
                        Transaction::Eip2930(TxEip2930 {
                            chain_id: chain_spec.chain.id(),
                            nonce,
                            gas_limit: MIN_TRANSACTION_GAS,
                            gas_price: 1_500_000_000,
                            to: TxKind::Call(Address::ZERO),
                            value: U256::from(0.1 * ETH_TO_WEI as f64),
                            ..Default::default()
                        }),
                    )],
                    ..Default::default()
                },
            }
            .try_into_recovered()?;

            parent_hash = block.hash();
            blocks.push(block);
        }

        Ok(blocks)
    }

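    /// Executes the given blocks and commits the results to the database.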
    fn execute_and_commit_blocks<N>(
        provider_factory: &ProviderFactory<N>,
        chain_spec: &Arc<ChainSpec>,
        blocks: &[RecoveredBlock<reth_ethereum_primitives::Block>],
    ) -> Result<()>
    where
        N: ProviderNodeTypes<
            Primitives: FullNodePrimitives<
                Block = reth_ethereum_primitives::Block,
                BlockBody = reth_ethereum_primitives::BlockBody,
                Receipt = reth_ethereum_primitives::Receipt,
            >,
        >,
    {
        for block in blocks {
            execute_block_and_commit_to_database(provider_factory, chain_spec.clone(), block)?;
        }
        Ok(())
    }

    #[tokio::test]
    async fn test_batch_parallel_range_advance() -> Result<()> {
        reth_tracing::init_test_tracing();

        let key_pair = generators::generate_key(&mut generators::rng());
        let address = public_key_to_address(key_pair.public_key());

        let chain_spec = chain_spec(address);

        let executor = EthEvmConfig::ethereum(chain_spec.clone());
        let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone());
        init_genesis(&provider_factory)?;
        let blockchain_db = BlockchainProvider::new(provider_factory.clone())?;

        let blocks = create_blocks(&chain_spec, key_pair, 4)?;
        execute_and_commit_blocks(&provider_factory, &chain_spec, &blocks)?;

        let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone())
            .with_thresholds(ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() })
            .with_stream_parallelism(2);

        let mut backfill_stream = factory.backfill(1..=4).into_stream();

        let mut chain1 = backfill_stream.next().await.unwrap()?;
        let mut chain2 = backfill_stream.next().await.unwrap()?;
        assert!(backfill_stream.next().await.is_none());

        chain1.execution_outcome_mut().state_mut().reverts.sort();
        chain2.execution_outcome_mut().state_mut().reverts.sort();

        let factory_seq =
            BackfillJobFactory::new(executor.clone(), blockchain_db.clone()).with_thresholds(
                ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() },
            );

        let mut expected_chain1 =
            factory_seq.backfill(1..=2).collect::<Result<Vec<_>, _>>()?.into_iter().next().unwrap();
        let mut expected_chain2 =
            factory_seq.backfill(3..=4).collect::<Result<Vec<_>, _>>()?.into_iter().next().unwrap();

        expected_chain1.execution_outcome_mut().state_mut().reverts.sort();
        expected_chain2.execution_outcome_mut().state_mut().reverts.sort();

        assert_eq!(chain1.blocks(), expected_chain1.blocks());
        assert_eq!(chain1.execution_outcome(), expected_chain1.execution_outcome());
        assert_eq!(chain2.blocks(), expected_chain2.blocks());
        assert_eq!(chain2.execution_outcome(), expected_chain2.execution_outcome());

        Ok(())
    }
}