reth_rpc_eth_api/helpers/
blocking_task.rs

1//! Spawns a blocking task. CPU heavy tasks are executed with the `rayon` library. IO heavy tasks
2//! are executed on the `tokio` runtime.
3
4use futures::Future;
5use reth_rpc_eth_types::EthApiError;
6use reth_tasks::{
7    pool::{BlockingTaskGuard, BlockingTaskPool},
8    TaskSpawner,
9};
10use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit};
11
12use crate::EthApiTypes;
13
14/// Executes code on a blocking thread.
15pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static {
16    /// Returns a handle for spawning IO heavy blocking tasks.
17    ///
18    /// Runtime access in default trait method implementations.
19    fn io_task_spawner(&self) -> impl TaskSpawner;
20
21    /// Returns a handle for spawning CPU heavy blocking tasks.
22    ///
23    /// Thread pool access in default trait method implementations.
24    fn tracing_task_pool(&self) -> &BlockingTaskPool;
25
26    /// Returns handle to semaphore for pool of CPU heavy blocking tasks.
27    fn tracing_task_guard(&self) -> &BlockingTaskGuard;
28
29    /// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
30    fn acquire_owned(
31        &self,
32    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
33        self.tracing_task_guard().clone().acquire_owned()
34    }
35
36    /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
37    fn acquire_many_owned(
38        &self,
39        n: u32,
40    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
41        self.tracing_task_guard().clone().acquire_many_owned(n)
42    }
43
44    /// Executes the future on a new blocking task.
45    ///
46    /// Note: This is expected for futures that are dominated by blocking IO operations, for tracing
47    /// or CPU bound operations in general use [`spawn_tracing`](Self::spawn_tracing).
48    fn spawn_blocking_io<F, R>(&self, f: F) -> impl Future<Output = Result<R, Self::Error>> + Send
49    where
50        F: FnOnce(Self) -> Result<R, Self::Error> + Send + 'static,
51        R: Send + 'static,
52    {
53        let (tx, rx) = oneshot::channel();
54        let this = self.clone();
55        self.io_task_spawner().spawn_blocking(Box::pin(async move {
56            let res = f(this);
57            let _ = tx.send(res);
58        }));
59
60        async move { rx.await.map_err(|_| EthApiError::InternalEthError)? }
61    }
62
63    /// Executes a blocking task on the tracing pool.
64    ///
65    /// Note: This is expected for futures that are predominantly CPU bound, as it uses `rayon`
66    /// under the hood, for blocking IO futures use [`spawn_blocking`](Self::spawn_blocking_io). See
67    /// <https://ryhl.io/blog/async-what-is-blocking/>.
68    fn spawn_tracing<F, R>(&self, f: F) -> impl Future<Output = Result<R, Self::Error>> + Send
69    where
70        F: FnOnce(Self) -> Result<R, Self::Error> + Send + 'static,
71        R: Send + 'static,
72    {
73        let this = self.clone();
74        let fut = self.tracing_task_pool().spawn(move || f(this));
75        async move { fut.await.map_err(|_| EthApiError::InternalBlockingTaskError)? }
76    }
77}