1use crate::{
4 eth_requests::EthRequestHandler,
5 metrics::NETWORK_POOL_TRANSACTIONS_SCOPE,
6 transactions::{
7 config::{
8 AnnouncementFilteringPolicy, StrictEthAnnouncementFilter, TransactionPropagationKind,
9 },
10 policy::NetworkPolicies,
11 TransactionPropagationPolicy, TransactionsManager, TransactionsManagerConfig,
12 },
13 NetworkHandle, NetworkManager,
14};
15use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
16use reth_metrics::common::mpsc::memory_bounded_channel;
17use reth_network_api::test_utils::PeersHandleProvider;
18use reth_storage_api::BalProvider;
19use reth_transaction_pool::{BlobStore, TransactionPool};
20use tokio::sync::mpsc;
21
/// Capacity of the bounded `tokio::sync::mpsc` channel created by
/// [`NetworkBuilder::request_handler`].
///
/// The sending half of that channel is installed on the network manager and the receiving half
/// is handed to the `EthRequestHandler`, so this value caps how many requests may be queued
/// between the two at any time.
pub(crate) const ETH_REQUEST_CHANNEL_CAPACITY: usize = 256;
25
26#[expect(missing_debug_implementations)]
28pub struct NetworkBuilder<Tx, Eth, N: NetworkPrimitives = EthNetworkPrimitives> {
29 pub(crate) network: NetworkManager<N>,
30 pub(crate) transactions: Tx,
31 pub(crate) request_handler: Eth,
32}
33
34impl<Tx, Eth, N: NetworkPrimitives> NetworkBuilder<Tx, Eth, N> {
37 pub fn split(self) -> (NetworkManager<N>, Tx, Eth) {
39 let Self { network, transactions, request_handler } = self;
40 (network, transactions, request_handler)
41 }
42
43 pub const fn network(&self) -> &NetworkManager<N> {
45 &self.network
46 }
47
48 pub const fn network_mut(&mut self) -> &mut NetworkManager<N> {
50 &mut self.network
51 }
52
53 pub fn handle(&self) -> NetworkHandle<N> {
55 self.network.handle().clone()
56 }
57
58 pub fn split_with_handle(self) -> (NetworkHandle<N>, NetworkManager<N>, Tx, Eth) {
60 let Self { network, transactions, request_handler } = self;
61 let handle = network.handle().clone();
62 (handle, network, transactions, request_handler)
63 }
64
65 pub fn request_handler<Client>(
67 self,
68 client: Client,
69 ) -> NetworkBuilder<Tx, EthRequestHandler<Client, N>, N>
70 where
71 Client: BalProvider,
72 {
73 let Self { mut network, transactions, .. } = self;
74 let (tx, rx) = mpsc::channel(ETH_REQUEST_CHANNEL_CAPACITY);
75 network.set_eth_request_handler(tx);
76 let peers = network.handle().peers_handle().clone();
77 let request_handler = EthRequestHandler::new(client, peers, rx);
78 NetworkBuilder { network, request_handler, transactions }
79 }
80
81 pub fn request_handler_with_blob_store<Client>(
83 self,
84 client: Client,
85 blob_store: Box<dyn BlobStore>,
86 ) -> NetworkBuilder<Tx, EthRequestHandler<Client, N>, N>
87 where
88 Client: BalProvider,
89 {
90 let NetworkBuilder { network, transactions, request_handler } =
91 self.request_handler(client);
92 let request_handler = request_handler.with_blob_store(blob_store);
93 NetworkBuilder { network, request_handler, transactions }
94 }
95
96 pub fn transactions<Pool: TransactionPool>(
98 self,
99 pool: Pool,
100 transactions_manager_config: TransactionsManagerConfig,
101 ) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
102 self.transactions_with_policy(
103 pool,
104 transactions_manager_config,
105 TransactionPropagationKind::default(),
106 )
107 }
108
109 pub fn transactions_with_policy<Pool: TransactionPool, P: TransactionPropagationPolicy<N>>(
113 self,
114 pool: Pool,
115 transactions_manager_config: TransactionsManagerConfig,
116 propagation_policy: P,
117 ) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
118 self.transactions_with_policies(
119 pool,
120 transactions_manager_config,
121 propagation_policy,
122 StrictEthAnnouncementFilter::default(),
123 )
124 }
125
126 pub fn transactions_with_policies<
131 Pool: TransactionPool,
132 P: TransactionPropagationPolicy<N>,
133 A: AnnouncementFilteringPolicy<N>,
134 >(
135 self,
136 pool: Pool,
137 transactions_manager_config: TransactionsManagerConfig,
138 propagation_policy: P,
139 announcement_policy: A,
140 ) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
141 let Self { mut network, request_handler, .. } = self;
142 let (tx, rx) = memory_bounded_channel(
143 transactions_manager_config.tx_channel_memory_limit_bytes,
144 NETWORK_POOL_TRANSACTIONS_SCOPE,
145 );
146 network.set_transactions(tx);
147 let handle = network.handle().clone();
148 let policies = NetworkPolicies::new(propagation_policy, announcement_policy);
149
150 let transactions = TransactionsManager::with_policy(
151 handle,
152 pool,
153 rx,
154 transactions_manager_config,
155 policies,
156 );
157 NetworkBuilder { network, request_handler, transactions }
158 }
159}