reth_consensus_debug_client/providers/
rpc.rs1use crate::PayloadProvider;
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockId;
4use alloy_provider::{
5 network::{primitives::HeaderResponse, BlockResponse, Network},
6 ConnectionConfig, Provider, ProviderBuilder, WebSocketConfig,
7};
8use alloy_rpc_types_engine::PayloadExtras;
9use alloy_transport::TransportResult;
10use futures::{Stream, StreamExt};
11use reth_node_api::ExecutionPayload;
12use reth_tracing::tracing::{debug, warn};
13use std::sync::Arc;
14use tokio::sync::mpsc::Sender;
15
16#[derive(derive_more::Debug, Clone)]
19pub struct RpcBlockProvider<N: Network, ExecutionData> {
20 #[debug(skip)]
21 provider: Arc<dyn Provider<N>>,
22 url: String,
23 fetch_block_access_list: bool,
24 #[debug(skip)]
25 convert: Arc<dyn Fn(N::BlockResponse, PayloadExtras) -> ExecutionData + Send + Sync>,
26}
27
28impl<N: Network, ExecutionData> RpcBlockProvider<N, ExecutionData> {
29 pub async fn new(
31 rpc_url: &str,
32 convert: impl Fn(N::BlockResponse, PayloadExtras) -> ExecutionData + Send + Sync + 'static,
33 ) -> eyre::Result<Self> {
34 Ok(Self {
35 provider: Arc::new(
36 ProviderBuilder::default()
37 .connect_with_config(
38 rpc_url,
39 ConnectionConfig::default().with_max_retries(u32::MAX).with_ws_config(
40 WebSocketConfig::default()
41 .max_frame_size(Some(128 * 1024 * 1024))
43 .max_message_size(Some(128 * 1024 * 1024)),
44 ),
45 )
46 .await?,
47 ),
48 url: rpc_url.to_string(),
49 fetch_block_access_list: true,
50 convert: Arc::new(convert),
51 })
52 }
53
54 pub const fn without_block_access_lists(mut self) -> Self {
56 self.fetch_block_access_list = false;
57 self
58 }
59
60 async fn full_block_stream(
65 &self,
66 ) -> TransportResult<impl Stream<Item = TransportResult<N::BlockResponse>>> {
67 match self.provider.subscribe_full_blocks().full().into_stream().await {
69 Ok(sub) => Ok(sub.left_stream()),
70 Err(err) => {
71 debug!(
72 target: "consensus::debug-client",
73 %err,
74 url=%self.url,
75 "Failed to establish block subscription",
76 );
77 Ok(self.provider.watch_full_blocks().await?.full().into_stream().right_stream())
78 }
79 }
80 }
81
82 async fn payload_extras(&self, header: &N::HeaderResponse) -> PayloadExtras {
87 if !self.fetch_block_access_list {
88 return PayloadExtras::default()
89 }
90
91 let block_hash = header.hash();
92 if header.block_access_list_hash().is_none() {
93 return PayloadExtras::default()
94 };
95
96 let block_access_list = self
97 .provider
98 .get_block_access_list_raw(BlockId::from(block_hash))
99 .await
100 .inspect_err(|err| {
101 warn!(
102 target: "consensus::debug-client",
103 %err,
104 url=%self.url,
105 %block_hash,
106 "Failed to fetch block access list",
107 );
108 })
109 .ok()
110 .flatten();
111
112 PayloadExtras::from(block_access_list)
113 }
114}
115
116impl<N: Network, ExecutionData> PayloadProvider for RpcBlockProvider<N, ExecutionData>
117where
118 ExecutionData: ExecutionPayload,
119{
120 type ExecutionData = ExecutionData;
121
122 async fn subscribe_payloads(&self, tx: Sender<Self::ExecutionData>) {
123 loop {
124 let Ok(mut stream) = self.full_block_stream().await.inspect_err(|err| {
125 warn!(
126 target: "consensus::debug-client",
127 %err,
128 url=%self.url,
129 "Failed to subscribe to blocks, retrying in 5s",
130 );
131 }) else {
132 if tx.is_closed() {
135 return;
136 }
137 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
138 continue
139 };
140
141 while let Some(res) = stream.next().await {
142 match res {
143 Ok(block) => {
144 let extras = self.payload_extras(block.header()).await;
145 let payload = (self.convert)(block, extras);
146 if tx.send(payload).await.is_err() {
147 return;
149 }
150 }
151 Err(err) => {
152 warn!(
153 target: "consensus::debug-client",
154 %err,
155 url=%self.url,
156 "Failed to fetch a block",
157 );
158 }
159 }
160 }
161 debug!(
163 target: "consensus::debug-client",
164 url=%self.url,
165 "Re-establishing block subscription",
166 );
167 }
168 }
169
170 async fn get_payload(&self, block_number: u64) -> eyre::Result<Self::ExecutionData> {
171 let block = self
172 .provider
173 .get_block_by_number(block_number.into())
174 .full()
175 .await?
176 .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
177 let extras = self.payload_extras(block.header()).await;
178 Ok((self.convert)(block, extras))
179 }
180}