reth_tasks/
pool.rs

1//! Additional helpers for executing tracing calls
2
3use std::{
4    future::Future,
5    panic::{catch_unwind, AssertUnwindSafe},
6    pin::Pin,
7    sync::Arc,
8    task::{ready, Context, Poll},
9    thread,
10};
11use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore};
12
13/// RPC Tracing call guard semaphore.
14///
15/// This is used to restrict the number of concurrent RPC requests to tracing methods like
16/// `debug_traceTransaction` as well as `eth_getProof` because they can consume a lot of
17/// memory and CPU.
18///
19/// This types serves as an entry guard for the [`BlockingTaskPool`] and is used to rate limit
20/// parallel blocking tasks in the pool.
21#[derive(Clone, Debug)]
22pub struct BlockingTaskGuard(Arc<Semaphore>);
23
24impl BlockingTaskGuard {
25    /// Create a new `BlockingTaskGuard` with the given maximum number of blocking tasks in
26    /// parallel.
27    pub fn new(max_blocking_tasks: usize) -> Self {
28        Self(Arc::new(Semaphore::new(max_blocking_tasks)))
29    }
30
31    /// See also [`Semaphore::acquire_owned`]
32    pub async fn acquire_owned(self) -> Result<OwnedSemaphorePermit, AcquireError> {
33        self.0.acquire_owned().await
34    }
35
36    /// See also [`Semaphore::acquire_many_owned`]
37    pub async fn acquire_many_owned(self, n: u32) -> Result<OwnedSemaphorePermit, AcquireError> {
38        self.0.acquire_many_owned(n).await
39    }
40}
41
42/// Used to execute blocking tasks on a rayon threadpool from within a tokio runtime.
43///
44/// This is a dedicated threadpool for blocking tasks which are CPU bound.
45/// RPC calls that perform blocking IO (disk lookups) are not executed on this pool but on the tokio
46/// runtime's blocking pool, which performs poorly with CPU bound tasks (see
47/// <https://ryhl.io/blog/async-what-is-blocking/>). Once the tokio blocking
48/// pool is saturated it is converted into a queue, blocking tasks could then interfere with the
49/// queue and block other RPC calls.
50///
51/// See also [tokio-docs] for more information.
52///
53/// [tokio-docs]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
54#[derive(Clone, Debug)]
55pub struct BlockingTaskPool {
56    pool: Arc<rayon::ThreadPool>,
57}
58
59impl BlockingTaskPool {
60    /// Create a new `BlockingTaskPool` with the given threadpool.
61    pub fn new(pool: rayon::ThreadPool) -> Self {
62        Self { pool: Arc::new(pool) }
63    }
64
65    /// Convenience function to start building a new threadpool.
66    pub fn builder() -> rayon::ThreadPoolBuilder {
67        rayon::ThreadPoolBuilder::new()
68    }
69
70    /// Convenience function to build a new threadpool with the default configuration.
71    ///
72    /// Uses [`rayon::ThreadPoolBuilder::build`](rayon::ThreadPoolBuilder::build) defaults but
73    /// increases the stack size to 8MB.
74    pub fn build() -> Result<Self, rayon::ThreadPoolBuildError> {
75        Self::builder().build().map(Self::new)
76    }
77
78    /// Asynchronous wrapper around Rayon's
79    /// [`ThreadPool::spawn`](rayon::ThreadPool::spawn).
80    ///
81    /// Runs a function on the configured threadpool, returning a future that resolves with the
82    /// function's return value.
83    ///
84    /// If the function panics, the future will resolve to an error.
85    pub fn spawn<F, R>(&self, func: F) -> BlockingTaskHandle<R>
86    where
87        F: FnOnce() -> R + Send + 'static,
88        R: Send + 'static,
89    {
90        let (tx, rx) = oneshot::channel();
91
92        self.pool.spawn(move || {
93            let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
94        });
95
96        BlockingTaskHandle { rx }
97    }
98
99    /// Asynchronous wrapper around Rayon's
100    /// [`ThreadPool::spawn_fifo`](rayon::ThreadPool::spawn_fifo).
101    ///
102    /// Runs a function on the configured threadpool, returning a future that resolves with the
103    /// function's return value.
104    ///
105    /// If the function panics, the future will resolve to an error.
106    pub fn spawn_fifo<F, R>(&self, func: F) -> BlockingTaskHandle<R>
107    where
108        F: FnOnce() -> R + Send + 'static,
109        R: Send + 'static,
110    {
111        let (tx, rx) = oneshot::channel();
112
113        self.pool.spawn_fifo(move || {
114            let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
115        });
116
117        BlockingTaskHandle { rx }
118    }
119}
120
/// Async handle for a blocking task running in a Rayon thread pool.
///
/// Awaiting the handle yields the [`thread::Result`] that the spawned task sent back over a
/// oneshot channel: `Ok` with the task's return value, or `Err` with the payload of a caught
/// panic.
///
/// ## Panics
///
/// If polled from outside a tokio runtime.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
pub struct BlockingTaskHandle<T> {
    /// Receiving half of the oneshot channel on which the task reports its outcome.
    #[pin]
    pub(crate) rx: oneshot::Receiver<thread::Result<T>>,
}
133
134impl<T> Future for BlockingTaskHandle<T> {
135    type Output = thread::Result<T>;
136
137    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
138        match ready!(self.project().rx.poll(cx)) {
139            Ok(res) => Poll::Ready(res),
140            Err(_) => Poll::Ready(Err(Box::<TokioBlockingTaskError>::default())),
141        }
142    }
143}
144
/// An error returned when the Tokio channel is dropped while awaiting a result.
///
/// This should only happen if the sending half of the channel is dropped before the outcome of
/// the blocking task was sent, i.e. the spawned task never ran to the point of reporting its
/// result.
#[derive(Debug, Default, thiserror::Error)]
#[error("tokio channel dropped while awaiting result")]
#[non_exhaustive]
pub struct TokioBlockingTaskError;
152
#[cfg(test)]
mod tests {
    use super::*;

    /// A task that returns normally resolves to `Ok` with its return value.
    #[tokio::test]
    async fn blocking_pool() {
        let pool = BlockingTaskPool::build().unwrap();
        let value = pool.spawn(|| 5).await.unwrap();
        assert_eq!(value, 5);
    }

    /// A panicking task does not tear down the caller; the handle resolves to an error.
    #[tokio::test]
    async fn blocking_pool_panic() {
        let pool = BlockingTaskPool::build().unwrap();
        let outcome = pool.spawn(|| -> i32 { panic!() }).await;
        assert!(outcome.is_err());
    }
}