// Source: reth — crates/stages/stages/src/stages/merkle.rs

1use alloy_consensus::{constants::KECCAK_EMPTY, BlockHeader};
2use alloy_primitives::{BlockNumber, Sealable, B256};
3use reth_codecs::Compact;
4use reth_consensus::ConsensusError;
5use reth_db_api::{
6    tables,
7    transaction::{DbTx, DbTxMut},
8};
9use reth_primitives_traits::{GotExpected, SealedHeader};
10use reth_provider::{
11    ChangeSetReader, DBProvider, HeaderProvider, ProviderError, StageCheckpointReader,
12    StageCheckpointWriter, StatsReader, StorageChangeSetReader, StorageSettingsCache, TrieWriter,
13};
14use reth_stages_api::{
15    BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
16    StageCheckpoint, StageError, StageId, StorageRootMerkleCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress, StoredSubNode};
19use reth_trie_db::DatabaseStateRoot;
20
21use std::fmt::Debug;
22
/// Shorthand for a [`StateRoot`] calculator driven by database-backed trie and hashed-state
/// cursor factories over a transaction reference `&'a TX`, parameterized over the trie
/// cursor adapter `A`.
type DbStateRoot<'a, TX, A> = StateRoot<
    reth_trie_db::DatabaseTrieCursorFactory<&'a TX, A>,
    reth_trie_db::DatabaseHashedCursorFactory<&'a TX>,
>;
27use tracing::*;
28
// TODO: automate the process outlined below so the user can just send in a debugging package
/// The error message that we include in invalid state root errors to tell users what information
/// they should include in a bug report, since true state root errors can be impossible to debug
/// with just basic logs.
///
/// Interpolated into the `error!` logs emitted by this stage whenever a computed root does not
/// match the header, or root computation itself fails.
pub const INVALID_STATE_ROOT_ERROR_MESSAGE: &str = r#"
Invalid state root error on stage verification!
This is an error that likely requires a report to the reth team with additional information.
Please include the following information in your report:
 * This error message
 * The state root of the block that was rejected
 * The output of `reth db stats --checksum` from the database that was being used. This will take a long time to run!
 * 50-100 lines of logs before and after the first occurrence of the log message with the state root of the block that was rejected.
 * The debug logs from __the same time period__. To find the default location for these logs, run:
   `reth --help | grep -A 4 'log.file.directory'`

Once you have this information, please submit a github issue at https://github.com/paradigmxyz/reth/issues/new
"#;
46
/// The default threshold (in number of blocks) for switching from incremental trie building
/// of changes to whole rebuild.
///
/// Ranges wider than this make the stage clear the trie tables and recompute the root from the
/// hashed state instead of applying per-block changesets.
pub const MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD: u64 = 100_000;
50
/// The default threshold (in number of blocks) to run the stage in incremental mode. The
/// incremental mode will calculate the state root for a large range of blocks by calculating the
/// new state root for this many blocks, in batches, repeating until we reach the desired block
/// number. Used as the chunk size of the incremental path in [`MerkleStage::execute`].
pub const MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD: u64 = 7_000;
56
/// The merkle hashing stage uses input from
/// [`AccountHashingStage`][crate::stages::AccountHashingStage] and
/// [`StorageHashingStage`][crate::stages::StorageHashingStage] to calculate intermediate hashes
/// and state roots.
///
/// This stage should be run with the above two stages, otherwise it is a no-op.
///
/// This stage is split in two: one for calculating hashes and one for unwinding.
///
/// When run in execution, it's going to be executed AFTER the hashing stages, to generate
/// the state root. When run in unwind mode, it's going to be executed BEFORE the hashing stages,
/// so that it unwinds the intermediate hashes based on the unwound hashed state from the hashing
/// stages. The order of these two variants is important. The unwind variant should be added to the
/// pipeline before the execution variant.
///
/// An example pipeline to only hash state would be:
///
/// - [`MerkleStage::Unwind`]
/// - [`AccountHashingStage`][crate::stages::AccountHashingStage]
/// - [`StorageHashingStage`][crate::stages::StorageHashingStage]
/// - [`MerkleStage::Execution`]
#[derive(Debug, Clone)]
pub enum MerkleStage {
    /// The execution portion of the merkle stage.
    ///
    /// `execute` does the real work for this variant; `unwind` is a no-op.
    Execution {
        // TODO: make struct for holding incremental settings, for code reuse between `Execution`
        // variant and `Both`
        /// The threshold (in number of blocks) for switching from incremental trie building
        /// of changes to whole rebuild.
        rebuild_threshold: u64,
        /// The threshold (in number of blocks) to run the stage in incremental mode. The
        /// incremental mode will calculate the state root by calculating the new state root for
        /// some number of blocks, repeating until we reach the desired block number.
        incremental_threshold: u64,
    },
    /// The unwind portion of the merkle stage.
    ///
    /// `unwind` does the real work for this variant; `execute` is a no-op.
    Unwind,
    /// Able to execute and unwind. Used for tests
    #[cfg(any(test, feature = "test-utils"))]
    Both {
        /// The threshold (in number of blocks) for switching from incremental trie building
        /// of changes to whole rebuild.
        rebuild_threshold: u64,
        /// The threshold (in number of blocks) to run the stage in incremental mode. The
        /// incremental mode will calculate the state root by calculating the new state root for
        /// some number of blocks, repeating until we reach the desired block number.
        incremental_threshold: u64,
    },
}
106
107impl MerkleStage {
108    /// Stage default for the [`MerkleStage::Execution`].
109    pub const fn default_execution() -> Self {
110        Self::Execution {
111            rebuild_threshold: MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
112            incremental_threshold: MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD,
113        }
114    }
115
116    /// Stage default for the [`MerkleStage::Unwind`].
117    pub const fn default_unwind() -> Self {
118        Self::Unwind
119    }
120
121    /// Create new instance of [`MerkleStage::Execution`].
122    pub const fn new_execution(rebuild_threshold: u64, incremental_threshold: u64) -> Self {
123        Self::Execution { rebuild_threshold, incremental_threshold }
124    }
125
126    /// Gets the hashing progress
127    pub fn get_execution_checkpoint(
128        &self,
129        provider: &impl StageCheckpointReader,
130    ) -> Result<Option<MerkleCheckpoint>, StageError> {
131        let buf =
132            provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default();
133
134        if buf.is_empty() {
135            return Ok(None)
136        }
137
138        let (checkpoint, _) = MerkleCheckpoint::from_compact(&buf, buf.len());
139        Ok(Some(checkpoint))
140    }
141
142    /// Saves the hashing progress
143    pub fn save_execution_checkpoint(
144        &self,
145        provider: &impl StageCheckpointWriter,
146        checkpoint: Option<MerkleCheckpoint>,
147    ) -> Result<(), StageError> {
148        let mut buf = vec![];
149        if let Some(checkpoint) = checkpoint {
150            debug!(
151                target: "sync::stages::merkle::exec",
152                last_account_key = ?checkpoint.last_account_key,
153                "Saving inner merkle checkpoint"
154            );
155            checkpoint.to_compact(&mut buf);
156        }
157        Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
158    }
159}
160
impl<Provider> Stage<Provider> for MerkleStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + TrieWriter
        + StatsReader
        + HeaderProvider
        + ChangeSetReader
        + StorageChangeSetReader
        + StorageSettingsCache
        + StageCheckpointReader
        + StageCheckpointWriter,
{
    /// Return the id of the stage
    fn id(&self) -> StageId {
        match self {
            Self::Execution { .. } => StageId::MerkleExecute,
            Self::Unwind => StageId::MerkleUnwind,
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { .. } => StageId::Other("MerkleBoth"),
        }
    }

    /// Execute the stage.
    ///
    /// Picks one of three paths based on the input range:
    /// * empty range — nothing to do; the target header's own state root is reused;
    /// * range wider than `rebuild_threshold`, or starting from block 1 — the trie tables are
    ///   cleared and the root is rebuilt from the hashed state, persisting resumable progress
    ///   checkpoints along the way (returning `done: false` until complete);
    /// * otherwise — per-block changesets are applied incrementally in chunks of
    ///   `incremental_threshold` blocks.
    ///
    /// In every path the computed root is validated against the target header before the stage
    /// reports completion.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let (threshold, incremental_threshold) = match self {
            Self::Unwind => {
                // The `Unwind` variant only acts during unwind; execution is a pass-through.
                info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
                return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
            }
            Self::Execution { rebuild_threshold, incremental_threshold } => {
                (*rebuild_threshold, *incremental_threshold)
            }
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { rebuild_threshold, incremental_threshold } => {
                (*rebuild_threshold, *incremental_threshold)
            }
        };

        let range = input.next_block_range();
        let (from_block, to_block) = range.clone().into_inner();
        let current_block_number = input.checkpoint().block_number;

        // Header we must converge to; its state root is the validation target below.
        let target_block = provider
            .header_by_number(to_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(to_block.into()))?;
        let target_block_root = target_block.state_root();

        let (trie_root, entities_checkpoint) = if range.is_empty() {
            (target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
        } else if to_block - from_block > threshold || from_block == 1 {
            let mut checkpoint = self.get_execution_checkpoint(provider)?;

            // if there are more blocks than threshold it is faster to rebuild the trie
            let mut entities_checkpoint = if let Some(checkpoint) =
                checkpoint.as_ref().filter(|c| c.target_block == to_block)
            {
                // A saved inner checkpoint targets the same block: resume the rebuild from it.
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    last_account_key = ?checkpoint.last_account_key,
                    "Continuing inner merkle checkpoint"
                );

                input.checkpoint().entities_stage_checkpoint()
            } else {
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    previous_checkpoint = ?checkpoint,
                    "Rebuilding trie"
                );
                // Reset the checkpoint and clear trie tables
                checkpoint = None;
                self.save_execution_checkpoint(provider, None)?;
                provider.tx_ref().clear::<tables::AccountsTrie>()?;
                provider.tx_ref().clear::<tables::StoragesTrie>()?;

                None
            }
            .unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (provider.count_entries::<tables::HashedAccounts>()? +
                    provider.count_entries::<tables::HashedStorages>()?)
                    as u64,
            });

            let tx = provider.tx_ref();
            let progress = reth_trie_db::with_adapter!(provider, |A| {
                DbStateRoot::<_, A>::from_tx(tx)
                    .with_intermediate_state(checkpoint.map(IntermediateStateRootState::from))
                    .root_with_progress()
            })
            .map_err(|e| {
                error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "State root with progress failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                StageError::Fatal(Box::new(e))
            })?;
            match progress {
                StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
                    provider.write_trie_updates(updates)?;

                    // Persist the in-flight account-trie walker state so a later run can resume.
                    let mut checkpoint = MerkleCheckpoint::new(
                        to_block,
                        state.account_root_state.last_hashed_key,
                        state
                            .account_root_state
                            .walker_stack
                            .into_iter()
                            .map(StoredSubNode::from)
                            .collect(),
                        state.account_root_state.hash_builder.into(),
                    );

                    // Save storage root state if present
                    if let Some(storage_state) = state.storage_root_state {
                        checkpoint.storage_root_checkpoint =
                            Some(StorageRootMerkleCheckpoint::new(
                                storage_state.state.last_hashed_key,
                                storage_state
                                    .state
                                    .walker_stack
                                    .into_iter()
                                    .map(StoredSubNode::from)
                                    .collect(),
                                storage_state.state.hash_builder.into(),
                                storage_state.account.nonce,
                                storage_state.account.balance,
                                // Accounts without code are stored with the empty-code hash.
                                storage_state.account.bytecode_hash.unwrap_or(KECCAK_EMPTY),
                            ));
                    }
                    self.save_execution_checkpoint(provider, Some(checkpoint))?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    // Not finished: report progress and yield control back to the pipeline.
                    return Ok(ExecOutput {
                        checkpoint: input
                            .checkpoint()
                            .with_entities_stage_checkpoint(entities_checkpoint),
                        done: false,
                    })
                }
                StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
                    provider.write_trie_updates(updates)?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    (root, entities_checkpoint)
                }
            }
        } else {
            // Incremental path: fold changesets into the trie, `incremental_threshold` blocks at
            // a time, writing the trie updates after each chunk.
            debug!(target: "sync::stages::merkle::exec", current = ?current_block_number, target = ?to_block, "Updating trie in chunks");
            let mut final_root = None;
            for start_block in range.step_by(incremental_threshold as usize) {
                let chunk_to = std::cmp::min(start_block + incremental_threshold, to_block);
                let chunk_range = start_block..=chunk_to;
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    incremental_threshold,
                    chunk_range = ?chunk_range,
                    "Processing chunk"
                );
                let (root, updates) = reth_trie_db::with_adapter!(provider, |A| {
                    DbStateRoot::<_, A>::incremental_root_with_updates(provider, chunk_range)
                })
                .map_err(|e| {
                    error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                    StageError::Fatal(Box::new(e))
                })?;
                provider.write_trie_updates(updates)?;
                final_root = Some(root);
            }

            // if we had no final root, we must have not looped above, which should not be possible
            let final_root = final_root.ok_or(StageError::Fatal(
                "Incremental merkle hashing did not produce a final root".into(),
            ))?;

            let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
                provider.count_entries::<tables::HashedStorages>()?)
                as u64;

            let entities_checkpoint = EntitiesCheckpoint {
                // This is fine because `range` doesn't have an upper bound, so in this `else`
                // branch we're just hashing all remaining accounts and storage slots we have in the
                // database.
                processed: total_hashed_entries,
                total: total_hashed_entries,
            };
            // Save the checkpoint
            (final_root, entities_checkpoint)
        };

        // Reset the checkpoint
        self.save_execution_checkpoint(provider, None)?;

        // Abort (with a detailed bug-report message) if the computed root diverges from the
        // header before advancing the stage checkpoint.
        validate_state_root(trie_root, SealedHeader::seal_slow(target_block), to_block)?;

        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(to_block)
                .with_entities_stage_checkpoint(entities_checkpoint),
            done: true,
        })
    }

    /// Unwind the stage.
    ///
    /// Recomputes the incremental state root over the unwind range, validates it against the
    /// header at `unwind_to`, and only then writes the reverting trie updates. Unwinding to
    /// genesis simply clears both trie tables.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let tx = provider.tx_ref();
        let range = input.unwind_block_range();
        if matches!(self, Self::Execution { .. }) {
            // The `Execution` variant only acts during execution; unwind is a pass-through.
            info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
            return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
        }

        let mut entities_checkpoint =
            input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (tx.entries::<tables::HashedAccounts>()? +
                    tx.entries::<tables::HashedStorages>()?) as u64,
            });

        if input.unwind_to == 0 {
            // Unwinding all the way back to genesis: drop the trie entirely.
            tx.clear::<tables::AccountsTrie>()?;
            tx.clear::<tables::StoragesTrie>()?;

            entities_checkpoint.processed = 0;

            return Ok(UnwindOutput {
                checkpoint: StageCheckpoint::new(input.unwind_to)
                    .with_entities_stage_checkpoint(entities_checkpoint),
            })
        }

        // Unwind trie only if there are transitions
        if range.is_empty() {
            info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
        } else {
            let (block_root, updates) = reth_trie_db::with_adapter!(provider, |A| {
                DbStateRoot::<_, A>::incremental_root_with_updates(provider, range)
            })
            .map_err(|e| StageError::Fatal(Box::new(e)))?;

            // Validate the calculated state root
            let target = provider
                .header_by_number(input.unwind_to)?
                .ok_or_else(|| ProviderError::HeaderNotFound(input.unwind_to.into()))?;

            validate_state_root(block_root, SealedHeader::seal_slow(target), input.unwind_to)?;

            // Validation passed, apply unwind changes to the database.
            provider.write_trie_updates(updates)?;

            // Update entities checkpoint to reflect the unwind operation
            // Since we're unwinding, we need to recalculate the total entities at the target block
            let accounts = tx.entries::<tables::HashedAccounts>()?;
            let storages = tx.entries::<tables::HashedStorages>()?;
            let total = (accounts + storages) as u64;
            entities_checkpoint.total = total;
            entities_checkpoint.processed = total;
        }

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(input.unwind_to)
                .with_entities_stage_checkpoint(entities_checkpoint),
        })
    }
}
434
435/// Check that the computed state root matches the root in the expected header.
436#[inline]
437fn validate_state_root<H: BlockHeader + Sealable + Debug>(
438    got: B256,
439    expected: SealedHeader<H>,
440    target_block: BlockNumber,
441) -> Result<(), StageError> {
442    if got == expected.state_root() {
443        Ok(())
444    } else {
445        error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
446        Err(StageError::Block {
447            error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff(
448                GotExpected { got, expected: expected.state_root() }.into(),
449            )),
450            block: Box::new(expected.block_with_parent()),
451        })
452    }
453}
454
455#[cfg(test)]
456mod tests {
457    use super::*;
458    use crate::test_utils::{
459        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
460        TestRunnerError, TestStageDB, UnwindStageTestRunner,
461    };
462    use alloy_primitives::{keccak256, U256};
463    use assert_matches::assert_matches;
464    use reth_db_api::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
465    use reth_primitives_traits::{SealedBlock, StorageEntry};
466    use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
467    use reth_stages_api::StageUnitCheckpoint;
468    use reth_static_file_types::StaticFileSegment;
469    use reth_testing_utils::generators::{
470        self, random_block, random_block_range, random_changeset_range,
471        random_contract_account_range, BlockParams, BlockRangeParams,
472    };
473    use reth_trie::test_utils::{state_root, state_root_prehashed};
474    use std::collections::BTreeMap;
475
476    stage_test_suite_ext!(MerkleTestRunner, merkle);
477
    /// Execute from genesis so as to merkleize whole state
    #[tokio::test]
    async fn execute_clean_merkle() {
        let (previous_stage, stage_progress) = (500, 0);

        // Set up the runner
        let mut runner = MerkleTestRunner::default();
        // checkpoint is 0, so the stage takes the full-rebuild path (`from_block == 1`)
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
                    runner.db.count_entries::<tables::HashedStorages>().unwrap()
                ) as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }
518
    /// Update small trie
    ///
    /// A single-block range (1 -> 2) stays below both thresholds, exercising the incremental
    /// update path.
    #[tokio::test]
    async fn execute_small_merkle() {
        let (previous_stage, stage_progress) = (2, 1);

        // Set up the runner
        let mut runner = MerkleTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
                    runner.db.count_entries::<tables::HashedStorages>().unwrap()
                ) as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }
558
    /// Runs a 100-block range with `incremental_threshold = 10`, forcing the incremental path to
    /// process the range in multiple chunks, then cross-checks the resulting root against an
    /// independently computed incremental root.
    #[tokio::test]
    async fn execute_chunked_merkle() {
        let (previous_stage, stage_progress) = (200, 100);
        let clean_threshold = 100;
        let incremental_threshold = 10;

        // Set up the runner
        let mut runner =
            MerkleTestRunner { db: TestStageDB::default(), clean_threshold, incremental_threshold };

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");
        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
                    runner.db.count_entries::<tables::HashedStorages>().unwrap()
                ) as u64
        );

        // Validate the stage execution
        let provider = runner.db.factory.provider().unwrap();
        let header = provider.header_by_number(previous_stage).unwrap().unwrap();
        let expected_root = header.state_root;

        // Recompute the root over the whole range in one shot and compare.
        let actual_root = runner
            .db
            .query_with_provider(|provider| {
                Ok(reth_trie_db::with_adapter!(provider, |A| {
                    DbStateRoot::<_, A>::incremental_root_with_updates(
                        &provider,
                        stage_progress + 1..=previous_stage,
                    )
                }))
            })
            .unwrap();

        assert_eq!(
            actual_root.unwrap().0,
            expected_root,
            "State root mismatch after chunked processing"
        );
    }
620
    /// Test harness state for the merkle stage.
    struct MerkleTestRunner {
        /// Test database the stage runs against.
        db: TestStageDB,
        /// Passed to the stage as `rebuild_threshold`.
        clean_threshold: u64,
        /// Passed to the stage as `incremental_threshold`.
        incremental_threshold: u64,
    }
626
    impl Default for MerkleTestRunner {
        fn default() -> Self {
            // High thresholds so small default tests do not trip the rebuild/chunking paths
            // unless a test overrides them explicitly.
            Self {
                db: TestStageDB::default(),
                clean_threshold: 10000,
                incremental_threshold: 10000,
            }
        }
    }
636
    impl StageTestRunner for MerkleTestRunner {
        type S = MerkleStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            // Use the test-only `Both` variant so the same stage instance can execute and unwind.
            Self::S::Both {
                rebuild_threshold: self.clean_threshold,
                incremental_threshold: self.incremental_threshold,
            }
        }
    }
651
    impl ExecuteStageTestRunner for MerkleTestRunner {
        type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;

        /// Seeds the database with random blocks, accounts, storages and changesets covering the
        /// input range, then rewrites the last header's state root to the root computed from the
        /// seeded hashed state so the stage's validation can succeed.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let start = stage_progress + 1;
            let end = input.target();
            let mut rng = generators::rng();

            // Blocks before the checkpoint (only when not starting from genesis).
            let mut preblocks = vec![];
            if stage_progress > 0 {
                preblocks.append(&mut random_block_range(
                    &mut rng,
                    0..=stage_progress - 1,
                    BlockRangeParams {
                        parent: Some(B256::ZERO),
                        tx_count: 0..1,
                        ..Default::default()
                    },
                ));
                self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
            }

            let num_of_accounts = 31;
            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
                .into_iter()
                .collect::<BTreeMap<_, _>>();

            self.db.insert_accounts_and_storages(
                accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))),
            )?;

            // Head block at the checkpoint; its state root is set to match the seeded accounts.
            let (header, body) = random_block(
                &mut rng,
                stage_progress,
                BlockParams { parent: preblocks.last().map(|b| b.hash()), ..Default::default() },
            )
            .split_sealed_header_body();
            let mut header = header.unseal();

            header.state_root = state_root(
                accounts
                    .clone()
                    .into_iter()
                    .map(|(address, account)| (address, (account, std::iter::empty()))),
            );
            let sealed_head = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
                SealedHeader::seal_slow(header),
                body,
            );

            let head_hash = sealed_head.hash();
            let mut blocks = vec![sealed_head];
            blocks.extend(random_block_range(
                &mut rng,
                start..=end,
                BlockRangeParams { parent: Some(head_hash), tx_count: 0..3, ..Default::default() },
            ));
            let last_block = blocks.last().cloned().unwrap();
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;

            let (transitions, final_state) = random_changeset_range(
                &mut rng,
                blocks.iter(),
                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
                0..3,
                0..256,
            );
            // add block changeset from block 1.
            self.db.insert_changesets(transitions, Some(start))?;
            self.db.insert_accounts_and_storages(final_state)?;

            // Calculate state root
            let root = self.db.query(|tx| {
                let mut accounts = BTreeMap::default();
                let mut accounts_cursor = tx.cursor_read::<tables::HashedAccounts>()?;
                let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
                // Walk every hashed account and collect its non-zero storage entries.
                for entry in accounts_cursor.walk_range(..)? {
                    let (key, account) = entry?;
                    let mut storage_entries = Vec::new();
                    let mut entry = storage_cursor.seek_exact(key)?;
                    while let Some((_, storage)) = entry {
                        storage_entries.push(storage);
                        entry = storage_cursor.next_dup()?;
                    }
                    let storage = storage_entries
                        .into_iter()
                        .filter(|v| !v.value.is_zero())
                        .map(|v| (v.key, v.value))
                        .collect::<Vec<_>>();
                    accounts.insert(key, (account, storage));
                }

                Ok(state_root_prehashed(accounts))
            })?;

            // Replace the last stored header with one carrying the computed root so the stage's
            // state-root validation passes.
            let static_file_provider = self.db.factory.static_file_provider();
            let mut writer =
                static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
            let mut last_header = last_block.clone_sealed_header();
            last_header.set_state_root(root);

            let hash = last_header.hash_slow();
            writer.prune_headers(1).unwrap();
            writer.commit().unwrap();
            writer.append_header(&last_header, &hash).unwrap();
            writer.commit().unwrap();

            Ok(blocks)
        }

        fn validate_execution(
            &self,
            _input: ExecInput,
            _output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            // The execution is validated within the stage
            Ok(())
        }
    }
772
    impl UnwindStageTestRunner for MerkleTestRunner {
        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
            // The unwind is validated within the stage
            Ok(())
        }

        /// Reverts the hashed state tables (`HashedAccounts`/`HashedStorages`) to the
        /// state at `input.unwind_to` by replaying the recorded changesets in reverse,
        /// so the stage's trie unwind can be checked against a consistent pre-state.
        fn before_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            // Changesets for block N hold the values *before* block N was applied,
            // so reverting down to `unwind_to` means applying every changeset from
            // block `unwind_to + 1` upwards.
            let target_block = input.unwind_to + 1;

            self.db
                .commit(|tx| {
                    let mut storage_changesets_cursor =
                        tx.cursor_dup_read::<tables::StorageChangeSets>().unwrap();
                    let mut storage_cursor =
                        tx.cursor_dup_write::<tables::HashedStorages>().unwrap();

                    // hashed address -> (hashed slot -> pre-state value).
                    let mut tree: BTreeMap<B256, BTreeMap<B256, U256>> = BTreeMap::new();

                    // Walk the storage changesets from the newest entry backwards.
                    // Because `insert` overwrites, later iterations (older blocks)
                    // win, leaving the value from the *oldest* changeset in range —
                    // i.e. the storage value as of block `unwind_to`.
                    let mut rev_changeset_walker =
                        storage_changesets_cursor.walk_back(None).unwrap();
                    while let Some((bn_address, entry)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        // Stop once we reach changesets at or below the unwind target.
                        if bn_address.block_number() < target_block {
                            break
                        }

                        tree.entry(keccak256(bn_address.address()))
                            .or_default()
                            .insert(keccak256(entry.key), entry.value);
                    }
                    // Apply the collected pre-state values to `HashedStorages`.
                    for (hashed_address, storage) in tree {
                        for (hashed_slot, value) in storage {
                            // Remove any existing entry for this (address, slot) pair
                            // first; a dup-sorted table would otherwise keep both.
                            let storage_entry = storage_cursor
                                .seek_by_key_subkey(hashed_address, hashed_slot)
                                .unwrap();
                            if storage_entry.is_some_and(|v| v.key == hashed_slot) {
                                storage_cursor.delete_current().unwrap();
                            }

                            // A zero value means the slot did not exist before, so
                            // only non-zero values are re-inserted.
                            if !value.is_zero() {
                                let storage_entry = StorageEntry { key: hashed_slot, value };
                                storage_cursor.upsert(hashed_address, &storage_entry).unwrap();
                            }
                        }
                    }

                    // Same idea for accounts, but applied directly: walking backwards
                    // means the last write per account comes from the oldest changeset
                    // in range, which is the account state at `unwind_to`.
                    let mut changeset_cursor =
                        tx.cursor_dup_write::<tables::AccountChangeSets>().unwrap();
                    let mut rev_changeset_walker = changeset_cursor.walk_back(None).unwrap();

                    while let Some((block_number, account_before_tx)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if block_number < target_block {
                            break
                        }

                        if let Some(acc) = account_before_tx.info {
                            // Account existed before: restore its pre-state info.
                            tx.put::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                acc,
                            )
                            .unwrap();
                        } else {
                            // `info == None` means the account was created in the
                            // reverted range, so it must be removed entirely.
                            tx.delete::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                None,
                            )
                            .unwrap();
                        }
                    }
                    Ok(())
                })
                .unwrap();
            Ok(())
        }
    }
851}