1use crate::{
4 eth_requests::EthRequestHandler,
5 metrics::NETWORK_POOL_TRANSACTIONS_SCOPE,
6 transactions::{
7 config::{
8 AnnouncementFilteringPolicy, StrictEthAnnouncementFilter, TransactionPropagationKind,
9 },
10 policy::NetworkPolicies,
11 TransactionPropagationPolicy, TransactionsManager, TransactionsManagerConfig,
12 },
13 NetworkHandle, NetworkManager,
14};
15use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
16use reth_metrics::common::mpsc::memory_bounded_channel;
17use reth_network_api::test_utils::PeersHandleProvider;
18use reth_storage_api::BalProvider;
19use reth_transaction_pool::TransactionPool;
20use tokio::sync::mpsc;
21
/// Capacity of the bounded channel over which the network forwards eth requests to the
/// [`EthRequestHandler`].
///
/// This value is passed to `mpsc::channel` when the request handler is installed via
/// `NetworkBuilder::request_handler`.
pub(crate) const ETH_REQUEST_CHANNEL_CAPACITY: usize = 1 << 8; // 256 slots
25
26#[expect(missing_debug_implementations)]
28pub struct NetworkBuilder<Tx, Eth, N: NetworkPrimitives = EthNetworkPrimitives> {
29 pub(crate) network: NetworkManager<N>,
30 pub(crate) transactions: Tx,
31 pub(crate) request_handler: Eth,
32}
33
34impl<Tx, Eth, N: NetworkPrimitives> NetworkBuilder<Tx, Eth, N> {
37 pub fn split(self) -> (NetworkManager<N>, Tx, Eth) {
39 let Self { network, transactions, request_handler } = self;
40 (network, transactions, request_handler)
41 }
42
43 pub const fn network(&self) -> &NetworkManager<N> {
45 &self.network
46 }
47
48 pub const fn network_mut(&mut self) -> &mut NetworkManager<N> {
50 &mut self.network
51 }
52
53 pub fn handle(&self) -> NetworkHandle<N> {
55 self.network.handle().clone()
56 }
57
58 pub fn split_with_handle(self) -> (NetworkHandle<N>, NetworkManager<N>, Tx, Eth) {
60 let Self { network, transactions, request_handler } = self;
61 let handle = network.handle().clone();
62 (handle, network, transactions, request_handler)
63 }
64
65 pub fn request_handler<Client>(
67 self,
68 client: Client,
69 ) -> NetworkBuilder<Tx, EthRequestHandler<Client, N>, N>
70 where
71 Client: BalProvider,
72 {
73 let Self { mut network, transactions, .. } = self;
74 let (tx, rx) = mpsc::channel(ETH_REQUEST_CHANNEL_CAPACITY);
75 network.set_eth_request_handler(tx);
76 let peers = network.handle().peers_handle().clone();
77 let request_handler = EthRequestHandler::new(client, peers, rx);
78 NetworkBuilder { network, request_handler, transactions }
79 }
80
81 pub fn transactions<Pool: TransactionPool>(
83 self,
84 pool: Pool,
85 transactions_manager_config: TransactionsManagerConfig,
86 ) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
87 self.transactions_with_policy(
88 pool,
89 transactions_manager_config,
90 TransactionPropagationKind::default(),
91 )
92 }
93
94 pub fn transactions_with_policy<Pool: TransactionPool, P: TransactionPropagationPolicy<N>>(
98 self,
99 pool: Pool,
100 transactions_manager_config: TransactionsManagerConfig,
101 propagation_policy: P,
102 ) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
103 self.transactions_with_policies(
104 pool,
105 transactions_manager_config,
106 propagation_policy,
107 StrictEthAnnouncementFilter::default(),
108 )
109 }
110
111 pub fn transactions_with_policies<
116 Pool: TransactionPool,
117 P: TransactionPropagationPolicy<N>,
118 A: AnnouncementFilteringPolicy<N>,
119 >(
120 self,
121 pool: Pool,
122 transactions_manager_config: TransactionsManagerConfig,
123 propagation_policy: P,
124 announcement_policy: A,
125 ) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
126 let Self { mut network, request_handler, .. } = self;
127 let (tx, rx) = memory_bounded_channel(
128 transactions_manager_config.tx_channel_memory_limit_bytes,
129 NETWORK_POOL_TRANSACTIONS_SCOPE,
130 );
131 network.set_transactions(tx);
132 let handle = network.handle().clone();
133 let policies = NetworkPolicies::new(propagation_policy, announcement_policy);
134
135 let transactions = TransactionsManager::with_policy(
136 handle,
137 pool,
138 rx,
139 transactions_manager_config,
140 policies,
141 );
142 NetworkBuilder { network, request_handler, transactions }
143 }
144}