reth_consensus_debug_client/providers/
rpc.rs

1use crate::BlockProvider;
2use alloy_provider::{Network, Provider, ProviderBuilder};
3use alloy_transport::TransportResult;
4use futures::{Stream, StreamExt};
5use reth_node_api::Block;
6use reth_tracing::tracing::{debug, warn};
7use std::sync::Arc;
8use tokio::sync::mpsc::Sender;
9
10/// Block provider that fetches new blocks from an RPC endpoint using a connection that supports
11/// RPC subscriptions.
12#[derive(derive_more::Debug, Clone)]
13pub struct RpcBlockProvider<N: Network, PrimitiveBlock> {
14    #[debug(skip)]
15    provider: Arc<dyn Provider<N>>,
16    url: String,
17    #[debug(skip)]
18    convert: Arc<dyn Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync>,
19}
20
21impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
22    /// Create a new RPC block provider with the given RPC URL.
23    pub async fn new(
24        rpc_url: &str,
25        convert: impl Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync + 'static,
26    ) -> eyre::Result<Self> {
27        Ok(Self {
28            provider: Arc::new(ProviderBuilder::default().connect(rpc_url).await?),
29            url: rpc_url.to_string(),
30            convert: Arc::new(convert),
31        })
32    }
33
34    /// Obtains a full block stream.
35    ///
36    /// This first attempts to obtain an `eth_subscribe` subscription, if that fails because the
37    /// connection is not a websocket, this falls back to poll based subscription.
38    async fn full_block_stream(
39        &self,
40    ) -> TransportResult<impl Stream<Item = TransportResult<N::BlockResponse>>> {
41        // first try to obtain a regular subscription
42        match self.provider.subscribe_full_blocks().full().into_stream().await {
43            Ok(sub) => Ok(sub.left_stream()),
44            Err(err) => {
45                debug!(
46                    target: "consensus::debug-client",
47                    %err,
48                    url=%self.url,
49                    "Failed to establish block subscription",
50                );
51                Ok(self.provider.watch_full_blocks().await?.full().into_stream().right_stream())
52            }
53        }
54    }
55}
56
57impl<N: Network, PrimitiveBlock> BlockProvider for RpcBlockProvider<N, PrimitiveBlock>
58where
59    PrimitiveBlock: Block + 'static,
60{
61    type Block = PrimitiveBlock;
62
63    async fn subscribe_blocks(&self, tx: Sender<Self::Block>) {
64        let Ok(mut stream) = self.full_block_stream().await.inspect_err(|err| {
65            warn!(
66                target: "consensus::debug-client",
67                %err,
68                url=%self.url,
69                "Failed to subscribe to blocks",
70            );
71        }) else {
72            return
73        };
74
75        while let Some(res) = stream.next().await {
76            match res {
77                Ok(block) => {
78                    if tx.send((self.convert)(block)).await.is_err() {
79                        // Channel closed.
80                        break;
81                    }
82                }
83                Err(err) => {
84                    warn!(
85                        target: "consensus::debug-client",
86                        %err,
87                        url=%self.url,
88                        "Failed to fetch a block",
89                    );
90                }
91            }
92        }
93    }
94
95    async fn get_block(&self, block_number: u64) -> eyre::Result<Self::Block> {
96        let block = self
97            .provider
98            .get_block_by_number(block_number.into())
99            .full()
100            .await?
101            .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
102        Ok((self.convert)(block))
103    }
104}