Skip to main content

reth_tasks/
runtime.rs

1//! Centralized management of async and parallel execution.
2//!
3//! This module provides [`Runtime`], a cheaply cloneable handle that manages:
4//! - Tokio runtime (either owned or attached)
5//! - Task spawning with shutdown awareness and panic monitoring
6//! - Dedicated rayon thread pools for different workloads (with `rayon` feature)
7//! - [`BlockingTaskGuard`] for rate-limiting expensive operations (with `rayon` feature)
8
9#[cfg(feature = "rayon")]
10use crate::pool::{BlockingTaskGuard, BlockingTaskPool, WorkerPool};
11use crate::{
12    metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13    shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14    worker_map::WorkerMap,
15    PanickedTaskError, TaskEvent, TaskManager,
16};
17use futures_util::{future::select, Future, FutureExt, TryFutureExt};
18#[cfg(feature = "rayon")]
19use std::{num::NonZeroUsize, thread::available_parallelism};
20use std::{
21    pin::pin,
22    sync::{
23        atomic::{AtomicUsize, Ordering},
24        Arc, Mutex,
25    },
26    time::{Duration, Instant},
27};
28use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
29use tracing::{debug, error};
30use tracing_futures::Instrument;
31
32use tokio::runtime::Runtime as TokioRuntime;
33
/// Default thread keep-alive duration for the tokio runtime.
///
/// Used as `thread_keep_alive` by [`TokioConfig::default`] and
/// [`TokioConfig::with_worker_threads`].
pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_secs(15);

/// Default reserved CPU cores for OS and other processes.
///
/// Used as the default value of `RayonConfig::reserved_cpu_cores`.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;

/// Default number of threads for the storage I/O pool.
///
/// Applied by [`RuntimeBuilder::build`] when `RayonConfig::storage_threads` is `None`.
pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;

/// Default maximum number of concurrent blocking tasks (for RPC tracing guard).
///
/// Used as the default value of `RayonConfig::max_blocking_tasks`.
pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
45
/// Configuration for the tokio runtime.
///
/// Selects whether [`RuntimeBuilder::build`] constructs a new multi-threaded tokio runtime
/// that the resulting [`Runtime`] owns, or attaches to a runtime that already exists.
#[derive(Debug, Clone)]
pub enum TokioConfig {
    /// Build and own a new tokio runtime.
    Owned {
        /// Number of worker threads. If `None`, uses tokio's default (number of CPU cores).
        worker_threads: Option<usize>,
        /// How long to keep worker threads alive when idle.
        ///
        /// Forwarded to tokio's `Builder::thread_keep_alive`.
        thread_keep_alive: Duration,
        /// Thread name prefix.
        ///
        /// Forwarded to tokio's `Builder::thread_name`.
        thread_name: &'static str,
    },
    /// Attach to an existing tokio runtime handle.
    ///
    /// No runtime is built in this case; the handle is cloned and used for spawning.
    ExistingHandle(Handle),
}
61
62impl Default for TokioConfig {
63    fn default() -> Self {
64        Self::Owned {
65            worker_threads: None,
66            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
67            thread_name: "tokio-rt",
68        }
69    }
70}
71
72impl TokioConfig {
73    /// Create a config that attaches to an existing runtime handle.
74    pub const fn existing_handle(handle: Handle) -> Self {
75        Self::ExistingHandle(handle)
76    }
77
78    /// Create a config for an owned runtime with the specified number of worker threads.
79    pub const fn with_worker_threads(worker_threads: usize) -> Self {
80        Self::Owned {
81            worker_threads: Some(worker_threads),
82            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
83            thread_name: "tokio-rt",
84        }
85    }
86}
87
/// Configuration for the rayon thread pools.
///
/// Every `Option` field left as `None` is resolved to a concrete thread count in
/// [`RuntimeBuilder::build`].
#[derive(Debug, Clone)]
#[cfg(feature = "rayon")]
pub struct RayonConfig {
    /// Number of threads for the general CPU pool.
    /// If `None`, derived from the available parallelism (falling back to `1` when it cannot
    /// be queried). Reserved cores are currently not subtracted.
    pub cpu_threads: Option<usize>,
    /// Number of CPU cores to reserve for OS and other processes.
    ///
    /// NOTE: this value is currently ignored when computing the default thread count.
    pub reserved_cpu_cores: usize,
    /// Number of threads for the RPC blocking pool (trace calls, `eth_getProof`, etc.).
    /// If `None`, uses the same default as the CPU pool.
    pub rpc_threads: Option<usize>,
    /// Number of threads for the storage I/O pool (static file, `RocksDB` writes in
    /// `save_blocks`). If `None`, uses [`DEFAULT_STORAGE_POOL_THREADS`].
    pub storage_threads: Option<usize>,
    /// Maximum number of concurrent blocking tasks for the RPC guard semaphore.
    pub max_blocking_tasks: usize,
    /// Number of threads for the proof storage worker pool (trie storage proof workers).
    /// If `None`, twice the default CPU thread count is used.
    pub proof_storage_worker_threads: Option<usize>,
    /// Number of threads for the proof account worker pool (trie account proof workers).
    /// If `None`, twice the default CPU thread count is used.
    pub proof_account_worker_threads: Option<usize>,
    /// Number of threads for the prewarming pool (execution prewarming workers).
    /// If `None`, the default CPU thread count is used.
    pub prewarming_threads: Option<usize>,
}
115
#[cfg(feature = "rayon")]
impl Default for RayonConfig {
    /// Returns a configuration in which every thread count is left unset (resolved at build
    /// time) and the two scalar knobs take their `DEFAULT_*` constants.
    fn default() -> Self {
        // All per-pool thread counts start out unset.
        let unset: Option<usize> = None;
        Self {
            cpu_threads: unset,
            reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
            rpc_threads: unset,
            storage_threads: unset,
            max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
            proof_storage_worker_threads: unset,
            proof_account_worker_threads: unset,
            prewarming_threads: unset,
        }
    }
}
131
#[cfg(feature = "rayon")]
impl RayonConfig {
    /// Returns the config with `reserved_cpu_cores` replaced.
    pub const fn with_reserved_cpu_cores(self, reserved_cpu_cores: usize) -> Self {
        Self { reserved_cpu_cores, ..self }
    }

    /// Returns the config with the blocking-task limit replaced.
    pub const fn with_max_blocking_tasks(self, max_blocking_tasks: usize) -> Self {
        Self { max_blocking_tasks, ..self }
    }

    /// Returns the config with an explicit RPC blocking pool size.
    pub const fn with_rpc_threads(self, rpc_threads: usize) -> Self {
        Self { rpc_threads: Some(rpc_threads), ..self }
    }

    /// Returns the config with an explicit storage I/O pool size.
    pub const fn with_storage_threads(self, storage_threads: usize) -> Self {
        Self { storage_threads: Some(storage_threads), ..self }
    }

    /// Returns the config with an explicit proof storage worker pool size.
    pub const fn with_proof_storage_worker_threads(
        self,
        proof_storage_worker_threads: usize,
    ) -> Self {
        Self { proof_storage_worker_threads: Some(proof_storage_worker_threads), ..self }
    }

    /// Returns the config with an explicit proof account worker pool size.
    pub const fn with_proof_account_worker_threads(
        self,
        proof_account_worker_threads: usize,
    ) -> Self {
        Self { proof_account_worker_threads: Some(proof_account_worker_threads), ..self }
    }

    /// Returns the config with an explicit prewarming pool size.
    pub const fn with_prewarming_threads(self, prewarming_threads: usize) -> Self {
        Self { prewarming_threads: Some(prewarming_threads), ..self }
    }

    /// Resolves the default thread count: the explicit `cpu_threads` when set, otherwise the
    /// available parallelism, otherwise `1`.
    fn default_thread_count(&self) -> usize {
        // TODO: reserved_cpu_cores is currently ignored because subtracting from thread pool
        // sizes doesn't actually reserve CPU cores for other processes.
        let _ = self.reserved_cpu_cores;
        if let Some(threads) = self.cpu_threads {
            return threads;
        }
        available_parallelism().map(NonZeroUsize::get).unwrap_or(1)
    }
}
190
/// Configuration for building a [`Runtime`].
///
/// Consumed by [`RuntimeBuilder::new`]. The `rayon` section only exists when the `rayon`
/// feature is enabled.
#[derive(Debug, Clone, Default)]
pub struct RuntimeConfig {
    /// Tokio runtime configuration.
    pub tokio: TokioConfig,
    /// Rayon thread pool configuration.
    #[cfg(feature = "rayon")]
    pub rayon: RayonConfig,
}
200
201impl RuntimeConfig {
202    /// Set the tokio configuration.
203    pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
204        self.tokio = tokio;
205        self
206    }
207
208    /// Set the rayon configuration.
209    #[cfg(feature = "rayon")]
210    pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
211        self.rayon = rayon;
212        self
213    }
214}
215
/// Error returned when [`RuntimeBuilder::build`] fails.
///
/// Both variants carry the underlying builder error and are created via `?` through the
/// derived `From` conversions.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeBuildError {
    /// Failed to build the tokio runtime.
    #[error("Failed to build tokio runtime: {0}")]
    TokioBuild(#[from] std::io::Error),
    /// Failed to build a rayon thread pool.
    #[cfg(feature = "rayon")]
    #[error("Failed to build rayon thread pool: {0}")]
    RayonBuild(#[from] rayon::ThreadPoolBuildError),
}
227
228// ── RuntimeInner ──────────────────────────────────────────────────────
229
/// Shared state behind a [`Runtime`] handle.
///
/// Constructed once by [`RuntimeBuilder::build`] and stored in an `Arc`, so every clone of
/// [`Runtime`] refers to the same tokio handle, pools and task bookkeeping.
struct RuntimeInner {
    /// Owned tokio runtime, if we built one. Kept alive via the `Arc<RuntimeInner>`.
    /// `None` when attached to an existing handle.
    _tokio_runtime: Option<TokioRuntime>,
    /// Handle to the tokio runtime.
    handle: Handle,
    /// Receiver of the shutdown signal.
    on_shutdown: Shutdown,
    /// Sender half for sending task events to the [`TaskManager`].
    task_events_tx: UnboundedSender<TaskEvent>,
    /// Task executor metrics.
    metrics: TaskExecutorMetrics,
    /// How many [`GracefulShutdown`] tasks are currently active.
    graceful_tasks: Arc<AtomicUsize>,
    /// General-purpose rayon CPU pool.
    #[cfg(feature = "rayon")]
    cpu_pool: rayon::ThreadPool,
    /// RPC blocking pool.
    #[cfg(feature = "rayon")]
    rpc_pool: BlockingTaskPool,
    /// Storage I/O pool.
    #[cfg(feature = "rayon")]
    storage_pool: rayon::ThreadPool,
    /// Rate limiter for expensive RPC operations.
    #[cfg(feature = "rayon")]
    blocking_guard: BlockingTaskGuard,
    /// Proof storage worker pool (trie storage proof computation).
    #[cfg(feature = "rayon")]
    proof_storage_worker_pool: WorkerPool,
    /// Proof account worker pool (trie account proof computation).
    #[cfg(feature = "rayon")]
    proof_account_worker_pool: WorkerPool,
    /// Prewarming pool (execution prewarming workers).
    #[cfg(feature = "rayon")]
    prewarming_pool: WorkerPool,
    /// Named single-thread worker map. Each unique name gets a dedicated OS thread
    /// that is reused across all tasks submitted under that name.
    worker_map: WorkerMap,
    /// Handle to the spawned [`TaskManager`] background task.
    /// The task monitors critical tasks for panics and fires the shutdown signal.
    /// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors;
    /// the slot is `None` after it has been taken.
    task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
}
272
273// ── Runtime ───────────────────────────────────────────────────────────
274
/// A cheaply cloneable handle to the runtime resources.
///
/// Wraps an `Arc<RuntimeInner>` and provides access to:
/// - The tokio [`Handle`]
/// - Task spawning with shutdown awareness and panic monitoring
/// - Rayon thread pools (with `rayon` feature)
///
/// Cloning only clones the `Arc`; all clones share the same underlying state.
#[derive(Clone)]
pub struct Runtime(Arc<RuntimeInner>);
283
284impl std::fmt::Debug for Runtime {
285    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286        f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
287    }
288}
289
290// ── Pool accessors ────────────────────────────────────────────────────
291
292impl Runtime {
293    /// Takes the [`TaskManager`] handle out of this runtime, if one is stored.
294    ///
295    /// The handle resolves with `Err(PanickedTaskError)` if a critical task panicked,
296    /// or `Ok(())` if shutdown was requested. If not taken, the background task still
297    /// runs and logs panics at `debug!` level.
298    pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
299        self.0.task_manager_handle.lock().unwrap().take()
300    }
301
302    /// Returns the tokio runtime [`Handle`].
303    pub fn handle(&self) -> &Handle {
304        &self.0.handle
305    }
306
307    /// Get the general-purpose rayon CPU thread pool.
308    #[cfg(feature = "rayon")]
309    pub fn cpu_pool(&self) -> &rayon::ThreadPool {
310        &self.0.cpu_pool
311    }
312
313    /// Get the RPC blocking task pool.
314    #[cfg(feature = "rayon")]
315    pub fn rpc_pool(&self) -> &BlockingTaskPool {
316        &self.0.rpc_pool
317    }
318
319    /// Get the storage I/O pool.
320    #[cfg(feature = "rayon")]
321    pub fn storage_pool(&self) -> &rayon::ThreadPool {
322        &self.0.storage_pool
323    }
324
325    /// Get a clone of the [`BlockingTaskGuard`].
326    #[cfg(feature = "rayon")]
327    pub fn blocking_guard(&self) -> BlockingTaskGuard {
328        self.0.blocking_guard.clone()
329    }
330
331    /// Get the proof storage worker pool.
332    #[cfg(feature = "rayon")]
333    pub fn proof_storage_worker_pool(&self) -> &WorkerPool {
334        &self.0.proof_storage_worker_pool
335    }
336
337    /// Get the proof account worker pool.
338    #[cfg(feature = "rayon")]
339    pub fn proof_account_worker_pool(&self) -> &WorkerPool {
340        &self.0.proof_account_worker_pool
341    }
342
343    /// Get the prewarming pool.
344    #[cfg(feature = "rayon")]
345    pub fn prewarming_pool(&self) -> &WorkerPool {
346        &self.0.prewarming_pool
347    }
348}
349
350// ── Test helpers ──────────────────────────────────────────────────────
351
352impl Runtime {
353    /// Creates a lightweight [`Runtime`] for tests with minimal thread pools.
354    ///
355    /// If called from within a tokio runtime (e.g. `#[tokio::test]`), attaches to the existing
356    /// handle to avoid shutdown panics when the test runtime is dropped.
357    pub fn test() -> Self {
358        let config = match Handle::try_current() {
359            Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
360            Err(_) => Self::test_config(),
361        };
362        RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
363    }
364
365    const fn test_config() -> RuntimeConfig {
366        RuntimeConfig {
367            tokio: TokioConfig::Owned {
368                worker_threads: Some(2),
369                thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
370                thread_name: "tokio-test",
371            },
372            #[cfg(feature = "rayon")]
373            rayon: RayonConfig {
374                cpu_threads: Some(2),
375                reserved_cpu_cores: 0,
376                rpc_threads: Some(2),
377                storage_threads: Some(2),
378                max_blocking_tasks: 16,
379                proof_storage_worker_threads: Some(2),
380                proof_account_worker_threads: Some(2),
381                prewarming_threads: Some(2),
382            },
383        }
384    }
385}
386
387// ── Spawn methods ─────────────────────────────────────────────────────
388
/// Determines how a task is spawned.
///
/// Also selects which metrics counters a regular task is recorded under.
enum TaskKind {
    /// Spawn the task to the default executor [`Handle::spawn`].
    Default,
    /// Spawn the task to the blocking executor [`Handle::spawn_blocking`].
    /// The future is driven to completion there with [`Handle::block_on`].
    Blocking,
}
396
397impl Runtime {
398    /// Returns the receiver of the shutdown signal.
399    pub fn on_shutdown_signal(&self) -> &Shutdown {
400        &self.0.on_shutdown
401    }
402
403    /// Spawns a future on the tokio runtime depending on the [`TaskKind`].
404    fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
405    where
406        F: Future<Output = ()> + Send + 'static,
407    {
408        match task_kind {
409            TaskKind::Default => self.0.handle.spawn(fut),
410            TaskKind::Blocking => {
411                let handle = self.0.handle.clone();
412                self.0.handle.spawn_blocking(move || handle.block_on(fut))
413            }
414        }
415    }
416
417    /// Spawns a regular task depending on the given [`TaskKind`].
418    fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
419    where
420        F: Future<Output = ()> + Send + 'static,
421    {
422        match task_kind {
423            TaskKind::Default => self.0.metrics.inc_regular_tasks(),
424            TaskKind::Blocking => self.0.metrics.inc_regular_blocking_tasks(),
425        }
426        let on_shutdown = self.0.on_shutdown.clone();
427
428        let finished_counter = match task_kind {
429            TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
430            TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
431        };
432
433        let task = {
434            async move {
435                let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
436                let fut = pin!(fut);
437                let _ = select(on_shutdown, fut).await;
438            }
439        }
440        .in_current_span();
441
442        self.spawn_on_rt(task, task_kind)
443    }
444
445    /// Spawns the task onto the runtime.
446    /// The given future resolves as soon as the [Shutdown] signal is received.
447    ///
448    /// See also [`Handle::spawn`].
449    pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
450    where
451        F: Future<Output = ()> + Send + 'static,
452    {
453        self.spawn_task_as(fut, TaskKind::Default)
454    }
455
456    /// Spawns a blocking task onto the runtime.
457    /// The given future resolves as soon as the [Shutdown] signal is received.
458    ///
459    /// See also [`Handle::spawn_blocking`].
460    pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
461    where
462        F: Future<Output = ()> + Send + 'static,
463    {
464        self.spawn_task_as(fut, TaskKind::Blocking)
465    }
466
467    /// Spawns a blocking closure directly on the tokio runtime, bypassing shutdown
468    /// awareness. Useful for raw CPU-bound work.
469    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
470    where
471        F: FnOnce() -> R + Send + 'static,
472        R: Send + 'static,
473    {
474        self.0.handle.spawn_blocking(func)
475    }
476
477    /// Spawns a blocking closure on a dedicated, named OS thread.
478    ///
479    /// Unlike [`spawn_blocking`](Self::spawn_blocking) which uses tokio's blocking thread pool,
480    /// this reuses the same OS thread for all tasks submitted under the same `name`. The thread
481    /// is created lazily on first use and its OS thread name is set to `name`.
482    ///
483    /// This is useful for tasks that benefit from running on a stable thread, e.g. for
484    /// thread-local state reuse or to avoid thread creation overhead on hot paths.
485    ///
486    /// Returns a [`LazyHandle`](crate::LazyHandle) handle that resolves on first access and caches
487    /// the result.
488    pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> crate::LazyHandle<R>
489    where
490        F: FnOnce() -> R + Send + 'static,
491        R: Send + 'static,
492    {
493        crate::LazyHandle::new(self.0.worker_map.spawn_on(name, func))
494    }
495
496    /// Spawns the task onto the runtime.
497    /// The given future resolves as soon as the [Shutdown] signal is received.
498    ///
499    /// See also [`Handle::spawn`].
500    pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
501    where
502        F: Future<Output = ()> + Send + 'static,
503    {
504        let on_shutdown = self.0.on_shutdown.clone();
505        let fut = f(on_shutdown);
506        let task = fut.in_current_span();
507        self.0.handle.spawn(task)
508    }
509
510    /// Spawns a critical task depending on the given [`TaskKind`].
511    fn spawn_critical_as<F>(
512        &self,
513        name: &'static str,
514        fut: F,
515        task_kind: TaskKind,
516    ) -> JoinHandle<()>
517    where
518        F: Future<Output = ()> + Send + 'static,
519    {
520        self.0.metrics.inc_critical_tasks();
521        let panicked_tasks_tx = self.0.task_events_tx.clone();
522        let on_shutdown = self.0.on_shutdown.clone();
523
524        // wrap the task in catch unwind
525        let task = std::panic::AssertUnwindSafe(fut)
526            .catch_unwind()
527            .map_err(move |error| {
528                let task_error = PanickedTaskError::new(name, error);
529                error!("{task_error}");
530                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
531            })
532            .in_current_span();
533
534        let finished_critical_tasks_total_metrics =
535            self.0.metrics.finished_critical_tasks_total.clone();
536        let task = async move {
537            let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
538            let task = pin!(task);
539            let _ = select(on_shutdown, task).await;
540        };
541
542        self.spawn_on_rt(task, task_kind)
543    }
544
545    /// This spawns a critical task onto the runtime.
546    /// The given future resolves as soon as the [Shutdown] signal is received.
547    ///
548    /// If this task panics, the [`TaskManager`] is notified.
549    pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
550    where
551        F: Future<Output = ()> + Send + 'static,
552    {
553        self.spawn_critical_as(name, fut, TaskKind::Default)
554    }
555
556    /// This spawns a critical blocking task onto the runtime.
557    /// The given future resolves as soon as the [Shutdown] signal is received.
558    ///
559    /// If this task panics, the [`TaskManager`] is notified.
560    pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
561    where
562        F: Future<Output = ()> + Send + 'static,
563    {
564        self.spawn_critical_as(name, fut, TaskKind::Blocking)
565    }
566
567    /// This spawns a critical task onto the runtime.
568    ///
569    /// If this task panics, the [`TaskManager`] is notified.
570    pub fn spawn_critical_with_shutdown_signal<F>(
571        &self,
572        name: &'static str,
573        f: impl FnOnce(Shutdown) -> F,
574    ) -> JoinHandle<()>
575    where
576        F: Future<Output = ()> + Send + 'static,
577    {
578        let panicked_tasks_tx = self.0.task_events_tx.clone();
579        let on_shutdown = self.0.on_shutdown.clone();
580        let fut = f(on_shutdown);
581
582        // wrap the task in catch unwind
583        let task = std::panic::AssertUnwindSafe(fut)
584            .catch_unwind()
585            .map_err(move |error| {
586                let task_error = PanickedTaskError::new(name, error);
587                error!("{task_error}");
588                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
589            })
590            .map(drop)
591            .in_current_span();
592
593        self.0.handle.spawn(task)
594    }
595
596    /// This spawns a critical task onto the runtime.
597    ///
598    /// If this task panics, the [`TaskManager`] is notified.
599    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
600    ///
601    /// # Example
602    ///
603    /// ```no_run
604    /// # async fn t(executor: reth_tasks::TaskExecutor) {
605    ///
606    /// executor.spawn_critical_with_graceful_shutdown_signal("grace", async move |shutdown| {
607    ///     // await the shutdown signal
608    ///     let guard = shutdown.await;
609    ///     // do work before exiting the program
610    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
611    ///     // allow graceful shutdown
612    ///     drop(guard);
613    /// });
614    /// # }
615    /// ```
616    pub fn spawn_critical_with_graceful_shutdown_signal<F>(
617        &self,
618        name: &'static str,
619        f: impl FnOnce(GracefulShutdown) -> F,
620    ) -> JoinHandle<()>
621    where
622        F: Future<Output = ()> + Send + 'static,
623    {
624        let panicked_tasks_tx = self.0.task_events_tx.clone();
625        let on_shutdown = GracefulShutdown::new(
626            self.0.on_shutdown.clone(),
627            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
628        );
629        let fut = f(on_shutdown);
630
631        // wrap the task in catch unwind
632        let task = std::panic::AssertUnwindSafe(fut)
633            .catch_unwind()
634            .map_err(move |error| {
635                let task_error = PanickedTaskError::new(name, error);
636                error!("{task_error}");
637                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
638            })
639            .map(drop)
640            .in_current_span();
641
642        self.0.handle.spawn(task)
643    }
644
645    /// This spawns a regular task onto the runtime.
646    ///
647    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
648    ///
649    /// # Example
650    ///
651    /// ```no_run
652    /// # async fn t(executor: reth_tasks::TaskExecutor) {
653    ///
654    /// executor.spawn_with_graceful_shutdown_signal(async move |shutdown| {
655    ///     // await the shutdown signal
656    ///     let guard = shutdown.await;
657    ///     // do work before exiting the program
658    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
659    ///     // allow graceful shutdown
660    ///     drop(guard);
661    /// });
662    /// # }
663    /// ```
664    pub fn spawn_with_graceful_shutdown_signal<F>(
665        &self,
666        f: impl FnOnce(GracefulShutdown) -> F,
667    ) -> JoinHandle<()>
668    where
669        F: Future<Output = ()> + Send + 'static,
670    {
671        let on_shutdown = GracefulShutdown::new(
672            self.0.on_shutdown.clone(),
673            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
674        );
675        let fut = f(on_shutdown);
676
677        self.0.handle.spawn(fut)
678    }
679
680    /// Sends a request to the `TaskManager` to initiate a graceful shutdown.
681    ///
682    /// Caution: This will terminate the entire program.
683    pub fn initiate_graceful_shutdown(
684        &self,
685    ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
686        self.0
687            .task_events_tx
688            .send(TaskEvent::GracefulShutdown)
689            .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
690
691        Ok(GracefulShutdown::new(
692            self.0.on_shutdown.clone(),
693            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
694        ))
695    }
696
697    /// Fires the shutdown signal and waits until all graceful tasks complete.
698    pub fn graceful_shutdown(&self) {
699        let _ = self.do_graceful_shutdown(None);
700    }
701
702    /// Fires the shutdown signal and waits until all graceful tasks complete or the timeout
703    /// elapses.
704    ///
705    /// Returns `true` if all tasks completed before the timeout.
706    pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
707        self.do_graceful_shutdown(Some(timeout))
708    }
709
710    fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
711        let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
712        let deadline = timeout.map(|t| Instant::now() + t);
713        while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
714            if deadline.is_some_and(|d| Instant::now() > d) {
715                debug!("graceful shutdown timed out");
716                return false;
717            }
718            std::thread::yield_now();
719        }
720        debug!("gracefully shut down");
721        true
722    }
723}
724
725// ── RuntimeBuilder ────────────────────────────────────────────────────
726
/// Builder for constructing a [`Runtime`].
///
/// Holds the [`RuntimeConfig`] that [`RuntimeBuilder::build`] turns into a tokio runtime
/// (or attached handle), the task manager and, with the `rayon` feature, the thread pools.
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
    /// Configuration consumed by [`RuntimeBuilder::build`].
    config: RuntimeConfig,
}
732
733impl RuntimeBuilder {
734    /// Create a new builder with the given configuration.
735    pub const fn new(config: RuntimeConfig) -> Self {
736        Self { config }
737    }
738
    /// Build the [`Runtime`].
    ///
    /// The [`TaskManager`] is automatically spawned as a background task that monitors
    /// critical tasks for panics. Use [`Runtime::take_task_manager_handle`] to extract
    /// the join handle if you need to poll for panic errors.
    ///
    /// # Errors
    ///
    /// Returns [`RuntimeBuildError::TokioBuild`] if an owned tokio runtime cannot be built
    /// and, with the `rayon` feature, `RuntimeBuildError::RayonBuild` if any thread pool
    /// fails to build.
    #[tracing::instrument(name = "RuntimeBuilder::build", level = "debug", skip_all)]
    pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
        debug!(?self.config, "Building runtime");
        let config = self.config;

        // Either build a new multi-threaded runtime (returned so it can be stored and kept
        // alive) or just clone the handle we were given.
        let (owned_runtime, handle) = match &config.tokio {
            TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
                let mut builder = tokio::runtime::Builder::new_multi_thread();
                builder
                    .enable_all()
                    .thread_keep_alive(*thread_keep_alive)
                    .thread_name(*thread_name);

                // Leave tokio's default worker count in place unless one was configured.
                if let Some(threads) = worker_threads {
                    builder.worker_threads(*threads);
                }

                let runtime = builder.build()?;
                let h = runtime.handle().clone();
                (Some(runtime), h)
            }
            TokioConfig::ExistingHandle(h) => (None, h.clone()),
        };

        // Create the task manager together with the shutdown receiver, the event sender and
        // the shared graceful-task counter that the runtime hands out to spawned tasks.
        let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
            TaskManager::new_parts(handle.clone());

        #[cfg(feature = "rayon")]
        let (
            cpu_pool,
            rpc_pool,
            storage_pool,
            blocking_guard,
            proof_storage_worker_pool,
            proof_account_worker_pool,
            prewarming_pool,
        ) = {
            // Fallback size for every pool that has no explicit thread count.
            let default_threads = config.rayon.default_thread_count();
            let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);

            let cpu_pool = rayon::ThreadPoolBuilder::new()
                .num_threads(default_threads)
                .thread_name(|i| format!("cpu-{i:02}"))
                .build()?;

            let rpc_raw = rayon::ThreadPoolBuilder::new()
                .num_threads(rpc_threads)
                .thread_name(|i| format!("rpc-{i:02}"))
                .build()?;
            let rpc_pool = BlockingTaskPool::new(rpc_raw);

            let storage_threads =
                config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
            let storage_pool = rayon::ThreadPoolBuilder::new()
                .num_threads(storage_threads)
                .thread_name(|i| format!("storage-{i:02}"))
                .build()?;

            let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);

            // Both proof pools default to twice the default thread count.
            let proof_storage_worker_threads =
                config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
            let proof_storage_worker_pool = WorkerPool::from_builder(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(proof_storage_worker_threads)
                    .thread_name(|i| format!("proof-strg-{i:02}")),
            )?;

            let proof_account_worker_threads =
                config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
            let proof_account_worker_pool = WorkerPool::from_builder(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(proof_account_worker_threads)
                    .thread_name(|i| format!("proof-acct-{i:02}")),
            )?;

            let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads);
            let prewarming_pool = WorkerPool::from_builder(
                rayon::ThreadPoolBuilder::new()
                    .num_threads(prewarming_threads)
                    .thread_name(|i| format!("prewarm-{i:02}")),
            )?;

            debug!(
                default_threads,
                rpc_threads,
                storage_threads,
                proof_storage_worker_threads,
                proof_account_worker_threads,
                prewarming_threads,
                max_blocking_tasks = config.rayon.max_blocking_tasks,
                "Initialized rayon thread pools"
            );

            (
                cpu_pool,
                rpc_pool,
                storage_pool,
                blocking_guard,
                proof_storage_worker_pool,
                proof_account_worker_pool,
                prewarming_pool,
            )
        };

        // Drive the task manager in the background; an error result is logged here and also
        // returned through the join handle for callers that take it.
        let task_manager_handle = handle.spawn(async move {
            let result = task_manager.await;
            if let Err(ref err) = result {
                debug!("{err}");
            }
            result
        });

        let inner = RuntimeInner {
            _tokio_runtime: owned_runtime,
            handle,
            on_shutdown,
            task_events_tx,
            metrics: Default::default(),
            graceful_tasks,
            #[cfg(feature = "rayon")]
            cpu_pool,
            #[cfg(feature = "rayon")]
            rpc_pool,
            #[cfg(feature = "rayon")]
            storage_pool,
            #[cfg(feature = "rayon")]
            blocking_guard,
            #[cfg(feature = "rayon")]
            proof_storage_worker_pool,
            #[cfg(feature = "rayon")]
            proof_account_worker_pool,
            #[cfg(feature = "rayon")]
            prewarming_pool,
            worker_map: WorkerMap::new(),
            task_manager_handle: Mutex::new(Some(task_manager_handle)),
        };

        Ok(Runtime(Arc::new(inner)))
    }
884}
885
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the test configuration attached to the handle of `rt`.
    fn attached_config(rt: &TokioRuntime) -> RuntimeConfig {
        let handle = rt.handle().clone();
        Runtime::test_config().with_tokio(TokioConfig::existing_handle(handle))
    }

    /// The default configuration builds and owns its tokio runtime.
    #[test]
    fn test_runtime_config_default() {
        let RuntimeConfig { tokio, .. } = RuntimeConfig::default();
        assert!(matches!(tokio, TokioConfig::Owned { .. }));
    }

    /// `with_tokio` replaces the tokio section with the supplied existing handle.
    #[test]
    fn test_runtime_config_existing_handle() {
        let rt = TokioRuntime::new().unwrap();
        let config = attached_config(&rt);
        assert!(matches!(config.tokio, TokioConfig::ExistingHandle(_)));
    }

    /// The default thread count is never zero.
    #[cfg(feature = "rayon")]
    #[test]
    fn test_rayon_config_thread_count() {
        let count = RayonConfig::default().default_thread_count();
        assert!(count >= 1);
    }

    /// A runtime attached to an existing handle builds successfully.
    #[test]
    fn test_runtime_builder() {
        let rt = TokioRuntime::new().unwrap();
        let runtime = RuntimeBuilder::new(attached_config(&rt)).build().unwrap();
        let _ = runtime.handle();
    }
}