reth_rpc_builder/
rate_limiter.rs

1//! [`jsonrpsee`] helper layer for rate limiting certain methods.
2
3use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse};
4use std::{
5    future::Future,
6    pin::Pin,
7    sync::Arc,
8    task::{ready, Context, Poll},
9};
10use tokio::sync::{OwnedSemaphorePermit, Semaphore};
11use tokio_util::sync::PollSemaphore;
12use tower::Layer;
13
14/// Rate limiter for the RPC server.
15///
16/// Rate limits expensive calls such as debug_ and trace_.
17#[derive(Debug, Clone)]
18pub struct RpcRequestRateLimiter {
19    inner: Arc<RpcRequestRateLimiterInner>,
20}
21
22impl RpcRequestRateLimiter {
23    /// Create a new rate limit layer with the given number of permits.
24    pub fn new(rate_limit: usize) -> Self {
25        Self {
26            inner: Arc::new(RpcRequestRateLimiterInner {
27                call_guard: PollSemaphore::new(Arc::new(Semaphore::new(rate_limit))),
28            }),
29        }
30    }
31}
32
33impl<S> Layer<S> for RpcRequestRateLimiter {
34    type Service = RpcRequestRateLimitingService<S>;
35
36    fn layer(&self, inner: S) -> Self::Service {
37        RpcRequestRateLimitingService::new(inner, self.clone())
38    }
39}
40
41/// Rate Limiter for the RPC server
42#[derive(Debug, Clone)]
43struct RpcRequestRateLimiterInner {
44    /// Semaphore to rate limit calls
45    call_guard: PollSemaphore,
46}
47
48/// A [`RpcServiceT`] middleware that rate limits RPC calls to the server.
49#[derive(Debug, Clone)]
50pub struct RpcRequestRateLimitingService<S> {
51    /// The rate limiter for RPC requests
52    rate_limiter: RpcRequestRateLimiter,
53    /// The inner service being wrapped
54    inner: S,
55}
56
57impl<S> RpcRequestRateLimitingService<S> {
58    /// Create a new rate limited service.
59    pub const fn new(service: S, rate_limiter: RpcRequestRateLimiter) -> Self {
60        Self { inner: service, rate_limiter }
61    }
62}
63
64impl<'a, S> RpcServiceT<'a> for RpcRequestRateLimitingService<S>
65where
66    S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
67{
68    type Future = RateLimitingRequestFuture<S::Future>;
69
70    fn call(&self, req: Request<'a>) -> Self::Future {
71        let method_name = req.method_name();
72        if method_name.starts_with("trace_") || method_name.starts_with("debug_") {
73            RateLimitingRequestFuture {
74                fut: self.inner.call(req),
75                guard: Some(self.rate_limiter.inner.call_guard.clone()),
76                permit: None,
77            }
78        } else {
79            // if we don't need to rate limit, then there
80            // is no need to get a semaphore permit
81            RateLimitingRequestFuture { fut: self.inner.call(req), guard: None, permit: None }
82        }
83    }
84}
85
86/// Response future.
87#[pin_project::pin_project]
88pub struct RateLimitingRequestFuture<F> {
89    #[pin]
90    fut: F,
91    guard: Option<PollSemaphore>,
92    permit: Option<OwnedSemaphorePermit>,
93}
94
95impl<F> std::fmt::Debug for RateLimitingRequestFuture<F> {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.write_str("RateLimitingRequestFuture")
98    }
99}
100
101impl<F: Future<Output = MethodResponse>> Future for RateLimitingRequestFuture<F> {
102    type Output = F::Output;
103
104    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
105        let this = self.project();
106        if let Some(guard) = this.guard.as_mut() {
107            *this.permit = ready!(guard.poll_acquire(cx));
108            *this.guard = None;
109        }
110        let res = this.fut.poll(cx);
111        if res.is_ready() {
112            *this.permit = None;
113        }
114        res
115    }
116}