1use alloy_consensus::{constants::KECCAK_EMPTY, BlockHeader};
2use alloy_primitives::{BlockNumber, Sealable, B256};
3use reth_codecs::Compact;
4use reth_consensus::ConsensusError;
5use reth_db_api::{
6 tables,
7 transaction::{DbTx, DbTxMut},
8};
9use reth_primitives_traits::{GotExpected, SealedHeader};
10use reth_provider::{
11 ChangeSetReader, DBProvider, HeaderProvider, ProviderError, StageCheckpointReader,
12 StageCheckpointWriter, StatsReader, StorageChangeSetReader, StorageSettingsCache, TrieWriter,
13};
14use reth_stages_api::{
15 BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
16 StageCheckpoint, StageError, StageId, StorageRootMerkleCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress, StoredSubNode};
19use reth_trie_db::DatabaseStateRoot;
20
21use std::fmt::Debug;
22
23type DbStateRoot<'a, TX, A> = StateRoot<
24 reth_trie_db::DatabaseTrieCursorFactory<&'a TX, A>,
25 reth_trie_db::DatabaseHashedCursorFactory<&'a TX>,
26>;
27use tracing::*;
28
29pub const INVALID_STATE_ROOT_ERROR_MESSAGE: &str = r#"
34Invalid state root error on stage verification!
35This is an error that likely requires a report to the reth team with additional information.
36Please include the following information in your report:
37 * This error message
38 * The state root of the block that was rejected
39 * The output of `reth db stats --checksum` from the database that was being used. This will take a long time to run!
40 * 50-100 lines of logs before and after the first occurrence of the log message with the state root of the block that was rejected.
41 * The debug logs from __the same time period__. To find the default location for these logs, run:
42 `reth --help | grep -A 4 'log.file.directory'`
43
44Once you have this information, please submit a github issue at https://github.com/paradigmxyz/reth/issues/new
45"#;
46
47pub const MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD: u64 = 100_000;
50
51pub const MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD: u64 = 7_000;
56
57#[derive(Debug, Clone)]
79pub enum MerkleStage {
80 Execution {
82 rebuild_threshold: u64,
87 incremental_threshold: u64,
91 },
92 Unwind,
94 #[cfg(any(test, feature = "test-utils"))]
96 Both {
97 rebuild_threshold: u64,
100 incremental_threshold: u64,
104 },
105}
106
107impl MerkleStage {
108 pub const fn default_execution() -> Self {
110 Self::Execution {
111 rebuild_threshold: MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
112 incremental_threshold: MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD,
113 }
114 }
115
116 pub const fn default_unwind() -> Self {
118 Self::Unwind
119 }
120
121 pub const fn new_execution(rebuild_threshold: u64, incremental_threshold: u64) -> Self {
123 Self::Execution { rebuild_threshold, incremental_threshold }
124 }
125
126 pub fn get_execution_checkpoint(
128 &self,
129 provider: &impl StageCheckpointReader,
130 ) -> Result<Option<MerkleCheckpoint>, StageError> {
131 let buf =
132 provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default();
133
134 if buf.is_empty() {
135 return Ok(None)
136 }
137
138 let (checkpoint, _) = MerkleCheckpoint::from_compact(&buf, buf.len());
139 Ok(Some(checkpoint))
140 }
141
142 pub fn save_execution_checkpoint(
144 &self,
145 provider: &impl StageCheckpointWriter,
146 checkpoint: Option<MerkleCheckpoint>,
147 ) -> Result<(), StageError> {
148 let mut buf = vec![];
149 if let Some(checkpoint) = checkpoint {
150 debug!(
151 target: "sync::stages::merkle::exec",
152 last_account_key = ?checkpoint.last_account_key,
153 "Saving inner merkle checkpoint"
154 );
155 checkpoint.to_compact(&mut buf);
156 }
157 Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
158 }
159}
160
161impl<Provider> Stage<Provider> for MerkleStage
162where
163 Provider: DBProvider<Tx: DbTxMut>
164 + TrieWriter
165 + StatsReader
166 + HeaderProvider
167 + ChangeSetReader
168 + StorageChangeSetReader
169 + StorageSettingsCache
170 + StageCheckpointReader
171 + StageCheckpointWriter,
172{
173 fn id(&self) -> StageId {
175 match self {
176 Self::Execution { .. } => StageId::MerkleExecute,
177 Self::Unwind => StageId::MerkleUnwind,
178 #[cfg(any(test, feature = "test-utils"))]
179 Self::Both { .. } => StageId::Other("MerkleBoth"),
180 }
181 }
182
183 fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
185 let (threshold, incremental_threshold) = match self {
186 Self::Unwind => {
187 info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
188 return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
189 }
190 Self::Execution { rebuild_threshold, incremental_threshold } => {
191 (*rebuild_threshold, *incremental_threshold)
192 }
193 #[cfg(any(test, feature = "test-utils"))]
194 Self::Both { rebuild_threshold, incremental_threshold } => {
195 (*rebuild_threshold, *incremental_threshold)
196 }
197 };
198
199 let range = input.next_block_range();
200 let (from_block, to_block) = range.clone().into_inner();
201 let current_block_number = input.checkpoint().block_number;
202
203 let target_block = provider
204 .header_by_number(to_block)?
205 .ok_or_else(|| ProviderError::HeaderNotFound(to_block.into()))?;
206 let target_block_root = target_block.state_root();
207
208 let (trie_root, entities_checkpoint) = if range.is_empty() {
209 (target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
210 } else if to_block - from_block > threshold || from_block == 1 {
211 let mut checkpoint = self.get_execution_checkpoint(provider)?;
212
213 let mut entities_checkpoint = if let Some(checkpoint) =
215 checkpoint.as_ref().filter(|c| c.target_block == to_block)
216 {
217 debug!(
218 target: "sync::stages::merkle::exec",
219 current = ?current_block_number,
220 target = ?to_block,
221 last_account_key = ?checkpoint.last_account_key,
222 "Continuing inner merkle checkpoint"
223 );
224
225 input.checkpoint().entities_stage_checkpoint()
226 } else {
227 debug!(
228 target: "sync::stages::merkle::exec",
229 current = ?current_block_number,
230 target = ?to_block,
231 previous_checkpoint = ?checkpoint,
232 "Rebuilding trie"
233 );
234 checkpoint = None;
236 self.save_execution_checkpoint(provider, None)?;
237 provider.tx_ref().clear::<tables::AccountsTrie>()?;
238 provider.tx_ref().clear::<tables::StoragesTrie>()?;
239
240 None
241 }
242 .unwrap_or(EntitiesCheckpoint {
243 processed: 0,
244 total: (provider.count_entries::<tables::HashedAccounts>()? +
245 provider.count_entries::<tables::HashedStorages>()?)
246 as u64,
247 });
248
249 let tx = provider.tx_ref();
250 let progress = reth_trie_db::with_adapter!(provider, |A| {
251 DbStateRoot::<_, A>::from_tx(tx)
252 .with_intermediate_state(checkpoint.map(IntermediateStateRootState::from))
253 .root_with_progress()
254 })
255 .map_err(|e| {
256 error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "State root with progress failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
257 StageError::Fatal(Box::new(e))
258 })?;
259 match progress {
260 StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
261 provider.write_trie_updates(updates)?;
262
263 let mut checkpoint = MerkleCheckpoint::new(
264 to_block,
265 state.account_root_state.last_hashed_key,
266 state
267 .account_root_state
268 .walker_stack
269 .into_iter()
270 .map(StoredSubNode::from)
271 .collect(),
272 state.account_root_state.hash_builder.into(),
273 );
274
275 if let Some(storage_state) = state.storage_root_state {
277 checkpoint.storage_root_checkpoint =
278 Some(StorageRootMerkleCheckpoint::new(
279 storage_state.state.last_hashed_key,
280 storage_state
281 .state
282 .walker_stack
283 .into_iter()
284 .map(StoredSubNode::from)
285 .collect(),
286 storage_state.state.hash_builder.into(),
287 storage_state.account.nonce,
288 storage_state.account.balance,
289 storage_state.account.bytecode_hash.unwrap_or(KECCAK_EMPTY),
290 ));
291 }
292 self.save_execution_checkpoint(provider, Some(checkpoint))?;
293
294 entities_checkpoint.processed += hashed_entries_walked as u64;
295
296 return Ok(ExecOutput {
297 checkpoint: input
298 .checkpoint()
299 .with_entities_stage_checkpoint(entities_checkpoint),
300 done: false,
301 })
302 }
303 StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
304 provider.write_trie_updates(updates)?;
305
306 entities_checkpoint.processed += hashed_entries_walked as u64;
307
308 (root, entities_checkpoint)
309 }
310 }
311 } else {
312 debug!(target: "sync::stages::merkle::exec", current = ?current_block_number, target = ?to_block, "Updating trie in chunks");
313 let mut final_root = None;
314 for start_block in range.step_by(incremental_threshold as usize) {
315 let chunk_to = std::cmp::min(start_block + incremental_threshold, to_block);
316 let chunk_range = start_block..=chunk_to;
317 debug!(
318 target: "sync::stages::merkle::exec",
319 current = ?current_block_number,
320 target = ?to_block,
321 incremental_threshold,
322 chunk_range = ?chunk_range,
323 "Processing chunk"
324 );
325 let (root, updates) = reth_trie_db::with_adapter!(provider, |A| {
326 DbStateRoot::<_, A>::incremental_root_with_updates(provider, chunk_range)
327 })
328 .map_err(|e| {
329 error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
330 StageError::Fatal(Box::new(e))
331 })?;
332 provider.write_trie_updates(updates)?;
333 final_root = Some(root);
334 }
335
336 let final_root = final_root.ok_or(StageError::Fatal(
338 "Incremental merkle hashing did not produce a final root".into(),
339 ))?;
340
341 let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
342 provider.count_entries::<tables::HashedStorages>()?)
343 as u64;
344
345 let entities_checkpoint = EntitiesCheckpoint {
346 processed: total_hashed_entries,
350 total: total_hashed_entries,
351 };
352 (final_root, entities_checkpoint)
354 };
355
356 self.save_execution_checkpoint(provider, None)?;
358
359 validate_state_root(trie_root, SealedHeader::seal_slow(target_block), to_block)?;
360
361 Ok(ExecOutput {
362 checkpoint: StageCheckpoint::new(to_block)
363 .with_entities_stage_checkpoint(entities_checkpoint),
364 done: true,
365 })
366 }
367
368 fn unwind(
370 &mut self,
371 provider: &Provider,
372 input: UnwindInput,
373 ) -> Result<UnwindOutput, StageError> {
374 let tx = provider.tx_ref();
375 let range = input.unwind_block_range();
376 if matches!(self, Self::Execution { .. }) {
377 info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
378 return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
379 }
380
381 let mut entities_checkpoint =
382 input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint {
383 processed: 0,
384 total: (tx.entries::<tables::HashedAccounts>()? +
385 tx.entries::<tables::HashedStorages>()?) as u64,
386 });
387
388 if input.unwind_to == 0 {
389 tx.clear::<tables::AccountsTrie>()?;
390 tx.clear::<tables::StoragesTrie>()?;
391
392 entities_checkpoint.processed = 0;
393
394 return Ok(UnwindOutput {
395 checkpoint: StageCheckpoint::new(input.unwind_to)
396 .with_entities_stage_checkpoint(entities_checkpoint),
397 })
398 }
399
400 if range.is_empty() {
402 info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
403 } else {
404 let (block_root, updates) = reth_trie_db::with_adapter!(provider, |A| {
405 DbStateRoot::<_, A>::incremental_root_with_updates(provider, range)
406 })
407 .map_err(|e| StageError::Fatal(Box::new(e)))?;
408
409 let target = provider
411 .header_by_number(input.unwind_to)?
412 .ok_or_else(|| ProviderError::HeaderNotFound(input.unwind_to.into()))?;
413
414 validate_state_root(block_root, SealedHeader::seal_slow(target), input.unwind_to)?;
415
416 provider.write_trie_updates(updates)?;
418
419 let accounts = tx.entries::<tables::HashedAccounts>()?;
422 let storages = tx.entries::<tables::HashedStorages>()?;
423 let total = (accounts + storages) as u64;
424 entities_checkpoint.total = total;
425 entities_checkpoint.processed = total;
426 }
427
428 Ok(UnwindOutput {
429 checkpoint: StageCheckpoint::new(input.unwind_to)
430 .with_entities_stage_checkpoint(entities_checkpoint),
431 })
432 }
433}
434
435#[inline]
437fn validate_state_root<H: BlockHeader + Sealable + Debug>(
438 got: B256,
439 expected: SealedHeader<H>,
440 target_block: BlockNumber,
441) -> Result<(), StageError> {
442 if got == expected.state_root() {
443 Ok(())
444 } else {
445 error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
446 Err(StageError::Block {
447 error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff(
448 GotExpected { got, expected: expected.state_root() }.into(),
449 )),
450 block: Box::new(expected.block_with_parent()),
451 })
452 }
453}
454
455#[cfg(test)]
456mod tests {
457 use super::*;
458 use crate::test_utils::{
459 stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
460 TestRunnerError, TestStageDB, UnwindStageTestRunner,
461 };
462 use alloy_primitives::{keccak256, U256};
463 use assert_matches::assert_matches;
464 use reth_db_api::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
465 use reth_primitives_traits::{SealedBlock, StorageEntry};
466 use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
467 use reth_stages_api::StageUnitCheckpoint;
468 use reth_static_file_types::StaticFileSegment;
469 use reth_testing_utils::generators::{
470 self, random_block, random_block_range, random_changeset_range,
471 random_contract_account_range, BlockParams, BlockRangeParams,
472 };
473 use reth_trie::test_utils::{state_root, state_root_prehashed};
474 use std::collections::BTreeMap;
475
476 stage_test_suite_ext!(MerkleTestRunner, merkle);
477
478 #[tokio::test]
480 async fn execute_clean_merkle() {
481 let (previous_stage, stage_progress) = (500, 0);
482
483 let mut runner = MerkleTestRunner::default();
485 let input = ExecInput {
487 target: Some(previous_stage),
488 checkpoint: Some(StageCheckpoint::new(stage_progress)),
489 };
490
491 runner.seed_execution(input).expect("failed to seed execution");
492
493 let rx = runner.execute(input);
494
495 let result = rx.await.unwrap();
497 assert_matches!(
498 result,
499 Ok(ExecOutput {
500 checkpoint: StageCheckpoint {
501 block_number,
502 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
503 processed,
504 total
505 }))
506 },
507 done: true
508 }) if block_number == previous_stage && processed == total &&
509 total == (
510 runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
511 runner.db.count_entries::<tables::HashedStorages>().unwrap()
512 ) as u64
513 );
514
515 assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
517 }
518
519 #[tokio::test]
521 async fn execute_small_merkle() {
522 let (previous_stage, stage_progress) = (2, 1);
523
524 let mut runner = MerkleTestRunner::default();
526 let input = ExecInput {
527 target: Some(previous_stage),
528 checkpoint: Some(StageCheckpoint::new(stage_progress)),
529 };
530
531 runner.seed_execution(input).expect("failed to seed execution");
532
533 let rx = runner.execute(input);
534
535 let result = rx.await.unwrap();
537 assert_matches!(
538 result,
539 Ok(ExecOutput {
540 checkpoint: StageCheckpoint {
541 block_number,
542 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
543 processed,
544 total
545 }))
546 },
547 done: true
548 }) if block_number == previous_stage && processed == total &&
549 total == (
550 runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
551 runner.db.count_entries::<tables::HashedStorages>().unwrap()
552 ) as u64
553 );
554
555 assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
557 }
558
559 #[tokio::test]
560 async fn execute_chunked_merkle() {
561 let (previous_stage, stage_progress) = (200, 100);
562 let clean_threshold = 100;
563 let incremental_threshold = 10;
564
565 let mut runner =
567 MerkleTestRunner { db: TestStageDB::default(), clean_threshold, incremental_threshold };
568
569 let input = ExecInput {
570 target: Some(previous_stage),
571 checkpoint: Some(StageCheckpoint::new(stage_progress)),
572 };
573
574 runner.seed_execution(input).expect("failed to seed execution");
575 let rx = runner.execute(input);
576
577 let result = rx.await.unwrap();
579 assert_matches!(
580 result,
581 Ok(ExecOutput {
582 checkpoint: StageCheckpoint {
583 block_number,
584 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
585 processed,
586 total
587 }))
588 },
589 done: true
590 }) if block_number == previous_stage && processed == total &&
591 total == (
592 runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
593 runner.db.count_entries::<tables::HashedStorages>().unwrap()
594 ) as u64
595 );
596
597 let provider = runner.db.factory.provider().unwrap();
599 let header = provider.header_by_number(previous_stage).unwrap().unwrap();
600 let expected_root = header.state_root;
601
602 let actual_root = runner
603 .db
604 .query_with_provider(|provider| {
605 Ok(reth_trie_db::with_adapter!(provider, |A| {
606 DbStateRoot::<_, A>::incremental_root_with_updates(
607 &provider,
608 stage_progress + 1..=previous_stage,
609 )
610 }))
611 })
612 .unwrap();
613
614 assert_eq!(
615 actual_root.unwrap().0,
616 expected_root,
617 "State root mismatch after chunked processing"
618 );
619 }
620
621 struct MerkleTestRunner {
622 db: TestStageDB,
623 clean_threshold: u64,
624 incremental_threshold: u64,
625 }
626
627 impl Default for MerkleTestRunner {
628 fn default() -> Self {
629 Self {
630 db: TestStageDB::default(),
631 clean_threshold: 10000,
632 incremental_threshold: 10000,
633 }
634 }
635 }
636
637 impl StageTestRunner for MerkleTestRunner {
638 type S = MerkleStage;
639
640 fn db(&self) -> &TestStageDB {
641 &self.db
642 }
643
644 fn stage(&self) -> Self::S {
645 Self::S::Both {
646 rebuild_threshold: self.clean_threshold,
647 incremental_threshold: self.incremental_threshold,
648 }
649 }
650 }
651
652 impl ExecuteStageTestRunner for MerkleTestRunner {
653 type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;
654
655 fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
656 let stage_progress = input.checkpoint().block_number;
657 let start = stage_progress + 1;
658 let end = input.target();
659 let mut rng = generators::rng();
660
661 let mut preblocks = vec![];
662 if stage_progress > 0 {
663 preblocks.append(&mut random_block_range(
664 &mut rng,
665 0..=stage_progress - 1,
666 BlockRangeParams {
667 parent: Some(B256::ZERO),
668 tx_count: 0..1,
669 ..Default::default()
670 },
671 ));
672 self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
673 }
674
675 let num_of_accounts = 31;
676 let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
677 .into_iter()
678 .collect::<BTreeMap<_, _>>();
679
680 self.db.insert_accounts_and_storages(
681 accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))),
682 )?;
683
684 let (header, body) = random_block(
685 &mut rng,
686 stage_progress,
687 BlockParams { parent: preblocks.last().map(|b| b.hash()), ..Default::default() },
688 )
689 .split_sealed_header_body();
690 let mut header = header.unseal();
691
692 header.state_root = state_root(
693 accounts
694 .clone()
695 .into_iter()
696 .map(|(address, account)| (address, (account, std::iter::empty()))),
697 );
698 let sealed_head = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
699 SealedHeader::seal_slow(header),
700 body,
701 );
702
703 let head_hash = sealed_head.hash();
704 let mut blocks = vec![sealed_head];
705 blocks.extend(random_block_range(
706 &mut rng,
707 start..=end,
708 BlockRangeParams { parent: Some(head_hash), tx_count: 0..3, ..Default::default() },
709 ));
710 let last_block = blocks.last().cloned().unwrap();
711 self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
712
713 let (transitions, final_state) = random_changeset_range(
714 &mut rng,
715 blocks.iter(),
716 accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
717 0..3,
718 0..256,
719 );
720 self.db.insert_changesets(transitions, Some(start))?;
722 self.db.insert_accounts_and_storages(final_state)?;
723
724 let root = self.db.query(|tx| {
726 let mut accounts = BTreeMap::default();
727 let mut accounts_cursor = tx.cursor_read::<tables::HashedAccounts>()?;
728 let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
729 for entry in accounts_cursor.walk_range(..)? {
730 let (key, account) = entry?;
731 let mut storage_entries = Vec::new();
732 let mut entry = storage_cursor.seek_exact(key)?;
733 while let Some((_, storage)) = entry {
734 storage_entries.push(storage);
735 entry = storage_cursor.next_dup()?;
736 }
737 let storage = storage_entries
738 .into_iter()
739 .filter(|v| !v.value.is_zero())
740 .map(|v| (v.key, v.value))
741 .collect::<Vec<_>>();
742 accounts.insert(key, (account, storage));
743 }
744
745 Ok(state_root_prehashed(accounts))
746 })?;
747
748 let static_file_provider = self.db.factory.static_file_provider();
749 let mut writer =
750 static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
751 let mut last_header = last_block.clone_sealed_header();
752 last_header.set_state_root(root);
753
754 let hash = last_header.hash_slow();
755 writer.prune_headers(1).unwrap();
756 writer.commit().unwrap();
757 writer.append_header(&last_header, &hash).unwrap();
758 writer.commit().unwrap();
759
760 Ok(blocks)
761 }
762
763 fn validate_execution(
764 &self,
765 _input: ExecInput,
766 _output: Option<ExecOutput>,
767 ) -> Result<(), TestRunnerError> {
768 Ok(())
770 }
771 }
772
773 impl UnwindStageTestRunner for MerkleTestRunner {
774 fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
775 Ok(())
777 }
778
779 fn before_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
780 let target_block = input.unwind_to + 1;
781
782 self.db
783 .commit(|tx| {
784 let mut storage_changesets_cursor =
785 tx.cursor_dup_read::<tables::StorageChangeSets>().unwrap();
786 let mut storage_cursor =
787 tx.cursor_dup_write::<tables::HashedStorages>().unwrap();
788
789 let mut tree: BTreeMap<B256, BTreeMap<B256, U256>> = BTreeMap::new();
790
791 let mut rev_changeset_walker =
792 storage_changesets_cursor.walk_back(None).unwrap();
793 while let Some((bn_address, entry)) =
794 rev_changeset_walker.next().transpose().unwrap()
795 {
796 if bn_address.block_number() < target_block {
797 break
798 }
799
800 tree.entry(keccak256(bn_address.address()))
801 .or_default()
802 .insert(keccak256(entry.key), entry.value);
803 }
804 for (hashed_address, storage) in tree {
805 for (hashed_slot, value) in storage {
806 let storage_entry = storage_cursor
807 .seek_by_key_subkey(hashed_address, hashed_slot)
808 .unwrap();
809 if storage_entry.is_some_and(|v| v.key == hashed_slot) {
810 storage_cursor.delete_current().unwrap();
811 }
812
813 if !value.is_zero() {
814 let storage_entry = StorageEntry { key: hashed_slot, value };
815 storage_cursor.upsert(hashed_address, &storage_entry).unwrap();
816 }
817 }
818 }
819
820 let mut changeset_cursor =
821 tx.cursor_dup_write::<tables::AccountChangeSets>().unwrap();
822 let mut rev_changeset_walker = changeset_cursor.walk_back(None).unwrap();
823
824 while let Some((block_number, account_before_tx)) =
825 rev_changeset_walker.next().transpose().unwrap()
826 {
827 if block_number < target_block {
828 break
829 }
830
831 if let Some(acc) = account_before_tx.info {
832 tx.put::<tables::HashedAccounts>(
833 keccak256(account_before_tx.address),
834 acc,
835 )
836 .unwrap();
837 } else {
838 tx.delete::<tables::HashedAccounts>(
839 keccak256(account_before_tx.address),
840 None,
841 )
842 .unwrap();
843 }
844 }
845 Ok(())
846 })
847 .unwrap();
848 Ok(())
849 }
850 }
851}