reth_network/transactions/
config.rs

1use core::fmt;
2use std::{fmt::Debug, str::FromStr};
3
4use super::{
5    PeerMetadata, DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
6    DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
7    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
8};
9use crate::transactions::constants::tx_fetcher::{
10    DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
11    DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
12};
13use alloy_eips::eip2718::IsTyped2718;
14use alloy_primitives::B256;
15use derive_more::{Constructor, Display};
16use reth_eth_wire::NetworkPrimitives;
17use reth_network_types::peers::kind::PeerKind;
18
19/// Configuration for managing transactions within the network.
20#[derive(Debug, Clone)]
21#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
22pub struct TransactionsManagerConfig {
23    /// Configuration for fetching transactions.
24    pub transaction_fetcher_config: TransactionFetcherConfig,
25    /// Max number of seen transactions to store for each peer.
26    pub max_transactions_seen_by_peer_history: u32,
27    /// How new pending transactions are propagated.
28    #[cfg_attr(feature = "serde", serde(default))]
29    pub propagation_mode: TransactionPropagationMode,
30    /// Which peers we accept incoming transactions or announcements from.
31    #[cfg_attr(feature = "serde", serde(default))]
32    pub ingress_policy: TransactionIngressPolicy,
33}
34
35impl Default for TransactionsManagerConfig {
36    fn default() -> Self {
37        Self {
38            transaction_fetcher_config: TransactionFetcherConfig::default(),
39            max_transactions_seen_by_peer_history: DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
40            propagation_mode: TransactionPropagationMode::default(),
41            ingress_policy: TransactionIngressPolicy::default(),
42        }
43    }
44}
45
/// Selects how many peers receive new pending transactions in full.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionPropagationMode {
    /// Send full transactions to the square root of the current peer count.
    ///
    /// This is the default mode.
    #[default]
    Sqrt,
    /// Send full transactions to every peer.
    All,
    /// Send full transactions to at most the given number of peers.
    Max(usize),
}
58
59impl TransactionPropagationMode {
60    /// Returns the number of peers full transactions should be propagated to.
61    pub(crate) fn full_peer_count(&self, peer_count: usize) -> usize {
62        match self {
63            Self::Sqrt => (peer_count as f64).sqrt().round() as usize,
64            Self::All => peer_count,
65            Self::Max(max) => peer_count.min(*max),
66        }
67    }
68}
69impl FromStr for TransactionPropagationMode {
70    type Err = String;
71
72    fn from_str(s: &str) -> Result<Self, Self::Err> {
73        let s = s.to_lowercase();
74        match s.as_str() {
75            "sqrt" => Ok(Self::Sqrt),
76            "all" => Ok(Self::All),
77            s => {
78                if let Some(num) = s.strip_prefix("max:") {
79                    num.parse::<usize>()
80                        .map(TransactionPropagationMode::Max)
81                        .map_err(|_| format!("Invalid number for Max variant: {num}"))
82                } else {
83                    Err(format!("Invalid transaction propagation mode: {s}"))
84                }
85            }
86        }
87    }
88}
89
90/// Configuration for fetching transactions.
91#[derive(Debug, Constructor, Clone)]
92#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
93pub struct TransactionFetcherConfig {
94    /// Max inflight [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests.
95    pub max_inflight_requests: u32,
96    /// Max inflight [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests per
97    /// peer.
98    pub max_inflight_requests_per_peer: u8,
99    /// Soft limit for the byte size of a
100    /// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on assembling a
101    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request. Spec'd at 2
102    /// MiB.
103    pub soft_limit_byte_size_pooled_transactions_response: usize,
104    /// Soft limit for the byte size of the expected
105    /// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on packing a
106    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request with hashes.
107    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
108    /// Max capacity of the cache of transaction hashes, for transactions that weren't yet fetched.
109    /// A transaction is pending fetch if its hash didn't fit into a
110    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) yet, or it wasn't returned
111    /// upon request to peers.
112    pub max_capacity_cache_txns_pending_fetch: u32,
113}
114
115impl Default for TransactionFetcherConfig {
116    fn default() -> Self {
117        Self {
118            max_inflight_requests: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
119            max_inflight_requests_per_peer: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
120            soft_limit_byte_size_pooled_transactions_response:
121                SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
122            soft_limit_byte_size_pooled_transactions_response_on_pack_request:
123                DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
124                max_capacity_cache_txns_pending_fetch: DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
125        }
126    }
127}
128
129/// A policy defining which peers pending transactions are gossiped to.
130pub trait TransactionPropagationPolicy<N: NetworkPrimitives>:
131    Send + Sync + Unpin + fmt::Debug + 'static
132{
133    /// Filter a given peer based on the policy.
134    ///
135    /// This determines whether transactions can be propagated to this peer.
136    fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool;
137
138    /// A callback on the policy when a new peer session is established.
139    fn on_session_established(&mut self, peer: &mut PeerMetadata<N>);
140
141    /// A callback on the policy when a peer session is closed.
142    fn on_session_closed(&mut self, peer: &mut PeerMetadata<N>);
143}
144
145/// Determines which peers pending transactions are propagated to.
146#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
147#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
148pub enum TransactionPropagationKind {
149    /// Propagate transactions to all peers.
150    ///
151    /// No restrictions
152    #[default]
153    All,
154    /// Propagate transactions to only trusted peers.
155    Trusted,
156    /// Do not propagate transactions
157    None,
158}
159
160impl<N: NetworkPrimitives> TransactionPropagationPolicy<N> for TransactionPropagationKind {
161    fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool {
162        match self {
163            Self::All => true,
164            Self::Trusted => peer.peer_kind.is_trusted(),
165            Self::None => false,
166        }
167    }
168
169    fn on_session_established(&mut self, _peer: &mut PeerMetadata<N>) {}
170
171    fn on_session_closed(&mut self, _peer: &mut PeerMetadata<N>) {}
172}
173
174impl FromStr for TransactionPropagationKind {
175    type Err = String;
176
177    fn from_str(s: &str) -> Result<Self, Self::Err> {
178        match s {
179            "All" | "all" => Ok(Self::All),
180            "Trusted" | "trusted" => Ok(Self::Trusted),
181            "None" | "none" => Ok(Self::None),
182            _ => Err(format!("Invalid transaction propagation policy: {s}")),
183        }
184    }
185}
186
187/// Determines which peers we will accept incoming transactions or announcements from.
188#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
189#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
190pub enum TransactionIngressPolicy {
191    /// Accept transactions from any peer.
192    #[default]
193    All,
194    /// Accept transactions only from trusted peers.
195    Trusted,
196    /// Drop all incoming transactions.
197    None,
198}
199
200impl TransactionIngressPolicy {
201    /// Returns true if the ingress policy allows the provided peer kind.
202    pub const fn allows(&self, peer_kind: PeerKind) -> bool {
203        match self {
204            Self::All => true,
205            Self::Trusted => peer_kind.is_trusted(),
206            Self::None => false,
207        }
208    }
209
210    /// Returns true if the ingress policy accepts transactions from any peer.
211    pub const fn allows_all(&self) -> bool {
212        matches!(self, Self::All)
213    }
214}
215
216impl FromStr for TransactionIngressPolicy {
217    type Err = String;
218
219    fn from_str(s: &str) -> Result<Self, Self::Err> {
220        match s {
221            "All" | "all" => Ok(Self::All),
222            "Trusted" | "trusted" => Ok(Self::Trusted),
223            "None" | "none" => Ok(Self::None),
224            _ => Err(format!("Invalid transaction ingress policy: {s}")),
225        }
226    }
227}
228
/// Result of checking an announced transaction against an `AnnouncementFilteringPolicy`.
///
/// Tells the `TransactionManager` how to proceed with the announced transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AnnouncementAcceptance {
    /// The announcement is accepted.
    Accept,
    /// The announcement is logged, but the transaction is not fetched and the peer is not
    /// penalized.
    Ignore,
    /// The announcement is rejected.
    Reject {
        /// Whether the peer that sent the announcement should be penalized.
        penalize_peer: bool,
    },
}
244
245/// A policy that defines how to handle incoming transaction announcements,
246/// particularly concerning transaction types and other announcement metadata.
247pub trait AnnouncementFilteringPolicy<N: NetworkPrimitives>:
248    Send + Sync + Unpin + fmt::Debug + 'static
249{
250    /// Decides how to handle a transaction announcement based on its type, hash, and size.
251    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance;
252}
253
/// An `AnnouncementFilteringPolicy` that strictly validates the announced transaction type.
///
/// A type byte recognized by the network's pooled transaction type is accepted; any other
/// type byte is rejected with a recommendation to penalize the announcing peer.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct TypedStrictFilter;
259
260impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedStrictFilter {
261    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
262        if N::PooledTransaction::is_type(ty) {
263            AnnouncementAcceptance::Accept
264        } else {
265            tracing::trace!(target: "net::tx::policy::strict_typed",
266                %ty,
267                %size,
268                %hash,
269                "Invalid or unrecognized transaction type byte. Rejecting entry and recommending peer penalization."
270            );
271            AnnouncementAcceptance::Reject { penalize_peer: true }
272        }
273    }
274}
275
276/// Type alias for a `TypedStrictFilter`. This is the default strict announcement filter.
277pub type StrictEthAnnouncementFilter = TypedStrictFilter;
278
/// An [`AnnouncementFilteringPolicy`] that treats unknown transaction type bytes
/// permissively.
///
/// A type byte recognized by the network's pooled transaction type is accepted; any other
/// type byte is ignored rather than rejected.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct TypedRelaxedFilter;
286
287impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedRelaxedFilter {
288    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
289        if N::PooledTransaction::is_type(ty) {
290            AnnouncementAcceptance::Accept
291        } else {
292            tracing::trace!(target: "net::tx::policy::relaxed_typed",
293                %ty,
294                %size,
295                %hash,
296                "Unknown transaction type byte. Ignoring entry."
297            );
298            AnnouncementAcceptance::Ignore
299        }
300    }
301}
302
303/// Type alias for `TypedRelaxedFilter`. This filter accepts known Ethereum transaction types and
304/// ignores unknown ones without penalizing the peer.
305pub type RelaxedEthAnnouncementFilter = TypedRelaxedFilter;
306
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that every supported spelling parses to the expected mode and that malformed
    /// inputs are rejected.
    #[test]
    fn test_transaction_propagation_mode_from_str() {
        // Inputs that must parse, paired with the mode they must produce.
        let accepted = [
            // "sqrt" variant
            ("sqrt", TransactionPropagationMode::Sqrt),
            ("SQRT", TransactionPropagationMode::Sqrt),
            ("Sqrt", TransactionPropagationMode::Sqrt),
            // "all" variant
            ("all", TransactionPropagationMode::All),
            ("ALL", TransactionPropagationMode::All),
            ("All", TransactionPropagationMode::All),
            // "max:N" variant
            ("max:10", TransactionPropagationMode::Max(10)),
            ("MAX:42", TransactionPropagationMode::Max(42)),
            ("Max:100", TransactionPropagationMode::Max(100)),
        ];
        for (input, expected) in accepted {
            assert_eq!(TransactionPropagationMode::from_str(input).unwrap(), expected);
        }

        // Inputs that must be rejected.
        let rejected = ["invalid", "max:not_a_number", "max:", "max", ""];
        for input in rejected {
            assert!(TransactionPropagationMode::from_str(input).is_err());
        }
    }
}