reth_tasks/pool.rs
1//! Additional helpers for executing tracing calls
2
3use std::{
4 future::Future,
5 panic::{catch_unwind, AssertUnwindSafe},
6 pin::Pin,
7 sync::Arc,
8 task::{ready, Context, Poll},
9 thread,
10};
11use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore};
12
13/// RPC Tracing call guard semaphore.
14///
15/// This is used to restrict the number of concurrent RPC requests to tracing methods like
16/// `debug_traceTransaction` as well as `eth_getProof` because they can consume a lot of
17/// memory and CPU.
18///
19/// This types serves as an entry guard for the [`BlockingTaskPool`] and is used to rate limit
20/// parallel blocking tasks in the pool.
21#[derive(Clone, Debug)]
22pub struct BlockingTaskGuard(Arc<Semaphore>);
23
24impl BlockingTaskGuard {
25 /// Create a new `BlockingTaskGuard` with the given maximum number of blocking tasks in
26 /// parallel.
27 pub fn new(max_blocking_tasks: usize) -> Self {
28 Self(Arc::new(Semaphore::new(max_blocking_tasks)))
29 }
30
31 /// See also [`Semaphore::acquire_owned`]
32 pub async fn acquire_owned(self) -> Result<OwnedSemaphorePermit, AcquireError> {
33 self.0.acquire_owned().await
34 }
35
36 /// See also [`Semaphore::acquire_many_owned`]
37 pub async fn acquire_many_owned(self, n: u32) -> Result<OwnedSemaphorePermit, AcquireError> {
38 self.0.acquire_many_owned(n).await
39 }
40}
41
42/// Used to execute blocking tasks on a rayon threadpool from within a tokio runtime.
43///
44/// This is a dedicated threadpool for blocking tasks which are CPU bound.
45/// RPC calls that perform blocking IO (disk lookups) are not executed on this pool but on the tokio
46/// runtime's blocking pool, which performs poorly with CPU bound tasks (see
47/// <https://ryhl.io/blog/async-what-is-blocking/>). Once the tokio blocking
48/// pool is saturated it is converted into a queue, blocking tasks could then interfere with the
49/// queue and block other RPC calls.
50///
51/// See also [tokio-docs] for more information.
52///
53/// [tokio-docs]: https://docs.rs/tokio/latest/tokio/index.html#cpu-bound-tasks-and-blocking-code
54#[derive(Clone, Debug)]
55pub struct BlockingTaskPool {
56 pool: Arc<rayon::ThreadPool>,
57}
58
59impl BlockingTaskPool {
60 /// Create a new `BlockingTaskPool` with the given threadpool.
61 pub fn new(pool: rayon::ThreadPool) -> Self {
62 Self { pool: Arc::new(pool) }
63 }
64
65 /// Convenience function to start building a new threadpool.
66 pub fn builder() -> rayon::ThreadPoolBuilder {
67 rayon::ThreadPoolBuilder::new()
68 }
69
70 /// Convenience function to build a new threadpool with the default configuration.
71 ///
72 /// Uses [`rayon::ThreadPoolBuilder::build`](rayon::ThreadPoolBuilder::build) defaults but
73 /// increases the stack size to 8MB.
74 pub fn build() -> Result<Self, rayon::ThreadPoolBuildError> {
75 Self::builder().build().map(Self::new)
76 }
77
78 /// Asynchronous wrapper around Rayon's
79 /// [`ThreadPool::spawn`](rayon::ThreadPool::spawn).
80 ///
81 /// Runs a function on the configured threadpool, returning a future that resolves with the
82 /// function's return value.
83 ///
84 /// If the function panics, the future will resolve to an error.
85 pub fn spawn<F, R>(&self, func: F) -> BlockingTaskHandle<R>
86 where
87 F: FnOnce() -> R + Send + 'static,
88 R: Send + 'static,
89 {
90 let (tx, rx) = oneshot::channel();
91
92 self.pool.spawn(move || {
93 let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
94 });
95
96 BlockingTaskHandle { rx }
97 }
98
99 /// Asynchronous wrapper around Rayon's
100 /// [`ThreadPool::spawn_fifo`](rayon::ThreadPool::spawn_fifo).
101 ///
102 /// Runs a function on the configured threadpool, returning a future that resolves with the
103 /// function's return value.
104 ///
105 /// If the function panics, the future will resolve to an error.
106 pub fn spawn_fifo<F, R>(&self, func: F) -> BlockingTaskHandle<R>
107 where
108 F: FnOnce() -> R + Send + 'static,
109 R: Send + 'static,
110 {
111 let (tx, rx) = oneshot::channel();
112
113 self.pool.spawn_fifo(move || {
114 let _result = tx.send(catch_unwind(AssertUnwindSafe(func)));
115 });
116
117 BlockingTaskHandle { rx }
118 }
119}
120
121/// Async handle for a blocking task running in a Rayon thread pool.
122///
123/// ## Panics
124///
125/// If polled from outside a tokio runtime.
126#[derive(Debug)]
127#[must_use = "futures do nothing unless you `.await` or poll them"]
128#[pin_project::pin_project]
129pub struct BlockingTaskHandle<T> {
130 #[pin]
131 pub(crate) rx: oneshot::Receiver<thread::Result<T>>,
132}
133
134impl<T> Future for BlockingTaskHandle<T> {
135 type Output = thread::Result<T>;
136
137 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
138 match ready!(self.project().rx.poll(cx)) {
139 Ok(res) => Poll::Ready(res),
140 Err(_) => Poll::Ready(Err(Box::<TokioBlockingTaskError>::default())),
141 }
142 }
143}
144
145/// An error returned when the Tokio channel is dropped while awaiting a result.
146///
147/// This should only happen
148#[derive(Debug, Default, thiserror::Error)]
149#[error("tokio channel dropped while awaiting result")]
150#[non_exhaustive]
151pub struct TokioBlockingTaskError;
152
#[cfg(test)]
mod tests {
    use super::*;

    /// The value returned by the task is delivered through the handle.
    #[tokio::test]
    async fn blocking_pool() {
        let pool = BlockingTaskPool::build().unwrap();
        let handle = pool.spawn(|| 5);
        assert_eq!(handle.await.unwrap(), 5);
    }

    /// A panicking task resolves the handle to an error.
    #[tokio::test]
    async fn blocking_pool_panic() {
        let pool = BlockingTaskPool::build().unwrap();
        let handle = pool.spawn(|| -> i32 { panic!() });
        let outcome = handle.await;
        assert!(outcome.is_err());
    }
}