1use crate::tree::{
4 cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache},
5 payload_processor::{
6 prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
7 sparse_trie::StateRootComputeOutcome,
8 },
9 sparse_trie::SparseTrieTask,
10 StateProviderBuilder, TreeConfig,
11};
12use alloy_evm::{block::StateChangeSource, ToTxEnv};
13use alloy_primitives::B256;
14use executor::WorkloadExecutor;
15use multiproof::{SparseTrieUpdate, *};
16use parking_lot::RwLock;
17use prewarm::PrewarmMetrics;
18use reth_engine_primitives::ExecutableTxIterator;
19use reth_evm::{
20 execute::{ExecutableTxFor, WithTxEnv},
21 ConfigureEvm, EvmEnvFor, OnStateHook, SpecFor, TxEnvFor,
22};
23use reth_primitives_traits::NodePrimitives;
24use reth_provider::{
25 providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateProviderFactory,
26 StateReader,
27};
28use reth_revm::{db::BundleState, state::EvmState};
29use reth_trie::TrieInput;
30use reth_trie_parallel::{
31 proof_task::{ProofTaskCtx, ProofTaskManager},
32 root::ParallelStateRootError,
33};
34use reth_trie_sparse::{
35 provider::{TrieNodeProvider, TrieNodeProviderFactory},
36 ClearedSparseStateTrie, SerialSparseTrie, SparseStateTrie, SparseTrie,
37};
38use std::sync::{
39 atomic::AtomicBool,
40 mpsc::{self, channel, Sender},
41 Arc,
42};
43
44use super::precompile_cache::PrecompileCacheMap;
45
46mod configured_sparse_trie;
47pub mod executor;
48pub mod multiproof;
49pub mod prewarm;
50pub mod sparse_trie;
51
52use configured_sparse_trie::ConfiguredSparseTrie;
53
54#[derive(Debug)]
56pub struct PayloadProcessor<Evm>
57where
58 Evm: ConfigureEvm,
59{
60 executor: WorkloadExecutor,
62 execution_cache: ExecutionCache,
64 trie_metrics: MultiProofTaskMetrics,
66 cross_block_cache_size: u64,
68 disable_transaction_prewarming: bool,
70 evm_config: Evm,
72 precompile_cache_disabled: bool,
74 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
76 sparse_state_trie: Arc<
79 parking_lot::Mutex<Option<ClearedSparseStateTrie<ConfiguredSparseTrie, SerialSparseTrie>>>,
80 >,
81 disable_parallel_sparse_trie: bool,
83 trie_input: Option<TrieInput>,
85}
86
87impl<N, Evm> PayloadProcessor<Evm>
88where
89 N: NodePrimitives,
90 Evm: ConfigureEvm<Primitives = N>,
91{
92 pub fn new(
94 executor: WorkloadExecutor,
95 evm_config: Evm,
96 config: &TreeConfig,
97 precompile_cache_map: PrecompileCacheMap<SpecFor<Evm>>,
98 ) -> Self {
99 Self {
100 executor,
101 execution_cache: Default::default(),
102 trie_metrics: Default::default(),
103 cross_block_cache_size: config.cross_block_cache_size(),
104 disable_transaction_prewarming: config.disable_caching_and_prewarming(),
105 evm_config,
106 precompile_cache_disabled: config.precompile_cache_disabled(),
107 precompile_cache_map,
108 sparse_state_trie: Arc::default(),
109 trie_input: None,
110 disable_parallel_sparse_trie: config.disable_parallel_sparse_trie(),
111 }
112 }
113}
114
115impl<N, Evm> PayloadProcessor<Evm>
116where
117 N: NodePrimitives,
118 Evm: ConfigureEvm<Primitives = N> + 'static,
119{
120 pub fn spawn<P, I: ExecutableTxIterator<Evm>>(
153 &mut self,
154 env: ExecutionEnv<Evm>,
155 transactions: I,
156 provider_builder: StateProviderBuilder<N, P>,
157 consistent_view: ConsistentDbView<P>,
158 trie_input: TrieInput,
159 config: &TreeConfig,
160 ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
161 where
162 P: DatabaseProviderFactory<Provider: BlockReader>
163 + BlockReader
164 + StateProviderFactory
165 + StateReader
166 + Clone
167 + 'static,
168 {
169 let (to_sparse_trie, sparse_trie_rx) = channel();
170 let (trie_input, state_root_config) =
172 MultiProofConfig::new_from_input(consistent_view, trie_input);
173 self.trie_input = Some(trie_input);
174
175 let task_ctx = ProofTaskCtx::new(
177 state_root_config.nodes_sorted.clone(),
178 state_root_config.state_sorted.clone(),
179 state_root_config.prefix_sets.clone(),
180 );
181 let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
182 let proof_task = ProofTaskManager::new(
183 self.executor.handle().clone(),
184 state_root_config.consistent_view.clone(),
185 task_ctx,
186 max_proof_task_concurrency,
187 );
188
189 let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
192 let multi_proof_task = MultiProofTask::new(
193 state_root_config,
194 self.executor.clone(),
195 proof_task.handle(),
196 to_sparse_trie,
197 max_multi_proof_task_concurrency,
198 );
199
200 let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
202
203 let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
204
205 let prewarm_handle =
206 self.spawn_caching_with(env, prewarm_rx, provider_builder, to_multi_proof.clone());
207
208 self.executor.spawn_blocking(move || {
210 multi_proof_task.run();
211 });
212
213 let (state_root_tx, state_root_rx) = channel();
215
216 self.spawn_sparse_trie_task(sparse_trie_rx, proof_task.handle(), state_root_tx);
218
219 self.executor.spawn_blocking(move || {
221 if let Err(err) = proof_task.run() {
222 tracing::error!(
224 target: "engine::root",
225 ?err,
226 "Storage proof task returned an error"
227 );
228 }
229 });
230
231 PayloadHandle {
232 to_multi_proof,
233 prewarm_handle,
234 state_root: Some(state_root_rx),
235 transactions: execution_rx,
236 }
237 }
238
239 pub(super) fn spawn_cache_exclusive<P, I: ExecutableTxIterator<Evm>>(
243 &self,
244 env: ExecutionEnv<Evm>,
245 transactions: I,
246 provider_builder: StateProviderBuilder<N, P>,
247 ) -> PayloadHandle<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>
248 where
249 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
250 {
251 let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
252 let prewarm_handle = self.spawn_caching_with(env, prewarm_rx, provider_builder, None);
253 PayloadHandle {
254 to_multi_proof: None,
255 prewarm_handle,
256 state_root: None,
257 transactions: execution_rx,
258 }
259 }
260
261 #[expect(clippy::type_complexity)]
263 fn spawn_tx_iterator<I: ExecutableTxIterator<Evm>>(
264 &self,
265 transactions: I,
266 ) -> (
267 mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Tx>>,
268 mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Tx>, I::Error>>,
269 ) {
270 let (prewarm_tx, prewarm_rx) = mpsc::channel();
271 let (execute_tx, execute_rx) = mpsc::channel();
272 self.executor.spawn_blocking(move || {
273 for tx in transactions {
274 let tx = tx.map(|tx| WithTxEnv { tx_env: tx.to_tx_env(), tx });
275 if let Ok(tx) = &tx {
277 let _ = prewarm_tx.send(tx.clone());
278 }
279 let _ = execute_tx.send(tx);
280 }
281 });
282
283 (prewarm_rx, execute_rx)
284 }
285
286 fn spawn_caching_with<P>(
288 &self,
289 env: ExecutionEnv<Evm>,
290 mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Send + 'static>,
291 provider_builder: StateProviderBuilder<N, P>,
292 to_multi_proof: Option<Sender<MultiProofMessage>>,
293 ) -> CacheTaskHandle
294 where
295 P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
296 {
297 if self.disable_transaction_prewarming {
298 transactions = mpsc::channel().1;
301 }
302
303 let (cache, cache_metrics) = self.cache_for(env.parent_hash).split();
304 let prewarm_ctx = PrewarmContext {
306 env,
307 evm_config: self.evm_config.clone(),
308 cache: cache.clone(),
309 cache_metrics: cache_metrics.clone(),
310 provider: provider_builder,
311 metrics: PrewarmMetrics::default(),
312 terminate_execution: Arc::new(AtomicBool::new(false)),
313 precompile_cache_disabled: self.precompile_cache_disabled,
314 precompile_cache_map: self.precompile_cache_map.clone(),
315 };
316
317 let (prewarm_task, to_prewarm_task) = PrewarmCacheTask::new(
318 self.executor.clone(),
319 self.execution_cache.clone(),
320 prewarm_ctx,
321 to_multi_proof,
322 );
323
324 {
326 let to_prewarm_task = to_prewarm_task.clone();
327 self.executor.spawn_blocking(move || {
328 prewarm_task.run(transactions, to_prewarm_task);
329 });
330 }
331
332 CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
333 }
334
335 pub const fn take_trie_input(&mut self) -> Option<TrieInput> {
337 self.trie_input.take()
338 }
339
340 fn cache_for(&self, parent_hash: B256) -> SavedCache {
345 self.execution_cache.get_cache_for(parent_hash).unwrap_or_else(|| {
346 let cache = ProviderCacheBuilder::default().build_caches(self.cross_block_cache_size);
347 SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
348 })
349 }
350
351 fn spawn_sparse_trie_task<BPF>(
353 &self,
354 sparse_trie_rx: mpsc::Receiver<SparseTrieUpdate>,
355 proof_task_handle: BPF,
356 state_root_tx: mpsc::Sender<Result<StateRootComputeOutcome, ParallelStateRootError>>,
357 ) where
358 BPF: TrieNodeProviderFactory + Clone + Send + Sync + 'static,
359 BPF::AccountNodeProvider: TrieNodeProvider + Send + Sync,
360 BPF::StorageNodeProvider: TrieNodeProvider + Send + Sync,
361 {
362 let cleared_sparse_trie = Arc::clone(&self.sparse_state_trie);
365 let sparse_state_trie = cleared_sparse_trie.lock().take().unwrap_or_else(|| {
366 let accounts_trie = if self.disable_parallel_sparse_trie {
367 ConfiguredSparseTrie::Serial(Default::default())
368 } else {
369 ConfiguredSparseTrie::Parallel(Default::default())
370 };
371 ClearedSparseStateTrie::from_state_trie(
372 SparseStateTrie::new()
373 .with_accounts_trie(SparseTrie::Blind(Some(Box::new(accounts_trie))))
374 .with_updates(true),
375 )
376 });
377
378 let task =
379 SparseTrieTask::<_, ConfiguredSparseTrie, SerialSparseTrie>::new_with_cleared_trie(
380 sparse_trie_rx,
381 proof_task_handle,
382 self.trie_metrics.clone(),
383 sparse_state_trie,
384 );
385
386 self.executor.spawn_blocking(move || {
387 let (result, trie) = task.run();
388 let _ = state_root_tx.send(result);
390
391 cleared_sparse_trie.lock().replace(ClearedSparseStateTrie::from_state_trie(trie));
394 });
395 }
396}
397
398#[derive(Debug)]
400pub struct PayloadHandle<Tx, Err> {
401 to_multi_proof: Option<Sender<MultiProofMessage>>,
403 prewarm_handle: CacheTaskHandle,
405 state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
407 transactions: mpsc::Receiver<Result<Tx, Err>>,
409}
410
411impl<Tx, Err> PayloadHandle<Tx, Err> {
412 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
418 self.state_root
419 .take()
420 .expect("state_root is None")
421 .recv()
422 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
423 }
424
425 pub fn state_hook(&self) -> impl OnStateHook {
429 let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
431
432 move |source: StateChangeSource, state: &EvmState| {
433 if let Some(sender) = &to_multi_proof {
434 let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
435 }
436 }
437 }
438
439 pub(super) fn caches(&self) -> ProviderCaches {
441 self.prewarm_handle.cache.clone()
442 }
443
444 pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
445 self.prewarm_handle.cache_metrics.clone()
446 }
447
448 pub(super) fn stop_prewarming_execution(&self) {
452 self.prewarm_handle.stop_prewarming_execution()
453 }
454
455 pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
459 self.prewarm_handle.terminate_caching(block_output)
460 }
461
462 pub fn iter_transactions(&mut self) -> impl Iterator<Item = Result<Tx, Err>> + '_ {
464 core::iter::repeat_with(|| self.transactions.recv())
465 .take_while(|res| res.is_ok())
466 .map(|res| res.unwrap())
467 }
468}
469
470#[derive(Debug)]
472pub(crate) struct CacheTaskHandle {
473 cache: ProviderCaches,
475 cache_metrics: CachedStateMetrics,
477 to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
479}
480
481impl CacheTaskHandle {
482 pub(super) fn stop_prewarming_execution(&self) {
486 self.to_prewarm_task
487 .as_ref()
488 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
489 }
490
491 pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
495 self.to_prewarm_task
496 .take()
497 .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
498 }
499}
500
501impl Drop for CacheTaskHandle {
502 fn drop(&mut self) {
503 self.terminate_caching(None);
505 }
506}
507
508#[derive(Clone, Debug, Default)]
516struct ExecutionCache {
517 inner: Arc<RwLock<Option<SavedCache>>>,
519}
520
521impl ExecutionCache {
522 pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
524 let cache = self.inner.read();
525 cache
526 .as_ref()
527 .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
528 }
529
530 #[expect(unused)]
532 pub(crate) fn clear(&self) {
533 self.inner.write().take();
534 }
535
536 pub(crate) fn save_cache(&self, cache: SavedCache) {
538 self.inner.write().replace(cache);
539 }
540}
541
542#[derive(Debug, Clone)]
544pub struct ExecutionEnv<Evm: ConfigureEvm> {
545 pub evm_env: EvmEnvFor<Evm>,
547 pub hash: B256,
549 pub parent_hash: B256,
551}
552
553impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
554where
555 EvmEnvFor<Evm>: Default,
556{
557 fn default() -> Self {
558 Self {
559 evm_env: Default::default(),
560 hash: Default::default(),
561 parent_hash: Default::default(),
562 }
563 }
564}
565
566#[cfg(test)]
567mod tests {
568 use crate::tree::{
569 payload_processor::{
570 evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
571 },
572 precompile_cache::PrecompileCacheMap,
573 StateProviderBuilder, TreeConfig,
574 };
575 use alloy_evm::block::StateChangeSource;
576 use rand::Rng;
577 use reth_chainspec::ChainSpec;
578 use reth_db_common::init::init_genesis;
579 use reth_ethereum_primitives::TransactionSigned;
580 use reth_evm::OnStateHook;
581 use reth_evm_ethereum::EthEvmConfig;
582 use reth_primitives_traits::{Account, Recovered, StorageEntry};
583 use reth_provider::{
584 providers::{BlockchainProvider, ConsistentDbView},
585 test_utils::create_test_provider_factory_with_chain_spec,
586 ChainSpecProvider, HashingWriter,
587 };
588 use reth_testing_utils::generators;
589 use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
590 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
591 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
592 use std::sync::Arc;
593
594 fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
595 let mut rng = generators::rng();
596 let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
597 let mut updates = Vec::with_capacity(updates_per_account);
598
599 for _ in 0..updates_per_account {
600 let num_accounts_in_update = rng.random_range(1..=num_accounts);
601 let mut state_update = EvmState::default();
602
603 let selected_addresses = &all_addresses[0..num_accounts_in_update];
604
605 for &address in selected_addresses {
606 let mut storage = HashMap::default();
607 if rng.random_bool(0.7) {
608 for _ in 0..rng.random_range(1..10) {
609 let slot = U256::from(rng.random::<u64>());
610 storage.insert(
611 slot,
612 EvmStorageSlot::new_changed(
613 U256::ZERO,
614 U256::from(rng.random::<u64>()),
615 0,
616 ),
617 );
618 }
619 }
620
621 let account = revm_state::Account {
622 info: AccountInfo {
623 balance: U256::from(rng.random::<u64>()),
624 nonce: rng.random::<u64>(),
625 code_hash: KECCAK_EMPTY,
626 code: Some(Default::default()),
627 },
628 storage,
629 status: AccountStatus::Touched,
630 transaction_id: 0,
631 };
632
633 state_update.insert(address, account);
634 }
635
636 updates.push(state_update);
637 }
638
639 updates
640 }
641
642 #[test]
643 fn test_state_root() {
644 reth_tracing::init_test_tracing();
645
646 let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
647 let genesis_hash = init_genesis(&factory).unwrap();
648
649 let state_updates = create_mock_state_updates(10, 10);
650 let mut hashed_state = HashedPostState::default();
651 let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
652 HashMap::default();
653
654 {
655 let provider_rw = factory.provider_rw().expect("failed to get provider");
656
657 for update in &state_updates {
658 let account_updates = update.iter().map(|(address, account)| {
659 (*address, Some(Account::from_revm_account(account)))
660 });
661 provider_rw
662 .insert_account_for_hashing(account_updates)
663 .expect("failed to insert accounts");
664
665 let storage_updates = update.iter().map(|(address, account)| {
666 let storage_entries = account.storage.iter().map(|(slot, value)| {
667 StorageEntry { key: B256::from(*slot), value: value.present_value }
668 });
669 (*address, storage_entries)
670 });
671 provider_rw
672 .insert_storage_for_hashing(storage_updates)
673 .expect("failed to insert storage");
674 }
675 provider_rw.commit().expect("failed to commit changes");
676 }
677
678 for update in &state_updates {
679 hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
680
681 for (address, account) in update {
682 let storage: HashMap<B256, U256> = account
683 .storage
684 .iter()
685 .map(|(k, v)| (B256::from(*k), v.present_value))
686 .collect();
687
688 let entry = accumulated_state.entry(*address).or_default();
689 entry.0 = Account::from_revm_account(account);
690 entry.1.extend(storage);
691 }
692 }
693
694 let mut payload_processor = PayloadProcessor::new(
695 WorkloadExecutor::default(),
696 EthEvmConfig::new(factory.chain_spec()),
697 &TreeConfig::default(),
698 PrecompileCacheMap::default(),
699 );
700 let provider = BlockchainProvider::new(factory).unwrap();
701 let mut handle = payload_processor.spawn(
702 Default::default(),
703 core::iter::empty::<Result<Recovered<TransactionSigned>, core::convert::Infallible>>(),
704 StateProviderBuilder::new(provider.clone(), genesis_hash, None),
705 ConsistentDbView::new_with_latest_tip(provider).unwrap(),
706 TrieInput::from_state(hashed_state),
707 &TreeConfig::default(),
708 );
709
710 let mut state_hook = handle.state_hook();
711
712 for (i, update) in state_updates.into_iter().enumerate() {
713 state_hook.on_state(StateChangeSource::Transaction(i), &update);
714 }
715 drop(state_hook);
716
717 let root_from_task = handle.state_root().expect("task failed").state_root;
718 let root_from_regular = state_root(accumulated_state);
719
720 assert_eq!(
721 root_from_task, root_from_regular,
722 "State root mismatch: task={root_from_task}, base={root_from_regular}"
723 );
724 }
725}