reth_network/transactions/
config.rs1use core::fmt;
2use std::{fmt::Debug, str::FromStr};
3
4use super::{
5 PeerMetadata, DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
6 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
7 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
8};
9use crate::transactions::constants::tx_fetcher::{
10 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
11 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
12};
13use alloy_eips::eip2718::IsTyped2718;
14use alloy_primitives::B256;
15use derive_more::{Constructor, Display};
16use reth_eth_wire::NetworkPrimitives;
17use reth_network_types::peers::kind::PeerKind;
18
19#[derive(Debug, Clone)]
21#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
22pub struct TransactionsManagerConfig {
23 pub transaction_fetcher_config: TransactionFetcherConfig,
25 pub max_transactions_seen_by_peer_history: u32,
27 #[cfg_attr(feature = "serde", serde(default))]
29 pub propagation_mode: TransactionPropagationMode,
30 #[cfg_attr(feature = "serde", serde(default))]
32 pub ingress_policy: TransactionIngressPolicy,
33}
34
35impl Default for TransactionsManagerConfig {
36 fn default() -> Self {
37 Self {
38 transaction_fetcher_config: TransactionFetcherConfig::default(),
39 max_transactions_seen_by_peer_history: DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
40 propagation_mode: TransactionPropagationMode::default(),
41 ingress_policy: TransactionIngressPolicy::default(),
42 }
43 }
44}
45
/// Strategy that decides how many peers
/// [`TransactionPropagationMode::full_peer_count`] selects out of the connected set.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionPropagationMode {
    /// Select the square root of the peer count, rounded to the nearest integer.
    ///
    /// This is the default mode.
    #[default]
    Sqrt,
    /// Select every peer.
    All,
    /// Select at most the given number of peers.
    Max(usize),
}

impl TransactionPropagationMode {
    /// Returns how many of `peer_count` peers this mode selects.
    ///
    /// * `Sqrt` yields `round(sqrt(peer_count))`.
    /// * `All` yields `peer_count`.
    /// * `Max(n)` yields `min(peer_count, n)`.
    // NOTE(review): going by the method name, the selected peers are the ones that receive
    // full transactions — confirm at the call site.
    pub(crate) fn full_peer_count(&self, peer_count: usize) -> usize {
        match self {
            Self::Sqrt => (peer_count as f64).sqrt().round() as usize,
            Self::All => peer_count,
            Self::Max(max) => peer_count.min(*max),
        }
    }
}

impl FromStr for TransactionPropagationMode {
    type Err = String;

    /// Parses `sqrt`, `all` or `max:<number>`, ignoring ASCII case of the keywords.
    ///
    /// # Errors
    ///
    /// Returns a message that echoes the input exactly as the caller supplied it when the
    /// keyword is unknown or the `max:` payload is not a valid `usize`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const MAX_PREFIX: &str = "max:";

        if s.eq_ignore_ascii_case("sqrt") {
            return Ok(Self::Sqrt);
        }
        if s.eq_ignore_ascii_case("all") {
            return Ok(Self::All);
        }

        // `str::get` yields `None` for inputs that are too short or where the split point
        // falls inside a multi-byte character, so arbitrary input can never panic here.
        let split = s.get(..MAX_PREFIX.len()).zip(s.get(MAX_PREFIX.len()..));
        match split {
            Some((prefix, num)) if prefix.eq_ignore_ascii_case(MAX_PREFIX) => num
                .parse::<usize>()
                .map(Self::Max)
                .map_err(|_| format!("Invalid number for Max variant: {num}")),
            _ => Err(format!("Invalid transaction propagation mode: {s}")),
        }
    }
}
89
90#[derive(Debug, Constructor, Clone)]
92#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
93pub struct TransactionFetcherConfig {
94 pub max_inflight_requests: u32,
96 pub max_inflight_requests_per_peer: u8,
99 pub soft_limit_byte_size_pooled_transactions_response: usize,
104 pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
108 pub max_capacity_cache_txns_pending_fetch: u32,
113}
114
115impl Default for TransactionFetcherConfig {
116 fn default() -> Self {
117 Self {
118 max_inflight_requests: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
119 max_inflight_requests_per_peer: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
120 soft_limit_byte_size_pooled_transactions_response:
121 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
122 soft_limit_byte_size_pooled_transactions_response_on_pack_request:
123 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
124 max_capacity_cache_txns_pending_fetch: DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
125 }
126 }
127}
128
129pub trait TransactionPropagationPolicy<N: NetworkPrimitives>:
131 Send + Sync + Unpin + fmt::Debug + 'static
132{
133 fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool;
137
138 fn on_session_established(&mut self, peer: &mut PeerMetadata<N>);
140
141 fn on_session_closed(&mut self, peer: &mut PeerMetadata<N>);
143}
144
145#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
147#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
148pub enum TransactionPropagationKind {
149 #[default]
153 All,
154 Trusted,
156 None,
158}
159
160impl<N: NetworkPrimitives> TransactionPropagationPolicy<N> for TransactionPropagationKind {
161 fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool {
162 match self {
163 Self::All => true,
164 Self::Trusted => peer.peer_kind.is_trusted(),
165 Self::None => false,
166 }
167 }
168
169 fn on_session_established(&mut self, _peer: &mut PeerMetadata<N>) {}
170
171 fn on_session_closed(&mut self, _peer: &mut PeerMetadata<N>) {}
172}
173
174impl FromStr for TransactionPropagationKind {
175 type Err = String;
176
177 fn from_str(s: &str) -> Result<Self, Self::Err> {
178 match s {
179 "All" | "all" => Ok(Self::All),
180 "Trusted" | "trusted" => Ok(Self::Trusted),
181 "None" | "none" => Ok(Self::None),
182 _ => Err(format!("Invalid transaction propagation policy: {s}")),
183 }
184 }
185}
186
187#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
189#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
190pub enum TransactionIngressPolicy {
191 #[default]
193 All,
194 Trusted,
196 None,
198}
199
200impl TransactionIngressPolicy {
201 pub const fn allows(&self, peer_kind: PeerKind) -> bool {
203 match self {
204 Self::All => true,
205 Self::Trusted => peer_kind.is_trusted(),
206 Self::None => false,
207 }
208 }
209
210 pub const fn allows_all(&self) -> bool {
212 matches!(self, Self::All)
213 }
214}
215
216impl FromStr for TransactionIngressPolicy {
217 type Err = String;
218
219 fn from_str(s: &str) -> Result<Self, Self::Err> {
220 match s {
221 "All" | "all" => Ok(Self::All),
222 "Trusted" | "trusted" => Ok(Self::Trusted),
223 "None" | "none" => Ok(Self::None),
224 _ => Err(format!("Invalid transaction ingress policy: {s}")),
225 }
226 }
227}
228
/// Verdict produced by an announcement filter for a single announced entry.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AnnouncementAcceptance {
    /// The entry is accepted.
    Accept,
    /// The entry is skipped without any recommendation about the announcing peer.
    Ignore,
    /// The entry is rejected.
    Reject {
        /// Whether the filter recommends penalizing the peer that sent the entry.
        penalize_peer: bool,
    },
}
244
245pub trait AnnouncementFilteringPolicy<N: NetworkPrimitives>:
248 Send + Sync + Unpin + fmt::Debug + 'static
249{
250 fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance;
252}
253
254#[derive(Debug, Clone, Default)]
257#[non_exhaustive]
258pub struct TypedStrictFilter;
259
260impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedStrictFilter {
261 fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
262 if N::PooledTransaction::is_type(ty) {
263 AnnouncementAcceptance::Accept
264 } else {
265 tracing::trace!(target: "net::tx::policy::strict_typed",
266 %ty,
267 %size,
268 %hash,
269 "Invalid or unrecognized transaction type byte. Rejecting entry and recommending peer penalization."
270 );
271 AnnouncementAcceptance::Reject { penalize_peer: true }
272 }
273 }
274}
275
276pub type StrictEthAnnouncementFilter = TypedStrictFilter;
278
279#[derive(Debug, Clone, Default)]
284#[non_exhaustive]
285pub struct TypedRelaxedFilter;
286
287impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedRelaxedFilter {
288 fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
289 if N::PooledTransaction::is_type(ty) {
290 AnnouncementAcceptance::Accept
291 } else {
292 tracing::trace!(target: "net::tx::policy::relaxed_typed",
293 %ty,
294 %size,
295 %hash,
296 "Unknown transaction type byte. Ignoring entry."
297 );
298 AnnouncementAcceptance::Ignore
299 }
300 }
301}
302
303pub type RelaxedEthAnnouncementFilter = TypedRelaxedFilter;
306
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks every accepted spelling of each propagation mode and a set of inputs that
    /// must be rejected.
    #[test]
    fn test_transaction_propagation_mode_from_str() {
        let accepted = [
            // Square-root mode in lower, upper and capitalized form.
            ("sqrt", TransactionPropagationMode::Sqrt),
            ("SQRT", TransactionPropagationMode::Sqrt),
            ("Sqrt", TransactionPropagationMode::Sqrt),
            // "All" mode in lower, upper and capitalized form.
            ("all", TransactionPropagationMode::All),
            ("ALL", TransactionPropagationMode::All),
            ("All", TransactionPropagationMode::All),
            // Bounded mode with differently cased prefixes.
            ("max:10", TransactionPropagationMode::Max(10)),
            ("MAX:42", TransactionPropagationMode::Max(42)),
            ("Max:100", TransactionPropagationMode::Max(100)),
        ];
        for (input, expected) in accepted {
            assert_eq!(TransactionPropagationMode::from_str(input).unwrap(), expected);
        }

        // Unknown keyword, malformed or missing number, missing separator, empty input.
        let rejected = ["invalid", "max:not_a_number", "max:", "max", ""];
        for input in rejected {
            assert!(TransactionPropagationMode::from_str(input).is_err());
        }
    }
}