Skip to main content

reth_consensus_debug_client/providers/
rpc.rs

1use crate::PayloadProvider;
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockId;
4use alloy_provider::{
5    network::{primitives::HeaderResponse, BlockResponse, Network},
6    ConnectionConfig, Provider, ProviderBuilder, WebSocketConfig,
7};
8use alloy_rpc_types_engine::PayloadExtras;
9use alloy_transport::TransportResult;
10use futures::{Stream, StreamExt};
11use reth_node_api::ExecutionPayload;
12use reth_tracing::tracing::{debug, warn};
13use std::sync::Arc;
14use tokio::sync::mpsc::Sender;
15
16/// Block provider that fetches new blocks from an RPC endpoint using a connection that supports
17/// RPC subscriptions.
18#[derive(derive_more::Debug, Clone)]
19pub struct RpcBlockProvider<N: Network, ExecutionData> {
20    #[debug(skip)]
21    provider: Arc<dyn Provider<N>>,
22    url: String,
23    fetch_block_access_list: bool,
24    #[debug(skip)]
25    convert: Arc<dyn Fn(N::BlockResponse, PayloadExtras) -> ExecutionData + Send + Sync>,
26}
27
28impl<N: Network, ExecutionData> RpcBlockProvider<N, ExecutionData> {
29    /// Create a new RPC block provider with the given RPC URL.
30    pub async fn new(
31        rpc_url: &str,
32        convert: impl Fn(N::BlockResponse, PayloadExtras) -> ExecutionData + Send + Sync + 'static,
33    ) -> eyre::Result<Self> {
34        Ok(Self {
35            provider: Arc::new(
36                ProviderBuilder::default()
37                    .connect_with_config(
38                        rpc_url,
39                        ConnectionConfig::default().with_max_retries(u32::MAX).with_ws_config(
40                            WebSocketConfig::default()
41                                // allow larger messages/frames for big blocks
42                                .max_frame_size(Some(128 * 1024 * 1024))
43                                .max_message_size(Some(128 * 1024 * 1024)),
44                        ),
45                    )
46                    .await?,
47            ),
48            url: rpc_url.to_string(),
49            fetch_block_access_list: true,
50            convert: Arc::new(convert),
51        })
52    }
53
54    /// Disables fetching raw block access list bytes.
55    pub const fn without_block_access_lists(mut self) -> Self {
56        self.fetch_block_access_list = false;
57        self
58    }
59
60    /// Obtains a full block stream.
61    ///
62    /// This first attempts to obtain an `eth_subscribe` subscription, if that fails because the
63    /// connection is not a websocket, this falls back to poll based subscription.
64    async fn full_block_stream(
65        &self,
66    ) -> TransportResult<impl Stream<Item = TransportResult<N::BlockResponse>>> {
67        // first try to obtain a regular subscription
68        match self.provider.subscribe_full_blocks().full().into_stream().await {
69            Ok(sub) => Ok(sub.left_stream()),
70            Err(err) => {
71                debug!(
72                    target: "consensus::debug-client",
73                    %err,
74                    url=%self.url,
75                    "Failed to establish block subscription",
76                );
77                Ok(self.provider.watch_full_blocks().await?.full().into_stream().right_stream())
78            }
79        }
80    }
81
82    /// Fetches optional payload side data for blocks that advertise a block access list hash.
83    ///
84    /// Block access lists are best effort here: RPC providers may not support
85    /// `eth_getBlockAccessListByHash`, so failed or missing responses fall back to empty extras.
86    async fn payload_extras(&self, header: &N::HeaderResponse) -> PayloadExtras {
87        if !self.fetch_block_access_list {
88            return PayloadExtras::default()
89        }
90
91        let block_hash = header.hash();
92        if header.block_access_list_hash().is_none() {
93            return PayloadExtras::default()
94        };
95
96        let block_access_list = self
97            .provider
98            .get_block_access_list_raw(BlockId::from(block_hash))
99            .await
100            .inspect_err(|err| {
101                warn!(
102                    target: "consensus::debug-client",
103                    %err,
104                    url=%self.url,
105                    %block_hash,
106                    "Failed to fetch block access list",
107                );
108            })
109            .ok()
110            .flatten();
111
112        PayloadExtras::from(block_access_list)
113    }
114}
115
116impl<N: Network, ExecutionData> PayloadProvider for RpcBlockProvider<N, ExecutionData>
117where
118    ExecutionData: ExecutionPayload,
119{
120    type ExecutionData = ExecutionData;
121
122    async fn subscribe_payloads(&self, tx: Sender<Self::ExecutionData>) {
123        loop {
124            let Ok(mut stream) = self.full_block_stream().await.inspect_err(|err| {
125                warn!(
126                    target: "consensus::debug-client",
127                    %err,
128                    url=%self.url,
129                    "Failed to subscribe to blocks, retrying in 5s",
130                );
131            }) else {
132                // Exit if the receiver has been dropped (e.g. during shutdown) so we
133                // don't keep retrying after the consumer is gone.
134                if tx.is_closed() {
135                    return;
136                }
137                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
138                continue
139            };
140
141            while let Some(res) = stream.next().await {
142                match res {
143                    Ok(block) => {
144                        let extras = self.payload_extras(block.header()).await;
145                        let payload = (self.convert)(block, extras);
146                        if tx.send(payload).await.is_err() {
147                            // Channel closed - receiver dropped, exit completely.
148                            return;
149                        }
150                    }
151                    Err(err) => {
152                        warn!(
153                            target: "consensus::debug-client",
154                            %err,
155                            url=%self.url,
156                            "Failed to fetch a block",
157                        );
158                    }
159                }
160            }
161            // if stream terminated we want to re-establish it again
162            debug!(
163                target: "consensus::debug-client",
164                url=%self.url,
165                "Re-establishing block subscription",
166            );
167        }
168    }
169
170    async fn get_payload(&self, block_number: u64) -> eyre::Result<Self::ExecutionData> {
171        let block = self
172            .provider
173            .get_block_by_number(block_number.into())
174            .full()
175            .await?
176            .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
177        let extras = self.payload_extras(block.header()).await;
178        Ok((self.convert)(block, extras))
179    }
180}