Skip to main content

reth_engine_tree/tree/payload_processor/
bal_prewarm_pool.rs

1use alloy_primitives::{Address, StorageKey};
2use reth_execution_cache::{CachedStateProvider, ExecutionCache};
3use reth_provider::{
4    AccountReader, BytecodeReader, ProviderResult, StateProvider, StateProviderBox,
5};
6use std::{
7    sync::{
8        atomic::{AtomicUsize, Ordering},
9        Arc,
10    },
11    thread::JoinHandle,
12};
13use tokio::sync::oneshot;
14use tracing::trace;
15
/// Builds a fresh `StateProviderBox` over the block's parent state. Type-erased so the pool is not
/// generic over the provider factory; each worker builds its own per block.
///
/// The pool hands the builder to its worker threads as `Arc<BuildProviderFn>`, which is why the
/// closure must be `Send + Sync`. A failed build is reported through the `Err` arm of the returned
/// [`ProviderResult`].
type BuildProviderFn = dyn Fn() -> ProviderResult<StateProviderBox> + Send + Sync;
19
/// A single warm request: a whole account (basic account + its bytecode) or one storage slot.
enum PrewarmTarget {
    /// Read the basic account at this address and, if it carries a non-empty code hash, the
    /// bytecode behind that hash.
    Account(Address),
    /// Read one storage slot (second field) of the account at this address (first field).
    Storage(Address, StorageKey),
}
25
/// A message in a worker's queue. The per-block lifecycle is explicit and ordered (the queue is
/// FIFO): one `BeginBlock`, then the worker's share of `Warm`s, then one `EndBlock`.
enum PrewarmMsg {
    /// Open a read txn for the new block: build a provider over the parent state and hold it.
    ///
    /// `build` constructs the provider; `caches` is the shared cache the provider is wrapped
    /// with. If `build` fails the worker holds no provider for this block.
    BeginBlock { build: Arc<BuildProviderFn>, caches: ExecutionCache },
    /// Warm one target into the held provider's cache. Ignored if no provider is held.
    Warm(PrewarmTarget),
    /// Drop the held provider (and its read txn).
    ///
    /// The payload is a completion token shared by all workers: each worker drops its clone
    /// after releasing the provider, and dropping the last clone wakes the `end_block` caller.
    EndBlock(Arc<SendOnDrop>),
}
36
/// Long-lived pool of blocking threads that warm the BAL read-set into the shared execution cache.
///
/// Every worker thread owns the receiving end of its own unbounded queue; the pool keeps the
/// sending ends. Dropping the pool drops the senders, which disconnects the queues and lets the
/// worker loops finish.
#[derive(Debug)]
pub(crate) struct BalPrewarmPool {
    /// One queue per worker. `BeginBlock`/`EndBlock` are broadcast to all; `Warm`s round-robin.
    workers: Vec<crossbeam_channel::Sender<PrewarmMsg>>,
    /// Round-robin cursor for distributing warm requests across workers.
    next: AtomicUsize,
    /// Join handles of the spawned worker threads. They are only stored, never joined in this
    /// file.
    _handles: Vec<JoinHandle<()>>,
}
46
47impl BalPrewarmPool {
48    /// Spawns `num_threads` long-lived blocking worker threads. Owned by the
49    /// [`PayloadProcessor`](super::PayloadProcessor); the threads exit when the pool is dropped.
50    pub(crate) fn new(num_threads: usize) -> Arc<Self> {
51        let mut workers = Vec::with_capacity(num_threads);
52        let mut handles = Vec::with_capacity(num_threads);
53        for i in 0..num_threads {
54            let (tx, rx) = crossbeam_channel::unbounded::<PrewarmMsg>();
55            workers.push(tx);
56            handles.push(
57                std::thread::Builder::new()
58                    .name(format!("bal-prewarm-{i:03}"))
59                    .spawn(move || prewarm_loop(rx))
60                    .expect("spawn bal-prewarm thread"),
61            );
62        }
63        trace!(target: "engine::tree::bal_prewarm_pool", num_threads, "BalPrewarmPool spawned");
64        Arc::new(Self { workers, next: AtomicUsize::new(0), _handles: handles })
65    }
66
67    /// Begins a block: hands every worker the provider builder and shared cache so each opens its
68    /// own read txn over the parent state. Pair with [`end_block`](Self::end_block).
69    pub(crate) fn begin_block(&self, build: Arc<BuildProviderFn>, caches: ExecutionCache) {
70        for worker in &self.workers {
71            let _ = worker
72                .send(PrewarmMsg::BeginBlock { build: build.clone(), caches: caches.clone() });
73        }
74    }
75
76    /// Fire-and-forget: warm an account (basic account + bytecode) on some worker.
77    pub(crate) fn warm_account(&self, addr: Address) {
78        self.send_warm(PrewarmTarget::Account(addr));
79    }
80
81    /// Fire-and-forget: warm one storage slot on some worker.
82    pub(crate) fn warm_storage(&self, addr: Address, slot: StorageKey) {
83        self.send_warm(PrewarmTarget::Storage(addr, slot));
84    }
85
86    /// Ends the block: every worker drops its provider (and read txn) once it has drained the warm
87    /// requests queued ahead of this message.
88    ///
89    /// Blocks until all workers processed the end block message.
90    pub(crate) fn end_block(&self) {
91        let (tx, rx) = oneshot::channel();
92        let tx = Arc::new(SendOnDrop { sender: Some(tx) });
93
94        for worker in &self.workers {
95            let _ = worker.send(PrewarmMsg::EndBlock(tx.clone()));
96        }
97
98        drop(tx);
99        rx.blocking_recv().expect("BAL prewarm pool dropped without signaling completion");
100    }
101
102    fn send_warm(&self, target: PrewarmTarget) {
103        let i = self.next.fetch_add(1, Ordering::Relaxed) % self.workers.len();
104        let _ = self.workers[i].send(PrewarmMsg::Warm(target));
105    }
106}
107
/// Number of warming threads.
///
/// The work performed on those threads boils down mostly to MDBX reads. An MDBX read consists of
/// a tree traversal and major page faults causing I/O.
///
/// In order to utilize the parallelism of `NVMe` we have to give it enough work, or equivalently,
/// maintain a high queue depth. Modern `NVMe` devices require between 64 and 128 requests
/// in-flight to achieve their peak performance. Ideally we don't grow past that but it's OK to do
/// so, it just means that a request is going to wait in the `NVMe` queue rather than in memory.
///
/// MDBX piggy-backs on the OS page cache for its buffers. Oftentimes, the hit rate reaches
/// 90-99%. At that point, the workload can be classified as CPU-bound. In that case, having
/// a high number of threads is counterproductive due to the effects of context switching, core
/// migration, contention, etc.
///
/// However, that overhead is considered negligible compared to the benefits of fully utilizing
/// `NVMe` resources. For example, with a request latency of 100µs and 100k IO requests, the
/// expected time to finish is 312.5ms at QD=32 and 156.25ms at QD=64.
///
/// This should explain why this particular value is picked.
pub(crate) const DEFAULT_BAL_PREWARM_THREADS: usize = 128;
129
130fn prewarm_loop(rx: crossbeam_channel::Receiver<PrewarmMsg>) {
131    // The provider (and its MDBX read txn) held for the current block, between `BeginBlock` and
132    // `EndBlock`. `None` while idle, so no read txn is pinned across the inter-block gap.
133    let mut provider: Option<CachedStateProvider<StateProviderBox>> = None;
134
135    // Blocks when idle; the channel disconnects (and the loop ends) when the pool is dropped.
136    while let Ok(msg) = rx.recv() {
137        match msg {
138            PrewarmMsg::BeginBlock { build, caches } => {
139                provider = match (build)() {
140                    Ok(inner) => Some(CachedStateProvider::new_prewarm(inner, caches)),
141                    Err(err) => {
142                        trace!(target: "engine::tree::bal_prewarm_pool", %err, "failed to build provider");
143                        None
144                    }
145                };
146            }
147            PrewarmMsg::Warm(target) => {
148                let Some(provider) = provider.as_ref() else { continue };
149                match target {
150                    PrewarmTarget::Account(addr) => {
151                        if let Ok(Some(account)) = provider.basic_account(&addr) &&
152                            let Some(code_hash) = account.bytecode_hash &&
153                            code_hash != alloy_consensus::constants::KECCAK_EMPTY
154                        {
155                            let _ = provider.bytecode_by_hash(&code_hash);
156                        }
157                    }
158                    PrewarmTarget::Storage(addr, slot) => {
159                        let _ = provider.storage(addr, slot);
160                    }
161                }
162            }
163            PrewarmMsg::EndBlock(end_tx) => {
164                provider = None;
165                drop(end_tx);
166            }
167        }
168    }
169}
170
/// Completion token that sends `()` on the wrapped oneshot channel when it is dropped.
///
/// `end_block` shares a single instance between all workers through an `Arc`, so the signal is
/// sent when the last clone goes away.
struct SendOnDrop {
    /// Sending half of the completion channel. Taken out on drop, leaving `None` behind.
    sender: Option<oneshot::Sender<()>>,
}
174
175impl Drop for SendOnDrop {
176    fn drop(&mut self) {
177        if let Some(sender) = self.sender.take() {
178            let _ = sender.send(());
179        }
180    }
181}