Skip to main content

reth_consensus_debug_client/providers/
rpc.rs

1use crate::BlockProvider;
2use alloy_provider::{ConnectionConfig, Network, Provider, ProviderBuilder, WebSocketConfig};
3use alloy_transport::TransportResult;
4use futures::{Stream, StreamExt};
5use reth_node_api::Block;
6use reth_tracing::tracing::{debug, warn};
7use std::sync::Arc;
8use tokio::sync::mpsc::Sender;
9
10/// Block provider that fetches new blocks from an RPC endpoint using a connection that supports
11/// RPC subscriptions.
12#[derive(derive_more::Debug, Clone)]
13pub struct RpcBlockProvider<N: Network, PrimitiveBlock> {
14    #[debug(skip)]
15    provider: Arc<dyn Provider<N>>,
16    url: String,
17    #[debug(skip)]
18    convert: Arc<dyn Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync>,
19}
20
21impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
22    /// Create a new RPC block provider with the given RPC URL.
23    pub async fn new(
24        rpc_url: &str,
25        convert: impl Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync + 'static,
26    ) -> eyre::Result<Self> {
27        Ok(Self {
28            provider: Arc::new(
29                ProviderBuilder::default()
30                    .connect_with_config(
31                        rpc_url,
32                        ConnectionConfig::default().with_max_retries(u32::MAX).with_ws_config(
33                            WebSocketConfig::default()
34                                // allow larger messages/frames for big blocks
35                                .max_frame_size(Some(128 * 1024 * 1024))
36                                .max_message_size(Some(128 * 1024 * 1024)),
37                        ),
38                    )
39                    .await?,
40            ),
41            url: rpc_url.to_string(),
42            convert: Arc::new(convert),
43        })
44    }
45
46    /// Obtains a full block stream.
47    ///
48    /// This first attempts to obtain an `eth_subscribe` subscription, if that fails because the
49    /// connection is not a websocket, this falls back to poll based subscription.
50    async fn full_block_stream(
51        &self,
52    ) -> TransportResult<impl Stream<Item = TransportResult<N::BlockResponse>>> {
53        // first try to obtain a regular subscription
54        match self.provider.subscribe_full_blocks().full().into_stream().await {
55            Ok(sub) => Ok(sub.left_stream()),
56            Err(err) => {
57                debug!(
58                    target: "consensus::debug-client",
59                    %err,
60                    url=%self.url,
61                    "Failed to establish block subscription",
62                );
63                Ok(self.provider.watch_full_blocks().await?.full().into_stream().right_stream())
64            }
65        }
66    }
67}
68
69impl<N: Network, PrimitiveBlock> BlockProvider for RpcBlockProvider<N, PrimitiveBlock>
70where
71    PrimitiveBlock: Block + 'static,
72{
73    type Block = PrimitiveBlock;
74
75    async fn subscribe_blocks(&self, tx: Sender<Self::Block>) {
76        loop {
77            let Ok(mut stream) = self.full_block_stream().await.inspect_err(|err| {
78                warn!(
79                    target: "consensus::debug-client",
80                    %err,
81                    url=%self.url,
82                    "Failed to subscribe to blocks, retrying in 5s",
83                );
84            }) else {
85                // Exit if the receiver has been dropped (e.g. during shutdown) so we
86                // don't keep retrying after the consumer is gone.
87                if tx.is_closed() {
88                    return;
89                }
90                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
91                continue
92            };
93
94            while let Some(res) = stream.next().await {
95                match res {
96                    Ok(block) => {
97                        if tx.send((self.convert)(block)).await.is_err() {
98                            // Channel closed - receiver dropped, exit completely.
99                            return;
100                        }
101                    }
102                    Err(err) => {
103                        warn!(
104                            target: "consensus::debug-client",
105                            %err,
106                            url=%self.url,
107                            "Failed to fetch a block",
108                        );
109                    }
110                }
111            }
112            // if stream terminated we want to re-establish it again
113            debug!(
114                target: "consensus::debug-client",
115                url=%self.url,
116                "Re-establishing block subscription",
117            );
118        }
119    }
120
121    async fn get_block(&self, block_number: u64) -> eyre::Result<Self::Block> {
122        let block = self
123            .provider
124            .get_block_by_number(block_number.into())
125            .full()
126            .await?
127            .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
128        Ok((self.convert)(block))
129    }
130}