reth_engine_tree/tree/payload_processor/
mod.rs

1//! Entrypoint for payload processing.
2
3use crate::tree::{
4    cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache},
5    payload_processor::{
6        prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
7        sparse_trie::StateRootComputeOutcome,
8    },
9    sparse_trie::SparseTrieTask,
10    StateProviderBuilder, TreeConfig,
11};
12use alloy_consensus::{transaction::Recovered, BlockHeader};
13use alloy_evm::block::StateChangeSource;
14use alloy_primitives::B256;
15use executor::WorkloadExecutor;
16use multiproof::*;
17use parking_lot::RwLock;
18use prewarm::PrewarmMetrics;
19use reth_evm::{ConfigureEvm, OnStateHook};
20use reth_primitives_traits::{NodePrimitives, SealedHeaderFor};
21use reth_provider::{
22    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider,
23    StateProviderFactory, StateReader,
24};
25use reth_revm::{db::BundleState, state::EvmState};
26use reth_trie::TrieInput;
27use reth_trie_parallel::root::ParallelStateRootError;
28use std::{
29    collections::VecDeque,
30    sync::{
31        mpsc,
32        mpsc::{channel, Sender},
33        Arc,
34    },
35};
36
37pub mod executor;
38pub mod multiproof;
39pub mod prewarm;
40pub mod sparse_trie;
41
42/// Entrypoint for executing the payload.
43#[derive(Debug, Clone)]
44pub struct PayloadProcessor<N, Evm> {
45    /// The executor used by to spawn tasks.
46    executor: WorkloadExecutor,
47    /// The most recent cache used for execution.
48    execution_cache: ExecutionCache,
49    /// Metrics for trie operations
50    trie_metrics: MultiProofTaskMetrics,
51    /// Cross-block cache size in bytes.
52    cross_block_cache_size: u64,
53    /// Whether transactions should be executed on prewarming task.
54    use_transaction_prewarming: bool,
55    /// Determines how to configure the evm for execution.
56    evm_config: Evm,
57    _marker: std::marker::PhantomData<N>,
58}
59
60impl<N, Evm> PayloadProcessor<N, Evm> {
61    /// Creates a new payload processor.
62    pub fn new(executor: WorkloadExecutor, evm_config: Evm, config: &TreeConfig) -> Self {
63        Self {
64            executor,
65            execution_cache: Default::default(),
66            trie_metrics: Default::default(),
67            cross_block_cache_size: config.cross_block_cache_size(),
68            use_transaction_prewarming: config.use_caching_and_prewarming(),
69            evm_config,
70            _marker: Default::default(),
71        }
72    }
73}
74
75impl<N, Evm> PayloadProcessor<N, Evm>
76where
77    N: NodePrimitives,
78    Evm: ConfigureEvm<Primitives = N> + 'static,
79{
80    /// Spawns all background tasks and returns a handle connected to the tasks.
81    ///
82    /// - Transaction prewarming task
83    /// - State root task
84    /// - Sparse trie task
85    ///
86    /// # Transaction prewarming task
87    ///
88    /// Responsible for feeding state updates to the multi proof task.
89    ///
90    /// This task runs until:
91    ///  - externally cancelled (e.g. sequential block execution is complete)
92    ///
93    /// ## Multi proof task
94    ///
95    /// Responsible for preparing sparse trie messages for the sparse trie task.
96    /// A state update (e.g. tx output) is converted into a multiproof calculation that returns an
97    /// output back to this task.
98    ///
99    /// Receives updates from sequential execution.
100    /// This task runs until it receives a shutdown signal, which should be after after the block
101    /// was fully executed.
102    ///
103    /// ## Sparse trie task
104    ///
105    /// Responsible for calculating the state root based on the received [`SparseTrieUpdate`].
106    ///
107    /// This task runs until there are no further updates to process.
108    ///
109    ///
110    /// This returns a handle to await the final state root and to interact with the tasks (e.g.
111    /// canceling)
112    pub fn spawn<P>(
113        &self,
114        header: SealedHeaderFor<N>,
115        transactions: VecDeque<Recovered<N::SignedTx>>,
116        provider_builder: StateProviderBuilder<N, P>,
117        consistent_view: ConsistentDbView<P>,
118        trie_input: TrieInput,
119    ) -> PayloadHandle
120    where
121        P: DatabaseProviderFactory<Provider: BlockReader>
122            + BlockReader
123            + StateProviderFactory
124            + StateReader
125            + StateCommitmentProvider
126            + Clone
127            + 'static,
128    {
129        let (to_sparse_trie, sparse_trie_rx) = channel();
130        // spawn multiproof task
131        let state_root_config = MultiProofConfig::new_from_input(consistent_view, trie_input);
132        let multi_proof_task =
133            MultiProofTask::new(state_root_config.clone(), self.executor.clone(), to_sparse_trie);
134
135        // wire the multiproof task to the prewarm task
136        let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
137
138        let prewarm_handle =
139            self.spawn_caching_with(header, transactions, provider_builder, to_multi_proof.clone());
140
141        // spawn multi-proof task
142        self.executor.spawn_blocking(move || {
143            multi_proof_task.run();
144        });
145
146        let sparse_trie_task = SparseTrieTask {
147            executor: self.executor.clone(),
148            updates: sparse_trie_rx,
149            config: state_root_config,
150            metrics: self.trie_metrics.clone(),
151        };
152
153        // wire the sparse trie to the state root response receiver
154        let (state_root_tx, state_root_rx) = channel();
155        self.executor.spawn_blocking(move || {
156            let res = sparse_trie_task.run();
157            let _ = state_root_tx.send(res);
158        });
159
160        PayloadHandle { to_multi_proof, prewarm_handle, state_root: Some(state_root_rx) }
161    }
162
163    /// Spawn cache prewarming exclusively.
164    ///
165    /// Returns a [`PayloadHandle`] to communicate with the task.
166    pub(super) fn spawn_cache_exclusive<P>(
167        &self,
168        header: SealedHeaderFor<N>,
169        transactions: VecDeque<Recovered<N::SignedTx>>,
170        provider_builder: StateProviderBuilder<N, P>,
171    ) -> PayloadHandle
172    where
173        P: BlockReader
174            + StateProviderFactory
175            + StateReader
176            + StateCommitmentProvider
177            + Clone
178            + 'static,
179    {
180        let prewarm_handle = self.spawn_caching_with(header, transactions, provider_builder, None);
181        PayloadHandle { to_multi_proof: None, prewarm_handle, state_root: None }
182    }
183
184    /// Spawn prewarming optionally wired to the multiproof task for target updates.
185    fn spawn_caching_with<P>(
186        &self,
187        header: SealedHeaderFor<N>,
188        mut transactions: VecDeque<Recovered<N::SignedTx>>,
189        provider_builder: StateProviderBuilder<N, P>,
190        to_multi_proof: Option<Sender<MultiProofMessage>>,
191    ) -> CacheTaskHandle
192    where
193        P: BlockReader
194            + StateProviderFactory
195            + StateReader
196            + StateCommitmentProvider
197            + Clone
198            + 'static,
199    {
200        if !self.use_transaction_prewarming {
201            // if no transactions should be executed we clear them but still spawn the task for
202            // caching updates
203            transactions.clear();
204        }
205
206        let (cache, cache_metrics) = self.cache_for(header.parent_hash()).split();
207        // configure prewarming
208        let prewarm_ctx = PrewarmContext {
209            header,
210            evm_config: self.evm_config.clone(),
211            cache: cache.clone(),
212            cache_metrics: cache_metrics.clone(),
213            provider: provider_builder,
214            metrics: PrewarmMetrics::default(),
215        };
216
217        let prewarm_task = PrewarmCacheTask::new(
218            self.executor.clone(),
219            self.execution_cache.clone(),
220            prewarm_ctx,
221            to_multi_proof,
222            transactions,
223        );
224        let to_prewarm_task = prewarm_task.actions_tx();
225
226        // spawn pre-warm task
227        self.executor.spawn_blocking(move || {
228            prewarm_task.run();
229        });
230        CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
231    }
232
233    /// Returns the cache for the given parent hash.
234    ///
235    /// If the given hash is different then what is recently cached, then this will create a new
236    /// instance.
237    fn cache_for(&self, parent_hash: B256) -> SavedCache {
238        self.execution_cache.get_cache_for(parent_hash).unwrap_or_else(|| {
239            let cache = ProviderCacheBuilder::default().build_caches(self.cross_block_cache_size);
240            SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
241        })
242    }
243}
244
245/// Handle to all the spawned tasks.
246#[derive(Debug)]
247pub struct PayloadHandle {
248    /// Channel for evm state updates
249    to_multi_proof: Option<Sender<MultiProofMessage>>,
250    // must include the receiver of the state root wired to the sparse trie
251    prewarm_handle: CacheTaskHandle,
252    /// Receiver for the state root
253    state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
254}
255
256impl PayloadHandle {
257    /// Awaits the state root
258    ///
259    /// # Panics
260    ///
261    /// If payload processing was started without background tasks.
262    pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
263        self.state_root
264            .take()
265            .expect("state_root is None")
266            .recv()
267            .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
268    }
269
270    /// Returns a state hook to be used to send state updates to this task.
271    ///
272    /// If a multiproof task is spawned the hook will notify it about new states.
273    pub fn state_hook(&self) -> impl OnStateHook {
274        // convert the channel into a `StateHookSender` that emits an event on drop
275        let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
276
277        move |source: StateChangeSource, state: &EvmState| {
278            if let Some(sender) = &to_multi_proof {
279                let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
280            }
281        }
282    }
283
284    /// Returns a clone of the caches used by prewarming
285    pub(super) fn caches(&self) -> ProviderCaches {
286        self.prewarm_handle.cache.clone()
287    }
288
289    pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
290        self.prewarm_handle.cache_metrics.clone()
291    }
292
293    /// Terminates the pre-warming transaction processing.
294    ///
295    /// Note: This does not terminate the task yet.
296    pub(super) fn stop_prewarming_execution(&self) {
297        self.prewarm_handle.stop_prewarming_execution()
298    }
299
300    /// Terminates the entire caching task.
301    ///
302    /// If the [`BundleState`] is provided it will update the shared cache.
303    pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
304        self.prewarm_handle.terminate_caching(block_output)
305    }
306}
307
308/// Access to the spawned [`PrewarmCacheTask`].
309#[derive(Debug)]
310pub(crate) struct CacheTaskHandle {
311    /// The shared cache the task operates with.
312    cache: ProviderCaches,
313    /// Metrics for the caches
314    cache_metrics: CachedStateMetrics,
315    /// Channel to the spawned prewarm task if any
316    to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
317}
318
319impl CacheTaskHandle {
320    /// Terminates the pre-warming transaction processing.
321    ///
322    /// Note: This does not terminate the task yet.
323    pub(super) fn stop_prewarming_execution(&self) {
324        self.to_prewarm_task
325            .as_ref()
326            .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
327    }
328
329    /// Terminates the entire pre-warming task.
330    ///
331    /// If the [`BundleState`] is provided it will update the shared cache.
332    pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
333        self.to_prewarm_task
334            .take()
335            .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
336    }
337}
338
339impl Drop for CacheTaskHandle {
340    fn drop(&mut self) {
341        // Ensure we always terminate on drop
342        self.terminate_caching(None);
343    }
344}
345
346/// Shared access to most recently used cache.
347///
348/// This cache is intended to used for processing the payload in the following manner:
349///  - Get Cache if the payload's parent block matches the parent block
350///  - Update cache upon successful payload execution
351///
352/// This process assumes that payloads are received sequentially.
353#[derive(Clone, Debug, Default)]
354struct ExecutionCache {
355    /// Guarded cloneable cache identified by a block hash.
356    inner: Arc<RwLock<Option<SavedCache>>>,
357}
358
359impl ExecutionCache {
360    /// Returns the cache if the currently store cache is for the given `parent_hash`
361    pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
362        let cache = self.inner.read();
363        cache
364            .as_ref()
365            .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
366    }
367
368    /// Clears the tracked cache
369    #[allow(unused)]
370    pub(crate) fn clear(&self) {
371        self.inner.write().take();
372    }
373
374    /// Stores the provider cache
375    pub(crate) fn save_cache(&self, cache: SavedCache) {
376        self.inner.write().replace(cache);
377    }
378}
379
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::tree::{
        payload_processor::{
            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
        },
        StateProviderBuilder, TreeConfig,
    };
    use alloy_evm::block::StateChangeSource;
    use reth_chainspec::ChainSpec;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::EthPrimitives;
    use reth_evm::OnStateHook;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{Account, StorageEntry};
    use reth_provider::{
        providers::{BlockchainProvider, ConsistentDbView},
        test_utils::create_test_provider_factory_with_chain_spec,
        ChainSpecProvider, HashingWriter,
    };
    use reth_testing_utils::generators::{self, Rng};
    use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};

    /// Builds `updates_per_account` random state updates over a pool of `num_accounts` random
    /// addresses.
    ///
    /// Every update touches a random-length prefix of the address pool (at least one address).
    /// Each touched account gets a random balance and nonce and, with probability 0.7, between
    /// one and nine changed storage slots.
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let address_pool: Vec<Address> = (0..num_accounts).map(|_| rng.gen()).collect();

        (0..updates_per_account)
            .map(|_| {
                let touched = rng.gen_range(1..=num_accounts);
                let mut state_update = EvmState::default();

                for &address in &address_pool[..touched] {
                    let mut storage = HashMap::default();
                    if rng.gen_bool(0.7) {
                        let slot_count = rng.gen_range(1..10);
                        for _ in 0..slot_count {
                            let slot = U256::from(rng.gen::<u64>());
                            let new_value = U256::from(rng.gen::<u64>());
                            storage
                                .insert(slot, EvmStorageSlot::new_changed(U256::ZERO, new_value));
                        }
                    }

                    let info = AccountInfo {
                        balance: U256::from(rng.gen::<u64>()),
                        nonce: rng.gen::<u64>(),
                        code_hash: KECCAK_EMPTY,
                        code: Some(Default::default()),
                    };
                    state_update.insert(
                        address,
                        revm_state::Account { info, storage, status: AccountStatus::Touched },
                    );
                }

                state_update
            })
            .collect()
    }

    /// Feeds random state updates through the state hook of a spawned payload processor and
    /// checks that the state root computed by the background tasks equals the root computed
    /// directly from the accumulated state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);

        // write all accounts and storage entries of the updates to the database
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let accounts = update
                    .iter()
                    .map(|(address, account)| (*address, Some(Account::from_revm_account(account))));
                provider_rw.insert_account_for_hashing(accounts).expect("failed to insert accounts");

                let storages = update.iter().map(|(address, account)| {
                    let entries = account.storage.iter().map(|(slot, value)| StorageEntry {
                        key: B256::from(*slot),
                        value: value.present_value,
                    });
                    (*address, entries)
                });
                provider_rw.insert_storage_for_hashing(storages).expect("failed to insert storage");
            }

            provider_rw.commit().expect("failed to commit changes");
        }

        // build the hashed post state for the trie input and the plain reference state
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();

        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let (stored_account, stored_slots) =
                    accumulated_state.entry(*address).or_default();
                *stored_account = Account::from_revm_account(account);
                stored_slots.extend(
                    account.storage.iter().map(|(slot, value)| (B256::from(*slot), value.present_value)),
                );
            }
        }

        let payload_processor = PayloadProcessor::<EthPrimitives, _>::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
        );
        let provider = BlockchainProvider::new(factory).unwrap();
        let provider_builder = StateProviderBuilder::new(provider.clone(), genesis_hash, None);
        let consistent_view = ConsistentDbView::new_with_latest_tip(provider).unwrap();
        let mut handle = payload_processor.spawn(
            Default::default(),
            Default::default(),
            provider_builder,
            consistent_view,
            TrieInput::from_state(hashed_state),
        );

        // send every update through the hook, then drop the hook before awaiting the root
        let mut state_hook = handle.state_hook();
        for (tx_index, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(tx_index), &update);
        }
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
}