Skip to main content

reth_tasks/
runtime.rs

1//! Centralized management of async and parallel execution.
2//!
3//! This module provides [`Runtime`], a cheaply cloneable handle that manages:
4//! - Tokio runtime (either owned or attached)
5//! - Task spawning with shutdown awareness and panic monitoring
6//! - Dedicated rayon thread pools for different workloads (with `rayon` feature)
7//! - [`BlockingTaskGuard`] for rate-limiting expensive operations (with `rayon` feature)
8
9#[cfg(feature = "rayon")]
10use crate::pool::{BlockingTaskGuard, BlockingTaskPool};
11use crate::{
12    metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13    shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14    PanickedTaskError, TaskEvent, TaskManager,
15};
16use futures_util::{
17    future::{select, BoxFuture},
18    Future, FutureExt, TryFutureExt,
19};
20#[cfg(feature = "rayon")]
21use std::thread::available_parallelism;
22use std::{
23    pin::pin,
24    sync::{
25        atomic::{AtomicUsize, Ordering},
26        Arc, Mutex,
27    },
28    time::Duration,
29};
30use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
31use tracing::{debug, error};
32use tracing_futures::Instrument;
33
34use tokio::runtime::Runtime as TokioRuntime;
35
/// Idle time after which a worker thread of an owned tokio runtime is shut down (15 seconds).
pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_millis(15_000);

/// CPU cores left out of the derived rayon pool size so the OS and other processes keep
/// some headroom.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;

/// Thread count of the storage I/O pool when none is configured.
pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;

/// Upper bound on concurrently running blocking tasks admitted by the RPC tracing guard.
pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
47
48/// Configuration for the tokio runtime.
49#[derive(Debug, Clone)]
50pub enum TokioConfig {
51    /// Build and own a new tokio runtime.
52    Owned {
53        /// Number of worker threads. If `None`, uses tokio's default (number of CPU cores).
54        worker_threads: Option<usize>,
55        /// How long to keep worker threads alive when idle.
56        thread_keep_alive: Duration,
57        /// Thread name prefix.
58        thread_name: &'static str,
59    },
60    /// Attach to an existing tokio runtime handle.
61    ExistingHandle(Handle),
62}
63
64impl Default for TokioConfig {
65    fn default() -> Self {
66        Self::Owned {
67            worker_threads: None,
68            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
69            thread_name: "tokio-rt",
70        }
71    }
72}
73
74impl TokioConfig {
75    /// Create a config that attaches to an existing runtime handle.
76    pub const fn existing_handle(handle: Handle) -> Self {
77        Self::ExistingHandle(handle)
78    }
79
80    /// Create a config for an owned runtime with the specified number of worker threads.
81    pub const fn with_worker_threads(worker_threads: usize) -> Self {
82        Self::Owned {
83            worker_threads: Some(worker_threads),
84            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
85            thread_name: "tokio-rt",
86        }
87    }
88}
89
/// Thread counts for the rayon pools owned by a [`Runtime`].
///
/// Pools whose count is left as `None` fall back to the size of the general CPU pool, except
/// the storage pool, which falls back to [`DEFAULT_STORAGE_POOL_THREADS`].
#[derive(Debug, Clone)]
#[cfg(feature = "rayon")]
pub struct RayonConfig {
    /// Number of threads for the general CPU pool.
    ///
    /// If `None`, derived from available parallelism minus `reserved_cpu_cores`, with a
    /// minimum of one thread (and exactly one if available parallelism cannot be queried).
    pub cpu_threads: Option<usize>,
    /// Number of CPU cores to reserve for OS and other processes.
    ///
    /// Only consulted when `cpu_threads` is `None`.
    pub reserved_cpu_cores: usize,
    /// Number of threads for the RPC blocking pool (trace calls, `eth_getProof`, etc.).
    /// If `None`, uses the general CPU pool size.
    pub rpc_threads: Option<usize>,
    /// Number of threads for the storage I/O pool (static file, `RocksDB` writes in
    /// `save_blocks`). If `None`, uses [`DEFAULT_STORAGE_POOL_THREADS`].
    pub storage_threads: Option<usize>,
    /// Maximum number of concurrent blocking tasks for the RPC guard semaphore.
    pub max_blocking_tasks: usize,
    /// Number of threads for the proof storage worker pool (trie storage proof workers).
    /// If `None`, uses the general CPU pool size.
    pub proof_storage_worker_threads: Option<usize>,
    /// Number of threads for the proof account worker pool (trie account proof workers).
    /// If `None`, uses the general CPU pool size.
    pub proof_account_worker_threads: Option<usize>,
}

#[cfg(feature = "rayon")]
impl Default for RayonConfig {
    fn default() -> Self {
        Self {
            cpu_threads: None,
            reserved_cpu_cores: DEFAULT_RESERVED_CPU_CORES,
            rpc_threads: None,
            storage_threads: None,
            max_blocking_tasks: DEFAULT_MAX_BLOCKING_TASKS,
            proof_storage_worker_threads: None,
            proof_account_worker_threads: None,
        }
    }
}

#[cfg(feature = "rayon")]
impl RayonConfig {
    /// Set the number of threads for the general CPU pool.
    ///
    /// Once set, `reserved_cpu_cores` no longer influences any pool size, and every pool
    /// without an explicit count (except storage) uses this value.
    pub const fn with_cpu_threads(mut self, cpu_threads: usize) -> Self {
        self.cpu_threads = Some(cpu_threads);
        self
    }

    /// Set the number of reserved CPU cores (ignored when `cpu_threads` is set).
    pub const fn with_reserved_cpu_cores(mut self, reserved_cpu_cores: usize) -> Self {
        self.reserved_cpu_cores = reserved_cpu_cores;
        self
    }

    /// Set the maximum number of concurrent blocking tasks.
    pub const fn with_max_blocking_tasks(mut self, max_blocking_tasks: usize) -> Self {
        self.max_blocking_tasks = max_blocking_tasks;
        self
    }

    /// Set the number of threads for the RPC blocking pool.
    pub const fn with_rpc_threads(mut self, rpc_threads: usize) -> Self {
        self.rpc_threads = Some(rpc_threads);
        self
    }

    /// Set the number of threads for the storage I/O pool.
    pub const fn with_storage_threads(mut self, storage_threads: usize) -> Self {
        self.storage_threads = Some(storage_threads);
        self
    }

    /// Set the number of threads for the proof storage worker pool.
    pub const fn with_proof_storage_worker_threads(
        mut self,
        proof_storage_worker_threads: usize,
    ) -> Self {
        self.proof_storage_worker_threads = Some(proof_storage_worker_threads);
        self
    }

    /// Set the number of threads for the proof account worker pool.
    pub const fn with_proof_account_worker_threads(
        mut self,
        proof_account_worker_threads: usize,
    ) -> Self {
        self.proof_account_worker_threads = Some(proof_account_worker_threads);
        self
    }

    /// Size of the general CPU pool, which is also the fallback for the other unset pools.
    ///
    /// Uses `cpu_threads` when set; otherwise available parallelism minus
    /// `reserved_cpu_cores`, clamped to at least 1 (and 1 if parallelism is unknown).
    fn default_thread_count(&self) -> usize {
        self.cpu_threads.unwrap_or_else(|| {
            available_parallelism()
                .map_or(1, |num| num.get().saturating_sub(self.reserved_cpu_cores).max(1))
        })
    }
}
182
183/// Configuration for building a [`Runtime`].
184#[derive(Debug, Clone, Default)]
185pub struct RuntimeConfig {
186    /// Tokio runtime configuration.
187    pub tokio: TokioConfig,
188    /// Rayon thread pool configuration.
189    #[cfg(feature = "rayon")]
190    pub rayon: RayonConfig,
191}
192
193impl RuntimeConfig {
194    /// Create a config that attaches to an existing tokio runtime handle.
195    #[cfg_attr(not(feature = "rayon"), allow(clippy::missing_const_for_fn))]
196    pub fn with_existing_handle(handle: Handle) -> Self {
197        Self {
198            tokio: TokioConfig::ExistingHandle(handle),
199            #[cfg(feature = "rayon")]
200            rayon: RayonConfig::default(),
201        }
202    }
203
204    /// Set the tokio configuration.
205    pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
206        self.tokio = tokio;
207        self
208    }
209
210    /// Set the rayon configuration.
211    #[cfg(feature = "rayon")]
212    pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
213        self.rayon = rayon;
214        self
215    }
216}
217
218/// Error returned when [`RuntimeBuilder::build`] fails.
219#[derive(Debug, thiserror::Error)]
220pub enum RuntimeBuildError {
221    /// Failed to build the tokio runtime.
222    #[error("Failed to build tokio runtime: {0}")]
223    TokioBuild(#[from] std::io::Error),
224    /// Failed to build a rayon thread pool.
225    #[cfg(feature = "rayon")]
226    #[error("Failed to build rayon thread pool: {0}")]
227    RayonBuild(#[from] rayon::ThreadPoolBuildError),
228}
229
230// ── RuntimeInner ──────────────────────────────────────────────────────
231
232struct RuntimeInner {
233    /// Owned tokio runtime, if we built one. Kept alive via the `Arc<RuntimeInner>`.
234    _tokio_runtime: Option<TokioRuntime>,
235    /// Handle to the tokio runtime.
236    handle: Handle,
237    /// Receiver of the shutdown signal.
238    on_shutdown: Shutdown,
239    /// Sender half for sending task events to the [`TaskManager`].
240    task_events_tx: UnboundedSender<TaskEvent>,
241    /// Task executor metrics.
242    metrics: TaskExecutorMetrics,
243    /// How many [`GracefulShutdown`] tasks are currently active.
244    graceful_tasks: Arc<AtomicUsize>,
245    /// General-purpose rayon CPU pool.
246    #[cfg(feature = "rayon")]
247    cpu_pool: rayon::ThreadPool,
248    /// RPC blocking pool.
249    #[cfg(feature = "rayon")]
250    rpc_pool: BlockingTaskPool,
251    /// Storage I/O pool.
252    #[cfg(feature = "rayon")]
253    storage_pool: rayon::ThreadPool,
254    /// Rate limiter for expensive RPC operations.
255    #[cfg(feature = "rayon")]
256    blocking_guard: BlockingTaskGuard,
257    /// Proof storage worker pool (trie storage proof computation).
258    #[cfg(feature = "rayon")]
259    proof_storage_worker_pool: rayon::ThreadPool,
260    /// Proof account worker pool (trie account proof computation).
261    #[cfg(feature = "rayon")]
262    proof_account_worker_pool: rayon::ThreadPool,
263    /// Handle to the spawned [`TaskManager`] background task.
264    /// The task monitors critical tasks for panics and fires the shutdown signal.
265    /// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors.
266    task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
267}
268
269// ── Runtime ───────────────────────────────────────────────────────────
270
271/// A cheaply cloneable handle to the runtime resources.
272///
273/// Wraps an `Arc<RuntimeInner>` and provides access to:
274/// - The tokio [`Handle`]
275/// - Task spawning with shutdown awareness and panic monitoring
276/// - Rayon thread pools (with `rayon` feature)
277#[derive(Clone)]
278pub struct Runtime(Arc<RuntimeInner>);
279
280impl std::fmt::Debug for Runtime {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
283    }
284}
285
286#[cfg(any(test, feature = "test-utils"))]
287impl Default for Runtime {
288    fn default() -> Self {
289        let config = match Handle::try_current() {
290            Ok(handle) => RuntimeConfig::with_existing_handle(handle),
291            Err(_) => RuntimeConfig::default(),
292        };
293        RuntimeBuilder::new(config).build().expect("failed to build default Runtime")
294    }
295}
296
297// ── Constructors ──────────────────────────────────────────────────────
298
299impl Runtime {
300    /// Creates a [`Runtime`] that attaches to an existing tokio runtime handle.
301    pub fn with_existing_handle(handle: Handle) -> Result<Self, RuntimeBuildError> {
302        RuntimeBuilder::new(RuntimeConfig::with_existing_handle(handle)).build()
303    }
304}
305
306// ── Pool accessors ────────────────────────────────────────────────────
307
308impl Runtime {
309    /// Takes the [`TaskManager`] handle out of this runtime, if one is stored.
310    ///
311    /// The handle resolves with `Err(PanickedTaskError)` if a critical task panicked,
312    /// or `Ok(())` if shutdown was requested. If not taken, the background task still
313    /// runs and logs panics at `debug!` level.
314    pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
315        self.0.task_manager_handle.lock().unwrap().take()
316    }
317
318    /// Returns the tokio runtime [`Handle`].
319    pub fn handle(&self) -> &Handle {
320        &self.0.handle
321    }
322
323    /// Get the general-purpose rayon CPU thread pool.
324    #[cfg(feature = "rayon")]
325    pub fn cpu_pool(&self) -> &rayon::ThreadPool {
326        &self.0.cpu_pool
327    }
328
329    /// Get the RPC blocking task pool.
330    #[cfg(feature = "rayon")]
331    pub fn rpc_pool(&self) -> &BlockingTaskPool {
332        &self.0.rpc_pool
333    }
334
335    /// Get the storage I/O pool.
336    #[cfg(feature = "rayon")]
337    pub fn storage_pool(&self) -> &rayon::ThreadPool {
338        &self.0.storage_pool
339    }
340
341    /// Get a clone of the [`BlockingTaskGuard`].
342    #[cfg(feature = "rayon")]
343    pub fn blocking_guard(&self) -> BlockingTaskGuard {
344        self.0.blocking_guard.clone()
345    }
346
347    /// Get the proof storage worker pool.
348    #[cfg(feature = "rayon")]
349    pub fn proof_storage_worker_pool(&self) -> &rayon::ThreadPool {
350        &self.0.proof_storage_worker_pool
351    }
352
353    /// Get the proof account worker pool.
354    #[cfg(feature = "rayon")]
355    pub fn proof_account_worker_pool(&self) -> &rayon::ThreadPool {
356        &self.0.proof_account_worker_pool
357    }
358}
359
360// ── Test helpers ──────────────────────────────────────────────────────
361
362impl Runtime {
363    /// Creates a lightweight [`Runtime`] for tests with minimal thread pools.
364    ///
365    /// If called from within a tokio runtime (e.g. `#[tokio::test]`), attaches to the existing
366    /// handle to avoid shutdown panics when the test runtime is dropped.
367    pub fn test() -> Self {
368        let config = match Handle::try_current() {
369            Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
370            Err(_) => Self::test_config(),
371        };
372        RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
373    }
374
375    /// Creates a lightweight [`Runtime`] for tests, attaching to the given tokio handle.
376    pub fn test_with_handle(handle: Handle) -> Self {
377        let config = Self::test_config().with_tokio(TokioConfig::existing_handle(handle));
378        RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
379    }
380
381    const fn test_config() -> RuntimeConfig {
382        RuntimeConfig {
383            tokio: TokioConfig::Owned {
384                worker_threads: Some(2),
385                thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
386                thread_name: "tokio-test",
387            },
388            #[cfg(feature = "rayon")]
389            rayon: RayonConfig {
390                cpu_threads: Some(2),
391                reserved_cpu_cores: 0,
392                rpc_threads: Some(2),
393                storage_threads: Some(2),
394                max_blocking_tasks: 16,
395                proof_storage_worker_threads: Some(2),
396                proof_account_worker_threads: Some(2),
397            },
398        }
399    }
400}
401
402// ── Spawn methods ─────────────────────────────────────────────────────
403
404/// Determines how a task is spawned.
405enum TaskKind {
406    /// Spawn the task to the default executor [`Handle::spawn`].
407    Default,
408    /// Spawn the task to the blocking executor [`Handle::spawn_blocking`].
409    Blocking,
410}
411
412impl Runtime {
413    /// Returns the receiver of the shutdown signal.
414    pub fn on_shutdown_signal(&self) -> &Shutdown {
415        &self.0.on_shutdown
416    }
417
418    /// Spawns a future on the tokio runtime depending on the [`TaskKind`].
419    fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
420    where
421        F: Future<Output = ()> + Send + 'static,
422    {
423        match task_kind {
424            TaskKind::Default => self.0.handle.spawn(fut),
425            TaskKind::Blocking => {
426                let handle = self.0.handle.clone();
427                self.0.handle.spawn_blocking(move || handle.block_on(fut))
428            }
429        }
430    }
431
432    /// Spawns a regular task depending on the given [`TaskKind`].
433    fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
434    where
435        F: Future<Output = ()> + Send + 'static,
436    {
437        let on_shutdown = self.0.on_shutdown.clone();
438
439        let finished_counter = match task_kind {
440            TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
441            TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
442        };
443
444        let task = {
445            async move {
446                let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
447                let fut = pin!(fut);
448                let _ = select(on_shutdown, fut).await;
449            }
450        }
451        .in_current_span();
452
453        self.spawn_on_rt(task, task_kind)
454    }
455
456    /// Spawns the task onto the runtime.
457    /// The given future resolves as soon as the [Shutdown] signal is received.
458    ///
459    /// See also [`Handle::spawn`].
460    pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
461    where
462        F: Future<Output = ()> + Send + 'static,
463    {
464        self.spawn_task_as(fut, TaskKind::Default)
465    }
466
467    /// Spawns a blocking task onto the runtime.
468    /// The given future resolves as soon as the [Shutdown] signal is received.
469    ///
470    /// See also [`Handle::spawn_blocking`].
471    pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
472    where
473        F: Future<Output = ()> + Send + 'static,
474    {
475        self.spawn_task_as(fut, TaskKind::Blocking)
476    }
477
478    /// Spawns a blocking closure directly on the tokio runtime, bypassing shutdown
479    /// awareness. Useful for raw CPU-bound work.
480    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
481    where
482        F: FnOnce() -> R + Send + 'static,
483        R: Send + 'static,
484    {
485        self.0.handle.spawn_blocking(func)
486    }
487
488    /// Spawns the task onto the runtime.
489    /// The given future resolves as soon as the [Shutdown] signal is received.
490    ///
491    /// See also [`Handle::spawn`].
492    pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
493    where
494        F: Future<Output = ()> + Send + 'static,
495    {
496        let on_shutdown = self.0.on_shutdown.clone();
497        let fut = f(on_shutdown);
498        let task = fut.in_current_span();
499        self.0.handle.spawn(task)
500    }
501
502    /// Spawns a critical task depending on the given [`TaskKind`].
503    fn spawn_critical_as<F>(
504        &self,
505        name: &'static str,
506        fut: F,
507        task_kind: TaskKind,
508    ) -> JoinHandle<()>
509    where
510        F: Future<Output = ()> + Send + 'static,
511    {
512        let panicked_tasks_tx = self.0.task_events_tx.clone();
513        let on_shutdown = self.0.on_shutdown.clone();
514
515        // wrap the task in catch unwind
516        let task = std::panic::AssertUnwindSafe(fut)
517            .catch_unwind()
518            .map_err(move |error| {
519                let task_error = PanickedTaskError::new(name, error);
520                error!("{task_error}");
521                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
522            })
523            .in_current_span();
524
525        let finished_critical_tasks_total_metrics =
526            self.0.metrics.finished_critical_tasks_total.clone();
527        let task = async move {
528            let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
529            let task = pin!(task);
530            let _ = select(on_shutdown, task).await;
531        };
532
533        self.spawn_on_rt(task, task_kind)
534    }
535
536    /// This spawns a critical task onto the runtime.
537    /// The given future resolves as soon as the [Shutdown] signal is received.
538    ///
539    /// If this task panics, the [`TaskManager`] is notified.
540    pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
541    where
542        F: Future<Output = ()> + Send + 'static,
543    {
544        self.spawn_critical_as(name, fut, TaskKind::Default)
545    }
546
547    /// This spawns a critical blocking task onto the runtime.
548    /// The given future resolves as soon as the [Shutdown] signal is received.
549    ///
550    /// If this task panics, the [`TaskManager`] is notified.
551    pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
552    where
553        F: Future<Output = ()> + Send + 'static,
554    {
555        self.spawn_critical_as(name, fut, TaskKind::Blocking)
556    }
557
558    /// This spawns a critical task onto the runtime.
559    ///
560    /// If this task panics, the [`TaskManager`] is notified.
561    pub fn spawn_critical_with_shutdown_signal<F>(
562        &self,
563        name: &'static str,
564        f: impl FnOnce(Shutdown) -> F,
565    ) -> JoinHandle<()>
566    where
567        F: Future<Output = ()> + Send + 'static,
568    {
569        let panicked_tasks_tx = self.0.task_events_tx.clone();
570        let on_shutdown = self.0.on_shutdown.clone();
571        let fut = f(on_shutdown);
572
573        // wrap the task in catch unwind
574        let task = std::panic::AssertUnwindSafe(fut)
575            .catch_unwind()
576            .map_err(move |error| {
577                let task_error = PanickedTaskError::new(name, error);
578                error!("{task_error}");
579                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
580            })
581            .map(drop)
582            .in_current_span();
583
584        self.0.handle.spawn(task)
585    }
586
587    /// This spawns a critical task onto the runtime.
588    ///
589    /// If this task panics, the [`TaskManager`] is notified.
590    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
591    ///
592    /// # Example
593    ///
594    /// ```no_run
595    /// # async fn t(executor: reth_tasks::TaskExecutor) {
596    ///
597    /// executor.spawn_critical_with_graceful_shutdown_signal("grace", |shutdown| async move {
598    ///     // await the shutdown signal
599    ///     let guard = shutdown.await;
600    ///     // do work before exiting the program
601    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
602    ///     // allow graceful shutdown
603    ///     drop(guard);
604    /// });
605    /// # }
606    /// ```
607    pub fn spawn_critical_with_graceful_shutdown_signal<F>(
608        &self,
609        name: &'static str,
610        f: impl FnOnce(GracefulShutdown) -> F,
611    ) -> JoinHandle<()>
612    where
613        F: Future<Output = ()> + Send + 'static,
614    {
615        let panicked_tasks_tx = self.0.task_events_tx.clone();
616        let on_shutdown = GracefulShutdown::new(
617            self.0.on_shutdown.clone(),
618            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
619        );
620        let fut = f(on_shutdown);
621
622        // wrap the task in catch unwind
623        let task = std::panic::AssertUnwindSafe(fut)
624            .catch_unwind()
625            .map_err(move |error| {
626                let task_error = PanickedTaskError::new(name, error);
627                error!("{task_error}");
628                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
629            })
630            .map(drop)
631            .in_current_span();
632
633        self.0.handle.spawn(task)
634    }
635
636    /// This spawns a regular task onto the runtime.
637    ///
638    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
639    ///
640    /// # Example
641    ///
642    /// ```no_run
643    /// # async fn t(executor: reth_tasks::TaskExecutor) {
644    ///
645    /// executor.spawn_with_graceful_shutdown_signal(|shutdown| async move {
646    ///     // await the shutdown signal
647    ///     let guard = shutdown.await;
648    ///     // do work before exiting the program
649    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
650    ///     // allow graceful shutdown
651    ///     drop(guard);
652    /// });
653    /// # }
654    /// ```
655    pub fn spawn_with_graceful_shutdown_signal<F>(
656        &self,
657        f: impl FnOnce(GracefulShutdown) -> F,
658    ) -> JoinHandle<()>
659    where
660        F: Future<Output = ()> + Send + 'static,
661    {
662        let on_shutdown = GracefulShutdown::new(
663            self.0.on_shutdown.clone(),
664            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
665        );
666        let fut = f(on_shutdown);
667
668        self.0.handle.spawn(fut)
669    }
670
671    /// Sends a request to the `TaskManager` to initiate a graceful shutdown.
672    ///
673    /// Caution: This will terminate the entire program.
674    pub fn initiate_graceful_shutdown(
675        &self,
676    ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
677        self.0
678            .task_events_tx
679            .send(TaskEvent::GracefulShutdown)
680            .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
681
682        Ok(GracefulShutdown::new(
683            self.0.on_shutdown.clone(),
684            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
685        ))
686    }
687
688    /// Fires the shutdown signal and waits until all graceful tasks complete.
689    pub fn graceful_shutdown(&self) {
690        let _ = self.do_graceful_shutdown(None);
691    }
692
693    /// Fires the shutdown signal and waits until all graceful tasks complete or the timeout
694    /// elapses.
695    ///
696    /// Returns `true` if all tasks completed before the timeout.
697    pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
698        self.do_graceful_shutdown(Some(timeout))
699    }
700
701    fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
702        let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
703        let deadline = timeout.map(|t| std::time::Instant::now() + t);
704        while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
705            if deadline.is_some_and(|d| std::time::Instant::now() > d) {
706                debug!("graceful shutdown timed out");
707                return false;
708            }
709            std::thread::yield_now();
710        }
711        debug!("gracefully shut down");
712        true
713    }
714}
715
716// ── TaskSpawner impl ──────────────────────────────────────────────────
717
718impl crate::TaskSpawner for Runtime {
719    fn spawn_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
720        self.0.metrics.inc_regular_tasks();
721        Self::spawn_task(self, fut)
722    }
723
724    fn spawn_critical_task(
725        &self,
726        name: &'static str,
727        fut: BoxFuture<'static, ()>,
728    ) -> JoinHandle<()> {
729        self.0.metrics.inc_critical_tasks();
730        Self::spawn_critical_task(self, name, fut)
731    }
732
733    fn spawn_blocking_task(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
734        self.0.metrics.inc_regular_blocking_tasks();
735        Self::spawn_blocking_task(self, fut)
736    }
737
738    fn spawn_critical_blocking_task(
739        &self,
740        name: &'static str,
741        fut: BoxFuture<'static, ()>,
742    ) -> JoinHandle<()> {
743        self.0.metrics.inc_critical_tasks();
744        Self::spawn_critical_blocking_task(self, name, fut)
745    }
746}
747
748// ── TaskSpawnerExt impl ──────────────────────────────────────────────
749
750impl crate::TaskSpawnerExt for Runtime {
751    fn spawn_critical_with_graceful_shutdown_signal<F>(
752        &self,
753        name: &'static str,
754        f: impl FnOnce(GracefulShutdown) -> F,
755    ) -> JoinHandle<()>
756    where
757        F: Future<Output = ()> + Send + 'static,
758    {
759        Self::spawn_critical_with_graceful_shutdown_signal(self, name, f)
760    }
761
762    fn spawn_with_graceful_shutdown_signal<F>(
763        &self,
764        f: impl FnOnce(GracefulShutdown) -> F,
765    ) -> JoinHandle<()>
766    where
767        F: Future<Output = ()> + Send + 'static,
768    {
769        Self::spawn_with_graceful_shutdown_signal(self, f)
770    }
771}
772
773// ── RuntimeBuilder ────────────────────────────────────────────────────
774
775/// Builder for constructing a [`Runtime`].
776#[derive(Debug, Clone)]
777pub struct RuntimeBuilder {
778    config: RuntimeConfig,
779}
780
781impl RuntimeBuilder {
782    /// Create a new builder with the given configuration.
783    pub const fn new(config: RuntimeConfig) -> Self {
784        Self { config }
785    }
786
787    /// Build the [`Runtime`].
788    ///
789    /// The [`TaskManager`] is automatically spawned as a background task that monitors
790    /// critical tasks for panics. Use [`Runtime::take_task_manager_handle`] to extract
791    /// the join handle if you need to poll for panic errors.
792    #[tracing::instrument(level = "debug", skip_all)]
793    pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
794        debug!(?self.config, "Building runtime");
795        let config = self.config;
796
797        let (owned_runtime, handle) = match &config.tokio {
798            TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
799                let mut builder = tokio::runtime::Builder::new_multi_thread();
800                builder
801                    .enable_all()
802                    .thread_keep_alive(*thread_keep_alive)
803                    .thread_name(*thread_name);
804
805                if let Some(threads) = worker_threads {
806                    builder.worker_threads(*threads);
807                }
808
809                let runtime = builder.build()?;
810                let h = runtime.handle().clone();
811                (Some(runtime), h)
812            }
813            TokioConfig::ExistingHandle(h) => (None, h.clone()),
814        };
815
816        let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
817            TaskManager::new_parts(handle.clone());
818
819        #[cfg(feature = "rayon")]
820        let (
821            cpu_pool,
822            rpc_pool,
823            storage_pool,
824            blocking_guard,
825            proof_storage_worker_pool,
826            proof_account_worker_pool,
827        ) = {
828            let default_threads = config.rayon.default_thread_count();
829            let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);
830
831            let cpu_pool = rayon::ThreadPoolBuilder::new()
832                .num_threads(default_threads)
833                .thread_name(|i| format!("cpu-{i:02}"))
834                .build()?;
835
836            let rpc_raw = rayon::ThreadPoolBuilder::new()
837                .num_threads(rpc_threads)
838                .thread_name(|i| format!("rpc-{i:02}"))
839                .build()?;
840            let rpc_pool = BlockingTaskPool::new(rpc_raw);
841
842            let storage_threads =
843                config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
844            let storage_pool = rayon::ThreadPoolBuilder::new()
845                .num_threads(storage_threads)
846                .thread_name(|i| format!("storage-{i:02}"))
847                .build()?;
848
849            let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
850
851            let proof_storage_worker_threads =
852                config.rayon.proof_storage_worker_threads.unwrap_or(default_threads);
853            let proof_storage_worker_pool = rayon::ThreadPoolBuilder::new()
854                .num_threads(proof_storage_worker_threads)
855                .thread_name(|i| format!("proof-strg-{i:02}"))
856                .build()?;
857
858            let proof_account_worker_threads =
859                config.rayon.proof_account_worker_threads.unwrap_or(default_threads);
860            let proof_account_worker_pool = rayon::ThreadPoolBuilder::new()
861                .num_threads(proof_account_worker_threads)
862                .thread_name(|i| format!("proof-acct-{i:02}"))
863                .build()?;
864
865            debug!(
866                default_threads,
867                rpc_threads,
868                storage_threads,
869                proof_storage_worker_threads,
870                proof_account_worker_threads,
871                max_blocking_tasks = config.rayon.max_blocking_tasks,
872                "Initialized rayon thread pools"
873            );
874
875            (
876                cpu_pool,
877                rpc_pool,
878                storage_pool,
879                blocking_guard,
880                proof_storage_worker_pool,
881                proof_account_worker_pool,
882            )
883        };
884
885        let task_manager_handle = handle.spawn(async move {
886            let result = task_manager.await;
887            if let Err(ref err) = result {
888                debug!("{err}");
889            }
890            result
891        });
892
893        let inner = RuntimeInner {
894            _tokio_runtime: owned_runtime,
895            handle,
896            on_shutdown,
897            task_events_tx,
898            metrics: Default::default(),
899            graceful_tasks,
900            #[cfg(feature = "rayon")]
901            cpu_pool,
902            #[cfg(feature = "rayon")]
903            rpc_pool,
904            #[cfg(feature = "rayon")]
905            storage_pool,
906            #[cfg(feature = "rayon")]
907            blocking_guard,
908            #[cfg(feature = "rayon")]
909            proof_storage_worker_pool,
910            #[cfg(feature = "rayon")]
911            proof_account_worker_pool,
912            task_manager_handle: Mutex::new(Some(task_manager_handle)),
913        };
914
915        Ok(Runtime(Arc::new(inner)))
916    }
917}
918
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_runtime_config_default() {
        let tokio = RuntimeConfig::default().tokio;
        assert!(
            matches!(tokio, TokioConfig::Owned { .. }),
            "the default config should build its own tokio runtime"
        );
    }

    #[test]
    fn test_runtime_config_existing_handle() {
        let tokio_rt = TokioRuntime::new().unwrap();
        let handle = tokio_rt.handle().clone();
        let tokio = RuntimeConfig::with_existing_handle(handle).tokio;
        assert!(
            matches!(tokio, TokioConfig::ExistingHandle(_)),
            "an explicit handle should be attached, not replaced"
        );
    }

    #[cfg(feature = "rayon")]
    #[test]
    fn test_rayon_config_thread_count() {
        let threads = RayonConfig::default().default_thread_count();
        assert!(threads >= 1, "the derived pool size must never be zero");
    }

    #[test]
    fn test_runtime_builder() {
        let tokio_rt = TokioRuntime::new().unwrap();
        let config = RuntimeConfig::with_existing_handle(tokio_rt.handle().clone());
        let runtime = RuntimeBuilder::new(config).build().unwrap();
        let _ = runtime.handle();
    }
}