reth_stages/stages/
merkle.rs

1use alloy_consensus::BlockHeader;
2use alloy_primitives::{BlockNumber, Sealable, B256};
3use reth_codecs::Compact;
4use reth_consensus::ConsensusError;
5use reth_db_api::{
6    tables,
7    transaction::{DbTx, DbTxMut},
8};
9use reth_primitives_traits::{GotExpected, SealedHeader};
10use reth_provider::{
11    DBProvider, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter,
12    StatsReader, TrieWriter,
13};
14use reth_stages_api::{
15    BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
16    StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
17};
18use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress, StoredSubNode};
19use reth_trie_db::DatabaseStateRoot;
20use std::fmt::Debug;
21use tracing::*;
22
23// TODO: automate the process outlined below so the user can just send in a debugging package
24/// The error message that we include in invalid state root errors to tell users what information
25/// they should include in a bug report, since true state root errors can be impossible to debug
26/// with just basic logs.
27pub const INVALID_STATE_ROOT_ERROR_MESSAGE: &str = r#"
28Invalid state root error on stage verification!
29This is an error that likely requires a report to the reth team with additional information.
30Please include the following information in your report:
31 * This error message
32 * The state root of the block that was rejected
33 * The output of `reth db stats --checksum` from the database that was being used. This will take a long time to run!
34 * 50-100 lines of logs before and after the first occurrence of the log message with the state root of the block that was rejected.
35 * The debug logs from __the same time period__. To find the default location for these logs, run:
36   `reth --help | grep -A 4 'log.file.directory'`
37
38Once you have this information, please submit a github issue at https://github.com/paradigmxyz/reth/issues/new
39"#;
40
41/// The default threshold (in number of blocks) for switching from incremental trie building
42/// of changes to whole rebuild.
43pub const MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD: u64 = 5_000;
44
45/// The merkle hashing stage uses input from
46/// [`AccountHashingStage`][crate::stages::AccountHashingStage] and
47/// [`StorageHashingStage`][crate::stages::AccountHashingStage] to calculate intermediate hashes
48/// and state roots.
49///
50/// This stage should be run with the above two stages, otherwise it is a no-op.
51///
52/// This stage is split in two: one for calculating hashes and one for unwinding.
53///
54/// When run in execution, it's going to be executed AFTER the hashing stages, to generate
55/// the state root. When run in unwind mode, it's going to be executed BEFORE the hashing stages,
56/// so that it unwinds the intermediate hashes based on the unwound hashed state from the hashing
57/// stages. The order of these two variants is important. The unwind variant should be added to the
58/// pipeline before the execution variant.
59///
60/// An example pipeline to only hash state would be:
61///
62/// - [`MerkleStage::Unwind`]
63/// - [`AccountHashingStage`][crate::stages::AccountHashingStage]
64/// - [`StorageHashingStage`][crate::stages::StorageHashingStage]
65/// - [`MerkleStage::Execution`]
66#[derive(Debug, Clone)]
67pub enum MerkleStage {
68    /// The execution portion of the merkle stage.
69    Execution {
70        /// The threshold (in number of blocks) for switching from incremental trie building
71        /// of changes to whole rebuild.
72        clean_threshold: u64,
73    },
74    /// The unwind portion of the merkle stage.
75    Unwind,
76    /// Able to execute and unwind. Used for tests
77    #[cfg(any(test, feature = "test-utils"))]
78    Both {
79        /// The threshold (in number of blocks) for switching from incremental trie building
80        /// of changes to whole rebuild.
81        clean_threshold: u64,
82    },
83}
84
85impl MerkleStage {
86    /// Stage default for the [`MerkleStage::Execution`].
87    pub const fn default_execution() -> Self {
88        Self::Execution { clean_threshold: MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD }
89    }
90
91    /// Stage default for the [`MerkleStage::Unwind`].
92    pub const fn default_unwind() -> Self {
93        Self::Unwind
94    }
95
96    /// Create new instance of [`MerkleStage::Execution`].
97    pub const fn new_execution(clean_threshold: u64) -> Self {
98        Self::Execution { clean_threshold }
99    }
100
101    /// Gets the hashing progress
102    pub fn get_execution_checkpoint(
103        &self,
104        provider: &impl StageCheckpointReader,
105    ) -> Result<Option<MerkleCheckpoint>, StageError> {
106        let buf =
107            provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default();
108
109        if buf.is_empty() {
110            return Ok(None)
111        }
112
113        let (checkpoint, _) = MerkleCheckpoint::from_compact(&buf, buf.len());
114        Ok(Some(checkpoint))
115    }
116
117    /// Saves the hashing progress
118    pub fn save_execution_checkpoint(
119        &self,
120        provider: &impl StageCheckpointWriter,
121        checkpoint: Option<MerkleCheckpoint>,
122    ) -> Result<(), StageError> {
123        let mut buf = vec![];
124        if let Some(checkpoint) = checkpoint {
125            debug!(
126                target: "sync::stages::merkle::exec",
127                last_account_key = ?checkpoint.last_account_key,
128                "Saving inner merkle checkpoint"
129            );
130            checkpoint.to_compact(&mut buf);
131        }
132        Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
133    }
134}
135
136impl<Provider> Stage<Provider> for MerkleStage
137where
138    Provider: DBProvider<Tx: DbTxMut>
139        + TrieWriter
140        + StatsReader
141        + HeaderProvider
142        + StageCheckpointReader
143        + StageCheckpointWriter,
144{
145    /// Return the id of the stage
146    fn id(&self) -> StageId {
147        match self {
148            Self::Execution { .. } => StageId::MerkleExecute,
149            Self::Unwind => StageId::MerkleUnwind,
150            #[cfg(any(test, feature = "test-utils"))]
151            Self::Both { .. } => StageId::Other("MerkleBoth"),
152        }
153    }
154
155    /// Execute the stage.
156    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
157        let threshold = match self {
158            Self::Unwind => {
159                info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
160                return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
161            }
162            Self::Execution { clean_threshold } => *clean_threshold,
163            #[cfg(any(test, feature = "test-utils"))]
164            Self::Both { clean_threshold } => *clean_threshold,
165        };
166
167        let range = input.next_block_range();
168        let (from_block, to_block) = range.clone().into_inner();
169        let current_block_number = input.checkpoint().block_number;
170
171        let target_block = provider
172            .header_by_number(to_block)?
173            .ok_or_else(|| ProviderError::HeaderNotFound(to_block.into()))?;
174        let target_block_root = target_block.state_root();
175
176        let mut checkpoint = self.get_execution_checkpoint(provider)?;
177        let (trie_root, entities_checkpoint) = if range.is_empty() {
178            (target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
179        } else if to_block - from_block > threshold || from_block == 1 {
180            // if there are more blocks than threshold it is faster to rebuild the trie
181            let mut entities_checkpoint = if let Some(checkpoint) =
182                checkpoint.as_ref().filter(|c| c.target_block == to_block)
183            {
184                debug!(
185                    target: "sync::stages::merkle::exec",
186                    current = ?current_block_number,
187                    target = ?to_block,
188                    last_account_key = ?checkpoint.last_account_key,
189                    "Continuing inner merkle checkpoint"
190                );
191
192                input.checkpoint().entities_stage_checkpoint()
193            } else {
194                debug!(
195                    target: "sync::stages::merkle::exec",
196                    current = ?current_block_number,
197                    target = ?to_block,
198                    previous_checkpoint = ?checkpoint,
199                    "Rebuilding trie"
200                );
201                // Reset the checkpoint and clear trie tables
202                checkpoint = None;
203                self.save_execution_checkpoint(provider, None)?;
204                provider.tx_ref().clear::<tables::AccountsTrie>()?;
205                provider.tx_ref().clear::<tables::StoragesTrie>()?;
206
207                None
208            }
209            .unwrap_or(EntitiesCheckpoint {
210                processed: 0,
211                total: (provider.count_entries::<tables::HashedAccounts>()? +
212                    provider.count_entries::<tables::HashedStorages>()?)
213                    as u64,
214            });
215
216            let tx = provider.tx_ref();
217            let progress = StateRoot::from_tx(tx)
218                .with_intermediate_state(checkpoint.map(IntermediateStateRootState::from))
219                .root_with_progress()
220                .map_err(|e| {
221                    error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "State root with progress failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
222                    StageError::Fatal(Box::new(e))
223                })?;
224            match progress {
225                StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
226                    provider.write_trie_updates(&updates)?;
227
228                    let checkpoint = MerkleCheckpoint::new(
229                        to_block,
230                        state.last_account_key,
231                        state.walker_stack.into_iter().map(StoredSubNode::from).collect(),
232                        state.hash_builder.into(),
233                    );
234                    self.save_execution_checkpoint(provider, Some(checkpoint))?;
235
236                    entities_checkpoint.processed += hashed_entries_walked as u64;
237
238                    return Ok(ExecOutput {
239                        checkpoint: input
240                            .checkpoint()
241                            .with_entities_stage_checkpoint(entities_checkpoint),
242                        done: false,
243                    })
244                }
245                StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
246                    provider.write_trie_updates(&updates)?;
247
248                    entities_checkpoint.processed += hashed_entries_walked as u64;
249
250                    (root, entities_checkpoint)
251                }
252            }
253        } else {
254            debug!(target: "sync::stages::merkle::exec", current = ?current_block_number, target = ?to_block, "Updating trie");
255            let (root, updates) =
256                StateRoot::incremental_root_with_updates(provider.tx_ref(), range)
257                    .map_err(|e| {
258                        error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
259                        StageError::Fatal(Box::new(e))
260                    })?;
261
262            provider.write_trie_updates(&updates)?;
263
264            let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
265                provider.count_entries::<tables::HashedStorages>()?)
266                as u64;
267
268            let entities_checkpoint = EntitiesCheckpoint {
269                // This is fine because `range` doesn't have an upper bound, so in this `else`
270                // branch we're just hashing all remaining accounts and storage slots we have in the
271                // database.
272                processed: total_hashed_entries,
273                total: total_hashed_entries,
274            };
275
276            (root, entities_checkpoint)
277        };
278
279        // Reset the checkpoint
280        self.save_execution_checkpoint(provider, None)?;
281
282        validate_state_root(trie_root, SealedHeader::seal_slow(target_block), to_block)?;
283
284        Ok(ExecOutput {
285            checkpoint: StageCheckpoint::new(to_block)
286                .with_entities_stage_checkpoint(entities_checkpoint),
287            done: true,
288        })
289    }
290
291    /// Unwind the stage.
292    fn unwind(
293        &mut self,
294        provider: &Provider,
295        input: UnwindInput,
296    ) -> Result<UnwindOutput, StageError> {
297        let tx = provider.tx_ref();
298        let range = input.unwind_block_range();
299        if matches!(self, Self::Execution { .. }) {
300            info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
301            return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
302        }
303
304        let mut entities_checkpoint =
305            input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint {
306                processed: 0,
307                total: (tx.entries::<tables::HashedAccounts>()? +
308                    tx.entries::<tables::HashedStorages>()?) as u64,
309            });
310
311        if input.unwind_to == 0 {
312            tx.clear::<tables::AccountsTrie>()?;
313            tx.clear::<tables::StoragesTrie>()?;
314
315            entities_checkpoint.processed = 0;
316
317            return Ok(UnwindOutput {
318                checkpoint: StageCheckpoint::new(input.unwind_to)
319                    .with_entities_stage_checkpoint(entities_checkpoint),
320            })
321        }
322
323        // Unwind trie only if there are transitions
324        if range.is_empty() {
325            info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
326        } else {
327            let (block_root, updates) = StateRoot::incremental_root_with_updates(tx, range)
328                .map_err(|e| StageError::Fatal(Box::new(e)))?;
329
330            // Validate the calculated state root
331            let target = provider
332                .header_by_number(input.unwind_to)?
333                .ok_or_else(|| ProviderError::HeaderNotFound(input.unwind_to.into()))?;
334
335            validate_state_root(block_root, SealedHeader::seal_slow(target), input.unwind_to)?;
336
337            // Validation passed, apply unwind changes to the database.
338            provider.write_trie_updates(&updates)?;
339
340            // TODO(alexey): update entities checkpoint
341        }
342
343        Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
344    }
345}
346
347/// Check that the computed state root matches the root in the expected header.
348#[inline]
349fn validate_state_root<H: BlockHeader + Sealable + Debug>(
350    got: B256,
351    expected: SealedHeader<H>,
352    target_block: BlockNumber,
353) -> Result<(), StageError> {
354    if got == expected.state_root() {
355        Ok(())
356    } else {
357        error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
358        Err(StageError::Block {
359            error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff(
360                GotExpected { got, expected: expected.state_root() }.into(),
361            )),
362            block: Box::new(expected.block_with_parent()),
363        })
364    }
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370    use crate::test_utils::{
371        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
372        TestRunnerError, TestStageDB, UnwindStageTestRunner,
373    };
374    use alloy_primitives::{keccak256, U256};
375    use assert_matches::assert_matches;
376    use reth_db_api::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
377    use reth_primitives_traits::{SealedBlock, StorageEntry};
378    use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
379    use reth_stages_api::StageUnitCheckpoint;
380    use reth_static_file_types::StaticFileSegment;
381    use reth_testing_utils::generators::{
382        self, random_block, random_block_range, random_changeset_range,
383        random_contract_account_range, BlockParams, BlockRangeParams,
384    };
385    use reth_trie::test_utils::{state_root, state_root_prehashed};
386    use std::collections::BTreeMap;
387
388    stage_test_suite_ext!(MerkleTestRunner, merkle);
389
390    /// Execute from genesis so as to merkelize whole state
391    #[tokio::test]
392    async fn execute_clean_merkle() {
393        let (previous_stage, stage_progress) = (500, 0);
394
395        // Set up the runner
396        let mut runner = MerkleTestRunner::default();
397        // set low threshold so we hash the whole storage
398        let input = ExecInput {
399            target: Some(previous_stage),
400            checkpoint: Some(StageCheckpoint::new(stage_progress)),
401        };
402
403        runner.seed_execution(input).expect("failed to seed execution");
404
405        let rx = runner.execute(input);
406
407        // Assert the successful result
408        let result = rx.await.unwrap();
409        assert_matches!(
410            result,
411            Ok(ExecOutput {
412                checkpoint: StageCheckpoint {
413                    block_number,
414                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
415                        processed,
416                        total
417                    }))
418                },
419                done: true
420            }) if block_number == previous_stage && processed == total &&
421                total == (
422                    runner.db.table::<tables::HashedAccounts>().unwrap().len() +
423                    runner.db.table::<tables::HashedStorages>().unwrap().len()
424                ) as u64
425        );
426
427        // Validate the stage execution
428        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
429    }
430
431    /// Update small trie
432    #[tokio::test]
433    async fn execute_small_merkle() {
434        let (previous_stage, stage_progress) = (2, 1);
435
436        // Set up the runner
437        let mut runner = MerkleTestRunner::default();
438        let input = ExecInput {
439            target: Some(previous_stage),
440            checkpoint: Some(StageCheckpoint::new(stage_progress)),
441        };
442
443        runner.seed_execution(input).expect("failed to seed execution");
444
445        let rx = runner.execute(input);
446
447        // Assert the successful result
448        let result = rx.await.unwrap();
449        assert_matches!(
450            result,
451            Ok(ExecOutput {
452                checkpoint: StageCheckpoint {
453                    block_number,
454                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
455                        processed,
456                        total
457                    }))
458                },
459                done: true
460            }) if block_number == previous_stage && processed == total &&
461                total == (
462                    runner.db.table::<tables::HashedAccounts>().unwrap().len() +
463                    runner.db.table::<tables::HashedStorages>().unwrap().len()
464                ) as u64
465        );
466
467        // Validate the stage execution
468        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
469    }
470
471    struct MerkleTestRunner {
472        db: TestStageDB,
473        clean_threshold: u64,
474    }
475
476    impl Default for MerkleTestRunner {
477        fn default() -> Self {
478            Self { db: TestStageDB::default(), clean_threshold: 10000 }
479        }
480    }
481
482    impl StageTestRunner for MerkleTestRunner {
483        type S = MerkleStage;
484
485        fn db(&self) -> &TestStageDB {
486            &self.db
487        }
488
489        fn stage(&self) -> Self::S {
490            Self::S::Both { clean_threshold: self.clean_threshold }
491        }
492    }
493
494    impl ExecuteStageTestRunner for MerkleTestRunner {
495        type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;
496
497        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
498            let stage_progress = input.checkpoint().block_number;
499            let start = stage_progress + 1;
500            let end = input.target();
501            let mut rng = generators::rng();
502
503            let mut preblocks = vec![];
504            if stage_progress > 0 {
505                preblocks.append(&mut random_block_range(
506                    &mut rng,
507                    0..=stage_progress - 1,
508                    BlockRangeParams {
509                        parent: Some(B256::ZERO),
510                        tx_count: 0..1,
511                        ..Default::default()
512                    },
513                ));
514                self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
515            }
516
517            let num_of_accounts = 31;
518            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
519                .into_iter()
520                .collect::<BTreeMap<_, _>>();
521
522            self.db.insert_accounts_and_storages(
523                accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))),
524            )?;
525
526            let (header, body) = random_block(
527                &mut rng,
528                stage_progress,
529                BlockParams { parent: preblocks.last().map(|b| b.hash()), ..Default::default() },
530            )
531            .split_sealed_header_body();
532            let mut header = header.unseal();
533
534            header.state_root = state_root(
535                accounts
536                    .clone()
537                    .into_iter()
538                    .map(|(address, account)| (address, (account, std::iter::empty()))),
539            );
540            let sealed_head = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
541                SealedHeader::seal_slow(header),
542                body,
543            );
544
545            let head_hash = sealed_head.hash();
546            let mut blocks = vec![sealed_head];
547            blocks.extend(random_block_range(
548                &mut rng,
549                start..=end,
550                BlockRangeParams { parent: Some(head_hash), tx_count: 0..3, ..Default::default() },
551            ));
552            let last_block = blocks.last().cloned().unwrap();
553            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
554
555            let (transitions, final_state) = random_changeset_range(
556                &mut rng,
557                blocks.iter(),
558                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
559                0..3,
560                0..256,
561            );
562            // add block changeset from block 1.
563            self.db.insert_changesets(transitions, Some(start))?;
564            self.db.insert_accounts_and_storages(final_state)?;
565
566            // Calculate state root
567            let root = self.db.query(|tx| {
568                let mut accounts = BTreeMap::default();
569                let mut accounts_cursor = tx.cursor_read::<tables::HashedAccounts>()?;
570                let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
571                for entry in accounts_cursor.walk_range(..)? {
572                    let (key, account) = entry?;
573                    let mut storage_entries = Vec::new();
574                    let mut entry = storage_cursor.seek_exact(key)?;
575                    while let Some((_, storage)) = entry {
576                        storage_entries.push(storage);
577                        entry = storage_cursor.next_dup()?;
578                    }
579                    let storage = storage_entries
580                        .into_iter()
581                        .filter(|v| !v.value.is_zero())
582                        .map(|v| (v.key, v.value))
583                        .collect::<Vec<_>>();
584                    accounts.insert(key, (account, storage));
585                }
586
587                Ok(state_root_prehashed(accounts.into_iter()))
588            })?;
589
590            let static_file_provider = self.db.factory.static_file_provider();
591            let mut writer =
592                static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
593            let mut last_header = last_block.clone_sealed_header();
594            last_header.set_state_root(root);
595
596            let hash = last_header.hash_slow();
597            writer.prune_headers(1).unwrap();
598            writer.commit().unwrap();
599            writer.append_header(&last_header, U256::ZERO, &hash).unwrap();
600            writer.commit().unwrap();
601
602            Ok(blocks)
603        }
604
605        fn validate_execution(
606            &self,
607            _input: ExecInput,
608            _output: Option<ExecOutput>,
609        ) -> Result<(), TestRunnerError> {
610            // The execution is validated within the stage
611            Ok(())
612        }
613    }
614
615    impl UnwindStageTestRunner for MerkleTestRunner {
616        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
617            // The unwind is validated within the stage
618            Ok(())
619        }
620
621        fn before_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
622            let target_block = input.unwind_to + 1;
623
624            self.db
625                .commit(|tx| {
626                    let mut storage_changesets_cursor =
627                        tx.cursor_dup_read::<tables::StorageChangeSets>().unwrap();
628                    let mut storage_cursor =
629                        tx.cursor_dup_write::<tables::HashedStorages>().unwrap();
630
631                    let mut tree: BTreeMap<B256, BTreeMap<B256, U256>> = BTreeMap::new();
632
633                    let mut rev_changeset_walker =
634                        storage_changesets_cursor.walk_back(None).unwrap();
635                    while let Some((bn_address, entry)) =
636                        rev_changeset_walker.next().transpose().unwrap()
637                    {
638                        if bn_address.block_number() < target_block {
639                            break
640                        }
641
642                        tree.entry(keccak256(bn_address.address()))
643                            .or_default()
644                            .insert(keccak256(entry.key), entry.value);
645                    }
646                    for (hashed_address, storage) in tree {
647                        for (hashed_slot, value) in storage {
648                            let storage_entry = storage_cursor
649                                .seek_by_key_subkey(hashed_address, hashed_slot)
650                                .unwrap();
651                            if storage_entry.is_some_and(|v| v.key == hashed_slot) {
652                                storage_cursor.delete_current().unwrap();
653                            }
654
655                            if !value.is_zero() {
656                                let storage_entry = StorageEntry { key: hashed_slot, value };
657                                storage_cursor.upsert(hashed_address, &storage_entry).unwrap();
658                            }
659                        }
660                    }
661
662                    let mut changeset_cursor =
663                        tx.cursor_dup_write::<tables::AccountChangeSets>().unwrap();
664                    let mut rev_changeset_walker = changeset_cursor.walk_back(None).unwrap();
665
666                    while let Some((block_number, account_before_tx)) =
667                        rev_changeset_walker.next().transpose().unwrap()
668                    {
669                        if block_number < target_block {
670                            break
671                        }
672
673                        if let Some(acc) = account_before_tx.info {
674                            tx.put::<tables::HashedAccounts>(
675                                keccak256(account_before_tx.address),
676                                acc,
677                            )
678                            .unwrap();
679                        } else {
680                            tx.delete::<tables::HashedAccounts>(
681                                keccak256(account_before_tx.address),
682                                None,
683                            )
684                            .unwrap();
685                        }
686                    }
687                    Ok(())
688                })
689                .unwrap();
690            Ok(())
691        }
692    }
693}