reth_rpc_eth_api/helpers/
blocking_task.rs1use futures::Future;
5use reth_rpc_eth_types::EthApiError;
6use reth_tasks::{
7 pool::{BlockingTaskGuard, BlockingTaskPool},
8 TaskSpawner,
9};
10use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit};
11
12use crate::EthApiTypes;
13
14pub trait SpawnBlocking: EthApiTypes + Clone + Send + Sync + 'static {
16 fn io_task_spawner(&self) -> impl TaskSpawner;
20
21 fn tracing_task_pool(&self) -> &BlockingTaskPool;
25
26 fn tracing_task_guard(&self) -> &BlockingTaskGuard;
28
29 fn acquire_owned(
31 &self,
32 ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
33 self.tracing_task_guard().clone().acquire_owned()
34 }
35
36 fn acquire_many_owned(
38 &self,
39 n: u32,
40 ) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
41 self.tracing_task_guard().clone().acquire_many_owned(n)
42 }
43
44 fn spawn_blocking_io<F, R>(&self, f: F) -> impl Future<Output = Result<R, Self::Error>> + Send
49 where
50 F: FnOnce(Self) -> Result<R, Self::Error> + Send + 'static,
51 R: Send + 'static,
52 {
53 let (tx, rx) = oneshot::channel();
54 let this = self.clone();
55 self.io_task_spawner().spawn_blocking(Box::pin(async move {
56 let res = f(this);
57 let _ = tx.send(res);
58 }));
59
60 async move { rx.await.map_err(|_| EthApiError::InternalEthError)? }
61 }
62
63 fn spawn_blocking_io_fut<F, R, Fut>(
68 &self,
69 f: F,
70 ) -> impl Future<Output = Result<R, Self::Error>> + Send
71 where
72 Fut: Future<Output = Result<R, Self::Error>> + Send + 'static,
73 F: FnOnce(Self) -> Fut + Send + 'static,
74 R: Send + 'static,
75 {
76 let (tx, rx) = oneshot::channel();
77 let this = self.clone();
78 self.io_task_spawner().spawn_blocking(Box::pin(async move {
79 let res = f(this).await;
80 let _ = tx.send(res);
81 }));
82
83 async move { rx.await.map_err(|_| EthApiError::InternalEthError)? }
84 }
85
86 fn spawn_tracing<F, R>(&self, f: F) -> impl Future<Output = Result<R, Self::Error>> + Send
92 where
93 F: FnOnce(Self) -> Result<R, Self::Error> + Send + 'static,
94 R: Send + 'static,
95 {
96 let this = self.clone();
97 let fut = self.tracing_task_pool().spawn(move || f(this));
98 async move { fut.await.map_err(|_| EthApiError::InternalBlockingTaskError)? }
99 }
100}