Skip to main content

reth_tasks/
runtime.rs

1//! Centralized management of async and parallel execution.
2//!
3//! This module provides [`Runtime`], a cheaply cloneable handle that manages:
4//! - Tokio runtime (either owned or attached)
5//! - Task spawning with shutdown awareness and panic monitoring
6//! - Dedicated rayon thread pools for different workloads (with `rayon` feature)
7//! - [`BlockingTaskGuard`] for rate-limiting expensive operations (with `rayon` feature)
8
9#[cfg(feature = "rayon")]
10use crate::pool::{build_pool_with_panic_handler, BlockingTaskGuard, BlockingTaskPool, WorkerPool};
11use crate::{
12    metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13    shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14    worker_map::WorkerMap,
15    PanickedTaskError, TaskEvent, TaskManager,
16};
17use futures_util::{future::select, Future, FutureExt, TryFutureExt};
18#[cfg(feature = "rayon")]
19use std::{num::NonZeroUsize, thread::available_parallelism};
20use std::{
21    pin::pin,
22    sync::{
23        atomic::{AtomicUsize, Ordering},
24        Arc, Mutex,
25    },
26    time::{Duration, Instant},
27};
28use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
29use tracing::{debug, error};
30use tracing_futures::Instrument;
31
32use tokio::runtime::Runtime as TokioRuntime;
33
/// How long an idle tokio worker thread lingers before it exits: 15 seconds.
pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_secs(15);

/// Default number of CPU cores intended to be left for the OS and other processes.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;

/// Default thread count of the storage I/O pool.
pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;

/// Default cap on concurrently running blocking tasks (the RPC tracing guard's limit).
pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
45
46/// Configuration for the tokio runtime.
47#[derive(Debug, Clone)]
48pub enum TokioConfig {
49    /// Build and own a new tokio runtime.
50    Owned {
51        /// Number of worker threads. If `None`, uses tokio's default (number of CPU cores).
52        worker_threads: Option<usize>,
53        /// How long to keep worker threads alive when idle.
54        thread_keep_alive: Duration,
55        /// Thread name prefix.
56        thread_name: &'static str,
57    },
58    /// Attach to an existing tokio runtime handle.
59    ExistingHandle(Handle),
60}
61
62impl Default for TokioConfig {
63    fn default() -> Self {
64        Self::Owned {
65            worker_threads: None,
66            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
67            thread_name: "tokio-rt",
68        }
69    }
70}
71
72impl TokioConfig {
73    /// Create a config that attaches to an existing runtime handle.
74    pub const fn existing_handle(handle: Handle) -> Self {
75        Self::ExistingHandle(handle)
76    }
77
78    /// Create a config for an owned runtime with the specified number of worker threads.
79    pub const fn with_worker_threads(worker_threads: usize) -> Self {
80        Self::Owned {
81            worker_threads: Some(worker_threads),
82            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
83            thread_name: "tokio-rt",
84        }
85    }
86}
87
/// Configuration for the rayon thread pools.
///
/// Pool sizes left as `None` are derived when the [`Runtime`] is built.
#[derive(Debug, Clone)]
#[cfg(feature = "rayon")]
pub struct RayonConfig {
    /// Number of threads for the general CPU pool.
    /// If `None`, uses the available parallelism reported by the OS (falling back to 1).
    pub cpu_threads: Option<usize>,
    /// Number of CPU cores to reserve for OS and other processes.
    ///
    /// NOTE: currently ignored when sizing pools (see the TODO in `default_thread_count`).
    pub reserved_cpu_cores: usize,
    /// Number of threads for the RPC blocking pool (trace calls, `eth_getProof`, etc.).
    /// If `None`, uses the same size as the general CPU pool.
    pub rpc_threads: Option<usize>,
    /// Number of threads for the storage I/O pool (static file, `RocksDB` writes in
    /// `save_blocks`). If `None`, uses [`DEFAULT_STORAGE_POOL_THREADS`].
    pub storage_threads: Option<usize>,
    /// Maximum number of concurrent blocking tasks for the RPC guard semaphore.
    pub max_blocking_tasks: usize,
    /// Number of threads for the proof storage worker pool (trie storage proof workers).
    /// If `None`, twice the size of the general CPU pool.
    pub proof_storage_worker_threads: Option<usize>,
    /// Number of threads for the proof account worker pool (trie account proof workers).
    /// If `None`, twice the size of the general CPU pool.
    pub proof_account_worker_threads: Option<usize>,
    /// Number of threads for the prewarming pool (execution prewarming workers).
    /// If `None`, uses the same size as the general CPU pool.
    pub prewarming_threads: Option<usize>,
}
115
#[cfg(feature = "rayon")]
impl Default for RayonConfig {
    /// Leaves every pool size unset so it is derived at build time, and takes the reserved-core
    /// count and blocking-task cap from the module-level defaults.
    fn default() -> Self {
        Self {
            reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
            max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
            cpu_threads: None,
            rpc_threads: None,
            storage_threads: None,
            proof_storage_worker_threads: None,
            proof_account_worker_threads: None,
            prewarming_threads: None,
        }
    }
}
131
#[cfg(feature = "rayon")]
impl RayonConfig {
    /// Set the number of threads for the general CPU pool.
    ///
    /// When left unset, the RPC and prewarming pools reuse this size. The proof worker pools
    /// default to twice this size.
    pub const fn with_cpu_threads(mut self, cpu_threads: usize) -> Self {
        self.cpu_threads = Some(cpu_threads);
        self
    }

    /// Set the number of reserved CPU cores.
    ///
    /// Note: this value is currently *not* subtracted from any pool size (see the TODO in
    /// `default_thread_count`). It is stored so callers can already express the intent.
    pub const fn with_reserved_cpu_cores(mut self, reserved_cpu_cores: usize) -> Self {
        self.reserved_cpu_cores = reserved_cpu_cores;
        self
    }

    /// Set the maximum number of concurrent blocking tasks.
    pub const fn with_max_blocking_tasks(mut self, max_blocking_tasks: usize) -> Self {
        self.max_blocking_tasks = max_blocking_tasks;
        self
    }

    /// Set the number of threads for the RPC blocking pool.
    pub const fn with_rpc_threads(mut self, rpc_threads: usize) -> Self {
        self.rpc_threads = Some(rpc_threads);
        self
    }

    /// Set the number of threads for the storage I/O pool.
    pub const fn with_storage_threads(mut self, storage_threads: usize) -> Self {
        self.storage_threads = Some(storage_threads);
        self
    }

    /// Set the number of threads for the proof storage worker pool.
    pub const fn with_proof_storage_worker_threads(
        mut self,
        proof_storage_worker_threads: usize,
    ) -> Self {
        self.proof_storage_worker_threads = Some(proof_storage_worker_threads);
        self
    }

    /// Set the number of threads for the proof account worker pool.
    pub const fn with_proof_account_worker_threads(
        mut self,
        proof_account_worker_threads: usize,
    ) -> Self {
        self.proof_account_worker_threads = Some(proof_account_worker_threads);
        self
    }

    /// Set the number of threads for the prewarming pool.
    pub const fn with_prewarming_threads(mut self, prewarming_threads: usize) -> Self {
        self.prewarming_threads = Some(prewarming_threads);
        self
    }

    /// Size of the general CPU pool.
    ///
    /// This is `cpu_threads` if set. Otherwise it is the available parallelism reported by the
    /// OS, or 1 if that cannot be determined.
    fn default_thread_count(&self) -> usize {
        // TODO: reserved_cpu_cores is currently ignored because subtracting from thread pool
        // sizes doesn't actually reserve CPU cores for other processes.
        let _ = self.reserved_cpu_cores;
        self.cpu_threads.unwrap_or_else(|| available_parallelism().map_or(1, NonZeroUsize::get))
    }
}
190
191/// Configuration for building a [`Runtime`].
192#[derive(Debug, Clone, Default)]
193pub struct RuntimeConfig {
194    /// Tokio runtime configuration.
195    pub tokio: TokioConfig,
196    /// Rayon thread pool configuration.
197    #[cfg(feature = "rayon")]
198    pub rayon: RayonConfig,
199}
200
201impl RuntimeConfig {
202    /// Set the tokio configuration.
203    pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
204        self.tokio = tokio;
205        self
206    }
207
208    /// Set the rayon configuration.
209    #[cfg(feature = "rayon")]
210    pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
211        self.rayon = rayon;
212        self
213    }
214}
215
216/// Error returned when [`RuntimeBuilder::build`] fails.
217#[derive(Debug, thiserror::Error)]
218pub enum RuntimeBuildError {
219    /// Failed to build the tokio runtime.
220    #[error("Failed to build tokio runtime: {0}")]
221    TokioBuild(#[from] std::io::Error),
222    /// Failed to build a rayon thread pool.
223    #[cfg(feature = "rayon")]
224    #[error("Failed to build rayon thread pool: {0}")]
225    RayonBuild(#[from] rayon::ThreadPoolBuildError),
226}
227
228// ── RuntimeInner ──────────────────────────────────────────────────────
229
/// Shared state behind every [`Runtime`] clone.
///
/// Fields are dropped in declaration order, so an owned tokio runtime (`_tokio_runtime`) is
/// dropped, and therefore shut down, before the remaining handles and pools are released.
/// Keep this in mind before reordering fields.
struct RuntimeInner {
    /// Owned tokio runtime, if we built one. Kept alive via the `Arc<RuntimeInner>`.
    _tokio_runtime: Option<TokioRuntime>,
    /// Handle to the tokio runtime (owned or attached) used for all async spawning.
    handle: Handle,
    /// Receiver of the shutdown signal.
    on_shutdown: Shutdown,
    /// Sender half for sending task events to the [`TaskManager`].
    task_events_tx: UnboundedSender<TaskEvent>,
    /// Task executor metrics.
    metrics: TaskExecutorMetrics,
    /// How many [`GracefulShutdown`] tasks are currently active.
    graceful_tasks: Arc<AtomicUsize>,
    /// General-purpose rayon CPU pool.
    #[cfg(feature = "rayon")]
    cpu_pool: rayon::ThreadPool,
    /// RPC blocking pool.
    #[cfg(feature = "rayon")]
    rpc_pool: BlockingTaskPool,
    /// Storage I/O pool.
    #[cfg(feature = "rayon")]
    storage_pool: rayon::ThreadPool,
    /// Rate limiter for expensive RPC operations.
    #[cfg(feature = "rayon")]
    blocking_guard: BlockingTaskGuard,
    /// Proof storage worker pool (trie storage proof computation).
    #[cfg(feature = "rayon")]
    proof_storage_worker_pool: WorkerPool,
    /// Proof account worker pool (trie account proof computation).
    #[cfg(feature = "rayon")]
    proof_account_worker_pool: WorkerPool,
    /// Prewarming pool (execution prewarming workers).
    #[cfg(feature = "rayon")]
    prewarming_pool: WorkerPool,
    /// Named single-thread worker map. Each unique name gets a dedicated OS thread
    /// that is reused across all tasks submitted under that name.
    worker_map: WorkerMap,
    /// Handle to the spawned [`TaskManager`] background task.
    /// The task monitors critical tasks for panics and fires the shutdown signal.
    /// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors.
    task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
}
272
273// ── Runtime ───────────────────────────────────────────────────────────
274
275/// A cheaply cloneable handle to the runtime resources.
276///
277/// Wraps an `Arc<RuntimeInner>` and provides access to:
278/// - The tokio [`Handle`]
279/// - Task spawning with shutdown awareness and panic monitoring
280/// - Rayon thread pools (with `rayon` feature)
281#[derive(Clone)]
282pub struct Runtime(Arc<RuntimeInner>);
283
284impl std::fmt::Debug for Runtime {
285    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
286        f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
287    }
288}
289
290// ── Pool accessors ────────────────────────────────────────────────────
291
292impl Runtime {
293    /// Takes the [`TaskManager`] handle out of this runtime, if one is stored.
294    ///
295    /// The handle resolves with `Err(PanickedTaskError)` if a critical task panicked,
296    /// or `Ok(())` if shutdown was requested. If not taken, the background task still
297    /// runs and logs panics at `debug!` level.
298    pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
299        self.0.task_manager_handle.lock().unwrap().take()
300    }
301
302    /// Returns the tokio runtime [`Handle`].
303    pub fn handle(&self) -> &Handle {
304        &self.0.handle
305    }
306
307    /// Get the general-purpose rayon CPU thread pool.
308    #[cfg(feature = "rayon")]
309    pub fn cpu_pool(&self) -> &rayon::ThreadPool {
310        &self.0.cpu_pool
311    }
312
313    /// Get the RPC blocking task pool.
314    #[cfg(feature = "rayon")]
315    pub fn rpc_pool(&self) -> &BlockingTaskPool {
316        &self.0.rpc_pool
317    }
318
319    /// Get the storage I/O pool.
320    #[cfg(feature = "rayon")]
321    pub fn storage_pool(&self) -> &rayon::ThreadPool {
322        &self.0.storage_pool
323    }
324
325    /// Get a clone of the [`BlockingTaskGuard`].
326    #[cfg(feature = "rayon")]
327    pub fn blocking_guard(&self) -> BlockingTaskGuard {
328        self.0.blocking_guard.clone()
329    }
330
331    /// Get the proof storage worker pool.
332    #[cfg(feature = "rayon")]
333    pub fn proof_storage_worker_pool(&self) -> &WorkerPool {
334        &self.0.proof_storage_worker_pool
335    }
336
337    /// Get the proof account worker pool.
338    #[cfg(feature = "rayon")]
339    pub fn proof_account_worker_pool(&self) -> &WorkerPool {
340        &self.0.proof_account_worker_pool
341    }
342
343    /// Get the prewarming pool.
344    #[cfg(feature = "rayon")]
345    pub fn prewarming_pool(&self) -> &WorkerPool {
346        &self.0.prewarming_pool
347    }
348}
349
350// ── Test helpers ──────────────────────────────────────────────────────
351
352impl Runtime {
353    /// Creates a lightweight [`Runtime`] for tests with minimal thread pools.
354    ///
355    /// If called from within a tokio runtime (e.g. `#[tokio::test]`), attaches to the existing
356    /// handle to avoid shutdown panics when the test runtime is dropped.
357    pub fn test() -> Self {
358        let config = match Handle::try_current() {
359            Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
360            Err(_) => Self::test_config(),
361        };
362        RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
363    }
364
365    const fn test_config() -> RuntimeConfig {
366        RuntimeConfig {
367            tokio: TokioConfig::Owned {
368                worker_threads: Some(2),
369                thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
370                thread_name: "tokio-test",
371            },
372            #[cfg(feature = "rayon")]
373            rayon: RayonConfig {
374                cpu_threads: Some(2),
375                reserved_cpu_cores: 0,
376                rpc_threads: Some(2),
377                storage_threads: Some(2),
378                max_blocking_tasks: 16,
379                proof_storage_worker_threads: Some(2),
380                proof_account_worker_threads: Some(2),
381                prewarming_threads: Some(2),
382            },
383        }
384    }
385}
386
387// ── Spawn methods ─────────────────────────────────────────────────────
388
/// Selects which tokio facility a task is spawned onto.
enum TaskKind {
    /// An ordinary async task, spawned via [`Handle::spawn`].
    Default,
    /// A task driven on tokio's blocking thread pool, spawned via [`Handle::spawn_blocking`].
    Blocking,
}
396
397impl Runtime {
398    /// Returns the receiver of the shutdown signal.
399    pub fn on_shutdown_signal(&self) -> &Shutdown {
400        &self.0.on_shutdown
401    }
402
403    /// Spawns a future on the tokio runtime depending on the [`TaskKind`].
404    fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
405    where
406        F: Future<Output = ()> + Send + 'static,
407    {
408        match task_kind {
409            TaskKind::Default => self.0.handle.spawn(fut),
410            TaskKind::Blocking => {
411                let handle = self.0.handle.clone();
412                self.0.handle.spawn_blocking(move || handle.block_on(fut))
413            }
414        }
415    }
416
417    /// Spawns a regular task depending on the given [`TaskKind`].
418    fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
419    where
420        F: Future<Output = ()> + Send + 'static,
421    {
422        match task_kind {
423            TaskKind::Default => self.0.metrics.inc_regular_tasks(),
424            TaskKind::Blocking => self.0.metrics.inc_regular_blocking_tasks(),
425        }
426        let on_shutdown = self.0.on_shutdown.clone();
427
428        let finished_counter = match task_kind {
429            TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
430            TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
431        };
432
433        let task = {
434            async move {
435                let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
436                let fut = pin!(fut);
437                let _ = select(on_shutdown, fut).await;
438            }
439        }
440        .in_current_span();
441
442        self.spawn_on_rt(task, task_kind)
443    }
444
445    /// Spawns the task onto the runtime.
446    /// The given future resolves as soon as the [Shutdown] signal is received.
447    ///
448    /// See also [`Handle::spawn`].
449    pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
450    where
451        F: Future<Output = ()> + Send + 'static,
452    {
453        self.spawn_task_as(fut, TaskKind::Default)
454    }
455
456    /// Spawns a blocking task onto the runtime.
457    /// The given future resolves as soon as the [Shutdown] signal is received.
458    ///
459    /// See also [`Handle::spawn_blocking`].
460    pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
461    where
462        F: Future<Output = ()> + Send + 'static,
463    {
464        self.spawn_task_as(fut, TaskKind::Blocking)
465    }
466
467    /// Spawns a blocking closure directly on the tokio runtime, bypassing shutdown
468    /// awareness. Useful for raw CPU-bound work.
469    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
470    where
471        F: FnOnce() -> R + Send + 'static,
472        R: Send + 'static,
473    {
474        self.0.handle.spawn_blocking(func)
475    }
476
477    /// Moves the given value to a dedicated background thread for deallocation.
478    ///
479    /// This is useful when dropping a value is expensive (e.g. large nested collections)
480    /// and should not block the current task. Uses a persistent named thread (`"drop"`)
481    /// to avoid thread creation overhead on hot paths.
482    pub fn spawn_drop<T: Send + 'static>(&self, value: T) {
483        self.spawn_blocking_named("drop", move || drop(value));
484    }
485
486    /// Spawns a blocking closure on a dedicated, named OS thread.
487    ///
488    /// Unlike [`spawn_blocking`](Self::spawn_blocking) which uses tokio's blocking thread pool,
489    /// this reuses the same OS thread for all tasks submitted under the same `name`. The thread
490    /// is created lazily on first use and its OS thread name is set to `name`.
491    ///
492    /// This is useful for tasks that benefit from running on a stable thread, e.g. for
493    /// thread-local state reuse or to avoid thread creation overhead on hot paths.
494    ///
495    /// Returns a [`LazyHandle`](crate::LazyHandle) handle that resolves on first access and caches
496    /// the result.
497    pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> crate::LazyHandle<R>
498    where
499        F: FnOnce() -> R + Send + 'static,
500        R: Send + 'static,
501    {
502        crate::LazyHandle::new(self.0.worker_map.spawn_on(name, func))
503    }
504
505    /// Spawns the task onto the runtime.
506    /// The given future resolves as soon as the [Shutdown] signal is received.
507    ///
508    /// See also [`Handle::spawn`].
509    pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
510    where
511        F: Future<Output = ()> + Send + 'static,
512    {
513        let on_shutdown = self.0.on_shutdown.clone();
514        let fut = f(on_shutdown);
515        let task = fut.in_current_span();
516        self.0.handle.spawn(task)
517    }
518
519    /// Spawns a critical task depending on the given [`TaskKind`].
520    fn spawn_critical_as<F>(
521        &self,
522        name: &'static str,
523        fut: F,
524        task_kind: TaskKind,
525    ) -> JoinHandle<()>
526    where
527        F: Future<Output = ()> + Send + 'static,
528    {
529        self.0.metrics.inc_critical_tasks();
530        let panicked_tasks_tx = self.0.task_events_tx.clone();
531        let on_shutdown = self.0.on_shutdown.clone();
532
533        // wrap the task in catch unwind
534        let task = std::panic::AssertUnwindSafe(fut)
535            .catch_unwind()
536            .map_err(move |error| {
537                let task_error = PanickedTaskError::new(name, error);
538                error!("{task_error}");
539                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
540            })
541            .in_current_span();
542
543        let finished_critical_tasks_total_metrics =
544            self.0.metrics.finished_critical_tasks_total.clone();
545        let task = async move {
546            let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
547            let task = pin!(task);
548            let _ = select(on_shutdown, task).await;
549        };
550
551        self.spawn_on_rt(task, task_kind)
552    }
553
554    /// This spawns a critical task onto the runtime.
555    /// The given future resolves as soon as the [Shutdown] signal is received.
556    ///
557    /// If this task panics, the [`TaskManager`] is notified.
558    pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
559    where
560        F: Future<Output = ()> + Send + 'static,
561    {
562        self.spawn_critical_as(name, fut, TaskKind::Default)
563    }
564
565    /// This spawns a critical blocking task onto the runtime.
566    /// The given future resolves as soon as the [Shutdown] signal is received.
567    ///
568    /// If this task panics, the [`TaskManager`] is notified.
569    pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
570    where
571        F: Future<Output = ()> + Send + 'static,
572    {
573        self.spawn_critical_as(name, fut, TaskKind::Blocking)
574    }
575
576    /// This spawns a critical task onto the runtime.
577    ///
578    /// If this task panics, the [`TaskManager`] is notified.
579    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
580    ///
581    /// # Example
582    ///
583    /// ```no_run
584    /// # async fn t(executor: reth_tasks::TaskExecutor) {
585    ///
586    /// executor.spawn_critical_with_graceful_shutdown_signal("grace", async move |shutdown| {
587    ///     // await the shutdown signal
588    ///     let guard = shutdown.await;
589    ///     // do work before exiting the program
590    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
591    ///     // allow graceful shutdown
592    ///     drop(guard);
593    /// });
594    /// # }
595    /// ```
596    pub fn spawn_critical_with_graceful_shutdown_signal<F>(
597        &self,
598        name: &'static str,
599        f: impl FnOnce(GracefulShutdown) -> F,
600    ) -> JoinHandle<()>
601    where
602        F: Future<Output = ()> + Send + 'static,
603    {
604        let panicked_tasks_tx = self.0.task_events_tx.clone();
605        let on_shutdown = GracefulShutdown::new(
606            self.0.on_shutdown.clone(),
607            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
608        );
609        let fut = f(on_shutdown);
610
611        // wrap the task in catch unwind
612        let task = std::panic::AssertUnwindSafe(fut)
613            .catch_unwind()
614            .map_err(move |error| {
615                let task_error = PanickedTaskError::new(name, error);
616                error!("{task_error}");
617                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
618            })
619            .map(drop)
620            .in_current_span();
621
622        self.0.handle.spawn(task)
623    }
624
625    /// This spawns a regular task onto the runtime.
626    ///
627    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
628    ///
629    /// # Example
630    ///
631    /// ```no_run
632    /// # async fn t(executor: reth_tasks::TaskExecutor) {
633    ///
634    /// executor.spawn_with_graceful_shutdown_signal(async move |shutdown| {
635    ///     // await the shutdown signal
636    ///     let guard = shutdown.await;
637    ///     // do work before exiting the program
638    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
639    ///     // allow graceful shutdown
640    ///     drop(guard);
641    /// });
642    /// # }
643    /// ```
644    pub fn spawn_with_graceful_shutdown_signal<F>(
645        &self,
646        f: impl FnOnce(GracefulShutdown) -> F,
647    ) -> JoinHandle<()>
648    where
649        F: Future<Output = ()> + Send + 'static,
650    {
651        let on_shutdown = GracefulShutdown::new(
652            self.0.on_shutdown.clone(),
653            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
654        );
655        let fut = f(on_shutdown);
656
657        self.0.handle.spawn(fut)
658    }
659
660    /// Sends a request to the `TaskManager` to initiate a graceful shutdown.
661    ///
662    /// Caution: This will terminate the entire program.
663    pub fn initiate_graceful_shutdown(
664        &self,
665    ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
666        self.0
667            .task_events_tx
668            .send(TaskEvent::GracefulShutdown)
669            .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
670
671        Ok(GracefulShutdown::new(
672            self.0.on_shutdown.clone(),
673            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
674        ))
675    }
676
677    /// Fires the shutdown signal and waits until all graceful tasks complete.
678    pub fn graceful_shutdown(&self) {
679        let _ = self.do_graceful_shutdown(None);
680    }
681
682    /// Fires the shutdown signal and waits until all graceful tasks complete or the timeout
683    /// elapses.
684    ///
685    /// Returns `true` if all tasks completed before the timeout.
686    pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
687        self.do_graceful_shutdown(Some(timeout))
688    }
689
690    fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
691        let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
692        let deadline = timeout.map(|t| Instant::now() + t);
693        while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
694            if deadline.is_some_and(|d| Instant::now() > d) {
695                debug!("graceful shutdown timed out");
696                return false;
697            }
698            std::thread::yield_now();
699        }
700        debug!("gracefully shut down");
701        true
702    }
703}
704
705// ── RuntimeBuilder ────────────────────────────────────────────────────
706
707/// Builder for constructing a [`Runtime`].
708#[derive(Debug, Clone)]
709pub struct RuntimeBuilder {
710    config: RuntimeConfig,
711}
712
713impl RuntimeBuilder {
714    /// Create a new builder with the given configuration.
715    pub const fn new(config: RuntimeConfig) -> Self {
716        Self { config }
717    }
718
719    /// Build the [`Runtime`].
720    ///
721    /// The [`TaskManager`] is automatically spawned as a background task that monitors
722    /// critical tasks for panics. Use [`Runtime::take_task_manager_handle`] to extract
723    /// the join handle if you need to poll for panic errors.
724    #[tracing::instrument(name = "RuntimeBuilder::build", level = "debug", skip_all)]
725    pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
726        debug!(?self.config, "Building runtime");
727        let config = self.config;
728
729        let (owned_runtime, handle) = match &config.tokio {
730            TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
731                let mut builder = tokio::runtime::Builder::new_multi_thread();
732                builder
733                    .enable_all()
734                    .thread_keep_alive(*thread_keep_alive)
735                    .thread_name(*thread_name);
736
737                if let Some(threads) = worker_threads {
738                    builder.worker_threads(*threads);
739                }
740
741                let runtime = builder.build()?;
742                let h = runtime.handle().clone();
743                (Some(runtime), h)
744            }
745            TokioConfig::ExistingHandle(h) => (None, h.clone()),
746        };
747
748        let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
749            TaskManager::new_parts(handle.clone());
750
751        #[cfg(feature = "rayon")]
752        let (
753            cpu_pool,
754            rpc_pool,
755            storage_pool,
756            blocking_guard,
757            proof_storage_worker_pool,
758            proof_account_worker_pool,
759            prewarming_pool,
760        ) = {
761            let default_threads = config.rayon.default_thread_count();
762            let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);
763
764            let cpu_pool = build_pool_with_panic_handler(
765                rayon::ThreadPoolBuilder::new()
766                    .num_threads(default_threads)
767                    .thread_name(|i| format!("cpu-{i:02}")),
768            )?;
769
770            let rpc_raw = build_pool_with_panic_handler(
771                rayon::ThreadPoolBuilder::new()
772                    .num_threads(rpc_threads)
773                    .thread_name(|i| format!("rpc-{i:02}")),
774            )?;
775            let rpc_pool = BlockingTaskPool::new(rpc_raw);
776
777            let storage_threads =
778                config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
779            let storage_pool = build_pool_with_panic_handler(
780                rayon::ThreadPoolBuilder::new()
781                    .num_threads(storage_threads)
782                    .thread_name(|i| format!("storage-{i:02}")),
783            )?;
784
785            let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
786
787            let proof_storage_worker_threads =
788                config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
789            let proof_storage_worker_pool = WorkerPool::from_builder(
790                rayon::ThreadPoolBuilder::new()
791                    .num_threads(proof_storage_worker_threads)
792                    .thread_name(|i| format!("proof-strg-{i:02}")),
793            )?;
794
795            let proof_account_worker_threads =
796                config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
797            let proof_account_worker_pool = WorkerPool::from_builder(
798                rayon::ThreadPoolBuilder::new()
799                    .num_threads(proof_account_worker_threads)
800                    .thread_name(|i| format!("proof-acct-{i:02}")),
801            )?;
802
803            let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads);
804            let prewarming_pool = WorkerPool::from_builder(
805                rayon::ThreadPoolBuilder::new()
806                    .num_threads(prewarming_threads)
807                    .thread_name(|i| format!("prewarm-{i:02}")),
808            )?;
809
810            debug!(
811                default_threads,
812                rpc_threads,
813                storage_threads,
814                proof_storage_worker_threads,
815                proof_account_worker_threads,
816                prewarming_threads,
817                max_blocking_tasks = config.rayon.max_blocking_tasks,
818                "Initialized rayon thread pools"
819            );
820
821            (
822                cpu_pool,
823                rpc_pool,
824                storage_pool,
825                blocking_guard,
826                proof_storage_worker_pool,
827                proof_account_worker_pool,
828                prewarming_pool,
829            )
830        };
831
832        let task_manager_handle = handle.spawn(async move {
833            let result = task_manager.await;
834            if let Err(ref err) = result {
835                debug!("{err}");
836            }
837            result
838        });
839
840        let inner = RuntimeInner {
841            _tokio_runtime: owned_runtime,
842            handle,
843            on_shutdown,
844            task_events_tx,
845            metrics: Default::default(),
846            graceful_tasks,
847            #[cfg(feature = "rayon")]
848            cpu_pool,
849            #[cfg(feature = "rayon")]
850            rpc_pool,
851            #[cfg(feature = "rayon")]
852            storage_pool,
853            #[cfg(feature = "rayon")]
854            blocking_guard,
855            #[cfg(feature = "rayon")]
856            proof_storage_worker_pool,
857            #[cfg(feature = "rayon")]
858            proof_account_worker_pool,
859            #[cfg(feature = "rayon")]
860            prewarming_pool,
861            worker_map: WorkerMap::new(),
862            task_manager_handle: Mutex::new(Some(task_manager_handle)),
863        };
864
865        Ok(Runtime(Arc::new(inner)))
866    }
867}
868
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_runtime_config_default() {
        let tokio = RuntimeConfig::default().tokio;
        assert!(matches!(tokio, TokioConfig::Owned { .. }));
    }

    #[test]
    fn test_runtime_config_existing_handle() {
        let tokio_rt = TokioRuntime::new().unwrap();
        let handle = tokio_rt.handle().clone();
        let config = Runtime::test_config().with_tokio(TokioConfig::existing_handle(handle));
        assert!(matches!(config.tokio, TokioConfig::ExistingHandle(_)));
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_rayon_config_thread_count() {
        assert!(RayonConfig::default().default_thread_count() >= 1);
    }

    #[test]
    fn test_runtime_builder() {
        let tokio_rt = TokioRuntime::new().unwrap();
        let handle = tokio_rt.handle().clone();
        let config = Runtime::test_config().with_tokio(TokioConfig::existing_handle(handle));
        let runtime = RuntimeBuilder::new(config).build().unwrap();
        let _ = runtime.handle();
    }
}