reth_rpc_eth_api/helpers/blocking_task.rs
1//! Spawns a blocking task. CPU heavy tasks are executed with the `rayon` library. IO heavy tasks
2//! are executed on the `tokio` runtime.
3
4use futures::Future;
5use reth_rpc_eth_types::EthApiError;
6use reth_tasks::{
7 pool::{BlockingTaskGuard, BlockingTaskPool},
8 TaskSpawner,
9};
10use std::sync::Arc;
11use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit, Semaphore};
12
13use crate::EthApiTypes;
14
15/// Helpers for spawning blocking operations.
16///
17/// Operations can be blocking because they require lots of CPU work and/or IO.
18///
19/// This differentiates between workloads that are primarily CPU bound and heavier in general (such
20/// as tracing tasks) and tasks that have a more balanced profile (io and cpu), such as `eth_call`
21/// and alike.
22///
23/// This provides access to semaphores that permit how many of those are permitted concurrently.
24/// It's expected that tracing related tasks are configured with a lower threshold, because not only
25/// are they CPU heavy but they can also accumulate more memory for the traces.
26pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static {
27 /// Returns a handle for spawning IO heavy blocking tasks.
28 ///
29 /// Runtime access in default trait method implementations.
30 fn io_task_spawner(&self) -> impl TaskSpawner;
31
32 /// Returns a handle for spawning __CPU heavy__ blocking tasks, such as tracing requests.
33 ///
34 /// Thread pool access in default trait method implementations.
35 fn tracing_task_pool(&self) -> &BlockingTaskPool;
36
37 /// Returns handle to semaphore for pool of CPU heavy blocking tasks.
38 fn tracing_task_guard(&self) -> &BlockingTaskGuard;
39
40 /// Returns handle to semaphore for blocking IO tasks.
41 ///
42 /// This semaphore is used to limit concurrent blocking IO operations like `eth_call`,
43 /// `eth_estimateGas`, and similar methods that require EVM execution.
44 fn blocking_io_task_guard(&self) -> &Arc<Semaphore>;
45
46 /// Acquires a permit from the tracing task semaphore.
47 ///
48 /// This should be used for __CPU heavy__ operations like `debug_traceTransaction`,
49 /// `debug_traceCall`, and similar tracing methods. These tasks are typically:
50 /// - Primarily CPU bound with intensive computation
51 /// - Can accumulate significant memory for trace results
52 /// - Expected to have lower concurrency limits than general blocking IO tasks
53 ///
54 /// For blocking IO tasks like `eth_call` or `eth_estimateGas`, use
55 /// [`acquire_owned_blocking_io`](Self::acquire_owned_blocking_io) instead.
56 ///
57 /// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
58 fn acquire_owned_tracing(
59 &self,
60 ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
61 self.tracing_task_guard().clone().acquire_owned()
62 }
63
64 /// Acquires multiple permits from the tracing task semaphore.
65 ///
66 /// This should be used for particularly heavy tracing operations that require more resources
67 /// than a standard trace. The permit count should reflect the expected resource consumption
68 /// relative to a standard tracing operation.
69 ///
70 /// Like [`acquire_owned_tracing`](Self::acquire_owned_tracing), this is specifically for
71 /// CPU-intensive tracing tasks, not general blocking IO operations.
72 ///
73 /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
74 fn acquire_many_owned_tracing(
75 &self,
76 n: u32,
77 ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
78 self.tracing_task_guard().clone().acquire_many_owned(n)
79 }
80
81 /// Acquires a permit from the blocking IO request semaphore.
82 ///
83 /// This should be used for operations like `eth_call`, `eth_estimateGas`, and similar methods
84 /// that require EVM execution and are spawned as blocking tasks.
85 ///
86 /// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
87 fn acquire_owned_blocking_io(
88 &self,
89 ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
90 self.blocking_io_task_guard().clone().acquire_owned()
91 }
92
93 /// Acquires multiple permits from the blocking IO request semaphore.
94 ///
95 /// This should be used for operations that may require more resources than a single permit
96 /// allows.
97 ///
98 /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
99 fn acquire_many_owned_blocking_io(
100 &self,
101 n: u32,
102 ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
103 self.blocking_io_task_guard().clone().acquire_many_owned(n)
104 }
105
106 /// Acquires permits from the blocking IO request semaphore based on a calculated weight.
107 ///
108 /// The weight determines the maximum number of concurrent requests of this type that can run.
109 /// For example, if the semaphore has 256 total permits and `weight=10`, then at most 10
110 /// concurrent requests of this type are allowed.
111 ///
112 /// The permits acquired per request is calculated as `total_permits / weight`, with an
113 /// adjustment: if this result is even, we add 1 to ensure that `weight - 1` permits are
114 /// always available for other tasks, preventing complete semaphore exhaustion.
115 ///
116 /// This should be used to explicitly limit concurrent requests based on their expected
117 /// resource consumption:
118 ///
119 /// - **Block range queries**: Higher weight for larger ranges (fewer concurrent requests)
120 /// - **Complex calls**: Higher weight for expensive operations
121 /// - **Batch operations**: Higher weight for larger batches
122 /// - **Historical queries**: Higher weight for deeper history lookups
123 ///
124 /// # Examples
125 ///
126 /// ```ignore
127 /// // For a heavy request, use higher weight to limit concurrency
128 /// let weight = 20; // Allow at most 20 concurrent requests of this type
129 /// let _permit = self.acquire_weighted_blocking_io(weight).await?;
130 /// ```
131 ///
132 /// This helps prevent resource exhaustion from concurrent expensive operations while allowing
133 /// many cheap operations to run in parallel.
134 ///
135 /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
136 fn acquire_weighted_blocking_io(
137 &self,
138 weight: u32,
139 ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
140 let guard = self.blocking_io_task_guard();
141 let total_permits = guard.available_permits().max(1) as u32;
142 let weight = weight.max(1);
143 let mut permits_to_acquire = (total_permits / weight).max(1);
144
145 // If total_permits divides evenly by weight, add 1 to ensure that when `weight`
146 // concurrent requests are running, at least `weight - 1` permits remain available
147 // for other tasks
148 if total_permits.is_multiple_of(weight) {
149 permits_to_acquire += 1;
150 }
151
152 guard.clone().acquire_many_owned(permits_to_acquire)
153 }
154
155 /// Executes the future on a new blocking task.
156 ///
157 /// Note: This is expected for futures that are dominated by blocking IO operations, for tracing
158 /// or CPU bound operations in general use [`spawn_tracing`](Self::spawn_tracing).
159 fn spawn_blocking_io<F, R>(&self, f: F) -> impl Future<Output = Result<R, Self::Error>> + Send
160 where
161 F: FnOnce(Self) -> Result<R, Self::Error> + Send + 'static,
162 R: Send + 'static,
163 {
164 let (tx, rx) = oneshot::channel();
165 let this = self.clone();
166 self.io_task_spawner().spawn_blocking(Box::pin(async move {
167 let res = f(this);
168 let _ = tx.send(res);
169 }));
170
171 async move { rx.await.map_err(|_| EthApiError::InternalEthError)? }
172 }
173
174 /// Executes the future on a new blocking task.
175 ///
176 /// Note: This is expected for futures that are dominated by blocking IO operations, for tracing
177 /// or CPU bound operations in general use [`spawn_tracing`](Self::spawn_tracing).
178 fn spawn_blocking_io_fut<F, R, Fut>(
179 &self,
180 f: F,
181 ) -> impl Future<Output = Result<R, Self::Error>> + Send
182 where
183 Fut: Future<Output = Result<R, Self::Error>> + Send + 'static,
184 F: FnOnce(Self) -> Fut + Send + 'static,
185 R: Send + 'static,
186 {
187 let (tx, rx) = oneshot::channel();
188 let this = self.clone();
189 self.io_task_spawner().spawn_blocking(Box::pin(async move {
190 let res = f(this).await;
191 let _ = tx.send(res);
192 }));
193
194 async move { rx.await.map_err(|_| EthApiError::InternalEthError)? }
195 }
196
197 /// Executes a blocking task on the tracing pool.
198 ///
199 /// Note: This is expected for futures that are predominantly CPU bound, as it uses `rayon`
200 /// under the hood, for blocking IO futures use [`spawn_blocking`](Self::spawn_blocking_io). See
201 /// <https://ryhl.io/blog/async-what-is-blocking/>.
202 fn spawn_tracing<F, R>(&self, f: F) -> impl Future<Output = Result<R, Self::Error>> + Send
203 where
204 F: FnOnce(Self) -> Result<R, Self::Error> + Send + 'static,
205 R: Send + 'static,
206 {
207 let this = self.clone();
208 let fut = self.tracing_task_pool().spawn(move || f(this));
209 async move { fut.await.map_err(|_| EthApiError::InternalBlockingTaskError)? }
210 }
211}