1use jsonrpsee::{
2 core::middleware::{Batch, Notification},
3 server::middleware::rpc::RpcServiceT,
4 types::Request,
5 MethodResponse, RpcModule,
6};
7use reth_metrics::{
8 metrics::{Counter, Histogram},
9 Metrics,
10};
11use std::{
12 collections::HashMap,
13 future::Future,
14 pin::Pin,
15 sync::Arc,
16 task::{Context, Poll},
17 time::Instant,
18};
19use tower::Layer;
20
21#[derive(Default, Debug, Clone)]
29pub(crate) struct RpcRequestMetrics {
30 inner: Arc<RpcServerMetricsInner>,
31}
32
33impl RpcRequestMetrics {
34 pub(crate) fn new(module: &RpcModule<()>, transport: RpcTransport) -> Self {
35 Self {
36 inner: Arc::new(RpcServerMetricsInner {
37 connection_metrics: transport.connection_metrics(),
38 call_metrics: module
39 .method_names()
40 .map(|method| {
41 (method, RpcServerCallMetrics::new_with_labels(&[("method", method)]))
42 })
43 .collect(),
44 }),
45 }
46 }
47
48 pub(crate) fn http(module: &RpcModule<()>) -> Self {
50 Self::new(module, RpcTransport::Http)
51 }
52
53 pub(crate) fn same_port(module: &RpcModule<()>) -> Self {
57 Self::http(module)
58 }
59
60 pub(crate) fn ws(module: &RpcModule<()>) -> Self {
62 Self::new(module, RpcTransport::WebSocket)
63 }
64
65 pub(crate) fn ipc(module: &RpcModule<()>) -> Self {
67 Self::new(module, RpcTransport::Ipc)
68 }
69}
70
71impl<S> Layer<S> for RpcRequestMetrics {
72 type Service = RpcRequestMetricsService<S>;
73
74 fn layer(&self, inner: S) -> Self::Service {
75 RpcRequestMetricsService::new(inner, self.clone())
76 }
77}
78
79#[derive(Default, Clone, Debug)]
81struct RpcServerMetricsInner {
82 connection_metrics: RpcServerConnectionMetrics,
84 call_metrics: HashMap<&'static str, RpcServerCallMetrics>,
86}
87
88#[derive(Clone, Debug)]
92pub struct RpcRequestMetricsService<S> {
93 metrics: RpcRequestMetrics,
95 inner: S,
97}
98
99impl<S> RpcRequestMetricsService<S> {
100 pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self {
101 metrics.inner.connection_metrics.connections_opened_total.increment(1);
103 Self { inner: service, metrics }
104 }
105}
106
107impl<S> RpcServiceT for RpcRequestMetricsService<S>
108where
109 S: RpcServiceT<MethodResponse = MethodResponse> + Send + Sync + Clone + 'static,
110{
111 type MethodResponse = S::MethodResponse;
112 type NotificationResponse = S::NotificationResponse;
113 type BatchResponse = S::BatchResponse;
114
115 fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = S::MethodResponse> + Send + 'a {
116 self.metrics.inner.connection_metrics.requests_started_total.increment(1);
117 let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
118 if let Some((_, call_metrics)) = &call_metrics {
119 call_metrics.started_total.increment(1);
120 }
121 MeteredRequestFuture {
122 fut: self.inner.call(req),
123 started_at: Instant::now(),
124 metrics: self.metrics.clone(),
125 method: call_metrics.map(|(method, _)| *method),
126 }
127 }
128
129 fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
130 self.metrics.inner.connection_metrics.batches_started_total.increment(1);
131
132 for batch_entry in req.iter().flatten() {
133 let method_name = batch_entry.method_name();
134 if let Some(call_metrics) = self.metrics.inner.call_metrics.get(method_name) {
135 call_metrics.started_total.increment(1);
136 }
137 }
138
139 MeteredBatchRequestsFuture {
140 fut: self.inner.batch(req),
141 started_at: Instant::now(),
142 metrics: self.metrics.clone(),
143 }
144 }
145
146 fn notification<'a>(
147 &self,
148 n: Notification<'a>,
149 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
150 self.inner.notification(n)
151 }
152}
153
154impl<S> Drop for RpcRequestMetricsService<S> {
155 fn drop(&mut self) {
156 self.metrics.inner.connection_metrics.connections_closed_total.increment(1);
158 }
159}
160
161#[pin_project::pin_project]
163pub struct MeteredRequestFuture<F> {
164 #[pin]
165 fut: F,
166 started_at: Instant,
168 metrics: RpcRequestMetrics,
170 method: Option<&'static str>,
172}
173
174impl<F> std::fmt::Debug for MeteredRequestFuture<F> {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 f.write_str("MeteredRequestFuture")
177 }
178}
179
180impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
181 type Output = F::Output;
182
183 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
184 let this = self.project();
185
186 let res = this.fut.poll(cx);
187 if let Poll::Ready(resp) = &res {
188 let elapsed = this.started_at.elapsed().as_secs_f64();
189
190 this.metrics.inner.connection_metrics.requests_finished_total.increment(1);
192 this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed);
193
194 if let Some(call_metrics) =
196 this.method.and_then(|method| this.metrics.inner.call_metrics.get(method))
197 {
198 call_metrics.time_seconds.record(elapsed);
199 if resp.is_success() {
200 call_metrics.successful_total.increment(1);
201 } else {
202 call_metrics.failed_total.increment(1);
203 }
204 }
205 }
206 res
207 }
208}
209
210#[pin_project::pin_project]
212pub struct MeteredBatchRequestsFuture<F> {
213 #[pin]
214 fut: F,
215 started_at: Instant,
217 metrics: RpcRequestMetrics,
219}
220
221impl<F> std::fmt::Debug for MeteredBatchRequestsFuture<F> {
222 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
223 f.write_str("MeteredBatchRequestsFuture")
224 }
225}
226
227impl<F> Future for MeteredBatchRequestsFuture<F>
228where
229 F: Future,
230{
231 type Output = F::Output;
232
233 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
234 let this = self.project();
235 let res = this.fut.poll(cx);
236
237 if res.is_ready() {
238 let elapsed = this.started_at.elapsed().as_secs_f64();
239 this.metrics.inner.connection_metrics.batches_finished_total.increment(1);
240 this.metrics.inner.connection_metrics.batch_response_time_seconds.record(elapsed);
241 }
242 res
243 }
244}
245
246#[derive(Debug, Clone, Copy, Eq, PartialEq)]
248pub(crate) enum RpcTransport {
249 Http,
250 WebSocket,
251 Ipc,
252}
253
254impl RpcTransport {
255 pub(crate) const fn as_str(&self) -> &'static str {
257 match self {
258 Self::Http => "http",
259 Self::WebSocket => "ws",
260 Self::Ipc => "ipc",
261 }
262 }
263
264 fn connection_metrics(&self) -> RpcServerConnectionMetrics {
266 RpcServerConnectionMetrics::new_with_labels(&[("transport", self.as_str())])
267 }
268}
269
270#[derive(Metrics, Clone)]
272#[metrics(scope = "rpc_server.connections")]
273struct RpcServerConnectionMetrics {
274 connections_opened_total: Counter,
276 connections_closed_total: Counter,
278 requests_started_total: Counter,
280 requests_finished_total: Counter,
282 request_time_seconds: Histogram,
284 batches_started_total: Counter,
286 batches_finished_total: Counter,
288 batch_response_time_seconds: Histogram,
290}
291
292#[derive(Metrics, Clone)]
294#[metrics(scope = "rpc_server.calls")]
295struct RpcServerCallMetrics {
296 started_total: Counter,
298 successful_total: Counter,
300 failed_total: Counter,
302 time_seconds: Histogram,
304}