reth_network/transactions/
config.rs1use core::fmt;
2use std::{fmt::Debug, str::FromStr};
3
4use super::{
5 PeerMetadata, DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
6 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
7 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
8};
9use crate::transactions::constants::{
10 tx_fetcher::{
11 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
12 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
13 },
14 tx_manager::DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
15};
16use alloy_eips::eip2718::IsTyped2718;
17use alloy_primitives::B256;
18use derive_more::{Constructor, Display};
19use reth_eth_wire::NetworkPrimitives;
20use reth_network_types::peers::kind::PeerKind;
21
22#[derive(Debug, Clone)]
24#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
25pub struct TransactionsManagerConfig {
26 pub transaction_fetcher_config: TransactionFetcherConfig,
28 pub max_transactions_seen_by_peer_history: u32,
30 #[cfg_attr(feature = "serde", serde(default))]
32 pub propagation_mode: TransactionPropagationMode,
33 #[cfg_attr(feature = "serde", serde(default))]
35 pub ingress_policy: TransactionIngressPolicy,
36 #[cfg_attr(feature = "serde", serde(default = "default_tx_channel_memory_limit_bytes"))]
41 pub tx_channel_memory_limit_bytes: usize,
42}
43
44#[cfg(feature = "serde")]
45const fn default_tx_channel_memory_limit_bytes() -> usize {
46 DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES
47}
48
49impl Default for TransactionsManagerConfig {
50 fn default() -> Self {
51 Self {
52 transaction_fetcher_config: TransactionFetcherConfig::default(),
53 max_transactions_seen_by_peer_history: DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
54 propagation_mode: TransactionPropagationMode::default(),
55 ingress_policy: TransactionIngressPolicy::default(),
56 tx_channel_memory_limit_bytes: DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
57 }
58 }
59}
60
61#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
63#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
64pub enum TransactionPropagationMode {
65 #[default]
67 Sqrt,
68 All,
70 Max(usize),
72}
73
74impl TransactionPropagationMode {
75 pub(crate) fn full_peer_count(&self, peer_count: usize) -> usize {
77 match self {
78 Self::Sqrt => (peer_count as f64).sqrt().round() as usize,
79 Self::All => peer_count,
80 Self::Max(max) => peer_count.min(*max),
81 }
82 }
83}
84impl std::fmt::Display for TransactionPropagationMode {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 match self {
87 Self::Sqrt => write!(f, "sqrt"),
88 Self::All => write!(f, "all"),
89 Self::Max(max) => write!(f, "max:{max}"),
90 }
91 }
92}
93
94impl FromStr for TransactionPropagationMode {
95 type Err = String;
96
97 fn from_str(s: &str) -> Result<Self, Self::Err> {
98 let s = s.to_lowercase();
99 match s.as_str() {
100 "sqrt" => Ok(Self::Sqrt),
101 "all" => Ok(Self::All),
102 s => {
103 if let Some(num) = s.strip_prefix("max:") {
104 num.parse::<usize>()
105 .map(TransactionPropagationMode::Max)
106 .map_err(|_| format!("Invalid number for Max variant: {num}"))
107 } else {
108 Err(format!("Invalid transaction propagation mode: {s}"))
109 }
110 }
111 }
112 }
113}
114
115#[derive(Debug, Constructor, Clone)]
117#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
118pub struct TransactionFetcherConfig {
119 pub max_inflight_requests: u32,
121 pub max_inflight_requests_per_peer: u8,
124 pub soft_limit_byte_size_pooled_transactions_response: usize,
129 pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
133 pub max_capacity_cache_txns_pending_fetch: u32,
138}
139
140impl Default for TransactionFetcherConfig {
141 fn default() -> Self {
142 Self {
143 max_inflight_requests: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
144 max_inflight_requests_per_peer: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
145 soft_limit_byte_size_pooled_transactions_response:
146 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
147 soft_limit_byte_size_pooled_transactions_response_on_pack_request:
148 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
149 max_capacity_cache_txns_pending_fetch: DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
150 }
151 }
152}
153
154pub trait TransactionPropagationPolicy<N: NetworkPrimitives>:
156 Send + Sync + Unpin + fmt::Debug + 'static
157{
158 fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool;
162
163 fn on_session_established(&mut self, peer: &mut PeerMetadata<N>);
165
166 fn on_session_closed(&mut self, peer: &mut PeerMetadata<N>);
168}
169
170#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
172#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
173pub enum TransactionPropagationKind {
174 #[default]
178 All,
179 Trusted,
181 None,
183}
184
185impl<N: NetworkPrimitives> TransactionPropagationPolicy<N> for TransactionPropagationKind {
186 fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool {
187 match self {
188 Self::All => true,
189 Self::Trusted => peer.peer_kind.is_trusted(),
190 Self::None => false,
191 }
192 }
193
194 fn on_session_established(&mut self, _peer: &mut PeerMetadata<N>) {}
195
196 fn on_session_closed(&mut self, _peer: &mut PeerMetadata<N>) {}
197}
198
199impl FromStr for TransactionPropagationKind {
200 type Err = String;
201
202 fn from_str(s: &str) -> Result<Self, Self::Err> {
203 match s {
204 "All" | "all" => Ok(Self::All),
205 "Trusted" | "trusted" => Ok(Self::Trusted),
206 "None" | "none" => Ok(Self::None),
207 _ => Err(format!("Invalid transaction propagation policy: {s}")),
208 }
209 }
210}
211
212#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
214#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
215pub enum TransactionIngressPolicy {
216 #[default]
218 All,
219 Trusted,
221 None,
223}
224
225impl TransactionIngressPolicy {
226 pub const fn allows(&self, peer_kind: PeerKind) -> bool {
228 match self {
229 Self::All => true,
230 Self::Trusted => peer_kind.is_trusted(),
231 Self::None => false,
232 }
233 }
234
235 pub const fn allows_all(&self) -> bool {
237 matches!(self, Self::All)
238 }
239}
240
241impl FromStr for TransactionIngressPolicy {
242 type Err = String;
243
244 fn from_str(s: &str) -> Result<Self, Self::Err> {
245 match s {
246 "All" | "all" => Ok(Self::All),
247 "Trusted" | "trusted" => Ok(Self::Trusted),
248 "None" | "none" => Ok(Self::None),
249 _ => Err(format!("Invalid transaction ingress policy: {s}")),
250 }
251 }
252}
253
254#[derive(Debug, Clone, Copy, PartialEq, Eq)]
258pub enum AnnouncementAcceptance {
259 Accept,
261 Ignore,
263 Reject {
265 penalize_peer: bool,
267 },
268}
269
270pub trait AnnouncementFilteringPolicy<N: NetworkPrimitives>:
273 Send + Sync + Unpin + fmt::Debug + 'static
274{
275 fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance;
277}
278
279#[derive(Debug, Clone, Default)]
282#[non_exhaustive]
283pub struct TypedStrictFilter;
284
285impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedStrictFilter {
286 fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
287 if N::PooledTransaction::is_type(ty) {
288 AnnouncementAcceptance::Accept
289 } else {
290 tracing::trace!(target: "net::tx::policy::strict_typed",
291 %ty,
292 %size,
293 %hash,
294 "Invalid or unrecognized transaction type byte. Rejecting entry and recommending peer penalization."
295 );
296 AnnouncementAcceptance::Reject { penalize_peer: true }
297 }
298 }
299}
300
301pub type StrictEthAnnouncementFilter = TypedStrictFilter;
303
304#[derive(Debug, Clone, Default)]
309#[non_exhaustive]
310pub struct TypedRelaxedFilter;
311
312impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedRelaxedFilter {
313 fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
314 if N::PooledTransaction::is_type(ty) {
315 AnnouncementAcceptance::Accept
316 } else {
317 tracing::trace!(target: "net::tx::policy::relaxed_typed",
318 %ty,
319 %size,
320 %hash,
321 "Unknown transaction type byte. Ignoring entry."
322 );
323 AnnouncementAcceptance::Ignore
324 }
325 }
326}
327
328pub type RelaxedEthAnnouncementFilter = TypedRelaxedFilter;
331
332#[cfg(test)]
333mod tests {
334 use super::*;
335
336 #[test]
337 fn test_transaction_propagation_mode_from_str() {
338 assert_eq!(
340 TransactionPropagationMode::from_str("sqrt").unwrap(),
341 TransactionPropagationMode::Sqrt
342 );
343 assert_eq!(
344 TransactionPropagationMode::from_str("SQRT").unwrap(),
345 TransactionPropagationMode::Sqrt
346 );
347 assert_eq!(
348 TransactionPropagationMode::from_str("Sqrt").unwrap(),
349 TransactionPropagationMode::Sqrt
350 );
351
352 assert_eq!(
354 TransactionPropagationMode::from_str("all").unwrap(),
355 TransactionPropagationMode::All
356 );
357 assert_eq!(
358 TransactionPropagationMode::from_str("ALL").unwrap(),
359 TransactionPropagationMode::All
360 );
361 assert_eq!(
362 TransactionPropagationMode::from_str("All").unwrap(),
363 TransactionPropagationMode::All
364 );
365
366 assert_eq!(
368 TransactionPropagationMode::from_str("max:10").unwrap(),
369 TransactionPropagationMode::Max(10)
370 );
371 assert_eq!(
372 TransactionPropagationMode::from_str("MAX:42").unwrap(),
373 TransactionPropagationMode::Max(42)
374 );
375 assert_eq!(
376 TransactionPropagationMode::from_str("Max:100").unwrap(),
377 TransactionPropagationMode::Max(100)
378 );
379
380 assert!(TransactionPropagationMode::from_str("invalid").is_err());
382 assert!(TransactionPropagationMode::from_str("max:not_a_number").is_err());
383 assert!(TransactionPropagationMode::from_str("max:").is_err());
384 assert!(TransactionPropagationMode::from_str("max").is_err());
385 assert!(TransactionPropagationMode::from_str("").is_err());
386 }
387}