reth_engine_tree/tree/payload_processor/
executor.rs

1//! Executor for mixed I/O and CPU workloads.
2
3use std::{sync::OnceLock, time::Duration};
4use tokio::{
5    runtime::{Builder, Handle, Runtime},
6    task::JoinHandle,
7};
8
9/// An executor for mixed I/O and CPU workloads.
10///
11/// This type uses tokio to spawn blocking tasks and will reuse an existing tokio
12/// runtime if available or create its own.
13#[derive(Debug, Clone)]
14pub struct WorkloadExecutor {
15    inner: WorkloadExecutorInner,
16}
17
18impl Default for WorkloadExecutor {
19    fn default() -> Self {
20        Self { inner: WorkloadExecutorInner::new() }
21    }
22}
23
24impl WorkloadExecutor {
25    /// Returns the handle to the tokio runtime
26    pub(super) const fn handle(&self) -> &Handle {
27        &self.inner.handle
28    }
29
30    /// Shorthand for [`Runtime::spawn_blocking`]
31    #[track_caller]
32    pub(super) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
33    where
34        F: FnOnce() -> R + Send + 'static,
35        R: Send + 'static,
36    {
37        self.inner.handle.spawn_blocking(func)
38    }
39}
40
41#[derive(Debug, Clone)]
42struct WorkloadExecutorInner {
43    handle: Handle,
44}
45
46impl WorkloadExecutorInner {
47    fn new() -> Self {
48        fn get_runtime_handle() -> Handle {
49            Handle::try_current().unwrap_or_else(|_| {
50                // Create a new runtime if no runtime is available
51                static RT: OnceLock<Runtime> = OnceLock::new();
52
53                let rt = RT.get_or_init(|| {
54                    Builder::new_multi_thread()
55                        .enable_all()
56                        // Keep the threads alive for at least the block time, which is 12 seconds
57                        // at the time of writing, plus a little extra.
58                        //
59                        // This is to prevent the costly process of spawning new threads on every
60                        // new block, and instead reuse the existing
61                        // threads.
62                        .thread_keep_alive(Duration::from_secs(15))
63                        .build()
64                        .unwrap()
65                });
66
67                rt.handle().clone()
68            })
69        }
70
71        Self { handle: get_runtime_handle() }
72    }
73}