reth_transaction_pool/maintain.rs

//! Support for maintaining the state of the transaction pool

use crate::{
    blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
    error::PoolError,
    metrics::MaintainPoolMetrics,
    traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
    BlockInfo, PoolTransaction, PoolUpdateKind, TransactionOrigin,
};
use alloy_consensus::{BlockHeader, Typed2718};
use alloy_eips::{BlockNumberOrTag, Decodable2718, Encodable2718};
use alloy_primitives::{Address, BlockHash, BlockNumber};
use alloy_rlp::{Bytes, Encodable};
use futures_util::{
    future::{BoxFuture, Fuse, FusedFuture},
    FutureExt, Stream, StreamExt,
};
use reth_chain_state::CanonStateNotification;
use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth_execution_types::ChangedAccount;
use reth_fs_util::FsPathError;
use reth_primitives_traits::{
    transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
};
use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskSpawner;
use serde::{Deserialize, Serialize};
use std::{
    borrow::Borrow,
    collections::HashSet,
    hash::{Hash, Hasher},
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::{
    sync::oneshot,
    time::{self, Duration},
};
use tracing::{debug, error, info, trace, warn};

/// Maximum amount of time non-executable transactions are queued.
pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);

/// Additional settings for maintaining the transaction pool
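///
/// # Example
///
/// A minimal sketch overriding a single field; the import path assumes this crate's
/// public `maintain` module:
///
/// ```
/// # use reth_transaction_pool::maintain::MaintainPoolConfig;
/// use std::time::Duration;
///
/// // shorten the stale-transaction lifetime, keep all other defaults
/// let config = MaintainPoolConfig {
///     max_tx_lifetime: Duration::from_secs(60 * 60),
///     ..Default::default()
/// };
/// assert_eq!(config.max_update_depth, 64);
/// ```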
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Maximum (reorg) depth we handle when updating the transaction pool: `new.number -
    /// last_seen.number`
    ///
    /// Default: 64 (2 epochs)
    pub max_update_depth: u64,
    /// Maximum number of accounts to reload from state at once when updating the transaction pool.
    ///
    /// Default: 100
    pub max_reload_accounts: usize,

    /// Maximum amount of time non-executable, non-local transactions are queued.
    ///
    /// Default: 3 hours
    pub max_tx_lifetime: Duration,

    /// Apply no exemptions to the locally received transactions.
    ///
    /// This includes:
    ///   - no price exemptions
    ///   - no eviction exemptions
    pub no_local_exemptions: bool,
}

impl Default for MaintainPoolConfig {
    fn default() -> Self {
        Self {
            max_update_depth: 64,
            max_reload_accounts: 100,
            max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
            no_local_exemptions: false,
        }
    }
}

/// Settings for local transaction backup task
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
    /// Path to transactions backup file
    pub transactions_path: Option<PathBuf>,
}

impl LocalTransactionBackupConfig {
    /// Returns a config initialized with the given transactions backup path
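    ///
    /// # Example
    ///
    /// A short sketch; the file name is hypothetical:
    ///
    /// ```
    /// # use reth_transaction_pool::maintain::LocalTransactionBackupConfig;
    /// use std::path::PathBuf;
    ///
    /// let config = LocalTransactionBackupConfig::with_local_txs_backup(PathBuf::from("txs-backup.json"));
    /// assert!(config.transactions_path.is_some());
    /// ```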
    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
        Self { transactions_path: Some(transactions_path) }
    }
}

/// Returns a spawnable future for maintaining the state of the transaction pool.
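///
/// The returned future is typically spawned as a long-lived task on node startup. A hedged
/// sketch (`client`, `pool`, `events`, and `executor` are assumed to be provided by the
/// surrounding node setup):
///
/// ```ignore
/// let fut = maintain_transaction_pool_future(
///     client,
///     pool,
///     events,
///     executor.clone(),
///     MaintainPoolConfig::default(),
/// );
/// executor.spawn_critical("txpool maintenance task", fut);
/// ```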
pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) -> BoxFuture<'static, ()>
where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader>>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + 'static,
{
    async move {
        maintain_transaction_pool(client, pool, events, task_spawner, config).await;
    }
    .boxed()
}

/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    mut events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader>>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + 'static,
{
    let metrics = MaintainPoolMetrics::default();
    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
    // ensure the pool points to the latest state
    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
        let latest = SealedHeader::seal_slow(latest);
        let chain_spec = client.chain_spec();
        let info = BlockInfo {
            block_gas_limit: latest.gas_limit(),
            last_seen_block_hash: latest.hash(),
            last_seen_block_number: latest.number(),
            pending_basefee: chain_spec
                .next_block_base_fee(latest.header(), latest.timestamp())
                .unwrap_or_default(),
            pending_blob_fee: latest
                .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
        };
        pool.set_block_info(info);
    }

    // keeps track of mined blob transactions so we can clean up finalized transactions
    let mut blob_store_tracker = BlobStoreCanonTracker::default();

    // keeps track of the latest finalized block
    let mut last_finalized_block =
        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());

    // keeps track of any dirty accounts that we know are out of sync with the pool
    let mut dirty_addresses = HashSet::default();

    // keeps track of the state of the pool with respect to blocks
    let mut maintained_state = MaintainedPoolState::InSync;

    // the future that reloads accounts from state
    let mut reload_accounts_fut = Fuse::terminated();

    // eviction interval for stale non-local txs
    let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);

    // toggle for the first notification
    let mut first_event = true;

    // The update loop that waits for new blocks and reorgs and performs pool updates:
    // listen for new chain events and derive the update action for the pool
    loop {
        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");

        metrics.set_dirty_accounts_len(dirty_addresses.len());
        let pool_info = pool.block_info();

        // after a pool update triggered by a new block there is some time to properly update
        // dirty accounts and correct any drift from the current state, for example after a
        // restart or a pipeline run
        if maintained_state.is_drifted() {
            metrics.inc_drift();
            // assuming all senders are dirty
            dirty_addresses = pool.unique_senders();
            // make sure we toggle the state back to in sync
            maintained_state = MaintainedPoolState::InSync;
        }

        // if we have accounts that are out of sync with the pool, we reload them in chunks
        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
            let (tx, rx) = oneshot::channel();
            let c = client.clone();
            let at = pool_info.last_seen_block_hash;
            let fut = if dirty_addresses.len() > max_reload_accounts {
                // need to chunk accounts to reload
                let accs_to_reload =
                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
                for acc in &accs_to_reload {
                    // make sure we remove them from the dirty set
                    dirty_addresses.remove(acc);
                }
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            } else {
                // can fetch all dirty accounts at once
                let accs_to_reload = std::mem::take(&mut dirty_addresses);
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            };
            reload_accounts_fut = rx.fuse();
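            // state loads hit the database, so run the reload on a blocking task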
            task_spawner.spawn_blocking(fut);
        }

        // check if we have a new finalized block
        if let Some(finalized) =
            last_finalized_block.update(client.finalized_block_number().ok().flatten())
        {
            if let BlobStoreUpdates::Finalized(blobs) =
                blob_store_tracker.on_finalized_block(finalized)
            {
                metrics.inc_deleted_tracked_blobs(blobs.len());
                // remove all finalized blobs from the blob store
                pool.delete_blobs(blobs);
                // and also do periodic cleanup
                let pool = pool.clone();
                task_spawner.spawn_blocking(Box::pin(async move {
                    debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
                    pool.cleanup_blobs();
                }));
            }
        }

        // outcomes of the futures we are waiting on
        let mut event = None;
        let mut reloaded = None;

        // select over account reloads and new canonical state updates, which should arrive at
        // the rate of the block time
        tokio::select! {
            res = &mut reload_accounts_fut => {
                reloaded = Some(res);
            }
            ev = events.next() => {
                if ev.is_none() {
                    // the stream ended, we are done
                    break;
                }
                event = ev;
                // on receiving the first event on startup, mark the pool as drifted to explicitly
                // trigger revalidation and clear out outdated txs.
                if first_event {
                    maintained_state = MaintainedPoolState::Drifted;
                    first_event = false
                }
            }
            _ = stale_eviction_interval.tick() => {
                let stale_txs: Vec<_> = pool
                    .queued_transactions()
                    .into_iter()
                    .filter(|tx| {
                        // filter stale transactions based on config
                        (tx.origin.is_external() || config.no_local_exemptions) &&
                            tx.timestamp.elapsed() > config.max_tx_lifetime
                    })
                    .map(|tx| *tx.hash())
                    .collect();
                debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
                pool.remove_transactions(stale_txs);
            }
        }
        // handle the result of the account reload
        match reloaded {
            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
                // reloaded accounts successfully
                // extend accounts we failed to load from database
                dirty_addresses.extend(failed_to_load);
                // update the pool with the loaded accounts
                pool.update_accounts(accounts);
            }
            Some(Ok(Err(res))) => {
                // failed to load accounts from state
                let (accs, err) = *res;
                debug!(target: "txpool", %err, "failed to load accounts");
                dirty_addresses.extend(accs);
            }
            Some(Err(_)) => {
                // failed to receive the accounts, sender dropped, only possible if task panicked
                maintained_state = MaintainedPoolState::Drifted;
            }
            None => {}
        }

        // handle the new block or reorg
        let Some(event) = event else { continue };
        match event {
            CanonStateNotification::Reorg { old, new } => {
                let (old_blocks, old_state) = old.inner();
                let (new_blocks, new_state) = new.inner();
                let new_tip = new_blocks.tip();
                let new_first = new_blocks.first();
                let old_first = old_blocks.first();

                // check if the reorg is not canonical with the pool's block
                if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
                    new_first.parent_hash() == pool_info.last_seen_block_hash)
                {
                    // neither the old nor the new chain attaches to the pool's last seen block,
                    // so the pool has drifted from the canonical chain
                    maintained_state = MaintainedPoolState::Drifted;
                }

                let chain_spec = client.chain_spec();

                // fees for the next block: `new_tip+1`
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(new_tip.header(), new_tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
                );

                // we know all changed accounts in the new chain
                let new_changed_accounts: HashSet<_> =
                    new_state.changed_accounts().map(ChangedAccountEntry).collect();

                // find all accounts that were changed in the old chain but _not_ in the new chain
                let missing_changed_acc = old_state
                    .accounts_iter()
                    .map(|(a, _)| a)
                    .filter(|addr| !new_changed_accounts.contains(addr));

                // for these we need to fetch the nonce+balance from the db at the new tip
                let mut changed_accounts =
                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
                            // extend accounts we failed to load from database
                            dirty_addresses.extend(failed_to_load);

                            accounts
                        }
                        Err(err) => {
                            let (addresses, err) = *err;
                            debug!(
                                target: "txpool",
                                %err,
                                "failed to load missing changed accounts at new tip: {:?}",
                                new_tip.hash()
                            );
                            dirty_addresses.extend(addresses);
                            vec![]
                        }
                    };

                // also include all accounts from the new chain
                // we can use extend here because they are unique
                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));

                // all transactions mined in the new chain
                let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();

                // update the pool then re-inject the pruned transactions
                // find all transactions that were mined in the old chain but not in the new chain
                let pruned_old_transactions = old_blocks
                    .transactions_ecrecovered()
                    .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
                    .filter_map(|tx| {
                        if tx.is_eip4844() {
                            // reorged 4844 transactions no longer carry their blob sidecar, which
                            // is required to validate the transaction. Even though the transaction
                            // could have been validated previously, we still need the blob in
                            // order to accurately set the transaction's encoded length, which is
                            // propagated over the network.
                            pool.get_blob(*tx.tx_hash())
                                .ok()
                                .flatten()
                                .map(Arc::unwrap_or_clone)
                                .and_then(|sidecar| {
                                    <P as TransactionPool>::Transaction::try_from_eip4844(
                                        tx, sidecar,
                                    )
                                })
                        } else {
                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
                        }
                    })
                    .collect::<Vec<_>>();

                // update the pool first
                let update = CanonicalStateUpdate {
                    new_tip: new_tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    // all transactions mined in the new chain need to be removed from the pool
                    mined_transactions: new_blocks.transaction_hashes().collect(),
                    update_kind: PoolUpdateKind::Reorg,
                };
                pool.on_canonical_state_change(update);

                // all transactions that were mined in the old chain but not in the new chain need
                // to be re-injected
                //
                // Note: we no longer know if the tx was local or external
                // Because the transactions are not finalized, the corresponding blobs are still in
                // the blob store (if we previously received them from the network)
                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
                let _ = pool.add_external_transactions(pruned_old_transactions).await;

                // keep track of new mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&new_blocks);
            }
            CanonStateNotification::Commit { new } => {
                let (blocks, state) = new.inner();
                let tip = blocks.tip();
                let chain_spec = client.chain_spec();

                // fees for the next block: `tip+1`
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(tip.header(), tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(tip.timestamp()),
                );

                let first_block = blocks.first();
                trace!(
                    target: "txpool",
                    first = first_block.number(),
                    tip = tip.number(),
                    pool_block = pool_info.last_seen_block_number,
                    "update pool on new commit"
                );

                // check if the depth is too large and should be skipped; this could happen after
                // initial sync or a long re-sync
                let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
                if depth > max_update_depth {
                    maintained_state = MaintainedPoolState::Drifted;
                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
                    let info = BlockInfo {
                        block_gas_limit: tip.header().gas_limit(),
                        last_seen_block_hash: tip.hash(),
                        last_seen_block_number: tip.number(),
                        pending_basefee: pending_block_base_fee,
                        pending_blob_fee: pending_block_blob_fee,
                    };
                    pool.set_block_info(info);

                    // keep track of mined blob transactions
                    blob_store_tracker.add_new_chain_blocks(&blocks);

                    continue
                }

                let mut changed_accounts = Vec::with_capacity(state.state().len());
                for acc in state.changed_accounts() {
                    // we can always clear the dirty flag for this account
                    dirty_addresses.remove(&acc.address);
                    changed_accounts.push(acc);
                }

                let mined_transactions = blocks.transaction_hashes().collect();

                // check if the range of the commit is canonical with the pool's block
                if first_block.parent_hash() != pool_info.last_seen_block_hash {
                    // we received a new canonical chain commit but the commit is not canonical
                    // with the pool's block; this could happen after initial sync or a long
                    // re-sync
                    maintained_state = MaintainedPoolState::Drifted;
                }

                // Canonical update
                let update = CanonicalStateUpdate {
                    new_tip: tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions,
                    update_kind: PoolUpdateKind::Commit,
                };
                pool.on_canonical_state_change(update);

                // keep track of mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&blocks);
            }
        }
    }
}

struct FinalizedBlockTracker {
    last_finalized_block: Option<BlockNumber>,
}

impl FinalizedBlockTracker {
    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
        Self { last_finalized_block }
    }

    /// Updates the tracked finalized block and returns it if the finalized block advanced
    /// past the previously tracked one.
    ///
    /// Note: the tracked value is always replaced with the latest finalized block, even if
    /// it is lower than the previous one.
    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
        let finalized = finalized_block?;
        self.last_finalized_block
            .replace(finalized)
            .is_none_or(|last| last < finalized)
            .then_some(finalized)
    }
}
522
523/// Keeps track of the pool's state, whether the accounts in the pool are in sync with the actual
524/// state.
525#[derive(Debug, PartialEq, Eq)]
526enum MaintainedPoolState {
527    /// Pool is assumed to be in sync with the current state
528    InSync,
529    /// Pool could be out of sync with the state
530    Drifted,
531}
532
533impl MaintainedPoolState {
534    /// Returns `true` if the pool is assumed to be out of sync with the current state.
535    #[inline]
536    const fn is_drifted(&self) -> bool {
537        matches!(self, Self::Drifted)
538    }
539}

/// A unique [`ChangedAccount`] identified by its address that can be used for deduplication
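///
/// The `Borrow<Address>` impl allows a `HashSet<ChangedAccountEntry>` to be queried
/// directly by address, as done when diffing reorged chains above.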
#[derive(Eq)]
struct ChangedAccountEntry(ChangedAccount);

impl PartialEq for ChangedAccountEntry {
    fn eq(&self, other: &Self) -> bool {
        self.0.address == other.0.address
    }
}

impl Hash for ChangedAccountEntry {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.address.hash(state);
    }
}

impl Borrow<Address> for ChangedAccountEntry {
    fn borrow(&self) -> &Address {
        &self.0.address
    }
}

#[derive(Default)]
struct LoadedAccounts {
    /// All accounts that were loaded
    accounts: Vec<ChangedAccount>,
    /// All accounts that failed to load
    failed_to_load: Vec<Address>,
}

/// Loads all accounts at the given state
///
/// Returns an error with all given addresses if the state is not available.
///
/// Note: this expects _unique_ addresses
fn load_accounts<Client, I>(
    client: Client,
    at: BlockHash,
    addresses: I,
) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
where
    I: IntoIterator<Item = Address>,
    Client: StateProviderFactory,
{
    let addresses = addresses.into_iter();
    let mut res = LoadedAccounts::default();
    let state = match client.history_by_block_hash(at) {
        Ok(state) => state,
        Err(err) => return Err(Box::new((addresses.collect(), err))),
    };
    for addr in addresses {
        if let Ok(maybe_acc) = state.basic_account(&addr) {
            let acc = maybe_acc
                .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
                .unwrap_or_else(|| ChangedAccount::empty(addr));
            res.accounts.push(acc)
        } else {
            // failed to load account.
            res.failed_to_load.push(addr);
        }
    }
    Ok(res)
}

/// Loads transactions from a file, decodes them from the JSON or RLP format, and
/// inserts them into the transaction pool on node boot-up.
/// The file is removed after the transactions have been successfully processed.
async fn load_and_reinsert_transactions<P>(
    pool: P,
    file_path: &Path,
) -> Result<(), TransactionsBackupError>
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
{
    if !file_path.exists() {
        return Ok(())
    }

    debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
    let data = reth_fs_util::read(file_path)?;

    if data.is_empty() {
        return Ok(())
    }

    let pool_transactions: Vec<(TransactionOrigin, <P as TransactionPool>::Transaction)> =
        if let Ok(tx_backups) = serde_json::from_slice::<Vec<TxBackup>>(&data) {
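            // the JSON backup format preserves each transaction's origin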
            tx_backups
                .into_iter()
                .filter_map(|backup| {
                    let tx_signed =
                        <P::Transaction as PoolTransaction>::Consensus::decode_2718_exact(
                            backup.rlp.as_ref(),
                        )
                        .ok()?;
                    let recovered = tx_signed.try_into_recovered().ok()?;
                    let pool_tx =
                        <P::Transaction as PoolTransaction>::try_from_consensus(recovered).ok()?;

                    Some((backup.origin, pool_tx))
                })
                .collect()
        } else {
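            // fallback: a raw RLP list of signed transactions; the origin defaults to local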
            let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
                alloy_rlp::Decodable::decode(&mut data.as_slice())?;

            txs_signed
                .into_iter()
                .filter_map(|tx| tx.try_into_recovered().ok())
                .filter_map(|tx| {
                    <P::Transaction as PoolTransaction>::try_from_consensus(tx)
                        .ok()
                        .map(|pool_tx| (TransactionOrigin::Local, pool_tx))
                })
                .collect()
        };

    let inserted = futures_util::future::join_all(
        pool_transactions.into_iter().map(|(origin, tx)| pool.add_transaction(origin, tx)),
    )
    .await;

    info!(target: "txpool", txs_file =?file_path, num_txs=%inserted.len(), "Successfully reinserted local transactions from file");
    reth_fs_util::remove_file(file_path)?;
    Ok(())
}

fn save_local_txs_backup<P>(pool: P, file_path: &Path)
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
{
    let local_transactions = pool.get_local_transactions();
    if local_transactions.is_empty() {
        trace!(target: "txpool", "no local transactions to save");
        return
    }

    let local_transactions = local_transactions
        .into_iter()
        .map(|tx| {
            let consensus_tx = tx.transaction.clone_into_consensus().into_inner();
            let rlp_data = consensus_tx.encoded_2718();

            TxBackup { rlp: rlp_data.into(), origin: tx.origin }
        })
        .collect::<Vec<_>>();

    let json_data = match serde_json::to_string(&local_transactions) {
        Ok(data) => data,
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "failed to serialize local transactions to json");
            return
        }
    };

    info!(target: "txpool", txs_file =?file_path, num_txs=%local_transactions.len(), "Saving current local transactions");
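    // ensure the parent directory exists before writing the backup file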
    let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();

    match parent_dir.map(|_| reth_fs_util::write(file_path, json_data)) {
        Ok(Ok(())) => {
            info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
        }
        Ok(Err(err)) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
        }
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to create parent directory for local transactions file");
        }
    }
}

/// A transaction backup that is saved as JSON to a file for
/// reinsertion into the pool
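///
/// # Example
///
/// A serde round-trip sketch with a placeholder payload (not a real transaction):
///
/// ```
/// # use reth_transaction_pool::{maintain::TxBackup, TransactionOrigin};
/// let backup = TxBackup { rlp: vec![0x02, 0x01].into(), origin: TransactionOrigin::Local };
/// let json = serde_json::to_string(&backup).unwrap();
/// let decoded: TxBackup = serde_json::from_str(&json).unwrap();
/// assert_eq!(decoded.rlp, backup.rlp);
/// ```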
#[derive(Debug, Deserialize, Serialize)]
pub struct TxBackup {
    /// Encoded transaction
    pub rlp: Bytes,
    /// The origin of the transaction
    pub origin: TransactionOrigin,
}

/// Errors possible during txs backup load and decode
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
    /// Error during RLP decoding of transactions
    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
    Decode(#[from] alloy_rlp::Error),
    /// Error during JSON decoding of transactions
    #[error("failed to apply transactions backup. Encountered JSON decode error: {0}")]
    Json(#[from] serde_json::Error),
    /// Error during a filesystem operation
    #[error("failed to apply transactions backup. Encountered file error: {0}")]
    FsPath(#[from] FsPathError),
    /// Error adding transactions to the transaction pool
    #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
    Pool(#[from] PoolError),
}

/// Task which manages saving local transactions to the persistent file in case of shutdown.
/// Reloads the transactions from the file on boot-up and inserts them into the pool.
pub async fn backup_local_transactions_task<P>(
    shutdown: reth_tasks::shutdown::GracefulShutdown,
    pool: P,
    config: LocalTransactionBackupConfig,
) where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
{
    let Some(transactions_path) = config.transactions_path else {
        // nothing to do
        return
    };

    if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
        error!(target: "txpool", "{}", err)
    }

    let graceful_guard = shutdown.await;

    // write transactions to disk
    save_local_txs_backup(pool, &transactions_path);

    drop(graceful_guard)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
        CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
    };
    use alloy_eips::eip2718::Decodable2718;
    use alloy_primitives::{hex, U256};
    use reth_ethereum_primitives::PooledTransactionVariant;
    use reth_fs_util as fs;
    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
    use reth_tasks::TaskManager;

    #[test]
    fn changed_acc_entry() {
        let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
        let mut copy = changed_acc.0;
        copy.nonce = 10;
        assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
    }

    const EXTENSION: &str = "json";
    const FILENAME: &str = "test_transactions_backup";

    #[tokio::test(flavor = "multi_thread")]
    async fn test_save_local_txs_backup() {
        let temp_dir = tempfile::tempdir().unwrap();
        let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
        let tx_bytes = hex!(
            "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
        );
        let tx = PooledTransactionVariant::decode_2718(&mut &tx_bytes[..]).unwrap();
        let provider = MockEthProvider::default();
        let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
        let tx_to_cmp = transaction.clone();
        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
        let blob_store = InMemoryBlobStore::default();
        let validator = EthTransactionValidatorBuilder::new(provider).build(blob_store.clone());

        let txpool = Pool::new(
            validator,
            CoinbaseTipOrdering::default(),
            blob_store.clone(),
            Default::default(),
        );

        txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();

        let handle = tokio::runtime::Handle::current();
        let manager = TaskManager::new(handle);
        let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
        manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
            backup_local_transactions_task(shutdown, txpool.clone(), config)
        });

        let mut txns = txpool.get_local_transactions();
        let tx_on_finish = txns.pop().expect("there should be 1 transaction");

        assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());

        // shutdown the executor
        manager.graceful_shutdown();

        let data = fs::read(transactions_path).unwrap();

        let txs: Vec<TxBackup> = serde_json::from_slice::<Vec<TxBackup>>(&data).unwrap();
        assert_eq!(txs.len(), 1);

        temp_dir.close().unwrap();
    }

    #[test]
    fn test_update_with_higher_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(Some(15)), Some(15));
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_lower_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(15)), None);
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_equal_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(20)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    #[test]
    fn test_update_with_no_last_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(Some(10)), Some(10));
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_new_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_finalized_blocks() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, None);
    }
}