Skip to main content

reth_tasks/
runtime.rs

1//! Centralized management of async and parallel execution.
2//!
3//! This module provides [`Runtime`], a cheaply cloneable handle that manages:
4//! - Tokio runtime (either owned or attached)
5//! - Task spawning with shutdown awareness and panic monitoring
6//! - Dedicated rayon thread pools for different workloads (with `rayon` feature)
7//! - [`BlockingTaskGuard`] for rate-limiting expensive operations (with `rayon` feature)
8
9#[cfg(feature = "rayon")]
10use crate::pool::{build_pool_with_panic_handler, BlockingTaskGuard, BlockingTaskPool, WorkerPool};
11use crate::{
12    metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13    shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14    worker_map::WorkerMap,
15    PanickedTaskError, TaskEvent, TaskManager,
16};
17use futures_util::{future::select, Future, FutureExt, TryFutureExt};
18#[cfg(feature = "rayon")]
19use std::{num::NonZeroUsize, thread::available_parallelism};
20use std::{
21    pin::pin,
22    sync::{
23        atomic::{AtomicUsize, Ordering},
24        Arc, Mutex,
25    },
26    thread,
27    time::{Duration, Instant},
28};
29use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
30use tracing::{debug, error};
31use tracing_futures::Instrument;
32
33use tokio::runtime::Runtime as TokioRuntime;
34
/// Default thread keep-alive duration for the tokio runtime.
///
/// Used by [`TokioConfig::default`] and [`TokioConfig::with_worker_threads`].
pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_secs(15);

/// Default reserved CPU cores for OS and other processes.
///
/// Note: this is the default for `RayonConfig::reserved_cpu_cores`, which is currently not
/// subtracted from any pool size (see the TODO in `RayonConfig::default_thread_count`).
pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;

/// Default number of threads for the storage I/O pool.
///
/// Applies when `RayonConfig::storage_threads` is `None`.
pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;

/// Default number of threads for the state trie overlay worker pool.
///
/// Applies when `RayonConfig::state_trie_overlay_worker_threads` is `None`.
#[cfg(feature = "rayon")]
pub const DEFAULT_STATE_TRIE_OVERLAY_WORKER_THREADS: usize = 4;

/// Default maximum number of concurrent blocking tasks (for RPC tracing guard).
///
/// Used as the default `RayonConfig::max_blocking_tasks`.
pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
50
/// Configuration for the tokio runtime.
///
/// Selects whether the [`Runtime`] builds its own tokio runtime or attaches to one that
/// already exists.
#[derive(Debug, Clone)]
pub enum TokioConfig {
    /// Build and own a new tokio runtime.
    Owned {
        /// Number of worker threads. If `None`, uses tokio's default (number of CPU cores).
        worker_threads: Option<usize>,
        /// How long to keep worker threads alive when idle.
        thread_keep_alive: Duration,
        /// Thread name prefix.
        thread_name: &'static str,
    },
    /// Attach to an existing tokio runtime handle.
    ///
    /// NOTE(review): presumably no runtime is owned in this case, so the caller keeps the
    /// underlying runtime alive — confirm against `RuntimeBuilder::build`.
    ExistingHandle(Handle),
}
66
67impl Default for TokioConfig {
68    fn default() -> Self {
69        Self::Owned {
70            worker_threads: None,
71            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
72            thread_name: "tokio-rt",
73        }
74    }
75}
76
77impl TokioConfig {
78    /// Create a config that attaches to an existing runtime handle.
79    pub const fn existing_handle(handle: Handle) -> Self {
80        Self::ExistingHandle(handle)
81    }
82
83    /// Create a config for an owned runtime with the specified number of worker threads.
84    pub const fn with_worker_threads(worker_threads: usize) -> Self {
85        Self::Owned {
86            worker_threads: Some(worker_threads),
87            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
88            thread_name: "tokio-rt",
89        }
90    }
91}
92
/// Configuration for the rayon thread pools.
///
/// Each `Option` pool size falls back to the default described on its field when `None`.
#[derive(Debug, Clone)]
#[cfg(feature = "rayon")]
pub struct RayonConfig {
    /// Number of threads for the general CPU pool.
    /// If `None`, derived from available parallelism.
    ///
    /// Note: [`reserved_cpu_cores`](Self::reserved_cpu_cores) is currently *not* subtracted
    /// from the derived value.
    pub cpu_threads: Option<usize>,
    /// Number of CPU cores intended to be reserved for the OS and other processes.
    ///
    /// Currently ignored when sizing pools. Subtracting from pool sizes does not actually
    /// reserve cores for other processes (see the TODO in `RayonConfig::default_thread_count`).
    pub reserved_cpu_cores: usize,
    /// Number of threads for the RPC blocking pool (trace calls, `eth_getProof`, etc.).
    /// If `None`, uses the same as `cpu_threads`.
    pub rpc_threads: Option<usize>,
    /// Number of threads for the storage I/O pool (static file, `RocksDB` writes in
    /// `save_blocks`). If `None`, uses [`DEFAULT_STORAGE_POOL_THREADS`].
    pub storage_threads: Option<usize>,
    /// Maximum number of concurrent blocking tasks for the RPC guard semaphore.
    pub max_blocking_tasks: usize,
    /// Number of threads for the proof storage worker pool (trie storage proof workers).
    /// If `None`, derived from available parallelism.
    pub proof_storage_worker_threads: Option<usize>,
    /// Number of threads for the proof account worker pool (trie account proof workers).
    /// If `None`, derived from available parallelism.
    pub proof_account_worker_threads: Option<usize>,
    /// Number of threads for the prewarming pool (execution prewarming workers).
    /// If `None`, derived from available parallelism.
    pub prewarming_threads: Option<usize>,
    /// Number of threads for the BAL streaming pool (BAL hashed state streaming).
    /// If `None`, derived from available parallelism.
    pub bal_streaming_threads: Option<usize>,
    /// Number of threads for the state trie overlay worker pool.
    /// If `None`, uses [`DEFAULT_STATE_TRIE_OVERLAY_WORKER_THREADS`].
    pub state_trie_overlay_worker_threads: Option<usize>,
}
126
#[cfg(feature = "rayon")]
impl Default for RayonConfig {
    /// Leaves every pool size unset (`None`), so each field's documented fallback applies.
    /// Only the reserved-core count and the blocking-task limit get concrete defaults.
    fn default() -> Self {
        Self {
            // Concrete defaults.
            reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
            max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
            // Pool sizes: resolved from their fallbacks.
            cpu_threads: None,
            rpc_threads: None,
            storage_threads: None,
            proof_storage_worker_threads: None,
            proof_account_worker_threads: None,
            prewarming_threads: None,
            bal_streaming_threads: None,
            state_trie_overlay_worker_threads: None,
        }
    }
}
144
#[cfg(feature = "rayon")]
impl RayonConfig {
    /// Returns the config with `reserved_cpu_cores` replaced.
    pub const fn with_reserved_cpu_cores(self, reserved_cpu_cores: usize) -> Self {
        Self { reserved_cpu_cores, ..self }
    }

    /// Returns the config with the RPC blocking-task limit set to `max_blocking_tasks`.
    pub const fn with_max_blocking_tasks(self, max_blocking_tasks: usize) -> Self {
        Self { max_blocking_tasks, ..self }
    }

    /// Returns the config with an explicit RPC blocking pool size.
    pub const fn with_rpc_threads(self, rpc_threads: usize) -> Self {
        Self { rpc_threads: Some(rpc_threads), ..self }
    }

    /// Returns the config with an explicit storage I/O pool size.
    pub const fn with_storage_threads(self, storage_threads: usize) -> Self {
        Self { storage_threads: Some(storage_threads), ..self }
    }

    /// Returns the config with an explicit proof storage worker pool size.
    pub const fn with_proof_storage_worker_threads(
        self,
        proof_storage_worker_threads: usize,
    ) -> Self {
        Self { proof_storage_worker_threads: Some(proof_storage_worker_threads), ..self }
    }

    /// Returns the config with an explicit proof account worker pool size.
    pub const fn with_proof_account_worker_threads(
        self,
        proof_account_worker_threads: usize,
    ) -> Self {
        Self { proof_account_worker_threads: Some(proof_account_worker_threads), ..self }
    }

    /// Returns the config with an explicit prewarming pool size.
    pub const fn with_prewarming_threads(self, prewarming_threads: usize) -> Self {
        Self { prewarming_threads: Some(prewarming_threads), ..self }
    }

    /// Returns the config with an explicit BAL streaming pool size.
    pub const fn with_bal_streaming_threads(self, bal_streaming_threads: usize) -> Self {
        Self { bal_streaming_threads: Some(bal_streaming_threads), ..self }
    }

    /// Returns the config with an explicit state trie overlay worker pool size.
    pub const fn with_state_trie_overlay_worker_threads(
        self,
        state_trie_overlay_worker_threads: usize,
    ) -> Self {
        Self {
            state_trie_overlay_worker_threads: Some(state_trie_overlay_worker_threads),
            ..self
        }
    }

    /// Thread count to use when a pool has no explicit size.
    ///
    /// Returns `cpu_threads` if set. Otherwise it returns the machine's available parallelism,
    /// or `1` if that cannot be determined.
    fn default_thread_count(&self) -> usize {
        // TODO: reserved_cpu_cores is currently ignored because subtracting from thread pool
        // sizes doesn't actually reserve CPU cores for other processes.
        let _ = self.reserved_cpu_cores;
        match self.cpu_threads {
            Some(configured) => configured,
            None => available_parallelism().map_or(1, NonZeroUsize::get),
        }
    }
}
218
/// Configuration for building a [`Runtime`].
///
/// The `Default` value combines [`TokioConfig::default`] (an owned tokio runtime) with the
/// default rayon configuration when the `rayon` feature is enabled.
#[derive(Debug, Clone, Default)]
pub struct RuntimeConfig {
    /// Tokio runtime configuration.
    pub tokio: TokioConfig,
    /// Rayon thread pool configuration.
    #[cfg(feature = "rayon")]
    pub rayon: RayonConfig,
}
228
229impl RuntimeConfig {
230    /// Set the tokio configuration.
231    pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
232        self.tokio = tokio;
233        self
234    }
235
236    /// Set the rayon configuration.
237    #[cfg(feature = "rayon")]
238    pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
239        self.rayon = rayon;
240        self
241    }
242}
243
/// Error returned when [`RuntimeBuilder::build`] fails.
///
/// Both variants carry `#[from]`, so the underlying build errors convert automatically with `?`.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeBuildError {
    /// Failed to build the tokio runtime.
    #[error("Failed to build tokio runtime: {0}")]
    TokioBuild(#[from] std::io::Error),
    /// Failed to build a rayon thread pool.
    #[cfg(feature = "rayon")]
    #[error("Failed to build rayon thread pool: {0}")]
    RayonBuild(#[from] rayon::ThreadPoolBuildError),
}
255
256// ── RuntimeInner ──────────────────────────────────────────────────────
257
/// Shared state behind every clone of [`Runtime`].
///
/// Lives inside an `Arc`, so all runtime handles see the same tokio handle, shutdown signal,
/// task bookkeeping and (with the `rayon` feature) thread pools.
struct RuntimeInner {
    /// Owned tokio runtime, if we built one. Kept alive via the `Arc<RuntimeInner>`.
    _tokio_runtime: Option<TokioRuntime>,
    /// Handle to the tokio runtime.
    handle: Handle,
    /// Receiver of the shutdown signal.
    on_shutdown: Shutdown,
    /// Sender half for sending task events to the [`TaskManager`].
    task_events_tx: UnboundedSender<TaskEvent>,
    /// Task executor metrics.
    metrics: TaskExecutorMetrics,
    /// How many [`GracefulShutdown`] tasks are currently active.
    /// Polled by `Runtime::do_graceful_shutdown` until it reaches zero.
    graceful_tasks: Arc<AtomicUsize>,
    /// General-purpose rayon CPU pool.
    #[cfg(feature = "rayon")]
    cpu_pool: rayon::ThreadPool,
    /// RPC blocking pool.
    #[cfg(feature = "rayon")]
    rpc_pool: BlockingTaskPool,
    /// Storage I/O pool.
    #[cfg(feature = "rayon")]
    storage_pool: rayon::ThreadPool,
    /// Rate limiter for expensive RPC operations.
    #[cfg(feature = "rayon")]
    blocking_guard: BlockingTaskGuard,
    /// Proof storage worker pool (trie storage proof computation).
    #[cfg(feature = "rayon")]
    proof_storage_worker_pool: WorkerPool,
    /// Proof account worker pool (trie account proof computation).
    #[cfg(feature = "rayon")]
    proof_account_worker_pool: WorkerPool,
    /// Prewarming pool (execution prewarming workers).
    #[cfg(feature = "rayon")]
    prewarming_pool: WorkerPool,
    /// BAL streaming pool (BAL hashed state streaming).
    #[cfg(feature = "rayon")]
    bal_streaming_pool: WorkerPool,
    /// State trie overlay worker pool.
    /// Stored behind its own `Arc` so the accessor can hand out owned clones.
    #[cfg(feature = "rayon")]
    state_trie_overlay_worker_pool: Arc<WorkerPool>,
    /// Named single-thread worker map. Each unique name gets a dedicated OS thread
    /// that is reused across all tasks submitted under that name.
    worker_map: WorkerMap,
    /// Handle to the spawned [`TaskManager`] background task.
    /// The task monitors critical tasks for panics and fires the shutdown signal.
    /// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors.
    task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
}
306
307// ── Runtime ───────────────────────────────────────────────────────────
308
/// A cheaply cloneable handle to the runtime resources.
///
/// Wraps an `Arc<RuntimeInner>` and provides access to:
/// - The tokio [`Handle`]
/// - Task spawning with shutdown awareness and panic monitoring
/// - Rayon thread pools (with `rayon` feature)
///
/// Cloning only bumps the `Arc` reference count. All clones share the same pools, shutdown
/// signal and task bookkeeping.
#[derive(Clone)]
pub struct Runtime(Arc<RuntimeInner>);
317
318impl std::fmt::Debug for Runtime {
319    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
320        f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
321    }
322}
323
324// ── Pool accessors ────────────────────────────────────────────────────
325
326impl Runtime {
327    /// Takes the [`TaskManager`] handle out of this runtime, if one is stored.
328    ///
329    /// The handle resolves with `Err(PanickedTaskError)` if a critical task panicked,
330    /// or `Ok(())` if shutdown was requested. If not taken, the background task still
331    /// runs and logs panics at `debug!` level.
332    pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
333        self.0.task_manager_handle.lock().unwrap().take()
334    }
335
336    /// Returns the tokio runtime [`Handle`].
337    pub fn handle(&self) -> &Handle {
338        &self.0.handle
339    }
340
341    /// Get the general-purpose rayon CPU thread pool.
342    #[cfg(feature = "rayon")]
343    pub fn cpu_pool(&self) -> &rayon::ThreadPool {
344        &self.0.cpu_pool
345    }
346
347    /// Get the RPC blocking task pool.
348    #[cfg(feature = "rayon")]
349    pub fn rpc_pool(&self) -> &BlockingTaskPool {
350        &self.0.rpc_pool
351    }
352
353    /// Get the storage I/O pool.
354    #[cfg(feature = "rayon")]
355    pub fn storage_pool(&self) -> &rayon::ThreadPool {
356        &self.0.storage_pool
357    }
358
359    /// Get a clone of the [`BlockingTaskGuard`].
360    #[cfg(feature = "rayon")]
361    pub fn blocking_guard(&self) -> BlockingTaskGuard {
362        self.0.blocking_guard.clone()
363    }
364
365    /// Get the proof storage worker pool.
366    #[cfg(feature = "rayon")]
367    pub fn proof_storage_worker_pool(&self) -> &WorkerPool {
368        &self.0.proof_storage_worker_pool
369    }
370
371    /// Get the proof account worker pool.
372    #[cfg(feature = "rayon")]
373    pub fn proof_account_worker_pool(&self) -> &WorkerPool {
374        &self.0.proof_account_worker_pool
375    }
376
377    /// Get the prewarming pool.
378    #[cfg(feature = "rayon")]
379    pub fn prewarming_pool(&self) -> &WorkerPool {
380        &self.0.prewarming_pool
381    }
382
383    /// Get the BAL streaming pool.
384    #[cfg(feature = "rayon")]
385    pub fn bal_streaming_pool(&self) -> &WorkerPool {
386        &self.0.bal_streaming_pool
387    }
388
389    /// Get the state trie overlay worker pool.
390    #[cfg(feature = "rayon")]
391    pub fn state_trie_overlay_worker_pool(&self) -> Arc<WorkerPool> {
392        Arc::clone(&self.0.state_trie_overlay_worker_pool)
393    }
394}
395
396// ── Test helpers ──────────────────────────────────────────────────────
397
398impl Runtime {
399    /// Creates a lightweight [`Runtime`] for tests with minimal thread pools.
400    ///
401    /// If called from within a tokio runtime (e.g. `#[tokio::test]`), attaches to the existing
402    /// handle to avoid shutdown panics when the test runtime is dropped.
403    pub fn test() -> Self {
404        let config = match Handle::try_current() {
405            Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
406            Err(_) => Self::test_config(),
407        };
408        RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
409    }
410
411    const fn test_config() -> RuntimeConfig {
412        RuntimeConfig {
413            tokio: TokioConfig::Owned {
414                worker_threads: Some(2),
415                thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
416                thread_name: "tokio-test",
417            },
418            #[cfg(feature = "rayon")]
419            rayon: RayonConfig {
420                cpu_threads: Some(2),
421                reserved_cpu_cores: 0,
422                rpc_threads: Some(2),
423                storage_threads: Some(2),
424                max_blocking_tasks: 16,
425                proof_storage_worker_threads: Some(2),
426                proof_account_worker_threads: Some(2),
427                prewarming_threads: Some(2),
428                bal_streaming_threads: Some(2),
429                state_trie_overlay_worker_threads: Some(2),
430            },
431        }
432    }
433}
434
435// ── Spawn methods ─────────────────────────────────────────────────────
436
/// Determines how a task is spawned.
///
/// Matched in `Runtime::spawn_on_rt` to pick the tokio entry point, and in
/// `Runtime::spawn_task_as` to pick which task metrics are updated.
enum TaskKind {
    /// Spawn the task to the default executor [`Handle::spawn`].
    Default,
    /// Spawn the task to the blocking executor [`Handle::spawn_blocking`].
    ///
    /// The future is driven with [`Handle::block_on`] on the blocking thread.
    Blocking,
}
444
445impl Runtime {
446    /// Returns the receiver of the shutdown signal.
447    pub fn on_shutdown_signal(&self) -> &Shutdown {
448        &self.0.on_shutdown
449    }
450
451    /// Spawns a future on the tokio runtime depending on the [`TaskKind`].
452    fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
453    where
454        F: Future<Output = ()> + Send + 'static,
455    {
456        match task_kind {
457            TaskKind::Default => self.0.handle.spawn(fut),
458            TaskKind::Blocking => {
459                let handle = self.0.handle.clone();
460                self.0.handle.spawn_blocking(move || handle.block_on(fut))
461            }
462        }
463    }
464
465    /// Spawns a regular task depending on the given [`TaskKind`].
466    fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
467    where
468        F: Future<Output = ()> + Send + 'static,
469    {
470        match task_kind {
471            TaskKind::Default => self.0.metrics.inc_regular_tasks(),
472            TaskKind::Blocking => self.0.metrics.inc_regular_blocking_tasks(),
473        }
474        let on_shutdown = self.0.on_shutdown.clone();
475
476        let finished_counter = match task_kind {
477            TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
478            TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
479        };
480
481        let task = {
482            async move {
483                let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
484                let fut = pin!(fut);
485                let _ = select(on_shutdown, fut).await;
486            }
487        }
488        .in_current_span();
489
490        self.spawn_on_rt(task, task_kind)
491    }
492
493    /// Spawns the task onto the runtime.
494    /// The given future resolves as soon as the [Shutdown] signal is received.
495    ///
496    /// See also [`Handle::spawn`].
497    pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
498    where
499        F: Future<Output = ()> + Send + 'static,
500    {
501        self.spawn_task_as(fut, TaskKind::Default)
502    }
503
504    /// Spawns a blocking task onto the runtime.
505    /// The given future resolves as soon as the [Shutdown] signal is received.
506    ///
507    /// See also [`Handle::spawn_blocking`].
508    pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
509    where
510        F: Future<Output = ()> + Send + 'static,
511    {
512        self.spawn_task_as(fut, TaskKind::Blocking)
513    }
514
515    /// Spawns a blocking closure directly on the tokio runtime, bypassing shutdown
516    /// awareness. Useful for raw CPU-bound work.
517    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
518    where
519        F: FnOnce() -> R + Send + 'static,
520        R: Send + 'static,
521    {
522        self.0.handle.spawn_blocking(func)
523    }
524
525    /// Moves the given value to a dedicated background thread for deallocation.
526    ///
527    /// This is useful when dropping a value is expensive (e.g. large nested collections)
528    /// and should not block the current task. Uses a persistent named thread (`"drop"`)
529    /// to avoid thread creation overhead on hot paths.
530    pub fn spawn_drop<T: Send + 'static>(&self, value: T) {
531        self.spawn_blocking_named("drop", move || drop(value));
532    }
533
534    /// Spawns a blocking closure on a dedicated, named OS thread.
535    ///
536    /// Unlike [`spawn_blocking`](Self::spawn_blocking) which uses tokio's blocking thread pool,
537    /// this reuses the same OS thread for all tasks submitted under the same `name`. The thread
538    /// is created lazily on first use and its OS thread name is set to `name`.
539    ///
540    /// This is useful for tasks that benefit from running on a stable thread, e.g. for
541    /// thread-local state reuse or to avoid thread creation overhead on hot paths.
542    ///
543    /// Returns a [`LazyHandle`](crate::LazyHandle) handle that resolves on first access and caches
544    /// the result.
545    pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> crate::LazyHandle<R>
546    where
547        F: FnOnce() -> R + Send + 'static,
548        R: Send + 'static,
549    {
550        crate::LazyHandle::new(self.0.worker_map.spawn_on(name, func))
551    }
552
553    /// Attempts to spawn a blocking closure on a dedicated, named OS thread.
554    ///
555    /// Returns `None` if the named worker already has a task running or queued, allowing the caller
556    /// to fall back to another executor instead of serializing behind the named worker.
557    pub fn try_spawn_blocking_named<F, R>(
558        &self,
559        name: &'static str,
560        func: F,
561    ) -> Option<crate::LazyHandle<R>>
562    where
563        F: FnOnce() -> R + Send + 'static,
564        R: Send + 'static,
565    {
566        self.0.worker_map.try_spawn_on(name, func).map(crate::LazyHandle::new)
567    }
568
569    /// Spawns a blocking closure on a named OS thread if it is idle, otherwise falls back to
570    /// tokio's unnamed blocking thread pool.
571    ///
572    /// Returns `true` if the closure was spawned on the named thread.
573    pub fn spawn_blocking_named_or_tokio<F>(&self, name: &'static str, func: F) -> bool
574    where
575        F: FnOnce() + Send + 'static,
576    {
577        let func = Arc::new(parking_lot::Mutex::new(Some(func)));
578        let named_func = func.clone();
579
580        if self
581            .try_spawn_blocking_named(name, move || {
582                if let Some(func) = named_func.lock().take() {
583                    func();
584                }
585            })
586            .is_some()
587        {
588            return true
589        }
590
591        if let Some(func) = func.lock().take() {
592            self.spawn_blocking(func);
593        }
594        false
595    }
596
597    /// Spawns the task onto the runtime.
598    /// The given future resolves as soon as the [Shutdown] signal is received.
599    ///
600    /// See also [`Handle::spawn`].
601    pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
602    where
603        F: Future<Output = ()> + Send + 'static,
604    {
605        let on_shutdown = self.0.on_shutdown.clone();
606        let fut = f(on_shutdown);
607        let task = fut.in_current_span();
608        self.0.handle.spawn(task)
609    }
610
611    /// Spawns a critical task depending on the given [`TaskKind`].
612    fn spawn_critical_as<F>(
613        &self,
614        name: &'static str,
615        fut: F,
616        task_kind: TaskKind,
617    ) -> JoinHandle<()>
618    where
619        F: Future<Output = ()> + Send + 'static,
620    {
621        self.0.metrics.inc_critical_tasks();
622        let panicked_tasks_tx = self.0.task_events_tx.clone();
623        let on_shutdown = self.0.on_shutdown.clone();
624
625        // wrap the task in catch unwind
626        let task = std::panic::AssertUnwindSafe(fut)
627            .catch_unwind()
628            .map_err(move |error| {
629                let task_error = PanickedTaskError::new(name, error);
630                error!("{task_error}");
631                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
632            })
633            .in_current_span();
634
635        let finished_critical_tasks_total_metrics =
636            self.0.metrics.finished_critical_tasks_total.clone();
637        let task = async move {
638            let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
639            let task = pin!(task);
640            let _ = select(on_shutdown, task).await;
641        };
642
643        self.spawn_on_rt(task, task_kind)
644    }
645
646    /// This spawns a critical task onto the runtime.
647    /// The given future resolves as soon as the [Shutdown] signal is received.
648    ///
649    /// If this task panics, the [`TaskManager`] is notified.
650    pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
651    where
652        F: Future<Output = ()> + Send + 'static,
653    {
654        self.spawn_critical_as(name, fut, TaskKind::Default)
655    }
656
657    /// This spawns a critical blocking task onto the runtime.
658    /// The given future resolves as soon as the [Shutdown] signal is received.
659    ///
660    /// If this task panics, the [`TaskManager`] is notified.
661    pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
662    where
663        F: Future<Output = ()> + Send + 'static,
664    {
665        self.spawn_critical_as(name, fut, TaskKind::Blocking)
666    }
667
668    /// This spawns a critical task onto a dedicated named OS thread.
669    /// The given future resolves as soon as the [`Shutdown`] signal is received.
670    ///
671    /// If this task panics, the [`TaskManager`] is notified.
672    pub fn spawn_critical_os_thread<F>(
673        &self,
674        thread_name: &'static str,
675        task_name: &'static str,
676        fut: F,
677    ) -> thread::JoinHandle<()>
678    where
679        F: Future<Output = ()> + Send + 'static,
680    {
681        self.0.metrics.inc_critical_tasks();
682        let handle = self.0.handle.clone();
683        let panicked_tasks_tx = self.0.task_events_tx.clone();
684        let on_shutdown = self.0.on_shutdown.clone();
685
686        let task = std::panic::AssertUnwindSafe(fut)
687            .catch_unwind()
688            .map_err(move |error| {
689                let task_error = PanickedTaskError::new(task_name, error);
690                error!("{task_error}");
691                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
692            })
693            .in_current_span();
694
695        let finished_critical_tasks_total_metrics =
696            self.0.metrics.finished_critical_tasks_total.clone();
697        let task = async move {
698            let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
699            let task = pin!(task);
700            let _ = select(on_shutdown, task).await;
701        };
702
703        thread::Builder::new()
704            .name(thread_name.to_string())
705            .spawn(move || {
706                let _guard = handle.enter();
707                handle.block_on(task);
708            })
709            .unwrap_or_else(|e| panic!("failed to spawn critical OS thread {thread_name:?}: {e}"))
710    }
711
712    /// This spawns a critical task onto the runtime.
713    ///
714    /// If this task panics, the [`TaskManager`] is notified.
715    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
716    ///
717    /// # Example
718    ///
719    /// ```no_run
720    /// # async fn t(executor: reth_tasks::TaskExecutor) {
721    ///
722    /// executor.spawn_critical_with_graceful_shutdown_signal("grace", async move |shutdown| {
723    ///     // await the shutdown signal
724    ///     let guard = shutdown.await;
725    ///     // do work before exiting the program
726    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
727    ///     // allow graceful shutdown
728    ///     drop(guard);
729    /// });
730    /// # }
731    /// ```
732    pub fn spawn_critical_with_graceful_shutdown_signal<F>(
733        &self,
734        name: &'static str,
735        f: impl FnOnce(GracefulShutdown) -> F,
736    ) -> JoinHandle<()>
737    where
738        F: Future<Output = ()> + Send + 'static,
739    {
740        let panicked_tasks_tx = self.0.task_events_tx.clone();
741        let on_shutdown = GracefulShutdown::new(
742            self.0.on_shutdown.clone(),
743            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
744        );
745        let fut = f(on_shutdown);
746
747        // wrap the task in catch unwind
748        let task = std::panic::AssertUnwindSafe(fut)
749            .catch_unwind()
750            .map_err(move |error| {
751                let task_error = PanickedTaskError::new(name, error);
752                error!("{task_error}");
753                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
754            })
755            .map(drop)
756            .in_current_span();
757
758        self.0.handle.spawn(task)
759    }
760
761    /// This spawns a regular task onto the runtime.
762    ///
763    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
764    ///
765    /// # Example
766    ///
767    /// ```no_run
768    /// # async fn t(executor: reth_tasks::TaskExecutor) {
769    ///
770    /// executor.spawn_with_graceful_shutdown_signal(async move |shutdown| {
771    ///     // await the shutdown signal
772    ///     let guard = shutdown.await;
773    ///     // do work before exiting the program
774    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
775    ///     // allow graceful shutdown
776    ///     drop(guard);
777    /// });
778    /// # }
779    /// ```
780    pub fn spawn_with_graceful_shutdown_signal<F>(
781        &self,
782        f: impl FnOnce(GracefulShutdown) -> F,
783    ) -> JoinHandle<()>
784    where
785        F: Future<Output = ()> + Send + 'static,
786    {
787        let on_shutdown = GracefulShutdown::new(
788            self.0.on_shutdown.clone(),
789            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
790        );
791        let fut = f(on_shutdown);
792
793        self.0.handle.spawn(fut)
794    }
795
796    /// Sends a request to the `TaskManager` to initiate a graceful shutdown.
797    ///
798    /// Caution: This will terminate the entire program.
799    pub fn initiate_graceful_shutdown(
800        &self,
801    ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
802        self.0
803            .task_events_tx
804            .send(TaskEvent::GracefulShutdown)
805            .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
806
807        Ok(GracefulShutdown::new(
808            self.0.on_shutdown.clone(),
809            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
810        ))
811    }
812
813    /// Fires the shutdown signal and waits until all graceful tasks complete.
814    pub fn graceful_shutdown(&self) {
815        let _ = self.do_graceful_shutdown(None);
816    }
817
818    /// Fires the shutdown signal and waits until all graceful tasks complete or the timeout
819    /// elapses.
820    ///
821    /// Returns `true` if all tasks completed before the timeout.
822    pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
823        self.do_graceful_shutdown(Some(timeout))
824    }
825
826    fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
827        let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
828        let deadline = timeout.map(|t| Instant::now() + t);
829        while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
830            if deadline.is_some_and(|d| Instant::now() > d) {
831                debug!("graceful shutdown timed out");
832                return false;
833            }
834            std::thread::yield_now();
835        }
836        debug!("gracefully shut down");
837        true
838    }
839}
840
841// ── RuntimeBuilder ────────────────────────────────────────────────────
842
843/// Builder for constructing a [`Runtime`].
844#[derive(Debug, Clone)]
845pub struct RuntimeBuilder {
846    config: RuntimeConfig,
847}
848
849impl RuntimeBuilder {
850    /// Create a new builder with the given configuration.
851    pub const fn new(config: RuntimeConfig) -> Self {
852        Self { config }
853    }
854
855    /// Build the [`Runtime`].
856    ///
857    /// The [`TaskManager`] is automatically spawned as a background task that monitors
858    /// critical tasks for panics. Use [`Runtime::take_task_manager_handle`] to extract
859    /// the join handle if you need to poll for panic errors.
860    #[tracing::instrument(name = "RuntimeBuilder::build", level = "debug", skip_all)]
861    pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
862        debug!(?self.config, "Building runtime");
863        let config = self.config;
864
865        let (owned_runtime, handle) = match &config.tokio {
866            TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
867                let mut builder = tokio::runtime::Builder::new_multi_thread();
868                builder
869                    .enable_all()
870                    .thread_keep_alive(*thread_keep_alive)
871                    .thread_name(*thread_name);
872
873                if let Some(threads) = worker_threads {
874                    builder.worker_threads(*threads);
875                }
876
877                let runtime = builder.build()?;
878                let h = runtime.handle().clone();
879                (Some(runtime), h)
880            }
881            TokioConfig::ExistingHandle(h) => (None, h.clone()),
882        };
883
884        let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
885            TaskManager::new_parts(handle.clone());
886
887        #[cfg(feature = "rayon")]
888        let (
889            cpu_pool,
890            rpc_pool,
891            storage_pool,
892            blocking_guard,
893            proof_storage_worker_pool,
894            proof_account_worker_pool,
895            prewarming_pool,
896            bal_streaming_pool,
897            state_trie_overlay_worker_pool,
898        ) = {
899            let default_threads = config.rayon.default_thread_count();
900            let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);
901
902            let cpu_pool = build_pool_with_panic_handler(
903                rayon::ThreadPoolBuilder::new()
904                    .num_threads(default_threads)
905                    .thread_name(|i| format!("cpu-{i:02}")),
906            )?;
907
908            let rpc_raw = build_pool_with_panic_handler(
909                rayon::ThreadPoolBuilder::new()
910                    .num_threads(rpc_threads)
911                    .thread_name(|i| format!("rpc-{i:02}")),
912            )?;
913            let rpc_pool = BlockingTaskPool::new(rpc_raw);
914
915            let storage_threads =
916                config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
917            let storage_pool = build_pool_with_panic_handler(
918                rayon::ThreadPoolBuilder::new()
919                    .num_threads(storage_threads)
920                    .thread_name(|i| format!("storage-{i:02}")),
921            )?;
922
923            let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
924
925            let proof_storage_worker_threads =
926                config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
927            let proof_storage_worker_pool =
928                WorkerPool::new(proof_storage_worker_threads, "proof-strg");
929
930            let proof_account_worker_threads =
931                config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
932            let proof_account_worker_pool =
933                WorkerPool::new(proof_account_worker_threads, "proof-acct");
934
935            let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads);
936            let prewarming_pool = WorkerPool::new(prewarming_threads, "prewarm");
937
938            let bal_streaming_threads =
939                config.rayon.bal_streaming_threads.unwrap_or(default_threads);
940            let bal_streaming_pool = WorkerPool::new(bal_streaming_threads, "bal-stream");
941
942            let state_trie_overlay_worker_threads = config
943                .rayon
944                .state_trie_overlay_worker_threads
945                .unwrap_or(DEFAULT_STATE_TRIE_OVERLAY_WORKER_THREADS);
946            let state_trie_overlay_worker_pool =
947                Arc::new(WorkerPool::new(state_trie_overlay_worker_threads, "state-ovly"));
948
949            debug!(
950                default_threads,
951                rpc_threads,
952                storage_threads,
953                proof_storage_worker_threads,
954                proof_account_worker_threads,
955                prewarming_threads,
956                bal_streaming_threads,
957                state_trie_overlay_worker_threads,
958                max_blocking_tasks = config.rayon.max_blocking_tasks,
959                "Configured lazy rayon worker pools"
960            );
961
962            (
963                cpu_pool,
964                rpc_pool,
965                storage_pool,
966                blocking_guard,
967                proof_storage_worker_pool,
968                proof_account_worker_pool,
969                prewarming_pool,
970                bal_streaming_pool,
971                state_trie_overlay_worker_pool,
972            )
973        };
974
975        let task_manager_handle = handle.spawn(async move {
976            let result = task_manager.await;
977            if let Err(ref err) = result {
978                debug!("{err}");
979            }
980            result
981        });
982
983        let inner = RuntimeInner {
984            _tokio_runtime: owned_runtime,
985            handle,
986            on_shutdown,
987            task_events_tx,
988            metrics: Default::default(),
989            graceful_tasks,
990            #[cfg(feature = "rayon")]
991            cpu_pool,
992            #[cfg(feature = "rayon")]
993            rpc_pool,
994            #[cfg(feature = "rayon")]
995            storage_pool,
996            #[cfg(feature = "rayon")]
997            blocking_guard,
998            #[cfg(feature = "rayon")]
999            proof_storage_worker_pool,
1000            #[cfg(feature = "rayon")]
1001            proof_account_worker_pool,
1002            #[cfg(feature = "rayon")]
1003            prewarming_pool,
1004            #[cfg(feature = "rayon")]
1005            bal_streaming_pool,
1006            #[cfg(feature = "rayon")]
1007            state_trie_overlay_worker_pool,
1008            worker_map: WorkerMap::new(),
1009            task_manager_handle: Mutex::new(Some(task_manager_handle)),
1010        };
1011
1012        Ok(Runtime(Arc::new(inner)))
1013    }
1014}
1015
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_runtime_config_default() {
        let config = RuntimeConfig::default();
        assert!(matches!(config.tokio, TokioConfig::Owned { .. }));
    }

    #[test]
    fn test_runtime_config_existing_handle() {
        let rt = TokioRuntime::new().unwrap();
        let config =
            Runtime::test_config().with_tokio(TokioConfig::existing_handle(rt.handle().clone()));
        assert!(matches!(config.tokio, TokioConfig::ExistingHandle(_)));
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_rayon_config_thread_count() {
        let config = RayonConfig::default();
        let count = config.default_thread_count();
        assert!(count >= 1);
    }

    #[test]
    fn test_runtime_builder() {
        let rt = TokioRuntime::new().unwrap();
        let config =
            Runtime::test_config().with_tokio(TokioConfig::existing_handle(rt.handle().clone()));
        let runtime = RuntimeBuilder::new(config).build().unwrap();
        let _ = runtime.handle();
    }

    #[test]
    fn critical_os_thread_uses_requested_name() {
        let runtime = Runtime::test();
        let (tx, rx) = std::sync::mpsc::channel();

        let handle = runtime.spawn_critical_os_thread(
            "critical-os-test",
            "critical os thread test",
            async move {
                let name = thread::current().name().unwrap().to_string();
                tx.send(name).unwrap();
            },
        );

        let name = rx.recv_timeout(Duration::from_secs(5)).unwrap();
        assert_eq!(name, "critical-os-test");
        handle.join().unwrap();
    }

    #[test]
    fn critical_os_thread_panic_is_reported() {
        let runtime = Runtime::test();
        let manager_handle = runtime.take_task_manager_handle().unwrap();

        let handle = runtime.spawn_critical_os_thread(
            "critical-os-panic",
            "critical os thread panic test",
            async { panic!("critical os thread panic") },
        );

        let err =
            runtime.handle().block_on(async move { manager_handle.await.unwrap().unwrap_err() });
        assert_eq!(err.task_name, "critical os thread panic test");
        assert_eq!(err.error, Some("critical os thread panic".to_string()));
        handle.join().unwrap();
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_worker_pools_are_lazy() {
        let runtime = Runtime::test();

        // Every `WorkerPool` configured by `RuntimeBuilder::build` is lazy — none is
        // initialized until first access.
        assert!(!runtime.0.bal_streaming_pool.is_initialized());
        assert!(!runtime.0.proof_storage_worker_pool.is_initialized());
        assert!(!runtime.0.proof_account_worker_pool.is_initialized());
        assert!(!runtime.0.prewarming_pool.is_initialized());
        assert!(!runtime.0.state_trie_overlay_worker_pool.is_initialized());

        // Accessing each one triggers initialization and returns the configured thread count.
        assert_eq!(runtime.bal_streaming_pool().current_num_threads(), 2);
        assert!(runtime.0.bal_streaming_pool.is_initialized());

        assert_eq!(runtime.proof_storage_worker_pool().current_num_threads(), 2);
        assert!(runtime.0.proof_storage_worker_pool.is_initialized());

        assert_eq!(runtime.proof_account_worker_pool().current_num_threads(), 2);
        assert!(runtime.0.proof_account_worker_pool.is_initialized());

        assert_eq!(runtime.prewarming_pool().current_num_threads(), 2);
        assert!(runtime.0.prewarming_pool.is_initialized());

        assert_eq!(runtime.state_trie_overlay_worker_pool().current_num_threads(), 2);
        assert!(runtime.0.state_trie_overlay_worker_pool.is_initialized());
    }
}