use crate::{
blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
error::PoolError,
metrics::MaintainPoolMetrics,
traits::{CanonicalStateUpdate, TransactionPool, TransactionPoolExt},
BlockInfo, PoolTransaction, PoolUpdateKind,
};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{Address, BlockHash, BlockNumber};
use futures_util::{
future::{BoxFuture, Fuse, FusedFuture},
FutureExt, Stream, StreamExt,
};
use reth_chain_state::CanonStateNotification;
use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth_execution_types::ChangedAccount;
use reth_fs_util::FsPathError;
use reth_primitives::{
PooledTransactionsElementEcRecovered, SealedHeader, TransactionSigned,
TransactionSignedEcRecovered,
};
use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskSpawner;
use std::{
borrow::Borrow,
collections::HashSet,
hash::{Hash, Hasher},
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::oneshot;
use tracing::{debug, error, info, trace, warn};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
pub max_update_depth: u64,
pub max_reload_accounts: usize,
}
impl Default for MaintainPoolConfig {
fn default() -> Self {
Self { max_update_depth: 64, max_reload_accounts: 100 }
}
}
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
pub transactions_path: Option<PathBuf>,
}
impl LocalTransactionBackupConfig {
pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
Self { transactions_path: Some(transactions_path) }
}
}
pub fn maintain_transaction_pool_future<Client, P, St, Tasks>(
client: Client,
pool: P,
events: St,
task_spawner: Tasks,
config: MaintainPoolConfig,
) -> BoxFuture<'static, ()>
where
Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + Send + 'static,
P: TransactionPoolExt + 'static,
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
Tasks: TaskSpawner + 'static,
{
async move {
maintain_transaction_pool(client, pool, events, task_spawner, config).await;
}
.boxed()
}
pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
client: Client,
pool: P,
mut events: St,
task_spawner: Tasks,
config: MaintainPoolConfig,
) where
Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + Send + 'static,
P: TransactionPoolExt + 'static,
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
Tasks: TaskSpawner + 'static,
{
let metrics = MaintainPoolMetrics::default();
let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
let latest = SealedHeader::seal(latest);
let chain_spec = client.chain_spec();
let info = BlockInfo {
block_gas_limit: latest.gas_limit,
last_seen_block_hash: latest.hash(),
last_seen_block_number: latest.number,
pending_basefee: latest
.next_block_base_fee(chain_spec.base_fee_params_at_timestamp(latest.timestamp + 12))
.unwrap_or_default(),
pending_blob_fee: latest.next_block_blob_fee(),
};
pool.set_block_info(info);
}
let mut blob_store_tracker = BlobStoreCanonTracker::default();
let mut last_finalized_block =
FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());
let mut dirty_addresses = HashSet::default();
let mut maintained_state = MaintainedPoolState::InSync;
let mut reload_accounts_fut = Fuse::terminated();
loop {
trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");
metrics.set_dirty_accounts_len(dirty_addresses.len());
let pool_info = pool.block_info();
if maintained_state.is_drifted() {
metrics.inc_drift();
dirty_addresses = pool.unique_senders();
maintained_state = MaintainedPoolState::InSync;
}
if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
let (tx, rx) = oneshot::channel();
let c = client.clone();
let at = pool_info.last_seen_block_hash;
let fut = if dirty_addresses.len() > max_reload_accounts {
let accs_to_reload =
dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
for acc in &accs_to_reload {
dirty_addresses.remove(acc);
}
async move {
let res = load_accounts(c, at, accs_to_reload.into_iter());
let _ = tx.send(res);
}
.boxed()
} else {
let accs_to_reload = std::mem::take(&mut dirty_addresses);
async move {
let res = load_accounts(c, at, accs_to_reload.into_iter());
let _ = tx.send(res);
}
.boxed()
};
reload_accounts_fut = rx.fuse();
task_spawner.spawn_blocking(fut);
}
if let Some(finalized) =
last_finalized_block.update(client.finalized_block_number().ok().flatten())
{
if let BlobStoreUpdates::Finalized(blobs) =
blob_store_tracker.on_finalized_block(finalized)
{
metrics.inc_deleted_tracked_blobs(blobs.len());
pool.delete_blobs(blobs);
let pool = pool.clone();
task_spawner.spawn_blocking(Box::pin(async move {
debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
pool.cleanup_blobs();
}));
}
}
let mut event = None;
let mut reloaded = None;
tokio::select! {
res = &mut reload_accounts_fut => {
reloaded = Some(res);
}
ev = events.next() => {
if ev.is_none() {
break;
}
event = ev;
}
}
match reloaded {
Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
dirty_addresses.extend(failed_to_load);
pool.update_accounts(accounts);
}
Some(Ok(Err(res))) => {
let (accs, err) = *res;
debug!(target: "txpool", %err, "failed to load accounts");
dirty_addresses.extend(accs);
}
Some(Err(_)) => {
maintained_state = MaintainedPoolState::Drifted;
}
None => {}
}
let Some(event) = event else { continue };
match event {
CanonStateNotification::Reorg { old, new } => {
let (old_blocks, old_state) = old.inner();
let (new_blocks, new_state) = new.inner();
let new_tip = new_blocks.tip();
let new_first = new_blocks.first();
let old_first = old_blocks.first();
if !(old_first.parent_hash == pool_info.last_seen_block_hash ||
new_first.parent_hash == pool_info.last_seen_block_hash)
{
maintained_state = MaintainedPoolState::Drifted;
}
let chain_spec = client.chain_spec();
let pending_block_base_fee = new_tip
.next_block_base_fee(
chain_spec.base_fee_params_at_timestamp(new_tip.timestamp + 12),
)
.unwrap_or_default();
let pending_block_blob_fee = new_tip.next_block_blob_fee();
let new_changed_accounts: HashSet<_> =
new_state.changed_accounts().map(ChangedAccountEntry).collect();
let missing_changed_acc = old_state
.accounts_iter()
.map(|(a, _)| a)
.filter(|addr| !new_changed_accounts.contains(addr));
let mut changed_accounts =
match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
Ok(LoadedAccounts { accounts, failed_to_load }) => {
dirty_addresses.extend(failed_to_load);
accounts
}
Err(err) => {
let (addresses, err) = *err;
debug!(
target: "txpool",
%err,
"failed to load missing changed accounts at new tip: {:?}",
new_tip.hash()
);
dirty_addresses.extend(addresses);
vec![]
}
};
changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));
let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();
let pruned_old_transactions = old_blocks
.transactions_ecrecovered()
.filter(|tx| !new_mined_transactions.contains(&tx.hash))
.filter_map(|tx| {
if tx.is_eip4844() {
pool.get_blob(tx.hash)
.ok()
.flatten()
.map(Arc::unwrap_or_clone)
.and_then(|sidecar| {
PooledTransactionsElementEcRecovered::try_from_blob_transaction(
tx, sidecar,
)
.ok()
})
.map(|tx| {
<P as TransactionPool>::Transaction::from_pooled(tx.into())
})
} else {
<P as TransactionPool>::Transaction::try_from_consensus(tx.into()).ok()
}
})
.collect::<Vec<_>>();
let update = CanonicalStateUpdate {
new_tip: &new_tip.block,
pending_block_base_fee,
pending_block_blob_fee,
changed_accounts,
mined_transactions: new_blocks.transaction_hashes().collect(),
update_kind: PoolUpdateKind::Reorg,
};
pool.on_canonical_state_change(update);
metrics.inc_reinserted_transactions(pruned_old_transactions.len());
let _ = pool.add_external_transactions(pruned_old_transactions).await;
blob_store_tracker.add_new_chain_blocks(&new_blocks);
}
CanonStateNotification::Commit { new } => {
let (blocks, state) = new.inner();
let tip = blocks.tip();
let chain_spec = client.chain_spec();
let pending_block_base_fee = tip
.next_block_base_fee(
chain_spec.base_fee_params_at_timestamp(tip.timestamp + 12),
)
.unwrap_or_default();
let pending_block_blob_fee = tip.next_block_blob_fee();
let first_block = blocks.first();
trace!(
target: "txpool",
first = first_block.number,
tip = tip.number,
pool_block = pool_info.last_seen_block_number,
"update pool on new commit"
);
let depth = tip.number.abs_diff(pool_info.last_seen_block_number);
if depth > max_update_depth {
maintained_state = MaintainedPoolState::Drifted;
debug!(target: "txpool", ?depth, "skipping deep canonical update");
let info = BlockInfo {
block_gas_limit: tip.gas_limit,
last_seen_block_hash: tip.hash(),
last_seen_block_number: tip.number,
pending_basefee: pending_block_base_fee,
pending_blob_fee: pending_block_blob_fee,
};
pool.set_block_info(info);
blob_store_tracker.add_new_chain_blocks(&blocks);
continue
}
let mut changed_accounts = Vec::with_capacity(state.state().len());
for acc in state.changed_accounts() {
dirty_addresses.remove(&acc.address);
changed_accounts.push(acc);
}
let mined_transactions = blocks.transaction_hashes().collect();
if first_block.parent_hash != pool_info.last_seen_block_hash {
maintained_state = MaintainedPoolState::Drifted;
}
let update = CanonicalStateUpdate {
new_tip: &tip.block,
pending_block_base_fee,
pending_block_blob_fee,
changed_accounts,
mined_transactions,
update_kind: PoolUpdateKind::Commit,
};
pool.on_canonical_state_change(update);
blob_store_tracker.add_new_chain_blocks(&blocks);
}
}
}
}
struct FinalizedBlockTracker {
last_finalized_block: Option<BlockNumber>,
}
impl FinalizedBlockTracker {
const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
Self { last_finalized_block }
}
fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
let finalized = finalized_block?;
self.last_finalized_block
.replace(finalized)
.map_or(true, |last| last < finalized)
.then_some(finalized)
}
}
#[derive(Debug, PartialEq, Eq)]
enum MaintainedPoolState {
InSync,
Drifted,
}
impl MaintainedPoolState {
#[inline]
const fn is_drifted(&self) -> bool {
matches!(self, Self::Drifted)
}
}
#[derive(Eq)]
struct ChangedAccountEntry(ChangedAccount);
impl PartialEq for ChangedAccountEntry {
fn eq(&self, other: &Self) -> bool {
self.0.address == other.0.address
}
}
impl Hash for ChangedAccountEntry {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.address.hash(state);
}
}
impl Borrow<Address> for ChangedAccountEntry {
fn borrow(&self) -> &Address {
&self.0.address
}
}
#[derive(Default)]
struct LoadedAccounts {
accounts: Vec<ChangedAccount>,
failed_to_load: Vec<Address>,
}
fn load_accounts<Client, I>(
client: Client,
at: BlockHash,
addresses: I,
) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
where
I: IntoIterator<Item = Address>,
Client: StateProviderFactory,
{
let addresses = addresses.into_iter();
let mut res = LoadedAccounts::default();
let state = match client.history_by_block_hash(at) {
Ok(state) => state,
Err(err) => return Err(Box::new((addresses.collect(), err))),
};
for addr in addresses {
if let Ok(maybe_acc) = state.basic_account(addr) {
let acc = maybe_acc
.map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
.unwrap_or_else(|| ChangedAccount::empty(addr));
res.accounts.push(acc)
} else {
res.failed_to_load.push(addr);
}
}
Ok(res)
}
async fn load_and_reinsert_transactions<P>(
pool: P,
file_path: &Path,
) -> Result<(), TransactionsBackupError>
where
P: TransactionPool,
{
if !file_path.exists() {
return Ok(())
}
debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
let data = reth_fs_util::read(file_path)?;
if data.is_empty() {
return Ok(())
}
let txs_signed: Vec<TransactionSigned> = alloy_rlp::Decodable::decode(&mut data.as_slice())?;
let pool_transactions = txs_signed
.into_iter()
.filter_map(|tx| tx.try_ecrecovered())
.filter_map(|tx| {
<P::Transaction as PoolTransaction>::try_from_consensus(tx.into()).ok()
})
.collect();
let outcome = pool.add_transactions(crate::TransactionOrigin::Local, pool_transactions).await;
info!(target: "txpool", txs_file =?file_path, num_txs=%outcome.len(), "Successfully reinserted local transactions from file");
reth_fs_util::remove_file(file_path)?;
Ok(())
}
fn save_local_txs_backup<P>(pool: P, file_path: &Path)
where
P: TransactionPool,
{
let local_transactions = pool.get_local_transactions();
if local_transactions.is_empty() {
trace!(target: "txpool", "no local transactions to save");
return
}
let local_transactions = local_transactions
.into_iter()
.map(|tx| {
let recovered: TransactionSignedEcRecovered =
tx.transaction.clone().into_consensus().into();
recovered.into_signed()
})
.collect::<Vec<_>>();
let num_txs = local_transactions.len();
let mut buf = Vec::new();
alloy_rlp::encode_list(&local_transactions, &mut buf);
info!(target: "txpool", txs_file =?file_path, num_txs=%num_txs, "Saving current local transactions");
let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();
match parent_dir.map(|_| reth_fs_util::write(file_path, buf)) {
Ok(_) => {
info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
}
Err(err) => {
warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
#[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
Decode(#[from] alloy_rlp::Error),
#[error("failed to apply transactions backup. Encountered file error: {0}")]
FsPath(#[from] FsPathError),
#[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
Pool(#[from] PoolError),
}
pub async fn backup_local_transactions_task<P>(
shutdown: reth_tasks::shutdown::GracefulShutdown,
pool: P,
config: LocalTransactionBackupConfig,
) where
P: TransactionPool + Clone,
{
let Some(transactions_path) = config.transactions_path else {
return
};
if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
error!(target: "txpool", "{}", err)
}
let graceful_guard = shutdown.await;
save_local_txs_backup(pool, &transactions_path);
drop(graceful_guard)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
};
use alloy_eips::eip2718::Decodable2718;
use alloy_primitives::{hex, U256};
use reth_chainspec::MAINNET;
use reth_fs_util as fs;
use reth_primitives::PooledTransactionsElement;
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
use reth_tasks::TaskManager;
#[test]
fn changed_acc_entry() {
let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
let mut copy = changed_acc.0;
copy.nonce = 10;
assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
}
const EXTENSION: &str = "rlp";
const FILENAME: &str = "test_transactions_backup";
#[tokio::test(flavor = "multi_thread")]
async fn test_save_local_txs_backup() {
let temp_dir = tempfile::tempdir().unwrap();
let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
let tx_bytes = hex!("02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507");
let tx = PooledTransactionsElement::decode_2718(&mut &tx_bytes[..]).unwrap();
let provider = MockEthProvider::default();
let transaction: EthPooledTransaction = tx.try_into_ecrecovered().unwrap().into();
let tx_to_cmp = transaction.clone();
let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
let blob_store = InMemoryBlobStore::default();
let validator = EthTransactionValidatorBuilder::new(MAINNET.clone())
.build(provider, blob_store.clone());
let txpool = Pool::new(
validator.clone(),
CoinbaseTipOrdering::default(),
blob_store.clone(),
Default::default(),
);
txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();
let handle = tokio::runtime::Handle::current();
let manager = TaskManager::new(handle);
let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
backup_local_transactions_task(shutdown, txpool.clone(), config)
});
let mut txns = txpool.get_local_transactions();
let tx_on_finish = txns.pop().expect("there should be 1 transaction");
assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());
manager.graceful_shutdown();
let data = fs::read(transactions_path).unwrap();
let txs: Vec<TransactionSigned> =
alloy_rlp::Decodable::decode(&mut data.as_slice()).unwrap();
assert_eq!(txs.len(), 1);
temp_dir.close().unwrap();
}
#[test]
fn test_update_with_higher_finalized_block() {
let mut tracker = FinalizedBlockTracker::new(Some(10));
assert_eq!(tracker.update(Some(15)), Some(15));
assert_eq!(tracker.last_finalized_block, Some(15));
}
#[test]
fn test_update_with_lower_finalized_block() {
let mut tracker = FinalizedBlockTracker::new(Some(20));
assert_eq!(tracker.update(Some(15)), None);
assert_eq!(tracker.last_finalized_block, Some(15));
}
#[test]
fn test_update_with_equal_finalized_block() {
let mut tracker = FinalizedBlockTracker::new(Some(20));
assert_eq!(tracker.update(Some(20)), None);
assert_eq!(tracker.last_finalized_block, Some(20));
}
#[test]
fn test_update_with_no_last_finalized_block() {
let mut tracker = FinalizedBlockTracker::new(None);
assert_eq!(tracker.update(Some(10)), Some(10));
assert_eq!(tracker.last_finalized_block, Some(10));
}
#[test]
fn test_update_with_no_new_finalized_block() {
let mut tracker = FinalizedBlockTracker::new(Some(10));
assert_eq!(tracker.update(None), None);
assert_eq!(tracker.last_finalized_block, Some(10));
}
#[test]
fn test_update_with_no_finalized_blocks() {
let mut tracker = FinalizedBlockTracker::new(None);
assert_eq!(tracker.update(None), None);
assert_eq!(tracker.last_finalized_block, None);
}
}