// reth_rpc_eth_api/helpers/blocking_task.rs

//! Spawns a blocking task. CPU heavy tasks are executed with the `rayon` library. IO heavy tasks
//! are executed on the `tokio` runtime.

use futures::Future;
use reth_rpc_eth_types::EthApiError;
use reth_tasks::{
    pool::{BlockingTaskGuard, BlockingTaskPool},
    TaskSpawner,
};
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit};

use crate::EthApiTypes;

/// Executes code on a blocking thread.
///
/// Implementors provide the three accessors ([`io_task_spawner`](Self::io_task_spawner),
/// [`tracing_task_pool`](Self::tracing_task_pool) and
/// [`tracing_task_guard`](Self::tracing_task_guard)); every other method has a default
/// implementation built on top of them.
pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static {
    /// Returns a handle for spawning IO heavy blocking tasks.
    ///
    /// Runtime access in default trait method implementations. Used by
    /// [`spawn_blocking_io`](Self::spawn_blocking_io).
    fn io_task_spawner(&self) -> impl TaskSpawner;

    /// Returns a handle for spawning CPU heavy blocking tasks.
    ///
    /// Thread pool access in default trait method implementations. Used by
    /// [`spawn_tracing`](Self::spawn_tracing).
    fn tracing_task_pool(&self) -> &BlockingTaskPool;

    /// Returns handle to semaphore for pool of CPU heavy blocking tasks.
    ///
    /// Used by [`acquire_owned`](Self::acquire_owned) and
    /// [`acquire_many_owned`](Self::acquire_many_owned).
    fn tracing_task_guard(&self) -> &BlockingTaskGuard;

    /// Acquires a single owned permit from the tracing task guard.
    ///
    /// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
    fn acquire_owned(
        &self,
    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
        // The guard is cloned so the returned future does not borrow from `self`.
        self.tracing_task_guard().clone().acquire_owned()
    }

    /// Acquires `n` owned permits from the tracing task guard.
    ///
    /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
    fn acquire_many_owned(
        &self,
        n: u32,
    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
        // The guard is cloned so the returned future does not borrow from `self`.
        self.tracing_task_guard().clone().acquire_many_owned(n)
    }

    /// Executes the closure on a new blocking task.
    ///
    /// The closure receives a clone of `self`. The task is handed to the
    /// [`io_task_spawner`](Self::io_task_spawner) when this method is called, not when the
    /// returned future is first polled; the returned future only waits for the result.
    ///
    /// Note: This is expected for closures that are dominated by blocking IO operations, for
    /// tracing or CPU bound operations in general use [`spawn_tracing`](Self::spawn_tracing).
    ///
    /// # Errors
    ///
    /// Resolves to whatever error `f` returns, or to [`EthApiError::InternalEthError`] (converted
    /// into `Self::Error`) if the result channel is closed before a result was sent.
    fn spawn_blocking_io<F, R>(&self, f: F) -> impl Future<Output = Result<R, Self::Error>> + Send
    where
        F: FnOnce(Self) -> Result<R, Self::Error> + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        let this = self.clone();
        self.io_task_spawner().spawn_blocking(Box::pin(async move {
            // `f` is a plain synchronous closure, so it is invoked directly; there is nothing to
            // await here.
            let res = f(this);
            // Sending fails only when the receiving half is gone, in which case nobody is
            // interested in the result anymore.
            let _ = tx.send(res);
        }));

        // `?` unwraps the channel layer (mapping a closed channel to an internal error) and
        // leaves the closure's own `Result` as the value of the block.
        async move { rx.await.map_err(|_| EthApiError::InternalEthError)? }
    }

    /// Executes a blocking task on the tracing pool.
    ///
    /// The closure receives a clone of `self` and is submitted to the
    /// [`tracing_task_pool`](Self::tracing_task_pool) when this method is called.
    ///
    /// Note: This is expected for closures that are predominantly CPU bound, as it uses `rayon`
    /// under the hood, for blocking IO closures use
    /// [`spawn_blocking_io`](Self::spawn_blocking_io). See
    /// <https://ryhl.io/blog/async-what-is-blocking/>.
    ///
    /// # Errors
    ///
    /// Resolves to whatever error `f` returns, or to [`EthApiError::InternalBlockingTaskError`]
    /// (converted into `Self::Error`) if the pool's task handle resolves to an error instead of
    /// the closure's result.
    fn spawn_tracing<F, R>(&self, f: F) -> impl Future<Output = Result<R, Self::Error>> + Send
    where
        F: FnOnce(Self) -> Result<R, Self::Error> + Send + 'static,
        R: Send + 'static,
    {
        let this = self.clone();
        let fut = self.tracing_task_pool().spawn(move || f(this));
        // `?` unwraps the handle layer and leaves the closure's own `Result` as the value of
        // the block.
        async move { fut.await.map_err(|_| EthApiError::InternalBlockingTaskError)? }
    }
}