reth_engine_tree/tree/payload_processor/
prewarm.rs
1use crate::tree::{
4 cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCaches, SavedCache},
5 payload_processor::{
6 executor::WorkloadExecutor, multiproof::MultiProofMessage, ExecutionCache,
7 },
8 StateProviderBuilder,
9};
10use alloy_consensus::transaction::Recovered;
11use alloy_primitives::{keccak256, map::B256Set, B256};
12use metrics::{Gauge, Histogram};
13use reth_evm::{ConfigureEvm, Evm};
14use reth_metrics::Metrics;
15use reth_primitives_traits::{header::SealedHeaderFor, NodePrimitives, SignedTransaction};
16use reth_provider::{BlockReader, StateCommitmentProvider, StateProviderFactory, StateReader};
17use reth_revm::{database::StateProviderDatabase, db::BundleState, state::EvmState};
18use reth_trie::MultiProofTargets;
19use std::{
20 collections::VecDeque,
21 sync::mpsc::{channel, Receiver, Sender},
22 time::Instant,
23};
24use tracing::{debug, trace};
25
26pub(super) struct PrewarmCacheTask<N: NodePrimitives, P, Evm> {
31 executor: WorkloadExecutor,
33 execution_cache: ExecutionCache,
35 pending: VecDeque<Recovered<N::SignedTx>>,
37 ctx: PrewarmContext<N, P, Evm>,
39 in_progress: usize,
41 max_concurrency: usize,
43 to_multi_proof: Option<Sender<MultiProofMessage>>,
45 actions_rx: Receiver<PrewarmTaskEvent>,
47 actions_tx: Sender<PrewarmTaskEvent>,
49}
50
51impl<N, P, Evm> PrewarmCacheTask<N, P, Evm>
52where
53 N: NodePrimitives,
54 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
55 Evm: ConfigureEvm<Primitives = N> + 'static,
56{
57 pub(super) fn new(
59 executor: WorkloadExecutor,
60 execution_cache: ExecutionCache,
61 ctx: PrewarmContext<N, P, Evm>,
62 to_multi_proof: Option<Sender<MultiProofMessage>>,
63 pending: VecDeque<Recovered<N::SignedTx>>,
64 ) -> Self {
65 let (actions_tx, actions_rx) = channel();
66 Self {
67 executor,
68 execution_cache,
69 pending,
70 ctx,
71 in_progress: 0,
72 max_concurrency: 4,
74 to_multi_proof,
75 actions_rx,
76 actions_tx,
77 }
78 }
79
80 pub(super) fn actions_tx(&self) -> Sender<PrewarmTaskEvent> {
82 self.actions_tx.clone()
83 }
84
85 fn spawn_next(&mut self) {
87 while self.in_progress < self.max_concurrency {
88 if let Some(tx) = self.pending.pop_front() {
89 self.spawn_transaction(tx);
90 } else {
91 break
92 }
93 }
94 }
95
96 fn spawn_transaction(&self, tx: Recovered<N::SignedTx>) {
98 let ctx = self.ctx.clone();
99 let metrics = self.ctx.metrics.clone();
100 let actions_tx = self.actions_tx.clone();
101 let prepare_proof_targets = self.should_prepare_multi_proof_targets();
102 self.executor.spawn_blocking(move || {
103 let start = Instant::now();
104 let proof_targets = if prepare_proof_targets {
107 ctx.prepare_multiproof_targets(tx)
108 } else {
109 ctx.transact(tx);
110 None
111 };
112 let _ = actions_tx.send(PrewarmTaskEvent::Outcome { proof_targets });
113 metrics.total_runtime.record(start.elapsed());
114 });
115 }
116
117 fn should_prepare_multi_proof_targets(&self) -> bool {
119 self.to_multi_proof.is_some()
120 }
121
122 fn send_multi_proof_targets(&self, targets: Option<MultiProofTargets>) {
124 if let Some((proof_targets, to_multi_proof)) = targets.zip(self.to_multi_proof.as_ref()) {
125 let _ = to_multi_proof.send(MultiProofMessage::PrefetchProofs(proof_targets));
126 }
127 }
128
129 fn save_cache(&self, state: BundleState) {
131 let start = Instant::now();
132 let cache = SavedCache::new(
133 self.ctx.header.hash(),
134 self.ctx.cache.clone(),
135 self.ctx.cache_metrics.clone(),
136 );
137 if cache.cache().insert_state(&state).is_err() {
138 return
139 }
140
141 cache.update_metrics();
142
143 debug!(target: "engine::caching", "Updated state caches");
144
145 self.execution_cache.save_cache(cache);
147 self.ctx.metrics.cache_saving_duration.set(start.elapsed().as_secs_f64());
148 }
149
150 pub(super) fn run(mut self) {
155 self.ctx.metrics.transactions.set(self.pending.len() as f64);
156 self.ctx.metrics.transactions_histogram.record(self.pending.len() as f64);
157
158 self.spawn_next();
160
161 while let Ok(event) = self.actions_rx.recv() {
162 match event {
163 PrewarmTaskEvent::TerminateTransactionExecution => {
164 self.pending.clear();
166 }
167 PrewarmTaskEvent::Outcome { proof_targets } => {
168 self.in_progress -= 1;
170 self.send_multi_proof_targets(proof_targets);
171 }
172 PrewarmTaskEvent::Terminate { block_output } => {
173 if let Some(state) = block_output {
175 self.save_cache(state);
176 }
177
178 break
179 }
180 }
181
182 self.spawn_next();
184 }
185 }
186}
187
188#[derive(Debug, Clone)]
190pub(super) struct PrewarmContext<N: NodePrimitives, P, Evm> {
191 pub(super) header: SealedHeaderFor<N>,
192 pub(super) evm_config: Evm,
193 pub(super) cache: ProviderCaches,
194 pub(super) cache_metrics: CachedStateMetrics,
195 pub(super) provider: StateProviderBuilder<N, P>,
197 pub(super) metrics: PrewarmMetrics,
198}
199
200impl<N, P, Evm> PrewarmContext<N, P, Evm>
201where
202 N: NodePrimitives,
203 P: BlockReader + StateProviderFactory + StateReader + StateCommitmentProvider + Clone + 'static,
204 Evm: ConfigureEvm<Primitives = N> + 'static,
205{
206 fn prepare_multiproof_targets(self, tx: Recovered<N::SignedTx>) -> Option<MultiProofTargets> {
208 let metrics = self.metrics.clone();
209 let state = self.transact(tx)?;
210
211 let mut targets = MultiProofTargets::with_capacity(state.len());
212 let mut storage_targets = 0;
213
214 for (addr, account) in state {
215 if !account.is_touched() || account.is_selfdestructed() {
223 continue
224 }
225
226 let mut storage_set =
227 B256Set::with_capacity_and_hasher(account.storage.len(), Default::default());
228 for (key, slot) in account.storage {
229 if !slot.is_changed() {
231 continue
232 }
233
234 storage_set.insert(keccak256(B256::new(key.to_be_bytes())));
235 }
236
237 storage_targets += storage_set.len();
238 targets.insert(keccak256(addr), storage_set);
239 }
240
241 metrics.prefetch_storage_targets.record(storage_targets as f64);
242
243 Some(targets)
244 }
245
246 fn transact(self, tx: Recovered<N::SignedTx>) -> Option<EvmState> {
254 let Self { header, evm_config, cache: caches, cache_metrics, provider, metrics } = self;
255 let state_provider = match provider.build() {
257 Ok(provider) => provider,
258 Err(err) => {
259 trace!(
260 target: "engine::tree",
261 %err,
262 "Failed to build state provider in prewarm thread"
263 );
264 return None
265 }
266 };
267
268 let state_provider =
270 CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
271
272 let state_provider = StateProviderDatabase::new(&state_provider);
273
274 let mut evm_env = evm_config.evm_env(&header);
275
276 evm_env.cfg_env.disable_nonce_check = true;
279
280 let mut evm = evm_config.evm_with_env(state_provider, evm_env);
282
283 let tx_env = evm_config.tx_env(&tx);
285 let start = Instant::now();
286 let res = match evm.transact(tx_env) {
287 Ok(res) => res,
288 Err(err) => {
289 trace!(
290 target: "engine::tree",
291 %err,
292 tx_hash=%tx.tx_hash(),
293 sender=%tx.signer(),
294 "Error when executing prewarm transaction",
295 );
296 return None
297 }
298 };
299 metrics.execution_duration.record(start.elapsed());
300
301 Some(res.state)
302 }
303}
304
305pub(super) enum PrewarmTaskEvent {
307 TerminateTransactionExecution,
309 Terminate {
312 block_output: Option<BundleState>,
314 },
315 Outcome {
317 proof_targets: Option<MultiProofTargets>,
319 },
320}
321
322#[derive(Metrics, Clone)]
324#[metrics(scope = "sync.prewarm")]
325pub(crate) struct PrewarmMetrics {
326 pub(crate) transactions: Gauge,
328 pub(crate) transactions_histogram: Histogram,
330 pub(crate) total_runtime: Histogram,
332 pub(crate) execution_duration: Histogram,
334 pub(crate) prefetch_storage_targets: Histogram,
336 pub(crate) cache_saving_duration: Gauge,
338}