reth_rpc_eth_api/helpers/
blocking_task.rs

1//! Spawns a blocking task. CPU heavy tasks are executed with the `rayon` library. IO heavy tasks
2//! are executed on the `tokio` runtime.
3
4use futures::Future;
5use reth_rpc_eth_types::EthApiError;
6use reth_tasks::{
7    pool::{BlockingTaskGuard, BlockingTaskPool},
8    TaskSpawner,
9};
10use std::sync::Arc;
11use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore};
12
13use crate::EthApiTypes;
14
15/// Helpers for spawning blocking operations.
16///
17/// Operations can be blocking because they require lots of CPU work and/or IO.
18///
19/// This differentiates between workloads that are primarily CPU bound and heavier in general (such
20/// as tracing tasks) and tasks that have a more balanced profile (io and cpu), such as `eth_call`
21/// and alike.
22///
23/// This provides access to semaphores that permit how many of those are permitted concurrently.
24/// It's expected that tracing related tasks are configured with a lower threshold, because not only
25/// are they CPU heavy but they can also accumulate more memory for the traces.
26pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static {
27    /// Returns a handle for spawning IO heavy blocking tasks.
28    ///
29    /// Runtime access in default trait method implementations.
30    fn io_task_spawner(&self) -> impl TaskSpawner;
31
32    /// Returns a handle for spawning __CPU heavy__ blocking tasks, such as tracing requests.
33    ///
34    /// Thread pool access in default trait method implementations.
35    fn tracing_task_pool(&self) -> &BlockingTaskPool;
36
37    /// Returns handle to semaphore for pool of CPU heavy blocking tasks.
38    fn tracing_task_guard(&self) -> &BlockingTaskGuard;
39
40    /// Returns handle to semaphore for blocking IO tasks.
41    ///
42    /// This semaphore is used to limit concurrent blocking IO operations like `eth_call`,
43    /// `eth_estimateGas`, and similar methods that require EVM execution.
44    fn blocking_io_task_guard(&self) -> &Arc<Semaphore>;
45
46    /// Acquires a permit from the tracing task semaphore.
47    ///
48    /// This should be used for __CPU heavy__ operations like `debug_traceTransaction`,
49    /// `debug_traceCall`, and similar tracing methods. These tasks are typically:
50    /// - Primarily CPU bound with intensive computation
51    /// - Can accumulate significant memory for trace results
52    /// - Expected to have lower concurrency limits than general blocking IO tasks
53    ///
54    /// For blocking IO tasks like `eth_call` or `eth_estimateGas`, use
55    /// [`acquire_owned_blocking_io`](Self::acquire_owned_blocking_io) instead.
56    ///
57    /// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
58    fn acquire_owned_tracing(
59        &self,
60    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
61        self.tracing_task_guard().clone().acquire_owned()
62    }
63
64    /// Acquires multiple permits from the tracing task semaphore.
65    ///
66    /// This should be used for particularly heavy tracing operations that require more resources
67    /// than a standard trace. The permit count should reflect the expected resource consumption
68    /// relative to a standard tracing operation.
69    ///
70    /// Like [`acquire_owned_tracing`](Self::acquire_owned_tracing), this is specifically for
71    /// CPU-intensive tracing tasks, not general blocking IO operations.
72    ///
73    /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
74    fn acquire_many_owned_tracing(
75        &self,
76        n: u32,
77    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
78        self.tracing_task_guard().clone().acquire_many_owned(n)
79    }
80
81    /// Acquires a permit from the blocking IO request semaphore.
82    ///
83    /// This should be used for operations like `eth_call`, `eth_estimateGas`, and similar methods
84    /// that require EVM execution and are spawned as blocking tasks.
85    ///
86    /// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
87    fn acquire_owned_blocking_io(
88        &self,
89    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
90        self.blocking_io_task_guard().clone().acquire_owned()
91    }
92
93    /// Acquires multiple permits from the blocking IO request semaphore.
94    ///
95    /// This should be used for operations that may require more resources than a single permit
96    /// allows.
97    ///
98    /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
99    fn acquire_many_owned_blocking_io(
100        &self,
101        n: u32,
102    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
103        self.blocking_io_task_guard().clone().acquire_many_owned(n)
104    }
105
106    /// Acquires permits from the blocking IO request semaphore based on a calculated weight.
107    ///
108    /// The weight determines the maximum number of concurrent requests of this type that can run.
109    /// For example, if the semaphore has 256 total permits and `weight=10`, then at most 10
110    /// concurrent requests of this type are allowed.
111    ///
112    /// The permits acquired per request is calculated as `total_permits / weight`, with an
113    /// adjustment: if this result is even, we add 1 to ensure that `weight - 1` permits are
114    /// always available for other tasks, preventing complete semaphore exhaustion.
115    ///
116    /// This should be used to explicitly limit concurrent requests based on their expected
117    /// resource consumption:
118    ///
119    /// - **Block range queries**: Higher weight for larger ranges (fewer concurrent requests)
120    /// - **Complex calls**: Higher weight for expensive operations
121    /// - **Batch operations**: Higher weight for larger batches
122    /// - **Historical queries**: Higher weight for deeper history lookups
123    ///
124    /// # Examples
125    ///
126    /// ```ignore
127    /// // For a heavy request, use higher weight to limit concurrency
128    /// let weight = 20; // Allow at most 20 concurrent requests of this type
129    /// let _permit = self.acquire_weighted_blocking_io(weight).await?;
130    /// ```
131    ///
132    /// This helps prevent resource exhaustion from concurrent expensive operations while allowing
133    /// many cheap operations to run in parallel.
134    ///
135    /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
136    fn acquire_weighted_blocking_io(
137        &self,
138        weight: u32,
139    ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
140        let guard = self.blocking_io_task_guard();
141        let total_permits = guard.available_permits().max(1) as u32;
142        let weight = weight.max(1);
143        let mut permits_to_acquire = (total_permits / weight).max(1);
144
145        // If total_permits divides evenly by weight, add 1 to ensure that when `weight`
146        // concurrent requests are running, at least `weight - 1` permits remain available
147        // for other tasks
148        if total_permits.is_multiple_of(weight) {
149            permits_to_acquire += 1;
150        }
151
152        guard.clone().acquire_many_owned(permits_to_acquire)
153    }
154
155    /// Executes the future on a new blocking task.
156    ///
157    /// Note: This is expected for futures that are dominated by blocking IO operations, for tracing
158    /// or CPU bound operations in general use [`spawn_tracing`](Self::spawn_tracing).
159    fn spawn_blocking_io<F, R>(&self, f: F) -> impl Future<Output = Result<R, Self::Error>> + Send
160    where
161        F: FnOnce(Self) -> Result<R, Self::Error> + Send + 'static,
162        R: Send + 'static,
163    {
164        let (tx, rx) = oneshot::channel();
165        let this = self.clone();
166        self.io_task_spawner().spawn_blocking(Box::pin(async move {
167            let res = f(this);
168            let _ = tx.send(res);
169        }));
170
171        async move { rx.await.map_err(|_| EthApiError::InternalEthError)? }
172    }
173
174    /// Executes the future on a new blocking task.
175    ///
176    /// Note: This is expected for futures that are dominated by blocking IO operations, for tracing
177    /// or CPU bound operations in general use [`spawn_tracing`](Self::spawn_tracing).
178    fn spawn_blocking_io_fut<F, R, Fut>(
179        &self,
180        f: F,
181    ) -> impl Future<Output = Result<R, Self::Error>> + Send
182    where
183        Fut: Future<Output = Result<R, Self::Error>> + Send + 'static,
184        F: FnOnce(Self) -> Fut + Send + 'static,
185        R: Send + 'static,
186    {
187        let (tx, rx) = oneshot::channel();
188        let this = self.clone();
189        self.io_task_spawner().spawn_blocking(Box::pin(async move {
190            let res = f(this).await;
191            let _ = tx.send(res);
192        }));
193
194        async move { rx.await.map_err(|_| EthApiError::InternalEthError)? }
195    }
196
197    /// Executes a blocking task on the tracing pool.
198    ///
199    /// Note: This is expected for futures that are predominantly CPU bound, as it uses `rayon`
200    /// under the hood, for blocking IO futures use [`spawn_blocking`](Self::spawn_blocking_io). See
201    /// <https://ryhl.io/blog/async-what-is-blocking/>.
202    fn spawn_tracing<F, R>(&self, f: F) -> impl Future<Output = Result<R, Self::Error>> + Send
203    where
204        F: FnOnce(Self) -> Result<R, Self::Error> + Send + 'static,
205        R: Send + 'static,
206    {
207        let this = self.clone();
208        let fut = self.tracing_task_pool().spawn(move || f(this));
209        async move { fut.await.map_err(|_| EthApiError::InternalBlockingTaskError)? }
210    }
211}