pub mod config;
pub mod constants;
pub mod fetcher;
pub mod validation;
pub use self::constants::{
tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
};
pub use config::{TransactionFetcherConfig, TransactionPropagationMode, TransactionsManagerConfig};
pub use validation::*;
pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
use crate::{
budget::{
DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
DEFAULT_BUDGET_TRY_DRAIN_STREAM,
},
cache::LruCache,
duration_metered_exec, metered_poll_nested_stream_with_budget,
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
NetworkHandle,
};
use alloy_primitives::{TxHash, B256};
use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
use futures::{stream::FuturesUnordered, Future, StreamExt};
use reth_eth_wire::{
DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
RequestTxHashes, Transactions,
};
use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
use reth_network_api::{
events::{PeerEvent, SessionInfo},
NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers,
};
use reth_network_p2p::{
error::{RequestError, RequestResult},
sync::SyncStateProvider,
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use reth_primitives::{transaction::SignedTransactionIntoRecoveredExt, TransactionSigned};
use reth_primitives_traits::SignedTransaction;
use reth_tokio_util::EventStream;
use reth_transaction_pool::{
error::{PoolError, PoolResult},
GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions,
TransactionPool, ValidPoolTransaction,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::{debug, trace};
/// Future resolving to the outcome of a batched pool import: one [`PoolResult`]
/// per transaction hash that was submitted to the pool.
pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>> + Send + 'static>>;
/// A cloneable front-end for sending [`TransactionsCommand`]s to a running
/// [`TransactionsManager`] task.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Command channel into the manager; sends are fire-and-forget.
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
impl<N: NetworkPrimitives> TransactionsHandle<N> {
    /// Fire-and-forget send; silently dropped if the manager task has shut down.
    fn send(&self, cmd: TransactionsCommand<N>) {
        let _ = self.manager_tx.send(cmd);
    }

    /// Requests the request-sender handle for the session with the given peer.
    ///
    /// Resolves to `None` if the peer is not currently tracked; errors if the
    /// manager dropped the response channel.
    async fn peer_handle(
        &self,
        peer_id: PeerId,
    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
        let (tx, rx) = oneshot::channel();
        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
        rx.await
    }

    /// Asks the manager to propagate a single transaction hash.
    pub fn propagate(&self, hash: TxHash) {
        self.send(TransactionsCommand::PropagateHash(hash))
    }

    /// Propagates one hash to a specific peer (convenience wrapper).
    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
        self.propagate_hashes_to(Some(hash), peer)
    }

    /// Propagates the given hashes to a specific peer; no-op for an empty set.
    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
        let hashes = hash.into_iter().collect::<Vec<_>>();
        if hashes.is_empty() {
            return
        }
        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
    }

    /// Returns the ids of all peers the manager currently tracks.
    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
        let (tx, rx) = oneshot::channel();
        self.send(TransactionsCommand::GetActivePeers(tx));
        rx.await
    }

    /// Asks the manager to send the given pool transactions in full to one peer.
    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
        if transactions.is_empty() {
            return
        }
        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
    }

    /// Asks the manager to propagate the given pool transactions to all peers.
    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
        if transactions.is_empty() {
            return
        }
        self.send(TransactionsCommand::PropagateTransactions(transactions))
    }

    /// Broadcasts already-materialized transactions to peers; no-op for empty input.
    pub fn broadcast_transactions(
        &self,
        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
    ) {
        let transactions =
            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
        if transactions.is_empty() {
            return
        }
        self.send(TransactionsCommand::BroadcastTransactions(transactions))
    }

    /// Returns, per requested peer, the transaction hashes the manager has marked
    /// as seen by that peer. Unknown peers map to an empty set.
    pub async fn get_transaction_hashes(
        &self,
        peers: Vec<PeerId>,
    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
        if peers.is_empty() {
            return Ok(Default::default())
        }
        let (tx, rx) = oneshot::channel();
        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
        rx.await
    }

    /// Convenience wrapper around [`Self::get_transaction_hashes`] for one peer.
    pub async fn get_peer_transaction_hashes(
        &self,
        peer: PeerId,
    ) -> Result<HashSet<TxHash>, RecvError> {
        let res = self.get_transaction_hashes(vec![peer]).await?;
        Ok(res.into_values().next().unwrap_or_default())
    }

    /// Requests the given pooled transactions directly from the peer's session.
    ///
    /// Returns `Ok(None)` if the peer is not connected; session-level failures
    /// surface as [`RequestError`]s.
    pub async fn get_pooled_transactions_from(
        &self,
        peer_id: PeerId,
        hashes: Vec<B256>,
    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
        let (tx, rx) = oneshot::channel();
        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
        // Best-effort send; a dropped/full session channel surfaces via `rx` below.
        peer.try_send(request).ok();
        rx.await?.map(|res| Some(res.0))
    }
}
/// Manages transaction gossip: ingests transactions and announcements from the
/// network, imports them into the pool, and propagates pool transactions to
/// connected peers. Does nothing unless polled as a [`Future`].
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The transaction pool transactions are imported into and read from.
    pool: Pool,
    /// Network handle used for reputation changes and outgoing messages.
    network: NetworkHandle<N>,
    /// Stream of session/peer events emitted by the network.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Fetches announced transactions from peers.
    transaction_fetcher: TransactionFetcher<N>,
    /// Tracks, per in-flight transaction hash, which peers sent it to us.
    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
    /// Batched pool imports currently in progress.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Counts and caps the number of transactions currently being imported.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Hashes that previously failed import with a peer-attributable error.
    bad_imports: LruCache<TxHash>,
    /// Metadata for each connected peer (seen hashes, version, request channel).
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Sender half handed out via [`TransactionsHandle`].
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Incoming commands from handles.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// Stream of hashes that became pending in the pool.
    pending_transactions: ReceiverStream<TxHash>,
    /// Incoming transaction-related network events (metered).
    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
    /// Static configuration for propagation and fetching.
    config: TransactionsManagerConfig,
    /// Manager metrics.
    metrics: TransactionsManagerMetrics,
}
impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
pub fn new(
network: NetworkHandle<N>,
pool: Pool,
from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
transactions_manager_config: TransactionsManagerConfig,
) -> Self {
let network_events = network.event_listener();
let (command_tx, command_rx) = mpsc::unbounded_channel();
let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
&transactions_manager_config.transaction_fetcher_config,
);
let pending = pool.pending_transactions_listener();
let pending_pool_imports_info = PendingPoolImportsInfo::default();
let metrics = TransactionsManagerMetrics::default();
metrics
.capacity_pending_pool_imports
.increment(pending_pool_imports_info.max_pending_pool_imports as u64);
Self {
pool,
network,
network_events,
transaction_fetcher,
transactions_by_peers: Default::default(),
pool_imports: Default::default(),
pending_pool_imports_info: PendingPoolImportsInfo::new(
DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
),
bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
peers: Default::default(),
command_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
pending_transactions: ReceiverStream::new(pending),
transaction_events: UnboundedMeteredReceiver::new(
from_network,
NETWORK_POOL_TRANSACTIONS_SCOPE,
),
config: transactions_manager_config,
metrics,
}
}
    /// Returns a new handle that can send commands to this manager.
    pub fn handle(&self) -> TransactionsHandle<N> {
        TransactionsHandle { manager_tx: self.command_tx.clone() }
    }

    /// True if both the pending-pool-import budget and the fetcher have room to
    /// take on more hashes that are waiting to be fetched.
    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
        self.pending_pool_imports_info
            .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
    }

    /// Penalizes a peer for sending bad transactions and bumps the metric.
    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
        self.metrics.reported_bad_transactions.increment(1);
    }

    /// Applies a reputation change of the given kind to the peer.
    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
        self.network.reputation_change(peer_id, kind);
    }

    /// Penalizes a peer for re-sending a transaction it already sent us.
    fn report_already_seen(&self, peer_id: PeerId) {
        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
    }

    /// Clears sender bookkeeping for a hash once its pool import succeeded.
    fn on_good_import(&mut self, hash: TxHash) {
        self.transactions_by_peers.remove(&hash);
    }

    /// Handles a failed pool import: penalizes every peer that sent the
    /// transaction and caches the hash as a known-bad import — but only when
    /// the error is attributable to the transaction and the node isn't syncing.
    fn on_bad_import(&mut self, err: PoolError) {
        let peers = self.transactions_by_peers.remove(&err.hash);
        // While syncing, imports can fail for reasons that aren't the peer's fault.
        if !err.is_bad_transaction() || self.network.is_syncing() {
            return
        }
        if let Some(peers) = peers {
            for peer_id in peers {
                self.report_peer_bad_transactions(peer_id);
            }
        }
        self.metrics.bad_imports.increment(1);
        self.bad_imports.insert(err.hash);
    }

    /// Asks the fetcher to request hashes that are pending fetch, gated by the
    /// remaining pending-pool-import capacity.
    fn on_fetch_hashes_pending_fetch(&mut self) {
        let info = &self.pending_pool_imports_info;
        let max_pending_pool_imports = info.max_pending_pool_imports;
        // Parameterized so the fetcher can probe fractions of the import budget.
        let has_capacity_wrt_pending_pool_imports =
            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
        self.transaction_fetcher
            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
    }

    /// Maps a failed `GetPooledTransactions` request to a reputation change.
    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
        let kind = match req_err {
            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
            RequestError::Timeout => ReputationChangeKind::Timeout,
            // Channel/connection teardown is not the peer's fault — no penalty.
            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
                return
            }
            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
        };
        self.report_peer(peer_id, kind);
    }
    /// Records how the last poll's time was spent across the manager's sub-tasks.
    #[inline]
    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
        let metrics = &self.metrics;
        // Full destructure: a newly added duration field can't be silently dropped.
        let TxManagerPollDurations {
            acc_network_events,
            acc_pending_imports,
            acc_tx_events,
            acc_imported_txns,
            acc_fetch_events,
            acc_pending_fetch,
            acc_cmds,
        } = poll_durations;
        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
    }
}
impl<Pool, N> TransactionsManager<Pool, N>
where
Pool: TransactionPool,
N: NetworkPrimitives,
{
fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
for res in batch_results {
match res {
Ok(hash) => {
self.on_good_import(hash);
}
Err(err) => {
self.on_bad_import(err);
}
}
}
}
    /// Handles a `NewPooledTransactionHashes` announcement from a peer.
    ///
    /// Marks the announced hashes as seen by the peer, validates the message,
    /// filters out hashes already known (in flight, in pool, tracked by the
    /// fetcher, or cached as bad), and either requests the remainder from the
    /// peer or buffers them if the peer is busy or the request cannot be sent.
    fn on_new_pooled_transaction_hashes(
        &mut self,
        peer_id: PeerId,
        msg: NewPooledTransactionHashes,
    ) {
        // Ignore announcements while initially syncing or when gossip is disabled.
        if self.network.is_initially_syncing() {
            return
        }
        if self.network.tx_gossip_disabled() {
            return
        }
        let Some(peer) = self.peers.get_mut(&peer_id) else {
            trace!(
                peer_id = format!("{peer_id:#}"),
                ?msg,
                "discarding announcement from inactive peer"
            );
            return
        };
        let client = peer.client_version.clone();
        // Mark all announced hashes as seen by this peer; count repeats for penalty.
        let mut count_txns_already_seen_by_peer = 0;
        for tx in msg.iter_hashes().copied() {
            if !peer.seen_transactions.insert(tx) {
                count_txns_already_seen_by_peer += 1;
            }
        }
        if count_txns_already_seen_by_peer > 0 {
            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
            self.metrics
                .occurrences_hash_already_seen_by_peer
                .increment(count_txns_already_seen_by_peer);
            trace!(target: "net::tx",
                %count_txns_already_seen_by_peer,
                peer_id=format!("{peer_id:#}"),
                ?client,
                "Peer sent hashes that have already been marked as seen by peer"
            );
            self.report_already_seen(peer_id);
        }
        // First validation pass (version-agnostic structural checks).
        let (validation_outcome, mut partially_valid_msg) =
            self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);
        if validation_outcome == FilterOutcome::ReportPeer {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        }
        // Drop hashes whose import is already in flight, then hashes already pooled.
        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
        let hashes_count_pre_pool_filter = partially_valid_msg.len();
        self.pool.retain_unknown(&mut partially_valid_msg);
        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
            let already_known_hashes_count =
                hashes_count_pre_pool_filter - partially_valid_msg.len();
            self.metrics
                .occurrences_hashes_already_in_pool
                .increment(already_known_hashes_count as u64);
        }
        if partially_valid_msg.is_empty() {
            return
        }
        // Second, version-specific validation pass (eth/68 vs eth/66 announcements).
        let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg
            .msg_version()
            .expect("partially valid announcement should have version")
            .is_eth68()
        {
            self.transaction_fetcher
                .filter_valid_message
                .filter_valid_entries_68(partially_valid_msg)
        } else {
            self.transaction_fetcher
                .filter_valid_message
                .filter_valid_entries_66(partially_valid_msg)
        };
        if validation_outcome == FilterOutcome::ReportPeer {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        }
        if valid_announcement_data.is_empty() {
            return
        }
        // Drop hashes the fetcher already tracks and hashes cached as bad imports.
        let bad_imports = &self.bad_imports;
        self.transaction_fetcher.filter_unseen_and_pending_hashes(
            &mut valid_announcement_data,
            |hash| bad_imports.contains(hash),
            &peer_id,
            &client,
        );
        if valid_announcement_data.is_empty() {
            return
        }
        trace!(target: "net::tx::propagation",
            peer_id=format!("{peer_id:#}"),
            hashes_len=valid_announcement_data.iter().count(),
            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
            msg_version=%valid_announcement_data.msg_version(),
            client_version=%client,
            "received previously unseen and pending hashes in announcement from peer"
        );
        // If the peer already has an in-flight request, buffer the hashes for later.
        if !self.transaction_fetcher.is_idle(&peer_id) {
            let msg_version = valid_announcement_data.msg_version();
            let (hashes, _version) = valid_announcement_data.into_request_hashes();
            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                hashes=?*hashes,
                %msg_version,
                %client,
                "buffering hashes announced by busy peer"
            );
            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
            return
        }
        // Pack as many hashes as fit into one request; buffer any surplus.
        let mut hashes_to_request =
            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
        let surplus_hashes =
            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
        if !surplus_hashes.is_empty() {
            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                surplus_hashes=?*surplus_hashes,
                %client,
                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
            );
            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
        }
        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*hashes_to_request,
            %client,
            "sending hashes in `GetPooledTransactions` request to peer's session"
        );
        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        // On send failure, re-buffer the hashes so they can be fetched later.
        if let Some(failed_to_request_hashes) =
            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
        {
            let conn_eth_version = peer.version;
            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                failed_to_request_hashes=?*failed_to_request_hashes,
                %conn_eth_version,
                %client,
                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
            );
            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
        }
    }
}
impl<Pool, N> TransactionsManager<Pool, N>
where
Pool: TransactionPool + 'static,
N: NetworkPrimitives<
BroadcastedTransaction: SignedTransaction,
PooledTransaction: SignedTransaction,
>,
Pool::Transaction:
PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
if self.network.is_initially_syncing() {
return
}
if self.network.tx_gossip_disabled() {
return
}
trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
self.propagate_all(hashes);
}
    /// Propagates the given pool transactions as full transactions to a single peer.
    ///
    /// In [`PropagationMode::Forced`] the peer's seen-transactions filter is
    /// bypassed. Returns `None` if the peer is unknown or nothing was sent;
    /// otherwise returns what was propagated, keyed by hash.
    fn propagate_full_transactions_to_peer(
        &mut self,
        txs: Vec<TxHash>,
        peer_id: PeerId,
        propagation_mode: PropagationMode,
    ) -> Option<PropagatedTransactions> {
        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
        let peer = self.peers.get_mut(&peer_id)?;
        let mut propagated = PropagatedTransactions::default();
        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
        if propagation_mode.is_forced() {
            full_transactions.extend(to_propagate);
        } else {
            // Only include transactions the peer has not already seen.
            for tx in to_propagate {
                if !peer.seen_transactions.contains(tx.tx_hash()) {
                    full_transactions.push(&tx);
                }
            }
        }
        if full_transactions.is_empty() {
            return None
        }
        // NOTE(review): the builder splits output into hash announcements
        // (`pooled`) and full bodies (`full`) — see `FullTransactionsBuilder`
        // for the split criteria.
        let PropagateTransactions { pooled, full } = full_transactions.build();
        if let Some(new_pooled_hashes) = pooled {
            for hash in new_pooled_hashes.iter_hashes().copied() {
                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
                peer.seen_transactions.insert(hash);
            }
            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
        }
        if let Some(new_full_transactions) = full {
            for tx in &new_full_transactions {
                propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
                peer.seen_transactions.insert(*tx.tx_hash());
            }
            self.network.send_transactions(peer_id, new_full_transactions);
        }
        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
        Some(propagated)
    }
    /// Announces the given pool transactions by hash to a single peer, then
    /// informs the pool of what was propagated.
    ///
    /// In [`PropagationMode::Forced`] the seen-transactions filter is bypassed.
    fn propagate_hashes_to(
        &mut self,
        hashes: Vec<TxHash>,
        peer_id: PeerId,
        propagation_mode: PropagationMode,
    ) {
        trace!(target: "net::tx", "Start propagating transactions as hashes");
        let propagated = {
            let Some(peer) = self.peers.get_mut(&peer_id) else {
                return
            };
            let to_propagate = self
                .pool
                .get_all(hashes)
                .into_iter()
                .map(PropagateTransaction::pool_tx)
                .collect::<Vec<_>>();
            let mut propagated = PropagatedTransactions::default();
            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
            if propagation_mode.is_forced() {
                hashes.extend(to_propagate)
            } else {
                // Skip hashes this peer is already known to have seen.
                for tx in to_propagate {
                    if !peer.seen_transactions.contains(tx.tx_hash()) {
                        hashes.push(&tx);
                    }
                }
            }
            let new_pooled_hashes = hashes.build();
            if new_pooled_hashes.is_empty() {
                return
            }
            for hash in new_pooled_hashes.iter_hashes().copied() {
                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
            }
            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
            self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
            propagated
        };
        // Tell the pool which hashes went out to which peer.
        self.pool.on_propagated(propagated);
    }
    /// Propagates transactions to all connected peers.
    ///
    /// A capped number of peers receives full transaction bodies (per the
    /// configured propagation mode's `full_peer_count`); the rest receive hash
    /// announcements only. Returns what was propagated, keyed by hash.
    fn propagate_transactions(
        &mut self,
        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
        propagation_mode: PropagationMode,
    ) -> PropagatedTransactions {
        let mut propagated = PropagatedTransactions::default();
        if self.network.tx_gossip_disabled() {
            return propagated
        }
        // Number of peers that get full transactions; the remainder gets hashes.
        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
            let mut builder = if peer_idx > max_num_full {
                PropagateTransactionsBuilder::pooled(peer.version)
            } else {
                PropagateTransactionsBuilder::full(peer.version)
            };
            if propagation_mode.is_forced() {
                builder.extend(to_propagate.iter());
            } else {
                // Only queue transactions the peer has not seen yet.
                for tx in &to_propagate {
                    if !peer.seen_transactions.contains(tx.tx_hash()) {
                        builder.push(tx);
                    }
                }
            }
            if builder.is_empty() {
                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
                continue
            }
            let PropagateTransactions { pooled, full } = builder.build();
            if let Some(mut new_pooled_hashes) = pooled {
                // Cap the announcement to the protocol soft limit.
                new_pooled_hashes
                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
                for hash in new_pooled_hashes.iter_hashes().copied() {
                    propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
                    peer.seen_transactions.insert(hash);
                }
                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
            }
            if let Some(new_full_transactions) = full {
                for tx in &new_full_transactions {
                    propagated
                        .0
                        .entry(*tx.tx_hash())
                        .or_default()
                        .push(PropagateKind::Full(*peer_id));
                    peer.seen_transactions.insert(*tx.tx_hash());
                }
                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
                self.network.send_transactions(*peer_id, new_full_transactions);
            }
        }
        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
        propagated
    }
fn propagate_all(&mut self, hashes: Vec<TxHash>) {
let propagated = self.propagate_transactions(
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
PropagationMode::Basic,
);
self.pool.on_propagated(propagated);
}
    /// Answers a peer's `GetPooledTransactions` request from the local pool.
    ///
    /// Responds with an empty set when gossip is disabled, and marks everything
    /// that is sent as seen by the requesting peer. Unknown peers are ignored.
    fn on_get_pooled_transactions(
        &mut self,
        peer_id: PeerId,
        request: GetPooledTransactions,
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    ) {
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            if self.network.tx_gossip_disabled() {
                let _ = response.send(Ok(PooledTransactions::default()));
                return
            }
            let transactions = self.pool.get_pooled_transaction_elements(
                request.0,
                // Cap the response size with the same soft limit the fetcher uses.
                GetPooledTransactionLimit::ResponseSizeSoftLimit(
                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
                ),
            );
            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
            let resp = PooledTransactions(transactions);
            let _ = response.send(Ok(resp));
        }
    }
    /// Executes a [`TransactionsCommand`] received from a [`TransactionsHandle`].
    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
        match cmd {
            TransactionsCommand::PropagateHash(hash) => {
                self.on_new_pending_transactions(vec![hash])
            }
            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
                // Forced: bypass the per-peer seen-transactions filter.
                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
            }
            TransactionsCommand::GetActivePeers(tx) => {
                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
                tx.send(peers).ok();
            }
            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
                if let Some(propagated) =
                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
                {
                    self.pool.on_propagated(propagated);
                }
            }
            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
            TransactionsCommand::BroadcastTransactions(txs) => {
                self.propagate_transactions(txs, PropagationMode::Forced);
            }
            TransactionsCommand::GetTransactionHashes { peers, tx } => {
                let mut res = HashMap::with_capacity(peers.len());
                for peer_id in peers {
                    // Unknown peers yield an empty set rather than being omitted.
                    let hashes = self
                        .peers
                        .get(&peer_id)
                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
                        .unwrap_or_default();
                    res.insert(peer_id, hashes);
                }
                tx.send(res).ok();
            }
            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
                // `None` when the peer isn't connected.
                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
                peer_request_sender.send(sender).ok();
            }
        }
    }
fn handle_peer_session(
&mut self,
info: SessionInfo,
messages: PeerRequestSender<PeerRequest<N>>,
) {
let SessionInfo { peer_id, client_version, version, .. } = info;
let peer = PeerMetadata::<N>::new(
messages,
version,
client_version,
self.config.max_transactions_seen_by_peer_history,
);
let peer = match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
entry.insert(peer);
entry.into_mut()
}
Entry::Vacant(entry) => entry.insert(peer),
};
if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
return
}
let pooled_txs = self.pool.pooled_transactions_max(
SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
);
if pooled_txs.is_empty() {
trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
return;
}
let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
for pooled_tx in pooled_txs {
peer.seen_transactions.insert(*pooled_tx.hash());
msg_builder.push_pooled(pooled_tx);
}
debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
let msg = msg_builder.build();
self.network.send_transactions_hashes(peer_id, msg);
}
    /// Handles session lifecycle events from the network.
    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
        match event_result {
            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
                // Stop tracking the peer and drop any fetches attributed to it.
                self.peers.remove(&peer_id);
                self.transaction_fetcher.remove_peer(&peer_id);
            }
            NetworkEvent::ActivePeerSession { info, messages } => {
                self.handle_peer_session(info, messages);
            }
            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                // Reuse the request channel already stored for this peer, if any.
                let peer_id = info.peer_id;
                let messages = match self.peers.get(&peer_id) {
                    Some(p) => p.request_tx.clone(),
                    None => {
                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
                        return;
                    }
                };
                self.handle_peer_session(info, messages);
            }
            _ => {}
        }
    }
    /// Dispatches an incoming transaction-related network event.
    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
        match event {
            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
                // Peers broadcasting full EIP-4844 (blob) transactions are
                // penalized below; only the convertible transactions are imported.
                let has_blob_txs = msg.has_eip4844();
                let non_blob_txs = msg
                    .0
                    .into_iter()
                    .map(N::PooledTransaction::try_from)
                    .filter_map(Result::ok)
                    .collect();
                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
                if has_blob_txs {
                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
                    self.report_peer_bad_transactions(peer_id);
                }
            }
            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
                self.on_new_pooled_transaction_hashes(peer_id, msg)
            }
            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
                self.on_get_pooled_transactions(peer_id, request, response)
            }
            NetworkTransactionEvent::GetTransactionsHandle(response) => {
                let _ = response.send(Some(self.handle()));
            }
        }
    }
    /// Imports transactions received from a peer (broadcast or fetch response)
    /// into the pool.
    ///
    /// Performs signer recovery, filters transactions already known to the pool
    /// or cached as bad imports, records which peer sent which hash, and spawns
    /// a batched pool import. Penalizes the peer for unrecoverable or known-bad
    /// transactions and for re-broadcasting transactions it already sent.
    fn import_transactions(
        &mut self,
        peer_id: PeerId,
        transactions: PooledTransactions<N::PooledTransaction>,
        source: TransactionSource,
    ) {
        // Ignore everything while the node performs its initial sync.
        if self.network.is_initially_syncing() {
            return
        }
        if self.network.tx_gossip_disabled() {
            return
        }
        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        let mut transactions = transactions.0;
        // These hashes are no longer in flight — stop tracking them in the fetcher.
        self.transaction_fetcher
            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.tx_hash()));
        let mut num_already_seen_by_peer = 0;
        for tx in &transactions {
            // Only broadcasts count as "already seen" offenses; responses were
            // explicitly requested by us.
            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
                num_already_seen_by_peer += 1;
            }
        }
        let txns_count_pre_pool_filter = transactions.len();
        self.pool.retain_unknown(&mut transactions);
        if txns_count_pre_pool_filter > transactions.len() {
            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
            self.metrics
                .occurrences_transactions_already_in_pool
                .increment(already_known_txns_count as u64);
        }
        let mut has_bad_transactions = false;
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            let mut new_txs = Vec::with_capacity(transactions.len());
            for tx in transactions {
                // Recover the signer; failures mark the peer for a penalty below.
                let tx = match tx.try_into_ecrecovered() {
                    Ok(tx) => tx,
                    Err(badtx) => {
                        trace!(target: "net::tx",
                            peer_id=format!("{peer_id:#}"),
                            hash=%badtx.tx_hash(),
                            client_version=%peer.client_version,
                            "failed ecrecovery for transaction"
                        );
                        has_bad_transactions = true;
                        continue
                    }
                };
                match self.transactions_by_peers.entry(*tx.tx_hash()) {
                    Entry::Occupied(mut entry) => {
                        // Import already in flight; just record this peer as a sender.
                        entry.get_mut().insert(peer_id);
                    }
                    Entry::Vacant(entry) => {
                        if self.bad_imports.contains(tx.tx_hash()) {
                            trace!(target: "net::tx",
                                peer_id=format!("{peer_id:#}"),
                                hash=%tx.tx_hash(),
                                client_version=%peer.client_version,
                                "received a known bad transaction from peer"
                            );
                            has_bad_transactions = true;
                        } else {
                            // New transaction: queue for import and start sender tracking.
                            let pool_transaction = Pool::Transaction::from_pooled(tx);
                            new_txs.push(pool_transaction);
                            entry.insert(HashSet::from([peer_id]));
                        }
                    }
                }
            }
            new_txs.shrink_to_fit();
            if !new_txs.is_empty() {
                let pool = self.pool.clone();
                let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
                metric_pending_pool_imports.increment(new_txs.len() as f64);
                self.pending_pool_imports_info
                    .pending_pool_imports
                    .fetch_add(new_txs.len(), Ordering::Relaxed);
                let tx_manager_info_pending_pool_imports =
                    self.pending_pool_imports_info.pending_pool_imports.clone();
                trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
                // The future decrements both counters once the batch import resolves.
                let import = Box::pin(async move {
                    let added = new_txs.len();
                    let res = pool.add_external_transactions(new_txs).await;
                    metric_pending_pool_imports.decrement(added as f64);
                    tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
                    res
                });
                self.pool_imports.push(import);
            }
            if num_already_seen_by_peer > 0 {
                self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
                self.metrics
                    .occurrences_of_transaction_already_seen_by_peer
                    .increment(num_already_seen_by_peer);
                trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
            }
        }
        if has_bad_transactions {
            self.report_peer_bad_transactions(peer_id)
        }
        if num_already_seen_by_peer > 0 {
            self.report_already_seen(peer_id);
        }
    }
fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
match fetch_event {
FetchEvent::TransactionsFetched { peer_id, transactions } => {
self.import_transactions(peer_id, transactions, TransactionSource::Response);
}
FetchEvent::FetchError { peer_id, error } => {
trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
self.on_request_error(peer_id, error);
}
FetchEvent::EmptyResponse { peer_id } => {
trace!(target: "net::tx", ?peer_id, "peer returned empty response");
}
}
}
}
impl<Pool, N> Future for TransactionsManager<Pool, N>
where
    Pool: TransactionPool + Unpin + 'static,
    N: NetworkPrimitives<
        BroadcastedTransaction: SignedTransaction,
        PooledTransaction: SignedTransaction,
    >,
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    type Output = ();

    /// Drives all the manager's event sources, each under its own budget so a
    /// single busy stream cannot starve the others. This future never completes;
    /// it re-wakes itself whenever any source may still have pending items.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();
        let this = self.get_mut();
        // Drain network/session events (peer connects, disconnects).
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );
        // Collect newly pending pool hashes, then propagate them in one batch.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_imported_txns,
            "net::tx",
            "Pending transactions stream",
            DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
            this.pending_transactions.poll_next_unpin(cx),
            |hash| new_txs.push(hash)
        );
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }
        // Drain completed fetches from the transaction fetcher.
        let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );
        // Drain incoming transaction messages from peers.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event| this.on_network_tx_event(event),
        );
        // Drain finished batched pool imports.
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );
        // Opportunistically request hashes still pending fetch, if capacity allows.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() {
                    this.on_fetch_hashes_pending_fetch();
                }
            },
            poll_durations.acc_pending_fetch
        );
        // Drain commands sent through handles.
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );
        this.transaction_fetcher.update_metrics();
        // If any stream exhausted its budget it may still hold items: wake
        // immediately so draining resumes on the next poll.
        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            cx.waker().wake_by_ref();
            return Poll::Pending
        }
        this.update_poll_metrics(start, poll_durations);
        Poll::Pending
    }
}
/// How strictly propagation should filter its targets.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Regular propagation behavior.
    Basic,
    /// Forced propagation (presumably bypasses the usual filtering — confirm
    /// at call sites).
    Forced,
}

impl PropagationMode {
    /// Returns `true` if this is [`Self::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
/// A transaction queued for propagation, paired with its encoded length so
/// broadcast messages can respect size budgets.
#[derive(Debug, Clone)]
struct PropagateTransaction<T = TransactionSigned> {
    // Encoded length (`length()` / `encoded_length()`), used for size accounting.
    size: usize,
    // Shared consensus transaction; `Arc` makes fan-out to many peers cheap.
    transaction: Arc<T>,
}
impl<T: SignedTransaction> PropagateTransaction<T> {
    /// Wraps a consensus transaction, recording its encoded length for
    /// broadcast-size accounting.
    pub fn new(transaction: T) -> Self {
        Self { size: transaction.length(), transaction: Arc::new(transaction) }
    }

    /// Builds a propagation entry from a validated pool transaction by
    /// converting it back into its signed consensus form.
    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
    where
        P: PoolTransaction<Consensus = T>,
    {
        let encoded_len = tx.encoded_length();
        let consensus = tx.transaction.clone_into_consensus();
        Self { size: encoded_len, transaction: Arc::new(consensus.into_signed()) }
    }

    /// Hash identifying the wrapped transaction.
    fn tx_hash(&self) -> &TxHash {
        self.transaction.tx_hash()
    }
}
/// Builder selecting how transactions are propagated to a peer: hash-only
/// announcements, or full bodies with a hash fallback.
#[derive(Debug, Clone)]
enum PropagateTransactionsBuilder<T> {
    /// Announce hashes only.
    Pooled(PooledTransactionsHashesBuilder),
    /// Send full transactions where allowed, hashes otherwise.
    Full(FullTransactionsBuilder<T>),
}
impl<T> PropagateTransactionsBuilder<T> {
    /// Creates a hashes-only builder for the given protocol version.
    fn pooled(version: EthVersion) -> Self {
        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
    }

    /// Creates a builder that sends full transaction objects where allowed.
    fn full(version: EthVersion) -> Self {
        Self::Full(FullTransactionsBuilder::new(version))
    }

    /// True if no transaction has been queued yet.
    fn is_empty(&self) -> bool {
        match self {
            Self::Pooled(inner) => inner.is_empty(),
            Self::Full(inner) => inner.is_empty(),
        }
    }

    /// Consumes the builder and produces the payload to propagate.
    fn build(self) -> PropagateTransactions<T> {
        match self {
            Self::Full(builder) => builder.build(),
            Self::Pooled(builder) => {
                // Hash-only mode never carries full bodies.
                PropagateTransactions { pooled: Some(builder.build()), full: None }
            }
        }
    }
}
impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
    /// Queues every transaction yielded by the iterator.
    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
        txs.into_iter().for_each(|tx| self.push(tx));
    }

    /// Queues a single transaction, delegating to the active variant.
    fn push(&mut self, transaction: &PropagateTransaction<T>) {
        match self {
            Self::Pooled(inner) => inner.push(transaction),
            Self::Full(inner) => inner.push(transaction),
        }
    }
}
/// Finished propagation payload: an optional hash-announcement message and an
/// optional list of transactions to send in full. Empty parts are `None`.
struct PropagateTransactions<T> {
    /// Hash announcement message, if any hashes were queued.
    pooled: Option<NewPooledTransactionHashes>,
    /// Transactions to broadcast in full, if any.
    full: Option<Vec<Arc<T>>>,
}
/// Accumulates transactions for a full broadcast message while tracking the
/// running payload size; anything that can't be sent in full (or would
/// overflow the soft size limit) falls back to the hashes builder.
#[derive(Debug, Clone)]
struct FullTransactionsBuilder<T> {
    /// Running sum of the encoded sizes of `transactions`.
    total_size: usize,
    /// Transactions to be sent in full.
    transactions: Vec<Arc<T>>,
    /// Fallback hash announcements.
    pooled: PooledTransactionsHashesBuilder,
}
impl<T> FullTransactionsBuilder<T> {
    /// New empty builder for the given protocol version.
    fn new(version: EthVersion) -> Self {
        Self {
            total_size: 0,
            transactions: Vec::new(),
            pooled: PooledTransactionsHashesBuilder::new(version),
        }
    }

    /// True when neither full transactions nor fallback hashes were queued.
    fn is_empty(&self) -> bool {
        self.transactions.is_empty() && self.pooled.is_empty()
    }

    /// Finalizes into the propagation payload, dropping empty parts.
    fn build(self) -> PropagateTransactions<T> {
        let Self { transactions, pooled, .. } = self;
        let pooled = Some(pooled.build()).filter(|msg| !msg.is_empty());
        let full = Some(transactions).filter(|txs| !txs.is_empty());
        PropagateTransactions { pooled, full }
    }
}
impl<T: SignedTransaction> FullTransactionsBuilder<T> {
    /// Queues each transaction in turn.
    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
        for tx in txs {
            self.push(&tx)
        }
    }

    /// Appends a transaction for full broadcast, demoting it to a hash
    /// announcement when it must not be broadcast in full, or when adding it
    /// would push an already non-empty message over the soft byte limit.
    /// A single oversized transaction is still sent in full if the message is
    /// otherwise empty (`total_size == 0`).
    fn push(&mut self, transaction: &PropagateTransaction<T>) {
        // Types that may not be broadcast in full are announced by hash only.
        if !transaction.transaction.is_broadcastable_in_full() {
            self.pooled.push(transaction);
            return
        }
        let projected = self.total_size + transaction.size;
        if projected > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
            self.total_size > 0
        {
            // Over budget: announce the hash instead of the full body.
            self.pooled.push(transaction);
            return
        }
        self.total_size = projected;
        self.transactions.push(Arc::clone(&transaction.transaction));
    }
}
/// Builds a `NewPooledTransactionHashes` announcement in the shape required by
/// the negotiated eth protocol version.
#[derive(Debug, Clone)]
enum PooledTransactionsHashesBuilder {
    /// Hash-only format (eth66/eth67).
    Eth66(NewPooledTransactionHashes66),
    /// Hashes with per-tx sizes and types (eth68+).
    Eth68(NewPooledTransactionHashes68),
}
impl PooledTransactionsHashesBuilder {
    /// Creates the builder variant matching the negotiated protocol version.
    fn new(version: EthVersion) -> Self {
        match version {
            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
            EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
        }
    }

    /// True if no hashes have been recorded yet.
    fn is_empty(&self) -> bool {
        match self {
            Self::Eth66(inner) => inner.is_empty(),
            Self::Eth68(inner) => inner.is_empty(),
        }
    }

    /// Records a validated pool transaction's hash (plus size and type for
    /// the eth68 format).
    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
        match self {
            Self::Eth66(inner) => inner.0.push(*pooled_tx.hash()),
            Self::Eth68(inner) => {
                inner.hashes.push(*pooled_tx.hash());
                inner.sizes.push(pooled_tx.encoded_length());
                inner.types.push(pooled_tx.transaction.tx_type());
            }
        }
    }

    /// Records a propagation entry's hash (plus size and type for eth68).
    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
        match self {
            Self::Eth66(inner) => inner.0.push(*tx.tx_hash()),
            Self::Eth68(inner) => {
                inner.hashes.push(*tx.tx_hash());
                inner.sizes.push(tx.size);
                inner.types.push(tx.transaction.ty());
            }
        }
    }

    /// Records every entry yielded by the iterator.
    fn extend<T: SignedTransaction>(
        &mut self,
        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
    ) {
        txs.into_iter().for_each(|tx| self.push(&tx));
    }

    /// Finalizes into the versioned announcement message.
    fn build(self) -> NewPooledTransactionHashes {
        match self {
            Self::Eth66(inner) => inner.into(),
            Self::Eth68(inner) => inner.into(),
        }
    }
}
/// How a set of transactions reached this node.
enum TransactionSource {
    /// Pushed by a peer via a broadcast message.
    Broadcast,
    /// Returned in response to a request we sent.
    Response,
}

impl TransactionSource {
    /// Whether the transactions arrived via broadcast.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
/// Per-peer session state tracked by the transactions manager.
#[derive(Debug)]
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// LRU set of transaction hashes this peer is known to have seen.
    seen_transactions: LruCache<TxHash>,
    /// Channel for sending requests to the peer's session task.
    request_tx: PeerRequestSender<PeerRequest<N>>,
    /// Negotiated eth protocol version for the session.
    version: EthVersion,
    /// The peer's advertised client version string.
    client_version: Arc<str>,
}
impl<N: NetworkPrimitives> PeerMetadata<N> {
    /// Creates metadata for a newly established session, with the seen-tx
    /// cache capped at `max_transactions_seen_by_peer` entries.
    fn new(
        request_tx: PeerRequestSender<PeerRequest<N>>,
        version: EthVersion,
        client_version: Arc<str>,
        max_transactions_seen_by_peer: u32,
    ) -> Self {
        let seen_transactions = LruCache::new(max_transactions_seen_by_peer);
        Self { request_tx, version, client_version, seen_transactions }
    }
}
/// Commands the manager accepts from its handle.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a single transaction hash.
    PropagateHash(B256),
    /// Propagate the given hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Report the set of currently active peers.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate the transactions with the given hashes to a specific peer.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate the transactions with the given hashes.
    PropagateTransactions(Vec<TxHash>),
    /// Broadcast the given, already-prepared transactions.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Report, for each requested peer, the tx hashes that peer has seen.
    GetTransactionHashes {
        peers: Vec<PeerId>,
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Fetch the request sender for a peer's session, `None` if not connected.
    GetPeerSender {
        peer_id: PeerId,
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
/// Transaction-related messages received from the network, delivered to the
/// manager for handling.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Full transactions broadcast by a peer.
    IncomingTransactions {
        peer_id: PeerId,
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Transaction-hash announcement from a peer.
    IncomingPooledTransactionHashes {
        peer_id: PeerId,
        msg: NewPooledTransactionHashes,
    },
    /// A peer's `GetPooledTransactions` request; the result is sent back
    /// through `response`.
    GetPooledTransactions {
        peer_id: PeerId,
        request: GetPooledTransactions,
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Request for a handle to the transactions manager.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
/// Shared counter of in-flight pool imports, used to bound how many imports
/// may run concurrently.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Number of imports currently in flight; shared so multiple holders can
    /// observe and update it.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Configured upper bound on concurrent imports.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates a new tracker with zero in-flight imports and the given cap.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        Self {
            pending_pool_imports: Arc::new(AtomicUsize::new(0)),
            max_pending_pool_imports,
        }
    }

    /// Returns `true` while the in-flight count is below the given limit.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports
    }
}
impl Default for PendingPoolImportsInfo {
    /// Uses the crate-default cap on concurrent pool imports.
    fn default() -> Self {
        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
    }
}
/// Time accumulated in each phase of the manager's poll loop, reported via
/// `update_poll_metrics`.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Draining network/session events.
    acc_network_events: Duration,
    /// Draining batched pool-import results.
    acc_pending_imports: Duration,
    /// Draining incoming transaction messages from peers.
    acc_tx_events: Duration,
    /// Draining newly pending pool transactions.
    acc_imported_txns: Duration,
    /// Draining transaction-fetcher events.
    acc_fetch_events: Duration,
    /// Kicking off fetches for hashes pending fetch.
    acc_pending_fetch: Duration,
    /// Draining handle commands.
    acc_cmds: Duration,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager};
use alloy_primitives::hex;
use alloy_rlp::Decodable;
use constants::tx_fetcher::DEFAULT_MAX_COUNT_FALLBACK_PEERS;
use futures::FutureExt;
use reth_network_api::NetworkInfo;
use reth_network_p2p::{
error::{RequestError, RequestResult},
sync::{NetworkSyncUpdater, SyncState},
};
use reth_storage_api::noop::NoopProvider;
use reth_transaction_pool::test_utils::{
testing_pool, MockTransaction, MockTransactionFactory, TestPool,
};
use secp256k1::SecretKey;
use std::{
fmt,
future::poll_fn,
hash,
net::{IpAddr, Ipv4Addr, SocketAddr},
};
use tests::fetcher::TxFetchMetadata;
use tracing::error;
/// Builds a standalone [`TransactionsManager`] backed by a fresh test pool,
/// plus its [`NetworkManager`], without spawning either.
async fn new_tx_manager(
) -> (TransactionsManager<TestPool, EthNetworkPrimitives>, NetworkManager<EthNetworkPrimitives>)
{
    let secret_key = SecretKey::new(&mut rand::thread_rng());
    let client = NoopProvider::default();
    // Random port, no discovery: tests drive the network manually.
    let config = NetworkConfigBuilder::new(secret_key)
        .listener_port(0)
        .disable_discovery()
        .build(client);
    let pool = testing_pool();
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (_network_handle, network, transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();
    (transactions, network)
}
/// LRU cache sized to the default maximum number of fallback peers.
pub(super) fn default_cache<T: hash::Hash + Eq + fmt::Debug>() -> LruCache<T> {
    LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32)
}
/// Creates peer metadata wired to an in-memory channel, returning the receiver
/// so the test can observe requests the manager sends to the mock "session".
pub(super) fn new_mock_session(
    peer_id: PeerId,
    version: EthVersion,
) -> (PeerMetadata<EthNetworkPrimitives>, mpsc::Receiver<PeerRequest>) {
    let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);
    (
        PeerMetadata::new(
            PeerRequestSender::new(peer_id, to_mock_session_tx),
            version,
            Arc::from(""),
            DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
        ),
        to_mock_session_rx,
    )
}
/// While the node is initially syncing, incoming transaction broadcasts are
/// ignored: nothing is imported into the pool.
#[tokio::test(flavor = "multi_thread")]
async fn test_ignored_tx_broadcasts_while_initially_syncing() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(3).await;
    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();
    drop(handles);
    let handle = net.spawn();
    let listener0 = handle0.event_listener();
    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    // Stand up a separate network + transactions manager under test.
    let secret_key = SecretKey::new(&mut rand::thread_rng());
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();
    tokio::task::spawn(network);
    // Put the node into the initial-sync state.
    network_handle.update_sync_state(SyncState::Syncing);
    assert!(NetworkInfo::is_syncing(&network_handle));
    assert!(NetworkInfo::is_initially_syncing(&network_handle));
    // Forward established sessions to the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                transactions
                    .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
            ev => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    // A valid RLP-encoded signed transaction.
    let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
    let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
    transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
        peer_id: *handle1.peer_id(),
        msg: Transactions(vec![signed_tx.clone()]),
    });
    // Drive the manager once; the broadcast must be dropped while syncing.
    poll_fn(|cx| {
        let _ = transactions.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;
    assert!(pool.is_empty());
    handle.terminate().await;
}
/// After the node has been idle once, a later (non-initial) sync no longer
/// suppresses transaction imports: broadcasts are accepted into the pool.
#[tokio::test(flavor = "multi_thread")]
async fn test_tx_broadcasts_through_two_syncs() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(3).await;
    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();
    drop(handles);
    let handle = net.spawn();
    let listener0 = handle0.event_listener();
    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    let secret_key = SecretKey::new(&mut rand::thread_rng());
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::new(secret_key)
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();
    tokio::task::spawn(network);
    // Cycle: syncing -> idle -> syncing again. The second sync is no longer
    // the *initial* sync.
    network_handle.update_sync_state(SyncState::Syncing);
    assert!(NetworkInfo::is_syncing(&network_handle));
    network_handle.update_sync_state(SyncState::Idle);
    assert!(!NetworkInfo::is_syncing(&network_handle));
    network_handle.update_sync_state(SyncState::Syncing);
    assert!(NetworkInfo::is_syncing(&network_handle));
    // Forward established sessions to the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::ActivePeerSession { .. } |
            NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                transactions.on_network_event(ev);
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
            _ => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    // A valid RLP-encoded signed transaction.
    let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
    let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
    transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
        peer_id: *handle1.peer_id(),
        msg: Transactions(vec![signed_tx.clone()]),
    });
    poll_fn(|cx| {
        let _ = transactions.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;
    // Syncing, but not initially syncing: the tx must have been imported.
    assert!(!NetworkInfo::is_initially_syncing(&network_handle));
    assert!(NetworkInfo::is_syncing(&network_handle));
    assert!(!pool.is_empty());
    handle.terminate().await;
}
/// An incoming transactions broadcast from a peer is tracked by sender and
/// ends up imported into the pool when the node is idle.
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_incoming_transactions() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(3).await;
    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();
    drop(handles);
    let handle = net.spawn();
    let listener0 = handle0.event_listener();
    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    let secret_key = SecretKey::new(&mut rand::thread_rng());
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::new(secret_key)
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();
    tokio::task::spawn(network);
    // Idle: imports are not suppressed.
    network_handle.update_sync_state(SyncState::Idle);
    assert!(!NetworkInfo::is_syncing(&network_handle));
    // Forward established sessions to the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::ActivePeerSession { .. } |
            NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                transactions.on_network_event(ev);
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
            ev => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    // A valid RLP-encoded signed transaction.
    let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
    let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
    transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
        peer_id: *handle1.peer_id(),
        msg: Transactions(vec![signed_tx.clone()]),
    });
    // The broadcast is attributed to the sending peer before import.
    assert!(transactions
        .transactions_by_peers
        .get(&signed_tx.hash())
        .unwrap()
        .contains(handle1.peer_id()));
    // Drive the manager once to flush the pool import.
    poll_fn(|cx| {
        let _ = transactions.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;
    assert!(!pool.is_empty());
    assert!(pool.get(signed_tx.tx_hash()).is_some());
    handle.terminate().await;
}
/// A peer's `GetPooledTransactions` request is answered with the matching
/// transaction from the pool.
#[tokio::test(flavor = "multi_thread")]
async fn test_on_get_pooled_transactions_network() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(2).await;
    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();
    drop(handles);
    let handle = net.spawn();
    let listener0 = handle0.event_listener();
    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    let secret_key = SecretKey::new(&mut rand::thread_rng());
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::new(secret_key)
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();
    tokio::task::spawn(network);
    network_handle.update_sync_state(SyncState::Idle);
    assert!(!NetworkInfo::is_syncing(&network_handle));
    // Forward established sessions to the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::ActivePeerSession { .. } |
            NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                transactions.on_network_event(ev);
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue,
            ev => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    handle.terminate().await;
    // Seed the pool with one transaction, then request it by hash.
    let tx = MockTransaction::eip1559();
    let _ = transactions
        .pool
        .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
        .await;
    let request = GetPooledTransactions(vec![*tx.get_hash()]);
    let (send, receive) = oneshot::channel::<RequestResult<PooledTransactions>>();
    transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
        peer_id: *handle1.peer_id(),
        request,
        response: send,
    });
    match receive.await.unwrap() {
        Ok(PooledTransactions(transactions)) => {
            assert_eq!(transactions.len(), 1);
        }
        Err(e) => {
            panic!("error: {e:?}");
        }
    }
}
/// A hash whose fetch fails is retried through a fallback peer; once the
/// retry budget (here, 1) is spent, a second failure drops the hash instead
/// of returning it to the pending-fetch set.
#[tokio::test]
async fn test_max_retries_tx_request() {
    reth_tracing::init_test_tracing();
    let mut tx_manager = new_tx_manager().await.0;
    let tx_fetcher = &mut tx_manager.transaction_fetcher;
    let peer_id_1 = PeerId::new([1; 64]);
    let peer_id_2 = PeerId::new([2; 64]);
    let eth_version = EthVersion::Eth66;
    let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
    // peer_1 has already announced both hashes.
    let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
    peer_1.seen_transactions.insert(seen_hashes[0]);
    peer_1.seen_transactions.insert(seen_hashes[1]);
    tx_manager.peers.insert(peer_id_1, peer_1);
    // Seed both hashes as pending fetch, one retry left, peer_1 the only
    // fallback peer.
    let retries = 1;
    let mut backups = default_cache();
    backups.insert(peer_id_1);
    let mut backups1 = default_cache();
    backups1.insert(peer_id_1);
    tx_fetcher
        .hashes_fetch_inflight_and_pending_fetch
        .insert(seen_hashes[1], TxFetchMetadata::new(retries, backups, None));
    tx_fetcher
        .hashes_fetch_inflight_and_pending_fetch
        .insert(seen_hashes[0], TxFetchMetadata::new(retries, backups1, None));
    tx_fetcher.hashes_pending_fetch.insert(seen_hashes[1]);
    tx_fetcher.hashes_pending_fetch.insert(seen_hashes[0]);
    assert!(tx_fetcher.is_idle(&peer_id_1));
    assert_eq!(tx_fetcher.active_peers.len(), 0);
    // Fetching pending hashes sends one request to the idle peer_1.
    tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
    let tx_fetcher = &mut tx_manager.transaction_fetcher;
    assert!(tx_fetcher.hashes_pending_fetch.is_empty());
    assert!(!tx_fetcher.is_idle(&peer_id_1));
    assert_eq!(tx_fetcher.active_peers.len(), 1);
    let req = to_mock_session_rx
        .recv()
        .await
        .expect("peer_1 session should receive request with buffered hashes");
    let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
    let GetPooledTransactions(hashes) = request;
    let hashes = hashes.into_iter().collect::<HashSet<_>>();
    assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
    // peer_1 fails the request: both hashes return to pending fetch.
    response
        .send(Err(RequestError::BadResponse))
        .expect("should send peer_1 response to tx manager");
    let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
        unreachable!()
    };
    assert!(tx_fetcher.is_idle(&peer_id));
    assert_eq!(tx_fetcher.active_peers.len(), 0);
    assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 2);
    // peer_2 announces the same hashes, triggering the final retry.
    let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
    tx_manager.peers.insert(peer_id_2, peer_2);
    let msg =
        NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
    tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
    let tx_fetcher = &mut tx_manager.transaction_fetcher;
    assert_eq!(tx_fetcher.active_peers.len(), 1);
    assert_eq!(tx_fetcher.hashes_fetch_inflight_and_pending_fetch.len(), 2);
    assert!(tx_fetcher.hashes_pending_fetch.is_empty());
    let req = to_mock_session_rx
        .recv()
        .await
        .expect("peer_2 session should receive request with buffered hashes");
    let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
    // Second failure: the retry budget is exhausted, so the hashes are
    // dropped rather than re-buffered.
    response
        .send(Err(RequestError::BadResponse))
        .expect("should send peer_2 response to tx manager");
    let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
    assert!(tx_fetcher.hashes_pending_fetch.is_empty());
    assert_eq!(tx_fetcher.active_peers.len(), 0);
}
/// A pooled (hash-only) builder starts empty and yields a single-hash
/// announcement (and no full payload) after one push.
#[test]
fn test_transaction_builder_empty() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
    assert!(builder.is_empty());
    let mut factory = MockTransactionFactory::default();
    let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
    builder.push(&tx);
    assert!(!builder.is_empty());
    let txs = builder.build();
    // Hash-only mode never produces full transactions.
    assert!(txs.full.is_none());
    let txs = txs.pooled.unwrap();
    assert_eq!(txs.len(), 1);
}
/// An oversized transaction still goes out in full when the message is empty,
/// but a second push overflows the soft limit and is demoted to a hash.
#[test]
fn test_transaction_builder_large() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(builder.is_empty());
    let mut factory = MockTransactionFactory::default();
    let mut tx = factory.create_eip1559();
    // Make the single tx exceed the broadcast soft limit on its own.
    tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
    let tx = Arc::new(tx);
    let tx = PropagateTransaction::pool_tx(tx);
    builder.push(&tx);
    assert!(!builder.is_empty());
    let txs = builder.clone().build();
    // First push: sent in full despite being oversized (message was empty).
    assert!(txs.pooled.is_none());
    let txs = txs.full.unwrap();
    assert_eq!(txs.len(), 1);
    builder.push(&tx);
    let txs = builder.clone().build();
    // Second push: over budget, so it falls back to a hash announcement.
    let pooled = txs.pooled.unwrap();
    assert_eq!(pooled.len(), 1);
    let txs = txs.full.unwrap();
    assert_eq!(txs.len(), 1);
}
/// EIP-4844 transactions are never broadcast in full: they are demoted to the
/// pooled-hashes message, while an eip1559 tx is still sent in full.
#[test]
fn test_transaction_builder_eip4844() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(builder.is_empty());
    let mut factory = MockTransactionFactory::default();
    let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
    builder.push(&tx);
    assert!(!builder.is_empty());
    let txs = builder.clone().build();
    // The 4844 tx only appears as a hash announcement.
    assert!(txs.full.is_none());
    let txs = txs.pooled.unwrap();
    assert_eq!(txs.len(), 1);
    let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
    builder.push(&tx);
    let txs = builder.clone().build();
    let pooled = txs.pooled.unwrap();
    assert_eq!(pooled.len(), 1);
    // The eip1559 tx is broadcastable in full.
    let txs = txs.full.unwrap();
    assert_eq!(txs.len(), 1);
}
/// Propagating to an eth68 peer in [`PropagationMode::Basic`] sends the
/// eip1559 tx in full but only announces the eip4844 tx by hash, marks both
/// hashes as seen by the peer, and skips re-propagating already-seen txs.
#[tokio::test]
async fn test_propagate_full() {
    reth_tracing::init_test_tracing();
    let (mut tx_manager, network) = new_tx_manager().await;
    let peer_id = PeerId::random();
    // Mark the node live so propagation isn't suppressed by sync state.
    network.handle().update_sync_state(SyncState::Idle);
    // Register a mock eth68 session for the peer.
    let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
    let session_info = SessionInfo {
        peer_id,
        remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
        client_version: Arc::from(""),
        capabilities: Arc::new(vec![].into()),
        status: Arc::new(Default::default()),
        version: EthVersion::Eth68,
    };
    let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
    tx_manager
        .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
    // One tx eligible for full broadcast and one (4844) that is hash-only.
    let mut propagate = vec![];
    let mut factory = MockTransactionFactory::default();
    let eip1559_tx = Arc::new(factory.create_eip1559());
    propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
    let eip4844_tx = Arc::new(factory.create_eip4844());
    propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
    let propagated =
        tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
    assert_eq!(propagated.0.len(), 2);
    // The eip1559 tx was sent in full...
    let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
    assert_eq!(prop_txs.len(), 1);
    assert!(prop_txs[0].is_full());
    // ...while the eip4844 tx was only announced by hash.
    let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
    assert_eq!(prop_txs.len(), 1);
    assert!(prop_txs[0].is_hash());
    // Both hashes are now tracked as seen by the peer. (Previously this
    // asserted the eip1559 hash twice and discarded the eip4844 check.)
    let peer = tx_manager.peers.get(&peer_id).unwrap();
    assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
    assert!(peer.seen_transactions.contains(eip4844_tx.transaction.hash()));
    // Re-propagating the same set is a no-op: the peer has seen everything.
    let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
    assert!(propagated.0.is_empty());
}
}