reth_engine_tree/tree/payload_processor/
executor.rs1use rayon::ThreadPool as RayonPool;
4use std::{
5 sync::{Arc, OnceLock},
6 time::Duration,
7};
8use tokio::{
9 runtime::{Builder, Handle, Runtime},
10 task::JoinHandle,
11};
12
13#[derive(Debug, Clone)]
19pub struct WorkloadExecutor {
20 inner: WorkloadExecutorInner,
21}
22
23impl Default for WorkloadExecutor {
24 fn default() -> Self {
25 Self { inner: WorkloadExecutorInner::new(rayon::ThreadPoolBuilder::new().build().unwrap()) }
26 }
27}
28
29impl WorkloadExecutor {
30 #[expect(unused)]
32 pub(super) fn with_num_cpu_threads(cpu_threads: usize) -> Self {
33 Self {
34 inner: WorkloadExecutorInner::new(
35 rayon::ThreadPoolBuilder::new().num_threads(cpu_threads).build().unwrap(),
36 ),
37 }
38 }
39
40 pub(super) const fn handle(&self) -> &Handle {
42 &self.inner.handle
43 }
44
45 #[track_caller]
47 pub(super) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
48 where
49 F: FnOnce() -> R + Send + 'static,
50 R: Send + 'static,
51 {
52 self.inner.handle.spawn_blocking(func)
53 }
54
55 #[expect(unused)]
57 pub(super) const fn rayon_pool(&self) -> &Arc<rayon::ThreadPool> {
58 &self.inner.rayon_pool
59 }
60}
61
62#[derive(Debug, Clone)]
63struct WorkloadExecutorInner {
64 handle: Handle,
65 rayon_pool: Arc<RayonPool>,
66}
67
68impl WorkloadExecutorInner {
69 fn new(rayon_pool: rayon::ThreadPool) -> Self {
70 fn get_runtime_handle() -> Handle {
71 Handle::try_current().unwrap_or_else(|_| {
72 static RT: OnceLock<Runtime> = OnceLock::new();
74
75 let rt = RT.get_or_init(|| {
76 Builder::new_multi_thread()
77 .enable_all()
78 .thread_keep_alive(Duration::from_secs(15))
85 .build()
86 .unwrap()
87 });
88
89 rt.handle().clone()
90 })
91 }
92
93 Self { handle: get_runtime_handle(), rayon_pool: Arc::new(rayon_pool) }
94 }
95}