reth_transaction_pool/
maintain.rs

//! Support for maintaining the state of the transaction pool

use crate::{
    blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
    error::PoolError,
    metrics::MaintainPoolMetrics,
    traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
    BlockInfo, PoolTransaction, PoolUpdateKind,
};
use alloy_consensus::{BlockHeader, Typed2718};
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{Address, BlockHash, BlockNumber};
use alloy_rlp::Encodable;
use futures_util::{
    future::{BoxFuture, Fuse, FusedFuture},
    FutureExt, Stream, StreamExt,
};
use reth_chain_state::CanonStateNotification;
use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth_execution_types::ChangedAccount;
use reth_fs_util::FsPathError;
use reth_primitives_traits::{
    transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
};
use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskSpawner;
use std::{
    borrow::Borrow,
    collections::HashSet,
    hash::{Hash, Hasher},
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::{
    sync::oneshot,
    time::{self, Duration},
};
use tracing::{debug, error, info, trace, warn};

/// Maximum amount of time non-executable transactions are queued.
pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);

/// Additional settings for maintaining the transaction pool
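///
/// # Example
///
/// A minimal sketch of overriding a single setting (the import path assumes this module is
/// exported as `maintain`):
///
/// ```
/// use reth_transaction_pool::maintain::MaintainPoolConfig;
///
/// let config = MaintainPoolConfig {
///     // tolerate deeper reorg updates before treating the pool as drifted
///     max_update_depth: 128,
///     ..Default::default()
/// };
/// assert_eq!(config.max_reload_accounts, 100);
/// ```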
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Maximum (reorg) depth we handle when updating the transaction pool: `new.number -
    /// last_seen.number`
    ///
    /// Default: 64 (2 epochs)
    pub max_update_depth: u64,
    /// Maximum number of accounts to reload from state at once when updating the transaction
    /// pool.
    ///
    /// Default: 100
    pub max_reload_accounts: usize,

    /// Maximum amount of time non-executable, non-local transactions are queued.
    ///
    /// Default: 3 hours
    pub max_tx_lifetime: Duration,

    /// Apply no exemptions to locally received transactions.
    ///
    /// This includes:
    ///   - no price exemptions
    ///   - no eviction exemptions
    pub no_local_exemptions: bool,
}

impl Default for MaintainPoolConfig {
    fn default() -> Self {
        Self {
            max_update_depth: 64,
            max_reload_accounts: 100,
            max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
            no_local_exemptions: false,
        }
    }
}

/// Settings for local transaction backup task
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
    /// Path to transactions backup file
    pub transactions_path: Option<PathBuf>,
}

impl LocalTransactionBackupConfig {
    /// Returns a config initialized with the given path to the transactions backup file
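    ///
    /// # Example
    ///
    /// A minimal sketch (the import path assumes this module is exported as `maintain`):
    ///
    /// ```
    /// use reth_transaction_pool::maintain::LocalTransactionBackupConfig;
    /// use std::path::PathBuf;
    ///
    /// let config =
    ///     LocalTransactionBackupConfig::with_local_txs_backup(PathBuf::from("txs_backup.rlp"));
    /// assert!(config.transactions_path.is_some());
    /// ```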
    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
        Self { transactions_path: Some(transactions_path) }
    }
}

/// Returns a spawnable future for maintaining the state of the transaction pool.
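///
/// # Example
///
/// A sketch of wiring the future into a node; `client`, `pool`, `canon_state_notifications` and
/// `task_executor` are assumed to come from an already configured node:
///
/// ```ignore
/// let fut = maintain_transaction_pool_future(
///     client.clone(),
///     pool.clone(),
///     canon_state_notifications,
///     task_executor.clone(),
///     MaintainPoolConfig::default(),
/// );
/// task_executor.spawn_critical("txpool maintenance task", fut);
/// ```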
pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) -> BoxFuture<'static, ()>
where
    N: NodePrimitives,
    Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + 'static,
{
    async move {
        maintain_transaction_pool(client, pool, events, task_spawner, config).await;
    }
    .boxed()
}

/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state
/// accordingly.
pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    mut events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) where
    N: NodePrimitives,
    Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + 'static,
{
    let metrics = MaintainPoolMetrics::default();
    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
    // ensure the pool points to latest state
    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
        let latest = SealedHeader::seal_slow(latest);
        let chain_spec = client.chain_spec();
        let info = BlockInfo {
            block_gas_limit: latest.gas_limit(),
            last_seen_block_hash: latest.hash(),
            last_seen_block_number: latest.number(),
            pending_basefee: latest
                .next_block_base_fee(chain_spec.base_fee_params_at_timestamp(latest.timestamp()))
                .unwrap_or_default(),
            pending_blob_fee: latest
                .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
        };
        pool.set_block_info(info);
    }

    // keeps track of mined blob transactions so we can clean up finalized blob transactions
    let mut blob_store_tracker = BlobStoreCanonTracker::default();

    // keeps track of the latest finalized block
    let mut last_finalized_block =
        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());

    // keeps track of any dirty accounts that we know are out of sync with the pool
    let mut dirty_addresses = HashSet::default();

    // keeps track of the state of the pool with respect to blocks
    let mut maintained_state = MaintainedPoolState::InSync;

    // the future that reloads accounts from state
    let mut reload_accounts_fut = Fuse::terminated();

    // eviction interval for stale non-local txs
    let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);

    // toggle for the first notification
    let mut first_event = true;

    // The update loop that waits for new blocks and reorgs and performs pool updates.
    // Listen for new chain events and derive the update action for the pool.
    loop {
        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");

        metrics.set_dirty_accounts_len(dirty_addresses.len());
        let pool_info = pool.block_info();

        // after performing a pool update for a new block, we have some time to update dirty
        // accounts and correct any drift of the pool from the current state, for example after a
        // restart or a pipeline run
        if maintained_state.is_drifted() {
            metrics.inc_drift();
            // assume all senders are dirty
            dirty_addresses = pool.unique_senders();
            // make sure we toggle the state back to in sync
            maintained_state = MaintainedPoolState::InSync;
        }

        // if we have accounts that are out of sync with the pool, we reload them in chunks
        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
            let (tx, rx) = oneshot::channel();
            let c = client.clone();
            let at = pool_info.last_seen_block_hash;
            let fut = if dirty_addresses.len() > max_reload_accounts {
                // need to chunk accounts to reload
                let accs_to_reload =
                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
                for acc in &accs_to_reload {
                    // make sure we remove them from the dirty set
                    dirty_addresses.remove(acc);
                }
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            } else {
                // can fetch all dirty accounts at once
                let accs_to_reload = std::mem::take(&mut dirty_addresses);
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            };
            reload_accounts_fut = rx.fuse();
            task_spawner.spawn_blocking(fut);
        }

        // check if we have a new finalized block
        if let Some(finalized) =
            last_finalized_block.update(client.finalized_block_number().ok().flatten())
        {
            if let BlobStoreUpdates::Finalized(blobs) =
                blob_store_tracker.on_finalized_block(finalized)
            {
                metrics.inc_deleted_tracked_blobs(blobs.len());
                // remove all finalized blobs from the blob store
                pool.delete_blobs(blobs);
                // and also do periodic cleanup
                let pool = pool.clone();
                task_spawner.spawn_blocking(Box::pin(async move {
                    debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
                    pool.cleanup_blobs();
                }));
            }
        }

        // outcomes of the futures we are waiting on
        let mut event = None;
        let mut reloaded = None;

        // select over account reloads and new canonical state updates, which should arrive at the
        // rate of the block time
        tokio::select! {
            res = &mut reload_accounts_fut => {
                reloaded = Some(res);
            }
            ev = events.next() => {
                if ev.is_none() {
                    // the stream ended, we are done
                    break;
                }
                event = ev;
                // on receiving the first event on start up, mark the pool as drifted to explicitly
                // trigger revalidation and clear out outdated txs.
                if first_event {
                    maintained_state = MaintainedPoolState::Drifted;
                    first_event = false
                }
            }
            _ = stale_eviction_interval.tick() => {
                let stale_txs: Vec<_> = pool
                    .queued_transactions()
                    .into_iter()
                    .filter(|tx| {
                        // filter stale transactions based on config
                        (tx.origin.is_external() || config.no_local_exemptions) &&
                            tx.timestamp.elapsed() > config.max_tx_lifetime
                    })
                    .map(|tx| *tx.hash())
                    .collect();
                debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
                pool.remove_transactions(stale_txs);
            }
        }
        // handle the result of the account reload
        match reloaded {
            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
                // reloaded accounts successfully
                // extend accounts we failed to load from database
                dirty_addresses.extend(failed_to_load);
                // update the pool with the loaded accounts
                pool.update_accounts(accounts);
            }
            Some(Ok(Err(res))) => {
                // failed to load accounts from state
                let (accs, err) = *res;
                debug!(target: "txpool", %err, "failed to load accounts");
                dirty_addresses.extend(accs);
            }
            Some(Err(_)) => {
                // failed to receive the accounts, sender dropped, only possible if the task panicked
                maintained_state = MaintainedPoolState::Drifted;
            }
            None => {}
        }

        // handle the new block or reorg
        let Some(event) = event else { continue };
        match event {
            CanonStateNotification::Reorg { old, new } => {
                let (old_blocks, old_state) = old.inner();
                let (new_blocks, new_state) = new.inner();
                let new_tip = new_blocks.tip();
                let new_first = new_blocks.first();
                let old_first = old_blocks.first();

                // check if the reorg is canonical with the pool's block
                if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
                    new_first.parent_hash() == pool_info.last_seen_block_hash)
                {
                    // the reorg does not attach to the pool's last seen block, so the pool has
                    // drifted from the canonical chain
                    maintained_state = MaintainedPoolState::Drifted;
                }

                let chain_spec = client.chain_spec();

                // fees for the next block: `new_tip+1`
                let pending_block_base_fee = new_tip
                    .header()
                    .next_block_base_fee(
                        chain_spec.base_fee_params_at_timestamp(new_tip.timestamp()),
                    )
                    .unwrap_or_default();
                let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
                );

                // we know all changed accounts in the new chain
                let new_changed_accounts: HashSet<_> =
                    new_state.changed_accounts().map(ChangedAccountEntry).collect();

                // find all accounts that were changed in the old chain but _not_ in the new chain
                let missing_changed_acc = old_state
                    .accounts_iter()
                    .map(|(a, _)| a)
                    .filter(|addr| !new_changed_accounts.contains(addr));

                // for these we need to fetch the nonce+balance from the db at the new tip
                let mut changed_accounts =
                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
                            // extend accounts we failed to load from database
                            dirty_addresses.extend(failed_to_load);

                            accounts
                        }
                        Err(err) => {
                            let (addresses, err) = *err;
                            debug!(
                                target: "txpool",
                                %err,
                                "failed to load missing changed accounts at new tip: {:?}",
                                new_tip.hash()
                            );
                            dirty_addresses.extend(addresses);
                            vec![]
                        }
                    };

                // also include all accounts from new chain
                // we can use extend here because they are unique
                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));

                // all transactions mined in the new chain
                let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();

                // update the pool then re-inject the pruned transactions
                // find all transactions that were mined in the old chain but not in the new chain
                let pruned_old_transactions = old_blocks
                    .transactions_ecrecovered()
                    .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
                    .filter_map(|tx| {
                        if tx.is_eip4844() {
                            // reorged EIP-4844 transactions no longer include their blob sidecar,
                            // which is required for validating the transaction. Even though the
                            // transaction could have been validated previously, we still need the
                            // blob in order to accurately set the transaction's encoded length,
                            // which is propagated over the network.
                            pool.get_blob(*tx.tx_hash())
                                .ok()
                                .flatten()
                                .map(Arc::unwrap_or_clone)
                                .and_then(|sidecar| {
                                    <P as TransactionPool>::Transaction::try_from_eip4844(
                                        tx, sidecar,
                                    )
                                })
                        } else {
                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
                        }
                    })
                    .collect::<Vec<_>>();

                // update the pool first
                let update = CanonicalStateUpdate {
                    new_tip: new_tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    // all transactions mined in the new chain need to be removed from the pool
                    mined_transactions: new_blocks.transaction_hashes().collect(),
                    update_kind: PoolUpdateKind::Reorg,
                };
                pool.on_canonical_state_change(update);

                // all transactions that were mined in the old chain but not in the new chain need
                // to be re-injected
                //
                // Note: we no longer know if the tx was local or external
                // Because the transactions are not finalized, the corresponding blobs are still in
                // the blob store (if we previously received them from the network)
                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
                let _ = pool.add_external_transactions(pruned_old_transactions).await;

                // keep track of new mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&new_blocks);
            }
            CanonStateNotification::Commit { new } => {
                let (blocks, state) = new.inner();
                let tip = blocks.tip();
                let chain_spec = client.chain_spec();

                // fees for the next block: `tip+1`
                let pending_block_base_fee = tip
                    .header()
                    .next_block_base_fee(chain_spec.base_fee_params_at_timestamp(tip.timestamp()))
                    .unwrap_or_default();
                let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(tip.timestamp()),
                );

                let first_block = blocks.first();
                trace!(
                    target: "txpool",
                    first = first_block.number(),
                    tip = tip.number(),
                    pool_block = pool_info.last_seen_block_number,
                    "update pool on new commit"
                );

                // check if the depth is too large and should be skipped; this could happen after
                // initial sync or a long re-sync
                let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
                if depth > max_update_depth {
                    maintained_state = MaintainedPoolState::Drifted;
                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
                    let info = BlockInfo {
                        block_gas_limit: tip.header().gas_limit(),
                        last_seen_block_hash: tip.hash(),
                        last_seen_block_number: tip.number(),
                        pending_basefee: pending_block_base_fee,
                        pending_blob_fee: pending_block_blob_fee,
                    };
                    pool.set_block_info(info);

                    // keep track of mined blob transactions
                    blob_store_tracker.add_new_chain_blocks(&blocks);

                    continue
                }

                let mut changed_accounts = Vec::with_capacity(state.state().len());
                for acc in state.changed_accounts() {
                    // we can always clear the dirty flag for this account
                    dirty_addresses.remove(&acc.address);
                    changed_accounts.push(acc);
                }

                let mined_transactions = blocks.transaction_hashes().collect();

                // check if the range of the commit is canonical with the pool's block
                if first_block.parent_hash() != pool_info.last_seen_block_hash {
                    // we received a new canonical chain commit but the commit is not canonical
                    // with the pool's block; this could happen after initial sync or a long
                    // re-sync
                    maintained_state = MaintainedPoolState::Drifted;
                }

                // Canonical update
                let update = CanonicalStateUpdate {
                    new_tip: tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions,
                    update_kind: PoolUpdateKind::Commit,
                };
                pool.on_canonical_state_change(update);

                // keep track of mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&blocks);
            }
        }
    }
}

/// Tracks the last seen finalized block so that only newly finalized blocks trigger blob-store
/// cleanup.
struct FinalizedBlockTracker {
    last_finalized_block: Option<BlockNumber>,
}

impl FinalizedBlockTracker {
    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
        Self { last_finalized_block }
    }

    /// Updates the tracked finalized block and returns it if it advanced past the previously
    /// tracked block.
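    ///
    /// Note that a lower block still replaces the tracked value; only the return value signals an
    /// advance. A sketch of the semantics (mirroring the tests below):
    ///
    /// ```ignore
    /// let mut tracker = FinalizedBlockTracker::new(Some(10));
    /// assert_eq!(tracker.update(Some(15)), Some(15)); // advanced
    /// assert_eq!(tracker.update(Some(12)), None); // no advance, but now tracked
    /// assert_eq!(tracker.last_finalized_block, Some(12));
    /// ```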
    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
        let finalized = finalized_block?;
        self.last_finalized_block
            .replace(finalized)
            .is_none_or(|last| last < finalized)
            .then_some(finalized)
    }
}

/// Keeps track of the pool's state, i.e. whether the accounts in the pool are in sync with the
/// actual state.
#[derive(Debug, PartialEq, Eq)]
enum MaintainedPoolState {
    /// Pool is assumed to be in sync with the current state
    InSync,
    /// Pool could be out of sync with the state
    Drifted,
}

impl MaintainedPoolState {
    /// Returns `true` if the pool is assumed to be out of sync with the current state.
    #[inline]
    const fn is_drifted(&self) -> bool {
        matches!(self, Self::Drifted)
    }
}

/// A unique [`ChangedAccount`] identified by its address that can be used for deduplication
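///
/// The `Borrow<Address>` impl below allows set lookups keyed by the address alone, which is how
/// the maintenance loop checks membership. A sketch:
///
/// ```ignore
/// let address = Address::random();
/// let mut set = std::collections::HashSet::new();
/// set.insert(ChangedAccountEntry(ChangedAccount::empty(address)));
/// // query by `&Address` without constructing an entry
/// assert!(set.contains(&address));
/// ```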
#[derive(Eq)]
struct ChangedAccountEntry(ChangedAccount);

impl PartialEq for ChangedAccountEntry {
    fn eq(&self, other: &Self) -> bool {
        self.0.address == other.0.address
    }
}

impl Hash for ChangedAccountEntry {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.address.hash(state);
    }
}

impl Borrow<Address> for ChangedAccountEntry {
    fn borrow(&self) -> &Address {
        &self.0.address
    }
}

#[derive(Default)]
struct LoadedAccounts {
    /// All accounts that were loaded
    accounts: Vec<ChangedAccount>,
    /// All accounts that failed to load
    failed_to_load: Vec<Address>,
}

/// Loads all accounts at the given state
///
/// Returns an error with all given addresses if the state is not available.
///
/// Note: this expects _unique_ addresses
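///
/// A sketch of how the maintenance loop consumes the result:
///
/// ```ignore
/// match load_accounts(client.clone(), tip_hash, addresses.into_iter()) {
///     Ok(LoadedAccounts { accounts, failed_to_load }) => {
///         // re-mark the failures as dirty and push the loaded accounts to the pool
///         dirty_addresses.extend(failed_to_load);
///         pool.update_accounts(accounts);
///     }
///     Err(err) => {
///         // state at the block was unavailable: every address stays dirty
///         let (addresses, err) = *err;
///         dirty_addresses.extend(addresses);
///     }
/// }
/// ```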
fn load_accounts<Client, I>(
    client: Client,
    at: BlockHash,
    addresses: I,
) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
where
    I: IntoIterator<Item = Address>,
    Client: StateProviderFactory,
{
    let addresses = addresses.into_iter();
    let mut res = LoadedAccounts::default();
    let state = match client.history_by_block_hash(at) {
        Ok(state) => state,
        Err(err) => return Err(Box::new((addresses.collect(), err))),
    };
    for addr in addresses {
        if let Ok(maybe_acc) = state.basic_account(&addr) {
            let acc = maybe_acc
                .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
                .unwrap_or_else(|| ChangedAccount::empty(addr));
            res.accounts.push(acc)
        } else {
            // failed to load account.
            res.failed_to_load.push(addr);
        }
    }
    Ok(res)
}

/// Loads transactions from a file, decodes them from the RLP format, and inserts them
/// into the transaction pool on node boot up.
/// The file is removed after the transactions have been successfully processed.
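///
/// A sketch of the boot-up call (the backup path is assumed):
///
/// ```ignore
/// if let Err(err) =
///     load_and_reinsert_transactions(pool.clone(), Path::new("txs_backup.rlp")).await
/// {
///     error!(target: "txpool", "{}", err);
/// }
/// ```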
async fn load_and_reinsert_transactions<P>(
    pool: P,
    file_path: &Path,
) -> Result<(), TransactionsBackupError>
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
{
    if !file_path.exists() {
        return Ok(())
    }

    debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
    let data = reth_fs_util::read(file_path)?;

    if data.is_empty() {
        return Ok(())
    }

    let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
        alloy_rlp::Decodable::decode(&mut data.as_slice())?;

    let pool_transactions = txs_signed
        .into_iter()
        .filter_map(|tx| tx.try_clone_into_recovered().ok())
        .filter_map(|tx| {
            // filter out transactions that fail the conversion
            <P::Transaction as PoolTransaction>::try_from_consensus(tx).ok()
        })
        .collect();

    let outcome = pool.add_transactions(crate::TransactionOrigin::Local, pool_transactions).await;

    info!(target: "txpool", txs_file =?file_path, num_txs=%outcome.len(), "Successfully reinserted local transactions from file");
    reth_fs_util::remove_file(file_path)?;
    Ok(())
}

/// Saves the pool's current local transactions to the given file as an RLP-encoded list.
fn save_local_txs_backup<P>(pool: P, file_path: &Path)
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
{
    let local_transactions = pool.get_local_transactions();
    if local_transactions.is_empty() {
        trace!(target: "txpool", "no local transactions to save");
        return
    }

    let local_transactions = local_transactions
        .into_iter()
        .map(|tx| tx.transaction.clone_into_consensus().into_inner())
        .collect::<Vec<_>>();

    let num_txs = local_transactions.len();
    let mut buf = Vec::new();
    alloy_rlp::encode_list(&local_transactions, &mut buf);
    info!(target: "txpool", txs_file =?file_path, num_txs=%num_txs, "Saving current local transactions");
    let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();

    match parent_dir.map(|_| reth_fs_util::write(file_path, buf)) {
        Ok(Ok(())) => {
            info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
        }
        Ok(Err(err)) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
        }
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to create parent directory for transactions file");
        }
    }
}

/// Errors possible during txs backup load and decode
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
    /// Error during RLP decoding of transactions
    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
    Decode(#[from] alloy_rlp::Error),
    /// Error during file read/write
    #[error("failed to apply transactions backup. Encountered file error: {0}")]
    FsPath(#[from] FsPathError),
    /// Error adding transactions to the transaction pool
    #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
    Pool(#[from] PoolError),
}

/// Task which manages saving local transactions to a persistent file in case of shutdown.
/// Reloads the transactions from the file on boot up and inserts them into the pool.
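///
/// # Example
///
/// A sketch of spawning the task with a graceful shutdown signal, mirroring the test in this
/// module (`executor`, `pool` and `transactions_path` are assumed to exist):
///
/// ```ignore
/// let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path);
/// executor.spawn_critical_with_graceful_shutdown_signal("local txs backup task", |shutdown| {
///     backup_local_transactions_task(shutdown, pool.clone(), config)
/// });
/// ```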
pub async fn backup_local_transactions_task<P>(
    shutdown: reth_tasks::shutdown::GracefulShutdown,
    pool: P,
    config: LocalTransactionBackupConfig,
) where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
{
    let Some(transactions_path) = config.transactions_path else {
        // nothing to do
        return
    };

    if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
        error!(target: "txpool", "{}", err)
    }

    let graceful_guard = shutdown.await;

    // write transactions to disk
    save_local_txs_backup(pool, &transactions_path);

    drop(graceful_guard)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
        CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
    };
    use alloy_consensus::transaction::PooledTransaction;
    use alloy_eips::eip2718::Decodable2718;
    use alloy_primitives::{hex, U256};
    use reth_ethereum_primitives::TransactionSigned;
    use reth_fs_util as fs;
    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
    use reth_tasks::TaskManager;

    #[test]
    fn changed_acc_entry() {
        let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
        let mut copy = changed_acc.0;
        copy.nonce = 10;
        assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
    }

    const EXTENSION: &str = "rlp";
    const FILENAME: &str = "test_transactions_backup";

    #[tokio::test(flavor = "multi_thread")]
    async fn test_save_local_txs_backup() {
        let temp_dir = tempfile::tempdir().unwrap();
        let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
        let tx_bytes = hex!(
            "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
        );
        let tx = PooledTransaction::decode_2718(&mut &tx_bytes[..]).unwrap();
        let provider = MockEthProvider::default();
        let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
        let tx_to_cmp = transaction.clone();
        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
        let blob_store = InMemoryBlobStore::default();
        let validator = EthTransactionValidatorBuilder::new(provider).build(blob_store.clone());

        let txpool = Pool::new(
            validator.clone(),
            CoinbaseTipOrdering::default(),
            blob_store.clone(),
            Default::default(),
        );

        txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();

        let handle = tokio::runtime::Handle::current();
        let manager = TaskManager::new(handle);
        let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
        manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
            backup_local_transactions_task(shutdown, txpool.clone(), config)
        });

        let mut txns = txpool.get_local_transactions();
        let tx_on_finish = txns.pop().expect("there should be 1 transaction");

        assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());

        // shutdown the executor
        manager.graceful_shutdown();

        let data = fs::read(transactions_path).unwrap();

        let txs: Vec<TransactionSigned> =
            alloy_rlp::Decodable::decode(&mut data.as_slice()).unwrap();
        assert_eq!(txs.len(), 1);

        temp_dir.close().unwrap();
    }

    #[test]
    fn test_update_with_higher_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(Some(15)), Some(15));
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_lower_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(15)), None);
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_equal_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(20)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    #[test]
    fn test_update_with_no_last_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(Some(10)), Some(10));
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_new_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_finalized_blocks() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, None);
    }
}
827}