reth_consensus_debug_client/providers/
rpc.rs

1use crate::BlockProvider;
2use alloy_consensus::BlockHeader;
3use alloy_provider::{Network, Provider, ProviderBuilder};
4use futures::StreamExt;
5use reth_node_api::Block;
6use reth_tracing::tracing::warn;
7use std::sync::Arc;
8use tokio::sync::mpsc::Sender;
9
10/// Block provider that fetches new blocks from an RPC endpoint using a connection that supports
11/// RPC subscriptions.
12#[derive(derive_more::Debug, Clone)]
13pub struct RpcBlockProvider<N: Network, PrimitiveBlock> {
14    #[debug(skip)]
15    provider: Arc<dyn Provider<N>>,
16    url: String,
17    #[debug(skip)]
18    convert: Arc<dyn Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync>,
19}
20
21impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
22    /// Create a new RPC block provider with the given RPC URL.
23    pub async fn new(
24        rpc_url: &str,
25        convert: impl Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync + 'static,
26    ) -> eyre::Result<Self> {
27        Ok(Self {
28            provider: Arc::new(
29                ProviderBuilder::new()
30                    .disable_recommended_fillers()
31                    .network::<N>()
32                    .connect(rpc_url)
33                    .await?,
34            ),
35            url: rpc_url.to_string(),
36            convert: Arc::new(convert),
37        })
38    }
39}
40
41impl<N: Network, PrimitiveBlock> BlockProvider for RpcBlockProvider<N, PrimitiveBlock>
42where
43    PrimitiveBlock: Block + 'static,
44{
45    type Block = PrimitiveBlock;
46
47    async fn subscribe_blocks(&self, tx: Sender<Self::Block>) {
48        let mut stream = match self.provider.subscribe_blocks().await {
49            Ok(sub) => sub.into_stream(),
50            Err(err) => {
51                warn!(
52                    target: "consensus::debug-client",
53                    %err,
54                    url=%self.url,
55                    "Failed to subscribe to blocks",
56                );
57                return;
58            }
59        };
60        while let Some(header) = stream.next().await {
61            match self.get_block(header.number()).await {
62                Ok(block) => {
63                    if tx.send(block).await.is_err() {
64                        // Channel closed.
65                        break;
66                    }
67                }
68                Err(err) => {
69                    warn!(
70                        target: "consensus::debug-client",
71                        %err,
72                        url=%self.url,
73                        "Failed to fetch a block",
74                    );
75                }
76            }
77        }
78    }
79
80    async fn get_block(&self, block_number: u64) -> eyre::Result<Self::Block> {
81        let block = self
82            .provider
83            .get_block_by_number(block_number.into())
84            .full()
85            .await?
86            .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
87        Ok((self.convert)(block))
88    }
89}