reth_transaction_pool/
maintain.rs

1//! Support for maintaining the state of the transaction pool
2
3use crate::{
4    blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
5    error::PoolError,
6    metrics::MaintainPoolMetrics,
7    traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
8    BlockInfo, PoolTransaction, PoolUpdateKind,
9};
10use alloy_consensus::{BlockHeader, Typed2718};
11use alloy_eips::BlockNumberOrTag;
12use alloy_primitives::{Address, BlockHash, BlockNumber};
13use alloy_rlp::Encodable;
14use futures_util::{
15    future::{BoxFuture, Fuse, FusedFuture},
16    FutureExt, Stream, StreamExt,
17};
18use reth_chain_state::CanonStateNotification;
19use reth_chainspec::{ChainSpecProvider, EthChainSpec};
20use reth_execution_types::ChangedAccount;
21use reth_fs_util::FsPathError;
22use reth_primitives_traits::{
23    transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
24};
25use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
26use reth_tasks::TaskSpawner;
27use std::{
28    borrow::Borrow,
29    collections::HashSet,
30    hash::{Hash, Hasher},
31    path::{Path, PathBuf},
32    sync::Arc,
33};
34use tokio::{
35    sync::oneshot,
36    time::{self, Duration},
37};
38use tracing::{debug, error, info, trace, warn};
39
40/// Maximum amount of time non-executable transaction are queued.
41pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);
42
43/// Additional settings for maintaining the transaction pool
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub struct MaintainPoolConfig {
46    /// Maximum (reorg) depth we handle when updating the transaction pool: `new.number -
47    /// last_seen.number`
48    ///
49    /// Default: 64 (2 epochs)
50    pub max_update_depth: u64,
51    /// Maximum number of accounts to reload from state at once when updating the transaction pool.
52    ///
53    /// Default: 100
54    pub max_reload_accounts: usize,
55
56    /// Maximum amount of time non-executable, non local transactions are queued.
57    /// Default: 3 hours
58    pub max_tx_lifetime: Duration,
59}
60
61impl Default for MaintainPoolConfig {
62    fn default() -> Self {
63        Self {
64            max_update_depth: 64,
65            max_reload_accounts: 100,
66            max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
67        }
68    }
69}
70
71/// Settings for local transaction backup task
72#[derive(Debug, Clone, Default)]
73pub struct LocalTransactionBackupConfig {
74    /// Path to transactions backup file
75    pub transactions_path: Option<PathBuf>,
76}
77
78impl LocalTransactionBackupConfig {
79    /// Receive path to transactions backup and return initialized config
80    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
81        Self { transactions_path: Some(transactions_path) }
82    }
83}
84
85/// Returns a spawnable future for maintaining the state of the transaction pool.
86pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
87    client: Client,
88    pool: P,
89    events: St,
90    task_spawner: Tasks,
91    config: MaintainPoolConfig,
92) -> BoxFuture<'static, ()>
93where
94    N: NodePrimitives,
95    Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
96    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
97    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
98    Tasks: TaskSpawner + 'static,
99{
100    async move {
101        maintain_transaction_pool(client, pool, events, task_spawner, config).await;
102    }
103    .boxed()
104}
105
106/// Maintains the state of the transaction pool by handling new blocks and reorgs.
107///
108/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
109pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
110    client: Client,
111    pool: P,
112    mut events: St,
113    task_spawner: Tasks,
114    config: MaintainPoolConfig,
115) where
116    N: NodePrimitives,
117    Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
118    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
119    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
120    Tasks: TaskSpawner + 'static,
121{
122    let metrics = MaintainPoolMetrics::default();
123    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
124    // ensure the pool points to latest state
125    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
126        let latest = SealedHeader::seal_slow(latest);
127        let chain_spec = client.chain_spec();
128        let info = BlockInfo {
129            block_gas_limit: latest.gas_limit(),
130            last_seen_block_hash: latest.hash(),
131            last_seen_block_number: latest.number(),
132            pending_basefee: latest
133                .next_block_base_fee(chain_spec.base_fee_params_at_timestamp(latest.timestamp()))
134                .unwrap_or_default(),
135            pending_blob_fee: latest
136                .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
137        };
138        pool.set_block_info(info);
139    }
140
141    // keeps track of mined blob transaction so we can clean finalized transactions
142    let mut blob_store_tracker = BlobStoreCanonTracker::default();
143
144    // keeps track of the latest finalized block
145    let mut last_finalized_block =
146        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());
147
148    // keeps track of any dirty accounts that we know of are out of sync with the pool
149    let mut dirty_addresses = HashSet::default();
150
151    // keeps track of the state of the pool wrt to blocks
152    let mut maintained_state = MaintainedPoolState::InSync;
153
154    // the future that reloads accounts from state
155    let mut reload_accounts_fut = Fuse::terminated();
156
157    // eviction interval for stale non local txs
158    let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);
159
160    // toggle for the first notification
161    let mut first_event = true;
162
163    // The update loop that waits for new blocks and reorgs and performs pool updated
164    // Listen for new chain events and derive the update action for the pool
165    loop {
166        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");
167
168        metrics.set_dirty_accounts_len(dirty_addresses.len());
169        let pool_info = pool.block_info();
170
171        // after performing a pool update after a new block we have some time to properly update
172        // dirty accounts and correct if the pool drifted from current state, for example after
173        // restart or a pipeline run
174        if maintained_state.is_drifted() {
175            metrics.inc_drift();
176            // assuming all senders are dirty
177            dirty_addresses = pool.unique_senders();
178            // make sure we toggle the state back to in sync
179            maintained_state = MaintainedPoolState::InSync;
180        }
181
182        // if we have accounts that are out of sync with the pool, we reload them in chunks
183        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
184            let (tx, rx) = oneshot::channel();
185            let c = client.clone();
186            let at = pool_info.last_seen_block_hash;
187            let fut = if dirty_addresses.len() > max_reload_accounts {
188                // need to chunk accounts to reload
189                let accs_to_reload =
190                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
191                for acc in &accs_to_reload {
192                    // make sure we remove them from the dirty set
193                    dirty_addresses.remove(acc);
194                }
195                async move {
196                    let res = load_accounts(c, at, accs_to_reload.into_iter());
197                    let _ = tx.send(res);
198                }
199                .boxed()
200            } else {
201                // can fetch all dirty accounts at once
202                let accs_to_reload = std::mem::take(&mut dirty_addresses);
203                async move {
204                    let res = load_accounts(c, at, accs_to_reload.into_iter());
205                    let _ = tx.send(res);
206                }
207                .boxed()
208            };
209            reload_accounts_fut = rx.fuse();
210            task_spawner.spawn_blocking(fut);
211        }
212
213        // check if we have a new finalized block
214        if let Some(finalized) =
215            last_finalized_block.update(client.finalized_block_number().ok().flatten())
216        {
217            if let BlobStoreUpdates::Finalized(blobs) =
218                blob_store_tracker.on_finalized_block(finalized)
219            {
220                metrics.inc_deleted_tracked_blobs(blobs.len());
221                // remove all finalized blobs from the blob store
222                pool.delete_blobs(blobs);
223                // and also do periodic cleanup
224                let pool = pool.clone();
225                task_spawner.spawn_blocking(Box::pin(async move {
226                    debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
227                    pool.cleanup_blobs();
228                }));
229            }
230        }
231
232        // outcomes of the futures we are waiting on
233        let mut event = None;
234        let mut reloaded = None;
235
236        // select of account reloads and new canonical state updates which should arrive at the rate
237        // of the block time
238        tokio::select! {
239            res = &mut reload_accounts_fut =>  {
240                reloaded = Some(res);
241            }
242            ev = events.next() =>  {
243                 if ev.is_none() {
244                    // the stream ended, we are done
245                    break;
246                }
247                event = ev;
248                // on receiving the first event on start up, mark the pool as drifted to explicitly
249                // trigger revalidation and clear out outdated txs.
250                if first_event {
251                    maintained_state = MaintainedPoolState::Drifted;
252                    first_event = false
253                }
254            }
255            _ = stale_eviction_interval.tick() => {
256                let stale_txs: Vec<_> = pool
257                    .queued_transactions()
258                    .into_iter()
259                    .filter(|tx| {
260                        // filter stale external txs
261                        tx.origin.is_external() && tx.timestamp.elapsed() > config.max_tx_lifetime
262                    })
263                    .map(|tx| *tx.hash())
264                    .collect();
265                debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
266                pool.remove_transactions(stale_txs);
267            }
268        }
269        // handle the result of the account reload
270        match reloaded {
271            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
272                // reloaded accounts successfully
273                // extend accounts we failed to load from database
274                dirty_addresses.extend(failed_to_load);
275                // update the pool with the loaded accounts
276                pool.update_accounts(accounts);
277            }
278            Some(Ok(Err(res))) => {
279                // Failed to load accounts from state
280                let (accs, err) = *res;
281                debug!(target: "txpool", %err, "failed to load accounts");
282                dirty_addresses.extend(accs);
283            }
284            Some(Err(_)) => {
285                // failed to receive the accounts, sender dropped, only possible if task panicked
286                maintained_state = MaintainedPoolState::Drifted;
287            }
288            None => {}
289        }
290
291        // handle the new block or reorg
292        let Some(event) = event else { continue };
293        match event {
294            CanonStateNotification::Reorg { old, new } => {
295                let (old_blocks, old_state) = old.inner();
296                let (new_blocks, new_state) = new.inner();
297                let new_tip = new_blocks.tip();
298                let new_first = new_blocks.first();
299                let old_first = old_blocks.first();
300
301                // check if the reorg is not canonical with the pool's block
302                if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
303                    new_first.parent_hash() == pool_info.last_seen_block_hash)
304                {
305                    // the new block points to a higher block than the oldest block in the old chain
306                    maintained_state = MaintainedPoolState::Drifted;
307                }
308
309                let chain_spec = client.chain_spec();
310
311                // fees for the next block: `new_tip+1`
312                let pending_block_base_fee = new_tip
313                    .header()
314                    .next_block_base_fee(
315                        chain_spec.base_fee_params_at_timestamp(new_tip.timestamp()),
316                    )
317                    .unwrap_or_default();
318                let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
319                    chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
320                );
321
322                // we know all changed account in the new chain
323                let new_changed_accounts: HashSet<_> =
324                    new_state.changed_accounts().map(ChangedAccountEntry).collect();
325
326                // find all accounts that were changed in the old chain but _not_ in the new chain
327                let missing_changed_acc = old_state
328                    .accounts_iter()
329                    .map(|(a, _)| a)
330                    .filter(|addr| !new_changed_accounts.contains(addr));
331
332                // for these we need to fetch the nonce+balance from the db at the new tip
333                let mut changed_accounts =
334                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
335                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
336                            // extend accounts we failed to load from database
337                            dirty_addresses.extend(failed_to_load);
338
339                            accounts
340                        }
341                        Err(err) => {
342                            let (addresses, err) = *err;
343                            debug!(
344                                target: "txpool",
345                                %err,
346                                "failed to load missing changed accounts at new tip: {:?}",
347                                new_tip.hash()
348                            );
349                            dirty_addresses.extend(addresses);
350                            vec![]
351                        }
352                    };
353
354                // also include all accounts from new chain
355                // we can use extend here because they are unique
356                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));
357
358                // all transactions mined in the new chain
359                let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();
360
361                // update the pool then re-inject the pruned transactions
362                // find all transactions that were mined in the old chain but not in the new chain
363                let pruned_old_transactions = old_blocks
364                    .transactions_ecrecovered()
365                    .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
366                    .filter_map(|tx| {
367                        if tx.is_eip4844() {
368                            // reorged blobs no longer include the blob, which is necessary for
369                            // validating the transaction. Even though the transaction could have
370                            // been validated previously, we still need the blob in order to
371                            // accurately set the transaction's
372                            // encoded-length which is propagated over the network.
373                            pool.get_blob(*tx.tx_hash())
374                                .ok()
375                                .flatten()
376                                .map(Arc::unwrap_or_clone)
377                                .and_then(|sidecar| {
378                                    <P as TransactionPool>::Transaction::try_from_eip4844(
379                                        tx, sidecar,
380                                    )
381                                })
382                        } else {
383                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
384                        }
385                    })
386                    .collect::<Vec<_>>();
387
388                // update the pool first
389                let update = CanonicalStateUpdate {
390                    new_tip: new_tip.sealed_block(),
391                    pending_block_base_fee,
392                    pending_block_blob_fee,
393                    changed_accounts,
394                    // all transactions mined in the new chain need to be removed from the pool
395                    mined_transactions: new_blocks.transaction_hashes().collect(),
396                    update_kind: PoolUpdateKind::Reorg,
397                };
398                pool.on_canonical_state_change(update);
399
400                // all transactions that were mined in the old chain but not in the new chain need
401                // to be re-injected
402                //
403                // Note: we no longer know if the tx was local or external
404                // Because the transactions are not finalized, the corresponding blobs are still in
405                // blob store (if we previously received them from the network)
406                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
407                let _ = pool.add_external_transactions(pruned_old_transactions).await;
408
409                // keep track of new mined blob transactions
410                blob_store_tracker.add_new_chain_blocks(&new_blocks);
411            }
412            CanonStateNotification::Commit { new } => {
413                let (blocks, state) = new.inner();
414                let tip = blocks.tip();
415                let chain_spec = client.chain_spec();
416
417                // fees for the next block: `tip+1`
418                let pending_block_base_fee = tip
419                    .header()
420                    .next_block_base_fee(chain_spec.base_fee_params_at_timestamp(tip.timestamp()))
421                    .unwrap_or_default();
422                let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
423                    chain_spec.blob_params_at_timestamp(tip.timestamp()),
424                );
425
426                let first_block = blocks.first();
427                trace!(
428                    target: "txpool",
429                    first = first_block.number(),
430                    tip = tip.number(),
431                    pool_block = pool_info.last_seen_block_number,
432                    "update pool on new commit"
433                );
434
435                // check if the depth is too large and should be skipped, this could happen after
436                // initial sync or long re-sync
437                let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
438                if depth > max_update_depth {
439                    maintained_state = MaintainedPoolState::Drifted;
440                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
441                    let info = BlockInfo {
442                        block_gas_limit: tip.header().gas_limit(),
443                        last_seen_block_hash: tip.hash(),
444                        last_seen_block_number: tip.number(),
445                        pending_basefee: pending_block_base_fee,
446                        pending_blob_fee: pending_block_blob_fee,
447                    };
448                    pool.set_block_info(info);
449
450                    // keep track of mined blob transactions
451                    blob_store_tracker.add_new_chain_blocks(&blocks);
452
453                    continue
454                }
455
456                let mut changed_accounts = Vec::with_capacity(state.state().len());
457                for acc in state.changed_accounts() {
458                    // we can always clear the dirty flag for this account
459                    dirty_addresses.remove(&acc.address);
460                    changed_accounts.push(acc);
461                }
462
463                let mined_transactions = blocks.transaction_hashes().collect();
464
465                // check if the range of the commit is canonical with the pool's block
466                if first_block.parent_hash() != pool_info.last_seen_block_hash {
467                    // we received a new canonical chain commit but the commit is not canonical with
468                    // the pool's block, this could happen after initial sync or
469                    // long re-sync
470                    maintained_state = MaintainedPoolState::Drifted;
471                }
472
473                // Canonical update
474                let update = CanonicalStateUpdate {
475                    new_tip: tip.sealed_block(),
476                    pending_block_base_fee,
477                    pending_block_blob_fee,
478                    changed_accounts,
479                    mined_transactions,
480                    update_kind: PoolUpdateKind::Commit,
481                };
482                pool.on_canonical_state_change(update);
483
484                // keep track of mined blob transactions
485                blob_store_tracker.add_new_chain_blocks(&blocks);
486            }
487        }
488    }
489}
490
491struct FinalizedBlockTracker {
492    last_finalized_block: Option<BlockNumber>,
493}
494
495impl FinalizedBlockTracker {
496    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
497        Self { last_finalized_block }
498    }
499
500    /// Updates the tracked finalized block and returns the new finalized block if it changed
501    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
502        let finalized = finalized_block?;
503        self.last_finalized_block
504            .replace(finalized)
505            .is_none_or(|last| last < finalized)
506            .then_some(finalized)
507    }
508}
509
510/// Keeps track of the pool's state, whether the accounts in the pool are in sync with the actual
511/// state.
512#[derive(Debug, PartialEq, Eq)]
513enum MaintainedPoolState {
514    /// Pool is assumed to be in sync with the current state
515    InSync,
516    /// Pool could be out of sync with the state
517    Drifted,
518}
519
520impl MaintainedPoolState {
521    /// Returns `true` if the pool is assumed to be out of sync with the current state.
522    #[inline]
523    const fn is_drifted(&self) -> bool {
524        matches!(self, Self::Drifted)
525    }
526}
527
528/// A unique [`ChangedAccount`] identified by its address that can be used for deduplication
529#[derive(Eq)]
530struct ChangedAccountEntry(ChangedAccount);
531
532impl PartialEq for ChangedAccountEntry {
533    fn eq(&self, other: &Self) -> bool {
534        self.0.address == other.0.address
535    }
536}
537
538impl Hash for ChangedAccountEntry {
539    fn hash<H: Hasher>(&self, state: &mut H) {
540        self.0.address.hash(state);
541    }
542}
543
544impl Borrow<Address> for ChangedAccountEntry {
545    fn borrow(&self) -> &Address {
546        &self.0.address
547    }
548}
549
550#[derive(Default)]
551struct LoadedAccounts {
552    /// All accounts that were loaded
553    accounts: Vec<ChangedAccount>,
554    /// All accounts that failed to load
555    failed_to_load: Vec<Address>,
556}
557
558/// Loads all accounts at the given state
559///
560/// Returns an error with all given addresses if the state is not available.
561///
562/// Note: this expects _unique_ addresses
563fn load_accounts<Client, I>(
564    client: Client,
565    at: BlockHash,
566    addresses: I,
567) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
568where
569    I: IntoIterator<Item = Address>,
570
571    Client: StateProviderFactory,
572{
573    let addresses = addresses.into_iter();
574    let mut res = LoadedAccounts::default();
575    let state = match client.history_by_block_hash(at) {
576        Ok(state) => state,
577        Err(err) => return Err(Box::new((addresses.collect(), err))),
578    };
579    for addr in addresses {
580        if let Ok(maybe_acc) = state.basic_account(&addr) {
581            let acc = maybe_acc
582                .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
583                .unwrap_or_else(|| ChangedAccount::empty(addr));
584            res.accounts.push(acc)
585        } else {
586            // failed to load account.
587            res.failed_to_load.push(addr);
588        }
589    }
590    Ok(res)
591}
592
593/// Loads transactions from a file, decodes them from the RLP format, and inserts them
594/// into the transaction pool on node boot up.
595/// The file is removed after the transactions have been successfully processed.
596async fn load_and_reinsert_transactions<P>(
597    pool: P,
598    file_path: &Path,
599) -> Result<(), TransactionsBackupError>
600where
601    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
602{
603    if !file_path.exists() {
604        return Ok(())
605    }
606
607    debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
608    let data = reth_fs_util::read(file_path)?;
609
610    if data.is_empty() {
611        return Ok(())
612    }
613
614    let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
615        alloy_rlp::Decodable::decode(&mut data.as_slice())?;
616
617    let pool_transactions = txs_signed
618        .into_iter()
619        .filter_map(|tx| tx.try_clone_into_recovered().ok())
620        .filter_map(|tx| {
621            // Filter out errors
622            <P::Transaction as PoolTransaction>::try_from_consensus(tx).ok()
623        })
624        .collect();
625
626    let outcome = pool.add_transactions(crate::TransactionOrigin::Local, pool_transactions).await;
627
628    info!(target: "txpool", txs_file =?file_path, num_txs=%outcome.len(), "Successfully reinserted local transactions from file");
629    reth_fs_util::remove_file(file_path)?;
630    Ok(())
631}
632
633fn save_local_txs_backup<P>(pool: P, file_path: &Path)
634where
635    P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
636{
637    let local_transactions = pool.get_local_transactions();
638    if local_transactions.is_empty() {
639        trace!(target: "txpool", "no local transactions to save");
640        return
641    }
642
643    let local_transactions = local_transactions
644        .into_iter()
645        .map(|tx| tx.transaction.clone_into_consensus().into_inner())
646        .collect::<Vec<_>>();
647
648    let num_txs = local_transactions.len();
649    let mut buf = Vec::new();
650    alloy_rlp::encode_list(&local_transactions, &mut buf);
651    info!(target: "txpool", txs_file =?file_path, num_txs=%num_txs, "Saving current local transactions");
652    let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();
653
654    match parent_dir.map(|_| reth_fs_util::write(file_path, buf)) {
655        Ok(_) => {
656            info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
657        }
658        Err(err) => {
659            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
660        }
661    }
662}
663
664/// Errors possible during txs backup load and decode
665#[derive(thiserror::Error, Debug)]
666pub enum TransactionsBackupError {
667    /// Error during RLP decoding of transactions
668    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
669    Decode(#[from] alloy_rlp::Error),
670    /// Error during file upload
671    #[error("failed to apply transactions backup. Encountered file error: {0}")]
672    FsPath(#[from] FsPathError),
673    /// Error adding transactions to the transaction pool
674    #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
675    Pool(#[from] PoolError),
676}
677
678/// Task which manages saving local transactions to the persistent file in case of shutdown.
679/// Reloads the transactions from the file on the boot up and inserts them into the pool.
680pub async fn backup_local_transactions_task<P>(
681    shutdown: reth_tasks::shutdown::GracefulShutdown,
682    pool: P,
683    config: LocalTransactionBackupConfig,
684) where
685    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
686{
687    let Some(transactions_path) = config.transactions_path else {
688        // nothing to do
689        return
690    };
691
692    if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
693        error!(target: "txpool", "{}", err)
694    }
695
696    let graceful_guard = shutdown.await;
697
698    // write transactions to disk
699    save_local_txs_backup(pool, &transactions_path);
700
701    drop(graceful_guard)
702}
703
704#[cfg(test)]
705mod tests {
706    use super::*;
707    use crate::{
708        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
709        CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
710    };
711    use alloy_consensus::transaction::PooledTransaction;
712    use alloy_eips::eip2718::Decodable2718;
713    use alloy_primitives::{hex, U256};
714    use reth_ethereum_primitives::TransactionSigned;
715    use reth_fs_util as fs;
716    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
717    use reth_tasks::TaskManager;
718
719    #[test]
720    fn changed_acc_entry() {
721        let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
722        let mut copy = changed_acc.0;
723        copy.nonce = 10;
724        assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
725    }
726
727    const EXTENSION: &str = "rlp";
728    const FILENAME: &str = "test_transactions_backup";
729
730    #[tokio::test(flavor = "multi_thread")]
731    async fn test_save_local_txs_backup() {
732        let temp_dir = tempfile::tempdir().unwrap();
733        let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
734        let tx_bytes = hex!("02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507");
735        let tx = PooledTransaction::decode_2718(&mut &tx_bytes[..]).unwrap();
736        let provider = MockEthProvider::default();
737        let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
738        let tx_to_cmp = transaction.clone();
739        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
740        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
741        let blob_store = InMemoryBlobStore::default();
742        let validator = EthTransactionValidatorBuilder::new(provider).build(blob_store.clone());
743
744        let txpool = Pool::new(
745            validator.clone(),
746            CoinbaseTipOrdering::default(),
747            blob_store.clone(),
748            Default::default(),
749        );
750
751        txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();
752
753        let handle = tokio::runtime::Handle::current();
754        let manager = TaskManager::new(handle);
755        let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
756        manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
757            backup_local_transactions_task(shutdown, txpool.clone(), config)
758        });
759
760        let mut txns = txpool.get_local_transactions();
761        let tx_on_finish = txns.pop().expect("there should be 1 transaction");
762
763        assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());
764
765        // shutdown the executor
766        manager.graceful_shutdown();
767
768        let data = fs::read(transactions_path).unwrap();
769
770        let txs: Vec<TransactionSigned> =
771            alloy_rlp::Decodable::decode(&mut data.as_slice()).unwrap();
772        assert_eq!(txs.len(), 1);
773
774        temp_dir.close().unwrap();
775    }
776
777    #[test]
778    fn test_update_with_higher_finalized_block() {
779        let mut tracker = FinalizedBlockTracker::new(Some(10));
780        assert_eq!(tracker.update(Some(15)), Some(15));
781        assert_eq!(tracker.last_finalized_block, Some(15));
782    }
783
784    #[test]
785    fn test_update_with_lower_finalized_block() {
786        let mut tracker = FinalizedBlockTracker::new(Some(20));
787        assert_eq!(tracker.update(Some(15)), None);
788        assert_eq!(tracker.last_finalized_block, Some(15));
789    }
790
791    #[test]
792    fn test_update_with_equal_finalized_block() {
793        let mut tracker = FinalizedBlockTracker::new(Some(20));
794        assert_eq!(tracker.update(Some(20)), None);
795        assert_eq!(tracker.last_finalized_block, Some(20));
796    }
797
798    #[test]
799    fn test_update_with_no_last_finalized_block() {
800        let mut tracker = FinalizedBlockTracker::new(None);
801        assert_eq!(tracker.update(Some(10)), Some(10));
802        assert_eq!(tracker.last_finalized_block, Some(10));
803    }
804
805    #[test]
806    fn test_update_with_no_new_finalized_block() {
807        let mut tracker = FinalizedBlockTracker::new(Some(10));
808        assert_eq!(tracker.update(None), None);
809        assert_eq!(tracker.last_finalized_block, Some(10));
810    }
811
812    #[test]
813    fn test_update_with_no_finalized_blocks() {
814        let mut tracker = FinalizedBlockTracker::new(None);
815        assert_eq!(tracker.update(None), None);
816        assert_eq!(tracker.last_finalized_block, None);
817    }
818}