1use crate::tree::{
4 cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache},
5 payload_processor::{
6 prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
7 sparse_trie::StateRootComputeOutcome,
8 },
9 sparse_trie::SparseTrieTask,
10 StateProviderBuilder, TreeConfig,
11};
12use alloy_consensus::{transaction::Recovered, BlockHeader};
13use alloy_evm::block::StateChangeSource;
14use alloy_primitives::B256;
15use executor::WorkloadExecutor;
16use multiproof::*;
17use parking_lot::RwLock;
18use prewarm::PrewarmMetrics;
19use reth_evm::{ConfigureEvm, OnStateHook};
20use reth_primitives_traits::{NodePrimitives, SealedHeaderFor};
21use reth_provider::{
22 providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider,
23 StateProviderFactory, StateReader,
24};
25use reth_revm::{db::BundleState, state::EvmState};
26use reth_trie::TrieInput;
27use reth_trie_parallel::{
28 proof_task::{ProofTaskCtx, ProofTaskManager},
29 root::ParallelStateRootError,
30};
31use std::{
32 collections::VecDeque,
33 sync::{
34 mpsc,
35 mpsc::{channel, Sender},
36 Arc,
37 },
38};
39
40pub mod executor;
41pub mod multiproof;
42pub mod prewarm;
43pub mod sparse_trie;
44
45#[derive(Debug, Clone)]
47pub struct PayloadProcessor<N, Evm> {
48 executor: WorkloadExecutor,
50 execution_cache: ExecutionCache,
52 trie_metrics: MultiProofTaskMetrics,
54 cross_block_cache_size: u64,
56 use_transaction_prewarming: bool,
58 evm_config: Evm,
60 _marker: std::marker::PhantomData<N>,
61}
62
63impl<N, Evm> PayloadProcessor<N, Evm> {
64 pub fn new(executor: WorkloadExecutor, evm_config: Evm, config: &TreeConfig) -> Self {
66 Self {
67 executor,
68 execution_cache: Default::default(),
69 trie_metrics: Default::default(),
70 cross_block_cache_size: config.cross_block_cache_size(),
71 use_transaction_prewarming: config.use_caching_and_prewarming(),
72 evm_config,
73 _marker: Default::default(),
74 }
75 }
76}
77
78impl<N, Evm> PayloadProcessor<N, Evm>
79where
80 N: NodePrimitives,
81 Evm: ConfigureEvm<Primitives = N> + 'static,
82{
83 pub fn spawn<P>(
116 &self,
117 header: SealedHeaderFor<N>,
118 transactions: VecDeque<Recovered<N::SignedTx>>,
119 provider_builder: StateProviderBuilder<N, P>,
120 consistent_view: ConsistentDbView<P>,
121 trie_input: TrieInput,
122 config: &TreeConfig,
123 ) -> PayloadHandle
124 where
125 P: DatabaseProviderFactory<Provider: BlockReader>
126 + BlockReader
127 + StateProviderFactory
128 + StateReader
129 + StateCommitmentProvider
130 + Clone
131 + 'static,
132 {
133 let (to_sparse_trie, sparse_trie_rx) = channel();
134 let state_root_config = MultiProofConfig::new_from_input(consistent_view, trie_input);
136
137 let task_ctx = ProofTaskCtx::new(
139 state_root_config.nodes_sorted.clone(),
140 state_root_config.state_sorted.clone(),
141 state_root_config.prefix_sets.clone(),
142 );
143 let max_proof_task_concurrency = config.max_proof_task_concurrency() as usize;
144 let proof_task = ProofTaskManager::new(
145 self.executor.handle().clone(),
146 state_root_config.consistent_view.clone(),
147 task_ctx,
148 max_proof_task_concurrency,
149 );
150
151 let max_multi_proof_task_concurrency = max_proof_task_concurrency / 2;
154 let multi_proof_task = MultiProofTask::new(
155 state_root_config,
156 self.executor.clone(),
157 proof_task.handle(),
158 to_sparse_trie,
159 max_multi_proof_task_concurrency,
160 );
161
162 let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
164
165 let prewarm_handle =
166 self.spawn_caching_with(header, transactions, provider_builder, to_multi_proof.clone());
167
168 self.executor.spawn_blocking(move || {
170 multi_proof_task.run();
171 });
172
173 let sparse_trie_task = SparseTrieTask::new(
174 self.executor.clone(),
175 sparse_trie_rx,
176 proof_task.handle(),
177 self.trie_metrics.clone(),
178 );
179
180 let (state_root_tx, state_root_rx) = channel();
182 self.executor.spawn_blocking(move || {
183 let res = sparse_trie_task.run();
184 let _ = state_root_tx.send(res);
185 });
186
187 self.executor.spawn_blocking(move || {
189 if let Err(err) = proof_task.run() {
190 tracing::error!(
192 target: "engine::root",
193 ?err,
194 "Storage proof task returned an error"
195 );
196 }
197 });
198
199 PayloadHandle { to_multi_proof, prewarm_handle, state_root: Some(state_root_rx) }
200 }
201
202 pub(super) fn spawn_cache_exclusive<P>(
206 &self,
207 header: SealedHeaderFor<N>,
208 transactions: VecDeque<Recovered<N::SignedTx>>,
209 provider_builder: StateProviderBuilder<N, P>,
210 ) -> PayloadHandle
211 where
212 P: BlockReader
213 + StateProviderFactory
214 + StateReader
215 + StateCommitmentProvider
216 + Clone
217 + 'static,
218 {
219 let prewarm_handle = self.spawn_caching_with(header, transactions, provider_builder, None);
220 PayloadHandle { to_multi_proof: None, prewarm_handle, state_root: None }
221 }
222
223 fn spawn_caching_with<P>(
225 &self,
226 header: SealedHeaderFor<N>,
227 mut transactions: VecDeque<Recovered<N::SignedTx>>,
228 provider_builder: StateProviderBuilder<N, P>,
229 to_multi_proof: Option<Sender<MultiProofMessage>>,
230 ) -> CacheTaskHandle
231 where
232 P: BlockReader
233 + StateProviderFactory
234 + StateReader
235 + StateCommitmentProvider
236 + Clone
237 + 'static,
238 {
239 if !self.use_transaction_prewarming {
240 transactions.clear();
243 }
244
245 let (cache, cache_metrics) = self.cache_for(header.parent_hash()).split();
246 let prewarm_ctx = PrewarmContext {
248 header,
249 evm_config: self.evm_config.clone(),
250 cache: cache.clone(),
251 cache_metrics: cache_metrics.clone(),
252 provider: provider_builder,
253 metrics: PrewarmMetrics::default(),
254 };
255
256 let prewarm_task = PrewarmCacheTask::new(
257 self.executor.clone(),
258 self.execution_cache.clone(),
259 prewarm_ctx,
260 to_multi_proof,
261 transactions,
262 );
263 let to_prewarm_task = prewarm_task.actions_tx();
264
265 self.executor.spawn_blocking(move || {
267 prewarm_task.run();
268 });
269 CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
270 }
271
272 fn cache_for(&self, parent_hash: B256) -> SavedCache {
277 self.execution_cache.get_cache_for(parent_hash).unwrap_or_else(|| {
278 let cache = ProviderCacheBuilder::default().build_caches(self.cross_block_cache_size);
279 SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
280 })
281 }
282}
283
284#[derive(Debug)]
286pub struct PayloadHandle {
287 to_multi_proof: Option<Sender<MultiProofMessage>>,
289 prewarm_handle: CacheTaskHandle,
291 state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
293}
294
295impl PayloadHandle {
296 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
302 self.state_root
303 .take()
304 .expect("state_root is None")
305 .recv()
306 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
307 }
308
309 pub fn state_hook(&self) -> impl OnStateHook {
313 let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
315
316 move |source: StateChangeSource, state: &EvmState| {
317 if let Some(sender) = &to_multi_proof {
318 let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
319 }
320 }
321 }
322
323 pub(super) fn caches(&self) -> ProviderCaches {
325 self.prewarm_handle.cache.clone()
326 }
327
328 pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
329 self.prewarm_handle.cache_metrics.clone()
330 }
331
332 pub(super) fn stop_prewarming_execution(&self) {
336 self.prewarm_handle.stop_prewarming_execution()
337 }
338
339 pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
343 self.prewarm_handle.terminate_caching(block_output)
344 }
345}
346
347#[derive(Debug)]
349pub(crate) struct CacheTaskHandle {
350 cache: ProviderCaches,
352 cache_metrics: CachedStateMetrics,
354 to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
356}
357
358impl CacheTaskHandle {
359 pub(super) fn stop_prewarming_execution(&self) {
363 self.to_prewarm_task
364 .as_ref()
365 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
366 }
367
368 pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
372 self.to_prewarm_task
373 .take()
374 .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
375 }
376}
377
378impl Drop for CacheTaskHandle {
379 fn drop(&mut self) {
380 self.terminate_caching(None);
382 }
383}
384
385#[derive(Clone, Debug, Default)]
393struct ExecutionCache {
394 inner: Arc<RwLock<Option<SavedCache>>>,
396}
397
398impl ExecutionCache {
399 pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
401 let cache = self.inner.read();
402 cache
403 .as_ref()
404 .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
405 }
406
407 #[expect(unused)]
409 pub(crate) fn clear(&self) {
410 self.inner.write().take();
411 }
412
413 pub(crate) fn save_cache(&self, cache: SavedCache) {
415 self.inner.write().replace(cache);
416 }
417}
418
419#[cfg(test)]
420mod tests {
421 use std::sync::Arc;
422
423 use crate::tree::{
424 payload_processor::{
425 evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
426 },
427 StateProviderBuilder, TreeConfig,
428 };
429 use alloy_evm::block::StateChangeSource;
430 use rand::Rng;
431 use reth_chainspec::ChainSpec;
432 use reth_db_common::init::init_genesis;
433 use reth_ethereum_primitives::EthPrimitives;
434 use reth_evm::OnStateHook;
435 use reth_evm_ethereum::EthEvmConfig;
436 use reth_primitives_traits::{Account, StorageEntry};
437 use reth_provider::{
438 providers::{BlockchainProvider, ConsistentDbView},
439 test_utils::create_test_provider_factory_with_chain_spec,
440 ChainSpecProvider, HashingWriter,
441 };
442 use reth_testing_utils::generators;
443 use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
444 use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
445 use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};
446
447 fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
448 let mut rng = generators::rng();
449 let all_addresses: Vec<Address> = (0..num_accounts).map(|_| rng.random()).collect();
450 let mut updates = Vec::new();
451
452 for _ in 0..updates_per_account {
453 let num_accounts_in_update = rng.random_range(1..=num_accounts);
454 let mut state_update = EvmState::default();
455
456 let selected_addresses = &all_addresses[0..num_accounts_in_update];
457
458 for &address in selected_addresses {
459 let mut storage = HashMap::default();
460 if rng.random_bool(0.7) {
461 for _ in 0..rng.random_range(1..10) {
462 let slot = U256::from(rng.random::<u64>());
463 storage.insert(
464 slot,
465 EvmStorageSlot::new_changed(
466 U256::ZERO,
467 U256::from(rng.random::<u64>()),
468 ),
469 );
470 }
471 }
472
473 let account = revm_state::Account {
474 info: AccountInfo {
475 balance: U256::from(rng.random::<u64>()),
476 nonce: rng.random::<u64>(),
477 code_hash: KECCAK_EMPTY,
478 code: Some(Default::default()),
479 },
480 storage,
481 status: AccountStatus::Touched,
482 };
483
484 state_update.insert(address, account);
485 }
486
487 updates.push(state_update);
488 }
489
490 updates
491 }
492
493 #[test]
494 fn test_state_root() {
495 reth_tracing::init_test_tracing();
496
497 let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
498 let genesis_hash = init_genesis(&factory).unwrap();
499
500 let state_updates = create_mock_state_updates(10, 10);
501 let mut hashed_state = HashedPostState::default();
502 let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
503 HashMap::default();
504
505 {
506 let provider_rw = factory.provider_rw().expect("failed to get provider");
507
508 for update in &state_updates {
509 let account_updates = update.iter().map(|(address, account)| {
510 (*address, Some(Account::from_revm_account(account)))
511 });
512 provider_rw
513 .insert_account_for_hashing(account_updates)
514 .expect("failed to insert accounts");
515
516 let storage_updates = update.iter().map(|(address, account)| {
517 let storage_entries = account.storage.iter().map(|(slot, value)| {
518 StorageEntry { key: B256::from(*slot), value: value.present_value }
519 });
520 (*address, storage_entries)
521 });
522 provider_rw
523 .insert_storage_for_hashing(storage_updates)
524 .expect("failed to insert storage");
525 }
526 provider_rw.commit().expect("failed to commit changes");
527 }
528
529 for update in &state_updates {
530 hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));
531
532 for (address, account) in update {
533 let storage: HashMap<B256, U256> = account
534 .storage
535 .iter()
536 .map(|(k, v)| (B256::from(*k), v.present_value))
537 .collect();
538
539 let entry = accumulated_state.entry(*address).or_default();
540 entry.0 = Account::from_revm_account(account);
541 entry.1.extend(storage);
542 }
543 }
544
545 let payload_processor = PayloadProcessor::<EthPrimitives, _>::new(
546 WorkloadExecutor::default(),
547 EthEvmConfig::new(factory.chain_spec()),
548 &TreeConfig::default(),
549 );
550 let provider = BlockchainProvider::new(factory).unwrap();
551 let mut handle = payload_processor.spawn(
552 Default::default(),
553 Default::default(),
554 StateProviderBuilder::new(provider.clone(), genesis_hash, None),
555 ConsistentDbView::new_with_latest_tip(provider).unwrap(),
556 TrieInput::from_state(hashed_state),
557 &TreeConfig::default(),
558 );
559
560 let mut state_hook = handle.state_hook();
561
562 for (i, update) in state_updates.into_iter().enumerate() {
563 state_hook.on_state(StateChangeSource::Transaction(i), &update);
564 }
565 drop(state_hook);
566
567 let root_from_task = handle.state_root().expect("task failed").state_root;
568 let root_from_regular = state_root(accumulated_state);
569
570 assert_eq!(
571 root_from_task, root_from_regular,
572 "State root mismatch: task={root_from_task}, base={root_from_regular}"
573 );
574 }
575}