reth_engine_tree/tree/payload_processor/mod.rs

1//! Entrypoint for payload processing.
2
3use crate::tree::{
4    cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache},
5    payload_processor::{
6        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
7        sparse_trie::StateRootComputeOutcome,
8    },
9    sparse_trie::SparseTrieTask,
10    StateProviderBuilder, TreeConfig,
11};
12use alloy_consensus::{transaction::Recovered, BlockHeader};
13use alloy_evm::block::StateChangeSource;
14use alloy_primitives::B256;
15use executor::WorkloadExecutor;
16use multiproof::*;
17use parking_lot::RwLock;
18use prewarm::PrewarmMetrics;
19use reth_evm::{ConfigureEvm, OnStateHook};
20use reth_primitives_traits::{NodePrimitives, SealedHeaderFor};
21use reth_provider::{
22    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider,
23    StateProviderFactory, StateReader,
24};
25use reth_revm::{db::BundleState, state::EvmState};
26use reth_trie::TrieInput;
27use reth_trie_parallel::{
28    proof_task::{ProofTaskCtx, ProofTaskManager},
29    root::ParallelStateRootError,
30};
31use std::{
32    collections::VecDeque,
33    sync::{
34        mpsc,
35        mpsc::{channel, Sender},
36        Arc,
37    },
38};
39
40pub mod executor;
41pub mod multiproof;
42pub mod prewarm;
43pub mod sparse_trie;
44
45/// Entrypoint for executing the payload.
46#[derive(Debug, Clone)]
47pub struct PayloadProcessor<N, Evm> {
48    /// The executor used by to spawn tasks.
49    executor: WorkloadExecutor,
50    /// The most recent cache used for execution.
51    execution_cache: ExecutionCache,
52    /// Metrics for trie operations
53    trie_metrics: MultiProofTaskMetrics,
54    /// Cross-block cache size in bytes.
55    cross_block_cache_size: u64,
56    /// Whether transactions should be executed on prewarming task.
57    use_transaction_prewarming: bool,
58    /// Determines how to configure the evm for execution.
59    evm_config: Evm,
60    _marker: std::marker::PhantomData<N>,
61}
62
63impl<N, Evm> PayloadProcessor<N, Evm> {
64    /// Creates a new payload processor.
65    pub fn new(executor: WorkloadExecutor, evm_config: Evm, config: &TreeConfig) -> Self {
66        Self {
67            executor,
68            execution_cache: Default::default(),
69            trie_metrics: Default::default(),
70            cross_block_cache_size: config.cross_block_cache_size(),
71            use_transaction_prewarming: config.use_caching_and_prewarming(),
72            evm_config,
73            _marker: Default::default(),
74        }
75    }
76}
77
78impl<N, Evm> PayloadProcessor<N, Evm>
79where
80    N: NodePrimitives,
81    Evm: ConfigureEvm<Primitives = N> + 'static,
82{
83    /// Spawns all background tasks and returns a handle connected to the tasks.
84    ///
85    /// - Transaction prewarming task
86    /// - State root task
87    /// - Sparse trie task
88    ///
89    /// # Transaction prewarming task
90    ///
91    /// Responsible for feeding state updates to the multi proof task.
92    ///
93    /// This task runs until:
94    ///  - externally cancelled (e.g. sequential block execution is complete)
95    ///
96    /// ## Multi proof task
97    ///
98    /// Responsible for preparing sparse trie messages for the sparse trie task.
99    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
100    /// output back to this task.
101    ///
102    /// Receives updates from sequential execution.
103    /// This task runs until it receives a shutdown signal, which should be after the block
104    /// was fully executed.
105    ///
106    /// ## Sparse trie task
107    ///
108    /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
109    ///
110    /// This task runs until there are no further updates to process.
111    ///
112    ///
113    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
114    /// canceling)
115    pub fn spawn<P>(
116        &self,
117        header: SealedHeaderFor<N>,
118        transactions: VecDeque<Recovered<N::SignedTx>>,
119        provider_builder: StateProviderBuilder<N, P>,
120        consistent_view: ConsistentDbView<P>,
121        trie_input: TrieInput,
122        config: &TreeConfig,
123    ) -> PayloadHandle
124    where
125        P: DatabaseProviderFactory<Provider: BlockReader>
126            + BlockReader
127            + StateProviderFactory
128            + StateReader
129            + StateCommitmentProvider
130            + Clone
131            + 'static,
132    {
133        let (to_sparse_trie, sparse_trie_rx) = channel();
134        // spawn multiproof task
135        let state_root_config = MultiProofConfig::new_from_input(consistent_view, trie_input);
136
137        // Create and spawn the storage proof task
138        let task_ctx = ProofTaskCtx::new(
139            state_root_config.nodes_sorted.clone(),
140            state_root_config.state_sorted.clone(),
141            state_root_config.prefix_sets.clone(),
142        );
143        let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
144        let proof_task = ProofTaskManager::new(
145            self.executor.handle().clone(),
146            state_root_config.consistent_view.clone(),
147            task_ctx,
148            max_proof_task_concurrency,
149        );
150
151        // We set it to half of the proof task concurrency, because often for each multiproof we
152        // spawn one Tokio task for the account proof, and one Tokio task for the storage proof.
153        let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
154        let multi_proof_task = MultiProofTask::new(
155            state_root_config,
156            self.executor.clone(),
157            proof_task.handle(),
158            to_sparse_trie,
159            max_multi_proof_task_concurrency,
160        );
161
162        // wire the multiproof task to the prewarm task
163        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
164
165        let prewarm_handle =
166            self.spawn_caching_with(header, transactions, provider_builder, to_multi_proof.clone());
167
168        // spawn multi-proof task
169        self.executor.spawn_blocking(move || {
170            multi_proof_task.run();
171        });
172
173        let sparse_trie_task = SparseTrieTask::new(
174            self.executor.clone(),
175            sparse_trie_rx,
176            proof_task.handle(),
177            self.trie_metrics.clone(),
178        );
179
180        // wire the sparse trie to the state root response receiver
181        let (state_root_tx, state_root_rx) = channel();
182        self.executor.spawn_blocking(move || {
183            let res = sparse_trie_task.run();
184            let _ = state_root_tx.send(res);
185        });
186
187        // spawn the proof task
188        self.executor.spawn_blocking(move || {
189            if let Err(err) = proof_task.run() {
190                // At least log if there is an error at any point
191                tracing::error!(
192                    target: "engine::root",
193                    ?err,
194                    "Storage proof task returned an error"
195                );
196            }
197        });
198
199        PayloadHandle { to_multi_proof, prewarm_handle, state_root: Some(state_root_rx) }
200    }
201
202    /// Spawn cache prewarming exclusively.
203    ///
204    /// Returns a [`PayloadHandle`] to communicate with the task.
205    pub(super) fn spawn_cache_exclusive<P>(
206        &self,
207        header: SealedHeaderFor<N>,
208        transactions: VecDeque<Recovered<N::SignedTx>>,
209        provider_builder: StateProviderBuilder<N, P>,
210    ) -> PayloadHandle
211    where
212        P: BlockReader
213            + StateProviderFactory
214            + StateReader
215            + StateCommitmentProvider
216            + Clone
217            + 'static,
218    {
219        let prewarm_handle = self.spawn_caching_with(header, transactions, provider_builder, None);
220        PayloadHandle { to_multi_proof: None, prewarm_handle, state_root: None }
221    }
222
223    /// Spawn prewarming optionally wired to the multiproof task for target updates.
224    fn spawn_caching_with<P>(
225        &self,
226        header: SealedHeaderFor<N>,
227        mut transactions: VecDeque<Recovered<N::SignedTx>>,
228        provider_builder: StateProviderBuilder<N, P>,
229        to_multi_proof: Option<Sender<MultiProofMessage>>,
230    ) -> CacheTaskHandle
231    where
232        P: BlockReader
233            + StateProviderFactory
234            + StateReader
235            + StateCommitmentProvider
236            + Clone
237            + 'static,
238    {
239        if !self.use_transaction_prewarming {
240            // if no transactions should be executed we clear them but still spawn the task for
241            // caching updates
242            transactions.clear();
243        }
244
245        let (cache, cache_metrics) = self.cache_for(header.parent_hash()).split();
246        // configure prewarming
247        let prewarm_ctx = PrewarmContext {
248            header,
249            evm_config: self.evm_config.clone(),
250            cache: cache.clone(),
251            cache_metrics: cache_metrics.clone(),
252            provider: provider_builder,
253            metrics: PrewarmMetrics::default(),
254        };
255
256        let prewarm_task = PrewarmCacheTask::new(
257            self.executor.clone(),
258            self.execution_cache.clone(),
259            prewarm_ctx,
260            to_multi_proof,
261            transactions,
262        );
263        let to_prewarm_task = prewarm_task.actions_tx();
264
265        // spawn pre-warm task
266        self.executor.spawn_blocking(move || {
267            prewarm_task.run();
268        });
269        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
270    }
271
272    /// Returns the cache for the given parent hash.
273    ///
274    /// If the given hash is different then what is recently cached, then this will create a new
275    /// instance.
276    fn cache_for(&self, parent_hash: B256) -> SavedCache {
277        self.execution_cache.get_cache_for(parent_hash).unwrap_or_else(|| {
278            let cache = ProviderCacheBuilder::default().build_caches(self.cross_block_cache_size);
279            SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
280        })
281    }
282}
283
284/// Handle to all the spawned tasks.
285#[derive(Debug)]
286pub struct PayloadHandle {
287    /// Channel for evm state updates
288    to_multi_proof: Option<Sender<MultiProofMessage>>,
289    // must include the receiver of the state root wired to the sparse trie
290    prewarm_handle: CacheTaskHandle,
291    /// Receiver for the state root
292    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
293}
294
295impl PayloadHandle {
296    /// Awaits the state root
297    ///
298    /// # Panics
299    ///
300    /// If payload processing was started without background tasks.
301    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
302        self.state_root
303            .take()
304            .expect("state_root is None")
305            .recv()
306            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
307    }
308
309    /// Returns a state hook to be used to send state updates to this task.
310    ///
311    /// If a multiproof task is spawned the hook will notify it about new states.
312    pub fn state_hook(&self) -> impl OnStateHook {
313        // convert the channel into a `StateHookSender` that emits an event on drop
314        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
315
316        move |source: StateChangeSource, state: &EvmState| {
317            if let Some(sender) = &to_multi_proof {
318                let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
319            }
320        }
321    }
322
323    /// Returns a clone of the caches used by prewarming
324    pub(super) fn caches(&self) -> ProviderCaches {
325        self.prewarm_handle.cache.clone()
326    }
327
328    pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
329        self.prewarm_handle.cache_metrics.clone()
330    }
331
332    /// Terminates the pre-warming transaction processing.
333    ///
334    /// Note: This does not terminate the task yet.
335    pub(super) fn stop_prewarming_execution(&self) {
336        self.prewarm_handle.stop_prewarming_execution()
337    }
338
339    /// Terminates the entire caching task.
340    ///
341    /// If the [`BundleState`] is provided it will update the shared cache.
342    pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
343        self.prewarm_handle.terminate_caching(block_output)
344    }
345}
346
347/// Access to the spawned [`PrewarmCacheTask`].
348#[derive(Debug)]
349pub(crate) struct CacheTaskHandle {
350    /// The shared cache the task operates with.
351    cache: ProviderCaches,
352    /// Metrics for the caches
353    cache_metrics: CachedStateMetrics,
354    /// Channel to the spawned prewarm task if any
355    to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
356}
357
358impl CacheTaskHandle {
359    /// Terminates the pre-warming transaction processing.
360    ///
361    /// Note: This does not terminate the task yet.
362    pub(super) fn stop_prewarming_execution(&self) {
363        self.to_prewarm_task
364            .as_ref()
365            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
366    }
367
368    /// Terminates the entire pre-warming task.
369    ///
370    /// If the [`BundleState`] is provided it will update the shared cache.
371    pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
372        self.to_prewarm_task
373            .take()
374            .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
375    }
376}
377
378impl Drop for CacheTaskHandle {
379    fn drop(&mut self) {
380        // Ensure we always terminate on drop
381        self.terminate_caching(None);
382    }
383}
384
385/// Shared access to most recently used cache.
386///
387/// This cache is intended to used for processing the payload in the following manner:
388///  - Get Cache if the payload's parent block matches the parent block
389///  - Update cache upon successful payload execution
390///
391/// This process assumes that payloads are received sequentially.
392#[derive(Clone, Debug, Default)]
393struct ExecutionCache {
394    /// Guarded cloneable cache identified by a block hash.
395    inner: Arc<RwLock<Option<SavedCache>>>,
396}
397
398impl ExecutionCache {
399    /// Returns the cache if the currently store cache is for the given `parent_hash`
400    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
401        let cache = self.inner.read();
402        cache
403            .as_ref()
404            .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
405    }
406
407    /// Clears the tracked cache
408    #[expect(unused)]
409    pub(crate) fn clear(&self) {
410        self.inner.write().take();
411    }
412
413    /// Stores the provider cache
414    pub(crate) fn save_cache(&self, cache: SavedCache) {
415        self.inner.write().replace(cache);
416    }
417}
418
#[cfg(test)]
mod tests {
    use crate::tree::{
        payload_processor::{
            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
        },
        StateProviderBuilder, TreeConfig,
    };
    use alloy_evm::block::StateChangeSource;
    use rand::Rng;
    use reth_chainspec::ChainSpec;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::EthPrimitives;
    use reth_evm::OnStateHook;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{Account, StorageEntry};
    use reth_provider::{
        providers::{BlockchainProvider, ConsistentDbView},
        test_utils::create_test_provider_factory_with_chain_spec,
        ChainSpecProvider, HashingWriter,
    };
    use reth_testing_utils::generators;
    use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
    use std::sync::Arc;

    /// Builds `num_updates` random state updates over a pool of `num_accounts` random addresses.
    ///
    /// Each update touches a random-length prefix (at least one address) of the pool. With
    /// probability 0.7 a touched account also gets between 1 and 9 changed storage slots.
    fn create_mock_state_updates(num_accounts: usize, num_updates: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let address_pool: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();

        let mut updates = Vec::with_capacity(num_updates);
        for _ in 0..num_updates {
            let touched = rng.random_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            for &address in &address_pool[..touched] {
                let mut storage = HashMap::default();
                if rng.random_bool(0.7) {
                    let slot_count = rng.random_range(1..10);
                    for _ in 0..slot_count {
                        let slot = U256::from(rng.random::<u64>());
                        let new_value = U256::from(rng.random::<u64>());
                        storage.insert(slot, EvmStorageSlot::new_changed(U256::ZERO, new_value));
                    }
                }

                let balance = U256::from(rng.random::<u64>());
                let nonce = rng.random::<u64>();
                let info = AccountInfo {
                    balance,
                    nonce,
                    code_hash: KECCAK_EMPTY,
                    code: Some(Default::default()),
                };
                let account =
                    revm_state::Account { info, storage, status: AccountStatus::Touched };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }

    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);

        // Write every update into the hashed account and storage tables, then commit.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                provider_rw
                    .insert_account_for_hashing(update.iter().map(|(address, account)| {
                        (*address, Some(Account::from_revm_account(account)))
                    }))
                    .expect("failed to insert accounts");

                provider_rw
                    .insert_storage_for_hashing(update.iter().map(|(address, account)| {
                        let entries = account.storage.iter().map(|(slot, value)| StorageEntry {
                            key: B256::from(*slot),
                            value: value.present_value,
                        });
                        (*address, entries)
                    }))
                    .expect("failed to insert storage");
            }

            provider_rw.commit().expect("failed to commit changes");
        }

        // Build the hashed post state handed to the task, plus the accumulated plain state used
        // to compute the reference root.
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let (tracked_account, tracked_storage) =
                    accumulated_state.entry(*address).or_default();
                *tracked_account = Account::from_revm_account(account);
                tracked_storage.extend(
                    account
                        .storage
                        .iter()
                        .map(|(slot, value)| (B256::from(*slot), value.present_value)),
                );
            }
        }

        let payload_processor = PayloadProcessor::<EthPrimitives, _>::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
        );
        let provider = BlockchainProvider::new(factory).unwrap();
        let mut handle = payload_processor.spawn(
            Default::default(),
            Default::default(),
            StateProviderBuilder::new(provider.clone(), genesis_hash, None),
            ConsistentDbView::new_with_latest_tip(provider).unwrap(),
            TrieInput::from_state(hashed_state),
            &TreeConfig::default(),
        );

        // Feed every update through the state hook. The hook is dropped at the end of this scope,
        // which tells the task that no more updates are coming.
        {
            let mut state_hook = handle.state_hook();
            for (index, update) in state_updates.into_iter().enumerate() {
                state_hook.on_state(StateChangeSource::Transaction(index), &update);
            }
        }

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
}