reth_network/transactions/
config.rs1use core::fmt;
2use std::{fmt::Debug, str::FromStr};
3
4use super::{
5 PeerMetadata, DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
6 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
7 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
8};
9use crate::transactions::constants::tx_fetcher::{
10 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH, DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
11 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
12};
13use alloy_eips::eip2718::IsTyped2718;
14use alloy_primitives::B256;
15use derive_more::{Constructor, Display};
16use reth_eth_wire::NetworkPrimitives;
17use reth_network_types::peers::kind::PeerKind;
18
19#[derive(Debug, Clone)]
21#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
22pub struct TransactionsManagerConfig {
23 pub transaction_fetcher_config: TransactionFetcherConfig,
25 pub max_transactions_seen_by_peer_history: u32,
27 #[cfg_attr(feature = "serde", serde(default))]
29 pub propagation_mode: TransactionPropagationMode,
30 #[cfg_attr(feature = "serde", serde(default))]
32 pub ingress_policy: TransactionIngressPolicy,
33}
34
35impl Default for TransactionsManagerConfig {
36 fn default() -> Self {
37 Self {
38 transaction_fetcher_config: TransactionFetcherConfig::default(),
39 max_transactions_seen_by_peer_history: DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
40 propagation_mode: TransactionPropagationMode::default(),
41 ingress_policy: TransactionIngressPolicy::default(),
42 }
43 }
44}
45
/// Strategy used to size the set of peers returned by `full_peer_count`.
///
/// The textual forms understood by the `FromStr` implementation are `sqrt`, `all` and
/// `max:<n>`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TransactionPropagationMode {
    /// Use the rounded square root of the current peer count. This is the default.
    #[default]
    Sqrt,
    /// Use every peer.
    All,
    /// Use at most the given number of peers.
    Max(usize),
}
58
59impl TransactionPropagationMode {
60 pub(crate) fn full_peer_count(&self, peer_count: usize) -> usize {
62 match self {
63 Self::Sqrt => (peer_count as f64).sqrt().round() as usize,
64 Self::All => peer_count,
65 Self::Max(max) => peer_count.min(*max),
66 }
67 }
68}
69impl std::fmt::Display for TransactionPropagationMode {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 match self {
72 Self::Sqrt => write!(f, "sqrt"),
73 Self::All => write!(f, "all"),
74 Self::Max(max) => write!(f, "max:{max}"),
75 }
76 }
77}
78
79impl FromStr for TransactionPropagationMode {
80 type Err = String;
81
82 fn from_str(s: &str) -> Result<Self, Self::Err> {
83 let s = s.to_lowercase();
84 match s.as_str() {
85 "sqrt" => Ok(Self::Sqrt),
86 "all" => Ok(Self::All),
87 s => {
88 if let Some(num) = s.strip_prefix("max:") {
89 num.parse::<usize>()
90 .map(TransactionPropagationMode::Max)
91 .map_err(|_| format!("Invalid number for Max variant: {num}"))
92 } else {
93 Err(format!("Invalid transaction propagation mode: {s}"))
94 }
95 }
96 }
97 }
98}
99
100#[derive(Debug, Constructor, Clone)]
102#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
103pub struct TransactionFetcherConfig {
104 pub max_inflight_requests: u32,
106 pub max_inflight_requests_per_peer: u8,
109 pub soft_limit_byte_size_pooled_transactions_response: usize,
114 pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
118 pub max_capacity_cache_txns_pending_fetch: u32,
123}
124
125impl Default for TransactionFetcherConfig {
126 fn default() -> Self {
127 Self {
128 max_inflight_requests: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
129 max_inflight_requests_per_peer: DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
130 soft_limit_byte_size_pooled_transactions_response:
131 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
132 soft_limit_byte_size_pooled_transactions_response_on_pack_request:
133 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
134 max_capacity_cache_txns_pending_fetch: DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
135 }
136 }
137}
138
139pub trait TransactionPropagationPolicy<N: NetworkPrimitives>:
141 Send + Sync + Unpin + fmt::Debug + 'static
142{
143 fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool;
147
148 fn on_session_established(&mut self, peer: &mut PeerMetadata<N>);
150
151 fn on_session_closed(&mut self, peer: &mut PeerMetadata<N>);
153}
154
155#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
157#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
158pub enum TransactionPropagationKind {
159 #[default]
163 All,
164 Trusted,
166 None,
168}
169
170impl<N: NetworkPrimitives> TransactionPropagationPolicy<N> for TransactionPropagationKind {
171 fn can_propagate(&self, peer: &mut PeerMetadata<N>) -> bool {
172 match self {
173 Self::All => true,
174 Self::Trusted => peer.peer_kind.is_trusted(),
175 Self::None => false,
176 }
177 }
178
179 fn on_session_established(&mut self, _peer: &mut PeerMetadata<N>) {}
180
181 fn on_session_closed(&mut self, _peer: &mut PeerMetadata<N>) {}
182}
183
184impl FromStr for TransactionPropagationKind {
185 type Err = String;
186
187 fn from_str(s: &str) -> Result<Self, Self::Err> {
188 match s {
189 "All" | "all" => Ok(Self::All),
190 "Trusted" | "trusted" => Ok(Self::Trusted),
191 "None" | "none" => Ok(Self::None),
192 _ => Err(format!("Invalid transaction propagation policy: {s}")),
193 }
194 }
195}
196
197#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Display)]
199#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
200pub enum TransactionIngressPolicy {
201 #[default]
203 All,
204 Trusted,
206 None,
208}
209
210impl TransactionIngressPolicy {
211 pub const fn allows(&self, peer_kind: PeerKind) -> bool {
213 match self {
214 Self::All => true,
215 Self::Trusted => peer_kind.is_trusted(),
216 Self::None => false,
217 }
218 }
219
220 pub const fn allows_all(&self) -> bool {
222 matches!(self, Self::All)
223 }
224}
225
226impl FromStr for TransactionIngressPolicy {
227 type Err = String;
228
229 fn from_str(s: &str) -> Result<Self, Self::Err> {
230 match s {
231 "All" | "all" => Ok(Self::All),
232 "Trusted" | "trusted" => Ok(Self::Trusted),
233 "None" | "none" => Ok(Self::None),
234 _ => Err(format!("Invalid transaction ingress policy: {s}")),
235 }
236 }
237}
238
/// Verdict returned by an `AnnouncementFilteringPolicy` for a single announced transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AnnouncementAcceptance {
    /// The announcement is accepted.
    Accept,
    /// The announcement is skipped without any further consequence.
    Ignore,
    /// The announcement is rejected.
    Reject {
        /// Whether the filter recommends penalizing the announcing peer.
        penalize_peer: bool,
    },
}
254
255pub trait AnnouncementFilteringPolicy<N: NetworkPrimitives>:
258 Send + Sync + Unpin + fmt::Debug + 'static
259{
260 fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance;
262}
263
/// Stateless announcement filter that rejects unknown transaction types and recommends
/// penalizing the announcing peer.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct TypedStrictFilter;
269
270impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedStrictFilter {
271 fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
272 if N::PooledTransaction::is_type(ty) {
273 AnnouncementAcceptance::Accept
274 } else {
275 tracing::trace!(target: "net::tx::policy::strict_typed",
276 %ty,
277 %size,
278 %hash,
279 "Invalid or unrecognized transaction type byte. Rejecting entry and recommending peer penalization."
280 );
281 AnnouncementAcceptance::Reject { penalize_peer: true }
282 }
283 }
284}
285
286pub type StrictEthAnnouncementFilter = TypedStrictFilter;
288
/// Stateless announcement filter that silently ignores unknown transaction types instead of
/// rejecting them.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub struct TypedRelaxedFilter;
296
297impl<N: NetworkPrimitives> AnnouncementFilteringPolicy<N> for TypedRelaxedFilter {
298 fn decide_on_announcement(&self, ty: u8, hash: &B256, size: usize) -> AnnouncementAcceptance {
299 if N::PooledTransaction::is_type(ty) {
300 AnnouncementAcceptance::Accept
301 } else {
302 tracing::trace!(target: "net::tx::policy::relaxed_typed",
303 %ty,
304 %size,
305 %hash,
306 "Unknown transaction type byte. Ignoring entry."
307 );
308 AnnouncementAcceptance::Ignore
309 }
310 }
311}
312
313pub type RelaxedEthAnnouncementFilter = TypedRelaxedFilter;
316
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that every supported spelling parses to the expected mode and that malformed
    /// inputs are rejected.
    #[test]
    fn test_transaction_propagation_mode_from_str() {
        type Mode = TransactionPropagationMode;

        // (input, expected) pairs covering lower, upper and mixed case for each variant.
        let accepted = [
            ("sqrt", Mode::Sqrt),
            ("SQRT", Mode::Sqrt),
            ("Sqrt", Mode::Sqrt),
            ("all", Mode::All),
            ("ALL", Mode::All),
            ("All", Mode::All),
            ("max:10", Mode::Max(10)),
            ("MAX:42", Mode::Max(42)),
            ("Max:100", Mode::Max(100)),
        ];
        for (input, expected) in accepted.iter() {
            assert_eq!(Mode::from_str(input).unwrap(), *expected);
        }

        // Unknown keyword, non-numeric limit, missing limit, missing separator, empty input.
        let rejected = ["invalid", "max:not_a_number", "max:", "max", ""];
        for input in rejected.iter() {
            assert!(Mode::from_str(input).is_err());
        }
    }
}