use alloy_consensus::{constants::KECCAK_EMPTY, BlockHeader};
use alloy_primitives::{BlockNumber, Sealable, B256};
use reth_codecs::Compact;
use reth_consensus::ConsensusError;
use reth_db_api::{
    tables,
    transaction::{DbTx, DbTxMut},
};
use reth_primitives_traits::{GotExpected, SealedHeader};
use reth_provider::{
    DBProvider, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter,
    StatsReader, TrieWriter,
};
use reth_stages_api::{
    BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
    StageCheckpoint, StageError, StageId, StorageRootMerkleCheckpoint, UnwindInput, UnwindOutput,
};
use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress, StoredSubNode};
use reth_trie_db::DatabaseStateRoot;
use std::fmt::Debug;
use tracing::*;

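/// The fatal error message logged when stage verification encounters an invalid state root,
/// instructing the user on which diagnostics to include in a bug report.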
pub const INVALID_STATE_ROOT_ERROR_MESSAGE: &str = r#"
Invalid state root error on stage verification!
This is an error that likely requires a report to the reth team with additional information.
Please include the following information in your report:
 * This error message
 * The state root of the block that was rejected
 * The output of `reth db stats --checksum` from the database that was being used. This will take a long time to run!
 * 50-100 lines of logs before and after the first occurrence of the log message with the state root of the block that was rejected.
 * The debug logs from __the same time period__. To find the default location for these logs, run:
   `reth --help | grep -A 4 'log.file.directory'`

Once you have this information, please submit a GitHub issue at https://github.com/paradigmxyz/reth/issues/new
"#;

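/// The default threshold (in number of blocks) above which the trie is rebuilt from scratch
/// instead of being updated incrementally.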
pub const MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD: u64 = 100_000;

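/// The default number of blocks processed per chunk when updating the trie incrementally.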
pub const MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD: u64 = 7_000;

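/// The merkle hashing stage.
///
/// Uses the hashed account and storage tables produced by the hashing stages to build the
/// accounts and storages tries, computes the state root, and validates it against the state
/// root of the target block header.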
#[derive(Debug, Clone)]
pub enum MerkleStage {
    /// The execution portion of the merkle stage.
    Execution {
        /// The threshold (in number of blocks) for switching from incremental trie building of
        /// changes to whole rebuild.
        rebuild_threshold: u64,
        /// The number of blocks processed per chunk when building the trie incrementally.
        incremental_threshold: u64,
    },
    /// The unwind portion of the merkle stage.
    Unwind,
    /// Able to execute and unwind. Used for tests.
    #[cfg(any(test, feature = "test-utils"))]
    Both {
        /// The threshold (in number of blocks) for switching from incremental trie building of
        /// changes to whole rebuild.
        rebuild_threshold: u64,
        /// The number of blocks processed per chunk when building the trie incrementally.
        incremental_threshold: u64,
    },
}

impl MerkleStage {
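    /// Creates the execution variant of the stage with the default thresholds.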
    pub const fn default_execution() -> Self {
        Self::Execution {
            rebuild_threshold: MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
            incremental_threshold: MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD,
        }
    }

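    /// Creates the unwind variant of the stage.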
    pub const fn default_unwind() -> Self {
        Self::Unwind
    }

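    /// Creates the execution variant of the stage with custom thresholds.
    ///
    /// A minimal usage sketch; marked `ignore` because the exact import path for
    /// [`MerkleStage`] depends on how this module is re-exported:
    ///
    /// ```ignore
    /// // Rebuild from scratch for ranges over 100k blocks; otherwise update the trie
    /// // incrementally in 7k-block chunks.
    /// let stage = MerkleStage::new_execution(100_000, 7_000);
    /// ```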
    pub const fn new_execution(rebuild_threshold: u64, incremental_threshold: u64) -> Self {
        Self::Execution { rebuild_threshold, incremental_threshold }
    }

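    /// Loads the inner [`MerkleCheckpoint`] saved by a previous, partially complete invocation
    /// of the stage, if any.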
    pub fn get_execution_checkpoint(
        &self,
        provider: &impl StageCheckpointReader,
    ) -> Result<Option<MerkleCheckpoint>, StageError> {
        let buf =
            provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default();

        if buf.is_empty() {
            return Ok(None)
        }

        let (checkpoint, _) = MerkleCheckpoint::from_compact(&buf, buf.len());
        Ok(Some(checkpoint))
    }

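    /// Saves the inner [`MerkleCheckpoint`] so that an interrupted trie rebuild can resume
    /// where it left off; passing `None` clears any saved progress.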
    pub fn save_execution_checkpoint(
        &self,
        provider: &impl StageCheckpointWriter,
        checkpoint: Option<MerkleCheckpoint>,
    ) -> Result<(), StageError> {
        let mut buf = vec![];
        if let Some(checkpoint) = checkpoint {
            debug!(
                target: "sync::stages::merkle::exec",
                last_account_key = ?checkpoint.last_account_key,
                "Saving inner merkle checkpoint"
            );
            checkpoint.to_compact(&mut buf);
        }
        Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
    }
}

impl<Provider> Stage<Provider> for MerkleStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + TrieWriter
        + StatsReader
        + HeaderProvider
        + StageCheckpointReader
        + StageCheckpointWriter,
{
    /// Return the id of the stage.
    fn id(&self) -> StageId {
        match self {
            Self::Execution { .. } => StageId::MerkleExecute,
            Self::Unwind => StageId::MerkleUnwind,
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { .. } => StageId::Other("MerkleBoth"),
        }
    }

    /// Execute the stage.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let (threshold, incremental_threshold) = match self {
            Self::Unwind => {
                info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
                return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
            }
            Self::Execution { rebuild_threshold, incremental_threshold } => {
                (*rebuild_threshold, *incremental_threshold)
            }
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { rebuild_threshold, incremental_threshold } => {
                (*rebuild_threshold, *incremental_threshold)
            }
        };

        let range = input.next_block_range();
        let (from_block, to_block) = range.clone().into_inner();
        let current_block_number = input.checkpoint().block_number;

        let target_block = provider
            .header_by_number(to_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(to_block.into()))?;
        let target_block_root = target_block.state_root();
        let (trie_root, entities_checkpoint) = if range.is_empty() {
            (target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
        } else if to_block - from_block > threshold || from_block == 1 {
            let mut checkpoint = self.get_execution_checkpoint(provider)?;

            let mut entities_checkpoint = if let Some(checkpoint) =
                checkpoint.as_ref().filter(|c| c.target_block == to_block)
            {
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    last_account_key = ?checkpoint.last_account_key,
                    "Continuing inner merkle checkpoint"
                );

                input.checkpoint().entities_stage_checkpoint()
            } else {
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    previous_checkpoint = ?checkpoint,
                    "Rebuilding trie"
                );
                // Any saved progress targets a different block, so reset the checkpoint and
                // start the rebuild from an empty trie.
                checkpoint = None;
                self.save_execution_checkpoint(provider, None)?;
                provider.tx_ref().clear::<tables::AccountsTrie>()?;
                provider.tx_ref().clear::<tables::StoragesTrie>()?;

                None
            }
            .unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (provider.count_entries::<tables::HashedAccounts>()? +
                    provider.count_entries::<tables::HashedStorages>()?)
                    as u64,
            });

            let tx = provider.tx_ref();
            let progress = StateRoot::from_tx(tx)
                .with_intermediate_state(checkpoint.map(IntermediateStateRootState::from))
                .root_with_progress()
                .map_err(|e| {
                    error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "State root with progress failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                    StageError::Fatal(Box::new(e))
                })?;
            match progress {
                StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
                    provider.write_trie_updates(&updates)?;

                    let mut checkpoint = MerkleCheckpoint::new(
                        to_block,
                        state.account_root_state.last_hashed_key,
                        state
                            .account_root_state
                            .walker_stack
                            .into_iter()
                            .map(StoredSubNode::from)
                            .collect(),
                        state.account_root_state.hash_builder.into(),
                    );

                    // If a storage root computation is still pending, save it to the checkpoint
                    // so it can be resumed on the next invocation.
                    if let Some(storage_state) = state.storage_root_state {
                        checkpoint.storage_root_checkpoint =
                            Some(StorageRootMerkleCheckpoint::new(
                                storage_state.state.last_hashed_key,
                                storage_state
                                    .state
                                    .walker_stack
                                    .into_iter()
                                    .map(StoredSubNode::from)
                                    .collect(),
                                storage_state.state.hash_builder.into(),
                                storage_state.account.nonce,
                                storage_state.account.balance,
                                storage_state.account.bytecode_hash.unwrap_or(KECCAK_EMPTY),
                            ));
                    }
                    self.save_execution_checkpoint(provider, Some(checkpoint))?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    return Ok(ExecOutput {
                        checkpoint: input
                            .checkpoint()
                            .with_entities_stage_checkpoint(entities_checkpoint),
                        done: false,
                    })
                }
                StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
                    provider.write_trie_updates(&updates)?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    (root, entities_checkpoint)
                }
            }
        } else {
            debug!(target: "sync::stages::merkle::exec", current = ?current_block_number, target = ?to_block, "Updating trie in chunks");
            let mut final_root = None;
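            // Process the range in `incremental_threshold`-sized chunks: each chunk computes
            // an incremental root from its sub-range of changesets and commits the resulting
            // trie updates before the next chunk begins.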
            for start_block in range.step_by(incremental_threshold as usize) {
                let chunk_to = std::cmp::min(start_block + incremental_threshold, to_block);
                let chunk_range = start_block..=chunk_to;
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    incremental_threshold,
                    chunk_range = ?chunk_range,
                    "Processing chunk"
                );
                let (root, updates) =
                    StateRoot::incremental_root_with_updates(provider.tx_ref(), chunk_range)
                        .map_err(|e| {
                            error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                            StageError::Fatal(Box::new(e))
                        })?;
                provider.write_trie_updates(&updates)?;
                final_root = Some(root);
            }

            let final_root = final_root.ok_or(StageError::Fatal(
                "Incremental merkle hashing did not produce a final root".into(),
            ))?;

            let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
                provider.count_entries::<tables::HashedStorages>()?)
                as u64;

            // The trie is fully up to date at this point, so processed == total
            let entities_checkpoint = EntitiesCheckpoint {
                processed: total_hashed_entries,
                total: total_hashed_entries,
            };
            (final_root, entities_checkpoint)
        };

        // Reset the inner checkpoint
        self.save_execution_checkpoint(provider, None)?;

        validate_state_root(trie_root, SealedHeader::seal_slow(target_block), to_block)?;

        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(to_block)
                .with_entities_stage_checkpoint(entities_checkpoint),
            done: true,
        })
    }

    /// Unwind the stage.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let tx = provider.tx_ref();
        let range = input.unwind_block_range();
        if matches!(self, Self::Execution { .. }) {
            info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
            return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
        }

        let mut entities_checkpoint =
            input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (tx.entries::<tables::HashedAccounts>()? +
                    tx.entries::<tables::HashedStorages>()?) as u64,
            });

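        // When unwinding to the genesis block there is nothing to recompute: simply clear the
        // trie tables.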
        if input.unwind_to == 0 {
            tx.clear::<tables::AccountsTrie>()?;
            tx.clear::<tables::StoragesTrie>()?;

            entities_checkpoint.processed = 0;

            return Ok(UnwindOutput {
                checkpoint: StageCheckpoint::new(input.unwind_to)
                    .with_entities_stage_checkpoint(entities_checkpoint),
            })
        }

        if range.is_empty() {
            info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
        } else {
            let (block_root, updates) = StateRoot::incremental_root_with_updates(tx, range)
                .map_err(|e| StageError::Fatal(Box::new(e)))?;

            // Validate the calculated state root before committing the unwind changes
            let target = provider
                .header_by_number(input.unwind_to)?
                .ok_or_else(|| ProviderError::HeaderNotFound(input.unwind_to.into()))?;

            validate_state_root(block_root, SealedHeader::seal_slow(target), input.unwind_to)?;

            // Validation passed, apply unwind changes to the database
            provider.write_trie_updates(&updates)?;

            // Update the checkpoint totals to reflect the post-unwind hashed state
            let accounts = tx.entries::<tables::HashedAccounts>()?;
            let storages = tx.entries::<tables::HashedStorages>()?;
            let total = (accounts + storages) as u64;
            entities_checkpoint.total = total;
            entities_checkpoint.processed = total;
        }

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(input.unwind_to)
                .with_entities_stage_checkpoint(entities_checkpoint),
        })
    }
}

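/// Compares the computed state root against the state root of the expected header, returning
/// a validation error with the offending block attached on mismatch.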
#[inline]
fn validate_state_root<H: BlockHeader + Sealable + Debug>(
    got: B256,
    expected: SealedHeader<H>,
    target_block: BlockNumber,
) -> Result<(), StageError> {
    if got == expected.state_root() {
        Ok(())
    } else {
        error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
        Err(StageError::Block {
            error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff(
                GotExpected { got, expected: expected.state_root() }.into(),
            )),
            block: Box::new(expected.block_with_parent()),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
        TestRunnerError, TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{keccak256, U256};
    use assert_matches::assert_matches;
    use reth_db_api::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
    use reth_primitives_traits::{SealedBlock, StorageEntry};
    use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
    use reth_stages_api::StageUnitCheckpoint;
    use reth_static_file_types::StaticFileSegment;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, random_changeset_range,
        random_contract_account_range, BlockParams, BlockRangeParams,
    };
    use reth_trie::test_utils::{state_root, state_root_prehashed};
    use std::collections::BTreeMap;

    stage_test_suite_ext!(MerkleTestRunner, merkle);

    #[tokio::test]
    async fn execute_clean_merkle() {
        let (previous_stage, stage_progress) = (500, 0);

        // Set up the runner
        let mut runner = MerkleTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
                    runner.db.count_entries::<tables::HashedStorages>().unwrap()
                ) as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    #[tokio::test]
    async fn execute_small_merkle() {
        let (previous_stage, stage_progress) = (2, 1);

        // Set up the runner
        let mut runner = MerkleTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
                    runner.db.count_entries::<tables::HashedStorages>().unwrap()
                ) as u64
        );

        // Validate the stage execution
        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    #[tokio::test]
    async fn execute_chunked_merkle() {
        let (previous_stage, stage_progress) = (200, 100);
        let clean_threshold = 100;
        let incremental_threshold = 10;

        // Set up the runner with a low incremental threshold so the range is split into chunks
        let mut runner =
            MerkleTestRunner { db: TestStageDB::default(), clean_threshold, incremental_threshold };

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");
        let rx = runner.execute(input);

        // Assert the successful result
        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
                    runner.db.count_entries::<tables::HashedStorages>().unwrap()
                ) as u64
        );

        // Compare the stage's root against an incremental root computed over the same range
        let provider = runner.db.factory.provider().unwrap();
        let header = provider.header_by_number(previous_stage).unwrap().unwrap();
        let expected_root = header.state_root;

        let actual_root = runner
            .db
            .query(|tx| {
                Ok(StateRoot::incremental_root_with_updates(
                    tx,
                    stage_progress + 1..=previous_stage,
                ))
            })
            .unwrap();

        assert_eq!(
            actual_root.unwrap().0,
            expected_root,
            "State root mismatch after chunked processing"
        );
    }

    struct MerkleTestRunner {
        db: TestStageDB,
        clean_threshold: u64,
        incremental_threshold: u64,
    }

    impl Default for MerkleTestRunner {
        fn default() -> Self {
            Self {
                db: TestStageDB::default(),
                clean_threshold: 10000,
                incremental_threshold: 10000,
            }
        }
    }

    impl StageTestRunner for MerkleTestRunner {
        type S = MerkleStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            Self::S::Both {
                rebuild_threshold: self.clean_threshold,
                incremental_threshold: self.incremental_threshold,
            }
        }
    }

    impl ExecuteStageTestRunner for MerkleTestRunner {
        type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;

        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let start = stage_progress + 1;
            let end = input.target();
            let mut rng = generators::rng();

            let mut preblocks = vec![];
            if stage_progress > 0 {
                preblocks.append(&mut random_block_range(
                    &mut rng,
                    0..=stage_progress - 1,
                    BlockRangeParams {
                        parent: Some(B256::ZERO),
                        tx_count: 0..1,
                        ..Default::default()
                    },
                ));
                self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
            }

            let num_of_accounts = 31;
            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
                .into_iter()
                .collect::<BTreeMap<_, _>>();

            self.db.insert_accounts_and_storages(
                accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))),
            )?;

            // Build the head block whose state root reflects the seeded accounts
            let (header, body) = random_block(
                &mut rng,
                stage_progress,
                BlockParams { parent: preblocks.last().map(|b| b.hash()), ..Default::default() },
            )
            .split_sealed_header_body();
            let mut header = header.unseal();

            header.state_root = state_root(
                accounts
                    .clone()
                    .into_iter()
                    .map(|(address, account)| (address, (account, std::iter::empty()))),
            );
            let sealed_head = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
                SealedHeader::seal_slow(header),
                body,
            );

            let head_hash = sealed_head.hash();
            let mut blocks = vec![sealed_head];
            blocks.extend(random_block_range(
                &mut rng,
                start..=end,
                BlockRangeParams { parent: Some(head_hash), tx_count: 0..3, ..Default::default() },
            ));
            let last_block = blocks.last().cloned().unwrap();
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;

            let (transitions, final_state) = random_changeset_range(
                &mut rng,
                blocks.iter(),
                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
                0..3,
                0..256,
            );
            // Insert the changesets and the final state they lead to
            self.db.insert_changesets(transitions, Some(start))?;
            self.db.insert_accounts_and_storages(final_state)?;

            // Calculate the expected state root from the hashed state tables
            let root = self.db.query(|tx| {
                let mut accounts = BTreeMap::default();
                let mut accounts_cursor = tx.cursor_read::<tables::HashedAccounts>()?;
                let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
                for entry in accounts_cursor.walk_range(..)? {
                    let (key, account) = entry?;
                    let mut storage_entries = Vec::new();
                    let mut entry = storage_cursor.seek_exact(key)?;
                    while let Some((_, storage)) = entry {
                        storage_entries.push(storage);
                        entry = storage_cursor.next_dup()?;
                    }
                    let storage = storage_entries
                        .into_iter()
                        .filter(|v| !v.value.is_zero())
                        .map(|v| (v.key, v.value))
                        .collect::<Vec<_>>();
                    accounts.insert(key, (account, storage));
                }

                Ok(state_root_prehashed(accounts.into_iter()))
            })?;

            // Overwrite the last header's state root with the expected value and re-seal it in
            // the static files
            let static_file_provider = self.db.factory.static_file_provider();
            let mut writer =
                static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
            let mut last_header = last_block.clone_sealed_header();
            last_header.set_state_root(root);

            let hash = last_header.hash_slow();
            writer.prune_headers(1).unwrap();
            writer.commit().unwrap();
            writer.append_header(&last_header, U256::ZERO, &hash).unwrap();
            writer.commit().unwrap();

            Ok(blocks)
        }

        fn validate_execution(
            &self,
            _input: ExecInput,
            _output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            // The execution is validated within the stage itself by comparing state roots
            Ok(())
        }
    }

    impl UnwindStageTestRunner for MerkleTestRunner {
        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
            // The unwind is validated within the stage itself by comparing state roots
            Ok(())
        }

        fn before_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            let target_block = input.unwind_to + 1;

            // Manually revert the hashed state to the unwind target by replaying the storage
            // and account changesets in reverse.
            self.db
                .commit(|tx| {
                    let mut storage_changesets_cursor =
                        tx.cursor_dup_read::<tables::StorageChangeSets>().unwrap();
                    let mut storage_cursor =
                        tx.cursor_dup_write::<tables::HashedStorages>().unwrap();

                    let mut tree: BTreeMap<B256, BTreeMap<B256, U256>> = BTreeMap::new();

                    let mut rev_changeset_walker =
                        storage_changesets_cursor.walk_back(None).unwrap();
                    while let Some((bn_address, entry)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if bn_address.block_number() < target_block {
                            break
                        }

                        tree.entry(keccak256(bn_address.address()))
                            .or_default()
                            .insert(keccak256(entry.key), entry.value);
                    }
                    for (hashed_address, storage) in tree {
                        for (hashed_slot, value) in storage {
                            let storage_entry = storage_cursor
                                .seek_by_key_subkey(hashed_address, hashed_slot)
                                .unwrap();
                            if storage_entry.is_some_and(|v| v.key == hashed_slot) {
                                storage_cursor.delete_current().unwrap();
                            }

                            if !value.is_zero() {
                                let storage_entry = StorageEntry { key: hashed_slot, value };
                                storage_cursor.upsert(hashed_address, &storage_entry).unwrap();
                            }
                        }
                    }

                    let mut changeset_cursor =
                        tx.cursor_dup_write::<tables::AccountChangeSets>().unwrap();
                    let mut rev_changeset_walker = changeset_cursor.walk_back(None).unwrap();

                    while let Some((block_number, account_before_tx)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if block_number < target_block {
                            break
                        }

                        if let Some(acc) = account_before_tx.info {
                            tx.put::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                acc,
                            )
                            .unwrap();
                        } else {
                            tx.delete::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                None,
                            )
                            .unwrap();
                        }
                    }
                    Ok(())
                })
                .unwrap();
            Ok(())
        }
    }
}