Skip to main content

reth_stages/stages/
merkle.rs

1use alloy_consensus::{constants::KECCAK_EMPTY, BlockHeader};
2use alloy_primitives::{BlockNumber, Sealable, B256};
3use reth_codecs::Compact;
4use reth_consensus::ConsensusError;
5use reth_db_api::{
6    tables,
7    transaction::{DbTx, DbTxMut},
8};
9use reth_primitives_traits::{GotExpected, SealedHeader};
10use reth_provider::{
11    ChangeSetReader, DBProvider, HeaderProvider, ProviderError, StageCheckpointReader,
12    StageCheckpointWriter, StatsReader, StorageChangeSetReader, StorageSettingsCache, TrieWriter,
13};
14use reth_stages_api::{
15    BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
16    StageCheckpoint, StageError, StageId, StorageRootMerkleCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress, StoredSubNode};
19use reth_trie_db::DatabaseStateRoot;
20use std::fmt::Debug;
21use tracing::*;
22
// TODO: automate the process outlined below so the user can just send in a debugging package
/// The error message that we include in invalid state root errors to tell users what information
/// they should include in a bug report, since true state root errors can be impossible to debug
/// with just basic logs.
///
/// This text is interpolated into the `error!` log lines emitted by this module when computing
/// or verifying a state root fails.
pub const INVALID_STATE_ROOT_ERROR_MESSAGE: &str = r#"
Invalid state root error on stage verification!
This is an error that likely requires a report to the reth team with additional information.
Please include the following information in your report:
 * This error message
 * The state root of the block that was rejected
 * The output of `reth db stats --checksum` from the database that was being used. This will take a long time to run!
 * 50-100 lines of logs before and after the first occurrence of the log message with the state root of the block that was rejected.
 * The debug logs from __the same time period__. To find the default location for these logs, run:
   `reth --help | grep -A 4 'log.file.directory'`

Once you have this information, please submit a github issue at https://github.com/paradigmxyz/reth/issues/new
"#;
40
/// The default threshold (in number of blocks) for switching from incremental trie building
/// of changes to whole rebuild.
///
/// [`MerkleStage::default_execution`] uses this value as its `rebuild_threshold`. During
/// execution the trie is rebuilt from scratch when the distance between the first and last block
/// to process exceeds the threshold.
pub const MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD: u64 = 100_000;

/// The default threshold (in number of blocks) to run the stage in incremental mode. The
/// incremental mode will calculate the state root for a large range of blocks by calculating the
/// new state root for this many blocks, in batches, repeating until we reach the desired block
/// number.
///
/// [`MerkleStage::default_execution`] uses this value as its `incremental_threshold`, which is
/// the distance (in blocks) between the starts of two consecutive batches.
pub const MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD: u64 = 7_000;
50
/// The merkle hashing stage uses input from
/// [`AccountHashingStage`][crate::stages::AccountHashingStage] and
/// [`StorageHashingStage`][crate::stages::StorageHashingStage] to calculate intermediate hashes
/// and state roots.
///
/// This stage should be run with the above two stages, otherwise it is a no-op.
///
/// This stage is split in two: one for calculating hashes and one for unwinding.
///
/// When run in execution, it's going to be executed AFTER the hashing stages, to generate
/// the state root. When run in unwind mode, it's going to be executed BEFORE the hashing stages,
/// so that it unwinds the intermediate hashes based on the unwound hashed state from the hashing
/// stages. The order of these two variants is important. The unwind variant should be added to the
/// pipeline before the execution variant.
///
/// An example pipeline to only hash state would be:
///
/// - [`MerkleStage::Unwind`]
/// - [`AccountHashingStage`][crate::stages::AccountHashingStage]
/// - [`StorageHashingStage`][crate::stages::StorageHashingStage]
/// - [`MerkleStage::Execution`]
#[derive(Debug, Clone)]
pub enum MerkleStage {
    /// The execution portion of the merkle stage.
    ///
    /// Unwinding this variant is skipped: it only reports the unwind target as its checkpoint.
    Execution {
        // TODO: make struct for holding incremental settings, for code reuse between `Execution`
        // variant and `Both`
        /// The threshold (in number of blocks) for switching from incremental trie building
        /// of changes to whole rebuild.
        rebuild_threshold: u64,
        /// The threshold (in number of blocks) to run the stage in incremental mode. The
        /// incremental mode will calculate the state root by calculating the new state root for
        /// some number of blocks, repeating until we reach the desired block number.
        incremental_threshold: u64,
    },
    /// The unwind portion of the merkle stage.
    ///
    /// Executing this variant is skipped: it only reports the execution target as its
    /// checkpoint.
    Unwind,
    /// Able to execute and unwind. Used for tests
    ///
    /// Only compiled for tests or when the `test-utils` feature is enabled.
    #[cfg(any(test, feature = "test-utils"))]
    Both {
        /// The threshold (in number of blocks) for switching from incremental trie building
        /// of changes to whole rebuild.
        rebuild_threshold: u64,
        /// The threshold (in number of blocks) to run the stage in incremental mode. The
        /// incremental mode will calculate the state root by calculating the new state root for
        /// some number of blocks, repeating until we reach the desired block number.
        incremental_threshold: u64,
    },
}
100
101impl MerkleStage {
102    /// Stage default for the [`MerkleStage::Execution`].
103    pub const fn default_execution() -> Self {
104        Self::Execution {
105            rebuild_threshold: MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
106            incremental_threshold: MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD,
107        }
108    }
109
110    /// Stage default for the [`MerkleStage::Unwind`].
111    pub const fn default_unwind() -> Self {
112        Self::Unwind
113    }
114
115    /// Create new instance of [`MerkleStage::Execution`].
116    pub const fn new_execution(rebuild_threshold: u64, incremental_threshold: u64) -> Self {
117        Self::Execution { rebuild_threshold, incremental_threshold }
118    }
119
120    /// Gets the hashing progress
121    pub fn get_execution_checkpoint(
122        &self,
123        provider: &impl StageCheckpointReader,
124    ) -> Result<Option<MerkleCheckpoint>, StageError> {
125        let buf =
126            provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default();
127
128        if buf.is_empty() {
129            return Ok(None)
130        }
131
132        let (checkpoint, _) = MerkleCheckpoint::from_compact(&buf, buf.len());
133        Ok(Some(checkpoint))
134    }
135
136    /// Saves the hashing progress
137    pub fn save_execution_checkpoint(
138        &self,
139        provider: &impl StageCheckpointWriter,
140        checkpoint: Option<MerkleCheckpoint>,
141    ) -> Result<(), StageError> {
142        let mut buf = vec![];
143        if let Some(checkpoint) = checkpoint {
144            debug!(
145                target: "sync::stages::merkle::exec",
146                last_account_key = ?checkpoint.last_account_key,
147                "Saving inner merkle checkpoint"
148            );
149            checkpoint.to_compact(&mut buf);
150        }
151        Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
152    }
153}
154
155impl<Provider> Stage<Provider> for MerkleStage
156where
157    Provider: DBProvider<Tx: DbTxMut>
158        + TrieWriter
159        + StatsReader
160        + HeaderProvider
161        + ChangeSetReader
162        + StorageChangeSetReader
163        + StorageSettingsCache
164        + StageCheckpointReader
165        + StageCheckpointWriter,
166{
167    /// Return the id of the stage
168    fn id(&self) -> StageId {
169        match self {
170            Self::Execution { .. } => StageId::MerkleExecute,
171            Self::Unwind => StageId::MerkleUnwind,
172            #[cfg(any(test, feature = "test-utils"))]
173            Self::Both { .. } => StageId::Other("MerkleBoth"),
174        }
175    }
176
177    /// Execute the stage.
178    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
179        let (threshold, incremental_threshold) = match self {
180            Self::Unwind => {
181                info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
182                return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
183            }
184            Self::Execution { rebuild_threshold, incremental_threshold } => {
185                (*rebuild_threshold, *incremental_threshold)
186            }
187            #[cfg(any(test, feature = "test-utils"))]
188            Self::Both { rebuild_threshold, incremental_threshold } => {
189                (*rebuild_threshold, *incremental_threshold)
190            }
191        };
192
193        let range = input.next_block_range();
194        let (from_block, to_block) = range.clone().into_inner();
195        let current_block_number = input.checkpoint().block_number;
196
197        let target_block = provider
198            .header_by_number(to_block)?
199            .ok_or_else(|| ProviderError::HeaderNotFound(to_block.into()))?;
200        let target_block_root = target_block.state_root();
201
202        let (trie_root, entities_checkpoint) = if range.is_empty() {
203            (target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
204        } else if to_block - from_block > threshold || from_block == 1 {
205            let mut checkpoint = self.get_execution_checkpoint(provider)?;
206
207            // if there are more blocks than threshold it is faster to rebuild the trie
208            let mut entities_checkpoint = if let Some(checkpoint) =
209                checkpoint.as_ref().filter(|c| c.target_block == to_block)
210            {
211                debug!(
212                    target: "sync::stages::merkle::exec",
213                    current = ?current_block_number,
214                    target = ?to_block,
215                    last_account_key = ?checkpoint.last_account_key,
216                    "Continuing inner merkle checkpoint"
217                );
218
219                input.checkpoint().entities_stage_checkpoint()
220            } else {
221                debug!(
222                    target: "sync::stages::merkle::exec",
223                    current = ?current_block_number,
224                    target = ?to_block,
225                    previous_checkpoint = ?checkpoint,
226                    "Rebuilding trie"
227                );
228                // Reset the checkpoint and clear trie tables
229                checkpoint = None;
230                self.save_execution_checkpoint(provider, None)?;
231                provider.tx_ref().clear::<tables::AccountsTrie>()?;
232                provider.tx_ref().clear::<tables::StoragesTrie>()?;
233
234                None
235            }
236            .unwrap_or(EntitiesCheckpoint {
237                processed: 0,
238                total: (provider.count_entries::<tables::HashedAccounts>()? +
239                    provider.count_entries::<tables::HashedStorages>()?)
240                    as u64,
241            });
242
243            let tx = provider.tx_ref();
244            let progress = StateRoot::from_tx(tx)
245                .with_intermediate_state(checkpoint.map(IntermediateStateRootState::from))
246                .root_with_progress()
247                .map_err(|e| {
248                    error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "State root with progress failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
249                    StageError::Fatal(Box::new(e))
250                })?;
251            match progress {
252                StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
253                    provider.write_trie_updates(updates)?;
254
255                    let mut checkpoint = MerkleCheckpoint::new(
256                        to_block,
257                        state.account_root_state.last_hashed_key,
258                        state
259                            .account_root_state
260                            .walker_stack
261                            .into_iter()
262                            .map(StoredSubNode::from)
263                            .collect(),
264                        state.account_root_state.hash_builder.into(),
265                    );
266
267                    // Save storage root state if present
268                    if let Some(storage_state) = state.storage_root_state {
269                        checkpoint.storage_root_checkpoint =
270                            Some(StorageRootMerkleCheckpoint::new(
271                                storage_state.state.last_hashed_key,
272                                storage_state
273                                    .state
274                                    .walker_stack
275                                    .into_iter()
276                                    .map(StoredSubNode::from)
277                                    .collect(),
278                                storage_state.state.hash_builder.into(),
279                                storage_state.account.nonce,
280                                storage_state.account.balance,
281                                storage_state.account.bytecode_hash.unwrap_or(KECCAK_EMPTY),
282                            ));
283                    }
284                    self.save_execution_checkpoint(provider, Some(checkpoint))?;
285
286                    entities_checkpoint.processed += hashed_entries_walked as u64;
287
288                    return Ok(ExecOutput {
289                        checkpoint: input
290                            .checkpoint()
291                            .with_entities_stage_checkpoint(entities_checkpoint),
292                        done: false,
293                    })
294                }
295                StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
296                    provider.write_trie_updates(updates)?;
297
298                    entities_checkpoint.processed += hashed_entries_walked as u64;
299
300                    (root, entities_checkpoint)
301                }
302            }
303        } else {
304            debug!(target: "sync::stages::merkle::exec", current = ?current_block_number, target = ?to_block, "Updating trie in chunks");
305            let mut final_root = None;
306            for start_block in range.step_by(incremental_threshold as usize) {
307                let chunk_to = std::cmp::min(start_block + incremental_threshold, to_block);
308                let chunk_range = start_block..=chunk_to;
309                debug!(
310                    target: "sync::stages::merkle::exec",
311                    current = ?current_block_number,
312                    target = ?to_block,
313                    incremental_threshold,
314                    chunk_range = ?chunk_range,
315                    "Processing chunk"
316                );
317                let (root, updates) =
318                StateRoot::incremental_root_with_updates(provider, chunk_range)
319                    .map_err(|e| {
320                        error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
321                        StageError::Fatal(Box::new(e))
322                    })?;
323                provider.write_trie_updates(updates)?;
324                final_root = Some(root);
325            }
326
327            // if we had no final root, we must have not looped above, which should not be possible
328            let final_root = final_root.ok_or(StageError::Fatal(
329                "Incremental merkle hashing did not produce a final root".into(),
330            ))?;
331
332            let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
333                provider.count_entries::<tables::HashedStorages>()?)
334                as u64;
335
336            let entities_checkpoint = EntitiesCheckpoint {
337                // This is fine because `range` doesn't have an upper bound, so in this `else`
338                // branch we're just hashing all remaining accounts and storage slots we have in the
339                // database.
340                processed: total_hashed_entries,
341                total: total_hashed_entries,
342            };
343            // Save the checkpoint
344            (final_root, entities_checkpoint)
345        };
346
347        // Reset the checkpoint
348        self.save_execution_checkpoint(provider, None)?;
349
350        validate_state_root(trie_root, SealedHeader::seal_slow(target_block), to_block)?;
351
352        Ok(ExecOutput {
353            checkpoint: StageCheckpoint::new(to_block)
354                .with_entities_stage_checkpoint(entities_checkpoint),
355            done: true,
356        })
357    }
358
359    /// Unwind the stage.
360    fn unwind(
361        &mut self,
362        provider: &Provider,
363        input: UnwindInput,
364    ) -> Result<UnwindOutput, StageError> {
365        let tx = provider.tx_ref();
366        let range = input.unwind_block_range();
367        if matches!(self, Self::Execution { .. }) {
368            info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
369            return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
370        }
371
372        let mut entities_checkpoint =
373            input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint {
374                processed: 0,
375                total: (tx.entries::<tables::HashedAccounts>()? +
376                    tx.entries::<tables::HashedStorages>()?) as u64,
377            });
378
379        if input.unwind_to == 0 {
380            tx.clear::<tables::AccountsTrie>()?;
381            tx.clear::<tables::StoragesTrie>()?;
382
383            entities_checkpoint.processed = 0;
384
385            return Ok(UnwindOutput {
386                checkpoint: StageCheckpoint::new(input.unwind_to)
387                    .with_entities_stage_checkpoint(entities_checkpoint),
388            })
389        }
390
391        // Unwind trie only if there are transitions
392        if range.is_empty() {
393            info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
394        } else {
395            let (block_root, updates) = StateRoot::incremental_root_with_updates(provider, range)
396                .map_err(|e| StageError::Fatal(Box::new(e)))?;
397
398            // Validate the calculated state root
399            let target = provider
400                .header_by_number(input.unwind_to)?
401                .ok_or_else(|| ProviderError::HeaderNotFound(input.unwind_to.into()))?;
402
403            validate_state_root(block_root, SealedHeader::seal_slow(target), input.unwind_to)?;
404
405            // Validation passed, apply unwind changes to the database.
406            provider.write_trie_updates(updates)?;
407
408            // Update entities checkpoint to reflect the unwind operation
409            // Since we're unwinding, we need to recalculate the total entities at the target block
410            let accounts = tx.entries::<tables::HashedAccounts>()?;
411            let storages = tx.entries::<tables::HashedStorages>()?;
412            let total = (accounts + storages) as u64;
413            entities_checkpoint.total = total;
414            entities_checkpoint.processed = total;
415        }
416
417        Ok(UnwindOutput {
418            checkpoint: StageCheckpoint::new(input.unwind_to)
419                .with_entities_stage_checkpoint(entities_checkpoint),
420        })
421    }
422}
423
424/// Check that the computed state root matches the root in the expected header.
425#[inline]
426fn validate_state_root<H: BlockHeader + Sealable + Debug>(
427    got: B256,
428    expected: SealedHeader<H>,
429    target_block: BlockNumber,
430) -> Result<(), StageError> {
431    if got == expected.state_root() {
432        Ok(())
433    } else {
434        error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
435        Err(StageError::Block {
436            error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff(
437                GotExpected { got, expected: expected.state_root() }.into(),
438            )),
439            block: Box::new(expected.block_with_parent()),
440        })
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use super::*;
447    use crate::test_utils::{
448        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
449        TestRunnerError, TestStageDB, UnwindStageTestRunner,
450    };
451    use alloy_primitives::{keccak256, U256};
452    use assert_matches::assert_matches;
453    use reth_db_api::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
454    use reth_primitives_traits::{SealedBlock, StorageEntry};
455    use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
456    use reth_stages_api::StageUnitCheckpoint;
457    use reth_static_file_types::StaticFileSegment;
458    use reth_testing_utils::generators::{
459        self, random_block, random_block_range, random_changeset_range,
460        random_contract_account_range, BlockParams, BlockRangeParams,
461    };
462    use reth_trie::test_utils::{state_root, state_root_prehashed};
463    use std::collections::BTreeMap;
464
465    stage_test_suite_ext!(MerkleTestRunner, merkle);
466
467    /// Execute from genesis so as to merkelize whole state
468    #[tokio::test]
469    async fn execute_clean_merkle() {
470        let (previous_stage, stage_progress) = (500, 0);
471
472        // Set up the runner
473        let mut runner = MerkleTestRunner::default();
474        // set low threshold so we hash the whole storage
475        let input = ExecInput {
476            target: Some(previous_stage),
477            checkpoint: Some(StageCheckpoint::new(stage_progress)),
478        };
479
480        runner.seed_execution(input).expect("failed to seed execution");
481
482        let rx = runner.execute(input);
483
484        // Assert the successful result
485        let result = rx.await.unwrap();
486        assert_matches!(
487            result,
488            Ok(ExecOutput {
489                checkpoint: StageCheckpoint {
490                    block_number,
491                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
492                        processed,
493                        total
494                    }))
495                },
496                done: true
497            }) if block_number == previous_stage && processed == total &&
498                total == (
499                    runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
500                    runner.db.count_entries::<tables::HashedStorages>().unwrap()
501                ) as u64
502        );
503
504        // Validate the stage execution
505        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
506    }
507
508    /// Update small trie
509    #[tokio::test]
510    async fn execute_small_merkle() {
511        let (previous_stage, stage_progress) = (2, 1);
512
513        // Set up the runner
514        let mut runner = MerkleTestRunner::default();
515        let input = ExecInput {
516            target: Some(previous_stage),
517            checkpoint: Some(StageCheckpoint::new(stage_progress)),
518        };
519
520        runner.seed_execution(input).expect("failed to seed execution");
521
522        let rx = runner.execute(input);
523
524        // Assert the successful result
525        let result = rx.await.unwrap();
526        assert_matches!(
527            result,
528            Ok(ExecOutput {
529                checkpoint: StageCheckpoint {
530                    block_number,
531                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
532                        processed,
533                        total
534                    }))
535                },
536                done: true
537            }) if block_number == previous_stage && processed == total &&
538                total == (
539                    runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
540                    runner.db.count_entries::<tables::HashedStorages>().unwrap()
541                ) as u64
542        );
543
544        // Validate the stage execution
545        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
546    }
547
548    #[tokio::test]
549    async fn execute_chunked_merkle() {
550        let (previous_stage, stage_progress) = (200, 100);
551        let clean_threshold = 100;
552        let incremental_threshold = 10;
553
554        // Set up the runner
555        let mut runner =
556            MerkleTestRunner { db: TestStageDB::default(), clean_threshold, incremental_threshold };
557
558        let input = ExecInput {
559            target: Some(previous_stage),
560            checkpoint: Some(StageCheckpoint::new(stage_progress)),
561        };
562
563        runner.seed_execution(input).expect("failed to seed execution");
564        let rx = runner.execute(input);
565
566        // Assert the successful result
567        let result = rx.await.unwrap();
568        assert_matches!(
569            result,
570            Ok(ExecOutput {
571                checkpoint: StageCheckpoint {
572                    block_number,
573                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
574                        processed,
575                        total
576                    }))
577                },
578                done: true
579            }) if block_number == previous_stage && processed == total &&
580                total == (
581                    runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
582                    runner.db.count_entries::<tables::HashedStorages>().unwrap()
583                ) as u64
584        );
585
586        // Validate the stage execution
587        let provider = runner.db.factory.provider().unwrap();
588        let header = provider.header_by_number(previous_stage).unwrap().unwrap();
589        let expected_root = header.state_root;
590
591        let actual_root = runner
592            .db
593            .query_with_provider(|provider| {
594                Ok(StateRoot::incremental_root_with_updates(
595                    &provider,
596                    stage_progress + 1..=previous_stage,
597                ))
598            })
599            .unwrap();
600
601        assert_eq!(
602            actual_root.unwrap().0,
603            expected_root,
604            "State root mismatch after chunked processing"
605        );
606    }
607
    /// Test runner that drives [`MerkleStage::Both`] against a [`TestStageDB`].
    struct MerkleTestRunner {
        /// Test database the stage reads from and writes to.
        db: TestStageDB,
        /// Passed to the stage as its `rebuild_threshold`.
        clean_threshold: u64,
        /// Passed to the stage as its `incremental_threshold`.
        incremental_threshold: u64,
    }
613
614    impl Default for MerkleTestRunner {
615        fn default() -> Self {
616            Self {
617                db: TestStageDB::default(),
618                clean_threshold: 10000,
619                incremental_threshold: 10000,
620            }
621        }
622    }
623
624    impl StageTestRunner for MerkleTestRunner {
625        type S = MerkleStage;
626
627        fn db(&self) -> &TestStageDB {
628            &self.db
629        }
630
631        fn stage(&self) -> Self::S {
632            Self::S::Both {
633                rebuild_threshold: self.clean_threshold,
634                incremental_threshold: self.incremental_threshold,
635            }
636        }
637    }
638
    impl ExecuteStageTestRunner for MerkleTestRunner {
        type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;

        /// Seeds the test database with blocks, accounts, storages and changesets so that the
        /// merkle stage can be executed for `input`.
        ///
        /// The header of the last generated block is rewritten to carry the state root computed
        /// from the final hashed state. Returns the blocks from the checkpoint block up to the
        /// target.
        ///
        /// NOTE: every random generator below draws from the same `rng`, so the order of the
        /// calls determines the generated data.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let start = stage_progress + 1;
            let end = input.target();
            let mut rng = generators::rng();

            // Blocks strictly before the checkpoint block; only needed for a non-zero checkpoint.
            let mut preblocks = vec![];
            if stage_progress > 0 {
                preblocks.append(&mut random_block_range(
                    &mut rng,
                    0..=stage_progress - 1,
                    BlockRangeParams {
                        parent: Some(B256::ZERO),
                        tx_count: 0..1,
                        ..Default::default()
                    },
                ));
                self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
            }

            // Initial state: a fixed number of random contract accounts without storage.
            let num_of_accounts = 31;
            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
                .into_iter()
                .collect::<BTreeMap<_, _>>();

            self.db.insert_accounts_and_storages(
                accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))),
            )?;

            // The checkpoint block itself, chained onto the last pre-block (if any).
            let (header, body) = random_block(
                &mut rng,
                stage_progress,
                BlockParams { parent: preblocks.last().map(|b| b.hash()), ..Default::default() },
            )
            .split_sealed_header_body();
            let mut header = header.unseal();

            // Give the checkpoint block the state root of the initial accounts, then re-seal it
            // because changing the header changes its hash.
            header.state_root = state_root(
                accounts
                    .clone()
                    .into_iter()
                    .map(|(address, account)| (address, (account, std::iter::empty()))),
            );
            let sealed_head = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
                SealedHeader::seal_slow(header),
                body,
            );

            // Blocks `start..=end` to be executed, chained onto the checkpoint block.
            let head_hash = sealed_head.hash();
            let mut blocks = vec![sealed_head];
            blocks.extend(random_block_range(
                &mut rng,
                start..=end,
                BlockRangeParams { parent: Some(head_hash), tx_count: 0..3, ..Default::default() },
            ));
            let last_block = blocks.last().cloned().unwrap();
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;

            // Random state transitions over the generated blocks, starting from the initial
            // accounts, together with the state after all of them.
            let (transitions, final_state) = random_changeset_range(
                &mut rng,
                blocks.iter(),
                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
                0..3,
                0..256,
            );
            // Insert the changesets with `start` as the second argument (presumably the block
            // number assigned to the first changeset; verify against `insert_changesets`).
            self.db.insert_changesets(transitions, Some(start))?;
            self.db.insert_accounts_and_storages(final_state)?;

            // Calculate the expected state root from the hashed account and storage tables
            let root = self.db.query(|tx| {
                let mut accounts = BTreeMap::default();
                let mut accounts_cursor = tx.cursor_read::<tables::HashedAccounts>()?;
                let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
                for entry in accounts_cursor.walk_range(..)? {
                    let (key, account) = entry?;
                    // Collect every storage entry stored under this hashed account key
                    let mut storage_entries = Vec::new();
                    let mut entry = storage_cursor.seek_exact(key)?;
                    while let Some((_, storage)) = entry {
                        storage_entries.push(storage);
                        entry = storage_cursor.next_dup()?;
                    }
                    // Zero-valued slots are left out of the root computation
                    let storage = storage_entries
                        .into_iter()
                        .filter(|v| !v.value.is_zero())
                        .map(|v| (v.key, v.value))
                        .collect::<Vec<_>>();
                    accounts.insert(key, (account, storage));
                }

                Ok(state_root_prehashed(accounts.into_iter()))
            })?;

            // Replace the last stored header with a copy that carries the computed state root:
            // prune one header from the static file, then append the updated one.
            let static_file_provider = self.db.factory.static_file_provider();
            let mut writer =
                static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
            let mut last_header = last_block.clone_sealed_header();
            last_header.set_state_root(root);

            let hash = last_header.hash_slow();
            writer.prune_headers(1).unwrap();
            writer.commit().unwrap();
            writer.append_header(&last_header, &hash).unwrap();
            writer.commit().unwrap();

            Ok(blocks)
        }

        /// Always succeeds; no additional checks are performed here.
        fn validate_execution(
            &self,
            _input: ExecInput,
            _output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            // The execution is validated within the stage
            Ok(())
        }
    }
759
760    impl UnwindStageTestRunner for MerkleTestRunner {
761        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
762            // The unwind is validated within the stage
763            Ok(())
764        }
765
766        fn before_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
767            let target_block = input.unwind_to + 1;
768
769            self.db
770                .commit(|tx| {
771                    let mut storage_changesets_cursor =
772                        tx.cursor_dup_read::<tables::StorageChangeSets>().unwrap();
773                    let mut storage_cursor =
774                        tx.cursor_dup_write::<tables::HashedStorages>().unwrap();
775
776                    let mut tree: BTreeMap<B256, BTreeMap<B256, U256>> = BTreeMap::new();
777
778                    let mut rev_changeset_walker =
779                        storage_changesets_cursor.walk_back(None).unwrap();
780                    while let Some((bn_address, entry)) =
781                        rev_changeset_walker.next().transpose().unwrap()
782                    {
783                        if bn_address.block_number() < target_block {
784                            break
785                        }
786
787                        tree.entry(keccak256(bn_address.address()))
788                            .or_default()
789                            .insert(keccak256(entry.key), entry.value);
790                    }
791                    for (hashed_address, storage) in tree {
792                        for (hashed_slot, value) in storage {
793                            let storage_entry = storage_cursor
794                                .seek_by_key_subkey(hashed_address, hashed_slot)
795                                .unwrap();
796                            if storage_entry.is_some_and(|v| v.key == hashed_slot) {
797                                storage_cursor.delete_current().unwrap();
798                            }
799
800                            if !value.is_zero() {
801                                let storage_entry = StorageEntry { key: hashed_slot, value };
802                                storage_cursor.upsert(hashed_address, &storage_entry).unwrap();
803                            }
804                        }
805                    }
806
807                    let mut changeset_cursor =
808                        tx.cursor_dup_write::<tables::AccountChangeSets>().unwrap();
809                    let mut rev_changeset_walker = changeset_cursor.walk_back(None).unwrap();
810
811                    while let Some((block_number, account_before_tx)) =
812                        rev_changeset_walker.next().transpose().unwrap()
813                    {
814                        if block_number < target_block {
815                            break
816                        }
817
818                        if let Some(acc) = account_before_tx.info {
819                            tx.put::<tables::HashedAccounts>(
820                                keccak256(account_before_tx.address),
821                                acc,
822                            )
823                            .unwrap();
824                        } else {
825                            tx.delete::<tables::HashedAccounts>(
826                                keccak256(account_before_tx.address),
827                                None,
828                            )
829                            .unwrap();
830                        }
831                    }
832                    Ok(())
833                })
834                .unwrap();
835            Ok(())
836        }
837    }
838}