reth_engine_tree/tree/payload_processor/
prewarm.rs
1use crate::tree::{
4 cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
5 payload_processor::{
6 executor::WorkloadExecutor, multiproof::MultiProofMessage, ExecutionCache,
7 },
8 StateProviderBuilder,
9};
10use alloy_consensus::transaction::Recovered;
11use alloy_evm::Database;
12use alloy_primitives::{keccak256, map::B256Set, B256};
13use itertools::Itertools;
14use metrics::{Gauge, Histogram};
15use reth_evm::{ConfigureEvm, Evm, EvmFor};
16use reth_metrics::Metrics;
17use reth_primitives_traits::{header::SealedHeaderFor, NodePrimitives, SignedTransaction};
18use reth_provider::{BlockReader, StateCommitmentProvider, StateProviderFactory, StateReader};
19use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
20use reth_trie::MultiProofTargets;
21use std::{
22 collections::VecDeque,
23 sync::mpsc::{channel, Receiver, Sender},
24 time::Instant,
25};
26use tracing::{debug, trace};
27
28pub(super) struct PrewarmCacheTask<N: NodePrimitives, P, Evm> {
33 executor: WorkloadExecutor,
35 execution_cache: ExecutionCache,
37 pending: VecDeque<Recovered<N::SignedTx>>,
39 ctx: PrewarmContext<N, P, Evm>,
41 max_concurrency: usize,
43 to_multi_proof: Option<Sender<MultiProofMessage>>,
45 actions_rx: Receiver<PrewarmTaskEvent>,
47 actions_tx: Sender<PrewarmTaskEvent>,
49}
50
51impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
52where
53 N: NodePrimitives,
54 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
55 Evm: ConfigureEvm<Primitives = N> + 'static,
56{
57 pub(super) fn new(
59 executor: WorkloadExecutor,
60 execution_cache: ExecutionCache,
61 ctx: PrewarmContext<N, P, Evm>,
62 to_multi_proof: Option<Sender<MultiProofMessage>>,
63 pending: VecDeque<Recovered<N::SignedTx>>,
64 ) -> Self {
65 let (actions_tx, actions_rx) = channel();
66 Self {
67 executor,
68 execution_cache,
69 pending,
70 ctx,
71 max_concurrency: 64,
72 to_multi_proof,
73 actions_rx,
74 actions_tx,
75 }
76 }
77
78 pub(super) fn actions_tx(&self) -> Sender<PrewarmTaskEvent> {
80 self.actions_tx.clone()
81 }
82
83 fn spawn_all(&mut self) {
85 let chunk_size = (self.pending.len() / self.max_concurrency).max(1);
86
87 for chunk in &self.pending.drain(..).chunks(chunk_size) {
88 let sender = self.actions_tx.clone();
89 let ctx = self.ctx.clone();
90 let pending_chunk = chunk.collect::<Vec<_>>();
91
92 self.executor.spawn_blocking(move || {
93 ctx.transact_batch(&pending_chunk, sender);
94 });
95 }
96 }
97
98 fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
100 if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
101 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
102 }
103 }
104
105 fn save_cache(&self, state: BundleState) {
107 let start = Instant::now();
108 let cache = SavedCache::new(
109 self.ctx.header.hash(),
110 self.ctx.cache.clone(),
111 self.ctx.cache_metrics.clone(),
112 );
113 if cache.cache().insert_state(&state).is_err() {
114 return
115 }
116
117 cache.update_metrics();
118
119 debug!(target: "engine::caching", "Updated state caches");
120
121 self.execution_cache.save_cache(cache);
123 self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
124 }
125
126 pub(super) fn run(mut self) {
131 self.ctx.metrics.transactions.set(self.pending.len() as f64);
132 self.ctx.metrics.transactions_histogram.record(self.pending.len() as f64);
133
134 self.spawn_all();
136
137 while let Ok(event) = self.actions_rx.recv() {
138 match event {
139 PrewarmTaskEvent::TerminateTransactionExecution => {
140 self.pending.clear();
142 }
143 PrewarmTaskEvent::Outcome { proof_targets } => {
144 self.send_multi_proof_targets(proof_targets);
146 }
147 PrewarmTaskEvent::Terminate { block_output } => {
148 if let Some(state) = block_output {
150 self.save_cache(state);
151 }
152
153 break
154 }
155 }
156 }
157 }
158}
159
160#[derive(Debug, Clone)]
162pub(super) struct PrewarmContext<N: NodePrimitives, P, Evm> {
163 pub(super) header: SealedHeaderFor<N>,
164 pub(super) evm_config: Evm,
165 pub(super) cache: ProviderCaches,
166 pub(super) cache_metrics: CachedStateMetrics,
167 pub(super) provider: StateProviderBuilder<N, P>,
169 pub(super) metrics: PrewarmMetrics,
170}
171
172impl<N, P, Evm> PrewarmContext<N, P, Evm>
173where
174 N: NodePrimitives,
175 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
176 Evm: ConfigureEvm<Primitives = N> + 'static,
177{
178 fn evm_for_ctx(self) -> Option<(EvmFor<Evm, impl Database>, Evm, PrewarmMetrics)> {
180 let Self { header, evm_config, cache: caches, cache_metrics, provider, metrics } = self;
181
182 let state_provider = match provider.build() {
183 Ok(provider) => provider,
184 Err(err) => {
185 trace!(
186 target: "engine::tree",
187 %err,
188 "Failed to build state provider in prewarm thread"
189 );
190 return None
191 }
192 };
193
194 let state_provider =
196 CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
197
198 let state_provider = StateProviderDatabase::new(state_provider);
199
200 let mut evm_env = evm_config.evm_env(&header);
201
202 evm_env.cfg_env.disable_nonce_check = true;
205
206 let evm = evm_config.evm_with_env(state_provider, evm_env);
208
209 Some((evm, evm_config, metrics))
210 }
211
212 fn transact_batch(self, txs: &[Recovered<N::SignedTx>], sender: Sender<PrewarmTaskEvent>) {
220 let Some((mut evm, evm_config, metrics)) = self.evm_for_ctx() else { return };
221
222 for tx in txs {
223 let tx_env = evm_config.tx_env(tx);
225 let start = Instant::now();
226 let res = match evm.transact(tx_env) {
227 Ok(res) => res,
228 Err(err) => {
229 trace!(
230 target: "engine::tree",
231 %err,
232 tx_hash=%tx.tx_hash(),
233 sender=%tx.signer(),
234 "Error when executing prewarm transaction",
235 );
236 return
237 }
238 };
239 metrics.execution_duration.record(start.elapsed());
240
241 let (targets, storage_targets) = multiproof_targets_from_state(res.state);
242 metrics.prefetch_storage_targets.record(storage_targets as f64);
243 metrics.total_runtime.record(start.elapsed());
244
245 let _ = sender.send(PrewarmTaskEvent::Outcome { proof_targets: Some(targets) });
246 }
247 }
248}
249
250fn multiproof_targets_from_state(state: EvmState) -> (MultiProofTargets, usize) {
253 let mut targets = MultiProofTargets::with_capacity(state.len());
254 let mut storage_targets = 0;
255 for (addr, account) in state {
256 if !account.is_touched() || account.is_selfdestructed() {
264 continue
265 }
266
267 let mut storage_set =
268 B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
269 for (key, slot) in account.storage {
270 if !slot.is_changed() {
272 continue
273 }
274
275 storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
276 }
277
278 storage_targets += storage_set.len();
279 targets.insert(keccak256(addr), storage_set);
280 }
281
282 (targets, storage_targets)
283}
284
285pub(super) enum PrewarmTaskEvent {
287 TerminateTransactionExecution,
289 Terminate {
292 block_output: Option<BundleState>,
294 },
295 Outcome {
297 proof_targets: Option<MultiProofTargets>,
299 },
300}
301
302#[derive(Metrics, Clone)]
304#[metrics(scope = "sync.prewarm")]
305pub(crate) struct PrewarmMetrics {
306 pub(crate) transactions: Gauge,
308 pub(crate) transactions_histogram: Histogram,
310 pub(crate) total_runtime: Histogram,
312 pub(crate) execution_duration: Histogram,
314 pub(crate) prefetch_storage_targets: Histogram,
316 pub(crate) cache_saving_duration: Gauge,
318}