reth_rpc_builder/
metrics.rsuse jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse, RpcModule};
use reth_metrics::{
metrics::{Counter, Histogram},
Metrics,
};
use std::{
collections::HashMap,
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Instant,
};
use tower::Layer;
#[derive(Default, Debug, Clone)]
pub(crate) struct RpcRequestMetrics {
inner: Arc<RpcServerMetricsInner>,
}
impl RpcRequestMetrics {
pub(crate) fn new(module: &RpcModule<()>, transport: RpcTransport) -> Self {
Self {
inner: Arc::new(RpcServerMetricsInner {
connection_metrics: transport.connection_metrics(),
call_metrics: module
.method_names()
.map(|method| {
(method, RpcServerCallMetrics::new_with_labels(&[("method", method)]))
})
.collect(),
}),
}
}
pub(crate) fn http(module: &RpcModule<()>) -> Self {
Self::new(module, RpcTransport::Http)
}
pub(crate) fn same_port(module: &RpcModule<()>) -> Self {
Self::http(module)
}
pub(crate) fn ws(module: &RpcModule<()>) -> Self {
Self::new(module, RpcTransport::WebSocket)
}
#[allow(unused)]
pub(crate) fn ipc(module: &RpcModule<()>) -> Self {
Self::new(module, RpcTransport::Ipc)
}
}
impl<S> Layer<S> for RpcRequestMetrics {
type Service = RpcRequestMetricsService<S>;
fn layer(&self, inner: S) -> Self::Service {
RpcRequestMetricsService::new(inner, self.clone())
}
}
#[derive(Default, Clone, Debug)]
struct RpcServerMetricsInner {
connection_metrics: RpcServerConnectionMetrics,
call_metrics: HashMap<&'static str, RpcServerCallMetrics>,
}
#[derive(Clone, Debug)]
pub struct RpcRequestMetricsService<S> {
metrics: RpcRequestMetrics,
inner: S,
}
impl<S> RpcRequestMetricsService<S> {
pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self {
metrics.inner.connection_metrics.connections_opened_total.increment(1);
Self { inner: service, metrics }
}
}
impl<'a, S> RpcServiceT<'a> for RpcRequestMetricsService<S>
where
S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
{
type Future = MeteredRequestFuture<S::Future>;
fn call(&self, req: Request<'a>) -> Self::Future {
self.metrics.inner.connection_metrics.requests_started_total.increment(1);
let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref());
if let Some((_, call_metrics)) = &call_metrics {
call_metrics.started_total.increment(1);
}
MeteredRequestFuture {
fut: self.inner.call(req),
started_at: Instant::now(),
metrics: self.metrics.clone(),
method: call_metrics.map(|(method, _)| *method),
}
}
}
impl<S> Drop for RpcRequestMetricsService<S> {
fn drop(&mut self) {
self.metrics.inner.connection_metrics.connections_closed_total.increment(1);
}
}
#[pin_project::pin_project]
pub struct MeteredRequestFuture<F> {
#[pin]
fut: F,
started_at: Instant,
metrics: RpcRequestMetrics,
method: Option<&'static str>,
}
impl<F> std::fmt::Debug for MeteredRequestFuture<F> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("MeteredRequestFuture")
}
}
impl<F: Future<Output = MethodResponse>> Future for MeteredRequestFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let res = this.fut.poll(cx);
if let Poll::Ready(resp) = &res {
let elapsed = this.started_at.elapsed().as_secs_f64();
this.metrics.inner.connection_metrics.requests_finished_total.increment(1);
this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed);
if let Some(call_metrics) =
this.method.and_then(|method| this.metrics.inner.call_metrics.get(method))
{
call_metrics.time_seconds.record(elapsed);
if resp.is_success() {
call_metrics.successful_total.increment(1);
} else {
call_metrics.failed_total.increment(1);
}
}
}
res
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum RpcTransport {
Http,
WebSocket,
#[allow(unused)]
Ipc,
}
impl RpcTransport {
pub(crate) const fn as_str(&self) -> &'static str {
match self {
Self::Http => "http",
Self::WebSocket => "ws",
Self::Ipc => "ipc",
}
}
fn connection_metrics(&self) -> RpcServerConnectionMetrics {
RpcServerConnectionMetrics::new_with_labels(&[("transport", self.as_str())])
}
}
#[derive(Metrics, Clone)]
#[metrics(scope = "rpc_server.connections")]
struct RpcServerConnectionMetrics {
connections_opened_total: Counter,
connections_closed_total: Counter,
requests_started_total: Counter,
requests_finished_total: Counter,
request_time_seconds: Histogram,
}
#[derive(Metrics, Clone)]
#[metrics(scope = "rpc_server.calls")]
struct RpcServerCallMetrics {
started_total: Counter,
successful_total: Counter,
failed_total: Counter,
time_seconds: Histogram,
}