reth_consensus_debug_client/providers/
rpc.rs1use crate::BlockProvider;
2use alloy_provider::{ConnectionConfig, Network, Provider, ProviderBuilder, WebSocketConfig};
3use alloy_transport::TransportResult;
4use futures::{Stream, StreamExt};
5use reth_node_api::Block;
6use reth_tracing::tracing::{debug, warn};
7use std::sync::Arc;
8use tokio::sync::mpsc::Sender;
9
10#[derive(derive_more::Debug, Clone)]
13pub struct RpcBlockProvider<N: Network, PrimitiveBlock> {
14 #[debug(skip)]
15 provider: Arc<dyn Provider<N>>,
16 url: String,
17 #[debug(skip)]
18 convert: Arc<dyn Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync>,
19}
20
21impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
22 pub async fn new(
24 rpc_url: &str,
25 convert: impl Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync + 'static,
26 ) -> eyre::Result<Self> {
27 Ok(Self {
28 provider: Arc::new(
29 ProviderBuilder::default()
30 .connect_with_config(
31 rpc_url,
32 ConnectionConfig::default().with_max_retries(u32::MAX).with_ws_config(
33 WebSocketConfig::default()
34 .max_frame_size(Some(128 * 1024 * 1024))
36 .max_message_size(Some(128 * 1024 * 1024)),
37 ),
38 )
39 .await?,
40 ),
41 url: rpc_url.to_string(),
42 convert: Arc::new(convert),
43 })
44 }
45
46 async fn full_block_stream(
51 &self,
52 ) -> TransportResult<impl Stream<Item = TransportResult<N::BlockResponse>>> {
53 match self.provider.subscribe_full_blocks().full().into_stream().await {
55 Ok(sub) => Ok(sub.left_stream()),
56 Err(err) => {
57 debug!(
58 target: "consensus::debug-client",
59 %err,
60 url=%self.url,
61 "Failed to establish block subscription",
62 );
63 Ok(self.provider.watch_full_blocks().await?.full().into_stream().right_stream())
64 }
65 }
66 }
67}
68
69impl<N: Network, PrimitiveBlock> BlockProvider for RpcBlockProvider<N, PrimitiveBlock>
70where
71 PrimitiveBlock: Block + 'static,
72{
73 type Block = PrimitiveBlock;
74
75 async fn subscribe_blocks(&self, tx: Sender<Self::Block>) {
76 loop {
77 let Ok(mut stream) = self.full_block_stream().await.inspect_err(|err| {
78 warn!(
79 target: "consensus::debug-client",
80 %err,
81 url=%self.url,
82 "Failed to subscribe to blocks, retrying in 5s",
83 );
84 }) else {
85 if tx.is_closed() {
88 return;
89 }
90 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
91 continue
92 };
93
94 while let Some(res) = stream.next().await {
95 match res {
96 Ok(block) => {
97 if tx.send((self.convert)(block)).await.is_err() {
98 return;
100 }
101 }
102 Err(err) => {
103 warn!(
104 target: "consensus::debug-client",
105 %err,
106 url=%self.url,
107 "Failed to fetch a block",
108 );
109 }
110 }
111 }
112 debug!(
114 target: "consensus::debug-client",
115 url=%self.url,
116 "Re-establishing block subscription",
117 );
118 }
119 }
120
121 async fn get_block(&self, block_number: u64) -> eyre::Result<Self::Block> {
122 let block = self
123 .provider
124 .get_block_by_number(block_number.into())
125 .full()
126 .await?
127 .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
128 Ok((self.convert)(block))
129 }
130}