1//! Executor for mixed I/O and CPU workloads.
23use rayon::ThreadPool as RayonPool;
4use std::sync::{Arc, OnceLock};
5use tokio::{
6 runtime::{Handle, Runtime},
7 task::JoinHandle,
8};
910/// An executor for mixed I/O and CPU workloads.
11///
12/// This type has access to its own rayon pool and uses tokio to spawn blocking tasks.
13///
14/// It will reuse an existing tokio runtime if available or create its own.
15#[derive(Debug, Clone)]
16pub struct WorkloadExecutor {
17 inner: WorkloadExecutorInner,
18}
1920impl Defaultfor WorkloadExecutor {
21fn default() -> Self {
22Self { inner: WorkloadExecutorInner::new(rayon::ThreadPoolBuilder::new().build().unwrap()) }
23 }
24}
2526impl WorkloadExecutor {
27/// Creates a new executor with the given number of threads for cpu bound work (rayon).
28#[allow(unused)]
29pub(super) fn with_num_cpu_threads(cpu_threads: usize) -> Self {
30Self {
31 inner: WorkloadExecutorInner::new(
32 rayon::ThreadPoolBuilder::new().num_threads(cpu_threads).build().unwrap(),
33 ),
34 }
35 }
3637/// Returns the handle to the tokio runtime
38pub(super) fn handle(&self) -> &Handle {
39&self.inner.handle
40 }
4142/// Shorthand for [`Runtime::spawn_blocking`]
43#[track_caller]
44pub(super) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
45where
46F: FnOnce() -> R + Send + 'static,
47 R: Send + 'static,
48 {
49self.inner.handle.spawn_blocking(func)
50 }
5152/// Returns access to the rayon pool
53pub(super) fn rayon_pool(&self) -> &Arc<rayon::ThreadPool> {
54&self.inner.rayon_pool
55 }
56}
5758#[derive(Debug, Clone)]
59struct WorkloadExecutorInner {
60 handle: Handle,
61 rayon_pool: Arc<RayonPool>,
62}
6364impl WorkloadExecutorInner {
65fn new(rayon_pool: rayon::ThreadPool) -> Self {
66fn get_runtime_handle() -> Handle {
67 Handle::try_current().unwrap_or_else(|_| {
68// Create a new runtime if now runtime is available
69static RT: OnceLock<Runtime> = OnceLock::new();
7071let rt = RT.get_or_init(|| Runtime::new().unwrap());
7273rt.handle().clone()
74 })
75 }
7677Self { handle: get_runtime_handle(), rayon_pool: Arc::new(rayon_pool) }
78 }
79}