1use reth_ethereum_primitives::EthPrimitives;
2use reth_evm::execute::BlockExecutorProvider;
3use reth_network::{protocol::IntoRlpxSubProtocol, NetworkProtocols};
4use reth_network_api::FullNetwork;
5use reth_node_api::BeaconConsensusEngineEvent;
6use reth_node_core::args::RessArgs;
7use reth_provider::providers::{BlockchainProvider, ProviderNodeTypes};
8use reth_ress_protocol::{NodeType, ProtocolState, RessProtocolHandler};
9use reth_ress_provider::{maintain_pending_state, PendingState, RethRessProtocolProvider};
10use reth_tasks::TaskExecutor;
11use reth_tokio_util::EventStream;
12use tokio::sync::mpsc;
13use tracing::*;
14
15pub fn install_ress_subprotocol<P, E, N>(
17 args: RessArgs,
18 provider: BlockchainProvider<P>,
19 block_executor: E,
20 network: N,
21 task_executor: TaskExecutor,
22 engine_events: EventStream<BeaconConsensusEngineEvent<EthPrimitives>>,
23) -> eyre::Result<()>
24where
25 P: ProviderNodeTypes<Primitives = EthPrimitives>,
26 E: BlockExecutorProvider<Primitives = EthPrimitives> + Clone,
27 N: FullNetwork + NetworkProtocols,
28{
29 info!(target: "reth::cli", "Installing ress subprotocol");
30 let pending_state = PendingState::default();
31
32 task_executor.spawn(maintain_pending_state(
34 engine_events,
35 provider.clone(),
36 pending_state.clone(),
37 ));
38
39 let (tx, mut rx) = mpsc::unbounded_channel();
40 let provider = RethRessProtocolProvider::new(
41 provider,
42 block_executor,
43 Box::new(task_executor.clone()),
44 args.max_witness_window,
45 args.witness_max_parallel,
46 args.witness_cache_size,
47 pending_state,
48 )?;
49 network.add_rlpx_sub_protocol(
50 RessProtocolHandler {
51 provider,
52 node_type: NodeType::Stateful,
53 peers_handle: network.peers_handle().clone(),
54 max_active_connections: args.max_active_connections,
55 state: ProtocolState::new(tx),
56 }
57 .into_rlpx_sub_protocol(),
58 );
59 info!(target: "reth::cli", "Ress subprotocol support enabled");
60
61 task_executor.spawn(async move {
62 while let Some(event) = rx.recv().await {
63 trace!(target: "reth::ress", ?event, "Received ress event");
64 }
65 });
66 Ok(())
67}