reth_rpc_builder/
metrics.rs

1use jsonrpsee::{
2    core::middleware::{Batch, Notification},
3    server::middleware::rpc::RpcServiceT,
4    types::Request,
5    MethodResponse, RpcModule,
6};
7use reth_metrics::{
8    metrics::{Counter, Histogram},
9    Metrics,
10};
11use std::{
12    collections::HashMap,
13    future::Future,
14    pin::Pin,
15    sync::Arc,
16    task::{Context, Poll},
17    time::Instant,
18};
19use tower::Layer;
20
21/// Metrics for the RPC server.
22///
23/// Metrics are divided into two categories:
24/// - Connection metrics: metrics for the connection (e.g. number of connections opened, relevant
25///   for WS and IPC)
26/// - Request metrics: metrics for each RPC method (e.g. number of calls started, time taken to
27///   process a call)
28#[derive(Default, Debug, Clone)]
29pub(crate) struct RpcRequestMetrics {
30    inner: Arc<RpcServerMetricsInner>,
31}
32
33impl RpcRequestMetrics {
34    pub(crate) fn new(module: &RpcModule<()>, transport: RpcTransport) -> Self {
35        Self {
36            inner: Arc::new(RpcServerMetricsInner {
37                connection_metrics: transport.connection_metrics(),
38                call_metrics: module
39                    .method_names()
40                    .map(|method| {
41                        (method, RpcServerCallMetrics::new_with_labels(&[("method", method)]))
42                    })
43                    .collect(),
44            }),
45        }
46    }
47
48    /// Creates a new instance of the metrics layer for HTTP.
49    pub(crate) fn http(module: &RpcModule<()>) -> Self {
50        Self::new(module, RpcTransport::Http)
51    }
52
53    /// Creates a new instance of the metrics layer for same port.
54    ///
55    /// Note: currently it's not possible to track transport specific metrics for a server that runs http and ws on the same port: <https://github.com/paritytech/jsonrpsee/issues/1345> until we have this feature we will use the http metrics for this case.
56    pub(crate) fn same_port(module: &RpcModule<()>) -> Self {
57        Self::http(module)
58    }
59
60    /// Creates a new instance of the metrics layer for Ws.
61    pub(crate) fn ws(module: &RpcModule<()>) -> Self {
62        Self::new(module, RpcTransport::WebSocket)
63    }
64
65    /// Creates a new instance of the metrics layer for Ipc.
66    pub(crate) fn ipc(module: &RpcModule<()>) -> Self {
67        Self::new(module, RpcTransport::Ipc)
68    }
69}
70
71impl<S> Layer<S> for RpcRequestMetrics {
72    type Service = RpcRequestMetricsService<S>;
73
74    fn layer(&self, inner: S) -> Self::Service {
75        RpcRequestMetricsService::new(inner, self.clone())
76    }
77}
78
79/// Metrics for the RPC server
80#[derive(Default, Clone, Debug)]
81struct RpcServerMetricsInner {
82    /// Connection metrics per transport type
83    connection_metrics: RpcServerConnectionMetrics,
84    /// Call metrics per RPC method
85    call_metrics: HashMap<&'static str, RpcServerCallMetrics>,
86}
87
88/// A [`RpcServiceT`] middleware that captures RPC metrics for the server.
89///
90/// This is created per connection and captures metrics for each request.
91#[derive(Clone, Debug)]
92pub struct RpcRequestMetricsService<S> {
93    /// The metrics collector for RPC requests
94    metrics: RpcRequestMetrics,
95    /// The inner service being wrapped
96    inner: S,
97}
98
99impl<S> RpcRequestMetricsService<S> {
100    pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self {
101        // this instance is kept alive for the duration of the connection
102        metrics.inner.connection_metrics.connections_opened_total.increment(1);
103        Self { inner: service, metrics }
104    }
105}
106
107impl<S> RpcServiceT for RpcRequestMetricsService<S>
108where
109    S: RpcServiceT<MethodResponse = MethodResponse> + Send + Sync + Clone + 'static,
110{
111    type MethodResponse = S::MethodResponse;
112    type NotificationResponse = S::NotificationResponse;
113    type BatchResponse = S::BatchResponse;
114
115    fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = S::MethodResponse> + Send + 'a {
116        self.metrics.inner.connection_metrics.requests_started_total.increment(1);
117        let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
118        if let Some((_, call_metrics)) = &call_metrics {
119            call_metrics.started_total.increment(1);
120        }
121        MeteredRequestFuture {
122            fut: self.inner.call(req),
123            started_at: Instant::now(),
124            metrics: self.metrics.clone(),
125            method: call_metrics.map(|(method, _)| *method),
126        }
127    }
128
129    fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
130        self.metrics.inner.connection_metrics.batches_started_total.increment(1);
131
132        for batch_entry in req.iter().flatten() {
133            let method_name = batch_entry.method_name();
134            if let Some(call_metrics) = self.metrics.inner.call_metrics.get(method_name) {
135                call_metrics.started_total.increment(1);
136            }
137        }
138
139        MeteredBatchRequestsFuture {
140            fut: self.inner.batch(req),
141            started_at: Instant::now(),
142            metrics: self.metrics.clone(),
143        }
144    }
145
146    fn notification<'a>(
147        &self,
148        n: Notification<'a>,
149    ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
150        self.inner.notification(n)
151    }
152}
153
154impl<S> Drop for RpcRequestMetricsService<S> {
155    fn drop(&mut self) {
156        // update connection metrics, connection closed
157        self.metrics.inner.connection_metrics.connections_closed_total.increment(1);
158    }
159}
160
161/// Response future to update the metrics for a single request/response pair.
162#[pin_project::pin_project]
163pub struct MeteredRequestFuture<F> {
164    #[pin]
165    fut: F,
166    /// time when the request started
167    started_at: Instant,
168    /// metrics for the method call
169    metrics: RpcRequestMetrics,
170    /// the method name if known
171    method: Option<&'static str>,
172}
173
174impl<F> std::fmt::Debug for MeteredRequestFuture<F> {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        f.write_str("MeteredRequestFuture")
177    }
178}
179
180impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
181    type Output = F::Output;
182
183    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
184        let this = self.project();
185
186        let res = this.fut.poll(cx);
187        if let Poll::Ready(resp) = &res {
188            let elapsed = this.started_at.elapsed().as_secs_f64();
189
190            // update transport metrics
191            this.metrics.inner.connection_metrics.requests_finished_total.increment(1);
192            this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed);
193
194            // update call metrics
195            if let Some(call_metrics) =
196                this.method.and_then(|method| this.metrics.inner.call_metrics.get(method))
197            {
198                call_metrics.time_seconds.record(elapsed);
199                if resp.is_success() {
200                    call_metrics.successful_total.increment(1);
201                } else {
202                    call_metrics.failed_total.increment(1);
203                }
204            }
205        }
206        res
207    }
208}
209
210/// Response future to update the metrics for a batch of request/response pairs.
211#[pin_project::pin_project]
212pub struct MeteredBatchRequestsFuture<F> {
213    #[pin]
214    fut: F,
215    /// time when the batch request started
216    started_at: Instant,
217    /// metrics for the batch
218    metrics: RpcRequestMetrics,
219}
220
221impl<F> std::fmt::Debug for MeteredBatchRequestsFuture<F> {
222    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223        f.write_str("MeteredBatchRequestsFuture")
224    }
225}
226
227impl<F> Future for MeteredBatchRequestsFuture<F>
228where
229    F: Future,
230{
231    type Output = F::Output;
232
233    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
234        let this = self.project();
235        let res = this.fut.poll(cx);
236
237        if res.is_ready() {
238            let elapsed = this.started_at.elapsed().as_secs_f64();
239            this.metrics.inner.connection_metrics.batches_finished_total.increment(1);
240            this.metrics.inner.connection_metrics.batch_response_time_seconds.record(elapsed);
241        }
242        res
243    }
244}
245
/// The transport protocol over which an RPC connection is served.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum RpcTransport {
    /// HTTP transport
    Http,
    /// WebSocket transport
    WebSocket,
    /// IPC transport
    Ipc,
}
253
254impl RpcTransport {
255    /// Returns the string representation of the transport protocol.
256    pub(crate) const fn as_str(&self) -> &'static str {
257        match self {
258            Self::Http => "http",
259            Self::WebSocket => "ws",
260            Self::Ipc => "ipc",
261        }
262    }
263
264    /// Returns the connection metrics for the transport protocol.
265    fn connection_metrics(&self) -> RpcServerConnectionMetrics {
266        RpcServerConnectionMetrics::new_with_labels(&[("transport", self.as_str())])
267    }
268}
269
270/// Metrics for the RPC connections
271#[derive(Metrics, Clone)]
272#[metrics(scope = "rpc_server.connections")]
273struct RpcServerConnectionMetrics {
274    /// The number of connections opened
275    connections_opened_total: Counter,
276    /// The number of connections closed
277    connections_closed_total: Counter,
278    /// The number of requests started
279    requests_started_total: Counter,
280    /// The number of requests finished
281    requests_finished_total: Counter,
282    /// Response for a single request/response pair
283    request_time_seconds: Histogram,
284    /// The number of batch requests started
285    batches_started_total: Counter,
286    /// The number of batch requests finished
287    batches_finished_total: Counter,
288    /// Response time for a batch request
289    batch_response_time_seconds: Histogram,
290}
291
292/// Metrics for the RPC calls
293#[derive(Metrics, Clone)]
294#[metrics(scope = "rpc_server.calls")]
295struct RpcServerCallMetrics {
296    /// The number of calls started
297    started_total: Counter,
298    /// The number of successful calls
299    successful_total: Counter,
300    /// The number of failed calls
301    failed_total: Counter,
302    /// Response for a single call
303    time_seconds: Histogram,
304}