reth_consensus_debug_client/providers/
rpc.rs1use crate::BlockProvider;
2use alloy_provider::{Network, Provider, ProviderBuilder};
3use alloy_transport::TransportResult;
4use futures::{Stream, StreamExt};
5use reth_node_api::Block;
6use reth_tracing::tracing::{debug, warn};
7use std::sync::Arc;
8use tokio::sync::mpsc::Sender;
9
10#[derive(derive_more::Debug, Clone)]
13pub struct RpcBlockProvider<N: Network, PrimitiveBlock> {
14 #[debug(skip)]
15 provider: Arc<dyn Provider<N>>,
16 url: String,
17 #[debug(skip)]
18 convert: Arc<dyn Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync>,
19}
20
21impl<N: Network, PrimitiveBlock> RpcBlockProvider<N, PrimitiveBlock> {
22 pub async fn new(
24 rpc_url: &str,
25 convert: impl Fn(N::BlockResponse) -> PrimitiveBlock + Send + Sync + 'static,
26 ) -> eyre::Result<Self> {
27 Ok(Self {
28 provider: Arc::new(ProviderBuilder::default().connect(rpc_url).await?),
29 url: rpc_url.to_string(),
30 convert: Arc::new(convert),
31 })
32 }
33
34 async fn full_block_stream(
39 &self,
40 ) -> TransportResult<impl Stream<Item = TransportResult<N::BlockResponse>>> {
41 match self.provider.subscribe_full_blocks().full().into_stream().await {
43 Ok(sub) => Ok(sub.left_stream()),
44 Err(err) => {
45 debug!(
46 target: "consensus::debug-client",
47 %err,
48 url=%self.url,
49 "Failed to establish block subscription",
50 );
51 Ok(self.provider.watch_full_blocks().await?.full().into_stream().right_stream())
52 }
53 }
54 }
55}
56
57impl<N: Network, PrimitiveBlock> BlockProvider for RpcBlockProvider<N, PrimitiveBlock>
58where
59 PrimitiveBlock: Block + 'static,
60{
61 type Block = PrimitiveBlock;
62
63 async fn subscribe_blocks(&self, tx: Sender<Self::Block>) {
64 let Ok(mut stream) = self.full_block_stream().await.inspect_err(|err| {
65 warn!(
66 target: "consensus::debug-client",
67 %err,
68 url=%self.url,
69 "Failed to subscribe to blocks",
70 );
71 }) else {
72 return
73 };
74
75 while let Some(res) = stream.next().await {
76 match res {
77 Ok(block) => {
78 if tx.send((self.convert)(block)).await.is_err() {
79 break;
81 }
82 }
83 Err(err) => {
84 warn!(
85 target: "consensus::debug-client",
86 %err,
87 url=%self.url,
88 "Failed to fetch a block",
89 );
90 }
91 }
92 }
93 }
94
95 async fn get_block(&self, block_number: u64) -> eyre::Result<Self::Block> {
96 let block = self
97 .provider
98 .get_block_by_number(block_number.into())
99 .full()
100 .await?
101 .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
102 Ok((self.convert)(block))
103 }
104}