1use alloy_consensus::{constants::KECCAK_EMPTY, BlockHeader};
2use alloy_primitives::{BlockNumber, Sealable, B256};
3use reth_codecs::Compact;
4use reth_consensus::ConsensusError;
5use reth_db_api::{
6 tables,
7 transaction::{DbTx, DbTxMut},
8};
9use reth_primitives_traits::{GotExpected, SealedHeader};
10use reth_provider::{
11 ChangeSetReader, DBProvider, HeaderProvider, ProviderError, StageCheckpointReader,
12 StageCheckpointWriter, StatsReader, StorageChangeSetReader, StorageSettingsCache, TrieWriter,
13};
14use reth_stages_api::{
15 BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
16 StageCheckpoint, StageError, StageId, StorageRootMerkleCheckpoint, UnwindInput, UnwindOutput,
17};
18use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress, StoredSubNode};
19use reth_trie_db::DatabaseStateRoot;
20use std::fmt::Debug;
21use tracing::*;
22
/// Bug-report template appended to state-root mismatch log messages, telling the
/// user exactly which artifacts to collect before opening an issue.
pub const INVALID_STATE_ROOT_ERROR_MESSAGE: &str = r#"
Invalid state root error on stage verification!
This is an error that likely requires a report to the reth team with additional information.
Please include the following information in your report:
 * This error message
 * The state root of the block that was rejected
 * The output of `reth db stats --checksum` from the database that was being used. This will take a long time to run!
 * 50-100 lines of logs before and after the first occurrence of the log message with the state root of the block that was rejected.
 * The debug logs from __the same time period__. To find the default location for these logs, run:
 `reth --help | grep -A 4 'log.file.directory'`

Once you have this information, please submit a github issue at https://github.com/paradigmxyz/reth/issues/new
"#;
40
/// Default block-range size above which `execute` abandons incremental updates and
/// rebuilds the trie from scratch (see the rebuild branch in `Stage::execute`).
pub const MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD: u64 = 100_000;
44
/// Default number of blocks covered by each chunk when the trie is updated
/// incrementally (the non-rebuild branch of `Stage::execute`).
pub const MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD: u64 = 7_000;
50
/// The merkle stage: computes the state root over the synced range and validates it
/// against the target block header. Split into execution and unwind variants so each
/// direction can be scheduled independently in the pipeline.
#[derive(Debug, Clone)]
pub enum MerkleStage {
    /// The execution portion of the stage; its `unwind` is a no-op.
    Execution {
        /// Block-range size above which the trie is rebuilt from scratch instead of
        /// being updated incrementally.
        rebuild_threshold: u64,
        /// Number of blocks processed per chunk during incremental updates.
        incremental_threshold: u64,
    },
    /// The unwind portion of the stage; its `execute` is a no-op.
    Unwind,
    /// Test-only variant that performs both execution and unwind.
    #[cfg(any(test, feature = "test-utils"))]
    Both {
        /// Same meaning as `Execution::rebuild_threshold`.
        rebuild_threshold: u64,
        /// Same meaning as `Execution::incremental_threshold`.
        incremental_threshold: u64,
    },
}
100
101impl MerkleStage {
102 pub const fn default_execution() -> Self {
104 Self::Execution {
105 rebuild_threshold: MERKLE_STAGE_DEFAULT_REBUILD_THRESHOLD,
106 incremental_threshold: MERKLE_STAGE_DEFAULT_INCREMENTAL_THRESHOLD,
107 }
108 }
109
110 pub const fn default_unwind() -> Self {
112 Self::Unwind
113 }
114
115 pub const fn new_execution(rebuild_threshold: u64, incremental_threshold: u64) -> Self {
117 Self::Execution { rebuild_threshold, incremental_threshold }
118 }
119
120 pub fn get_execution_checkpoint(
122 &self,
123 provider: &impl StageCheckpointReader,
124 ) -> Result<Option<MerkleCheckpoint>, StageError> {
125 let buf =
126 provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default();
127
128 if buf.is_empty() {
129 return Ok(None)
130 }
131
132 let (checkpoint, _) = MerkleCheckpoint::from_compact(&buf, buf.len());
133 Ok(Some(checkpoint))
134 }
135
136 pub fn save_execution_checkpoint(
138 &self,
139 provider: &impl StageCheckpointWriter,
140 checkpoint: Option<MerkleCheckpoint>,
141 ) -> Result<(), StageError> {
142 let mut buf = vec![];
143 if let Some(checkpoint) = checkpoint {
144 debug!(
145 target: "sync::stages::merkle::exec",
146 last_account_key = ?checkpoint.last_account_key,
147 "Saving inner merkle checkpoint"
148 );
149 checkpoint.to_compact(&mut buf);
150 }
151 Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
152 }
153}
154
impl<Provider> Stage<Provider> for MerkleStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + TrieWriter
        + StatsReader
        + HeaderProvider
        + ChangeSetReader
        + StorageChangeSetReader
        + StorageSettingsCache
        + StageCheckpointReader
        + StageCheckpointWriter,
{
    /// Returns the stage id matching the configured variant.
    fn id(&self) -> StageId {
        match self {
            Self::Execution { .. } => StageId::MerkleExecute,
            Self::Unwind => StageId::MerkleUnwind,
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { .. } => StageId::Other("MerkleBoth"),
        }
    }

    /// Computes the state root up to the target block and validates it against the
    /// target header's state root.
    ///
    /// Takes one of three paths:
    /// * empty block range: the target header's own root is reused as-is;
    /// * range larger than `rebuild_threshold`, or starting at block 1 (genesis sync):
    ///   the trie tables are cleared and rebuilt from hashed state, persisting an
    ///   inner [`MerkleCheckpoint`] so the rebuild can resume across invocations;
    /// * otherwise: the trie is updated incrementally over the block range, in chunks
    ///   of `incremental_threshold` blocks.
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let (threshold, incremental_threshold) = match self {
            // The unwind-only variant does no work during execution.
            Self::Unwind => {
                info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
                return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
            }
            Self::Execution { rebuild_threshold, incremental_threshold } => {
                (*rebuild_threshold, *incremental_threshold)
            }
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { rebuild_threshold, incremental_threshold } => {
                (*rebuild_threshold, *incremental_threshold)
            }
        };

        let range = input.next_block_range();
        let (from_block, to_block) = range.clone().into_inner();
        let current_block_number = input.checkpoint().block_number;

        // Header whose state root the computed trie root must match.
        let target_block = provider
            .header_by_number(to_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(to_block.into()))?;
        let target_block_root = target_block.state_root();

        let (trie_root, entities_checkpoint) = if range.is_empty() {
            (target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
        } else if to_block - from_block > threshold || from_block == 1 {
            // Full rebuild path. Resume from a previously persisted inner checkpoint
            // when it targets the same block; otherwise start over from scratch.
            let mut checkpoint = self.get_execution_checkpoint(provider)?;

            let mut entities_checkpoint = if let Some(checkpoint) =
                checkpoint.as_ref().filter(|c| c.target_block == to_block)
            {
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    last_account_key = ?checkpoint.last_account_key,
                    "Continuing inner merkle checkpoint"
                );

                input.checkpoint().entities_stage_checkpoint()
            } else {
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    previous_checkpoint = ?checkpoint,
                    "Rebuilding trie"
                );
                // Stale or absent checkpoint: discard persisted progress and wipe both
                // trie tables before rebuilding.
                checkpoint = None;
                self.save_execution_checkpoint(provider, None)?;
                provider.tx_ref().clear::<tables::AccountsTrie>()?;
                provider.tx_ref().clear::<tables::StoragesTrie>()?;

                None
            }
            // Fresh rebuilds track progress against the total number of hashed entries.
            .unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (provider.count_entries::<tables::HashedAccounts>()? +
                    provider.count_entries::<tables::HashedStorages>()?)
                    as u64,
            });

            let tx = provider.tx_ref();
            let progress = StateRoot::from_tx(tx)
                .with_intermediate_state(checkpoint.map(IntermediateStateRootState::from))
                .root_with_progress()
                .map_err(|e| {
                    error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "State root with progress failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                    StageError::Fatal(Box::new(e))
                })?;
            match progress {
                // Root computation was interrupted mid-way: persist the intermediate
                // account-trie walker state so the next invocation resumes here.
                StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
                    provider.write_trie_updates(updates)?;

                    let mut checkpoint = MerkleCheckpoint::new(
                        to_block,
                        state.account_root_state.last_hashed_key,
                        state
                            .account_root_state
                            .walker_stack
                            .into_iter()
                            .map(StoredSubNode::from)
                            .collect(),
                        state.account_root_state.hash_builder.into(),
                    );

                    // Also persist any in-flight storage-root computation alongside
                    // the account state it belongs to.
                    if let Some(storage_state) = state.storage_root_state {
                        checkpoint.storage_root_checkpoint =
                            Some(StorageRootMerkleCheckpoint::new(
                                storage_state.state.last_hashed_key,
                                storage_state
                                    .state
                                    .walker_stack
                                    .into_iter()
                                    .map(StoredSubNode::from)
                                    .collect(),
                                storage_state.state.hash_builder.into(),
                                storage_state.account.nonce,
                                storage_state.account.balance,
                                storage_state.account.bytecode_hash.unwrap_or(KECCAK_EMPTY),
                            ));
                    }
                    self.save_execution_checkpoint(provider, Some(checkpoint))?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    // Not done yet: report entity progress without advancing the
                    // stage's block number.
                    return Ok(ExecOutput {
                        checkpoint: input
                            .checkpoint()
                            .with_entities_stage_checkpoint(entities_checkpoint),
                        done: false,
                    })
                }
                StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
                    provider.write_trie_updates(updates)?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    (root, entities_checkpoint)
                }
            }
        } else {
            // Incremental path: update the existing trie chunk by chunk, committing
            // trie updates after each chunk.
            debug!(target: "sync::stages::merkle::exec", current = ?current_block_number, target = ?to_block, "Updating trie in chunks");
            let mut final_root = None;
            for start_block in range.step_by(incremental_threshold as usize) {
                // NOTE(review): `start_block + incremental_threshold` is also the next
                // chunk's start, so consecutive chunks share one boundary block —
                // presumably harmless re-processing; confirm this is intended.
                let chunk_to = std::cmp::min(start_block + incremental_threshold, to_block);
                let chunk_range = start_block..=chunk_to;
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    incremental_threshold,
                    chunk_range = ?chunk_range,
                    "Processing chunk"
                );
                let (root, updates) =
                    StateRoot::incremental_root_with_updates(provider, chunk_range)
                        .map_err(|e| {
                            error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                            StageError::Fatal(Box::new(e))
                        })?;
                provider.write_trie_updates(updates)?;
                final_root = Some(root);
            }

            // The range is non-empty on this path, so at least one chunk ran.
            let final_root = final_root.ok_or(StageError::Fatal(
                "Incremental merkle hashing did not produce a final root".into(),
            ))?;

            let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
                provider.count_entries::<tables::HashedStorages>()?)
                as u64;

            // Incremental updates finish in one pass, so processed == total.
            let entities_checkpoint = EntitiesCheckpoint {
                processed: total_hashed_entries,
                total: total_hashed_entries,
            };
            (final_root, entities_checkpoint)
        };

        // The rebuild completed (or was skipped): drop any stored inner checkpoint.
        self.save_execution_checkpoint(provider, None)?;

        validate_state_root(trie_root, SealedHeader::seal_slow(target_block), to_block)?;

        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(to_block)
                .with_entities_stage_checkpoint(entities_checkpoint),
            done: true,
        })
    }

    /// Reverts the trie to `input.unwind_to` by recomputing the incremental root over
    /// the unwound range and validating it against the header at the unwind target.
    ///
    /// A no-op for the execution-only variant; unwinding to block 0 simply clears
    /// both trie tables.
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let tx = provider.tx_ref();
        let range = input.unwind_block_range();
        if matches!(self, Self::Execution { .. }) {
            info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
            return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
        }

        let mut entities_checkpoint =
            input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (tx.entries::<tables::HashedAccounts>()? +
                    tx.entries::<tables::HashedStorages>()?) as u64,
            });

        if input.unwind_to == 0 {
            // Unwinding to genesis: drop the trie wholesale instead of reverting it.
            tx.clear::<tables::AccountsTrie>()?;
            tx.clear::<tables::StoragesTrie>()?;

            entities_checkpoint.processed = 0;

            return Ok(UnwindOutput {
                checkpoint: StageCheckpoint::new(input.unwind_to)
                    .with_entities_stage_checkpoint(entities_checkpoint),
            })
        }

        if range.is_empty() {
            info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
        } else {
            // Recompute the root for the unwound range and verify it matches the
            // header at the unwind target before committing the trie updates.
            let (block_root, updates) = StateRoot::incremental_root_with_updates(provider, range)
                .map_err(|e| StageError::Fatal(Box::new(e)))?;

            let target = provider
                .header_by_number(input.unwind_to)?
                .ok_or_else(|| ProviderError::HeaderNotFound(input.unwind_to.into()))?;

            validate_state_root(block_root, SealedHeader::seal_slow(target), input.unwind_to)?;

            provider.write_trie_updates(updates)?;

            // Refresh entity counts from the current hashed tables.
            // NOTE(review): this presumes the hashed tables were already unwound by
            // the hashing stages before this runs — confirm pipeline ordering.
            let accounts = tx.entries::<tables::HashedAccounts>()?;
            let storages = tx.entries::<tables::HashedStorages>()?;
            let total = (accounts + storages) as u64;
            entities_checkpoint.total = total;
            entities_checkpoint.processed = total;
        }

        Ok(UnwindOutput {
            checkpoint: StageCheckpoint::new(input.unwind_to)
                .with_entities_stage_checkpoint(entities_checkpoint),
        })
    }
}
423
424#[inline]
426fn validate_state_root<H: BlockHeader + Sealable + Debug>(
427 got: B256,
428 expected: SealedHeader<H>,
429 target_block: BlockNumber,
430) -> Result<(), StageError> {
431 if got == expected.state_root() {
432 Ok(())
433 } else {
434 error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
435 Err(StageError::Block {
436 error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff(
437 GotExpected { got, expected: expected.state_root() }.into(),
438 )),
439 block: Box::new(expected.block_with_parent()),
440 })
441 }
442}
443
444#[cfg(test)]
445mod tests {
446 use super::*;
447 use crate::test_utils::{
448 stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
449 TestRunnerError, TestStageDB, UnwindStageTestRunner,
450 };
451 use alloy_primitives::{keccak256, U256};
452 use assert_matches::assert_matches;
453 use reth_db_api::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
454 use reth_primitives_traits::{SealedBlock, StorageEntry};
455 use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
456 use reth_stages_api::StageUnitCheckpoint;
457 use reth_static_file_types::StaticFileSegment;
458 use reth_testing_utils::generators::{
459 self, random_block, random_block_range, random_changeset_range,
460 random_contract_account_range, BlockParams, BlockRangeParams,
461 };
462 use reth_trie::test_utils::{state_root, state_root_prehashed};
463 use std::collections::BTreeMap;
464
    // Runs the shared extended stage test suite against the merkle runner.
    stage_test_suite_ext!(MerkleTestRunner, merkle);
466
467 #[tokio::test]
469 async fn execute_clean_merkle() {
470 let (previous_stage, stage_progress) = (500, 0);
471
472 let mut runner = MerkleTestRunner::default();
474 let input = ExecInput {
476 target: Some(previous_stage),
477 checkpoint: Some(StageCheckpoint::new(stage_progress)),
478 };
479
480 runner.seed_execution(input).expect("failed to seed execution");
481
482 let rx = runner.execute(input);
483
484 let result = rx.await.unwrap();
486 assert_matches!(
487 result,
488 Ok(ExecOutput {
489 checkpoint: StageCheckpoint {
490 block_number,
491 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
492 processed,
493 total
494 }))
495 },
496 done: true
497 }) if block_number == previous_stage && processed == total &&
498 total == (
499 runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
500 runner.db.count_entries::<tables::HashedStorages>().unwrap()
501 ) as u64
502 );
503
504 assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
506 }
507
508 #[tokio::test]
510 async fn execute_small_merkle() {
511 let (previous_stage, stage_progress) = (2, 1);
512
513 let mut runner = MerkleTestRunner::default();
515 let input = ExecInput {
516 target: Some(previous_stage),
517 checkpoint: Some(StageCheckpoint::new(stage_progress)),
518 };
519
520 runner.seed_execution(input).expect("failed to seed execution");
521
522 let rx = runner.execute(input);
523
524 let result = rx.await.unwrap();
526 assert_matches!(
527 result,
528 Ok(ExecOutput {
529 checkpoint: StageCheckpoint {
530 block_number,
531 stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
532 processed,
533 total
534 }))
535 },
536 done: true
537 }) if block_number == previous_stage && processed == total &&
538 total == (
539 runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
540 runner.db.count_entries::<tables::HashedStorages>().unwrap()
541 ) as u64
542 );
543
544 assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
546 }
547
    /// Forces the chunked incremental path: the 100-block window stays within the
    /// rebuild threshold and is processed in chunks of 10 blocks, after which the
    /// resulting root is cross-checked against a single-shot incremental computation.
    #[tokio::test]
    async fn execute_chunked_merkle() {
        let (previous_stage, stage_progress) = (200, 100);
        let clean_threshold = 100;
        let incremental_threshold = 10;

        let mut runner =
            MerkleTestRunner { db: TestStageDB::default(), clean_threshold, incremental_threshold };

        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");
        let rx = runner.execute(input);

        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.count_entries::<tables::HashedAccounts>().unwrap() +
                    runner.db.count_entries::<tables::HashedStorages>().unwrap()
                ) as u64
        );

        // Recompute the incremental root over the full range in one shot and compare
        // it against the root stored in the target header.
        let provider = runner.db.factory.provider().unwrap();
        let header = provider.header_by_number(previous_stage).unwrap().unwrap();
        let expected_root = header.state_root;

        let actual_root = runner
            .db
            .query_with_provider(|provider| {
                Ok(StateRoot::incremental_root_with_updates(
                    &provider,
                    stage_progress + 1..=previous_stage,
                ))
            })
            .unwrap();

        assert_eq!(
            actual_root.unwrap().0,
            expected_root,
            "State root mismatch after chunked processing"
        );
    }
607
    /// Test harness wiring a [`TestStageDB`] to a [`MerkleStage`] with configurable
    /// thresholds.
    struct MerkleTestRunner {
        // Backing test database.
        db: TestStageDB,
        // Passed to `MerkleStage::Both` as `rebuild_threshold`.
        clean_threshold: u64,
        // Passed to `MerkleStage::Both` as `incremental_threshold`.
        incremental_threshold: u64,
    }
613
614 impl Default for MerkleTestRunner {
615 fn default() -> Self {
616 Self {
617 db: TestStageDB::default(),
618 clean_threshold: 10000,
619 incremental_threshold: 10000,
620 }
621 }
622 }
623
624 impl StageTestRunner for MerkleTestRunner {
625 type S = MerkleStage;
626
627 fn db(&self) -> &TestStageDB {
628 &self.db
629 }
630
631 fn stage(&self) -> Self::S {
632 Self::S::Both {
633 rebuild_threshold: self.clean_threshold,
634 incremental_threshold: self.incremental_threshold,
635 }
636 }
637 }
638
    impl ExecuteStageTestRunner for MerkleTestRunner {
        type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;

        /// Seeds the database with random blocks, accounts, and changesets for the
        /// execution range, then rewrites the last static-file header so its state
        /// root matches the post-state computed from the hashed tables.
        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let start = stage_progress + 1;
            let end = input.target();
            let mut rng = generators::rng();

            // Blocks preceding the starting checkpoint (skipped when starting at 0).
            let mut preblocks = vec![];
            if stage_progress > 0 {
                preblocks.append(&mut random_block_range(
                    &mut rng,
                    0..=stage_progress - 1,
                    BlockRangeParams {
                        parent: Some(B256::ZERO),
                        tx_count: 0..1,
                        ..Default::default()
                    },
                ));
                self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
            }

            // Seed a fixed-size set of random accounts with no storage.
            let num_of_accounts = 31;
            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
                .into_iter()
                .collect::<BTreeMap<_, _>>();

            self.db.insert_accounts_and_storages(
                accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))),
            )?;

            // Build the checkpoint-block header with a state root matching the
            // seeded accounts, so execution starts from a consistent root.
            let (header, body) = random_block(
                &mut rng,
                stage_progress,
                BlockParams { parent: preblocks.last().map(|b| b.hash()), ..Default::default() },
            )
            .split_sealed_header_body();
            let mut header = header.unseal();

            header.state_root = state_root(
                accounts
                    .clone()
                    .into_iter()
                    .map(|(address, account)| (address, (account, std::iter::empty()))),
            );
            let sealed_head = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
                SealedHeader::seal_slow(header),
                body,
            );

            // Blocks for the execution range itself, chained onto the head block.
            let head_hash = sealed_head.hash();
            let mut blocks = vec![sealed_head];
            blocks.extend(random_block_range(
                &mut rng,
                start..=end,
                BlockRangeParams { parent: Some(head_hash), tx_count: 0..3, ..Default::default() },
            ));
            let last_block = blocks.last().cloned().unwrap();
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;

            // Random account/storage changesets over the blocks plus the resulting
            // final (plain + hashed) state.
            let (transitions, final_state) = random_changeset_range(
                &mut rng,
                blocks.iter(),
                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
                0..3,
                0..256,
            );
            self.db.insert_changesets(transitions, Some(start))?;
            self.db.insert_accounts_and_storages(final_state)?;

            // Compute the expected state root directly from the hashed tables,
            // dropping zero-valued storage slots.
            let root = self.db.query(|tx| {
                let mut accounts = BTreeMap::default();
                let mut accounts_cursor = tx.cursor_read::<tables::HashedAccounts>()?;
                let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
                for entry in accounts_cursor.walk_range(..)? {
                    let (key, account) = entry?;
                    let mut storage_entries = Vec::new();
                    let mut entry = storage_cursor.seek_exact(key)?;
                    while let Some((_, storage)) = entry {
                        storage_entries.push(storage);
                        entry = storage_cursor.next_dup()?;
                    }
                    let storage = storage_entries
                        .into_iter()
                        .filter(|v| !v.value.is_zero())
                        .map(|v| (v.key, v.value))
                        .collect::<Vec<_>>();
                    accounts.insert(key, (account, storage));
                }

                Ok(state_root_prehashed(accounts.into_iter()))
            })?;

            // Replace the last static-file header with one carrying the expected
            // root so the stage's validation has something correct to match.
            let static_file_provider = self.db.factory.static_file_provider();
            let mut writer =
                static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
            let mut last_header = last_block.clone_sealed_header();
            last_header.set_state_root(root);

            let hash = last_header.hash_slow();
            writer.prune_headers(1).unwrap();
            writer.commit().unwrap();
            writer.append_header(&last_header, &hash).unwrap();
            writer.commit().unwrap();

            Ok(blocks)
        }

        /// No-op: the tests assert directly on the [`ExecOutput`] instead.
        fn validate_execution(
            &self,
            _input: ExecInput,
            _output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            Ok(())
        }
    }
759
    impl UnwindStageTestRunner for MerkleTestRunner {
        /// No-op: the stage's own state-root validation covers the unwind result.
        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
            Ok(())
        }

        /// Reverts the hashed account/storage tables to `input.unwind_to` by walking
        /// the changesets newest-to-oldest, so the stage's unwind runs against
        /// hashed state consistent with the unwind target.
        fn before_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            // First block whose changes must be reverted.
            let target_block = input.unwind_to + 1;

            self.db
                .commit(|tx| {
                    let mut storage_changesets_cursor =
                        tx.cursor_dup_read::<tables::StorageChangeSets>().unwrap();
                    let mut storage_cursor =
                        tx.cursor_dup_write::<tables::HashedStorages>().unwrap();

                    // Pre-state per hashed address/slot. Walking newest-to-oldest and
                    // overwriting on insert leaves the oldest change's pre-value in
                    // the map, which is the value at the unwind target.
                    let mut tree: BTreeMap<B256, BTreeMap<B256, U256>> = BTreeMap::new();

                    let mut rev_changeset_walker =
                        storage_changesets_cursor.walk_back(None).unwrap();
                    while let Some((bn_address, entry)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if bn_address.block_number() < target_block {
                            break
                        }

                        tree.entry(keccak256(bn_address.address()))
                            .or_default()
                            .insert(keccak256(entry.key), entry.value);
                    }
                    // Apply the collected pre-state to the hashed storages table:
                    // delete the current entry (if present) and re-insert non-zero
                    // values only.
                    for (hashed_address, storage) in tree {
                        for (hashed_slot, value) in storage {
                            let storage_entry = storage_cursor
                                .seek_by_key_subkey(hashed_address, hashed_slot)
                                .unwrap();
                            if storage_entry.is_some_and(|v| v.key == hashed_slot) {
                                storage_cursor.delete_current().unwrap();
                            }

                            if !value.is_zero() {
                                let storage_entry = StorageEntry { key: hashed_slot, value };
                                storage_cursor.upsert(hashed_address, &storage_entry).unwrap();
                            }
                        }
                    }

                    // Same newest-to-oldest walk over the account changesets; the
                    // last write per account wins, restoring the oldest pre-state.
                    let mut changeset_cursor =
                        tx.cursor_dup_write::<tables::AccountChangeSets>().unwrap();
                    let mut rev_changeset_walker = changeset_cursor.walk_back(None).unwrap();

                    while let Some((block_number, account_before_tx)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if block_number < target_block {
                            break
                        }

                        // `info == None` means the account did not exist before.
                        if let Some(acc) = account_before_tx.info {
                            tx.put::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                acc,
                            )
                            .unwrap();
                        } else {
                            tx.delete::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                None,
                            )
                            .unwrap();
                        }
                    }
                    Ok(())
                })
                .unwrap();
            Ok(())
        }
    }
838}