1use super::precompile_cache::PrecompileCacheMap;
4use crate::tree::{
5 cached_state::{
6 CachedStateMetrics, ExecutionCache as StateExecutionCache, ExecutionCacheBuilder,
7 SavedCache,
8 },
9 payload_processor::{
10 prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
11 sparse_trie::StateRootComputeOutcome,
12 },
13 sparse_trie::SparseTrieTask,
14 StateProviderBuilder, TreeConfig,
15};
16use alloy_evm::{block::StateChangeSource, ToTxEnv};
17use alloy_primitives::B256;
18use executor::WorkloadExecutor;
19use multiproof::{SparseTrieUpdate, *};
20use parking_lot::RwLock;
21use prewarm::PrewarmMetrics;
22use reth_engine_primitives::ExecutableTxIterator;
23use reth_evm::{
24 execute::{ExecutableTxFor, WithTxEnv},
25 ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
26};
27use reth_primitives_traits::NodePrimitives;
28use reth_provider::{
29 providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateProviderFactory,
30 StateReader,
31};
32use reth_revm::{db::BundleState, state::EvmState};
33use reth_trie::TrieInput;
34use reth_trie_parallel::{
35 proof_task::{ProofTaskCtx, ProofTaskManager},
36 root::ParallelStateRootError,
37};
38use reth_trie_sparse::{
39 provider::{TrieNodeProvider, TrieNodeProviderFactory},
40 ClearedSparseStateTrie, SparseStateTrie, SparseTrie,
41};
42use reth_trie_sparse_parallel::{ParallelSparseTrie, ParallelismThresholds};
43use std::sync::{
44 atomic::AtomicBool,
45 mpsc::{self, channel, Sender},
46 Arc,
47};
48use tracing::{debug, instrument};
49
50mod configured_sparse_trie;
51pub mod executor;
52pub mod multiproof;
53pub mod prewarm;
54pub mod sparse_trie;
55
56use configured_sparse_trie::ConfiguredSparseTrie;
57
58pub const PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS: ParallelismThresholds =
64 ParallelismThresholds { min_revealed_nodes: 100, min_updated_nodes: 100 };
65
66#[derive(Debug)]
68pub struct PayloadProcessor<Evm>
69where
70 Evm: ConfigureEvm,
71{
72 executor: WorkloadExecutor,
74 execution_cache: ExecutionCache,
76 trie_metrics: MultiProofTaskMetrics,
78 cross_block_cache_size: u64,
80 disable_transaction_prewarming: bool,
82 evm_config: Evm,
84 precompile_cache_disabled: bool,
86 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
88 sparse_state_trie: Arc<
91 parking_lot::Mutex<
92 Option<ClearedSparseStateTrie<ConfiguredSparseTrie, ConfiguredSparseTrie>>,
93 >,
94 >,
95 disable_parallel_sparse_trie: bool,
97 trie_input: Option<TrieInput>,
99}
100
101impl<N, Evm> PayloadProcessor<Evm>
102where
103 N: NodePrimitives,
104 Evm: ConfigureEvm<Primitives = N>,
105{
106 pub fn new(
108 executor: WorkloadExecutor,
109 evm_config: Evm,
110 config: &TreeConfig,
111 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
112 ) -> Self {
113 Self {
114 executor,
115 execution_cache: Default::default(),
116 trie_metrics: Default::default(),
117 cross_block_cache_size: config.cross_block_cache_size(),
118 disable_transaction_prewarming: config.disable_caching_and_prewarming(),
119 evm_config,
120 precompile_cache_disabled: config.precompile_cache_disabled(),
121 precompile_cache_map,
122 sparse_state_trie: Arc::default(),
123 trie_input: None,
124 disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
125 }
126 }
127}
128
129impl<N, Evm> PayloadProcessor<Evm>
130where
131 N: NodePrimitives,
132 Evm: ConfigureEvm<Primitives = N> + 'static,
133{
134 pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
167 &mut self,
168 env: ExecutionEnv<Evm>,
169 transactions: I,
170 provider_builder: StateProviderBuilder<N, P>,
171 consistent_view: ConsistentDbView<P>,
172 trie_input: TrieInput,
173 config: &TreeConfig,
174 ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
175 where
176 P: DatabaseProviderFactory<Provider: BlockReader>
177 + BlockReader
178 + StateProviderFactory
179 + StateReader
180 + Clone
181 + 'static,
182 {
183 let (to_sparse_trie, sparse_trie_rx) = channel();
184 let (trie_input, state_root_config) =
186 MultiProofConfig::new_from_input(consistent_view, trie_input);
187 self.trie_input = Some(trie_input);
188
189 let task_ctx = ProofTaskCtx::new(
191 state_root_config.nodes_sorted.clone(),
192 state_root_config.state_sorted.clone(),
193 state_root_config.prefix_sets.clone(),
194 );
195 let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
196 let proof_task = ProofTaskManager::new(
197 self.executor.handle().clone(),
198 state_root_config.consistent_view.clone(),
199 task_ctx,
200 max_proof_task_concurrency,
201 );
202
203 let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
206 let multi_proof_task = MultiProofTask::new(
207 state_root_config,
208 self.executor.clone(),
209 proof_task.handle(),
210 to_sparse_trie,
211 max_multi_proof_task_concurrency,
212 );
213
214 let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
216
217 let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
218
219 let prewarm_handle =
220 self.spawn_caching_with(env, prewarm_rx, provider_builder, to_multi_proof.clone());
221
222 self.executor.spawn_blocking(move || {
224 multi_proof_task.run();
225 });
226
227 let (state_root_tx, state_root_rx) = channel();
229
230 self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
232
233 self.executor.spawn_blocking(move || {
235 if let Err(err) = proof_task.run() {
236 tracing::error!(
238 target: "engine::root",
239 ?err,
240 "Storage proof task returned an error"
241 );
242 }
243 });
244
245 PayloadHandle {
246 to_multi_proof,
247 prewarm_handle,
248 state_root: Some(state_root_rx),
249 transactions: execution_rx,
250 }
251 }
252
253 pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
257 &self,
258 env: ExecutionEnv<Evm>,
259 transactions: I,
260 provider_builder: StateProviderBuilder<N, P>,
261 ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
262 where
263 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
264 {
265 let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
266 let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None);
267 PayloadHandle {
268 to_multi_proof: None,
269 prewarm_handle,
270 state_root: None,
271 transactions: execution_rx,
272 }
273 }
274
275 #[expect(clippy::type_complexity)]
277 fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
278 &self,
279 transactions: I,
280 ) -> (
281 mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
282 mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
283 ) {
284 let (prewarm_tx, prewarm_rx) = mpsc::channel();
285 let (execute_tx, execute_rx) = mpsc::channel();
286 self.executor.spawn_blocking(move || {
287 for tx in transactions {
288 let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx });
289 if let Ok(tx) = &tx {
291 let _ = prewarm_tx.send(tx.clone());
292 }
293 let _ = execute_tx.send(tx);
294 }
295 });
296
297 (prewarm_rx, execute_rx)
298 }
299
300 fn spawn_caching_with<P>(
302 &self,
303 env: ExecutionEnv<Evm>,
304 mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
305 provider_builder: StateProviderBuilder<N, P>,
306 to_multi_proof: Option<Sender<MultiProofMessage>>,
307 ) -> CacheTaskHandle
308 where
309 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
310 {
311 if self.disable_transaction_prewarming {
312 transactions = mpsc::channel().1;
315 }
316
317 let (cache, cache_metrics) = self.cache_for(env.parent_hash).split();
318 let prewarm_ctx = PrewarmContext {
320 env,
321 evm_config: self.evm_config.clone(),
322 cache: cache.clone(),
323 cache_metrics: cache_metrics.clone(),
324 provider: provider_builder,
325 metrics: PrewarmMetrics::default(),
326 terminate_execution: Arc::new(AtomicBool::new(false)),
327 precompile_cache_disabled: self.precompile_cache_disabled,
328 precompile_cache_map: self.precompile_cache_map.clone(),
329 };
330
331 let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
332 self.executor.clone(),
333 self.execution_cache.clone(),
334 prewarm_ctx,
335 to_multi_proof,
336 );
337
338 {
340 let to_prewarm_task = to_prewarm_task.clone();
341 self.executor.spawn_blocking(move || {
342 prewarm_task.run(transactions, to_prewarm_task);
343 });
344 }
345
346 CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
347 }
348
349 pub const fn take_trie_input(&mut self) -> Option<TrieInput> {
351 self.trie_input.take()
352 }
353
354 #[instrument(target = "engine::caching", skip(self))]
359 fn cache_for(&self, parent_hash: B256) -> SavedCache {
360 self.execution_cache
361 .get_cache_for(parent_hash)
362 .inspect(|_| debug!("reusing execution cache"))
363 .unwrap_or_else(|| {
364 debug!("creating new execution cache on cache miss");
365 let cache =
366 ExecutionCacheBuilder::default().build_caches(self.cross_block_cache_size);
367 SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
368 })
369 }
370
371 fn spawn_sparse_trie_task<BPF>(
373 &self,
374 sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
375 proof_task_handle: BPF,
376 state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
377 ) where
378 BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
379 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
380 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
381 {
382 let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
385 let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
386 let default_trie = SparseTrie::blind_from(if self.disable_parallel_sparse_trie {
387 ConfiguredSparseTrie::Serial(Default::default())
388 } else {
389 ConfiguredSparseTrie::Parallel(Box::new(
390 ParallelSparseTrie::default()
391 .with_parallelism_thresholds(PARALLEL_SPARSE_TRIE_PARALLELISM_THRESHOLDS),
392 ))
393 });
394 ClearedSparseStateTrie::from_state_trie(
395 SparseStateTrie::new()
396 .with_accounts_trie(default_trie.clone())
397 .with_default_storage_trie(default_trie)
398 .with_updates(true),
399 )
400 });
401
402 let task =
403 SparseTrieTask::<_, ConfiguredSparseTrie, ConfiguredSparseTrie>::new_with_cleared_trie(
404 sparse_trie_rx,
405 proof_task_handle,
406 self.trie_metrics.clone(),
407 sparse_state_trie,
408 );
409
410 self.executor.spawn_blocking(move || {
411 let (result, trie) = task.run();
412 let _ = state_root_tx.send(result);
414
415 cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
418 });
419 }
420}
421
422#[derive(Debug)]
424pub struct PayloadHandle<Tx, Err> {
425 to_multi_proof: Option<Sender<MultiProofMessage>>,
427 prewarm_handle: CacheTaskHandle,
429 state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
431 transactions: mpsc::Receiver<Result<Tx, Err>>,
433}
434
435impl<Tx, Err> PayloadHandle<Tx, Err> {
436 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
442 self.state_root
443 .take()
444 .expect("state_root is None")
445 .recv()
446 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
447 }
448
449 pub fn state_hook(&self) -> impl OnStateHook {
453 let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
455
456 move |source: StateChangeSource, state: &EvmState| {
457 if let Some(sender) = &to_multi_proof {
458 let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
459 }
460 }
461 }
462
463 pub(super) fn caches(&self) -> StateExecutionCache {
465 self.prewarm_handle.cache.clone()
466 }
467
468 pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
469 self.prewarm_handle.cache_metrics.clone()
470 }
471
472 pub(super) fn stop_prewarming_execution(&self) {
476 self.prewarm_handle.stop_prewarming_execution()
477 }
478
479 pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
483 self.prewarm_handle.terminate_caching(block_output)
484 }
485
486 pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
488 core::iter::repeat_with(|| self.transactions.recv())
489 .take_while(|res| res.is_ok())
490 .map(|res| res.unwrap())
491 }
492}
493
494#[derive(Debug)]
496pub(crate) struct CacheTaskHandle {
497 cache: StateExecutionCache,
499 cache_metrics: CachedStateMetrics,
501 to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
503}
504
505impl CacheTaskHandle {
506 pub(super) fn stop_prewarming_execution(&self) {
510 self.to_prewarm_task
511 .as_ref()
512 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
513 }
514
515 pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
519 self.to_prewarm_task
520 .take()
521 .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
522 }
523}
524
525impl Drop for CacheTaskHandle {
526 fn drop(&mut self) {
527 self.terminate_caching(None);
529 }
530}
531
532#[derive(Clone, Debug, Default)]
558struct ExecutionCache {
559 inner: Arc<RwLock<Option<SavedCache>>>,
561}
562
563impl ExecutionCache {
564 pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
566 let cache = self.inner.read();
567 cache
568 .as_ref()
569 .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
570 }
571
572 #[expect(unused)]
574 pub(crate) fn clear(&self) {
575 self.inner.write().take();
576 }
577
578 pub(crate) fn update_with_guard<F>(&self, update_fn: F)
592 where
593 F: FnOnce(&mut Option<SavedCache>),
594 {
595 let mut guard = self.inner.write();
596 update_fn(&mut guard);
597 }
598}
599
600#[derive(Debug, Clone)]
602pub struct ExecutionEnv<Evm: ConfigureEvm> {
603 pub evm_env: EvmEnvFor<Evm>,
605 pub hash: B256,
607 pub parent_hash: B256,
609}
610
611impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
612where
613 EvmEnvFor<Evm>: Default,
614{
615 fn default() -> Self {
616 Self {
617 evm_env: Default::default(),
618 hash: Default::default(),
619 parent_hash: Default::default(),
620 }
621 }
622}
623
624#[cfg(test)]
625mod tests {
626 use crate::tree::{
627 payload_processor::{
628 evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
629 },
630 precompile_cache::PrecompileCacheMap,
631 StateProviderBuilder, TreeConfig,
632 };
633 use alloy_evm::block::StateChangeSource;
634 use rand::Rng;
635 use reth_chainspec::ChainSpec;
636 use reth_db_common::init::init_genesis;
637 use reth_ethereum_primitives::TransactionSigned;
638 use reth_evm::OnStateHook;
639 use reth_evm_ethereum::EthEvmConfig;
640 use reth_primitives_traits::{Account, Recovered, StorageEntry};
641 use reth_provider::{
642 providers::{BlockchainProvider, ConsistentDbView},
643 test_utils::create_test_provider_factory_with_chain_spec,
644 ChainSpecProvider, HashingWriter,
645 };
646 use reth_testing_utils::generators;
647 use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
648 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
649 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
650 use std::sync::Arc;
651
652 fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
653 let mut rng = generators::rng();
654 let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
655 let mut updates = Vec::with_capacity(updates_per_account);
656
657 for _ in 0..updates_per_account {
658 let num_accounts_in_update = rng.random_range(1..=num_accounts);
659 let mut state_update = EvmState::default();
660
661 let selected_addresses = &all_addresses[0..num_accounts_in_update];
662
663 for &address in selected_addresses {
664 let mut storage = HashMap::default();
665 if rng.random_bool(0.7) {
666 for _ in 0..rng.random_range(1..10) {
667 let slot = U256::from(rng.random::<u64>());
668 storage.insert(
669 slot,
670 EvmStorageSlot::new_changed(
671 U256::ZERO,
672 U256::from(rng.random::<u64>()),
673 0,
674 ),
675 );
676 }
677 }
678
679 let account = revm_state::Account {
680 info: AccountInfo {
681 balance: U256::from(rng.random::<u64>()),
682 nonce: rng.random::<u64>(),
683 code_hash: KECCAK_EMPTY,
684 code: Some(Default::default()),
685 },
686 storage,
687 status: AccountStatus::Touched,
688 transaction_id: 0,
689 };
690
691 state_update.insert(address, account);
692 }
693
694 updates.push(state_update);
695 }
696
697 updates
698 }
699
700 #[test]
701 fn test_state_root() {
702 reth_tracing::init_test_tracing();
703
704 let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
705 let genesis_hash = init_genesis(&factory).unwrap();
706
707 let state_updates = create_mock_state_updates(10, 10);
708 let mut hashed_state = HashedPostState::default();
709 let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
710 HashMap::default();
711
712 {
713 let provider_rw = factory.provider_rw().expect("failed to get provider");
714
715 for update in &state_updates {
716 let account_updates = update.iter().map(|(address, account)| {
717 (*address, Some(Account::from_revm_account(account)))
718 });
719 provider_rw
720 .insert_account_for_hashing(account_updates)
721 .expect("failed to insert accounts");
722
723 let storage_updates = update.iter().map(|(address, account)| {
724 let storage_entries = account.storage.iter().map(|(slot, value)| {
725 StorageEntry { key: B256::from(*slot), value: value.present_value }
726 });
727 (*address, storage_entries)
728 });
729 provider_rw
730 .insert_storage_for_hashing(storage_updates)
731 .expect("failed to insert storage");
732 }
733 provider_rw.commit().expect("failed to commit changes");
734 }
735
736 for update in &state_updates {
737 hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
738
739 for (address, account) in update {
740 let storage: HashMap<B256, U256> = account
741 .storage
742 .iter()
743 .map(|(k, v)| (B256::from(*k), v.present_value))
744 .collect();
745
746 let entry = accumulated_state.entry(*address).or_default();
747 entry.0 = Account::from_revm_account(account);
748 entry.1.extend(storage);
749 }
750 }
751
752 let mut payload_processor = PayloadProcessor::new(
753 WorkloadExecutor::default(),
754 EthEvmConfig::new(factory.chain_spec()),
755 &TreeConfig::default(),
756 PrecompileCacheMap::default(),
757 );
758 let provider = BlockchainProvider::new(factory).unwrap();
759 let mut handle = payload_processor.spawn(
760 Default::default(),
761 core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
762 StateProviderBuilder::new(provider.clone(), genesis_hash, None),
763 ConsistentDbView::new_with_latest_tip(provider).unwrap(),
764 TrieInput::from_state(hashed_state),
765 &TreeConfig::default(),
766 );
767
768 let mut state_hook = handle.state_hook();
769
770 for (i, update) in state_updates.into_iter().enumerate() {
771 state_hook.on_state(StateChangeSource::Transaction(i), &update);
772 }
773 drop(state_hook);
774
775 let root_from_task = handle.state_root().expect("task failed").state_root;
776 let root_from_regular = state_root(accumulated_state);
777
778 assert_eq!(
779 root_from_task, root_from_regular,
780 "State root mismatch: task={root_from_task}, base={root_from_regular}"
781 );
782 }
783}