use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockNumber, Sealable, B256};
use reth_codecs::Compact;
use reth_consensus::ConsensusError;
use reth_db_api::{
    tables,
    transaction::{DbTx, DbTxMut},
};
use reth_primitives_traits::{GotExpected, SealedHeader};
use reth_provider::{
    DBProvider, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter,
    StatsReader, TrieWriter,
};
use reth_stages_api::{
    BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
    StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress, StoredSubNode};
use reth_trie_db::DatabaseStateRoot;
use std::fmt::Debug;
use tracing::*;

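/// The error message displayed when the computed state root does not match, asking the user to
/// file a report.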
pub const INVALID_STATE_ROOT_ERROR_MESSAGE: &str = r#"
Invalid state root error on stage verification!
This is an error that likely requires a report to the reth team with additional information.
Please include the following information in your report:
 * This error message
 * The state root of the block that was rejected
 * The output of `reth db stats --checksum` from the database that was being used. This will take a long time to run!
 * 50-100 lines of logs before and after the first occurrence of the log message with the state root of the block that was rejected.
 * The debug logs from __the same time period__. To find the default location for these logs, run:
   `reth --help | grep -A 4 'log.file.directory'`

Once you have this information, please submit a github issue at https://github.com/paradigmxyz/reth/issues/new
"#;

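/// The default threshold (in number of blocks) above which the stage rebuilds the trie from
/// scratch instead of applying incremental updates.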
pub const MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD: u64 = 5_000;

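/// The merkle hashing stage uses the hashed account and storage tables, populated by the hashing
/// stages, to calculate intermediate trie nodes and the state root of the target block.
///
/// This stage is split in two variants: [`MerkleStage::Execution`] calculates and validates the
/// state root and skips unwinding, while [`MerkleStage::Unwind`] reverts the trie and skips
/// execution. In the pipeline the unwind variant should be placed before the hashing stages and
/// the execution variant after them, so that each variant observes the hashed state it needs.
///
/// A minimal construction sketch (illustrative only):
///
/// ```ignore
/// let execution = MerkleStage::new_execution(MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD);
/// let unwind = MerkleStage::default_unwind();
/// ```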
#[derive(Debug, Clone)]
pub enum MerkleStage {
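    /// The execution portion of the merkle stage.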
    Execution {
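        /// The threshold (in number of blocks) for switching from incremental trie building of
        /// changes to a whole rebuild.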
        clean_threshold: u64,
    },
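    /// The unwind portion of the merkle stage.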
    Unwind,
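    /// Able to both execute and unwind. Used for tests.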
    #[cfg(any(test, feature = "test-utils"))]
    Both {
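        /// The threshold (in number of blocks) for switching from incremental trie building of
        /// changes to a whole rebuild.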
        clean_threshold: u64,
    },
}

impl MerkleStage {
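    /// Creates the execution part of the stage with the default clean threshold.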
    pub const fn default_execution() -> Self {
        Self::Execution { clean_threshold: MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD }
    }

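    /// Creates the unwind part of the stage.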
    pub const fn default_unwind() -> Self {
        Self::Unwind
    }

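    /// Creates the execution part of the stage with the given clean threshold.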
    pub const fn new_execution(clean_threshold: u64) -> Self {
        Self::Execution { clean_threshold }
    }

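    /// Gets the hashing progress (the inner merkle checkpoint) saved for this stage, if any.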
    pub fn get_execution_checkpoint(
        &self,
        provider: &impl StageCheckpointReader,
    ) -> Result<Option<MerkleCheckpoint>, StageError> {
        let buf =
            provider.get_stage_checkpoint_progress(StageId::MerkleExecute)?.unwrap_or_default();

        if buf.is_empty() {
            return Ok(None)
        }

        let (checkpoint, _) = MerkleCheckpoint::from_compact(&buf, buf.len());
        Ok(Some(checkpoint))
    }

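    /// Saves the hashing progress, or clears it if `checkpoint` is `None`.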
    pub fn save_execution_checkpoint(
        &self,
        provider: &impl StageCheckpointWriter,
        checkpoint: Option<MerkleCheckpoint>,
    ) -> Result<(), StageError> {
        let mut buf = vec![];
        if let Some(checkpoint) = checkpoint {
            debug!(
                target: "sync::stages::merkle::exec",
                last_account_key = ?checkpoint.last_account_key,
                "Saving inner merkle checkpoint"
            );
            checkpoint.to_compact(&mut buf);
        }
        Ok(provider.save_stage_checkpoint_progress(StageId::MerkleExecute, buf)?)
    }
}

impl<Provider> Stage<Provider> for MerkleStage
where
    Provider: DBProvider<Tx: DbTxMut>
        + TrieWriter
        + StatsReader
        + HeaderProvider
        + StageCheckpointReader
        + StageCheckpointWriter,
{
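    /// Return the id of the stage.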
    fn id(&self) -> StageId {
        match self {
            Self::Execution { .. } => StageId::MerkleExecute,
            Self::Unwind => StageId::MerkleUnwind,
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { .. } => StageId::Other("MerkleBoth"),
        }
    }

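    /// Execute the stage: compute the state root for the target block, either incrementally or
    /// by rebuilding the trie from scratch, and validate it against the target header.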
    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
        let threshold = match self {
            Self::Unwind => {
                info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
                return Ok(ExecOutput::done(StageCheckpoint::new(input.target())))
            }
            Self::Execution { clean_threshold } => *clean_threshold,
            #[cfg(any(test, feature = "test-utils"))]
            Self::Both { clean_threshold } => *clean_threshold,
        };

        let range = input.next_block_range();
        let (from_block, to_block) = range.clone().into_inner();
        let current_block_number = input.checkpoint().block_number;

        let target_block = provider
            .header_by_number(to_block)?
            .ok_or_else(|| ProviderError::HeaderNotFound(to_block.into()))?;
        let target_block_root = target_block.state_root();

        let mut checkpoint = self.get_execution_checkpoint(provider)?;
        let (trie_root, entities_checkpoint) = if range.is_empty() {
            (target_block_root, input.checkpoint().entities_stage_checkpoint().unwrap_or_default())
        } else if to_block - from_block > threshold || from_block == 1 {
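            // If the range is above the threshold (or starts at the first block), it is faster to
            // rebuild the trie from scratch than to apply incremental changes.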
            let mut entities_checkpoint = if let Some(checkpoint) =
                checkpoint.as_ref().filter(|c| c.target_block == to_block)
            {
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    last_account_key = ?checkpoint.last_account_key,
                    "Continuing inner merkle checkpoint"
                );

                input.checkpoint().entities_stage_checkpoint()
            } else {
                debug!(
                    target: "sync::stages::merkle::exec",
                    current = ?current_block_number,
                    target = ?to_block,
                    previous_checkpoint = ?checkpoint,
                    "Rebuilding trie"
                );
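                // Any saved checkpoint is no longer valid: reset it and clear the trie tables
                // before rebuilding from scratch.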
                checkpoint = None;
                self.save_execution_checkpoint(provider, None)?;
                provider.tx_ref().clear::<tables::AccountsTrie>()?;
                provider.tx_ref().clear::<tables::StoragesTrie>()?;

                None
            }
            .unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (provider.count_entries::<tables::HashedAccounts>()? +
                    provider.count_entries::<tables::HashedStorages>()?)
                    as u64,
            });

            let tx = provider.tx_ref();
            let progress = StateRoot::from_tx(tx)
                .with_intermediate_state(checkpoint.map(IntermediateStateRootState::from))
                .root_with_progress()
                .map_err(|e| {
                    error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "State root with progress failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                    StageError::Fatal(Box::new(e))
                })?;
            match progress {
                StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
                    provider.write_trie_updates(&updates)?;

                    let checkpoint = MerkleCheckpoint::new(
                        to_block,
                        state.last_account_key,
                        state.walker_stack.into_iter().map(StoredSubNode::from).collect(),
                        state.hash_builder.into(),
                    );
                    self.save_execution_checkpoint(provider, Some(checkpoint))?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    return Ok(ExecOutput {
                        checkpoint: input
                            .checkpoint()
                            .with_entities_stage_checkpoint(entities_checkpoint),
                        done: false,
                    })
                }
                StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
                    provider.write_trie_updates(&updates)?;

                    entities_checkpoint.processed += hashed_entries_walked as u64;

                    (root, entities_checkpoint)
                }
            }
        } else {
            debug!(target: "sync::stages::merkle::exec", current = ?current_block_number, target = ?to_block, "Updating trie");
            let (root, updates) =
                StateRoot::incremental_root_with_updates(provider.tx_ref(), range)
                    .map_err(|e| {
                        error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
                        StageError::Fatal(Box::new(e))
                    })?;

            provider.write_trie_updates(&updates)?;

            let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
                provider.count_entries::<tables::HashedStorages>()?)
                as u64;

            let entities_checkpoint = EntitiesCheckpoint {
                processed: total_hashed_entries,
                total: total_hashed_entries,
            };

            (root, entities_checkpoint)
        };

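        // The trie is now caught up to the target block: clear the inner checkpoint.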
        self.save_execution_checkpoint(provider, None)?;

        validate_state_root(trie_root, SealedHeader::seal_slow(target_block), to_block)?;

        Ok(ExecOutput {
            checkpoint: StageCheckpoint::new(to_block)
                .with_entities_stage_checkpoint(entities_checkpoint),
            done: true,
        })
    }

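    /// Unwind the stage: revert the trie to the unwind target and validate the resulting root.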
    fn unwind(
        &mut self,
        provider: &Provider,
        input: UnwindInput,
    ) -> Result<UnwindOutput, StageError> {
        let tx = provider.tx_ref();
        let range = input.unwind_block_range();
        if matches!(self, Self::Execution { .. }) {
            info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
            return Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
        }

        let mut entities_checkpoint =
            input.checkpoint.entities_stage_checkpoint().unwrap_or(EntitiesCheckpoint {
                processed: 0,
                total: (tx.entries::<tables::HashedAccounts>()? +
                    tx.entries::<tables::HashedStorages>()?) as u64,
            });

        if input.unwind_to == 0 {
            tx.clear::<tables::AccountsTrie>()?;
            tx.clear::<tables::StoragesTrie>()?;

            entities_checkpoint.processed = 0;

            return Ok(UnwindOutput {
                checkpoint: StageCheckpoint::new(input.unwind_to)
                    .with_entities_stage_checkpoint(entities_checkpoint),
            })
        }

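        // Unwind the trie only if the range contains changes.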
        if range.is_empty() {
            info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
        } else {
            let (block_root, updates) = StateRoot::incremental_root_with_updates(tx, range)
                .map_err(|e| StageError::Fatal(Box::new(e)))?;

            let target = provider
                .header_by_number(input.unwind_to)?
                .ok_or_else(|| ProviderError::HeaderNotFound(input.unwind_to.into()))?;

            validate_state_root(block_root, SealedHeader::seal_slow(target), input.unwind_to)?;

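            // Validation passed; commit the unwind changes to the database.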
            provider.write_trie_updates(&updates)?;
        }

        Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
    }
}

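/// Check that the computed state root matches the one in the expected (target) header.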
#[inline]
fn validate_state_root<H: BlockHeader + Sealable + Debug>(
    got: B256,
    expected: SealedHeader<H>,
    target_block: BlockNumber,
) -> Result<(), StageError> {
    if got == expected.state_root() {
        Ok(())
    } else {
        error!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
        Err(StageError::Block {
            error: BlockErrorKind::Validation(ConsensusError::BodyStateRootDiff(
                GotExpected { got, expected: expected.state_root() }.into(),
            )),
            block: Box::new(expected.block_with_parent()),
        })
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{
        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
        TestRunnerError, TestStageDB, UnwindStageTestRunner,
    };
    use alloy_primitives::{keccak256, U256};
    use assert_matches::assert_matches;
    use reth_db_api::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
    use reth_primitives_traits::{SealedBlock, StorageEntry};
    use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
    use reth_stages_api::StageUnitCheckpoint;
    use reth_static_file_types::StaticFileSegment;
    use reth_testing_utils::generators::{
        self, random_block, random_block_range, random_changeset_range,
        random_contract_account_range, BlockParams, BlockRangeParams,
    };
    use reth_trie::test_utils::{state_root, state_root_prehashed};
    use std::collections::BTreeMap;

    stage_test_suite_ext!(MerkleTestRunner, merkle);

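    // Executes the stage over a range wide enough to trigger a full (clean) trie rebuild.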
    #[tokio::test]
    async fn execute_clean_merkle() {
        let (previous_stage, stage_progress) = (500, 0);

        let mut runner = MerkleTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);

        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.table::<tables::HashedAccounts>().unwrap().len() +
                    runner.db.table::<tables::HashedStorages>().unwrap().len()
                ) as u64
        );

        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

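    // Executes the stage over a small range that takes the incremental-update path.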
    #[tokio::test]
    async fn execute_small_merkle() {
        let (previous_stage, stage_progress) = (2, 1);

        let mut runner = MerkleTestRunner::default();
        let input = ExecInput {
            target: Some(previous_stage),
            checkpoint: Some(StageCheckpoint::new(stage_progress)),
        };

        runner.seed_execution(input).expect("failed to seed execution");

        let rx = runner.execute(input);

        let result = rx.await.unwrap();
        assert_matches!(
            result,
            Ok(ExecOutput {
                checkpoint: StageCheckpoint {
                    block_number,
                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
                        processed,
                        total
                    }))
                },
                done: true
            }) if block_number == previous_stage && processed == total &&
                total == (
                    runner.db.table::<tables::HashedAccounts>().unwrap().len() +
                    runner.db.table::<tables::HashedStorages>().unwrap().len()
                ) as u64
        );

        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
    }

    struct MerkleTestRunner {
        db: TestStageDB,
        clean_threshold: u64,
    }

    impl Default for MerkleTestRunner {
        fn default() -> Self {
            Self { db: TestStageDB::default(), clean_threshold: 10000 }
        }
    }

    impl StageTestRunner for MerkleTestRunner {
        type S = MerkleStage;

        fn db(&self) -> &TestStageDB {
            &self.db
        }

        fn stage(&self) -> Self::S {
            Self::S::Both { clean_threshold: self.clean_threshold }
        }
    }

    impl ExecuteStageTestRunner for MerkleTestRunner {
        type Seed = Vec<SealedBlock<reth_ethereum_primitives::Block>>;

        fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
            let stage_progress = input.checkpoint().block_number;
            let start = stage_progress + 1;
            let end = input.target();
            let mut rng = generators::rng();

            let mut preblocks = vec![];
            if stage_progress > 0 {
                preblocks.append(&mut random_block_range(
                    &mut rng,
                    0..=stage_progress - 1,
                    BlockRangeParams {
                        parent: Some(B256::ZERO),
                        tx_count: 0..1,
                        ..Default::default()
                    },
                ));
                self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
            }

            let num_of_accounts = 31;
            let accounts = random_contract_account_range(&mut rng, &mut (0..num_of_accounts))
                .into_iter()
                .collect::<BTreeMap<_, _>>();

            self.db.insert_accounts_and_storages(
                accounts.iter().map(|(addr, acc)| (*addr, (*acc, std::iter::empty()))),
            )?;

            let (header, body) = random_block(
                &mut rng,
                stage_progress,
                BlockParams { parent: preblocks.last().map(|b| b.hash()), ..Default::default() },
            )
            .split_sealed_header_body();
            let mut header = header.unseal();

            header.state_root = state_root(
                accounts
                    .clone()
                    .into_iter()
                    .map(|(address, account)| (address, (account, std::iter::empty()))),
            );
            let sealed_head = SealedBlock::<reth_ethereum_primitives::Block>::from_sealed_parts(
                SealedHeader::seal_slow(header),
                body,
            );

            let head_hash = sealed_head.hash();
            let mut blocks = vec![sealed_head];
            blocks.extend(random_block_range(
                &mut rng,
                start..=end,
                BlockRangeParams { parent: Some(head_hash), tx_count: 0..3, ..Default::default() },
            ));
            let last_block = blocks.last().cloned().unwrap();
            self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;

            let (transitions, final_state) = random_changeset_range(
                &mut rng,
                blocks.iter(),
                accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
                0..3,
                0..256,
            );
            self.db.insert_changesets(transitions, Some(start))?;
            self.db.insert_accounts_and_storages(final_state)?;

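            // Compute the expected state root over the hashed account and storage tables.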
            let root = self.db.query(|tx| {
                let mut accounts = BTreeMap::default();
                let mut accounts_cursor = tx.cursor_read::<tables::HashedAccounts>()?;
                let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorages>()?;
                for entry in accounts_cursor.walk_range(..)? {
                    let (key, account) = entry?;
                    let mut storage_entries = Vec::new();
                    let mut entry = storage_cursor.seek_exact(key)?;
                    while let Some((_, storage)) = entry {
                        storage_entries.push(storage);
                        entry = storage_cursor.next_dup()?;
                    }
                    let storage = storage_entries
                        .into_iter()
                        .filter(|v| !v.value.is_zero())
                        .map(|v| (v.key, v.value))
                        .collect::<Vec<_>>();
                    accounts.insert(key, (account, storage));
                }

                Ok(state_root_prehashed(accounts.into_iter()))
            })?;

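            // Replace the last block's header with one whose state root matches the seeded state.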
            let static_file_provider = self.db.factory.static_file_provider();
            let mut writer =
                static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap();
            let mut last_header = last_block.clone_sealed_header();
            last_header.set_state_root(root);

            let hash = last_header.hash_slow();
            writer.prune_headers(1).unwrap();
            writer.commit().unwrap();
            writer.append_header(&last_header, U256::ZERO, &hash).unwrap();
            writer.commit().unwrap();

            Ok(blocks)
        }

        fn validate_execution(
            &self,
            _input: ExecInput,
            _output: Option<ExecOutput>,
        ) -> Result<(), TestRunnerError> {
            Ok(())
        }
    }

    impl UnwindStageTestRunner for MerkleTestRunner {
        fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
            Ok(())
        }

        fn before_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
            let target_block = input.unwind_to + 1;

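            // Simulate the hashing stages' unwind: revert the hashed storage entries to the
            // target block using the storage changesets.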
            self.db
                .commit(|tx| {
                    let mut storage_changesets_cursor =
                        tx.cursor_dup_read::<tables::StorageChangeSets>().unwrap();
                    let mut storage_cursor =
                        tx.cursor_dup_write::<tables::HashedStorages>().unwrap();

                    let mut tree: BTreeMap<B256, BTreeMap<B256, U256>> = BTreeMap::new();

                    let mut rev_changeset_walker =
                        storage_changesets_cursor.walk_back(None).unwrap();
                    while let Some((bn_address, entry)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if bn_address.block_number() < target_block {
                            break
                        }

                        tree.entry(keccak256(bn_address.address()))
                            .or_default()
                            .insert(keccak256(entry.key), entry.value);
                    }
                    for (hashed_address, storage) in tree {
                        for (hashed_slot, value) in storage {
                            let storage_entry = storage_cursor
                                .seek_by_key_subkey(hashed_address, hashed_slot)
                                .unwrap();
                            if storage_entry.is_some_and(|v| v.key == hashed_slot) {
                                storage_cursor.delete_current().unwrap();
                            }

                            if !value.is_zero() {
                                let storage_entry = StorageEntry { key: hashed_slot, value };
                                storage_cursor.upsert(hashed_address, &storage_entry).unwrap();
                            }
                        }
                    }

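                    // Likewise revert the hashed accounts using the account changesets.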
                    let mut changeset_cursor =
                        tx.cursor_dup_write::<tables::AccountChangeSets>().unwrap();
                    let mut rev_changeset_walker = changeset_cursor.walk_back(None).unwrap();

                    while let Some((block_number, account_before_tx)) =
                        rev_changeset_walker.next().transpose().unwrap()
                    {
                        if block_number < target_block {
                            break
                        }

                        if let Some(acc) = account_before_tx.info {
                            tx.put::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                acc,
                            )
                            .unwrap();
                        } else {
                            tx.delete::<tables::HashedAccounts>(
                                keccak256(account_before_tx.address),
                                None,
                            )
                            .unwrap();
                        }
                    }
                    Ok(())
                })
                .unwrap();
            Ok(())
        }
    }
}