// reth_transaction_pool/pool/mod.rs
//! Transaction Pool internals.
//!
//! Incoming transactions are validated before they enter the pool first. The validation outcome can
//! have 3 states:
//!
//! 1. Transaction can _never_ be valid
//! 2. Transaction is _currently_ valid
//! 3. Transaction is _currently_ invalid, but could potentially become valid in the future
//!
//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
//! state, whereas (1.) holds indefinitely. This means that whenever the state changes, the
//! classification of a transaction under (2.) and (3.) needs to be reevaluated.
//!
//! The transaction pool is responsible for storing new, valid transactions and providing the next
//! best transactions sorted by their priority. Where priority is determined by the transaction's
//! score ([`TransactionOrdering`]).
//!
//! Furthermore, the following characteristics fall under (3.):
//!
//! a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
//! sender. A distinction is made here whether multiple transactions from the same sender have
//! gapless nonce increments.
//!
//! a)(1) If _no_ transaction is missing in a chain of multiple
//! transactions from the same sender (all nonce in row), all of them can in principle be executed
//! on the current state one after the other.
//!
//! a)(2) If there's a nonce gap, then all
//! transactions after the missing transaction are blocked until the missing transaction arrives.
//!
//! b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
//! fee cap of the transaction needs to be no less than the base fee of the current block.
//!
//!
//! In essence the transaction pool is made of three separate sub-pools:
//!
//! - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
//! a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
//! nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
//! been executed, the next highest transaction from the same sender `n + 1` becomes ready.
//!
//! - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
//! (3. a)(2): _With_ nonce gaps or due to lack of funds.
//!
//! - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
//! EIP-1559 and all subsequent transactions of the sender currently invalid.
//!
//! The classification of transactions is always dependent on the current state that is changed as
//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
//! to the transaction pool.
//!
//!
//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
//! are interested in (2.) and/or (3.).
//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
//!
//! This Pool maintains two separate sub-pools for (2.) and (3.)
//!
//! ## Terminology
//!
//! - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
//! currently be executed and are stored in the pending sub-pool
//! - _Queued_: queued transactions are transactions that fall under category (3.). Those
//! transactions are _currently_ waiting for state changes that eventually move them into
//! category (2.) and become pending.
use crate::{
error::{PoolError, PoolErrorKind, PoolResult},
identifier::{SenderId, SenderIdentifiers, TransactionId},
pool::{
listener::PoolEventBroadcast,
state::SubPool,
txpool::{SenderInfo, TxPool},
},
traits::{
AllPoolTransactions, BestTransactionsAttributes, BlockInfo, NewTransactionEvent, PoolSize,
PoolTransaction, PropagatedTransactions, TransactionOrigin,
},
validate::{TransactionValidationOutcome, ValidPoolTransaction},
CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
TransactionValidator,
};
use alloy_primitives::{Address, TxHash, B256};
use best::BestTransactions;
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use reth_eth_wire_types::HandleMempoolData;
use reth_execution_types::ChangedAccount;
use alloy_eips::eip4844::BlobTransactionSidecar;
use reth_primitives::RecoveredTx;
use rustc_hash::FxHashMap;
use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
use tokio::sync::mpsc;
use tracing::{debug, trace, warn};
mod events;
use crate::{
blobstore::BlobStore,
metrics::BlobStoreMetrics,
pool::txpool::UpdateOutcome,
traits::{GetPooledTransactionLimit, NewBlobSidecar, TransactionListenerKind},
validate::ValidTransaction,
};
pub use best::{
BestPayloadTransactions, BestTransactionFilter, BestTransactionsWithPrioritizedSenders,
};
pub use blob::{blob_tx_priority, fee_delta};
pub use events::{FullTransactionEvent, TransactionEvent};
pub use listener::{AllTransactionsEvents, TransactionEvents};
pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
pub use pending::PendingPool;
use reth_primitives_traits::{BlockBody, BlockHeader};
mod best;
mod blob;
mod listener;
mod parked;
pub(crate) mod pending;
pub(crate) mod size;
pub(crate) mod state;
pub mod txpool;
mod update;
/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
/// Bound on number of blob sidecar events buffered per blob sidecar listener channel.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
/// Transaction pool internals.
///
/// This is the shared state behind the pool handle: it owns the validator, the blob store and
/// the actual [`TxPool`], and it manages all registered event listeners.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Internal mapping of addresses to plain ints.
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validation.
    validator: V,
    /// Storage for blob transactions
    blob_store: S,
    /// The internal pool that manages all transactions.
    pool: RwLock<TxPool<T>>,
    /// Pool settings.
    config: PoolConfig,
    /// Manages listeners for transaction state change events.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Listeners notified with the _hashes_ of newly inserted pending transactions.
    pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
    /// Listeners for new transactions added to the pool.
    transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
    /// Listener for new blob transaction sidecars added to the pool.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics for the blob store
    blob_store_metrics: BlobStoreMetrics,
}
// === impl PoolInner ===
impl<V, T, S> PoolInner<V, T, S>
where
V: TransactionValidator,
T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
S: BlobStore,
{
/// Create a new transaction pool instance.
///
/// The inner [`TxPool`] is constructed from the given ordering and a clone of the config;
/// all listener collections start out empty.
pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
    let pool = RwLock::new(TxPool::new(ordering, config.clone()));
    Self {
        identifiers: Default::default(),
        validator,
        blob_store,
        pool,
        config,
        event_listener: Default::default(),
        pending_transaction_listener: Default::default(),
        transaction_listener: Default::default(),
        blob_transaction_sidecar_listener: Default::default(),
        blob_store_metrics: Default::default(),
    }
}
/// Returns the configured blob store.
pub const fn blob_store(&self) -> &S {
    &self.blob_store
}
/// Returns stats about the size of the pool.
pub fn size(&self) -> PoolSize {
    let pool = self.get_pool_data();
    pool.size()
}
/// Returns the currently tracked block
pub fn block_info(&self) -> BlockInfo {
    let pool = self.get_pool_data();
    pool.block_info()
}
/// Sets the currently tracked block
pub fn set_block_info(&self, info: BlockInfo) {
    let mut pool = self.pool.write();
    pool.set_block_info(info)
}
/// Returns the internal [`SenderId`] for this address
pub fn get_sender_id(&self, addr: Address) -> SenderId {
    let mut identifiers = self.identifiers.write();
    identifiers.sender_id_or_create(addr)
}
/// Returns all senders in the pool
pub fn unique_senders(&self) -> HashSet<Address> {
    let pool = self.get_pool_data();
    pool.unique_senders()
}
/// Converts the changed accounts to a map of sender ids to sender info (internal identifier
/// used for accounts)
fn changed_senders(
&self,
accs: impl Iterator<Item = ChangedAccount>,
) -> FxHashMap<SenderId, SenderInfo> {
let mut identifiers = self.identifiers.write();
accs.into_iter()
.map(|acc| {
let ChangedAccount { address, nonce, balance } = acc;
let sender_id = identifiers.sender_id_or_create(address);
(sender_id, SenderInfo { state_nonce: nonce, balance })
})
.collect()
}
/// Get the config the pool was configured with.
pub const fn config(&self) -> &PoolConfig {
    &self.config
}
/// Get the validator reference.
pub const fn validator(&self) -> &V {
    &self.validator
}
/// Adds a new transaction listener to the pool that gets notified about every new _pending_
/// transaction inserted into the pool
pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
    let buffer_size = self.config.pending_tx_listener_buffer_size;
    let (sender, rx) = mpsc::channel(buffer_size);
    self.pending_transaction_listener
        .lock()
        .push(PendingTransactionHashListener { sender, kind });
    rx
}
/// Adds a new transaction listener to the pool that gets notified about every new transaction.
pub fn add_new_transaction_listener(
    &self,
    kind: TransactionListenerKind,
) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
    let buffer_size = self.config.new_tx_listener_buffer_size;
    let (sender, rx) = mpsc::channel(buffer_size);
    self.transaction_listener.lock().push(TransactionListener { sender, kind });
    rx
}
/// Adds a new blob sidecar listener to the pool that gets notified about every new
/// eip4844 transaction's blob sidecar.
pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
    let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
    self.blob_transaction_sidecar_listener
        .lock()
        .push(BlobTransactionSidecarListener { sender });
    rx
}
/// If the pool contains the transaction, this adds a new listener that gets notified about
/// transaction events.
pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
    if self.get_pool_data().contains(&tx_hash) {
        Some(self.event_listener.write().subscribe(tx_hash))
    } else {
        None
    }
}
/// Adds a listener for all transaction events.
pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
    let mut broadcast = self.event_listener.write();
    broadcast.subscribe_all()
}
/// Returns a read lock to the pool's data.
pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
    self.pool.read()
}
/// Returns hashes of the transactions in the pool that are allowed to be propagated.
///
/// Note: transactions marked `propagate == false` (e.g. locally submitted private
/// transactions) are excluded.
pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
    self.get_pool_data()
        .all()
        .transactions_iter()
        .filter(|tx| tx.propagate)
        .map(|tx| *tx.hash())
        .collect()
}
/// Returns the transactions in the pool that are allowed to be propagated.
pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned().collect()
}
/// Returns only the first `max` propagatable transactions in the pool.
pub fn pooled_transactions_max(
    &self,
    max: usize,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.get_pool_data()
        .all()
        .transactions_iter()
        .filter(|tx| tx.propagate)
        .take(max)
        .cloned()
        .collect()
}
/// Converts the internally tracked transaction to the pooled format.
///
/// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
/// store and attached to the transaction. Returns `None` if the conversion fails or the
/// sidecar is missing.
fn to_pooled_transaction(
    &self,
    transaction: Arc<ValidPoolTransaction<T::Transaction>>,
) -> Option<RecoveredTx<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
where
    <V as TransactionValidator>::Transaction: EthPoolTransaction,
{
    if !transaction.is_eip4844() {
        // non-blob transactions convert directly; log and skip on failure
        return transaction
            .transaction
            .clone()
            .try_into_pooled()
            .inspect_err(|err| {
                debug!(
                    target: "txpool", %err,
                    "failed to convert transaction to pooled element; skipping",
                );
            })
            .ok()
    }
    // EIP-4844: re-attach the sidecar from the blob store
    let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
    transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
}
/// Returns pooled transactions for the given transaction hashes.
///
/// Transactions that cannot be converted (e.g. an EIP-4844 transaction whose sidecar is no
/// longer in the blob store) are skipped. Accumulation stops once the total encoded size
/// exceeds `limit`; note this is a soft limit — the element that crosses the limit is still
/// included because the check happens after the push.
pub fn get_pooled_transaction_elements(
    &self,
    tx_hashes: Vec<TxHash>,
    limit: GetPooledTransactionLimit,
) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
where
    <V as TransactionValidator>::Transaction: EthPoolTransaction,
{
    let transactions = self.get_all(tx_hashes);
    let mut elements = Vec::with_capacity(transactions.len());
    let mut size = 0;
    for transaction in transactions {
        // measure before converting; the pooled form is what gets counted against the limit
        let encoded_len = transaction.encoded_length();
        let Some(pooled) = self.to_pooled_transaction(transaction) else {
            continue;
        };
        size += encoded_len;
        elements.push(pooled.into_signed());
        if limit.exceeds(size) {
            break
        }
    }
    elements
}
/// Returns converted pooled transaction for the given transaction hash.
///
/// Returns `None` if the hash is unknown or the conversion fails.
pub fn get_pooled_transaction_element(
    &self,
    tx_hash: TxHash,
) -> Option<RecoveredTx<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
where
    <V as TransactionValidator>::Transaction: EthPoolTransaction,
{
    let tx = self.get(&tx_hash)?;
    self.to_pooled_transaction(tx)
}
/// Updates the entire pool after a new block was executed.
///
/// Steps, in order:
/// 1. inform the validator about the new head so subsequent validations use the new state
/// 2. resolve internal sender ids for all changed accounts
/// 3. apply block info, mined transactions and account changes to the inner pool
/// 4. delete blobs of transactions the update discarded as outdated
/// 5. notify all listeners about mined/promoted/discarded transactions
pub fn on_canonical_state_change<H, B>(&self, update: CanonicalStateUpdate<'_, H, B>)
where
    H: BlockHeader,
    B: BlockBody,
{
    trace!(target: "txpool", ?update, "updating pool on canonical state change");
    let block_info = update.block_info();
    let CanonicalStateUpdate {
        new_tip, changed_accounts, mined_transactions, update_kind, ..
    } = update;
    self.validator.on_new_head_block(new_tip);
    let changed_senders = self.changed_senders(changed_accounts.into_iter());
    // update the pool
    let outcome = self.pool.write().on_canonical_state_change(
        block_info,
        mined_transactions,
        changed_senders,
        update_kind,
    );
    // This will discard outdated transactions based on the account's nonce
    self.delete_discarded_blobs(outcome.discarded.iter());
    // notify listeners about updates
    self.notify_on_new_state(outcome);
}
/// Performs account updates on the pool.
///
/// This will either promote or discard transactions based on the new account state.
pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
    let changed_senders = self.changed_senders(accounts.into_iter());
    let UpdateOutcome { promoted, discarded } =
        self.pool.write().update_accounts(changed_senders);
    // fire events: promoted transactions are now pending, discarded ones are gone
    let mut listener = self.event_listener.write();
    promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
    discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
    // This deletes outdated blob txs from the blob store, based on the account's nonce. This is
    // called during txpool maintenance when the pool drifted.
    self.delete_discarded_blobs(discarded.iter());
}
/// Add a single validated transaction into the pool.
///
/// On success, stores an attached blob sidecar (if any), cleans up replaced/discarded blob
/// sidecars, and fires all listener notifications. On validation failure, broadcasts a
/// discard event and returns the corresponding [`PoolError`].
///
/// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
/// come in through that function, either as a batch or `std::iter::once`.
fn add_transaction(
    &self,
    pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
    origin: TransactionOrigin,
    tx: TransactionValidationOutcome<T::Transaction>,
) -> PoolResult<TxHash> {
    match tx {
        TransactionValidationOutcome::Valid {
            balance,
            state_nonce,
            transaction,
            propagate,
        } => {
            let sender_id = self.get_sender_id(transaction.sender());
            let transaction_id = TransactionId::new(sender_id, transaction.nonce());
            // split the valid transaction and the blob sidecar if it has any
            let (transaction, maybe_sidecar) = match transaction {
                ValidTransaction::Valid(tx) => (tx, None),
                ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
                    debug_assert!(
                        transaction.is_eip4844(),
                        "validator returned sidecar for non EIP-4844 transaction"
                    );
                    (transaction, Some(sidecar))
                }
            };
            let tx = ValidPoolTransaction {
                transaction,
                transaction_id,
                propagate,
                timestamp: Instant::now(),
                origin,
            };
            let added = pool.add_transaction(tx, balance, state_nonce)?;
            let hash = *added.hash();
            // transaction was successfully inserted into the pool
            if let Some(sidecar) = maybe_sidecar {
                // notify blob sidecar listeners
                self.on_new_blob_sidecar(&hash, &sidecar);
                // store the sidecar in the blob store
                self.insert_blob(hash, sidecar);
            }
            if let Some(replaced) = added.replaced_blob_transaction() {
                debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
                // delete the replaced transaction from the blob store
                self.delete_blob(replaced);
            }
            // Notify about new pending transactions
            if let Some(pending) = added.as_pending() {
                self.on_new_pending_transaction(pending);
            }
            // Notify tx event listeners
            self.notify_event_listeners(&added);
            // inserting may have evicted other transactions; drop their blobs too
            if let Some(discarded) = added.discarded_transactions() {
                self.delete_discarded_blobs(discarded.iter());
            }
            // Notify listeners for _all_ transactions
            self.on_new_transaction(added.into_new_transaction_event());
            Ok(hash)
        }
        TransactionValidationOutcome::Invalid(tx, err) => {
            // broadcast the discard event before surfacing the error to the caller
            let mut listener = self.event_listener.write();
            listener.discarded(tx.hash());
            Err(PoolError::new(*tx.hash(), err))
        }
        TransactionValidationOutcome::Error(tx_hash, err) => {
            let mut listener = self.event_listener.write();
            listener.discarded(&tx_hash);
            Err(PoolError::other(tx_hash, err))
        }
    }
}
/// Adds a transaction and returns the event stream.
///
/// The subscription is registered before the insertion so that no event emitted during the
/// insertion can be missed.
pub fn add_transaction_and_subscribe(
    &self,
    origin: TransactionOrigin,
    tx: TransactionValidationOutcome<T::Transaction>,
) -> PoolResult<TransactionEvents> {
    let listener = self.event_listener.write().subscribe(tx.tx_hash());
    let mut results = self.add_transactions(origin, std::iter::once(tx));
    results.pop().expect("result length is the same as the input")?;
    Ok(listener)
}
/// Adds all transactions in the iterator to the pool, returning a list of results.
///
/// Results are returned in the same order as the input; a transaction that was inserted but
/// immediately evicted by the size limits is reported as
/// [`PoolErrorKind::DiscardedOnInsert`].
///
/// Note: A large batch may lock the pool for a long time that blocks important operations
/// like updating the pool on canonical state changes. The caller should consider having
/// a max batch size to balance transaction insertions with other updates.
pub fn add_transactions(
    &self,
    origin: TransactionOrigin,
    transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
) -> Vec<PoolResult<TxHash>> {
    // Add the transactions and enforce the pool size limits in one write lock
    let (mut added, discarded) = {
        let mut pool = self.pool.write();
        let added = transactions
            .into_iter()
            .map(|tx| self.add_transaction(&mut pool, origin, tx))
            .collect::<Vec<_>>();
        // Enforce the pool size limits if at least one transaction was added successfully
        let discarded = if added.iter().any(Result::is_ok) {
            pool.discard_worst()
        } else {
            Default::default()
        };
        (added, discarded)
    };
    if !discarded.is_empty() {
        // Delete any blobs associated with discarded blob transactions
        self.delete_discarded_blobs(discarded.iter());
        let discarded_hashes =
            discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
        // scope the event-listener lock to the broadcast
        {
            let mut listener = self.event_listener.write();
            discarded_hashes.iter().for_each(|hash| listener.discarded(hash));
        }
        // A newly added transaction may be immediately discarded, so we need to
        // adjust the result here
        for res in &mut added {
            if let Ok(hash) = res {
                if discarded_hashes.contains(hash) {
                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
                }
            }
        }
    }
    added
}
/// Notify all listeners about a new pending transaction.
///
/// Listeners whose receiving side was dropped are removed (via `retain_mut`).
fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
    let propagate_allowed = pending.is_propagate_allowed();
    let mut transaction_listeners = self.pending_transaction_listener.lock();
    transaction_listeners.retain_mut(|listener| {
        if listener.kind.is_propagate_only() && !propagate_allowed {
            // this transaction must not be propagated: skip propagate-only listeners (such as
            // the network), but keep them registered as long as their channel is still open
            return !listener.sender.is_closed()
        }
        // broadcast all pending transactions to the listener
        listener.send_all(pending.pending_transactions(listener.kind))
    });
}
/// Notify all listeners about a newly inserted transaction.
///
/// Listeners whose receiving side was dropped are removed (via `retain_mut`).
fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
    let mut transaction_listeners = self.transaction_listener.lock();
    transaction_listeners.retain_mut(|listener| {
        if listener.kind.is_propagate_only() && !event.transaction.propagate {
            // this transaction must not be propagated: skip propagate-only listeners (such as
            // the network), but keep them registered as long as their channel is still open
            return !listener.sender.is_closed()
        }
        listener.send(event.clone())
    });
}
/// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
///
/// Listeners with a full channel are kept (the event is dropped); listeners whose channel is
/// closed are removed.
fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecar) {
    let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
    if sidecar_listeners.is_empty() {
        return
    }
    // wrap once so every listener clones an `Arc` instead of the (large) sidecar itself
    let sidecar = Arc::new(sidecar.clone());
    sidecar_listeners.retain_mut(|listener| {
        let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
        match listener.sender.try_send(new_blob_event) {
            Ok(()) => true,
            Err(err) => {
                if matches!(err, mpsc::error::TrySendError::Full(_)) {
                    // log the tx hash (matches the "[hash] ..." convention used elsewhere);
                    // previously this logged the entire sidecar, which can be hundreds of KiB
                    debug!(
                        target: "txpool",
                        "[{:?}] failed to send blob sidecar; channel full",
                        tx_hash,
                    );
                    true
                } else {
                    // channel closed: drop this listener
                    false
                }
            }
        }
    })
}
/// Notifies transaction listeners about changes once a block was processed.
///
/// Emits, in order: pending hashes to hash listeners, full pending transactions to
/// transaction listeners, then per-transaction mined/pending/discarded events.
fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
    trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
    // notify about promoted pending transactions
    // emit hashes
    self.pending_transaction_listener
        .lock()
        .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
    // emit full transactions
    self.transaction_listener.lock().retain_mut(|listener| {
        listener.send_all(outcome.full_pending_transactions(listener.kind))
    });
    let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
    // broadcast specific transaction events
    let mut listener = self.event_listener.write();
    mined.iter().for_each(|tx| listener.mined(tx, block_hash));
    promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
    discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
}
/// Fire events for the newly added transaction if there are any.
fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
    let mut listener = self.event_listener.write();
    match tx {
        AddedTransaction::Pending(tx) => {
            // the new transaction is pending; it may have promoted or discarded other
            // transactions, and may have replaced one with the same sender+nonce
            let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
            listener.pending(transaction.hash(), replaced.clone());
            promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
            discarded.iter().for_each(|tx| listener.discarded(tx.hash()));
        }
        AddedTransaction::Parked { transaction, replaced, .. } => {
            listener.queued(transaction.hash());
            if let Some(replaced) = replaced {
                listener.replaced(replaced.clone(), *transaction.hash());
            }
        }
    }
}
/// Returns an iterator that yields transactions that are ready to be included in the block.
pub fn best_transactions(&self) -> BestTransactions<T> {
    let pool = self.get_pool_data();
    pool.best_transactions()
}
/// Returns an iterator that yields transactions that are ready to be included in the block with
/// the given base fee and optional blob fee attributes.
pub fn best_transactions_with_attributes(
    &self,
    best_transactions_attributes: BestTransactionsAttributes,
) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
{
    let pool = self.get_pool_data();
    pool.best_transactions_with_attributes(best_transactions_attributes)
}
/// Returns only the first `max` transactions in the pending pool.
pub fn pending_transactions_max(
    &self,
    max: usize,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let pool = self.get_pool_data();
    pool.pending_transactions_iter().take(max).collect()
}
/// Returns all transactions from the pending sub-pool
pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let pool = self.get_pool_data();
    pool.pending_transactions()
}
/// Returns all transactions from parked pools
pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let pool = self.get_pool_data();
    pool.queued_transactions()
}
/// Returns all transactions in the pool
pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
    // take the read lock once for both sub-pools
    let pool = self.get_pool_data();
    let pending = pool.pending_transactions();
    let queued = pool.queued_transactions();
    AllPoolTransactions { pending, queued }
}
/// Notifies all event listeners that the given removed transactions were discarded.
///
/// Shared tail of the `remove_*` methods below; skips the listener lock when nothing was
/// removed.
fn notify_removed_discarded(&self, removed: &[Arc<ValidPoolTransaction<T::Transaction>>]) {
    if removed.is_empty() {
        return
    }
    let mut listener = self.event_listener.write();
    removed.iter().for_each(|tx| listener.discarded(tx.hash()));
}
/// Removes and returns all matching transactions from the pool.
pub fn remove_transactions(
    &self,
    hashes: Vec<TxHash>,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    if hashes.is_empty() {
        return Vec::new()
    }
    let removed = self.pool.write().remove_transactions(hashes);
    self.notify_removed_discarded(&removed);
    removed
}
/// Removes and returns all matching transactions and their dependent transactions from the
/// pool.
pub fn remove_transactions_and_descendants(
    &self,
    hashes: Vec<TxHash>,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    if hashes.is_empty() {
        return Vec::new()
    }
    let removed = self.pool.write().remove_transactions_and_descendants(hashes);
    self.notify_removed_discarded(&removed);
    removed
}
/// Removes and returns all transactions by the specified sender from the pool.
pub fn remove_transactions_by_sender(
    &self,
    sender: Address,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let sender_id = self.get_sender_id(sender);
    let removed = self.pool.write().remove_transactions_by_sender(sender_id);
    self.notify_removed_discarded(&removed);
    removed
}
/// Removes from the given announcement every transaction hash that is already present in the
/// pool, i.e. retains only the hashes unknown to the pool.
pub fn retain_unknown<A>(&self, announcement: &mut A)
where
    A: HandleMempoolData,
{
    if announcement.is_empty() {
        return
    }
    let pool = self.get_pool_data();
    announcement.retain_by_hash(|tx| !pool.contains(tx))
}
/// Returns the transaction by hash.
pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let pool = self.get_pool_data();
    pool.get(tx_hash)
}
/// Returns all transactions of the address
pub fn get_transactions_by_sender(
    &self,
    sender: Address,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let sender_id = self.get_sender_id(sender);
    let pool = self.get_pool_data();
    pool.get_transactions_by_sender(sender_id)
}
/// Returns all queued (parked, not yet executable) transactions of the address by sender.
pub fn get_queued_transactions_by_sender(
    &self,
    sender: Address,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let sender_id = self.get_sender_id(sender);
    // Bug fix: this previously called `pending_txs_by_sender`, returning the sender's
    // *pending* transactions — the opposite of what the method name and doc promise
    // (the sibling `get_pending_transactions_by_sender` had the calls swapped).
    self.get_pool_data().queued_txs_by_sender(sender_id)
}
/// Returns all pending transactions filtered by predicate
pub fn pending_transactions_with_predicate(
    &self,
    predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let data = self.get_pool_data();
    data.pending_transactions_with_predicate(predicate)
}
/// Returns all pending (executable) transactions of the address by sender.
pub fn get_pending_transactions_by_sender(
    &self,
    sender: Address,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let sender_id = self.get_sender_id(sender);
    // Bug fix: this previously called `queued_txs_by_sender`, returning the sender's
    // *queued* transactions — swapped with `get_queued_transactions_by_sender`.
    self.get_pool_data().pending_txs_by_sender(sender_id)
}
/// Returns the highest transaction of the address
pub fn get_highest_transaction_by_sender(
    &self,
    sender: Address,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let id = self.get_sender_id(sender);
    let data = self.get_pool_data();
    data.get_highest_transaction_by_sender(id)
}
/// Returns the transaction with the highest nonce that is executable given the on chain nonce.
pub fn get_highest_consecutive_transaction_by_sender(
    &self,
    sender: Address,
    on_chain_nonce: u64,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    // Build the (sender, nonce) transaction id that anchors the consecutive lookup.
    let id = self.get_sender_id(sender).into_transaction_id(on_chain_nonce);
    self.get_pool_data().get_highest_consecutive_transaction_by_sender(id)
}
/// Returns the transaction given a [`TransactionId`]
pub fn get_transaction_by_transaction_id(
    &self,
    transaction_id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let data = self.get_pool_data();
    let entry = data.all().get(transaction_id)?;
    Some(entry.transaction.clone())
}
/// Returns all transactions that were submitted with the given [`TransactionOrigin`]
pub fn get_transactions_by_origin(
    &self,
    origin: TransactionOrigin,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let data = self.get_pool_data();
    let mut matching = Vec::new();
    // Walk every transaction in the pool and keep those with a matching origin.
    for tx in data.all().transactions_iter() {
        if tx.origin == origin {
            matching.push(tx.clone());
        }
    }
    matching
}
/// Returns all pending transactions filtered by [`TransactionOrigin`]
pub fn get_pending_transactions_by_origin(
    &self,
    origin: TransactionOrigin,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let data = self.get_pool_data();
    let mut txs = Vec::new();
    for tx in data.pending_transactions_iter() {
        if tx.origin == origin {
            txs.push(tx);
        }
    }
    txs
}
/// Returns all the transactions belonging to the hashes.
///
/// If no transaction exists, it is skipped.
pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    // Avoid acquiring the pool lock for an empty request.
    if txs.is_empty() {
        return Vec::new()
    }
    let data = self.get_pool_data();
    data.get_all(txs).collect()
}
/// Notify about propagated transactions.
pub fn on_propagated(&self, txs: PropagatedTransactions) {
    // Nothing to announce.
    if txs.0.is_empty() {
        return
    }
    let mut listener = self.event_listener.write();
    for (hash, peers) in txs.0 {
        listener.propagated(&hash, peers);
    }
}
/// Number of transactions in the entire pool
pub fn len(&self) -> usize {
    let data = self.get_pool_data();
    data.len()
}
/// Whether the pool is empty
pub fn is_empty(&self) -> bool {
    let data = self.get_pool_data();
    data.is_empty()
}
/// Returns whether or not the pool is over its configured size and transaction count limits.
pub fn is_exceeded(&self) -> bool {
    let pool = self.pool.read();
    pool.is_exceeded()
}
/// Inserts a blob transaction into the blob store
fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecar) {
    debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
    // Record a failed insert in the metrics, but otherwise carry on.
    match self.blob_store.insert(hash, blob) {
        Ok(_) => {}
        Err(err) => {
            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
        }
    }
    self.update_blob_store_metrics();
}
/// Delete a blob from the blob store
pub fn delete_blob(&self, blob: TxHash) {
    // Best-effort removal; failures are intentionally ignored here.
    self.blob_store.delete(blob).ok();
}
/// Delete all blobs from the blob store
pub fn delete_blobs(&self, txs: Vec<TxHash>) {
    // Best-effort bulk removal; failures are intentionally ignored here.
    self.blob_store.delete_all(txs).ok();
}
/// Cleans up the blob store
pub fn cleanup_blobs(&self) {
    let stat = self.blob_store.cleanup();
    let failed_deletes = stat.delete_failed as u64;
    self.blob_store_metrics.blobstore_failed_deletes.increment(failed_deletes);
    self.update_blob_store_metrics();
}
/// Refreshes the blob store gauges (byte size, if known, and entry count).
fn update_blob_store_metrics(&self) {
    let metrics = &self.blob_store_metrics;
    // The byte-size hint is optional; only update the gauge when it is available.
    if let Some(bytes) = self.blob_store.data_size_hint() {
        metrics.blobstore_byte_size.set(bytes as f64);
    }
    metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
}
/// Deletes all blob transactions that were discarded.
fn delete_discarded_blobs<'a>(
    &'a self,
    transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
) {
    // Collect the hashes of EIP-4844 transactions only; other types carry no sidecar.
    let mut blob_txs = Vec::new();
    for tx in transactions {
        if tx.transaction.is_eip4844() {
            blob_txs.push(*tx.hash());
        }
    }
    self.delete_blobs(blob_txs);
}
}
impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the config is printed; the remaining fields are elided.
        let mut dbg = f.debug_struct("PoolInner");
        dbg.field("config", &self.config);
        dbg.finish_non_exhaustive()
    }
}
/// An active listener for new pending transactions.
///
/// Forwards only the transaction *hashes* to the subscriber (contrast with
/// [`TransactionListener`]-style listeners elsewhere in this module, which carry full events).
#[derive(Debug)]
struct PendingTransactionHashListener {
    /// Bounded channel used to forward new pending transaction hashes to the subscriber.
    sender: mpsc::Sender<TxHash>,
    /// Whether to include transactions that should not be propagated over the network.
    kind: TransactionListenerKind,
}
impl PendingTransactionHashListener {
/// Attempts to send all hashes to the listener.
///
/// Returns false if the channel is closed (receiver dropped)
fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
for tx_hash in hashes {
match self.sender.try_send(tx_hash) {
Ok(()) => {}
Err(err) => {
return if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
tx_hash,
);
true
} else {
false
}
}
}
}
true
}
}
/// An active listener for new pending transactions.
///
/// Unlike [`PendingTransactionHashListener`], this forwards the full
/// [`NewTransactionEvent`] (including the transaction itself), not just the hash.
#[derive(Debug)]
struct TransactionListener<T: PoolTransaction> {
    /// Bounded channel used to forward new transaction events to the subscriber.
    sender: mpsc::Sender<NewTransactionEvent<T>>,
    /// Whether to include transactions that should not be propagated over the network.
    kind: TransactionListenerKind,
}
impl<T: PoolTransaction> TransactionListener<T> {
/// Attempts to send the event to the listener.
///
/// Returns false if the channel is closed (receiver dropped)
fn send(&self, event: NewTransactionEvent<T>) -> bool {
self.send_all(std::iter::once(event))
}
/// Attempts to send all events to the listener.
///
/// Returns false if the channel is closed (receiver dropped)
fn send_all(&self, events: impl IntoIterator<Item = NewTransactionEvent<T>>) -> bool {
for event in events {
match self.sender.try_send(event) {
Ok(()) => {}
Err(err) => {
return if let mpsc::error::TrySendError::Full(event) = err {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
event.transaction.hash(),
);
true
} else {
false
}
}
}
}
true
}
}
/// An active listener for new blobs
#[derive(Debug)]
struct BlobTransactionSidecarListener {
    /// Bounded channel used to forward new blob sidecars to the subscriber.
    sender: mpsc::Sender<NewBlobSidecar>,
}
/// Tracks an added transaction and all graph changes caused by adding it.
#[derive(Debug, Clone)]
pub struct AddedPendingTransaction<T: PoolTransaction> {
    /// Inserted transaction.
    transaction: Arc<ValidPoolTransaction<T>>,
    /// Replaced transaction (same sender/nonce that this insertion displaced), if any.
    replaced: Option<Arc<ValidPoolTransaction<T>>>,
    /// Transactions promoted to the pending queue as a side effect of this insertion.
    promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions that failed and became discarded as a side effect of this insertion.
    discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
impl<T: PoolTransaction> AddedPendingTransaction<T> {
    /// Returns all transactions that were promoted to the pending pool and adhere to the given
    /// [`TransactionListenerKind`].
    ///
    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
    /// are allowed to be propagated are returned.
    pub(crate) fn pending_transactions(
        &self,
        kind: TransactionListenerKind,
    ) -> impl Iterator<Item = B256> + '_ {
        // The newly inserted transaction comes first, followed by everything it promoted.
        PendingTransactionIter {
            kind,
            iter: std::iter::once(&self.transaction).chain(self.promoted.iter()),
        }
    }

    /// Returns if the transaction should be propagated.
    pub(crate) fn is_propagate_allowed(&self) -> bool {
        self.transaction.propagate
    }
}
/// An iterator over pending transaction *hashes*, filtered by listener kind.
pub(crate) struct PendingTransactionIter<Iter> {
    /// If `PropagateOnly`, transactions with `propagate == false` are skipped.
    kind: TransactionListenerKind,
    /// Underlying iterator over the pool's transactions.
    iter: Iter,
}
impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
where
    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
    T: PoolTransaction + 'a,
{
    type Item = B256;

    fn next(&mut self) -> Option<Self::Item> {
        // Skip transactions that must not be propagated when only propagatable
        // transactions were requested.
        while let Some(tx) = self.iter.next() {
            if self.kind.is_propagate_only() && !tx.propagate {
                continue
            }
            return Some(*tx.hash())
        }
        None
    }
}
/// An iterator over full pending transactions
pub(crate) struct FullPendingTransactionIter<Iter> {
    /// If `PropagateOnly`, transactions with `propagate == false` are skipped.
    kind: TransactionListenerKind,
    /// Underlying iterator over the pool's transactions.
    iter: Iter,
}
impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
where
    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
    T: PoolTransaction + 'a,
{
    type Item = NewTransactionEvent<T>;

    fn next(&mut self) -> Option<Self::Item> {
        // Skip transactions that must not be propagated when only propagatable
        // transactions were requested; everything yielded is in the pending subpool.
        while let Some(tx) = self.iter.next() {
            if self.kind.is_propagate_only() && !tx.propagate {
                continue
            }
            let event = NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.clone() };
            return Some(event)
        }
        None
    }
}
/// Represents a transaction that was added into the pool and its state
#[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> {
    /// Transaction was successfully added and moved to the pending pool.
    Pending(AddedPendingTransaction<T>),
    /// Transaction was successfully added but not yet ready for processing and moved to a
    /// parked pool instead.
    Parked {
        /// Inserted transaction.
        transaction: Arc<ValidPoolTransaction<T>>,
        /// Replaced transaction (same sender/nonce that this insertion displaced), if any.
        replaced: Option<Arc<ValidPoolTransaction<T>>>,
        /// The subpool it was moved to.
        subpool: SubPool,
    },
}
impl<T: PoolTransaction> AddedTransaction<T> {
    /// Returns whether the transaction has been added to the pending pool.
    pub(crate) const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
        match self {
            Self::Pending(tx) => Some(tx),
            Self::Parked { .. } => None,
        }
    }

    /// Returns the replaced transaction if there was one
    pub(crate) const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
        let replaced = match self {
            Self::Pending(tx) => &tx.replaced,
            Self::Parked { replaced, .. } => replaced,
        };
        replaced.as_ref()
    }

    /// Returns the discarded transactions if there were any
    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
        match self {
            Self::Pending(tx) => Some(tx.discarded.as_slice()),
            Self::Parked { .. } => None,
        }
    }

    /// Returns the hash of the replaced transaction if it is a blob transaction.
    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
        let replaced = self.replaced()?;
        if replaced.transaction.is_eip4844() {
            Some(*replaced.transaction.hash())
        } else {
            None
        }
    }

    /// Returns the hash of the transaction
    pub(crate) fn hash(&self) -> &TxHash {
        match self {
            Self::Pending(tx) => tx.transaction.hash(),
            Self::Parked { transaction, .. } => transaction.hash(),
        }
    }

    /// Converts this type into the event type for listeners
    pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
        let (transaction, subpool) = match self {
            Self::Pending(tx) => (tx.transaction, SubPool::Pending),
            Self::Parked { transaction, subpool, .. } => (transaction, subpool),
        };
        NewTransactionEvent { transaction, subpool }
    }

    /// Returns the subpool this transaction was added to
    #[cfg(test)]
    pub(crate) const fn subpool(&self) -> SubPool {
        match self {
            Self::Pending(_) => SubPool::Pending,
            Self::Parked { subpool, .. } => *subpool,
        }
    }

    /// Returns the [`TransactionId`] of the added transaction
    #[cfg(test)]
    pub(crate) fn id(&self) -> &TransactionId {
        match self {
            Self::Pending(added) => added.transaction.id(),
            Self::Parked { transaction, .. } => transaction.id(),
        }
    }
}
/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
#[derive(Debug)]
pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
    /// Hash of the block.
    pub(crate) block_hash: B256,
    /// All mined transactions.
    pub(crate) mined: Vec<TxHash>,
    /// Transactions promoted to the pending pool.
    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions that were discarded during the update.
    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
    /// Returns all transactions that were promoted to the pending pool and adhere to the given
    /// [`TransactionListenerKind`].
    ///
    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
    /// are allowed to be propagated are returned.
    pub(crate) fn pending_transactions(
        &self,
        kind: TransactionListenerKind,
    ) -> impl Iterator<Item = B256> + '_ {
        PendingTransactionIter { kind, iter: self.promoted.iter() }
    }

    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the
    /// given [`TransactionListenerKind`].
    ///
    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
    /// are allowed to be propagated are returned.
    pub(crate) fn full_pending_transactions(
        &self,
        kind: TransactionListenerKind,
    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
        FullPendingTransactionIter { kind, iter: self.promoted.iter() }
    }
}
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::eip4844::BlobTransactionSidecar;
    use reth_primitives::kzg::Blob;
    use std::{fs, path::PathBuf};

    /// Ensures that when the blob sub-pool overflows its transaction limit, the sidecars of
    /// evicted blob transactions are also removed from the blob store, so the store ends up
    /// containing exactly the sidecars of the transactions still in the pool.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // Define the maximum limit for blobs in the sub-pool.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);
        // Create a test pool with default configuration and the specified blob limit.
        let test_pool = &TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() })
            .pool;
        // Set the block info for the pool, including a pending blob fee.
        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
        // Read the contents of the JSON file into a string.
        let json_content = fs::read_to_string(
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
        )
        .expect("Failed to read the blob data file");
        // Parse the JSON contents into a serde_json::Value.
        let json_value: serde_json::Value =
            serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
        // Extract blob data from JSON and convert it to Blob.
        let blobs: Vec<Blob> = vec![Blob::from_hex(
            // Extract the "data" field from the JSON and parse it as a string.
            json_value.get("data").unwrap().as_str().expect("Data is not a valid string"),
        )
        .unwrap()];
        // Generate a BlobTransactionSidecar from the blobs.
        let sidecar = BlobTransactionSidecar::try_from_blobs(blobs).unwrap();
        // Create an in-memory blob store.
        let blob_store = InMemoryBlobStore::default();
        // Loop to add transactions to the pool and test blob eviction.
        // `max_txs + 10` insertions guarantees the blob sub-pool overflows by 10 txs.
        for n in 0..blob_limit.max_txs + 10 {
            // Create a mock transaction with the generated blob sidecar.
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
            // Set non zero size
            tx.set_size(1844674407370951);
            // Insert the sidecar into the blob store if the current index is within the blob limit.
            // Only the first `max_txs` sidecars are mirrored here: this reference store must
            // match the pool's store after the 10 overflow transactions have been evicted.
            if n < blob_limit.max_txs {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }
            // Add the transaction to the pool with external origin and valid outcome.
            test_pool.add_transactions(
                TransactionOrigin::External,
                [TransactionValidationOutcome::Valid {
                    balance: U256::from(1_000),
                    state_nonce: 0,
                    transaction: ValidTransaction::ValidWithSidecar {
                        transaction: tx,
                        sidecar: sidecar.clone(),
                    },
                    propagate: true,
                }],
            );
        }
        // Assert that the size of the pool's blob component is equal to the maximum blob limit.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);
        // Assert that the size of the pool's blob_size component matches the expected value
        // (per-tx size 1844674407370951 multiplied by the 1000 retained transactions).
        assert_eq!(test_pool.size().blob_size, 1844674407370951000);
        // Assert that the pool's blob store matches the expected blob store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }
}