Skip to main content

reth_network/transactions/
config.rs

1use core::fmt;
2use std::{fmt::Debug, str::FromStr};
3
4use super::{
5    PeerMetadata, DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
6    DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
7    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
8};
9use crate::transactions::constants::{
10    tx_fetcher::{
11        DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
12        DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
13    },
14    tx_manager::DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
15};
16use alloy_eips::eip2718::IsTyped2718;
17use alloy_primitives::B256;
18use derive_more::{Constructor, Display};
19use reth_eth_wire::NetworkPrimitives;
20use reth_network_types::peers::kind::PeerKind;
21
/// Configuration for managing transactions within the network.
///
/// Bundles the fetcher settings, the per-peer bound on remembered transactions, the outbound
/// propagation mode, the inbound acceptance policy and the memory budget of the transaction
/// event channel.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TransactionsManagerConfig {
    /// Configuration for fetching transactions.
    pub transaction_fetcher_config: TransactionFetcherConfig,
    /// Max number of seen transactions to store for each peer.
    pub max_transactions_seen_by_peer_history: u32,
    /// How new pending transactions are propagated.
    ///
    /// With the `serde` feature enabled, a missing field deserializes to
    /// [`TransactionPropagationMode::default`].
    #[cfg_attr(feature = "serde", serde(default))]
    pub propagation_mode: TransactionPropagationMode,
    /// Which peers we accept incoming transactions or announcements from.
    ///
    /// With the `serde` feature enabled, a missing field deserializes to
    /// [`TransactionIngressPolicy::default`].
    #[cfg_attr(feature = "serde", serde(default))]
    pub ingress_policy: TransactionIngressPolicy,
    /// Memory limit (in bytes) for the channel that carries
    /// `NetworkTransactionEvent`s from the `NetworkManager` to the `TransactionsManager`.
    ///
    /// When the budget is exhausted, new events are dropped.
    ///
    /// With the `serde` feature enabled, a missing field deserializes to the value returned by
    /// `default_tx_channel_memory_limit_bytes`.
    #[cfg_attr(feature = "serde", serde(default = "default_tx_channel_memory_limit_bytes"))]
    pub tx_channel_memory_limit_bytes: usize,
}
43
/// Serde fallback for `TransactionsManagerConfig::tx_channel_memory_limit_bytes`.
///
/// Returns `DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES`. Only compiled with the `serde`
/// feature because the `serde(default = "...")` attribute is its sole user in this file.
#[cfg(feature = "serde")]
const fn default_tx_channel_memory_limit_bytes() -> usize {
    DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES
}
48
49impl Default for TransactionsManagerConfig {
50    fn default() -> Self {
51        Self {
52            transaction_fetcher_config: TransactionFetcherConfig::default(),
53            max_transactions_seen_by_peer_history: DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
54            propagation_mode: TransactionPropagationMode::default(),
55            ingress_policy: TransactionIngressPolicy::default(),
56            tx_channel_memory_limit_bytes: DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
57        }
58    }
59}
60
/// Determines how new pending transactions are propagated to other peers in full.
///
/// The textual forms understood by the `FromStr` impl and produced by the `Display` impl in this
/// file are `sqrt`, `all` and `max:<N>`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionPropagationMode {
    /// Send full transactions to sqrt of current peers.
    ///
    /// This is the default mode.
    #[default]
    Sqrt,
    /// Always send transactions in full.
    All,
    /// Send full transactions to at most the given number of peers.
    Max(usize),
}
73
74impl TransactionPropagationMode {
75    /// Returns the number of peers full transactions should be propagated to.
76    pub(crate) fn full_peer_count(&self, peer_count: usize) -> usize {
77        match self {
78            Self::Sqrt => (peer_count as f64).sqrt().round() as usize,
79            Self::All => peer_count,
80            Self::Max(max) => peer_count.min(*max),
81        }
82    }
83}
84impl std::fmt::Display for TransactionPropagationMode {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match self {
87            Self::Sqrt => write!(f, "sqrt"),
88            Self::All => write!(f, "all"),
89            Self::Max(max) => write!(f, "max:{max}"),
90        }
91    }
92}
93
94impl FromStr for TransactionPropagationMode {
95    type Err = String;
96
97    fn from_str(s: &str) -> Result<Self, Self::Err> {
98        let s = s.to_lowercase();
99        match s.as_str() {
100            "sqrt" => Ok(Self::Sqrt),
101            "all" => Ok(Self::All),
102            s => {
103                if let Some(num) = s.strip_prefix("max:") {
104                    num.parse::<usize>()
105                        .map(TransactionPropagationMode::Max)
106                        .map_err(|_| format!("Invalid number for Max variant: {num}"))
107                } else {
108                    Err(format!("Invalid transaction propagation mode: {s}"))
109                }
110            }
111        }
112    }
113}
114
/// Configuration for fetching transactions.
///
/// NOTE(review): `derive_more::Constructor` is expected to generate a positional `new` that
/// takes the fields in declaration order, so reordering fields would change that signature —
/// confirm against the derive_more documentation before moving any field.
#[derive(Debug, Constructor, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TransactionFetcherConfig {
    /// Max inflight [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests.
    pub max_inflight_requests: u32,
    /// Max inflight [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests per
    /// peer.
    pub max_inflight_requests_per_peer: u8,
    /// Soft limit for the byte size of a
    /// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on assembling a
    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request. Spec'd at 2
    /// MiB.
    pub soft_limit_byte_size_pooled_transactions_response: usize,
    /// Soft limit for the byte size of the expected
    /// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on packing a
    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request with hashes.
    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
    /// Max capacity of the cache of transaction hashes, for transactions that weren't yet fetched.
    /// A transaction is pending fetch if its hash didn't fit into a
    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) yet, or it wasn't returned
    /// upon request to peers.
    pub max_capacity_cache_txns_pending_fetch: u32,
}
139
140impl Default for TransactionFetcherConfig {
141    fn default() -> Self {
142        Self {
143            max_inflight_requests: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
144            max_inflight_requests_per_peer: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
145            soft_limit_byte_size_pooled_transactions_response:
146                SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
147            soft_limit_byte_size_pooled_transactions_response_on_pack_request:
148                DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
149                max_capacity_cache_txns_pending_fetch: DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
150        }
151    }
152}
153
/// A policy defining which peers pending transactions are gossiped to.
///
/// Implementors must be `Send + Sync + Unpin + Debug + 'static`.
pub trait TransactionPropagationPolicy<N: NetworkPrimitives>:
    Send + Sync + Unpin + fmt::Debug + 'static
{
    /// Filter a given peer based on the policy.
    ///
    /// This determines whether transactions can be propagated to this peer; `true` means they
    /// can. The peer metadata is handed over mutably, so an implementation is able to update it
    /// while deciding.
    fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool;

    /// A callback on the policy when a new peer session is established.
    ///
    /// Takes `&mut self`, so an implementation can record per-peer state here.
    fn on_session_established(&mut self, peer: &mut PeerMetadata<N>);

    /// A callback on the policy when a peer session is closed.
    ///
    /// Takes `&mut self`, so an implementation can discard per-peer state here.
    fn on_session_closed(&mut self, peer: &mut PeerMetadata<N>);
}
169
/// Determines which peers pending transactions are propagated to.
///
/// This enum also serves as a ready-made [`TransactionPropagationPolicy`] through the impl in
/// this file.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionPropagationKind {
    /// Propagate transactions to all peers.
    ///
    /// No restrictions. This is the default.
    #[default]
    All,
    /// Propagate transactions to only trusted peers.
    Trusted,
    /// Do not propagate transactions to any peer.
    None,
}
184
185impl<N: NetworkPrimitives> TransactionPropagationPolicy<N> for TransactionPropagationKind {
186    fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool {
187        match self {
188            Self::All => true,
189            Self::Trusted => peer.peer_kind.is_trusted(),
190            Self::None => false,
191        }
192    }
193
194    fn on_session_established(&mut self, _peer: &mut PeerMetadata<N>) {}
195
196    fn on_session_closed(&mut self, _peer: &mut PeerMetadata<N>) {}
197}
198
199impl FromStr for TransactionPropagationKind {
200    type Err = String;
201
202    fn from_str(s: &str) -> Result<Self, Self::Err> {
203        match s {
204            "All" | "all" => Ok(Self::All),
205            "Trusted" | "trusted" => Ok(Self::Trusted),
206            "None" | "none" => Ok(Self::None),
207            _ => Err(format!("Invalid transaction propagation policy: {s}")),
208        }
209    }
210}
211
/// Determines which peers we will accept incoming transactions or announcements from.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionIngressPolicy {
    /// Accept transactions from any peer.
    ///
    /// This is the default.
    #[default]
    All,
    /// Accept transactions only from trusted peers.
    Trusted,
    /// Drop all incoming transactions.
    None,
}
224
225impl TransactionIngressPolicy {
226    /// Returns true if the ingress policy allows the provided peer kind.
227    pub const fn allows(&self, peer_kind: PeerKind) -> bool {
228        match self {
229            Self::All => true,
230            Self::Trusted => peer_kind.is_trusted(),
231            Self::None => false,
232        }
233    }
234
235    /// Returns true if the ingress policy accepts transactions from any peer.
236    pub const fn allows_all(&self) -> bool {
237        matches!(self, Self::All)
238    }
239}
240
241impl FromStr for TransactionIngressPolicy {
242    type Err = String;
243
244    fn from_str(s: &str) -> Result<Self, Self::Err> {
245        match s {
246            "All" | "all" => Ok(Self::All),
247            "Trusted" | "trusted" => Ok(Self::Trusted),
248            "None" | "none" => Ok(Self::None),
249            _ => Err(format!("Invalid transaction ingress policy: {s}")),
250        }
251    }
252}
253
/// Defines the outcome of evaluating a transaction against an `AnnouncementFilteringPolicy`.
///
/// Dictates how the `TransactionManager` should proceed on an announced transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AnnouncementAcceptance {
    /// Accept the transaction announcement.
    Accept,
    /// Log the transaction, but neither fetch it nor penalize the peer.
    Ignore,
    /// Reject the transaction announcement, optionally penalizing the announcing peer.
    Reject {
        /// If true, the peer sending this announcement should be penalized.
        penalize_peer: bool,
    },
}
269
/// A policy that defines how to handle incoming transaction announcements,
/// particularly concerning transaction types and other announcement metadata.
///
/// Implementors must be `Send + Sync + Unpin + Debug + 'static`.
pub trait AnnouncementFilteringPolicy<N: NetworkPrimitives>:
    Send + Sync + Unpin + fmt::Debug + 'static
{
    /// Decides how to handle a transaction announcement based on its type, hash, and size.
    ///
    /// `ty` is the announced transaction type byte, `hash` the announced transaction hash and
    /// `size` the announced size. The returned [`AnnouncementAcceptance`] tells the caller
    /// whether to accept, ignore or reject the entry.
    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance;
}
278
/// An `AnnouncementFilteringPolicy` that enforces strict validation of the announced
/// transaction type.
///
/// The filter itself carries no type parameter; the type byte is checked against the pooled
/// transaction type of the network primitives the policy is implemented for
/// (`N::PooledTransaction::is_type`). Unrecognized type bytes are rejected with a recommendation
/// to penalize the peer.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct TypedStrictFilter;
284
285impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedStrictFilter {
286    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
287        if N::PooledTransaction::is_type(ty) {
288            AnnouncementAcceptance::Accept
289        } else {
290            tracing::trace!(target: "net::tx::policy::strict_typed",
291                %ty,
292                %size,
293                %hash,
294                "Invalid or unrecognized transaction type byte. Rejecting entry and recommending peer penalization."
295            );
296            AnnouncementAcceptance::Reject { penalize_peer: true }
297        }
298    }
299}
300
/// Type alias for a [`TypedStrictFilter`]. This is the default strict announcement filter.
///
/// Announcements with an unrecognized transaction type byte are rejected and the announcing peer
/// is recommended for penalization.
pub type StrictEthAnnouncementFilter = TypedStrictFilter;
303
/// An [`AnnouncementFilteringPolicy`] that permissively handles unknown type bytes.
///
/// The filter itself carries no type parameter; the type byte is checked against the pooled
/// transaction type of the network primitives the policy is implemented for
/// (`N::PooledTransaction::is_type`).
///
/// If the type byte is recognized, the announcement is accepted. Otherwise, it's ignored.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct TypedRelaxedFilter;
311
312impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedRelaxedFilter {
313    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
314        if N::PooledTransaction::is_type(ty) {
315            AnnouncementAcceptance::Accept
316        } else {
317            tracing::trace!(target: "net::tx::policy::relaxed_typed",
318                %ty,
319                %size,
320                %hash,
321                "Unknown transaction type byte. Ignoring entry."
322            );
323            AnnouncementAcceptance::Ignore
324        }
325    }
326}
327
/// Type alias for [`TypedRelaxedFilter`]. This filter accepts transaction types recognized by
/// the network's pooled transaction type and ignores unknown ones without penalizing the peer.
pub type RelaxedEthAnnouncementFilter = TypedRelaxedFilter;
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks every accepted spelling of each propagation mode, then the rejected inputs.
    #[test]
    fn test_transaction_propagation_mode_from_str() {
        // Accepted spellings paired with the variant they must parse to.
        let accepted = [
            // "sqrt" variant
            ("sqrt", TransactionPropagationMode::Sqrt),
            ("SQRT", TransactionPropagationMode::Sqrt),
            ("Sqrt", TransactionPropagationMode::Sqrt),
            // "all" variant
            ("all", TransactionPropagationMode::All),
            ("ALL", TransactionPropagationMode::All),
            ("All", TransactionPropagationMode::All),
            // "max:N" variant
            ("max:10", TransactionPropagationMode::Max(10)),
            ("MAX:42", TransactionPropagationMode::Max(42)),
            ("Max:100", TransactionPropagationMode::Max(100)),
        ];
        for (input, expected) in accepted {
            assert_eq!(
                TransactionPropagationMode::from_str(input).unwrap(),
                expected,
                "input: {input:?}"
            );
        }

        // Inputs that must be refused.
        let rejected = ["invalid", "max:not_a_number", "max:", "max", ""];
        for input in rejected {
            assert!(TransactionPropagationMode::from_str(input).is_err(), "input: {input:?}");
        }
    }
}
387}