1use crate::tree::{
4 cached_state::CachedStateProvider,
5 error::{InsertBlockError, InsertBlockErrorKind, InsertPayloadError},
6 executor::WorkloadExecutor,
7 instrumented_state::InstrumentedStateProvider,
8 payload_processor::PayloadProcessor,
9 persistence_state::CurrentPersistenceAction,
10 precompile_cache::{CachedPrecompile, CachedPrecompileMetrics, PrecompileCacheMap},
11 sparse_trie::StateRootComputeOutcome,
12 ConsistentDbView, EngineApiMetrics, EngineApiTreeState, ExecutionEnv, PayloadHandle,
13 PersistenceState, PersistingKind, StateProviderBuilder, StateProviderDatabase, TreeConfig,
14};
15use alloy_consensus::transaction::Either;
16use alloy_eips::{eip1898::BlockWithParent, NumHash};
17use alloy_evm::Evm;
18use alloy_primitives::B256;
19use reth_chain_state::{
20 CanonicalInMemoryState, ExecutedBlock, ExecutedBlockWithTrieUpdates, ExecutedTrieUpdates,
21};
22use reth_consensus::{ConsensusError, FullConsensus};
23use reth_engine_primitives::{
24 ConfigureEngineEvm, ExecutableTxIterator, ExecutionPayload, InvalidBlockHook, PayloadValidator,
25};
26use reth_errors::{BlockExecutionError, ProviderResult};
27use reth_evm::{
28 block::BlockExecutor, execute::ExecutableTxFor, ConfigureEvm, EvmEnvFor, ExecutionCtxFor,
29 SpecFor,
30};
31use reth_payload_primitives::{
32 BuiltPayload, InvalidPayloadAttributesError, NewPayloadError, PayloadTypes,
33};
34use reth_primitives_traits::{
35 AlloyBlockHeader, BlockTy, GotExpected, NodePrimitives, RecoveredBlock, SealedHeader,
36};
37use reth_provider::{
38 BlockExecutionOutput, BlockHashReader, BlockNumReader, BlockReader, DBProvider,
39 DatabaseProviderFactory, ExecutionOutcome, HashedPostStateProvider, HeaderProvider,
40 ProviderError, StateProvider, StateProviderFactory, StateReader, StateRootProvider,
41};
42use reth_revm::db::State;
43use reth_trie::{updates::TrieUpdates, HashedPostState, KeccakKeyHasher, TrieInput};
44use reth_trie_db::DatabaseHashedPostState;
45use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
46use std::{collections::HashMap, sync::Arc, time::Instant};
47use tracing::{debug, debug_span, error, info, trace, warn};
48
/// Borrowed view over the engine tree handed to a validator while it validates a block.
///
/// Bundles a mutable borrow of the tree state with shared borrows of the persistence
/// state and the canonical in-memory state, all tied to the lifetime `'a`.
pub struct TreeCtx<'a, N: NodePrimitives> {
    /// Mutable borrow of the engine API tree state.
    state: &'a mut EngineApiTreeState<N>,
    /// Shared borrow of the persistence state; queried by `persisting_kind_for`.
    persistence: &'a PersistenceState,
    /// Shared borrow of the canonical in-memory state.
    canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
}
61
62impl<'a, N: NodePrimitives> std::fmt::Debug for TreeCtx<'a, N> {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("TreeCtx")
65 .field("state", &"EngineApiTreeState")
66 .field("persistence_info", &self.persistence)
67 .field("canonical_in_memory_state", &self.canonical_in_memory_state)
68 .finish()
69 }
70}
71
72impl<'a, N: NodePrimitives> TreeCtx<'a, N> {
73 pub const fn new(
75 state: &'a mut EngineApiTreeState<N>,
76 persistence: &'a PersistenceState,
77 canonical_in_memory_state: &'a CanonicalInMemoryState<N>,
78 ) -> Self {
79 Self { state, persistence, canonical_in_memory_state }
80 }
81
82 pub const fn state(&self) -> &EngineApiTreeState<N> {
84 &*self.state
85 }
86
87 pub const fn state_mut(&mut self) -> &mut EngineApiTreeState<N> {
89 self.state
90 }
91
92 pub const fn persistence(&self) -> &PersistenceState {
94 self.persistence
95 }
96
97 pub const fn canonical_in_memory_state(&self) -> &'a CanonicalInMemoryState<N> {
99 self.canonical_in_memory_state
100 }
101
102 pub fn persisting_kind_for(&self, block: BlockWithParent) -> PersistingKind {
109 let Some(action) = self.persistence().current_action() else {
111 return PersistingKind::NotPersisting
112 };
113 let CurrentPersistenceAction::SavingBlocks { highest } = action else {
115 return PersistingKind::PersistingNotDescendant
116 };
117
118 if block.block.number > highest.number &&
121 self.state().tree_state.is_descendant(*highest, block)
122 {
123 return PersistingKind::PersistingDescendant
124 }
125
126 PersistingKind::PersistingNotDescendant
128 }
129}
130
/// Block/payload validator that executes blocks, runs consensus checks and computes
/// state roots on top of a database provider `P`, an EVM configuration `Evm` and a payload
/// validator `V`.
#[derive(derive_more::Debug)]
pub struct BasicEngineValidator<P, Evm, V>
where
    Evm: ConfigureEvm,
{
    /// Provider used for database and state access.
    provider: P,
    /// Consensus implementation used for header, pre- and post-execution validation.
    consensus: Arc<dyn FullConsensus<Evm::Primitives, Error = ConsensusError>>,
    /// EVM configuration used to build EVM environments, execution contexts and executors.
    evm_config: Evm,
    /// Engine tree configuration (state root strategy toggles, precompile cache toggle, ...).
    config: TreeConfig,
    /// Processor spawned per block; hands out the payload handle used during execution.
    payload_processor: PayloadProcessor<Evm>,
    /// Precompile caches, looked up per precompile address.
    precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
    /// Precompile cache metrics, created lazily per precompile address.
    precompile_cache_metrics: HashMap<alloy_primitives::Address, CachedPrecompileMetrics>,
    /// Hook invoked from `on_invalid_block`; excluded from the `Debug` output.
    #[debug(skip)]
    invalid_block_hook: Box<dyn InvalidBlockHook<Evm::Primitives>>,
    /// Engine API metrics recorded during validation.
    metrics: EngineApiMetrics,
    /// Payload validator used to turn payloads into blocks and for extra post-execution checks.
    validator: V,
}
165
166impl<N, P, Evm, V> BasicEngineValidator<P, Evm, V>
167where
168 N: NodePrimitives,
169 P: DatabaseProviderFactory<Provider: BlockReader>
170 + BlockReader<Header = N::BlockHeader>
171 + StateProviderFactory
172 + StateReader
173 + HashedPostStateProvider
174 + Clone
175 + 'static,
176 Evm: ConfigureEvm<Primitives = N> + 'static,
177{
178 #[allow(clippy::too_many_arguments)]
180 pub fn new(
181 provider: P,
182 consensus: Arc<dyn FullConsensus<N, Error = ConsensusError>>,
183 evm_config: Evm,
184 validator: V,
185 config: TreeConfig,
186 invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
187 ) -> Self {
188 let precompile_cache_map = PrecompileCacheMap::default();
189 let payload_processor = PayloadProcessor::new(
190 WorkloadExecutor::default(),
191 evm_config.clone(),
192 &config,
193 precompile_cache_map.clone(),
194 );
195 Self {
196 provider,
197 consensus,
198 evm_config,
199 payload_processor,
200 precompile_cache_map,
201 precompile_cache_metrics: HashMap::new(),
202 config,
203 invalid_block_hook,
204 metrics: EngineApiMetrics::default(),
205 validator,
206 }
207 }
208
209 pub fn convert_to_block<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
211 &self,
212 input: BlockOrPayload<T>,
213 ) -> Result<RecoveredBlock<N::Block>, NewPayloadError>
214 where
215 V: PayloadValidator<T, Block = N::Block>,
216 {
217 match input {
218 BlockOrPayload::Payload(payload) => self.validator.ensure_well_formed_payload(payload),
219 BlockOrPayload::Block(block) => Ok(block),
220 }
221 }
222
223 pub fn evm_env_for<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
225 &self,
226 input: &BlockOrPayload<T>,
227 ) -> Result<EvmEnvFor<Evm>, Evm::Error>
228 where
229 V: PayloadValidator<T, Block = N::Block>,
230 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
231 {
232 match input {
233 BlockOrPayload::Payload(payload) => Ok(self.evm_config.evm_env_for_payload(payload)?),
234 BlockOrPayload::Block(block) => Ok(self.evm_config.evm_env(block.header())?),
235 }
236 }
237
238 pub fn tx_iterator_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
240 &'a self,
241 input: &'a BlockOrPayload<T>,
242 ) -> Result<impl ExecutableTxIterator<Evm> + 'a, NewPayloadError>
243 where
244 V: PayloadValidator<T, Block = N::Block>,
245 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
246 {
247 match input {
248 BlockOrPayload::Payload(payload) => Ok(Either::Left(
249 self.evm_config
250 .tx_iterator_for_payload(payload)
251 .map_err(NewPayloadError::other)?
252 .map(|res| res.map(Either::Left)),
253 )),
254 BlockOrPayload::Block(block) => {
255 let transactions = block.clone_transactions_recovered().collect::<Vec<_>>();
256 Ok(Either::Right(transactions.into_iter().map(|tx| Ok(Either::Right(tx)))))
257 }
258 }
259 }
260
261 pub fn execution_ctx_for<'a, T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
263 &self,
264 input: &'a BlockOrPayload<T>,
265 ) -> Result<ExecutionCtxFor<'a, Evm>, Evm::Error>
266 where
267 V: PayloadValidator<T, Block = N::Block>,
268 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
269 {
270 match input {
271 BlockOrPayload::Payload(payload) => Ok(self.evm_config.context_for_payload(payload)?),
272 BlockOrPayload::Block(block) => Ok(self.evm_config.context_for_block(block)?),
273 }
274 }
275
276 fn handle_execution_error<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
281 &self,
282 input: BlockOrPayload<T>,
283 execution_err: InsertBlockErrorKind,
284 parent_block: &SealedHeader<N::BlockHeader>,
285 ) -> Result<ExecutedBlockWithTrieUpdates<N>, InsertPayloadError<N::Block>>
286 where
287 V: PayloadValidator<T, Block = N::Block>,
288 {
289 debug!(
290 target: "engine::tree",
291 ?execution_err,
292 block = ?input.num_hash(),
293 "Block execution failed, checking for header validation errors"
294 );
295
296 let block = self.convert_to_block(input)?;
299
300 if let Err(consensus_err) = self.validate_block_inner(&block) {
302 return Err(InsertBlockError::new(block.into_sealed_block(), consensus_err.into()).into())
304 }
305
306 if let Err(consensus_err) =
308 self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
309 {
310 return Err(InsertBlockError::new(block.into_sealed_block(), consensus_err.into()).into())
312 }
313
314 Err(InsertBlockError::new(block.into_sealed_block(), execution_err).into())
316 }
317
    /// Validates a block or payload on top of its parent state and returns the executed
    /// block together with its trie updates.
    ///
    /// Steps, in order:
    /// 1. Resolve a state provider and the sealed parent header for the input's parent hash.
    /// 2. Build the EVM environment and plan the state root strategy.
    /// 3. Spawn the payload processor and execute the block through a cached state provider.
    /// 4. Convert the input to a block and run post-execution validation.
    /// 5. Compute the state root with the selected strategy, falling back to the state
    ///    provider's `state_root_with_updates` if no root was produced.
    /// 6. Compare the root with the header; on mismatch invoke the invalid-block hook and
    ///    return `ConsensusError::BodyStateRootDiff`.
    /// 7. Decide whether the trie updates are kept or marked as missing.
    ///
    /// # Errors
    ///
    /// Any failure is returned as an [`InsertPayloadError`]; where a block is available (or
    /// can be built from the input) the error carries the sealed block.
    pub fn validate_block_with_state<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
        &mut self,
        input: BlockOrPayload<T>,
        mut ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N, InsertPayloadError<N::Block>>
    where
        V: PayloadValidator<T, Block = N::Block>,
        Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
    {
        // Unwraps `Ok`, or converts `input` into a block and returns an insert error wrapping
        // it. Consumes `input`, so it is only usable before the block has been built.
        macro_rules! ensure_ok {
            ($expr:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => {
                        let block = self.convert_to_block(input)?;
                        return Err(
                            InsertBlockError::new(block.into_sealed_block(), e.into()).into()
                        )
                    }
                }
            };
        }

        // Same as `ensure_ok!`, but for use once the block already exists; consumes `$block`.
        macro_rules! ensure_ok_post_block {
            ($expr:expr, $block:expr) => {
                match $expr {
                    Ok(val) => val,
                    Err(e) => {
                        return Err(
                            InsertBlockError::new($block.into_sealed_block(), e.into()).into()
                        )
                    }
                }
            };
        }

        let parent_hash = input.parent_hash();
        let block_num_hash = input.num_hash();

        trace!(target: "engine::tree", block=?block_num_hash, parent=?parent_hash, "Fetching block state provider");
        // A missing provider builder means the parent is known neither in memory nor on disk.
        let Some(provider_builder) =
            ensure_ok!(self.state_provider_builder(parent_hash, ctx.state()))
        else {
            return Err(InsertBlockError::new(
                self.convert_to_block(input)?.into_sealed_block(),
                ProviderError::HeaderNotFound(parent_hash.into()).into(),
            )
            .into())
        };

        let state_provider = ensure_ok!(provider_builder.build());

        // Fetch the sealed parent header (tree state first, then the database).
        let Some(parent_block) = ensure_ok!(self.sealed_header_by_hash(parent_hash, ctx.state()))
        else {
            return Err(InsertBlockError::new(
                self.convert_to_block(input)?.into_sealed_block(),
                ProviderError::HeaderNotFound(parent_hash.into()).into(),
            )
            .into())
        };

        let evm_env = self.evm_env_for(&input).map_err(NewPayloadError::other)?;

        let env = ExecutionEnv { evm_env, hash: input.hash(), parent_hash: input.parent_hash() };

        // Decide up front which state root strategy to attempt.
        let state_root_plan = self.plan_state_root_computation(&input, &ctx);
        let persisting_kind = state_root_plan.persisting_kind;
        let has_ancestors_with_missing_trie_updates =
            state_root_plan.has_ancestors_with_missing_trie_updates;
        let strategy = state_root_plan.strategy;

        debug!(
            target: "engine::tree",
            block=?block_num_hash,
            ?strategy,
            ?has_ancestors_with_missing_trie_updates,
            "Deciding which state root algorithm to run"
        );

        let txs = self.tx_iterator_for(&input)?;

        // The processor may downgrade the strategy, so the returned one shadows the plan's.
        let (mut handle, strategy) = ensure_ok!(self.spawn_payload_processor(
            env.clone(),
            txs,
            provider_builder,
            persisting_kind,
            parent_hash,
            ctx.state(),
            block_num_hash,
            strategy,
        ));

        // Wrap the state provider with the caches owned by the payload handle.
        let state_provider = CachedStateProvider::new_with_caches(
            state_provider,
            handle.caches(),
            handle.cache_metrics(),
        );

        // Execute, optionally through an instrumented provider that records total latency.
        let output = match if self.config.state_provider_metrics() {
            let state_provider = InstrumentedStateProvider::from_state_provider(&state_provider);
            let result = self.execute_block(&state_provider, env, &input, &mut handle);
            state_provider.record_total_latency();
            result
        } else {
            self.execute_block(&state_provider, env, &input, &mut handle)
        } {
            Ok(output) => output,
            Err(err) => return self.handle_execution_error(input, err, &parent_block),
        };

        // Execution is done; prewarming no longer needs to execute transactions.
        handle.stop_prewarming_execution();

        let block = self.convert_to_block(input)?;

        let hashed_state = ensure_ok_post_block!(
            self.validate_post_execution(&block, &parent_block, &output, &mut ctx),
            block
        );

        debug!(target: "engine::tree", block=?block_num_hash, "Calculating block state root");

        let root_time = Instant::now();

        // Stays `None` if the chosen strategy fails, returns a wrong root, or is synchronous.
        let mut maybe_state_root = None;

        match strategy {
            StateRootStrategy::StateRootTask => {
                debug!(target: "engine::tree", block=?block_num_hash, "Using sparse trie state root algorithm");
                match handle.state_root() {
                    Ok(StateRootComputeOutcome { state_root, trie_updates }) => {
                        let elapsed = root_time.elapsed();
                        info!(target: "engine::tree", ?state_root, ?elapsed, "State root task finished");
                        // Only accept the task's result if it matches the header; otherwise
                        // the fallback below recomputes the root.
                        if state_root == block.header().state_root() {
                            maybe_state_root = Some((state_root, trie_updates, elapsed))
                        } else {
                            warn!(
                                target: "engine::tree",
                                ?state_root,
                                block_state_root = ?block.header().state_root(),
                                "State root task returned incorrect state root"
                            );
                        }
                    }
                    Err(error) => {
                        debug!(target: "engine::tree", %error, "State root task failed");
                    }
                }
            }
            StateRootStrategy::Parallel => {
                debug!(target: "engine::tree", block=?block_num_hash, "Using parallel state root algorithm");
                match self.compute_state_root_parallel(
                    persisting_kind,
                    block.parent_hash(),
                    &hashed_state,
                    ctx.state(),
                ) {
                    Ok(result) => {
                        info!(
                            target: "engine::tree",
                            block = ?block_num_hash,
                            regular_state_root = ?result.0,
                            "Regular root task finished"
                        );
                        maybe_state_root = Some((result.0, result.1, root_time.elapsed()));
                    }
                    Err(error) => {
                        debug!(target: "engine::tree", %error, "Parallel state root computation failed");
                    }
                }
            }
            StateRootStrategy::Synchronous => {}
        }

        // Use the root computed above if there is one; otherwise compute it via the state
        // provider.
        let (state_root, trie_output, root_elapsed) = if let Some(maybe_state_root) =
            maybe_state_root
        {
            maybe_state_root
        } else {
            if self.config.state_root_fallback() {
                debug!(target: "engine::tree", block=?block_num_hash, "Using state root fallback for testing");
            } else {
                warn!(target: "engine::tree", block=?block_num_hash, ?persisting_kind, "Failed to compute state root in parallel");
                self.metrics.block_validation.state_root_parallel_fallback_total.increment(1);
            }

            let (root, updates) = ensure_ok_post_block!(
                state_provider.state_root_with_updates(hashed_state.clone()),
                block
            );
            (root, updates, root_time.elapsed())
        };

        self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
        debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");

        // The computed root must match the header's state root.
        if state_root != block.header().state_root() {
            // Report the invalid block (with the computed trie updates) before failing.
            self.on_invalid_block(
                &parent_block,
                &block,
                &output,
                Some((&trie_output, state_root)),
                ctx.state_mut(),
            );
            let block_state_root = block.header().state_root();
            return Err(InsertBlockError::new(
                block.into_sealed_block(),
                ConsensusError::BodyStateRootDiff(
                    GotExpected { got: state_root, expected: block_state_root }.into(),
                )
                .into(),
            )
            .into())
        }

        // The block is valid: hand the resulting state to the caching task.
        handle.terminate_caching(Some(&output.state));

        // Trie updates are discarded when the block does not connect to the last persisted
        // block or when an in-memory ancestor is itself missing trie updates.
        let connects_to_last_persisted =
            ensure_ok_post_block!(self.block_connects_to_last_persisted(ctx, &block), block);
        let should_discard_trie_updates =
            !connects_to_last_persisted || has_ancestors_with_missing_trie_updates;
        debug!(
            target: "engine::tree",
            block = ?block_num_hash,
            connects_to_last_persisted,
            has_ancestors_with_missing_trie_updates,
            should_discard_trie_updates,
            "Checking if should discard trie updates"
        );
        let trie_updates = if should_discard_trie_updates {
            ExecutedTrieUpdates::Missing
        } else {
            ExecutedTrieUpdates::Present(Arc::new(trie_output))
        };

        Ok(ExecutedBlockWithTrieUpdates {
            block: ExecutedBlock {
                recovered_block: Arc::new(block),
                execution_output: Arc::new(ExecutionOutcome::from((output, block_num_hash.number))),
                hashed_state: Arc::new(hashed_state),
            },
            trie: trie_updates,
        })
    }
595
596 fn sealed_header_by_hash(
598 &self,
599 hash: B256,
600 state: &EngineApiTreeState<N>,
601 ) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
602 let header = state.tree_state.sealed_header_by_hash(&hash);
604
605 if header.is_some() {
606 Ok(header)
607 } else {
608 self.provider.sealed_header_by_hash(hash)
609 }
610 }
611
612 fn validate_block_inner(&self, block: &RecoveredBlock<N::Block>) -> Result<(), ConsensusError> {
615 if let Err(e) = self.consensus.validate_header(block.sealed_header()) {
616 error!(target: "engine::tree", ?block, "Failed to validate header {}: {e}", block.hash());
617 return Err(e)
618 }
619
620 if let Err(e) = self.consensus.validate_block_pre_execution(block.sealed_block()) {
621 error!(target: "engine::tree", ?block, "Failed to validate block {}: {e}", block.hash());
622 return Err(e)
623 }
624
625 Ok(())
626 }
627
628 fn execute_block<S, Err, T>(
630 &mut self,
631 state_provider: S,
632 env: ExecutionEnv<Evm>,
633 input: &BlockOrPayload<T>,
634 handle: &mut PayloadHandle<impl ExecutableTxFor<Evm>, Err>,
635 ) -> Result<BlockExecutionOutput<N::Receipt>, InsertBlockErrorKind>
636 where
637 S: StateProvider,
638 Err: core::error::Error + Send + Sync + 'static,
639 V: PayloadValidator<T, Block = N::Block>,
640 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
641 Evm: ConfigureEngineEvm<T::ExecutionData, Primitives = N>,
642 {
643 let num_hash = NumHash::new(env.evm_env.block_env.number.to(), env.hash);
644
645 let span = debug_span!(target: "engine::tree", "execute_block", num = ?num_hash.number, hash = ?num_hash.hash);
646 let _enter = span.enter();
647 debug!(target: "engine::tree", "Executing block");
648
649 let mut db = State::builder()
650 .with_database(StateProviderDatabase::new(&state_provider))
651 .with_bundle_update()
652 .without_state_clear()
653 .build();
654
655 let evm = self.evm_config.evm_with_env(&mut db, env.evm_env.clone());
656 let ctx =
657 self.execution_ctx_for(input).map_err(|e| InsertBlockErrorKind::Other(Box::new(e)))?;
658 let mut executor = self.evm_config.create_executor(evm, ctx);
659
660 if !self.config.precompile_cache_disabled() {
661 executor.evm_mut().precompiles_mut().map_pure_precompiles(|address, precompile| {
663 let metrics = self
664 .precompile_cache_metrics
665 .entry(*address)
666 .or_insert_with(|| CachedPrecompileMetrics::new_with_address(*address))
667 .clone();
668 CachedPrecompile::wrap(
669 precompile,
670 self.precompile_cache_map.cache_for_address(*address),
671 *env.evm_env.spec_id(),
672 Some(metrics),
673 )
674 });
675 }
676
677 let execution_start = Instant::now();
678 let state_hook = Box::new(handle.state_hook());
679 let output = self.metrics.execute_metered(
680 executor,
681 handle.iter_transactions().map(|res| res.map_err(BlockExecutionError::other)),
682 state_hook,
683 )?;
684 let execution_finish = Instant::now();
685 let execution_time = execution_finish.duration_since(execution_start);
686 debug!(target: "engine::tree", elapsed = ?execution_time, number=?num_hash.number, "Executed block");
687 Ok(output)
688 }
689
690 fn compute_state_root_parallel(
699 &self,
700 persisting_kind: PersistingKind,
701 parent_hash: B256,
702 hashed_state: &HashedPostState,
703 state: &EngineApiTreeState<N>,
704 ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
705 let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
706
707 let mut input = self.compute_trie_input(
708 persisting_kind,
709 consistent_view.provider_ro()?,
710 parent_hash,
711 state,
712 None,
713 )?;
714 input.append_ref(hashed_state);
716
717 ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
718 }
719
720 fn block_connects_to_last_persisted(
725 &self,
726 ctx: TreeCtx<'_, N>,
727 block: &RecoveredBlock<N::Block>,
728 ) -> ProviderResult<bool> {
729 let provider = self.provider.database_provider_ro()?;
730 let last_persisted_block = provider.best_block_number()?;
731 let last_persisted_hash = provider
732 .block_hash(last_persisted_block)?
733 .ok_or(ProviderError::HeaderNotFound(last_persisted_block.into()))?;
734 let last_persisted = NumHash::new(last_persisted_block, last_persisted_hash);
735
736 let parent_num_hash = |hash: B256| -> ProviderResult<NumHash> {
737 let parent_num_hash =
738 if let Some(header) = ctx.state().tree_state.sealed_header_by_hash(&hash) {
739 Some(header.parent_num_hash())
740 } else {
741 provider.sealed_header_by_hash(hash)?.map(|header| header.parent_num_hash())
742 };
743
744 parent_num_hash.ok_or(ProviderError::BlockHashNotFound(hash))
745 };
746
747 let mut parent_block = block.parent_num_hash();
748 while parent_block.number > last_persisted.number {
749 parent_block = parent_num_hash(parent_block.hash)?;
750 }
751
752 let connects = parent_block == last_persisted;
753
754 debug!(
755 target: "engine::tree",
756 num_hash = ?block.num_hash(),
757 ?last_persisted,
758 ?parent_block,
759 "Checking if block connects to last persisted block"
760 );
761
762 Ok(connects)
763 }
764
765 fn validate_post_execution<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
772 &self,
773 block: &RecoveredBlock<N::Block>,
774 parent_block: &SealedHeader<N::BlockHeader>,
775 output: &BlockExecutionOutput<N::Receipt>,
776 ctx: &mut TreeCtx<'_, N>,
777 ) -> Result<HashedPostState, InsertBlockErrorKind>
778 where
779 V: PayloadValidator<T, Block = N::Block>,
780 {
781 let start = Instant::now();
782
783 trace!(target: "engine::tree", block=?block.num_hash(), "Validating block consensus");
784 if let Err(e) = self.validate_block_inner(block) {
786 return Err(e.into())
787 }
788
789 if let Err(e) =
791 self.consensus.validate_header_against_parent(block.sealed_header(), parent_block)
792 {
793 warn!(target: "engine::tree", ?block, "Failed to validate header {} against parent: {e}", block.hash());
794 return Err(e.into())
795 }
796
797 if let Err(err) = self.consensus.validate_block_post_execution(block, output) {
798 self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
800 return Err(err.into())
801 }
802
803 let hashed_state = self.provider.hashed_post_state(&output.state);
804
805 if let Err(err) =
806 self.validator.validate_block_post_execution_with_hashed_state(&hashed_state, block)
807 {
808 self.on_invalid_block(parent_block, block, output, None, ctx.state_mut());
810 return Err(err.into())
811 }
812
813 self.metrics
815 .block_validation
816 .post_execution_validation_duration
817 .record(start.elapsed().as_secs_f64());
818
819 Ok(hashed_state)
820 }
821
    /// Spawns the payload processor for a block and returns its handle together with the
    /// state root strategy that will actually be used.
    ///
    /// For [`StateRootStrategy::StateRootTask`] the trie input is computed first; the task
    /// is only spawned when the input's prefix sets are empty. If they are not, or if
    /// spawning fails, a cache-exclusive processor is spawned instead and the strategy is
    /// downgraded to [`StateRootStrategy::Parallel`]. The other strategies always spawn a
    /// cache-exclusive processor and are returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns an [`InsertBlockErrorKind`] if the consistent view, its read-only provider
    /// or the trie input cannot be created.
    #[allow(clippy::too_many_arguments)]
    fn spawn_payload_processor<T: ExecutableTxIterator<Evm>>(
        &mut self,
        env: ExecutionEnv<Evm>,
        txs: T,
        provider_builder: StateProviderBuilder<N, P>,
        persisting_kind: PersistingKind,
        parent_hash: B256,
        state: &EngineApiTreeState<N>,
        block_num_hash: NumHash,
        strategy: StateRootStrategy,
    ) -> Result<
        (
            PayloadHandle<
                impl ExecutableTxFor<Evm> + use<N, P, Evm, V, T>,
                impl core::error::Error + Send + Sync + 'static + use<N, P, Evm, V, T>,
            >,
            StateRootStrategy,
        ),
        InsertBlockErrorKind,
    > {
        match strategy {
            StateRootStrategy::StateRootTask => {
                let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;

                // Take the trie input held by the payload processor (if any) so that
                // `compute_trie_input` can build on it instead of a default one.
                let allocated_trie_input = self.payload_processor.take_trie_input();

                let trie_input_start = Instant::now();
                let trie_input = self.compute_trie_input(
                    persisting_kind,
                    consistent_view.provider_ro()?,
                    parent_hash,
                    state,
                    allocated_trie_input,
                )?;

                self.metrics
                    .block_validation
                    .trie_input_duration
                    .record(trie_input_start.elapsed().as_secs_f64());

                let spawn_start = Instant::now();
                // The state root task is only used when the prefix sets are empty.
                let (handle, strategy) = if trie_input.prefix_sets.is_empty() {
                    match self.payload_processor.spawn(
                        env,
                        txs,
                        provider_builder,
                        consistent_view,
                        trie_input,
                        &self.config,
                    ) {
                        Ok(handle) => {
                            (handle, StateRootStrategy::StateRootTask)
                        }
                        // On failure `spawn` hands back the inputs it consumed so the
                        // cache-exclusive processor can be spawned with them.
                        Err((error, txs, env, provider_builder)) => {
                            error!(
                                target: "engine::tree",
                                block=?block_num_hash,
                                ?error,
                                "Failed to initialize proof task manager, falling back to parallel state root"
                            );
                            (
                                self.payload_processor.spawn_cache_exclusive(
                                    env,
                                    txs,
                                    provider_builder,
                                ),
                                StateRootStrategy::Parallel,
                            )
                        }
                    }
                } else {
                    debug!(
                        target: "engine::tree",
                        block=?block_num_hash,
                        "Disabling state root task due to non-empty prefix sets"
                    );
                    (
                        self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder),
                        StateRootStrategy::Parallel,
                    )
                };

                // Record how long spawning took, whichever path was taken.
                self.metrics
                    .block_validation
                    .spawn_payload_processor
                    .record(spawn_start.elapsed().as_secs_f64());

                Ok((handle, strategy))
            }
            // No state root task requested: spawn the cache-exclusive processor.
            strategy @ (StateRootStrategy::Parallel | StateRootStrategy::Synchronous) => {
                let start = Instant::now();
                let handle =
                    self.payload_processor.spawn_cache_exclusive(env, txs, provider_builder);

                self.metrics
                    .block_validation
                    .spawn_payload_processor
                    .record(start.elapsed().as_secs_f64());

                Ok((handle, strategy))
            }
        }
    }
948
949 fn has_ancestors_with_missing_trie_updates(
951 &self,
952 target_header: BlockWithParent,
953 state: &EngineApiTreeState<N>,
954 ) -> bool {
955 let mut current_hash = target_header.parent;
957 while let Some(block) = state.tree_state.blocks_by_hash.get(¤t_hash) {
958 if block.trie.is_missing() {
960 return true;
961 }
962
963 current_hash = block.recovered_block().parent_hash();
965 }
966
967 false
968 }
969
970 fn state_provider_builder(
975 &self,
976 hash: B256,
977 state: &EngineApiTreeState<N>,
978 ) -> ProviderResult<Option<StateProviderBuilder<N, P>>> {
979 if let Some((historical, blocks)) = state.tree_state.blocks_by_hash(hash) {
980 debug!(target: "engine::tree", %hash, %historical, "found canonical state for block in memory, creating provider builder");
981 return Ok(Some(StateProviderBuilder::new(
983 self.provider.clone(),
984 historical,
985 Some(blocks),
986 )))
987 }
988
989 if let Some(header) = self.provider.header(hash)? {
991 debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder");
992 return Ok(Some(StateProviderBuilder::new(self.provider.clone(), hash, None)))
995 }
996
997 debug!(target: "engine::tree", %hash, "no canonical state found for block");
998 Ok(None)
999 }
1000
1001 fn plan_state_root_computation<T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>>(
1003 &self,
1004 input: &BlockOrPayload<T>,
1005 ctx: &TreeCtx<'_, N>,
1006 ) -> StateRootPlan {
1007 let persisting_kind = ctx.persisting_kind_for(input.block_with_parent());
1017 let can_run_parallel =
1018 persisting_kind.can_run_parallel_state_root() && !self.config.state_root_fallback();
1019
1020 let has_ancestors_with_missing_trie_updates =
1022 self.has_ancestors_with_missing_trie_updates(input.block_with_parent(), ctx.state());
1023
1024 let strategy = if can_run_parallel {
1033 if self.config.use_state_root_task() && !has_ancestors_with_missing_trie_updates {
1034 StateRootStrategy::StateRootTask
1035 } else {
1036 StateRootStrategy::Parallel
1037 }
1038 } else {
1039 StateRootStrategy::Synchronous
1040 };
1041
1042 debug!(
1043 target: "engine::tree",
1044 block=?input.num_hash(),
1045 ?strategy,
1046 has_ancestors_with_missing_trie_updates,
1047 "Planned state root computation strategy"
1048 );
1049
1050 StateRootPlan { strategy, has_ancestors_with_missing_trie_updates, persisting_kind }
1051 }
1052
1053 fn on_invalid_block(
1055 &self,
1056 parent_header: &SealedHeader<N::BlockHeader>,
1057 block: &RecoveredBlock<N::Block>,
1058 output: &BlockExecutionOutput<N::Receipt>,
1059 trie_updates: Option<(&TrieUpdates, B256)>,
1060 state: &mut EngineApiTreeState<N>,
1061 ) {
1062 if state.invalid_headers.get(&block.hash()).is_some() {
1063 return
1065 }
1066 self.invalid_block_hook.on_invalid_block(parent_header, block, output, trie_updates);
1067 }
1068
    /// Builds the [`TrieInput`] needed to compute a state root on top of `parent_hash`.
    ///
    /// The input consists of:
    /// 1. the revert state from the anchor ("historical") block up to the database tip, or
    ///    an empty state when the anchor is the database tip, and
    /// 2. the hashed state and trie updates of the in-memory blocks leading to the parent.
    ///
    /// `allocated_trie_input` is used as the starting value when provided; otherwise a
    /// default input is created.
    ///
    /// # Errors
    ///
    /// Returns a provider error if the database cannot be read or the anchor cannot be
    /// resolved to a block number.
    fn compute_trie_input<TP: DBProvider + BlockNumReader>(
        &self,
        persisting_kind: PersistingKind,
        provider: TP,
        parent_hash: B256,
        state: &EngineApiTreeState<N>,
        allocated_trie_input: Option<TrieInput>,
    ) -> ProviderResult<TrieInput> {
        let mut input = allocated_trie_input.unwrap_or_default();

        let best_block_number = provider.best_block_number()?;

        // If the tree state does not know the parent, anchor directly at the parent hash
        // with no in-memory blocks.
        let (mut historical, mut blocks) = state
            .tree_state
            .blocks_by_hash(parent_hash)
            .map_or_else(|| (parent_hash.into(), vec![]), |(hash, blocks)| (hash.into(), blocks));

        if persisting_kind.is_descendant() {
            // Drop trailing in-memory blocks that are already covered by the database.
            // NOTE(review): presumably `blocks` is ordered newest-first, so `last()` is the
            // oldest block — confirm against `blocks_by_hash`.
            while let Some(block) = blocks.last() {
                let recovered_block = block.recovered_block();
                if recovered_block.number() <= best_block_number {
                    blocks.pop();
                } else {
                    break
                }
            }

            // Re-anchor: at the block number just below the remaining last block, or at
            // the parent hash if nothing is left in memory.
            historical = if let Some(block) = blocks.last() {
                (block.recovered_block().number() - 1).into()
            } else {
                parent_hash.into()
            };
        }

        if blocks.is_empty() {
            debug!(target: "engine::tree", %parent_hash, "Parent found on disk");
        } else {
            debug!(target: "engine::tree", %parent_hash, %historical, blocks = blocks.len(), "Parent found in memory");
        }

        // Resolve the anchor to a block number.
        // NOTE(review): the `unwrap` assumes a `None` result only occurs when `historical`
        // is a hash — confirm against `convert_hash_or_number`.
        let block_number = provider
            .convert_hash_or_number(historical)?
            .ok_or_else(|| ProviderError::BlockHashNotFound(historical.as_hash().unwrap()))?;

        let revert_state = if block_number == best_block_number {
            // Anchor is the database tip: nothing to revert.
            debug!(target: "engine::tree", block_number, best_block_number, "Empty revert state");
            HashedPostState::default()
        } else {
            // Collect the reverts of every block after the anchor.
            let revert_state = HashedPostState::from_reverts::<KeccakKeyHasher>(
                provider.tx_ref(),
                block_number + 1..,
            )
            .map_err(ProviderError::from)?;
            debug!(
                target: "engine::tree",
                block_number,
                best_block_number,
                accounts = revert_state.accounts.len(),
                storages = revert_state.storages.len(),
                "Non-empty revert state"
            );
            revert_state
        };
        input.append(revert_state);

        // Layer the in-memory blocks on top, iterating `blocks` in reverse.
        input.extend_with_blocks(
            blocks.iter().rev().map(|block| (block.hashed_state(), block.trie_updates())),
        );

        Ok(input)
    }
1171}
1172
/// Result of validating a block or payload: the executed block with its trie updates on
/// success, or an error `E` (an [`InsertPayloadError`] by default).
pub type ValidationOutcome<N, E = InsertPayloadError<BlockTy<N>>> =
    Result<ExecutedBlockWithTrieUpdates<N>, E>;
1176
/// Strategy used to compute the state root of a block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StateRootStrategy {
    /// Obtain the root from the state root task via the payload handle.
    StateRootTask,
    /// Compute the root with the parallel state root algorithm.
    Parallel,
    /// Skip the task and parallel paths; the root is computed via the state provider.
    Synchronous,
}
1187
/// Outcome of `plan_state_root_computation`.
struct StateRootPlan {
    /// Strategy selected for computing the state root.
    strategy: StateRootStrategy,
    /// Whether any in-memory ancestor of the block is missing trie updates.
    has_ancestors_with_missing_trie_updates: bool,
    /// How the block relates to the persistence action currently in flight.
    persisting_kind: PersistingKind,
}
1197
/// Validation interface over payload types `Types` and node primitives `N`.
///
/// `N` defaults to the primitives of `Types`' built payload.
pub trait EngineValidator<
    Types: PayloadTypes,
    N: NodePrimitives = <<Types as PayloadTypes>::BuiltPayload as BuiltPayload>::Primitives,
>: Send + Sync + 'static
{
    /// Validates payload attributes against the given header.
    ///
    /// # Errors
    ///
    /// Returns an [`InvalidPayloadAttributesError`] if the attributes are rejected.
    fn validate_payload_attributes_against_header(
        &self,
        attr: &Types::PayloadAttributes,
        header: &N::BlockHeader,
    ) -> Result<(), InvalidPayloadAttributesError>;

    /// Converts execution data into a recovered block.
    ///
    /// # Errors
    ///
    /// Returns a [`NewPayloadError`] if the payload cannot be turned into a block.
    fn ensure_well_formed_payload(
        &self,
        payload: Types::ExecutionData,
    ) -> Result<RecoveredBlock<N::Block>, NewPayloadError>;

    /// Validates a payload in the context of the engine tree.
    fn validate_payload(
        &mut self,
        payload: Types::ExecutionData,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N>;

    /// Validates an already recovered block in the context of the engine tree.
    fn validate_block(
        &mut self,
        block: RecoveredBlock<N::Block>,
        ctx: TreeCtx<'_, N>,
    ) -> ValidationOutcome<N>;
}
1248
1249impl<N, Types, P, Evm, V> EngineValidator<Types> for BasicEngineValidator<P, Evm, V>
1250where
1251 P: DatabaseProviderFactory<Provider: BlockReader>
1252 + BlockReader<Header = N::BlockHeader>
1253 + StateProviderFactory
1254 + StateReader
1255 + HashedPostStateProvider
1256 + Clone
1257 + 'static,
1258 N: NodePrimitives,
1259 V: PayloadValidator<Types, Block = N::Block>,
1260 Evm: ConfigureEngineEvm<Types::ExecutionData, Primitives = N> + 'static,
1261 Types: PayloadTypes<BuiltPayload: BuiltPayload<Primitives = N>>,
1262{
1263 fn validate_payload_attributes_against_header(
1264 &self,
1265 attr: &Types::PayloadAttributes,
1266 header: &N::BlockHeader,
1267 ) -> Result<(), InvalidPayloadAttributesError> {
1268 self.validator.validate_payload_attributes_against_header(attr, header)
1269 }
1270
1271 fn ensure_well_formed_payload(
1272 &self,
1273 payload: Types::ExecutionData,
1274 ) -> Result<RecoveredBlock<N::Block>, NewPayloadError> {
1275 let block = self.validator.ensure_well_formed_payload(payload)?;
1276 Ok(block)
1277 }
1278
1279 fn validate_payload(
1280 &mut self,
1281 payload: Types::ExecutionData,
1282 ctx: TreeCtx<'_, N>,
1283 ) -> ValidationOutcome<N> {
1284 self.validate_block_with_state(BlockOrPayload::Payload(payload), ctx)
1285 }
1286
1287 fn validate_block(
1288 &mut self,
1289 block: RecoveredBlock<N::Block>,
1290 ctx: TreeCtx<'_, N>,
1291 ) -> ValidationOutcome<N> {
1292 self.validate_block_with_state(BlockOrPayload::Block(block), ctx)
1293 }
1294}
1295
/// Input to validation: either raw execution data or an already recovered block.
#[derive(Debug)]
pub enum BlockOrPayload<T: PayloadTypes> {
    /// Execution data that still has to be converted into a block.
    Payload(T::ExecutionData),
    /// A recovered block of the payload types' primitives.
    Block(RecoveredBlock<BlockTy<<T::BuiltPayload as BuiltPayload>::Primitives>>),
}
1304
1305impl<T: PayloadTypes> BlockOrPayload<T> {
1306 pub fn hash(&self) -> B256 {
1308 match self {
1309 Self::Payload(payload) => payload.block_hash(),
1310 Self::Block(block) => block.hash(),
1311 }
1312 }
1313
1314 pub fn num_hash(&self) -> NumHash {
1316 match self {
1317 Self::Payload(payload) => payload.num_hash(),
1318 Self::Block(block) => block.num_hash(),
1319 }
1320 }
1321
1322 pub fn parent_hash(&self) -> B256 {
1324 match self {
1325 Self::Payload(payload) => payload.parent_hash(),
1326 Self::Block(block) => block.parent_hash(),
1327 }
1328 }
1329
1330 pub fn block_with_parent(&self) -> BlockWithParent {
1332 match self {
1333 Self::Payload(payload) => payload.block_with_parent(),
1334 Self::Block(block) => block.block_with_parent(),
1335 }
1336 }
1337}