reth/
ress.rs

1use reth_ethereum_primitives::EthPrimitives;
2use reth_evm::execute::BlockExecutorProvider;
3use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
4use reth_network_api::FullNetwork;
5use reth_node_api::BeaconConsensusEngineEvent;
6use reth_node_core::args::RessArgs;
7use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
8use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
9use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
10use reth_tasks::TaskExecutor;
11use reth_tokio_util::EventStream;
12use tokio::sync::mpsc;
13use tracing::*;
14
15/// Install `ress` subprotocol if it's enabled.
16pub fn install_ress_subprotocol<P, E, N>(
17    args: RessArgs,
18    provider: BlockchainProvider<P>,
19    block_executor: E,
20    network: N,
21    task_executor: TaskExecutor,
22    engine_events: EventStream<BeaconConsensusEngineEvent<EthPrimitives>>,
23) -> eyre::Result<()>
24where
25    P: ProviderNodeTypes<Primitives = EthPrimitives>,
26    E: BlockExecutorProvider<Primitives = EthPrimitives> + Clone,
27    N: FullNetwork + NetworkProtocols,
28{
29    info!(target: "reth::cli", "Installing ress subprotocol");
30    let pending_state = PendingState::default();
31
32    // Spawn maintenance task for pending state.
33    task_executor.spawn(maintain_pending_state(
34        engine_events,
35        provider.clone(),
36        pending_state.clone(),
37    ));
38
39    let (tx, mut rx) = mpsc::unbounded_channel();
40    let provider = RethRessProtocolProvider::new(
41        provider,
42        block_executor,
43        Box::new(task_executor.clone()),
44        args.max_witness_window,
45        args.witness_max_parallel,
46        args.witness_cache_size,
47        pending_state,
48    )?;
49    network.add_rlpx_sub_protocol(
50        RessProtocolHandler {
51            provider,
52            node_type: NodeType::Stateful,
53            peers_handle: network.peers_handle().clone(),
54            max_active_connections: args.max_active_connections,
55            state: ProtocolState::new(tx),
56        }
57        .into_rlpx_sub_protocol(),
58    );
59    info!(target: "reth::cli", "Ress subprotocol support enabled");
60
61    task_executor.spawn(async move {
62        while let Some(event) = rx.recv().await {
63            trace!(target: "reth::ress", ?event, "Received ress event");
64        }
65    });
66    Ok(())
67}