Skip to main content

reth_tasks/
runtime.rs

1//! Centralized management of async and parallel execution.
2//!
3//! This module provides [`Runtime`], a cheaply cloneable handle that manages:
4//! - Tokio runtime (either owned or attached)
5//! - Task spawning with shutdown awareness and panic monitoring
6//! - Dedicated rayon thread pools for different workloads (with `rayon` feature)
7//! - [`BlockingTaskGuard`] for rate-limiting expensive operations (with `rayon` feature)
8
9#[cfg(feature = "rayon")]
10use crate::pool::{build_pool_with_panic_handler, BlockingTaskGuard, BlockingTaskPool, WorkerPool};
11use crate::{
12    metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13    shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14    worker_map::WorkerMap,
15    PanickedTaskError, TaskEvent, TaskManager,
16};
17use futures_util::{future::select, Future, FutureExt, TryFutureExt};
18#[cfg(feature = "rayon")]
19use std::{num::NonZeroUsize, thread::available_parallelism};
20use std::{
21    pin::pin,
22    sync::{
23        atomic::{AtomicUsize, Ordering},
24        Arc, Mutex,
25    },
26    thread,
27    time::{Duration, Instant},
28};
29use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
30use tracing::{debug, error};
31use tracing_futures::Instrument;
32
33use tokio::runtime::Runtime as TokioRuntime;
34
/// Idle time after which a tokio worker thread may be shut down (15 seconds).
pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_secs(15);

/// Number of CPU cores nominally set aside for the OS and other processes.
///
/// NOTE: `RayonConfig::default_thread_count` does not currently subtract this value when sizing
/// pools; it is carried in the configuration but not applied.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;

/// Thread count used for the storage I/O pool when none is configured.
pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;

/// Cap on concurrently running blocking tasks, enforced by the RPC tracing guard.
pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
46
47/// Configuration for the tokio runtime.
48#[derive(Debug, Clone)]
49pub enum TokioConfig {
50    /// Build and own a new tokio runtime.
51    Owned {
52        /// Number of worker threads. If `None`, uses tokio's default (number of CPU cores).
53        worker_threads: Option<usize>,
54        /// How long to keep worker threads alive when idle.
55        thread_keep_alive: Duration,
56        /// Thread name prefix.
57        thread_name: &'static str,
58    },
59    /// Attach to an existing tokio runtime handle.
60    ExistingHandle(Handle),
61}
62
63impl Default for TokioConfig {
64    fn default() -> Self {
65        Self::Owned {
66            worker_threads: None,
67            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
68            thread_name: "tokio-rt",
69        }
70    }
71}
72
73impl TokioConfig {
74    /// Create a config that attaches to an existing runtime handle.
75    pub const fn existing_handle(handle: Handle) -> Self {
76        Self::ExistingHandle(handle)
77    }
78
79    /// Create a config for an owned runtime with the specified number of worker threads.
80    pub const fn with_worker_threads(worker_threads: usize) -> Self {
81        Self::Owned {
82            worker_threads: Some(worker_threads),
83            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
84            thread_name: "tokio-rt",
85        }
86    }
87}
88
/// Configuration for the rayon thread pools.
///
/// Pool sizes left as `None` are resolved when the runtime is built. Most of them fall back to
/// the CPU pool size (`cpu_threads`, or available parallelism when that is unset).
#[derive(Debug, Clone)]
#[cfg(feature = "rayon")]
pub struct RayonConfig {
    /// Number of threads for the general CPU pool.
    /// If `None`, uses the available parallelism of the machine.
    pub cpu_threads: Option<usize>,
    /// Number of CPU cores to reserve for OS and other processes.
    ///
    /// NOTE: not currently applied when sizing pools (see the TODO in
    /// `RayonConfig::default_thread_count`).
    pub reserved_cpu_cores: usize,
    /// Number of threads for the RPC blocking pool (trace calls, `eth_getProof`, etc.).
    /// If `None`, uses the CPU pool size.
    pub rpc_threads: Option<usize>,
    /// Number of threads for the storage I/O pool (static file, `RocksDB` writes in
    /// `save_blocks`). If `None`, uses [`DEFAULT_STORAGE_POOL_THREADS`].
    pub storage_threads: Option<usize>,
    /// Maximum number of concurrent blocking tasks for the RPC guard semaphore.
    pub max_blocking_tasks: usize,
    /// Number of threads for the proof storage worker pool (trie storage proof workers).
    /// If `None`, twice the CPU pool size.
    pub proof_storage_worker_threads: Option<usize>,
    /// Number of threads for the proof account worker pool (trie account proof workers).
    /// If `None`, twice the CPU pool size.
    pub proof_account_worker_threads: Option<usize>,
    /// Number of threads for the prewarming pool (execution prewarming workers).
    /// If `None`, derived at build time from available parallelism.
    pub prewarming_threads: Option<usize>,
    /// Number of threads for the BAL streaming pool (BAL hashed state streaming).
    /// If `None`, derived at build time from available parallelism.
    pub bal_streaming_threads: Option<usize>,
}
119
#[cfg(feature = "rayon")]
impl Default for RayonConfig {
    /// Leaves every pool size unset so it is derived at build time. Reserved cores and the
    /// blocking-task cap take their crate-level defaults.
    fn default() -> Self {
        Self {
            reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
            max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
            cpu_threads: None,
            rpc_threads: None,
            storage_threads: None,
            proof_storage_worker_threads: None,
            proof_account_worker_threads: None,
            prewarming_threads: None,
            bal_streaming_threads: None,
        }
    }
}
136
#[cfg(feature = "rayon")]
impl RayonConfig {
    /// Set the number of threads for the general CPU pool.
    ///
    /// Pools whose own size is left unset and that are derived from the CPU pool size at build
    /// time (e.g. the RPC and proof worker pools) follow this value as well.
    pub const fn with_cpu_threads(mut self, cpu_threads: usize) -> Self {
        self.cpu_threads = Some(cpu_threads);
        self
    }

    /// Set the number of reserved CPU cores.
    ///
    /// NOTE: this value is stored but not currently subtracted from any pool size.
    pub const fn with_reserved_cpu_cores(mut self, reserved_cpu_cores: usize) -> Self {
        self.reserved_cpu_cores = reserved_cpu_cores;
        self
    }

    /// Set the maximum number of concurrent blocking tasks.
    pub const fn with_max_blocking_tasks(mut self, max_blocking_tasks: usize) -> Self {
        self.max_blocking_tasks = max_blocking_tasks;
        self
    }

    /// Set the number of threads for the RPC blocking pool.
    pub const fn with_rpc_threads(mut self, rpc_threads: usize) -> Self {
        self.rpc_threads = Some(rpc_threads);
        self
    }

    /// Set the number of threads for the storage I/O pool.
    pub const fn with_storage_threads(mut self, storage_threads: usize) -> Self {
        self.storage_threads = Some(storage_threads);
        self
    }

    /// Set the number of threads for the proof storage worker pool.
    pub const fn with_proof_storage_worker_threads(
        mut self,
        proof_storage_worker_threads: usize,
    ) -> Self {
        self.proof_storage_worker_threads = Some(proof_storage_worker_threads);
        self
    }

    /// Set the number of threads for the proof account worker pool.
    pub const fn with_proof_account_worker_threads(
        mut self,
        proof_account_worker_threads: usize,
    ) -> Self {
        self.proof_account_worker_threads = Some(proof_account_worker_threads);
        self
    }

    /// Set the number of threads for the prewarming pool.
    pub const fn with_prewarming_threads(mut self, prewarming_threads: usize) -> Self {
        self.prewarming_threads = Some(prewarming_threads);
        self
    }

    /// Set the number of threads for the BAL streaming pool.
    pub const fn with_bal_streaming_threads(mut self, bal_streaming_threads: usize) -> Self {
        self.bal_streaming_threads = Some(bal_streaming_threads);
        self
    }

    /// Returns the CPU pool size: `cpu_threads` if set, otherwise the machine's available
    /// parallelism (or `1` if that cannot be determined).
    fn default_thread_count(&self) -> usize {
        // TODO: reserved_cpu_cores is currently ignored because subtracting from thread pool
        // sizes doesn't actually reserve CPU cores for other processes.
        let _ = self.reserved_cpu_cores;
        self.cpu_threads.unwrap_or_else(|| available_parallelism().map_or(1, NonZeroUsize::get))
    }
}
201
202/// Configuration for building a [`Runtime`].
203#[derive(Debug, Clone, Default)]
204pub struct RuntimeConfig {
205    /// Tokio runtime configuration.
206    pub tokio: TokioConfig,
207    /// Rayon thread pool configuration.
208    #[cfg(feature = "rayon")]
209    pub rayon: RayonConfig,
210}
211
212impl RuntimeConfig {
213    /// Set the tokio configuration.
214    pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
215        self.tokio = tokio;
216        self
217    }
218
219    /// Set the rayon configuration.
220    #[cfg(feature = "rayon")]
221    pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
222        self.rayon = rayon;
223        self
224    }
225}
226
227/// Error returned when [`RuntimeBuilder::build`] fails.
228#[derive(Debug, thiserror::Error)]
229pub enum RuntimeBuildError {
230    /// Failed to build the tokio runtime.
231    #[error("Failed to build tokio runtime: {0}")]
232    TokioBuild(#[from] std::io::Error),
233    /// Failed to build a rayon thread pool.
234    #[cfg(feature = "rayon")]
235    #[error("Failed to build rayon thread pool: {0}")]
236    RayonBuild(#[from] rayon::ThreadPoolBuildError),
237}
238
239// ── RuntimeInner ──────────────────────────────────────────────────────
240
241struct RuntimeInner {
242    /// Owned tokio runtime, if we built one. Kept alive via the `Arc<RuntimeInner>`.
243    _tokio_runtime: Option<TokioRuntime>,
244    /// Handle to the tokio runtime.
245    handle: Handle,
246    /// Receiver of the shutdown signal.
247    on_shutdown: Shutdown,
248    /// Sender half for sending task events to the [`TaskManager`].
249    task_events_tx: UnboundedSender<TaskEvent>,
250    /// Task executor metrics.
251    metrics: TaskExecutorMetrics,
252    /// How many [`GracefulShutdown`] tasks are currently active.
253    graceful_tasks: Arc<AtomicUsize>,
254    /// General-purpose rayon CPU pool.
255    #[cfg(feature = "rayon")]
256    cpu_pool: rayon::ThreadPool,
257    /// RPC blocking pool.
258    #[cfg(feature = "rayon")]
259    rpc_pool: BlockingTaskPool,
260    /// Storage I/O pool.
261    #[cfg(feature = "rayon")]
262    storage_pool: rayon::ThreadPool,
263    /// Rate limiter for expensive RPC operations.
264    #[cfg(feature = "rayon")]
265    blocking_guard: BlockingTaskGuard,
266    /// Proof storage worker pool (trie storage proof computation).
267    #[cfg(feature = "rayon")]
268    proof_storage_worker_pool: WorkerPool,
269    /// Proof account worker pool (trie account proof computation).
270    #[cfg(feature = "rayon")]
271    proof_account_worker_pool: WorkerPool,
272    /// Prewarming pool (execution prewarming workers).
273    #[cfg(feature = "rayon")]
274    prewarming_pool: WorkerPool,
275    /// BAL streaming pool (BAL hashed state streaming).
276    #[cfg(feature = "rayon")]
277    bal_streaming_pool: WorkerPool,
278    /// Named single-thread worker map. Each unique name gets a dedicated OS thread
279    /// that is reused across all tasks submitted under that name.
280    worker_map: WorkerMap,
281    /// Handle to the spawned [`TaskManager`] background task.
282    /// The task monitors critical tasks for panics and fires the shutdown signal.
283    /// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors.
284    task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
285}
286
287// ── Runtime ───────────────────────────────────────────────────────────
288
289/// A cheaply cloneable handle to the runtime resources.
290///
291/// Wraps an `Arc<RuntimeInner>` and provides access to:
292/// - The tokio [`Handle`]
293/// - Task spawning with shutdown awareness and panic monitoring
294/// - Rayon thread pools (with `rayon` feature)
295#[derive(Clone)]
296pub struct Runtime(Arc<RuntimeInner>);
297
298impl std::fmt::Debug for Runtime {
299    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300        f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
301    }
302}
303
304// ── Pool accessors ────────────────────────────────────────────────────
305
306impl Runtime {
307    /// Takes the [`TaskManager`] handle out of this runtime, if one is stored.
308    ///
309    /// The handle resolves with `Err(PanickedTaskError)` if a critical task panicked,
310    /// or `Ok(())` if shutdown was requested. If not taken, the background task still
311    /// runs and logs panics at `debug!` level.
312    pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
313        self.0.task_manager_handle.lock().unwrap().take()
314    }
315
316    /// Returns the tokio runtime [`Handle`].
317    pub fn handle(&self) -> &Handle {
318        &self.0.handle
319    }
320
321    /// Get the general-purpose rayon CPU thread pool.
322    #[cfg(feature = "rayon")]
323    pub fn cpu_pool(&self) -> &rayon::ThreadPool {
324        &self.0.cpu_pool
325    }
326
327    /// Get the RPC blocking task pool.
328    #[cfg(feature = "rayon")]
329    pub fn rpc_pool(&self) -> &BlockingTaskPool {
330        &self.0.rpc_pool
331    }
332
333    /// Get the storage I/O pool.
334    #[cfg(feature = "rayon")]
335    pub fn storage_pool(&self) -> &rayon::ThreadPool {
336        &self.0.storage_pool
337    }
338
339    /// Get a clone of the [`BlockingTaskGuard`].
340    #[cfg(feature = "rayon")]
341    pub fn blocking_guard(&self) -> BlockingTaskGuard {
342        self.0.blocking_guard.clone()
343    }
344
345    /// Get the proof storage worker pool.
346    #[cfg(feature = "rayon")]
347    pub fn proof_storage_worker_pool(&self) -> &WorkerPool {
348        &self.0.proof_storage_worker_pool
349    }
350
351    /// Get the proof account worker pool.
352    #[cfg(feature = "rayon")]
353    pub fn proof_account_worker_pool(&self) -> &WorkerPool {
354        &self.0.proof_account_worker_pool
355    }
356
357    /// Get the prewarming pool.
358    #[cfg(feature = "rayon")]
359    pub fn prewarming_pool(&self) -> &WorkerPool {
360        &self.0.prewarming_pool
361    }
362
363    /// Get the BAL streaming pool.
364    #[cfg(feature = "rayon")]
365    pub fn bal_streaming_pool(&self) -> &WorkerPool {
366        &self.0.bal_streaming_pool
367    }
368}
369
370// ── Test helpers ──────────────────────────────────────────────────────
371
372impl Runtime {
373    /// Creates a lightweight [`Runtime`] for tests with minimal thread pools.
374    ///
375    /// If called from within a tokio runtime (e.g. `#[tokio::test]`), attaches to the existing
376    /// handle to avoid shutdown panics when the test runtime is dropped.
377    pub fn test() -> Self {
378        let config = match Handle::try_current() {
379            Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
380            Err(_) => Self::test_config(),
381        };
382        RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
383    }
384
385    const fn test_config() -> RuntimeConfig {
386        RuntimeConfig {
387            tokio: TokioConfig::Owned {
388                worker_threads: Some(2),
389                thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
390                thread_name: "tokio-test",
391            },
392            #[cfg(feature = "rayon")]
393            rayon: RayonConfig {
394                cpu_threads: Some(2),
395                reserved_cpu_cores: 0,
396                rpc_threads: Some(2),
397                storage_threads: Some(2),
398                max_blocking_tasks: 16,
399                proof_storage_worker_threads: Some(2),
400                proof_account_worker_threads: Some(2),
401                prewarming_threads: Some(2),
402                bal_streaming_threads: Some(2),
403            },
404        }
405    }
406}
407
408// ── Spawn methods ─────────────────────────────────────────────────────
409
/// Chooses which tokio spawning primitive runs a task.
enum TaskKind {
    /// Run on the async worker threads via [`Handle::spawn`].
    Default,
    /// Run on tokio's blocking thread pool via [`Handle::spawn_blocking`].
    Blocking,
}
417
418impl Runtime {
419    /// Returns the receiver of the shutdown signal.
420    pub fn on_shutdown_signal(&self) -> &Shutdown {
421        &self.0.on_shutdown
422    }
423
424    /// Spawns a future on the tokio runtime depending on the [`TaskKind`].
425    fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
426    where
427        F: Future<Output = ()> + Send + 'static,
428    {
429        match task_kind {
430            TaskKind::Default => self.0.handle.spawn(fut),
431            TaskKind::Blocking => {
432                let handle = self.0.handle.clone();
433                self.0.handle.spawn_blocking(move || handle.block_on(fut))
434            }
435        }
436    }
437
438    /// Spawns a regular task depending on the given [`TaskKind`].
439    fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
440    where
441        F: Future<Output = ()> + Send + 'static,
442    {
443        match task_kind {
444            TaskKind::Default => self.0.metrics.inc_regular_tasks(),
445            TaskKind::Blocking => self.0.metrics.inc_regular_blocking_tasks(),
446        }
447        let on_shutdown = self.0.on_shutdown.clone();
448
449        let finished_counter = match task_kind {
450            TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
451            TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
452        };
453
454        let task = {
455            async move {
456                let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
457                let fut = pin!(fut);
458                let _ = select(on_shutdown, fut).await;
459            }
460        }
461        .in_current_span();
462
463        self.spawn_on_rt(task, task_kind)
464    }
465
466    /// Spawns the task onto the runtime.
467    /// The given future resolves as soon as the [Shutdown] signal is received.
468    ///
469    /// See also [`Handle::spawn`].
470    pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
471    where
472        F: Future<Output = ()> + Send + 'static,
473    {
474        self.spawn_task_as(fut, TaskKind::Default)
475    }
476
477    /// Spawns a blocking task onto the runtime.
478    /// The given future resolves as soon as the [Shutdown] signal is received.
479    ///
480    /// See also [`Handle::spawn_blocking`].
481    pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
482    where
483        F: Future<Output = ()> + Send + 'static,
484    {
485        self.spawn_task_as(fut, TaskKind::Blocking)
486    }
487
488    /// Spawns a blocking closure directly on the tokio runtime, bypassing shutdown
489    /// awareness. Useful for raw CPU-bound work.
490    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
491    where
492        F: FnOnce() -> R + Send + 'static,
493        R: Send + 'static,
494    {
495        self.0.handle.spawn_blocking(func)
496    }
497
498    /// Moves the given value to a dedicated background thread for deallocation.
499    ///
500    /// This is useful when dropping a value is expensive (e.g. large nested collections)
501    /// and should not block the current task. Uses a persistent named thread (`"drop"`)
502    /// to avoid thread creation overhead on hot paths.
503    pub fn spawn_drop<T: Send + 'static>(&self, value: T) {
504        self.spawn_blocking_named("drop", move || drop(value));
505    }
506
507    /// Spawns a blocking closure on a dedicated, named OS thread.
508    ///
509    /// Unlike [`spawn_blocking`](Self::spawn_blocking) which uses tokio's blocking thread pool,
510    /// this reuses the same OS thread for all tasks submitted under the same `name`. The thread
511    /// is created lazily on first use and its OS thread name is set to `name`.
512    ///
513    /// This is useful for tasks that benefit from running on a stable thread, e.g. for
514    /// thread-local state reuse or to avoid thread creation overhead on hot paths.
515    ///
516    /// Returns a [`LazyHandle`](crate::LazyHandle) handle that resolves on first access and caches
517    /// the result.
518    pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> crate::LazyHandle<R>
519    where
520        F: FnOnce() -> R + Send + 'static,
521        R: Send + 'static,
522    {
523        crate::LazyHandle::new(self.0.worker_map.spawn_on(name, func))
524    }
525
526    /// Spawns the task onto the runtime.
527    /// The given future resolves as soon as the [Shutdown] signal is received.
528    ///
529    /// See also [`Handle::spawn`].
530    pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
531    where
532        F: Future<Output = ()> + Send + 'static,
533    {
534        let on_shutdown = self.0.on_shutdown.clone();
535        let fut = f(on_shutdown);
536        let task = fut.in_current_span();
537        self.0.handle.spawn(task)
538    }
539
540    /// Spawns a critical task depending on the given [`TaskKind`].
541    fn spawn_critical_as<F>(
542        &self,
543        name: &'static str,
544        fut: F,
545        task_kind: TaskKind,
546    ) -> JoinHandle<()>
547    where
548        F: Future<Output = ()> + Send + 'static,
549    {
550        self.0.metrics.inc_critical_tasks();
551        let panicked_tasks_tx = self.0.task_events_tx.clone();
552        let on_shutdown = self.0.on_shutdown.clone();
553
554        // wrap the task in catch unwind
555        let task = std::panic::AssertUnwindSafe(fut)
556            .catch_unwind()
557            .map_err(move |error| {
558                let task_error = PanickedTaskError::new(name, error);
559                error!("{task_error}");
560                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
561            })
562            .in_current_span();
563
564        let finished_critical_tasks_total_metrics =
565            self.0.metrics.finished_critical_tasks_total.clone();
566        let task = async move {
567            let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
568            let task = pin!(task);
569            let _ = select(on_shutdown, task).await;
570        };
571
572        self.spawn_on_rt(task, task_kind)
573    }
574
575    /// This spawns a critical task onto the runtime.
576    /// The given future resolves as soon as the [Shutdown] signal is received.
577    ///
578    /// If this task panics, the [`TaskManager`] is notified.
579    pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
580    where
581        F: Future<Output = ()> + Send + 'static,
582    {
583        self.spawn_critical_as(name, fut, TaskKind::Default)
584    }
585
586    /// This spawns a critical blocking task onto the runtime.
587    /// The given future resolves as soon as the [Shutdown] signal is received.
588    ///
589    /// If this task panics, the [`TaskManager`] is notified.
590    pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
591    where
592        F: Future<Output = ()> + Send + 'static,
593    {
594        self.spawn_critical_as(name, fut, TaskKind::Blocking)
595    }
596
597    /// This spawns a critical task onto a dedicated named OS thread.
598    /// The given future resolves as soon as the [`Shutdown`] signal is received.
599    ///
600    /// If this task panics, the [`TaskManager`] is notified.
601    pub fn spawn_critical_os_thread<F>(
602        &self,
603        thread_name: &'static str,
604        task_name: &'static str,
605        fut: F,
606    ) -> thread::JoinHandle<()>
607    where
608        F: Future<Output = ()> + Send + 'static,
609    {
610        self.0.metrics.inc_critical_tasks();
611        let handle = self.0.handle.clone();
612        let panicked_tasks_tx = self.0.task_events_tx.clone();
613        let on_shutdown = self.0.on_shutdown.clone();
614
615        let task = std::panic::AssertUnwindSafe(fut)
616            .catch_unwind()
617            .map_err(move |error| {
618                let task_error = PanickedTaskError::new(task_name, error);
619                error!("{task_error}");
620                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
621            })
622            .in_current_span();
623
624        let finished_critical_tasks_total_metrics =
625            self.0.metrics.finished_critical_tasks_total.clone();
626        let task = async move {
627            let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
628            let task = pin!(task);
629            let _ = select(on_shutdown, task).await;
630        };
631
632        thread::Builder::new()
633            .name(thread_name.to_string())
634            .spawn(move || {
635                let _guard = handle.enter();
636                handle.block_on(task);
637            })
638            .unwrap_or_else(|e| panic!("failed to spawn critical OS thread {thread_name:?}: {e}"))
639    }
640
641    /// This spawns a critical task onto the runtime.
642    ///
643    /// If this task panics, the [`TaskManager`] is notified.
644    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
645    ///
646    /// # Example
647    ///
648    /// ```no_run
649    /// # async fn t(executor: reth_tasks::TaskExecutor) {
650    ///
651    /// executor.spawn_critical_with_graceful_shutdown_signal("grace", async move |shutdown| {
652    ///     // await the shutdown signal
653    ///     let guard = shutdown.await;
654    ///     // do work before exiting the program
655    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
656    ///     // allow graceful shutdown
657    ///     drop(guard);
658    /// });
659    /// # }
660    /// ```
661    pub fn spawn_critical_with_graceful_shutdown_signal<F>(
662        &self,
663        name: &'static str,
664        f: impl FnOnce(GracefulShutdown) -> F,
665    ) -> JoinHandle<()>
666    where
667        F: Future<Output = ()> + Send + 'static,
668    {
669        let panicked_tasks_tx = self.0.task_events_tx.clone();
670        let on_shutdown = GracefulShutdown::new(
671            self.0.on_shutdown.clone(),
672            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
673        );
674        let fut = f(on_shutdown);
675
676        // wrap the task in catch unwind
677        let task = std::panic::AssertUnwindSafe(fut)
678            .catch_unwind()
679            .map_err(move |error| {
680                let task_error = PanickedTaskError::new(name, error);
681                error!("{task_error}");
682                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
683            })
684            .map(drop)
685            .in_current_span();
686
687        self.0.handle.spawn(task)
688    }
689
690    /// This spawns a regular task onto the runtime.
691    ///
692    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
693    ///
694    /// # Example
695    ///
696    /// ```no_run
697    /// # async fn t(executor: reth_tasks::TaskExecutor) {
698    ///
699    /// executor.spawn_with_graceful_shutdown_signal(async move |shutdown| {
700    ///     // await the shutdown signal
701    ///     let guard = shutdown.await;
702    ///     // do work before exiting the program
703    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
704    ///     // allow graceful shutdown
705    ///     drop(guard);
706    /// });
707    /// # }
708    /// ```
709    pub fn spawn_with_graceful_shutdown_signal<F>(
710        &self,
711        f: impl FnOnce(GracefulShutdown) -> F,
712    ) -> JoinHandle<()>
713    where
714        F: Future<Output = ()> + Send + 'static,
715    {
716        let on_shutdown = GracefulShutdown::new(
717            self.0.on_shutdown.clone(),
718            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
719        );
720        let fut = f(on_shutdown);
721
722        self.0.handle.spawn(fut)
723    }
724
725    /// Sends a request to the `TaskManager` to initiate a graceful shutdown.
726    ///
727    /// Caution: This will terminate the entire program.
728    pub fn initiate_graceful_shutdown(
729        &self,
730    ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
731        self.0
732            .task_events_tx
733            .send(TaskEvent::GracefulShutdown)
734            .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
735
736        Ok(GracefulShutdown::new(
737            self.0.on_shutdown.clone(),
738            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
739        ))
740    }
741
742    /// Fires the shutdown signal and waits until all graceful tasks complete.
743    pub fn graceful_shutdown(&self) {
744        let _ = self.do_graceful_shutdown(None);
745    }
746
747    /// Fires the shutdown signal and waits until all graceful tasks complete or the timeout
748    /// elapses.
749    ///
750    /// Returns `true` if all tasks completed before the timeout.
751    pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
752        self.do_graceful_shutdown(Some(timeout))
753    }
754
755    fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
756        let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
757        let deadline = timeout.map(|t| Instant::now() + t);
758        while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
759            if deadline.is_some_and(|d| Instant::now() > d) {
760                debug!("graceful shutdown timed out");
761                return false;
762            }
763            std::thread::yield_now();
764        }
765        debug!("gracefully shut down");
766        true
767    }
768}
769
770// ── RuntimeBuilder ────────────────────────────────────────────────────
771
772/// Builder for constructing a [`Runtime`].
773#[derive(Debug, Clone)]
774pub struct RuntimeBuilder {
775    config: RuntimeConfig,
776}
777
778impl RuntimeBuilder {
779    /// Create a new builder with the given configuration.
780    pub const fn new(config: RuntimeConfig) -> Self {
781        Self { config }
782    }
783
784    /// Build the [`Runtime`].
785    ///
786    /// The [`TaskManager`] is automatically spawned as a background task that monitors
787    /// critical tasks for panics. Use [`Runtime::take_task_manager_handle`] to extract
788    /// the join handle if you need to poll for panic errors.
789    #[tracing::instrument(name = "RuntimeBuilder::build", level = "debug", skip_all)]
790    pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
791        debug!(?self.config, "Building runtime");
792        let config = self.config;
793
794        let (owned_runtime, handle) = match &config.tokio {
795            TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
796                let mut builder = tokio::runtime::Builder::new_multi_thread();
797                builder
798                    .enable_all()
799                    .thread_keep_alive(*thread_keep_alive)
800                    .thread_name(*thread_name);
801
802                if let Some(threads) = worker_threads {
803                    builder.worker_threads(*threads);
804                }
805
806                let runtime = builder.build()?;
807                let h = runtime.handle().clone();
808                (Some(runtime), h)
809            }
810            TokioConfig::ExistingHandle(h) => (None, h.clone()),
811        };
812
813        let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
814            TaskManager::new_parts(handle.clone());
815
816        #[cfg(feature = "rayon")]
817        let (
818            cpu_pool,
819            rpc_pool,
820            storage_pool,
821            blocking_guard,
822            proof_storage_worker_pool,
823            proof_account_worker_pool,
824            prewarming_pool,
825            bal_streaming_pool,
826        ) = {
827            let default_threads = config.rayon.default_thread_count();
828            let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);
829
830            let cpu_pool = build_pool_with_panic_handler(
831                rayon::ThreadPoolBuilder::new()
832                    .num_threads(default_threads)
833                    .thread_name(|i| format!("cpu-{i:02}")),
834            )?;
835
836            let rpc_raw = build_pool_with_panic_handler(
837                rayon::ThreadPoolBuilder::new()
838                    .num_threads(rpc_threads)
839                    .thread_name(|i| format!("rpc-{i:02}")),
840            )?;
841            let rpc_pool = BlockingTaskPool::new(rpc_raw);
842
843            let storage_threads =
844                config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
845            let storage_pool = build_pool_with_panic_handler(
846                rayon::ThreadPoolBuilder::new()
847                    .num_threads(storage_threads)
848                    .thread_name(|i| format!("storage-{i:02}")),
849            )?;
850
851            let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
852
853            let proof_storage_worker_threads =
854                config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
855            let proof_storage_worker_pool =
856                WorkerPool::new(proof_storage_worker_threads, "proof-strg");
857
858            let proof_account_worker_threads =
859                config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
860            let proof_account_worker_pool =
861                WorkerPool::new(proof_account_worker_threads, "proof-acct");
862
863            let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads);
864            let prewarming_pool = WorkerPool::new(prewarming_threads, "prewarm");
865
866            let bal_streaming_threads =
867                config.rayon.bal_streaming_threads.unwrap_or(default_threads);
868            let bal_streaming_pool = WorkerPool::new(bal_streaming_threads, "bal-stream");
869
870            debug!(
871                default_threads,
872                rpc_threads,
873                storage_threads,
874                proof_storage_worker_threads,
875                proof_account_worker_threads,
876                prewarming_threads,
877                bal_streaming_threads,
878                max_blocking_tasks = config.rayon.max_blocking_tasks,
879                "Configured lazy rayon worker pools"
880            );
881
882            (
883                cpu_pool,
884                rpc_pool,
885                storage_pool,
886                blocking_guard,
887                proof_storage_worker_pool,
888                proof_account_worker_pool,
889                prewarming_pool,
890                bal_streaming_pool,
891            )
892        };
893
894        let task_manager_handle = handle.spawn(async move {
895            let result = task_manager.await;
896            if let Err(ref err) = result {
897                debug!("{err}");
898            }
899            result
900        });
901
902        let inner = RuntimeInner {
903            _tokio_runtime: owned_runtime,
904            handle,
905            on_shutdown,
906            task_events_tx,
907            metrics: Default::default(),
908            graceful_tasks,
909            #[cfg(feature = "rayon")]
910            cpu_pool,
911            #[cfg(feature = "rayon")]
912            rpc_pool,
913            #[cfg(feature = "rayon")]
914            storage_pool,
915            #[cfg(feature = "rayon")]
916            blocking_guard,
917            #[cfg(feature = "rayon")]
918            proof_storage_worker_pool,
919            #[cfg(feature = "rayon")]
920            proof_account_worker_pool,
921            #[cfg(feature = "rayon")]
922            prewarming_pool,
923            #[cfg(feature = "rayon")]
924            bal_streaming_pool,
925            worker_map: WorkerMap::new(),
926            task_manager_handle: Mutex::new(Some(task_manager_handle)),
927        };
928
929        Ok(Runtime(Arc::new(inner)))
930    }
931}
932
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the test configuration rewired to attach to `rt` instead of owning a runtime.
    fn attached_to(rt: &TokioRuntime) -> RuntimeConfig {
        Runtime::test_config().with_tokio(TokioConfig::existing_handle(rt.handle().clone()))
    }

    #[test]
    fn test_runtime_config_default() {
        // Without overrides the runtime builds and owns its own Tokio runtime.
        assert!(matches!(RuntimeConfig::default().tokio, TokioConfig::Owned { .. }));
    }

    #[test]
    fn test_runtime_config_existing_handle() {
        let rt = TokioRuntime::new().unwrap();
        let attached = attached_to(&rt);
        assert!(matches!(attached.tokio, TokioConfig::ExistingHandle(_)));
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_rayon_config_thread_count() {
        assert!(RayonConfig::default().default_thread_count() >= 1);
    }

    #[test]
    fn test_runtime_builder() {
        let rt = TokioRuntime::new().unwrap();
        let built = RuntimeBuilder::new(attached_to(&rt)).build().unwrap();
        let _ = built.handle();
    }

    #[test]
    fn critical_os_thread_uses_requested_name() {
        let runtime = Runtime::test();
        let (name_tx, name_rx) = std::sync::mpsc::channel();

        let os_thread = runtime.spawn_critical_os_thread(
            "critical-os-test",
            "critical os thread test",
            async move {
                // Report the name of the OS thread the future actually runs on.
                name_tx.send(thread::current().name().unwrap().to_string()).unwrap();
            },
        );

        let observed = name_rx.recv_timeout(Duration::from_secs(5)).unwrap();
        assert_eq!(observed, "critical-os-test");
        os_thread.join().unwrap();
    }

    #[test]
    fn critical_os_thread_panic_is_reported() {
        let runtime = Runtime::test();
        let manager = runtime.take_task_manager_handle().unwrap();

        let os_thread = runtime.spawn_critical_os_thread(
            "critical-os-panic",
            "critical os thread panic test",
            async { panic!("critical os thread panic") },
        );

        // The task manager resolves with the panic of the critical task.
        let panicked =
            runtime.handle().block_on(async move { manager.await.unwrap().unwrap_err() });
        assert_eq!(panicked.task_name, "critical os thread panic test");
        assert_eq!(panicked.error.as_deref(), Some("critical os thread panic"));
        os_thread.join().unwrap();
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_worker_pools_are_lazy() {
        let runtime = Runtime::test();
        let inner = &runtime.0;

        // Nothing is initialized until a pool is first requested.
        assert!(!inner.bal_streaming_pool.is_initialized());
        assert!(!inner.proof_storage_worker_pool.is_initialized());

        // The first request initializes the pool with the configured thread count.
        assert_eq!(runtime.bal_streaming_pool().current_num_threads(), 2);
        assert!(inner.bal_streaming_pool.is_initialized());

        assert_eq!(runtime.proof_storage_worker_pool().current_num_threads(), 2);
        assert_eq!(runtime.proof_account_worker_pool().current_num_threads(), 2);
        assert_eq!(runtime.prewarming_pool().current_num_threads(), 2);
    }
}