reth_engine_tree/tree/payload_processor/
executor.rs

1//! Executor for mixed I/O and CPU workloads.
2
3use rayon::ThreadPool as RayonPool;
4use std::{
5    sync::{Arc, OnceLock},
6    time::Duration,
7};
8use tokio::{
9    runtime::{Builder, Handle, Runtime},
10    task::JoinHandle,
11};
12
13/// An executor for mixed I/O and CPU workloads.
14///
15/// This type has access to its own rayon pool and uses tokio to spawn blocking tasks.
16///
17/// It will reuse an existing tokio runtime if available or create its own.
18#[derive(Debug, Clone)]
19pub struct WorkloadExecutor {
20    inner: WorkloadExecutorInner,
21}
22
23impl Default for WorkloadExecutor {
24    fn default() -> Self {
25        Self { inner: WorkloadExecutorInner::new(rayon::ThreadPoolBuilder::new().build().unwrap()) }
26    }
27}
28
29impl WorkloadExecutor {
30    /// Creates a new executor with the given number of threads for cpu bound work (rayon).
31    #[expect(unused)]
32    pub(super) fn with_num_cpu_threads(cpu_threads: usize) -> Self {
33        Self {
34            inner: WorkloadExecutorInner::new(
35                rayon::ThreadPoolBuilder::new().num_threads(cpu_threads).build().unwrap(),
36            ),
37        }
38    }
39
40    /// Returns the handle to the tokio runtime
41    pub(super) const fn handle(&self) -> &Handle {
42        &self.inner.handle
43    }
44
45    /// Shorthand for [`Runtime::spawn_blocking`]
46    #[track_caller]
47    pub(super) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
48    where
49        F: FnOnce() -> R + Send + 'static,
50        R: Send + 'static,
51    {
52        self.inner.handle.spawn_blocking(func)
53    }
54
55    /// Returns access to the rayon pool
56    #[expect(unused)]
57    pub(super) const fn rayon_pool(&self) -> &Arc<rayon::ThreadPool> {
58        &self.inner.rayon_pool
59    }
60}
61
62#[derive(Debug, Clone)]
63struct WorkloadExecutorInner {
64    handle: Handle,
65    rayon_pool: Arc<RayonPool>,
66}
67
68impl WorkloadExecutorInner {
69    fn new(rayon_pool: rayon::ThreadPool) -> Self {
70        fn get_runtime_handle() -> Handle {
71            Handle::try_current().unwrap_or_else(|_| {
72                // Create a new runtime if no runtime is available
73                static RT: OnceLock<Runtime> = OnceLock::new();
74
75                let rt = RT.get_or_init(|| {
76                    Builder::new_multi_thread()
77                        .enable_all()
78                        // Keep the threads alive for at least the block time, which is 12 seconds
79                        // at the time of writing, plus a little extra.
80                        //
81                        // This is to prevent the costly process of spawning new threads on every
82                        // new block, and instead reuse the existing
83                        // threads.
84                        .thread_keep_alive(Duration::from_secs(15))
85                        .build()
86                        .unwrap()
87                });
88
89                rt.handle().clone()
90            })
91        }
92
93        Self { handle: get_runtime_handle(), rayon_pool: Arc::new(rayon_pool) }
94    }
95}