reth_engine_tree/tree/payload_processor/
executor.rs

1//! Executor for mixed I/O and CPU workloads.
2
3use rayon::ThreadPool as RayonPool;
4use std::sync::{Arc, OnceLock};
5use tokio::{
6    runtime::{Handle, Runtime},
7    task::JoinHandle,
8};
9
10/// An executor for mixed I/O and CPU workloads.
11///
12/// This type has access to its own rayon pool and uses tokio to spawn blocking tasks.
13///
14/// It will reuse an existing tokio runtime if available or create its own.
15#[derive(Debug, Clone)]
16pub struct WorkloadExecutor {
17    inner: WorkloadExecutorInner,
18}
19
20impl Default for WorkloadExecutor {
21    fn default() -> Self {
22        Self { inner: WorkloadExecutorInner::new(rayon::ThreadPoolBuilder::new().build().unwrap()) }
23    }
24}
25
26impl WorkloadExecutor {
27    /// Creates a new executor with the given number of threads for cpu bound work (rayon).
28    #[expect(unused)]
29    pub(super) fn with_num_cpu_threads(cpu_threads: usize) -> Self {
30        Self {
31            inner: WorkloadExecutorInner::new(
32                rayon::ThreadPoolBuilder::new().num_threads(cpu_threads).build().unwrap(),
33            ),
34        }
35    }
36
37    /// Returns the handle to the tokio runtime
38    pub(super) const fn handle(&self) -> &Handle {
39        &self.inner.handle
40    }
41
42    /// Shorthand for [`Runtime::spawn_blocking`]
43    #[track_caller]
44    pub(super) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
45    where
46        F: FnOnce() -> R + Send + 'static,
47        R: Send + 'static,
48    {
49        self.inner.handle.spawn_blocking(func)
50    }
51
52    /// Returns access to the rayon pool
53    #[expect(unused)]
54    pub(super) const fn rayon_pool(&self) -> &Arc<rayon::ThreadPool> {
55        &self.inner.rayon_pool
56    }
57}
58
59#[derive(Debug, Clone)]
60struct WorkloadExecutorInner {
61    handle: Handle,
62    rayon_pool: Arc<RayonPool>,
63}
64
65impl WorkloadExecutorInner {
66    fn new(rayon_pool: rayon::ThreadPool) -> Self {
67        fn get_runtime_handle() -> Handle {
68            Handle::try_current().unwrap_or_else(|_| {
69                // Create a new runtime if now runtime is available
70                static RT: OnceLock<Runtime> = OnceLock::new();
71
72                let rt = RT.get_or_init(|| Runtime::new().unwrap());
73
74                rt.handle().clone()
75            })
76        }
77
78        Self { handle: get_runtime_handle(), rayon_pool: Arc::new(rayon_pool) }
79    }
80}