Skip to main content

reth_tasks/
runtime.rs

1//! Centralized management of async and parallel execution.
2//!
3//! This module provides [`Runtime`], a cheaply cloneable handle that manages:
4//! - Tokio runtime (either owned or attached)
5//! - Task spawning with shutdown awareness and panic monitoring
6//! - Dedicated rayon thread pools for different workloads (with `rayon` feature)
7//! - [`BlockingTaskGuard`] for rate-limiting expensive operations (with `rayon` feature)
8
9#[cfg(feature = "rayon")]
10use crate::pool::{build_pool_with_panic_handler, BlockingTaskGuard, BlockingTaskPool, WorkerPool};
11use crate::{
12    metrics::{IncCounterOnDrop, TaskExecutorMetrics},
13    shutdown::{GracefulShutdown, GracefulShutdownGuard, Shutdown},
14    worker_map::WorkerMap,
15    PanickedTaskError, TaskEvent, TaskManager,
16};
17use futures_util::{future::select, Future, FutureExt, TryFutureExt};
18#[cfg(feature = "rayon")]
19use std::sync::OnceLock;
20#[cfg(feature = "rayon")]
21use std::{num::NonZeroUsize, thread::available_parallelism};
22use std::{
23    pin::pin,
24    sync::{
25        atomic::{AtomicUsize, Ordering},
26        Arc, Mutex,
27    },
28    time::{Duration, Instant},
29};
30use tokio::{runtime::Handle, sync::mpsc::UnboundedSender, task::JoinHandle};
31use tracing::{debug, error};
32use tracing_futures::Instrument;
33
34use tokio::runtime::Runtime as TokioRuntime;
35
/// Default thread keep-alive duration for the tokio runtime.
///
/// Used as the `thread_keep_alive` value of the owned tokio configurations created by
/// [`TokioConfig::default`] and [`TokioConfig::with_worker_threads`].
pub const DEFAULT_THREAD_KEEP_ALIVE: Duration = Duration::from_secs(15);

/// Default reserved CPU cores for OS and other processes.
pub const DEFAULT_RESERVED_CPU_CORES: usize = 2;

/// Default number of threads for the storage I/O pool.
///
/// Applied when no explicit storage thread count is configured.
pub const DEFAULT_STORAGE_POOL_THREADS: usize = 16;

/// Default maximum number of concurrent blocking tasks (for RPC tracing guard).
pub const DEFAULT_MAX_BLOCKING_TASKS: usize = 512;
47
/// Configuration for the tokio runtime.
///
/// Selects whether the runtime builder creates (and owns) a new tokio runtime or reuses
/// one that already exists.
#[derive(Debug, Clone)]
pub enum TokioConfig {
    /// Build and own a new tokio runtime.
    Owned {
        /// Number of worker threads. If `None`, uses tokio's default (number of CPU cores).
        worker_threads: Option<usize>,
        /// How long to keep worker threads alive when idle.
        thread_keep_alive: Duration,
        /// Thread name prefix.
        thread_name: &'static str,
    },
    /// Attach to an existing tokio runtime handle.
    ///
    /// No runtime is built for this variant; the handle is cloned and used as-is.
    ExistingHandle(Handle),
}
63
64impl Default for TokioConfig {
65    fn default() -> Self {
66        Self::Owned {
67            worker_threads: None,
68            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
69            thread_name: "tokio-rt",
70        }
71    }
72}
73
74impl TokioConfig {
75    /// Create a config that attaches to an existing runtime handle.
76    pub const fn existing_handle(handle: Handle) -> Self {
77        Self::ExistingHandle(handle)
78    }
79
80    /// Create a config for an owned runtime with the specified number of worker threads.
81    pub const fn with_worker_threads(worker_threads: usize) -> Self {
82        Self::Owned {
83            worker_threads: Some(worker_threads),
84            thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
85            thread_name: "tokio-rt",
86        }
87    }
88}
89
/// Configuration for the rayon thread pools.
///
/// Every `Option` thread count left as `None` is replaced by a derived default when the
/// pools are built.
#[derive(Debug, Clone)]
#[cfg(feature = "rayon")]
pub struct RayonConfig {
    /// Number of threads for the general CPU pool.
    /// If `None`, the available parallelism is used (1 if it cannot be determined).
    /// Reserved cores are currently not subtracted from this value.
    pub cpu_threads: Option<usize>,
    /// Number of CPU cores to reserve for OS and other processes.
    ///
    /// NOTE: currently ignored when the default thread count is computed.
    pub reserved_cpu_cores: usize,
    /// Number of threads for the RPC blocking pool (trace calls, `eth_getProof`, etc.).
    /// If `None`, uses the same as `cpu_threads`.
    pub rpc_threads: Option<usize>,
    /// Number of threads for the storage I/O pool (static file, `RocksDB` writes in
    /// `save_blocks`). If `None`, uses [`DEFAULT_STORAGE_POOL_THREADS`].
    pub storage_threads: Option<usize>,
    /// Maximum number of concurrent blocking tasks for the RPC guard semaphore.
    pub max_blocking_tasks: usize,
    /// Number of threads for the proof storage worker pool (trie storage proof workers).
    /// If `None`, twice the default thread count is used.
    pub proof_storage_worker_threads: Option<usize>,
    /// Number of threads for the proof account worker pool (trie account proof workers).
    /// If `None`, twice the default thread count is used.
    pub proof_account_worker_threads: Option<usize>,
    /// Number of threads for the prewarming pool (execution prewarming workers).
    /// If `None`, the default thread count is used.
    pub prewarming_threads: Option<usize>,
    /// Number of threads for the BAL streaming pool (BAL hashed state streaming).
    /// If `None`, derived from available parallelism.
    pub bal_streaming_threads: Option<usize>,
}
120
#[cfg(feature = "rayon")]
impl Default for RayonConfig {
    /// Leaves every thread count unset and uses [`DEFAULT_RESERVED_CPU_CORES`] and
    /// [`DEFAULT_MAX_BLOCKING_TASKS`] for the two non-optional settings.
    fn default() -> Self {
        let reserved_cpu_cores = DEFAULT_RESERVED_CPU_CORES;
        let max_blocking_tasks = DEFAULT_MAX_BLOCKING_TASKS;
        Self {
            reserved_cpu_cores,
            max_blocking_tasks,
            cpu_threads: None,
            rpc_threads: None,
            storage_threads: None,
            proof_storage_worker_threads: None,
            proof_account_worker_threads: None,
            prewarming_threads: None,
            bal_streaming_threads: None,
        }
    }
}
137
#[cfg(feature = "rayon")]
impl RayonConfig {
    /// Set the number of threads for the general CPU pool.
    ///
    /// This value is also what [`Self::default_thread_count`] returns, so it acts as the
    /// base for pool sizes that are left unset.
    pub const fn with_cpu_threads(mut self, cpu_threads: usize) -> Self {
        self.cpu_threads = Some(cpu_threads);
        self
    }

    /// Set the number of reserved CPU cores.
    ///
    /// Note that the value is currently not taken into account when sizing pools.
    pub const fn with_reserved_cpu_cores(mut self, reserved_cpu_cores: usize) -> Self {
        self.reserved_cpu_cores = reserved_cpu_cores;
        self
    }

    /// Set the maximum number of concurrent blocking tasks.
    pub const fn with_max_blocking_tasks(mut self, max_blocking_tasks: usize) -> Self {
        self.max_blocking_tasks = max_blocking_tasks;
        self
    }

    /// Set the number of threads for the RPC blocking pool.
    pub const fn with_rpc_threads(mut self, rpc_threads: usize) -> Self {
        self.rpc_threads = Some(rpc_threads);
        self
    }

    /// Set the number of threads for the storage I/O pool.
    pub const fn with_storage_threads(mut self, storage_threads: usize) -> Self {
        self.storage_threads = Some(storage_threads);
        self
    }

    /// Set the number of threads for the proof storage worker pool.
    pub const fn with_proof_storage_worker_threads(
        mut self,
        proof_storage_worker_threads: usize,
    ) -> Self {
        self.proof_storage_worker_threads = Some(proof_storage_worker_threads);
        self
    }

    /// Set the number of threads for the proof account worker pool.
    pub const fn with_proof_account_worker_threads(
        mut self,
        proof_account_worker_threads: usize,
    ) -> Self {
        self.proof_account_worker_threads = Some(proof_account_worker_threads);
        self
    }

    /// Set the number of threads for the prewarming pool.
    pub const fn with_prewarming_threads(mut self, prewarming_threads: usize) -> Self {
        self.prewarming_threads = Some(prewarming_threads);
        self
    }

    /// Set the number of threads for the BAL streaming pool.
    pub const fn with_bal_streaming_threads(mut self, bal_streaming_threads: usize) -> Self {
        self.bal_streaming_threads = Some(bal_streaming_threads);
        self
    }

    /// Compute the default number of threads based on available parallelism.
    ///
    /// Returns `cpu_threads` when it is set; otherwise the available parallelism, or `1`
    /// if that cannot be queried.
    fn default_thread_count(&self) -> usize {
        // TODO: reserved_cpu_cores is currently ignored because subtracting from thread pool
        // sizes doesn't actually reserve CPU cores for other processes.
        let _ = self.reserved_cpu_cores;
        self.cpu_threads.unwrap_or_else(|| available_parallelism().map_or(1, NonZeroUsize::get))
    }
}
202
/// Configuration for building a [`Runtime`].
///
/// The [`Default`] value combines the default tokio configuration with (when the `rayon`
/// feature is enabled) the default rayon configuration.
#[derive(Debug, Clone, Default)]
pub struct RuntimeConfig {
    /// Tokio runtime configuration.
    pub tokio: TokioConfig,
    /// Rayon thread pool configuration.
    #[cfg(feature = "rayon")]
    pub rayon: RayonConfig,
}
212
213impl RuntimeConfig {
214    /// Set the tokio configuration.
215    pub fn with_tokio(mut self, tokio: TokioConfig) -> Self {
216        self.tokio = tokio;
217        self
218    }
219
220    /// Set the rayon configuration.
221    #[cfg(feature = "rayon")]
222    pub const fn with_rayon(mut self, rayon: RayonConfig) -> Self {
223        self.rayon = rayon;
224        self
225    }
226}
227
/// Error returned when [`RuntimeBuilder::build`] fails.
///
/// Both variants are `#[from]` conversions, so the underlying errors can be propagated
/// with `?` inside the builder.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeBuildError {
    /// Failed to build the tokio runtime.
    #[error("Failed to build tokio runtime: {0}")]
    TokioBuild(#[from] std::io::Error),
    /// Failed to build a rayon thread pool.
    #[cfg(feature = "rayon")]
    #[error("Failed to build rayon thread pool: {0}")]
    RayonBuild(#[from] rayon::ThreadPoolBuildError),
}
239
/// A [`WorkerPool`] that is only built the first time it is requested.
///
/// Stores the parameters needed to build the pool and caches the built pool in a
/// [`OnceLock`].
#[cfg(feature = "rayon")]
#[derive(Debug)]
struct LazyWorkerPool {
    /// The pool, once it has been built.
    pool: OnceLock<WorkerPool>,
    /// Number of threads the pool is built with.
    num_threads: usize,
    /// Prefix used for the pool's thread names and in the build-failure panic message.
    thread_name_prefix: &'static str,
}
247
#[cfg(feature = "rayon")]
impl LazyWorkerPool {
    /// Records the pool parameters without building any threads yet.
    const fn new(num_threads: usize, thread_name_prefix: &'static str) -> Self {
        let pool = OnceLock::new();
        Self { pool, num_threads, thread_name_prefix }
    }

    /// Returns the pool, building it on the first call.
    ///
    /// Threads are named `<prefix>-NN` with a zero-padded two-digit index.
    ///
    /// # Panics
    ///
    /// Panics if the underlying rayon thread pool cannot be built.
    fn get(&self) -> &WorkerPool {
        let Self { pool, num_threads, thread_name_prefix } = self;
        // Copy the plain values out so the closures below do not borrow `self`.
        let (num_threads, thread_name_prefix) = (*num_threads, *thread_name_prefix);
        pool.get_or_init(|| {
            let builder = rayon::ThreadPoolBuilder::new()
                .num_threads(num_threads)
                .thread_name(move |i| format!("{thread_name_prefix}-{i:02}"));
            match WorkerPool::from_builder(builder) {
                Ok(built) => built,
                Err(err) => panic!("failed to build {thread_name_prefix} worker pool: {err}"),
            }
        })
    }
}
267
268// ── RuntimeInner ──────────────────────────────────────────────────────
269
/// Shared state behind a [`Runtime`] handle.
///
/// Holds the tokio handle (and the runtime itself when owned), the shutdown and task-event
/// plumbing, metrics, and the thread pools.
struct RuntimeInner {
    /// Owned tokio runtime, if we built one. Kept alive via the `Arc<RuntimeInner>`.
    _tokio_runtime: Option<TokioRuntime>,
    /// Handle to the tokio runtime.
    handle: Handle,
    /// Receiver of the shutdown signal.
    on_shutdown: Shutdown,
    /// Sender half for sending task events to the [`TaskManager`].
    task_events_tx: UnboundedSender<TaskEvent>,
    /// Task executor metrics.
    metrics: TaskExecutorMetrics,
    /// How many [`GracefulShutdown`] tasks are currently active.
    graceful_tasks: Arc<AtomicUsize>,
    /// General-purpose rayon CPU pool.
    #[cfg(feature = "rayon")]
    cpu_pool: rayon::ThreadPool,
    /// RPC blocking pool.
    #[cfg(feature = "rayon")]
    rpc_pool: BlockingTaskPool,
    /// Storage I/O pool.
    #[cfg(feature = "rayon")]
    storage_pool: rayon::ThreadPool,
    /// Rate limiter for expensive RPC operations.
    #[cfg(feature = "rayon")]
    blocking_guard: BlockingTaskGuard,
    /// Proof storage worker pool (trie storage proof computation).
    #[cfg(feature = "rayon")]
    proof_storage_worker_pool: WorkerPool,
    /// Proof account worker pool (trie account proof computation).
    #[cfg(feature = "rayon")]
    proof_account_worker_pool: WorkerPool,
    /// Prewarming pool (execution prewarming workers).
    #[cfg(feature = "rayon")]
    prewarming_pool: WorkerPool,
    /// BAL streaming pool (BAL hashed state streaming).
    ///
    /// Unlike the other pools this one is built lazily on first access.
    #[cfg(feature = "rayon")]
    bal_streaming_pool: LazyWorkerPool,
    /// Named single-thread worker map. Each unique name gets a dedicated OS thread
    /// that is reused across all tasks submitted under that name.
    worker_map: WorkerMap,
    /// Handle to the spawned [`TaskManager`] background task.
    /// The task monitors critical tasks for panics and fires the shutdown signal.
    /// Can be taken via [`Runtime::take_task_manager_handle`] to poll for panic errors.
    task_manager_handle: Mutex<Option<JoinHandle<Result<(), PanickedTaskError>>>>,
}
315
316// ── Runtime ───────────────────────────────────────────────────────────
317
/// A cheaply cloneable handle to the runtime resources.
///
/// Wraps an `Arc<RuntimeInner>` and provides access to:
/// - The tokio [`Handle`]
/// - Task spawning with shutdown awareness and panic monitoring
/// - Rayon thread pools (with `rayon` feature)
///
/// Cloning only clones the inner `Arc`; all clones share the same resources.
#[derive(Clone)]
pub struct Runtime(Arc<RuntimeInner>);
326
327impl std::fmt::Debug for Runtime {
328    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329        f.debug_struct("Runtime").field("handle", &self.0.handle).finish()
330    }
331}
332
333// ── Pool accessors ────────────────────────────────────────────────────
334
335impl Runtime {
336    /// Takes the [`TaskManager`] handle out of this runtime, if one is stored.
337    ///
338    /// The handle resolves with `Err(PanickedTaskError)` if a critical task panicked,
339    /// or `Ok(())` if shutdown was requested. If not taken, the background task still
340    /// runs and logs panics at `debug!` level.
341    pub fn take_task_manager_handle(&self) -> Option<JoinHandle<Result<(), PanickedTaskError>>> {
342        self.0.task_manager_handle.lock().unwrap().take()
343    }
344
345    /// Returns the tokio runtime [`Handle`].
346    pub fn handle(&self) -> &Handle {
347        &self.0.handle
348    }
349
350    /// Get the general-purpose rayon CPU thread pool.
351    #[cfg(feature = "rayon")]
352    pub fn cpu_pool(&self) -> &rayon::ThreadPool {
353        &self.0.cpu_pool
354    }
355
356    /// Get the RPC blocking task pool.
357    #[cfg(feature = "rayon")]
358    pub fn rpc_pool(&self) -> &BlockingTaskPool {
359        &self.0.rpc_pool
360    }
361
362    /// Get the storage I/O pool.
363    #[cfg(feature = "rayon")]
364    pub fn storage_pool(&self) -> &rayon::ThreadPool {
365        &self.0.storage_pool
366    }
367
368    /// Get a clone of the [`BlockingTaskGuard`].
369    #[cfg(feature = "rayon")]
370    pub fn blocking_guard(&self) -> BlockingTaskGuard {
371        self.0.blocking_guard.clone()
372    }
373
374    /// Get the proof storage worker pool.
375    #[cfg(feature = "rayon")]
376    pub fn proof_storage_worker_pool(&self) -> &WorkerPool {
377        &self.0.proof_storage_worker_pool
378    }
379
380    /// Get the proof account worker pool.
381    #[cfg(feature = "rayon")]
382    pub fn proof_account_worker_pool(&self) -> &WorkerPool {
383        &self.0.proof_account_worker_pool
384    }
385
386    /// Get the prewarming pool.
387    #[cfg(feature = "rayon")]
388    pub fn prewarming_pool(&self) -> &WorkerPool {
389        &self.0.prewarming_pool
390    }
391
392    /// Get the BAL streaming pool.
393    #[cfg(feature = "rayon")]
394    pub fn bal_streaming_pool(&self) -> &WorkerPool {
395        self.0.bal_streaming_pool.get()
396    }
397}
398
399// ── Test helpers ──────────────────────────────────────────────────────
400
401impl Runtime {
402    /// Creates a lightweight [`Runtime`] for tests with minimal thread pools.
403    ///
404    /// If called from within a tokio runtime (e.g. `#[tokio::test]`), attaches to the existing
405    /// handle to avoid shutdown panics when the test runtime is dropped.
406    pub fn test() -> Self {
407        let config = match Handle::try_current() {
408            Ok(handle) => Self::test_config().with_tokio(TokioConfig::existing_handle(handle)),
409            Err(_) => Self::test_config(),
410        };
411        RuntimeBuilder::new(config).build().expect("failed to build test Runtime")
412    }
413
414    const fn test_config() -> RuntimeConfig {
415        RuntimeConfig {
416            tokio: TokioConfig::Owned {
417                worker_threads: Some(2),
418                thread_keep_alive: DEFAULT_THREAD_KEEP_ALIVE,
419                thread_name: "tokio-test",
420            },
421            #[cfg(feature = "rayon")]
422            rayon: RayonConfig {
423                cpu_threads: Some(2),
424                reserved_cpu_cores: 0,
425                rpc_threads: Some(2),
426                storage_threads: Some(2),
427                max_blocking_tasks: 16,
428                proof_storage_worker_threads: Some(2),
429                proof_account_worker_threads: Some(2),
430                prewarming_threads: Some(2),
431                bal_streaming_threads: Some(2),
432            },
433        }
434    }
435}
436
437// ── Spawn methods ─────────────────────────────────────────────────────
438
/// Determines how a task is spawned.
///
/// Also selects which regular-task metrics are updated when a regular task is spawned.
enum TaskKind {
    /// Spawn the task to the default executor [`Handle::spawn`].
    Default,
    /// Spawn the task to the blocking executor [`Handle::spawn_blocking`].
    Blocking,
}
446
447impl Runtime {
448    /// Returns the receiver of the shutdown signal.
449    pub fn on_shutdown_signal(&self) -> &Shutdown {
450        &self.0.on_shutdown
451    }
452
453    /// Spawns a future on the tokio runtime depending on the [`TaskKind`].
454    fn spawn_on_rt<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
455    where
456        F: Future<Output = ()> + Send + 'static,
457    {
458        match task_kind {
459            TaskKind::Default => self.0.handle.spawn(fut),
460            TaskKind::Blocking => {
461                let handle = self.0.handle.clone();
462                self.0.handle.spawn_blocking(move || handle.block_on(fut))
463            }
464        }
465    }
466
467    /// Spawns a regular task depending on the given [`TaskKind`].
468    fn spawn_task_as<F>(&self, fut: F, task_kind: TaskKind) -> JoinHandle<()>
469    where
470        F: Future<Output = ()> + Send + 'static,
471    {
472        match task_kind {
473            TaskKind::Default => self.0.metrics.inc_regular_tasks(),
474            TaskKind::Blocking => self.0.metrics.inc_regular_blocking_tasks(),
475        }
476        let on_shutdown = self.0.on_shutdown.clone();
477
478        let finished_counter = match task_kind {
479            TaskKind::Default => self.0.metrics.finished_regular_tasks_total.clone(),
480            TaskKind::Blocking => self.0.metrics.finished_regular_blocking_tasks_total.clone(),
481        };
482
483        let task = {
484            async move {
485                let _inc_counter_on_drop = IncCounterOnDrop::new(finished_counter);
486                let fut = pin!(fut);
487                let _ = select(on_shutdown, fut).await;
488            }
489        }
490        .in_current_span();
491
492        self.spawn_on_rt(task, task_kind)
493    }
494
495    /// Spawns the task onto the runtime.
496    /// The given future resolves as soon as the [Shutdown] signal is received.
497    ///
498    /// See also [`Handle::spawn`].
499    pub fn spawn_task<F>(&self, fut: F) -> JoinHandle<()>
500    where
501        F: Future<Output = ()> + Send + 'static,
502    {
503        self.spawn_task_as(fut, TaskKind::Default)
504    }
505
506    /// Spawns a blocking task onto the runtime.
507    /// The given future resolves as soon as the [Shutdown] signal is received.
508    ///
509    /// See also [`Handle::spawn_blocking`].
510    pub fn spawn_blocking_task<F>(&self, fut: F) -> JoinHandle<()>
511    where
512        F: Future<Output = ()> + Send + 'static,
513    {
514        self.spawn_task_as(fut, TaskKind::Blocking)
515    }
516
517    /// Spawns a blocking closure directly on the tokio runtime, bypassing shutdown
518    /// awareness. Useful for raw CPU-bound work.
519    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
520    where
521        F: FnOnce() -> R + Send + 'static,
522        R: Send + 'static,
523    {
524        self.0.handle.spawn_blocking(func)
525    }
526
527    /// Moves the given value to a dedicated background thread for deallocation.
528    ///
529    /// This is useful when dropping a value is expensive (e.g. large nested collections)
530    /// and should not block the current task. Uses a persistent named thread (`"drop"`)
531    /// to avoid thread creation overhead on hot paths.
532    pub fn spawn_drop<T: Send + 'static>(&self, value: T) {
533        self.spawn_blocking_named("drop", move || drop(value));
534    }
535
536    /// Spawns a blocking closure on a dedicated, named OS thread.
537    ///
538    /// Unlike [`spawn_blocking`](Self::spawn_blocking) which uses tokio's blocking thread pool,
539    /// this reuses the same OS thread for all tasks submitted under the same `name`. The thread
540    /// is created lazily on first use and its OS thread name is set to `name`.
541    ///
542    /// This is useful for tasks that benefit from running on a stable thread, e.g. for
543    /// thread-local state reuse or to avoid thread creation overhead on hot paths.
544    ///
545    /// Returns a [`LazyHandle`](crate::LazyHandle) handle that resolves on first access and caches
546    /// the result.
547    pub fn spawn_blocking_named<F, R>(&self, name: &'static str, func: F) -> crate::LazyHandle<R>
548    where
549        F: FnOnce() -> R + Send + 'static,
550        R: Send + 'static,
551    {
552        crate::LazyHandle::new(self.0.worker_map.spawn_on(name, func))
553    }
554
555    /// Spawns the task onto the runtime.
556    /// The given future resolves as soon as the [Shutdown] signal is received.
557    ///
558    /// See also [`Handle::spawn`].
559    pub fn spawn_with_signal<F>(&self, f: impl FnOnce(Shutdown) -> F) -> JoinHandle<()>
560    where
561        F: Future<Output = ()> + Send + 'static,
562    {
563        let on_shutdown = self.0.on_shutdown.clone();
564        let fut = f(on_shutdown);
565        let task = fut.in_current_span();
566        self.0.handle.spawn(task)
567    }
568
569    /// Spawns a critical task depending on the given [`TaskKind`].
570    fn spawn_critical_as<F>(
571        &self,
572        name: &'static str,
573        fut: F,
574        task_kind: TaskKind,
575    ) -> JoinHandle<()>
576    where
577        F: Future<Output = ()> + Send + 'static,
578    {
579        self.0.metrics.inc_critical_tasks();
580        let panicked_tasks_tx = self.0.task_events_tx.clone();
581        let on_shutdown = self.0.on_shutdown.clone();
582
583        // wrap the task in catch unwind
584        let task = std::panic::AssertUnwindSafe(fut)
585            .catch_unwind()
586            .map_err(move |error| {
587                let task_error = PanickedTaskError::new(name, error);
588                error!("{task_error}");
589                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
590            })
591            .in_current_span();
592
593        let finished_critical_tasks_total_metrics =
594            self.0.metrics.finished_critical_tasks_total.clone();
595        let task = async move {
596            let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_total_metrics);
597            let task = pin!(task);
598            let _ = select(on_shutdown, task).await;
599        };
600
601        self.spawn_on_rt(task, task_kind)
602    }
603
604    /// This spawns a critical task onto the runtime.
605    /// The given future resolves as soon as the [Shutdown] signal is received.
606    ///
607    /// If this task panics, the [`TaskManager`] is notified.
608    pub fn spawn_critical_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
609    where
610        F: Future<Output = ()> + Send + 'static,
611    {
612        self.spawn_critical_as(name, fut, TaskKind::Default)
613    }
614
615    /// This spawns a critical blocking task onto the runtime.
616    /// The given future resolves as soon as the [Shutdown] signal is received.
617    ///
618    /// If this task panics, the [`TaskManager`] is notified.
619    pub fn spawn_critical_blocking_task<F>(&self, name: &'static str, fut: F) -> JoinHandle<()>
620    where
621        F: Future<Output = ()> + Send + 'static,
622    {
623        self.spawn_critical_as(name, fut, TaskKind::Blocking)
624    }
625
626    /// This spawns a critical task onto the runtime.
627    ///
628    /// If this task panics, the [`TaskManager`] is notified.
629    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
630    ///
631    /// # Example
632    ///
633    /// ```no_run
634    /// # async fn t(executor: reth_tasks::TaskExecutor) {
635    ///
636    /// executor.spawn_critical_with_graceful_shutdown_signal("grace", async move |shutdown| {
637    ///     // await the shutdown signal
638    ///     let guard = shutdown.await;
639    ///     // do work before exiting the program
640    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
641    ///     // allow graceful shutdown
642    ///     drop(guard);
643    /// });
644    /// # }
645    /// ```
646    pub fn spawn_critical_with_graceful_shutdown_signal<F>(
647        &self,
648        name: &'static str,
649        f: impl FnOnce(GracefulShutdown) -> F,
650    ) -> JoinHandle<()>
651    where
652        F: Future<Output = ()> + Send + 'static,
653    {
654        let panicked_tasks_tx = self.0.task_events_tx.clone();
655        let on_shutdown = GracefulShutdown::new(
656            self.0.on_shutdown.clone(),
657            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
658        );
659        let fut = f(on_shutdown);
660
661        // wrap the task in catch unwind
662        let task = std::panic::AssertUnwindSafe(fut)
663            .catch_unwind()
664            .map_err(move |error| {
665                let task_error = PanickedTaskError::new(name, error);
666                error!("{task_error}");
667                let _ = panicked_tasks_tx.send(TaskEvent::Panic(task_error));
668            })
669            .map(drop)
670            .in_current_span();
671
672        self.0.handle.spawn(task)
673    }
674
675    /// This spawns a regular task onto the runtime.
676    ///
677    /// The [`TaskManager`] will wait until the given future has completed before shutting down.
678    ///
679    /// # Example
680    ///
681    /// ```no_run
682    /// # async fn t(executor: reth_tasks::TaskExecutor) {
683    ///
684    /// executor.spawn_with_graceful_shutdown_signal(async move |shutdown| {
685    ///     // await the shutdown signal
686    ///     let guard = shutdown.await;
687    ///     // do work before exiting the program
688    ///     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
689    ///     // allow graceful shutdown
690    ///     drop(guard);
691    /// });
692    /// # }
693    /// ```
694    pub fn spawn_with_graceful_shutdown_signal<F>(
695        &self,
696        f: impl FnOnce(GracefulShutdown) -> F,
697    ) -> JoinHandle<()>
698    where
699        F: Future<Output = ()> + Send + 'static,
700    {
701        let on_shutdown = GracefulShutdown::new(
702            self.0.on_shutdown.clone(),
703            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
704        );
705        let fut = f(on_shutdown);
706
707        self.0.handle.spawn(fut)
708    }
709
710    /// Sends a request to the `TaskManager` to initiate a graceful shutdown.
711    ///
712    /// Caution: This will terminate the entire program.
713    pub fn initiate_graceful_shutdown(
714        &self,
715    ) -> Result<GracefulShutdown, tokio::sync::mpsc::error::SendError<()>> {
716        self.0
717            .task_events_tx
718            .send(TaskEvent::GracefulShutdown)
719            .map_err(|_send_error_with_task_event| tokio::sync::mpsc::error::SendError(()))?;
720
721        Ok(GracefulShutdown::new(
722            self.0.on_shutdown.clone(),
723            GracefulShutdownGuard::new(Arc::clone(&self.0.graceful_tasks)),
724        ))
725    }
726
727    /// Fires the shutdown signal and waits until all graceful tasks complete.
728    pub fn graceful_shutdown(&self) {
729        let _ = self.do_graceful_shutdown(None);
730    }
731
732    /// Fires the shutdown signal and waits until all graceful tasks complete or the timeout
733    /// elapses.
734    ///
735    /// Returns `true` if all tasks completed before the timeout.
736    pub fn graceful_shutdown_with_timeout(&self, timeout: Duration) -> bool {
737        self.do_graceful_shutdown(Some(timeout))
738    }
739
740    fn do_graceful_shutdown(&self, timeout: Option<Duration>) -> bool {
741        let _ = self.0.task_events_tx.send(TaskEvent::GracefulShutdown);
742        let deadline = timeout.map(|t| Instant::now() + t);
743        while self.0.graceful_tasks.load(Ordering::SeqCst) > 0 {
744            if deadline.is_some_and(|d| Instant::now() > d) {
745                debug!("graceful shutdown timed out");
746                return false;
747            }
748            std::thread::yield_now();
749        }
750        debug!("gracefully shut down");
751        true
752    }
753}
754
755// ── RuntimeBuilder ────────────────────────────────────────────────────
756
/// Builder for constructing a [`Runtime`].
///
/// Holds the [`RuntimeConfig`] that [`RuntimeBuilder::build`] turns into a runtime.
#[derive(Debug, Clone)]
pub struct RuntimeBuilder {
    /// Configuration the runtime is built from.
    config: RuntimeConfig,
}
762
763impl RuntimeBuilder {
764    /// Create a new builder with the given configuration.
765    pub const fn new(config: RuntimeConfig) -> Self {
766        Self { config }
767    }
768
769    /// Build the [`Runtime`].
770    ///
771    /// The [`TaskManager`] is automatically spawned as a background task that monitors
772    /// critical tasks for panics. Use [`Runtime::take_task_manager_handle`] to extract
773    /// the join handle if you need to poll for panic errors.
774    #[tracing::instrument(name = "RuntimeBuilder::build", level = "debug", skip_all)]
775    pub fn build(self) -> Result<Runtime, RuntimeBuildError> {
776        debug!(?self.config, "Building runtime");
777        let config = self.config;
778
779        let (owned_runtime, handle) = match &config.tokio {
780            TokioConfig::Owned { worker_threads, thread_keep_alive, thread_name } => {
781                let mut builder = tokio::runtime::Builder::new_multi_thread();
782                builder
783                    .enable_all()
784                    .thread_keep_alive(*thread_keep_alive)
785                    .thread_name(*thread_name);
786
787                if let Some(threads) = worker_threads {
788                    builder.worker_threads(*threads);
789                }
790
791                let runtime = builder.build()?;
792                let h = runtime.handle().clone();
793                (Some(runtime), h)
794            }
795            TokioConfig::ExistingHandle(h) => (None, h.clone()),
796        };
797
798        let (task_manager, on_shutdown, task_events_tx, graceful_tasks) =
799            TaskManager::new_parts(handle.clone());
800
801        #[cfg(feature = "rayon")]
802        let (
803            cpu_pool,
804            rpc_pool,
805            storage_pool,
806            blocking_guard,
807            proof_storage_worker_pool,
808            proof_account_worker_pool,
809            prewarming_pool,
810            bal_streaming_pool,
811        ) = {
812            let default_threads = config.rayon.default_thread_count();
813            let rpc_threads = config.rayon.rpc_threads.unwrap_or(default_threads);
814
815            let cpu_pool = build_pool_with_panic_handler(
816                rayon::ThreadPoolBuilder::new()
817                    .num_threads(default_threads)
818                    .thread_name(|i| format!("cpu-{i:02}")),
819            )?;
820
821            let rpc_raw = build_pool_with_panic_handler(
822                rayon::ThreadPoolBuilder::new()
823                    .num_threads(rpc_threads)
824                    .thread_name(|i| format!("rpc-{i:02}")),
825            )?;
826            let rpc_pool = BlockingTaskPool::new(rpc_raw);
827
828            let storage_threads =
829                config.rayon.storage_threads.unwrap_or(DEFAULT_STORAGE_POOL_THREADS);
830            let storage_pool = build_pool_with_panic_handler(
831                rayon::ThreadPoolBuilder::new()
832                    .num_threads(storage_threads)
833                    .thread_name(|i| format!("storage-{i:02}")),
834            )?;
835
836            let blocking_guard = BlockingTaskGuard::new(config.rayon.max_blocking_tasks);
837
838            let proof_storage_worker_threads =
839                config.rayon.proof_storage_worker_threads.unwrap_or(default_threads * 2);
840            let proof_storage_worker_pool = WorkerPool::from_builder(
841                rayon::ThreadPoolBuilder::new()
842                    .num_threads(proof_storage_worker_threads)
843                    .thread_name(|i| format!("proof-strg-{i:02}")),
844            )?;
845
846            let proof_account_worker_threads =
847                config.rayon.proof_account_worker_threads.unwrap_or(default_threads * 2);
848            let proof_account_worker_pool = WorkerPool::from_builder(
849                rayon::ThreadPoolBuilder::new()
850                    .num_threads(proof_account_worker_threads)
851                    .thread_name(|i| format!("proof-acct-{i:02}")),
852            )?;
853
854            let prewarming_threads = config.rayon.prewarming_threads.unwrap_or(default_threads);
855            let prewarming_pool = WorkerPool::from_builder(
856                rayon::ThreadPoolBuilder::new()
857                    .num_threads(prewarming_threads)
858                    .thread_name(|i| format!("prewarm-{i:02}")),
859            )?;
860
861            let bal_streaming_threads =
862                config.rayon.bal_streaming_threads.unwrap_or(default_threads);
863            let bal_streaming_pool = LazyWorkerPool::new(bal_streaming_threads, "bal-stream");
864
865            debug!(
866                default_threads,
867                rpc_threads,
868                storage_threads,
869                proof_storage_worker_threads,
870                proof_account_worker_threads,
871                prewarming_threads,
872                bal_streaming_threads,
873                max_blocking_tasks = config.rayon.max_blocking_tasks,
874                "Initialized rayon thread pools and configured lazy BAL streaming pool"
875            );
876
877            (
878                cpu_pool,
879                rpc_pool,
880                storage_pool,
881                blocking_guard,
882                proof_storage_worker_pool,
883                proof_account_worker_pool,
884                prewarming_pool,
885                bal_streaming_pool,
886            )
887        };
888
889        let task_manager_handle = handle.spawn(async move {
890            let result = task_manager.await;
891            if let Err(ref err) = result {
892                debug!("{err}");
893            }
894            result
895        });
896
897        let inner = RuntimeInner {
898            _tokio_runtime: owned_runtime,
899            handle,
900            on_shutdown,
901            task_events_tx,
902            metrics: Default::default(),
903            graceful_tasks,
904            #[cfg(feature = "rayon")]
905            cpu_pool,
906            #[cfg(feature = "rayon")]
907            rpc_pool,
908            #[cfg(feature = "rayon")]
909            storage_pool,
910            #[cfg(feature = "rayon")]
911            blocking_guard,
912            #[cfg(feature = "rayon")]
913            proof_storage_worker_pool,
914            #[cfg(feature = "rayon")]
915            proof_account_worker_pool,
916            #[cfg(feature = "rayon")]
917            prewarming_pool,
918            #[cfg(feature = "rayon")]
919            bal_streaming_pool,
920            worker_map: WorkerMap::new(),
921            task_manager_handle: Mutex::new(Some(task_manager_handle)),
922        };
923
924        Ok(Runtime(Arc::new(inner)))
925    }
926}
927
#[cfg(test)]
mod tests {
    use super::*;

    /// The default `RuntimeConfig` must select an owned tokio runtime.
    #[test]
    fn test_runtime_config_default() {
        let default_config = RuntimeConfig::default();
        let owns_tokio = matches!(default_config.tokio, TokioConfig::Owned { .. });
        assert!(owns_tokio);
    }

    /// Overriding the tokio config with an existing handle must be reflected in the config.
    #[test]
    fn test_runtime_config_existing_handle() {
        let tokio_rt = TokioRuntime::new().unwrap();
        let base = Runtime::test_config();
        let config = base.with_tokio(TokioConfig::existing_handle(tokio_rt.handle().clone()));
        assert!(matches!(config.tokio, TokioConfig::ExistingHandle(_)));
    }

    /// The default rayon thread count must never be zero.
    #[cfg(feature = "rayon")]
    #[test]
    fn test_rayon_config_thread_count() {
        let threads = RayonConfig::default().default_thread_count();
        assert!(threads >= 1);
    }

    /// Building a runtime on top of an existing tokio handle must succeed.
    #[test]
    fn test_runtime_builder() {
        let tokio_rt = TokioRuntime::new().unwrap();
        let base = Runtime::test_config();
        let config = base.with_tokio(TokioConfig::existing_handle(tokio_rt.handle().clone()));
        let built = RuntimeBuilder::new(config).build().unwrap();
        let _ = built.handle();
    }

    /// The BAL streaming pool must stay uninitialized until it is first requested.
    #[cfg(feature = "rayon")]
    #[test]
    fn test_bal_streaming_pool_is_lazy() {
        let runtime = Runtime::test();
        let lazy_pool = &runtime.0.bal_streaming_pool;

        // Nothing has asked for the pool yet, so its cell is still empty.
        assert!(lazy_pool.pool.get().is_none());

        // First access initializes the pool with the test configuration's thread count.
        assert_eq!(runtime.bal_streaming_pool().current_num_threads(), 2);
        assert!(lazy_pool.pool.get().is_some());
    }
}