1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
use crate::BlockProvider;
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{Provider, ProviderBuilder};
use futures::StreamExt;
use reth_node_core::rpc::types::Block;
use reth_rpc_types::BlockTransactionsKind;
use tokio::sync::mpsc::Sender;

/// Block provider that fetches new blocks from an RPC endpoint using a websocket connection.
#[derive(Debug, Clone)]
pub struct RpcBlockProvider {
    ws_rpc_url: String,
}

impl RpcBlockProvider {
    /// Create a new RPC block provider with the given WS RPC URL.
    pub const fn new(ws_rpc_url: String) -> Self {
        Self { ws_rpc_url }
    }
}

impl BlockProvider for RpcBlockProvider {
    async fn subscribe_blocks(&self, tx: Sender<Block>) {
        let ws_provider = ProviderBuilder::new()
            .on_builtin(&self.ws_rpc_url)
            .await
            .expect("failed to create WS provider");
        let mut stream = ws_provider
            .subscribe_blocks()
            .await
            .expect("failed to subscribe on new blocks")
            .into_stream();

        while let Some(block) = stream.next().await {
            let full_block = ws_provider
                .get_block_by_hash(block.header.hash, BlockTransactionsKind::Full)
                .await
                .expect("failed to get block")
                .expect("block not found");
            if tx.send(full_block).await.is_err() {
                // channel closed
                break;
            }
        }
    }

    async fn get_block(&self, block_number: u64) -> eyre::Result<Block> {
        let ws_provider = ProviderBuilder::new()
            .on_builtin(&self.ws_rpc_url)
            .await
            .expect("failed to create WS provider");
        let block: Block = ws_provider
            .get_block_by_number(BlockNumberOrTag::Number(block_number), true)
            .await?
            .ok_or_else(|| eyre::eyre!("block not found by number {}", block_number))?;
        Ok(block)
    }
}