reth_engine_tree/tree/payload_processor/
executor.rs

1//! Executor for mixed I/O and CPU workloads.
2
3use rayon::ThreadPool as RayonPool;
4use std::sync::{Arc, OnceLock};
5use tokio::{
6    runtime::{Handle, Runtime},
7    task::JoinHandle,
8};
9
10/// An executor for mixed I/O and CPU workloads.
11///
12/// This type has access to its own rayon pool and uses tokio to spawn blocking tasks.
13///
14/// It will reuse an existing tokio runtime if available or create its own.
15#[derive(Debug, Clone)]
16pub struct WorkloadExecutor {
17    inner: WorkloadExecutorInner,
18}
19
20impl Default for WorkloadExecutor {
21    fn default() -> Self {
22        Self { inner: WorkloadExecutorInner::new(rayon::ThreadPoolBuilder::new().build().unwrap()) }
23    }
24}
25
26impl WorkloadExecutor {
27    /// Creates a new executor with the given number of threads for cpu bound work (rayon).
28    #[allow(unused)]
29    pub(super) fn with_num_cpu_threads(cpu_threads: usize) -> Self {
30        Self {
31            inner: WorkloadExecutorInner::new(
32                rayon::ThreadPoolBuilder::new().num_threads(cpu_threads).build().unwrap(),
33            ),
34        }
35    }
36
37    /// Returns the handle to the tokio runtime
38    pub(super) fn handle(&self) -> &Handle {
39        &self.inner.handle
40    }
41
42    /// Shorthand for [`Runtime::spawn_blocking`]
43    #[track_caller]
44    pub(super) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
45    where
46        F: FnOnce() -> R + Send + 'static,
47        R: Send + 'static,
48    {
49        self.inner.handle.spawn_blocking(func)
50    }
51
52    /// Returns access to the rayon pool
53    pub(super) fn rayon_pool(&self) -> &Arc<rayon::ThreadPool> {
54        &self.inner.rayon_pool
55    }
56}
57
58#[derive(Debug, Clone)]
59struct WorkloadExecutorInner {
60    handle: Handle,
61    rayon_pool: Arc<RayonPool>,
62}
63
64impl WorkloadExecutorInner {
65    fn new(rayon_pool: rayon::ThreadPool) -> Self {
66        fn get_runtime_handle() -> Handle {
67            Handle::try_current().unwrap_or_else(|_| {
68                // Create a new runtime if now runtime is available
69                static RT: OnceLock<Runtime> = OnceLock::new();
70
71                let rt = RT.get_or_init(|| Runtime::new().unwrap());
72
73                rt.handle().clone()
74            })
75        }
76
77        Self { handle: get_runtime_handle(), rayon_pool: Arc::new(rayon_pool) }
78    }
79}