reth_consensus_debug_client/providers/
rpc.rs
1use crate::BlockProvider;
2use alloy_consensus::BlockHeader;
3use alloy_provider::{Network, Provider, ProviderBuilder};
4use futures::StreamExt;
5use reth_node_api::Block;
6use reth_tracing::tracing::warn;
7use std::sync::Arc;
8use tokio::sync::mpsc::Sender;
9
10#[derive(derive_more::Debug, Clone)]
13pub struct RpcBlockProvider<N: Network, PrimitiveBlock> {
14 #[debug(skip)]
15 provider: Arc<dyn Provider<N>>,
16 url: String,
17 #[debug(skip)]
18 convert: Arc<dyn Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync>,
19}
20
21impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
22 pub async fn new(
24 rpc_url: &str,
25 convert: impl Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync + 'static,
26 ) -> eyre::Result<Self> {
27 Ok(Self {
28 provider: Arc::new(
29 ProviderBuilder::new()
30 .disable_recommended_fillers()
31 .network::<N>()
32 .connect(rpc_url)
33 .await?,
34 ),
35 url: rpc_url.to_string(),
36 convert: Arc::new(convert),
37 })
38 }
39}
40
41impl<N: Network, PrimitiveBlock> BlockProvider for RpcBlockProvider<N, PrimitiveBlock>
42where
43 PrimitiveBlock: Block + 'static,
44{
45 type Block = PrimitiveBlock;
46
47 async fn subscribe_blocks(&self, tx: Sender<Self::Block>) {
48 let mut stream = match self.provider.subscribe_blocks().await {
49 Ok(sub) => sub.into_stream(),
50 Err(err) => {
51 warn!(
52 target: "consensus::debug-client",
53 %err,
54 url=%self.url,
55 "Failed to subscribe to blocks",
56 );
57 return;
58 }
59 };
60 while let Some(header) = stream.next().await {
61 match self.get_block(header.number()).await {
62 Ok(block) => {
63 if tx.send(block).await.is_err() {
64 break;
66 }
67 }
68 Err(err) => {
69 warn!(
70 target: "consensus::debug-client",
71 %err,
72 url=%self.url,
73 "Failed to fetch a block",
74 );
75 }
76 }
77 }
78 }
79
80 async fn get_block(&self, block_number: u64) -> eyre::Result<Self::Block> {
81 let block = self
82 .provider
83 .get_block_by_number(block_number.into())
84 .full()
85 .await?
86 .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
87 Ok((self.convert)(block))
88 }
89}