reth_engine_tree/tree/payload_processor/
mod.rs
1use crate::tree::{
4 cached_state::{CachedStateMetrics, ProviderCacheBuilder, ProviderCaches, SavedCache},
5 payload_processor::{
6 prewarm::{PrewarmCacheTask, PrewarmContext, PrewarmTaskEvent},
7 sparse_trie::StateRootComputeOutcome,
8 },
9 sparse_trie::SparseTrieTask,
10 StateProviderBuilder, TreeConfig,
11};
12use alloy_consensus::{transaction::Recovered, BlockHeader};
13use alloy_evm::block::StateChangeSource;
14use alloy_primitives::B256;
15use executor::WorkloadExecutor;
16use multiproof::*;
17use parking_lot::RwLock;
18use prewarm::PrewarmMetrics;
19use reth_evm::{ConfigureEvm, OnStateHook};
20use reth_primitives_traits::{NodePrimitives, SealedHeaderFor};
21use reth_provider::{
22 providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, StateCommitmentProvider,
23 StateProviderFactory, StateReader,
24};
25use reth_revm::{db::BundleState, state::EvmState};
26use reth_trie::TrieInput;
27use reth_trie_parallel::root::ParallelStateRootError;
28use std::{
29 collections::VecDeque,
30 sync::{
31 mpsc,
32 mpsc::{channel, Sender},
33 Arc,
34 },
35};
36
37pub mod executor;
38pub mod multiproof;
39pub mod prewarm;
40pub mod sparse_trie;
41
42#[derive(Debug, Clone)]
44pub struct PayloadProcessor<N, Evm> {
45 executor: WorkloadExecutor,
47 execution_cache: ExecutionCache,
49 trie_metrics: MultiProofTaskMetrics,
51 cross_block_cache_size: u64,
53 use_transaction_prewarming: bool,
55 evm_config: Evm,
57 _marker: std::marker::PhantomData<N>,
58}
59
60impl<N, Evm> PayloadProcessor<N, Evm> {
61 pub fn new(executor: WorkloadExecutor, evm_config: Evm, config: &TreeConfig) -> Self {
63 Self {
64 executor,
65 execution_cache: Default::default(),
66 trie_metrics: Default::default(),
67 cross_block_cache_size: config.cross_block_cache_size(),
68 use_transaction_prewarming: config.use_caching_and_prewarming(),
69 evm_config,
70 _marker: Default::default(),
71 }
72 }
73}
74
75impl<N, Evm> PayloadProcessor<N, Evm>
76where
77 N: NodePrimitives,
78 Evm: ConfigureEvm<Primitives = N> + 'static,
79{
80 pub fn spawn<P>(
113 &self,
114 header: SealedHeaderFor<N>,
115 transactions: VecDeque<Recovered<N::SignedTx>>,
116 provider_builder: StateProviderBuilder<N, P>,
117 consistent_view: ConsistentDbView<P>,
118 trie_input: TrieInput,
119 ) -> PayloadHandle
120 where
121 P: DatabaseProviderFactory<Provider: BlockReader>
122 + BlockReader
123 + StateProviderFactory
124 + StateReader
125 + StateCommitmentProvider
126 + Clone
127 + 'static,
128 {
129 let (to_sparse_trie, sparse_trie_rx) = channel();
130 let state_root_config = MultiProofConfig::new_from_input(consistent_view, trie_input);
132 let multi_proof_task =
133 MultiProofTask::new(state_root_config.clone(), self.executor.clone(), to_sparse_trie);
134
135 let to_multi_proof = Some(multi_proof_task.state_root_message_sender());
137
138 let prewarm_handle =
139 self.spawn_caching_with(header, transactions, provider_builder, to_multi_proof.clone());
140
141 self.executor.spawn_blocking(move || {
143 multi_proof_task.run();
144 });
145
146 let sparse_trie_task = SparseTrieTask {
147 executor: self.executor.clone(),
148 updates: sparse_trie_rx,
149 config: state_root_config,
150 metrics: self.trie_metrics.clone(),
151 };
152
153 let (state_root_tx, state_root_rx) = channel();
155 self.executor.spawn_blocking(move || {
156 let res = sparse_trie_task.run();
157 let _ = state_root_tx.send(res);
158 });
159
160 PayloadHandle { to_multi_proof, prewarm_handle, state_root: Some(state_root_rx) }
161 }
162
163 pub(super) fn spawn_cache_exclusive<P>(
167 &self,
168 header: SealedHeaderFor<N>,
169 transactions: VecDeque<Recovered<N::SignedTx>>,
170 provider_builder: StateProviderBuilder<N, P>,
171 ) -> PayloadHandle
172 where
173 P: BlockReader
174 + StateProviderFactory
175 + StateReader
176 + StateCommitmentProvider
177 + Clone
178 + 'static,
179 {
180 let prewarm_handle = self.spawn_caching_with(header, transactions, provider_builder, None);
181 PayloadHandle { to_multi_proof: None, prewarm_handle, state_root: None }
182 }
183
184 fn spawn_caching_with<P>(
186 &self,
187 header: SealedHeaderFor<N>,
188 mut transactions: VecDeque<Recovered<N::SignedTx>>,
189 provider_builder: StateProviderBuilder<N, P>,
190 to_multi_proof: Option<Sender<MultiProofMessage>>,
191 ) -> CacheTaskHandle
192 where
193 P: BlockReader
194 + StateProviderFactory
195 + StateReader
196 + StateCommitmentProvider
197 + Clone
198 + 'static,
199 {
200 if !self.use_transaction_prewarming {
201 transactions.clear();
204 }
205
206 let (cache, cache_metrics) = self.cache_for(header.parent_hash()).split();
207 let prewarm_ctx = PrewarmContext {
209 header,
210 evm_config: self.evm_config.clone(),
211 cache: cache.clone(),
212 cache_metrics: cache_metrics.clone(),
213 provider: provider_builder,
214 metrics: PrewarmMetrics::default(),
215 };
216
217 let prewarm_task = PrewarmCacheTask::new(
218 self.executor.clone(),
219 self.execution_cache.clone(),
220 prewarm_ctx,
221 to_multi_proof,
222 transactions,
223 );
224 let to_prewarm_task = prewarm_task.actions_tx();
225
226 self.executor.spawn_blocking(move || {
228 prewarm_task.run();
229 });
230 CacheTaskHandle { cache, to_prewarm_task: Some(to_prewarm_task), cache_metrics }
231 }
232
233 fn cache_for(&self, parent_hash: B256) -> SavedCache {
238 self.execution_cache.get_cache_for(parent_hash).unwrap_or_else(|| {
239 let cache = ProviderCacheBuilder::default().build_caches(self.cross_block_cache_size);
240 SavedCache::new(parent_hash, cache, CachedStateMetrics::zeroed())
241 })
242 }
243}
244
245#[derive(Debug)]
247pub struct PayloadHandle {
248 to_multi_proof: Option<Sender<MultiProofMessage>>,
250 prewarm_handle: CacheTaskHandle,
252 state_root: Option<mpsc::Receiver<Result<StateRootComputeOutcome, ParallelStateRootError>>>,
254}
255
256impl PayloadHandle {
257 pub fn state_root(&mut self) -> Result<StateRootComputeOutcome, ParallelStateRootError> {
263 self.state_root
264 .take()
265 .expect("state_root is None")
266 .recv()
267 .map_err(|_| ParallelStateRootError::Other("sparse trie task dropped".to_string()))?
268 }
269
270 pub fn state_hook(&self) -> impl OnStateHook {
274 let to_multi_proof = self.to_multi_proof.clone().map(StateHookSender::new);
276
277 move |source: StateChangeSource, state: &EvmState| {
278 if let Some(sender) = &to_multi_proof {
279 let _ = sender.send(MultiProofMessage::StateUpdate(source, state.clone()));
280 }
281 }
282 }
283
284 pub(super) fn caches(&self) -> ProviderCaches {
286 self.prewarm_handle.cache.clone()
287 }
288
289 pub(super) fn cache_metrics(&self) -> CachedStateMetrics {
290 self.prewarm_handle.cache_metrics.clone()
291 }
292
293 pub(super) fn stop_prewarming_execution(&self) {
297 self.prewarm_handle.stop_prewarming_execution()
298 }
299
300 pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
304 self.prewarm_handle.terminate_caching(block_output)
305 }
306}
307
308#[derive(Debug)]
310pub(crate) struct CacheTaskHandle {
311 cache: ProviderCaches,
313 cache_metrics: CachedStateMetrics,
315 to_prewarm_task: Option<Sender<PrewarmTaskEvent>>,
317}
318
319impl CacheTaskHandle {
320 pub(super) fn stop_prewarming_execution(&self) {
324 self.to_prewarm_task
325 .as_ref()
326 .map(|tx| tx.send(PrewarmTaskEvent::TerminateTransactionExecution).ok());
327 }
328
329 pub(super) fn terminate_caching(&mut self, block_output: Option<BundleState>) {
333 self.to_prewarm_task
334 .take()
335 .map(|tx| tx.send(PrewarmTaskEvent::Terminate { block_output }).ok());
336 }
337}
338
339impl Drop for CacheTaskHandle {
340 fn drop(&mut self) {
341 self.terminate_caching(None);
343 }
344}
345
346#[derive(Clone, Debug, Default)]
354struct ExecutionCache {
355 inner: Arc<RwLock<Option<SavedCache>>>,
357}
358
359impl ExecutionCache {
360 pub(crate) fn get_cache_for(&self, parent_hash: B256) -> Option<SavedCache> {
362 let cache = self.inner.read();
363 cache
364 .as_ref()
365 .and_then(|cache| (cache.executed_block_hash() == parent_hash).then(|| cache.clone()))
366 }
367
368 #[allow(unused)]
370 pub(crate) fn clear(&self) {
371 self.inner.write().take();
372 }
373
374 pub(crate) fn save_cache(&self, cache: SavedCache) {
376 self.inner.write().replace(cache);
377 }
378}
379
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::tree::{
        payload_processor::{
            evm_state_to_hashed_post_state, executor::WorkloadExecutor, PayloadProcessor,
        },
        StateProviderBuilder, TreeConfig,
    };
    use alloy_evm::block::StateChangeSource;
    use reth_chainspec::ChainSpec;
    use reth_db_common::init::init_genesis;
    use reth_ethereum_primitives::EthPrimitives;
    use reth_evm::OnStateHook;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_primitives_traits::{Account, StorageEntry};
    use reth_provider::{
        providers::{BlockchainProvider, ConsistentDbView},
        test_utils::create_test_provider_factory_with_chain_spec,
        ChainSpecProvider, HashingWriter,
    };
    use reth_testing_utils::generators::{self, Rng};
    use reth_trie::{test_utils::state_root, HashedPostState, TrieInput};
    use revm_primitives::{Address, HashMap, B256, KECCAK_EMPTY, U256};
    use revm_state::{AccountInfo, AccountStatus, EvmState, EvmStorageSlot};

    /// Generates `updates_per_account` random state updates over a pool of `num_accounts` random
    /// addresses.
    ///
    /// Every update touches a randomly sized, non-empty prefix of the address pool. Each touched
    /// account gets a random balance and nonce and, with probability 0.7, between one and nine
    /// random storage slot changes.
    fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec<EvmState> {
        let mut rng = generators::rng();
        let address_pool: Vec<Address> = (0..num_accounts).map(|_| rng.gen()).collect();

        let mut updates = Vec::with_capacity(updates_per_account);
        for _ in 0..updates_per_account {
            let touched_accounts = rng.gen_range(1..=num_accounts);
            let mut state_update = EvmState::default();

            for &address in address_pool.iter().take(touched_accounts) {
                let mut storage = HashMap::default();
                let writes_storage = rng.gen_bool(0.7);
                if writes_storage {
                    let slot_count = rng.gen_range(1..10);
                    for _ in 0..slot_count {
                        // Draw the slot key before the value to keep the RNG sequence stable.
                        let slot = U256::from(rng.gen::<u64>());
                        let new_value = U256::from(rng.gen::<u64>());
                        storage.insert(slot, EvmStorageSlot::new_changed(U256::ZERO, new_value));
                    }
                }

                let info = AccountInfo {
                    balance: U256::from(rng.gen::<u64>()),
                    nonce: rng.gen::<u64>(),
                    code_hash: KECCAK_EMPTY,
                    code: Some(Default::default()),
                };
                let account = revm_state::Account { info, storage, status: AccountStatus::Touched };

                state_update.insert(address, account);
            }

            updates.push(state_update);
        }

        updates
    }

    /// Feeds random state updates through the state hook and checks that the state root computed
    /// by the spawned tasks equals the one computed directly from the accumulated state.
    #[test]
    fn test_state_root() {
        reth_tracing::init_test_tracing();

        let factory = create_test_provider_factory_with_chain_spec(Arc::new(ChainSpec::default()));
        let genesis_hash = init_genesis(&factory).unwrap();

        let state_updates = create_mock_state_updates(10, 10);

        // Write every update into the hashing tables; the scope ends the read-write provider
        // before the factory is moved further down.
        {
            let provider_rw = factory.provider_rw().expect("failed to get provider");

            for update in &state_updates {
                let accounts = update
                    .iter()
                    .map(|(address, account)| (*address, Some(Account::from_revm_account(account))));
                provider_rw.insert_account_for_hashing(accounts).expect("failed to insert accounts");

                let storages = update.iter().map(|(address, account)| {
                    let entries = account.storage.iter().map(|(slot, value)| StorageEntry {
                        key: B256::from(*slot),
                        value: value.present_value,
                    });
                    (*address, entries)
                });
                provider_rw.insert_storage_for_hashing(storages).expect("failed to insert storage");
            }

            provider_rw.commit().expect("failed to commit changes");
        }

        // Build the hashed post state for the trie input and, in parallel, the plain accumulated
        // state that serves as the reference for the expected root.
        let mut hashed_state = HashedPostState::default();
        let mut accumulated_state: HashMap<Address, (Account, HashMap<B256, U256>)> =
            HashMap::default();
        for update in &state_updates {
            hashed_state.extend(evm_state_to_hashed_post_state(update.clone()));

            for (address, account) in update {
                let (latest_account, slots) = accumulated_state.entry(*address).or_default();
                *latest_account = Account::from_revm_account(account);
                slots.extend(
                    account.storage.iter().map(|(slot, value)| (B256::from(*slot), value.present_value)),
                );
            }
        }

        let payload_processor = PayloadProcessor::<EthPrimitives, _>::new(
            WorkloadExecutor::default(),
            EthEvmConfig::new(factory.chain_spec()),
            &TreeConfig::default(),
        );
        let provider = BlockchainProvider::new(factory).unwrap();
        let provider_builder = StateProviderBuilder::new(provider.clone(), genesis_hash, None);
        let consistent_view = ConsistentDbView::new_with_latest_tip(provider).unwrap();
        let mut handle = payload_processor.spawn(
            Default::default(),
            Default::default(),
            provider_builder,
            consistent_view,
            TrieInput::from_state(hashed_state),
        );

        let mut state_hook = handle.state_hook();
        for (tx_index, update) in state_updates.into_iter().enumerate() {
            state_hook.on_state(StateChangeSource::Transaction(tx_index), &update);
        }
        // The hook is dropped explicitly before the state root is awaited.
        drop(state_hook);

        let root_from_task = handle.state_root().expect("task failed").state_root;
        let root_from_regular = state_root(accumulated_state);

        assert_eq!(
            root_from_task, root_from_regular,
            "State root mismatch: task={root_from_task}, base={root_from_regular}"
        );
    }
}