Skip to main content

reth_network/transactions/
config.rs

1use core::fmt;
2use std::{fmt::Debug, str::FromStr};
3
4use super::{
5    PeerMetadata, DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
6    DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
7    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
8};
9use crate::transactions::constants::tx_fetcher::{
10    DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
11    DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
12};
13use alloy_eips::eip2718::IsTyped2718;
14use alloy_primitives::B256;
15use derive_more::{Constructor, Display};
16use reth_eth_wire::NetworkPrimitives;
17use reth_network_types::peers::kind::PeerKind;
18
19/// Configuration for managing transactions within the network.
20#[derive(Debug, Clone)]
21#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
22pub struct TransactionsManagerConfig {
23    /// Configuration for fetching transactions.
24    pub transaction_fetcher_config: TransactionFetcherConfig,
25    /// Max number of seen transactions to store for each peer.
26    pub max_transactions_seen_by_peer_history: u32,
27    /// How new pending transactions are propagated.
28    #[cfg_attr(feature = "serde", serde(default))]
29    pub propagation_mode: TransactionPropagationMode,
30    /// Which peers we accept incoming transactions or announcements from.
31    #[cfg_attr(feature = "serde", serde(default))]
32    pub ingress_policy: TransactionIngressPolicy,
33}
34
35impl Default for TransactionsManagerConfig {
36    fn default() -> Self {
37        Self {
38            transaction_fetcher_config: TransactionFetcherConfig::default(),
39            max_transactions_seen_by_peer_history: DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
40            propagation_mode: TransactionPropagationMode::default(),
41            ingress_policy: TransactionIngressPolicy::default(),
42        }
43    }
44}
45
46/// Determines how new pending transactions are propagated to other peers in full.
47#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
48#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
49pub enum TransactionPropagationMode {
50    /// Send full transactions to sqrt of current peers.
51    #[default]
52    Sqrt,
53    /// Always send transactions in full.
54    All,
55    /// Send full transactions to a maximum number of peers
56    Max(usize),
57}
58
59impl TransactionPropagationMode {
60    /// Returns the number of peers full transactions should be propagated to.
61    pub(crate) fn full_peer_count(&self, peer_count: usize) -> usize {
62        match self {
63            Self::Sqrt => (peer_count as f64).sqrt().round() as usize,
64            Self::All => peer_count,
65            Self::Max(max) => peer_count.min(*max),
66        }
67    }
68}
69impl std::fmt::Display for TransactionPropagationMode {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        match self {
72            Self::Sqrt => write!(f, "sqrt"),
73            Self::All => write!(f, "all"),
74            Self::Max(max) => write!(f, "max:{max}"),
75        }
76    }
77}
78
79impl FromStr for TransactionPropagationMode {
80    type Err = String;
81
82    fn from_str(s: &str) -> Result<Self, Self::Err> {
83        let s = s.to_lowercase();
84        match s.as_str() {
85            "sqrt" => Ok(Self::Sqrt),
86            "all" => Ok(Self::All),
87            s => {
88                if let Some(num) = s.strip_prefix("max:") {
89                    num.parse::<usize>()
90                        .map(TransactionPropagationMode::Max)
91                        .map_err(|_| format!("Invalid number for Max variant: {num}"))
92                } else {
93                    Err(format!("Invalid transaction propagation mode: {s}"))
94                }
95            }
96        }
97    }
98}
99
100/// Configuration for fetching transactions.
101#[derive(Debug, Constructor, Clone)]
102#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
103pub struct TransactionFetcherConfig {
104    /// Max inflight [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests.
105    pub max_inflight_requests: u32,
106    /// Max inflight [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests per
107    /// peer.
108    pub max_inflight_requests_per_peer: u8,
109    /// Soft limit for the byte size of a
110    /// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on assembling a
111    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request. Spec'd at 2
112    /// MiB.
113    pub soft_limit_byte_size_pooled_transactions_response: usize,
114    /// Soft limit for the byte size of the expected
115    /// [`PooledTransactions`](reth_eth_wire::PooledTransactions) response on packing a
116    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request with hashes.
117    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
118    /// Max capacity of the cache of transaction hashes, for transactions that weren't yet fetched.
119    /// A transaction is pending fetch if its hash didn't fit into a
120    /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) yet, or it wasn't returned
121    /// upon request to peers.
122    pub max_capacity_cache_txns_pending_fetch: u32,
123}
124
125impl Default for TransactionFetcherConfig {
126    fn default() -> Self {
127        Self {
128            max_inflight_requests: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
129            max_inflight_requests_per_peer: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
130            soft_limit_byte_size_pooled_transactions_response:
131                SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
132            soft_limit_byte_size_pooled_transactions_response_on_pack_request:
133                DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
134                max_capacity_cache_txns_pending_fetch: DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
135        }
136    }
137}
138
139/// A policy defining which peers pending transactions are gossiped to.
140pub trait TransactionPropagationPolicy<N: NetworkPrimitives>:
141    Send + Sync + Unpin + fmt::Debug + 'static
142{
143    /// Filter a given peer based on the policy.
144    ///
145    /// This determines whether transactions can be propagated to this peer.
146    fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool;
147
148    /// A callback on the policy when a new peer session is established.
149    fn on_session_established(&mut self, peer: &mut PeerMetadata<N>);
150
151    /// A callback on the policy when a peer session is closed.
152    fn on_session_closed(&mut self, peer: &mut PeerMetadata<N>);
153}
154
155/// Determines which peers pending transactions are propagated to.
156#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
157#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
158pub enum TransactionPropagationKind {
159    /// Propagate transactions to all peers.
160    ///
161    /// No restrictions
162    #[default]
163    All,
164    /// Propagate transactions to only trusted peers.
165    Trusted,
166    /// Do not propagate transactions
167    None,
168}
169
170impl<N: NetworkPrimitives> TransactionPropagationPolicy<N> for TransactionPropagationKind {
171    fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool {
172        match self {
173            Self::All => true,
174            Self::Trusted => peer.peer_kind.is_trusted(),
175            Self::None => false,
176        }
177    }
178
179    fn on_session_established(&mut self, _peer: &mut PeerMetadata<N>) {}
180
181    fn on_session_closed(&mut self, _peer: &mut PeerMetadata<N>) {}
182}
183
184impl FromStr for TransactionPropagationKind {
185    type Err = String;
186
187    fn from_str(s: &str) -> Result<Self, Self::Err> {
188        match s {
189            "All" | "all" => Ok(Self::All),
190            "Trusted" | "trusted" => Ok(Self::Trusted),
191            "None" | "none" => Ok(Self::None),
192            _ => Err(format!("Invalid transaction propagation policy: {s}")),
193        }
194    }
195}
196
197/// Determines which peers we will accept incoming transactions or announcements from.
198#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
199#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
200pub enum TransactionIngressPolicy {
201    /// Accept transactions from any peer.
202    #[default]
203    All,
204    /// Accept transactions only from trusted peers.
205    Trusted,
206    /// Drop all incoming transactions.
207    None,
208}
209
210impl TransactionIngressPolicy {
211    /// Returns true if the ingress policy allows the provided peer kind.
212    pub const fn allows(&self, peer_kind: PeerKind) -> bool {
213        match self {
214            Self::All => true,
215            Self::Trusted => peer_kind.is_trusted(),
216            Self::None => false,
217        }
218    }
219
220    /// Returns true if the ingress policy accepts transactions from any peer.
221    pub const fn allows_all(&self) -> bool {
222        matches!(self, Self::All)
223    }
224}
225
226impl FromStr for TransactionIngressPolicy {
227    type Err = String;
228
229    fn from_str(s: &str) -> Result<Self, Self::Err> {
230        match s {
231            "All" | "all" => Ok(Self::All),
232            "Trusted" | "trusted" => Ok(Self::Trusted),
233            "None" | "none" => Ok(Self::None),
234            _ => Err(format!("Invalid transaction ingress policy: {s}")),
235        }
236    }
237}
238
239/// Defines the outcome of evaluating a transaction against an `AnnouncementFilteringPolicy`.
240///
241/// Dictates how the `TransactionManager` should proceed on an announced transaction.
242#[derive(Debug, Clone, Copy, PartialEq, Eq)]
243pub enum AnnouncementAcceptance {
244    /// Accept the transaction announcement.
245    Accept,
246    /// Log the transaction but not fetching the transaction or penalizing the peer.
247    Ignore,
248    /// Reject
249    Reject {
250        /// If true, the peer sending this announcement should be penalized.
251        penalize_peer: bool,
252    },
253}
254
255/// A policy that defines how to handle incoming transaction announcements,
256/// particularly concerning transaction types and other announcement metadata.
257pub trait AnnouncementFilteringPolicy<N: NetworkPrimitives>:
258    Send + Sync + Unpin + fmt::Debug + 'static
259{
260    /// Decides how to handle a transaction announcement based on its type, hash, and size.
261    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance;
262}
263
264/// A generic `AnnouncementFilteringPolicy` that enforces strict validation
265/// of transaction type based on a generic type `T`.
266#[derive(Debug, Clone, Default)]
267#[non_exhaustive]
268pub struct TypedStrictFilter;
269
270impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedStrictFilter {
271    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
272        if N::PooledTransaction::is_type(ty) {
273            AnnouncementAcceptance::Accept
274        } else {
275            tracing::trace!(target: "net::tx::policy::strict_typed",
276                %ty,
277                %size,
278                %hash,
279                "Invalid or unrecognized transaction type byte. Rejecting entry and recommending peer penalization."
280            );
281            AnnouncementAcceptance::Reject { penalize_peer: true }
282        }
283    }
284}
285
286/// Type alias for a `TypedStrictFilter`. This is the default strict announcement filter.
287pub type StrictEthAnnouncementFilter = TypedStrictFilter;
288
289/// An [`AnnouncementFilteringPolicy`] that permissively handles unknown type bytes
290/// based on a given type `T` using `T::try_from(u8)`.
291///
292/// If `T::try_from(ty)` succeeds, the announcement is accepted. Otherwise, it's ignored.
293#[derive(Debug, Clone, Default)]
294#[non_exhaustive]
295pub struct TypedRelaxedFilter;
296
297impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedRelaxedFilter {
298    fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
299        if N::PooledTransaction::is_type(ty) {
300            AnnouncementAcceptance::Accept
301        } else {
302            tracing::trace!(target: "net::tx::policy::relaxed_typed",
303                %ty,
304                %size,
305                %hash,
306                "Unknown transaction type byte. Ignoring entry."
307            );
308            AnnouncementAcceptance::Ignore
309        }
310    }
311}
312
313/// Type alias for `TypedRelaxedFilter`. This filter accepts known Ethereum transaction types and
314/// ignores unknown ones without penalizing the peer.
315pub type RelaxedEthAnnouncementFilter = TypedRelaxedFilter;
316
317#[cfg(test)]
318mod tests {
319    use super::*;
320
321    #[test]
322    fn test_transaction_propagation_mode_from_str() {
323        // Test "sqrt" variant
324        assert_eq!(
325            TransactionPropagationMode::from_str("sqrt").unwrap(),
326            TransactionPropagationMode::Sqrt
327        );
328        assert_eq!(
329            TransactionPropagationMode::from_str("SQRT").unwrap(),
330            TransactionPropagationMode::Sqrt
331        );
332        assert_eq!(
333            TransactionPropagationMode::from_str("Sqrt").unwrap(),
334            TransactionPropagationMode::Sqrt
335        );
336
337        // Test "all" variant
338        assert_eq!(
339            TransactionPropagationMode::from_str("all").unwrap(),
340            TransactionPropagationMode::All
341        );
342        assert_eq!(
343            TransactionPropagationMode::from_str("ALL").unwrap(),
344            TransactionPropagationMode::All
345        );
346        assert_eq!(
347            TransactionPropagationMode::from_str("All").unwrap(),
348            TransactionPropagationMode::All
349        );
350
351        // Test "max:N" variant
352        assert_eq!(
353            TransactionPropagationMode::from_str("max:10").unwrap(),
354            TransactionPropagationMode::Max(10)
355        );
356        assert_eq!(
357            TransactionPropagationMode::from_str("MAX:42").unwrap(),
358            TransactionPropagationMode::Max(42)
359        );
360        assert_eq!(
361            TransactionPropagationMode::from_str("Max:100").unwrap(),
362            TransactionPropagationMode::Max(100)
363        );
364
365        // Test invalid inputs
366        assert!(TransactionPropagationMode::from_str("invalid").is_err());
367        assert!(TransactionPropagationMode::from_str("max:not_a_number").is_err());
368        assert!(TransactionPropagationMode::from_str("max:").is_err());
369        assert!(TransactionPropagationMode::from_str("max").is_err());
370        assert!(TransactionPropagationMode::from_str("").is_err());
371    }
372}