reth_network/transactions/
config.rs

1use std::{fmt::Debug, marker::PhantomData, str::FromStr};
2
3use super::{
4    PeerMetadata, DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
5    DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
6    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
7};
8use crate::transactions::constants::tx_fetcher::{
9    DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
10    DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
11};
12use alloy_primitives::B256;
13use derive_more::{Constructor, Display};
14
15use reth_eth_wire::NetworkPrimitives;
16use reth_ethereum_primitives::TxType;
17
/// Configuration for managing transactions within the network.
///
/// Groups the transaction fetcher settings, the bound on the per-peer history of seen
/// transactions, and the mode used to propagate new pending transactions.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TransactionsManagerConfig {
    /// Configuration for fetching transactions.
    pub transaction_fetcher_config: TransactionFetcherConfig,
    /// Max number of seen transactions to store for each peer.
    pub max_transactions_seen_by_peer_history: u32,
    /// How new pending transactions are propagated.
    ///
    /// With the `serde` feature enabled, a missing value deserializes to
    /// [`TransactionPropagationMode::default`].
    #[cfg_attr(feature = "serde", serde(default))]
    pub propagation_mode: TransactionPropagationMode,
}
30
31impl Default for TransactionsManagerConfig {
32    fn default() -> Self {
33        Self {
34            transaction_fetcher_config: TransactionFetcherConfig::default(),
35            max_transactions_seen_by_peer_history: DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
36            propagation_mode: TransactionPropagationMode::default(),
37        }
38    }
39}
40
/// Determines how new pending transactions are propagated to other peers in full.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionPropagationMode {
    /// Send full transactions to sqrt of current peers.
    #[default]
    Sqrt,
    /// Always send transactions in full.
    All,
    /// Send full transactions to a maximum number of peers
    Max(usize),
}

impl TransactionPropagationMode {
    /// Returns the number of peers full transactions should be propagated to.
    ///
    /// * `Sqrt` yields the square root of `peer_count`, rounded to the nearest integer.
    /// * `All` yields `peer_count` unchanged.
    /// * `Max(n)` yields `n`, capped at `peer_count`.
    pub(crate) fn full_peer_count(&self, peer_count: usize) -> usize {
        match *self {
            Self::All => peer_count,
            Self::Max(limit) => limit.min(peer_count),
            Self::Sqrt => {
                let root = (peer_count as f64).sqrt();
                root.round() as usize
            }
        }
    }
}

impl FromStr for TransactionPropagationMode {
    type Err = String;

    /// Parses `sqrt`, `all`, or `max:<N>` (any letter case) into a propagation mode.
    ///
    /// # Errors
    ///
    /// Returns a message naming the lowercased input when the keyword is unknown, or the
    /// lowercased suffix when the part after `max:` is not a valid `usize`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Matching is done on the lowercased input; error messages echo that lowercased form.
        let normalized = s.to_lowercase();

        if normalized == "sqrt" {
            return Ok(Self::Sqrt);
        }
        if normalized == "all" {
            return Ok(Self::All);
        }

        match normalized.strip_prefix("max:") {
            Some(limit) => limit
                .parse::<usize>()
                .map(Self::Max)
                .map_err(|_| format!("Invalid number for Max variant: {limit}")),
            None => Err(format!("Invalid transaction propagation mode: {normalized}")),
        }
    }
}
84
/// Configuration for fetching transactions.
///
/// Holds the request concurrency limits, the response size soft limits, and the capacity of the
/// pending-fetch hash cache.
// NOTE(review): the derived `Constructor` is expected to take its arguments in field declaration
// order, so reordering fields would change that signature — confirm against derive_more docs.
#[derive(Debug, Constructor, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TransactionFetcherConfig {
    /// Max inflight [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests.
    pub max_inflight_requests: u32,
    /// Max inflight [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests per
    /// peer.
    pub max_inflight_requests_per_peer: u8,
    /// Soft limit for the byte size of a
    /// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on assembling a
    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request. Spec'd at 2
    /// MiB.
    pub soft_limit_byte_size_pooled_transactions_response: usize,
    /// Soft limit for the byte size of the expected
    /// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on packing a
    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request with hashes.
    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
    /// Max capacity of the cache of transaction hashes, for transactions that weren't yet fetched.
    /// A transaction is pending fetch if its hash didn't fit into a
    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) yet, or it wasn't returned
    /// upon request to peers.
    pub max_capacity_cache_txns_pending_fetch: u32,
}
109
110impl Default for TransactionFetcherConfig {
111    fn default() -> Self {
112        Self {
113            max_inflight_requests: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
114            max_inflight_requests_per_peer: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
115            soft_limit_byte_size_pooled_transactions_response:
116                SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
117            soft_limit_byte_size_pooled_transactions_response_on_pack_request:
118                DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
119                max_capacity_cache_txns_pending_fetch: DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
120        }
121    }
122}
123
/// A policy defining which peers pending transactions are gossiped to.
///
/// Besides the per-peer propagation check, the policy receives callbacks when peer sessions are
/// established and closed. Those callbacks take `&mut self`, so an implementation is able to
/// update its own state in response.
pub trait TransactionPropagationPolicy: Send + Sync + Unpin + 'static {
    /// Filter a given peer based on the policy.
    ///
    /// This determines whether transactions can be propagated to this peer.
    ///
    /// Returns `true` if transactions can be propagated to `peer`.
    fn can_propagate<N: NetworkPrimitives>(&self, peer: &mut PeerMetadata<N>) -> bool;

    /// A callback on the policy when a new peer session is established.
    fn on_session_established<N: NetworkPrimitives>(&mut self, peer: &mut PeerMetadata<N>);

    /// A callback on the policy when a peer session is closed.
    fn on_session_closed<N: NetworkPrimitives>(&mut self, peer: &mut PeerMetadata<N>);
}
137
/// Determines which peers pending transactions are propagated to.
///
/// The default is [`TransactionPropagationKind::All`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionPropagationKind {
    /// Propagate transactions to all peers.
    ///
    /// No restrictions
    #[default]
    All,
    /// Propagate transactions to only trusted peers.
    Trusted,
    /// Do not propagate transactions
    None,
}
152
153impl TransactionPropagationPolicy for TransactionPropagationKind {
154    fn can_propagate<N: NetworkPrimitives>(&self, peer: &mut PeerMetadata<N>) -> bool {
155        match self {
156            Self::All => true,
157            Self::Trusted => peer.peer_kind.is_trusted(),
158            Self::None => false,
159        }
160    }
161
162    fn on_session_established<N: NetworkPrimitives>(&mut self, _peer: &mut PeerMetadata<N>) {}
163
164    fn on_session_closed<N: NetworkPrimitives>(&mut self, _peer: &mut PeerMetadata<N>) {}
165}
166
167impl FromStr for TransactionPropagationKind {
168    type Err = String;
169
170    fn from_str(s: &str) -> Result<Self, Self::Err> {
171        match s {
172            "All" | "all" => Ok(Self::All),
173            "Trusted" | "trusted" => Ok(Self::Trusted),
174            "None" | "none" => Ok(Self::None),
175            _ => Err(format!("Invalid transaction propagation policy: {s}")),
176        }
177    }
178}
179
/// Defines the outcome of evaluating a transaction against an `AnnouncementFilteringPolicy`.
///
/// Dictates how the `TransactionManager` should proceed on an announced transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnnouncementAcceptance {
    /// Accept the transaction announcement.
    Accept,
    /// Log the transaction, but neither fetch it nor penalize the peer.
    Ignore,
    /// Reject the transaction announcement.
    Reject {
        /// If true, the peer sending this announcement should be penalized.
        penalize_peer: bool,
    },
}
195
/// A policy that defines how to handle incoming transaction announcements,
/// particularly concerning transaction types and other announcement metadata.
pub trait AnnouncementFilteringPolicy: Send + Sync + Unpin + 'static {
    /// Decides how to handle a transaction announcement based on its type, hash, and size.
    ///
    /// `ty` is the raw transaction type byte, `hash` the transaction hash, and `size` the
    /// transaction size from the announcement (presumably in bytes — confirm against the
    /// caller).
    ///
    /// Returns the [`AnnouncementAcceptance`] describing how to proceed.
    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance;
}
202
/// A generic `AnnouncementFilteringPolicy` that enforces strict validation
/// of transaction type based on a generic type `T`.
///
/// The filter stores no data; `T` is only carried as a marker for the type-byte conversion.
#[derive(Debug, Clone)]
pub struct TypedStrictFilter<T>(PhantomData<T>)
where
    T: TryFrom<u8> + Debug + Send + Sync + 'static;

impl<T> Default for TypedStrictFilter<T>
where
    T: TryFrom<u8> + Debug + Send + Sync + 'static,
{
    /// Creates the filter; there is no state to initialize.
    fn default() -> Self {
        Self(PhantomData::<T>)
    }
}
213
214impl<T> AnnouncementFilteringPolicy for TypedStrictFilter<T>
215where
216    T: TryFrom<u8> + Debug + Send + Sync + Unpin + 'static,
217    <T as TryFrom<u8>>::Error: Debug,
218{
219    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
220        match T::try_from(ty) {
221            Ok(_valid_type) => AnnouncementAcceptance::Accept,
222            Err(e) => {
223                tracing::trace!(target: "net::tx::policy::strict_typed",
224                    type_param = %std::any::type_name::<T>(),
225                    %ty,
226                    %size,
227                    %hash,
228                    error = ?e,
229                    "Invalid or unrecognized transaction type byte. Rejecting entry and recommending peer penalization."
230                );
231                AnnouncementAcceptance::Reject { penalize_peer: true }
232            }
233        }
234    }
235}
236
/// Type alias for a `TypedStrictFilter` over [`TxType`]. This is the default strict announcement
/// filter.
///
/// A type byte that does not convert into [`TxType`] is rejected and the peer is flagged for
/// penalization.
pub type StrictEthAnnouncementFilter = TypedStrictFilter<TxType>;
239
/// An [`AnnouncementFilteringPolicy`] that permissively handles unknown type bytes
/// based on a given type `T` using `T::try_from(u8)`.
///
/// If `T::try_from(ty)` succeeds, the announcement is accepted. Otherwise, it's ignored.
///
/// The filter stores no data; `T` is only carried as a marker for the type-byte conversion.
#[derive(Debug, Clone)]
pub struct TypedRelaxedFilter<T>(PhantomData<T>)
where
    T: TryFrom<u8> + Debug + Send + Sync + 'static;

impl<T> Default for TypedRelaxedFilter<T>
where
    T: TryFrom<u8> + Debug + Send + Sync + 'static,
{
    /// Creates the filter; there is no state to initialize.
    fn default() -> Self {
        Self(PhantomData::<T>)
    }
}
252
253impl<T> AnnouncementFilteringPolicy for TypedRelaxedFilter<T>
254where
255    T: TryFrom<u8> + Debug + Send + Sync + Unpin + 'static,
256    <T as TryFrom<u8>>::Error: Debug,
257{
258    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
259        match T::try_from(ty) {
260            Ok(_valid_type) => AnnouncementAcceptance::Accept,
261            Err(e) => {
262                tracing::trace!(target: "net::tx::policy::relaxed_typed",
263                    type_param = %std::any::type_name::<T>(),
264                    %ty,
265                    %size,
266                    %hash,
267                    error = ?e,
268                    "Unknown transaction type byte. Ignoring entry."
269                );
270                AnnouncementAcceptance::Ignore
271            }
272        }
273    }
274}
275
/// Type alias for `TypedRelaxedFilter` over [`TxType`]. This filter accepts known Ethereum
/// transaction types and ignores unknown ones without penalizing the peer.
pub type RelaxedEthAnnouncementFilter = TypedRelaxedFilter<TxType>;
279
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that every supported spelling parses to the expected mode and that malformed
    /// inputs are rejected.
    #[test]
    fn test_transaction_propagation_mode_from_str() {
        let parse = |raw: &str| TransactionPropagationMode::from_str(raw);

        // "sqrt" in any letter case.
        for spelling in ["sqrt", "SQRT", "Sqrt"] {
            assert_eq!(parse(spelling).unwrap(), TransactionPropagationMode::Sqrt);
        }

        // "all" in any letter case.
        for spelling in ["all", "ALL", "All"] {
            assert_eq!(parse(spelling).unwrap(), TransactionPropagationMode::All);
        }

        // "max:N" with the keyword in any letter case.
        for (spelling, limit) in [("max:10", 10), ("MAX:42", 42), ("Max:100", 100)] {
            assert_eq!(parse(spelling).unwrap(), TransactionPropagationMode::Max(limit));
        }

        // Unknown keyword, non-numeric or missing limit, missing separator, empty input.
        for malformed in ["invalid", "max:not_a_number", "max:", "max", ""] {
            assert!(parse(malformed).is_err());
        }
    }
}