Skip to main content

reth_transaction_pool/
maintain.rs

1//! Support for maintaining the state of the transaction pool
2
3use crate::{
4    blobstore::{BlobSidecarConverter, BlobStoreCanonTracker, BlobStoreUpdates},
5    error::PoolError,
6    metrics::MaintainPoolMetrics,
7    traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
8    AllPoolTransactions, BlobTransactionSidecarVariant, BlockInfo, PoolTransaction, PoolUpdateKind,
9    TransactionOrigin,
10};
11use alloy_consensus::{transaction::TxHashRef, BlockHeader, Typed2718};
12use alloy_eips::{BlockNumberOrTag, Decodable2718, Encodable2718};
13use alloy_primitives::{
14    map::{AddressSet, HashSet},
15    Address, BlockHash, BlockNumber, Bytes,
16};
17use alloy_rlp::Encodable;
18use futures_util::{
19    future::{BoxFuture, Fuse, FusedFuture},
20    FutureExt, Stream, StreamExt,
21};
22use reth_chain_state::CanonStateNotification;
23use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
24use reth_execution_types::ChangedAccount;
25use reth_fs_util::FsPathError;
26use reth_primitives_traits::{
27    transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
28};
29use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
30use reth_tasks::Runtime;
31use serde::{Deserialize, Serialize};
32use std::{
33    borrow::Borrow,
34    hash::{Hash, Hasher},
35    path::{Path, PathBuf},
36    sync::Arc,
37};
38use tokio::{
39    sync::oneshot,
40    time::{self, Duration},
41};
42use tracing::{debug, error, info, trace, warn};
43
/// Maximum amount of time non-executable transactions are queued: 3 hours.
///
/// Used as the default for [`MaintainPoolConfig::max_tx_lifetime`].
pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);
46
/// Additional settings for maintaining the transaction pool
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Maximum (reorg) depth we handle when updating the transaction pool: `new.number -
    /// last_seen.number`
    ///
    /// Commits whose tip is further away from the pool's last seen block than this are not
    /// applied as a regular canonical update; the pool is marked as drifted instead.
    ///
    /// Default: 64 (2 epochs)
    pub max_update_depth: u64,
    /// Maximum number of accounts to reload from state at once when updating the transaction pool.
    ///
    /// Default: 100
    pub max_reload_accounts: usize,

    /// Maximum amount of time non-executable, non local transactions are queued.
    ///
    /// This is also used as the period of the stale transaction eviction interval.
    ///
    /// Default: 3 hours
    pub max_tx_lifetime: Duration,

    /// Apply no exemptions to the locally received transactions.
    ///
    /// This includes:
    ///   - no price exemptions
    ///   - no eviction exemptions
    pub no_local_exemptions: bool,
}
71
72impl Default for MaintainPoolConfig {
73    fn default() -> Self {
74        Self {
75            max_update_depth: 64,
76            max_reload_accounts: 100,
77            max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
78            no_local_exemptions: false,
79        }
80    }
81}
82
/// Settings for local transaction backup task
///
/// The [`Default`] value has no backup path configured.
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
    /// Path to transactions backup file, `None` if no backup file is configured
    pub transactions_path: Option<PathBuf>,
}
89
90impl LocalTransactionBackupConfig {
91    /// Receive path to transactions backup and return initialized config
92    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
93        Self { transactions_path: Some(transactions_path) }
94    }
95}
96
97/// Returns a spawnable future for maintaining the state of the transaction pool.
98pub fn maintain_transaction_pool_future<N, Client, P, St>(
99    client: Client,
100    pool: P,
101    events: St,
102    task_spawner: Runtime,
103    config: MaintainPoolConfig,
104) -> BoxFuture<'static, ()>
105where
106    N: NodePrimitives,
107    Client: StateProviderFactory
108        + BlockReaderIdExt<Header = N::BlockHeader>
109        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
110        + Clone
111        + 'static,
112    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>, Block = N::Block>
113        + 'static,
114    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
115{
116    async move {
117        maintain_transaction_pool(client, pool, events, task_spawner, config).await;
118    }
119    .boxed()
120}
121
122/// Maintains the state of the transaction pool by handling new blocks and reorgs.
123///
124/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
125pub async fn maintain_transaction_pool<N, Client, P, St>(
126    client: Client,
127    pool: P,
128    mut events: St,
129    task_spawner: Runtime,
130    config: MaintainPoolConfig,
131) where
132    N: NodePrimitives,
133    Client: StateProviderFactory
134        + BlockReaderIdExt<Header = N::BlockHeader>
135        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
136        + Clone
137        + 'static,
138    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>, Block = N::Block>
139        + 'static,
140    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
141{
142    let metrics = MaintainPoolMetrics::default();
143    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
144    // ensure the pool points to latest state
145    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
146        let latest = SealedHeader::seal_slow(latest);
147        let chain_spec = client.chain_spec();
148        let info = BlockInfo {
149            block_gas_limit: latest.gas_limit(),
150            last_seen_block_hash: latest.hash(),
151            last_seen_block_number: latest.number(),
152            pending_basefee: chain_spec
153                .next_block_base_fee(latest.header(), latest.timestamp())
154                .unwrap_or_default(),
155            pending_blob_fee: latest
156                .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
157        };
158        pool.set_block_info(info);
159    }
160
161    // keeps track of mined blob transaction so we can clean finalized transactions
162    let mut blob_store_tracker = BlobStoreCanonTracker::default();
163
164    // keeps track of the latest finalized block
165    let mut last_finalized_block =
166        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());
167
168    // keeps track of any dirty accounts that we know of are out of sync with the pool
169    let mut dirty_addresses = HashSet::default();
170
171    // keeps track of the state of the pool wrt to blocks
172    let mut maintained_state = MaintainedPoolState::InSync;
173
174    // the future that reloads accounts from state
175    let mut reload_accounts_fut = Fuse::terminated();
176
177    // eviction interval for stale non local txs
178    let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);
179
180    // toggle for the first notification
181    let mut first_event = true;
182
183    // The update loop that waits for new blocks and reorgs and performs pool updated
184    // Listen for new chain events and derive the update action for the pool
185    loop {
186        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");
187
188        metrics.set_dirty_accounts_len(dirty_addresses.len());
189        let pool_info = pool.block_info();
190
191        // after performing a pool update after a new block we have some time to properly update
192        // dirty accounts and correct if the pool drifted from current state, for example after
193        // restart or a pipeline run
194        if maintained_state.is_drifted() {
195            metrics.inc_drift();
196            // assuming all senders are dirty
197            dirty_addresses = pool.unique_senders();
198            // make sure we toggle the state back to in sync
199            maintained_state = MaintainedPoolState::InSync;
200        }
201
202        // if we have accounts that are out of sync with the pool, we reload them in chunks
203        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
204            let (tx, rx) = oneshot::channel();
205            let c = client.clone();
206            let at = pool_info.last_seen_block_hash;
207            let fut = if dirty_addresses.len() > max_reload_accounts {
208                // need to chunk accounts to reload
209                let accs_to_reload =
210                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
211                for acc in &accs_to_reload {
212                    // make sure we remove them from the dirty set
213                    dirty_addresses.remove(acc);
214                }
215                async move {
216                    let res = load_accounts(c, at, accs_to_reload);
217                    let _ = tx.send(res);
218                }
219                .boxed()
220            } else {
221                // can fetch all dirty accounts at once
222                let accs_to_reload = std::mem::take(&mut dirty_addresses);
223                async move {
224                    let res = load_accounts(c, at, accs_to_reload);
225                    let _ = tx.send(res);
226                }
227                .boxed()
228            };
229            reload_accounts_fut = rx.fuse();
230            task_spawner.spawn_blocking_task(fut);
231        }
232
233        // check if we have a new finalized block
234        if let Some(finalized) =
235            last_finalized_block.update(client.finalized_block_number().ok().flatten()) &&
236            let BlobStoreUpdates::Finalized(blobs) =
237                blob_store_tracker.on_finalized_block(finalized)
238        {
239            metrics.inc_deleted_tracked_blobs(blobs.len());
240            // remove all finalized blobs from the blob store
241            pool.delete_blobs(blobs);
242            // and also do periodic cleanup
243            let pool = pool.clone();
244            task_spawner.spawn_blocking_task(async move {
245                debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
246                pool.cleanup_blobs();
247            });
248        }
249
250        // outcomes of the futures we are waiting on
251        let mut event = None;
252        let mut reloaded = None;
253
254        // select of account reloads and new canonical state updates which should arrive at the rate
255        // of the block time
256        tokio::select! {
257            res = &mut reload_accounts_fut =>  {
258                reloaded = Some(res);
259            }
260            ev = events.next() =>  {
261                 if ev.is_none() {
262                    // the stream ended, we are done
263                    break;
264                }
265                event = ev;
266                // on receiving the first event on start up, mark the pool as drifted to explicitly
267                // trigger revalidation and clear out outdated txs.
268                if first_event {
269                    maintained_state = MaintainedPoolState::Drifted;
270                    first_event = false
271                }
272            }
273            _ = stale_eviction_interval.tick() => {
274                let queued = pool
275                    .queued_transactions();
276                let mut stale_blobs = Vec::new();
277                let now = std::time::Instant::now();
278                let stale_txs: Vec<_> = queued
279                    .into_iter()
280                    .filter(|tx| {
281                        // filter stale transactions based on config
282                        (tx.origin.is_external() || config.no_local_exemptions) && now - tx.timestamp > config.max_tx_lifetime
283                    })
284                    .map(|tx| {
285                        if tx.is_eip4844() {
286                            stale_blobs.push(*tx.hash());
287                        }
288                        *tx.hash()
289                    })
290                    .collect();
291                debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
292                pool.remove_transactions(stale_txs);
293                pool.delete_blobs(stale_blobs);
294            }
295        }
296        // handle the result of the account reload
297        match reloaded {
298            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
299                // reloaded accounts successfully
300                // extend accounts we failed to load from database
301                dirty_addresses.extend(failed_to_load);
302                // update the pool with the loaded accounts
303                pool.update_accounts(accounts);
304            }
305            Some(Ok(Err(res))) => {
306                // Failed to load accounts from state
307                let (accs, err) = *res;
308                debug!(target: "txpool", %err, "failed to load accounts");
309                dirty_addresses.extend(accs);
310            }
311            Some(Err(_)) => {
312                // failed to receive the accounts, sender dropped, only possible if task panicked
313                maintained_state = MaintainedPoolState::Drifted;
314            }
315            None => {}
316        }
317
318        // handle the new block or reorg
319        let Some(event) = event else { continue };
320        match event {
321            CanonStateNotification::Reorg { old, new } => {
322                let (old_blocks, old_state) = old.inner();
323                let (new_blocks, new_state) = new.inner();
324                let new_tip = new_blocks.tip();
325                let new_first = new_blocks.first();
326                let old_first = old_blocks.first();
327
328                // check if the reorg is not canonical with the pool's block
329                if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
330                    new_first.parent_hash() == pool_info.last_seen_block_hash)
331                {
332                    // the new block points to a higher block than the oldest block in the old chain
333                    maintained_state = MaintainedPoolState::Drifted;
334                }
335
336                let chain_spec = client.chain_spec();
337
338                // fees for the next block: `new_tip+1`
339                let pending_block_base_fee = chain_spec
340                    .next_block_base_fee(new_tip.header(), new_tip.timestamp())
341                    .unwrap_or_default();
342                let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
343                    chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
344                );
345
346                // we know all changed account in the new chain
347                let new_changed_accounts: HashSet<_> =
348                    new_state.changed_accounts().map(ChangedAccountEntry).collect();
349
350                // find all accounts that were changed in the old chain but _not_ in the new chain
351                let missing_changed_acc = old_state
352                    .accounts_iter()
353                    .map(|(a, _)| a)
354                    .filter(|addr| !new_changed_accounts.contains(addr));
355
356                // for these we need to fetch the nonce+balance from the db at the new tip
357                let mut changed_accounts =
358                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
359                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
360                            // extend accounts we failed to load from database
361                            dirty_addresses.extend(failed_to_load);
362
363                            accounts
364                        }
365                        Err(err) => {
366                            let (addresses, err) = *err;
367                            debug!(
368                                target: "txpool",
369                                %err,
370                                "failed to load missing changed accounts at new tip: {:?}",
371                                new_tip.hash()
372                            );
373                            dirty_addresses.extend(addresses);
374                            vec![]
375                        }
376                    };
377
378                // also include all accounts from new chain
379                // we can use extend here because they are unique
380                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));
381
382                // all transactions mined in the new chain
383                let mined_transactions = new_blocks.transaction_hashes_vec();
384                let new_mined_transactions =
385                    mined_transactions.iter().copied().collect::<HashSet<_>>();
386
387                // update the pool then re-inject the pruned transactions
388                // find all transactions that were mined in the old chain but not in the new chain
389                let pruned_old_transactions = old_blocks
390                    .transactions_with_sender()
391                    .filter(|(_, tx)| !new_mined_transactions.contains(tx.tx_hash()))
392                    .filter_map(|(signer, tx)| {
393                        let tx = tx.clone().with_signer(*signer);
394                        if tx.is_eip4844() {
395                            // reorged blobs no longer include the blob, which is necessary for
396                            // validating the transaction. Even though the transaction could have
397                            // been validated previously, we still need the blob in order to
398                            // accurately set the transaction's
399                            // encoded-length which is propagated over the network.
400                            pool.get_blob(*tx.tx_hash())
401                                .ok()
402                                .flatten()
403                                .map(Arc::unwrap_or_clone)
404                                .and_then(|sidecar| {
405                                    <P as TransactionPool>::Transaction::try_from_eip4844(
406                                        tx, sidecar,
407                                    )
408                                })
409                        } else {
410                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
411                        }
412                    })
413                    .collect::<Vec<_>>();
414
415                // update the pool first
416                let update = CanonicalStateUpdate {
417                    new_tip: new_tip.sealed_block(),
418                    pending_block_base_fee,
419                    pending_block_blob_fee,
420                    changed_accounts,
421                    // all transactions mined in the new chain need to be removed from the pool
422                    mined_transactions,
423                    update_kind: PoolUpdateKind::Reorg,
424                };
425                pool.on_canonical_state_change(update);
426
427                // all transactions that were mined in the old chain but not in the new chain need
428                // to be re-injected
429                //
430                // Note: we no longer know if the tx was local or external
431                // Because the transactions are not finalized, the corresponding blobs are still in
432                // blob store (if we previously received them from the network)
433                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
434                let _ = pool.add_external_transactions(pruned_old_transactions).await;
435
436                // keep track of new mined blob transactions
437                blob_store_tracker.add_new_chain_blocks(&new_blocks);
438            }
439            CanonStateNotification::Commit { new } => {
440                let (blocks, state) = new.inner();
441                let tip = blocks.tip();
442                let chain_spec = client.chain_spec();
443
444                // fees for the next block: `tip+1`
445                let pending_block_base_fee = chain_spec
446                    .next_block_base_fee(tip.header(), tip.timestamp())
447                    .unwrap_or_default();
448                let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
449                    chain_spec.blob_params_at_timestamp(tip.timestamp()),
450                );
451
452                let first_block = blocks.first();
453                trace!(
454                    target: "txpool",
455                    first = first_block.number(),
456                    tip = tip.number(),
457                    pool_block = pool_info.last_seen_block_number,
458                    "update pool on new commit"
459                );
460
461                // check if the depth is too large and should be skipped, this could happen after
462                // initial sync or long re-sync
463                let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
464                if depth > max_update_depth {
465                    maintained_state = MaintainedPoolState::Drifted;
466                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
467                    let info = BlockInfo {
468                        block_gas_limit: tip.header().gas_limit(),
469                        last_seen_block_hash: tip.hash(),
470                        last_seen_block_number: tip.number(),
471                        pending_basefee: pending_block_base_fee,
472                        pending_blob_fee: pending_block_blob_fee,
473                    };
474                    pool.set_block_info(info);
475
476                    // keep track of mined blob transactions
477                    blob_store_tracker.add_new_chain_blocks(&blocks);
478
479                    continue
480                }
481
482                let mut changed_accounts = Vec::with_capacity(state.state().len());
483                for acc in state.changed_accounts() {
484                    // we can always clear the dirty flag for this account
485                    dirty_addresses.remove(&acc.address);
486                    changed_accounts.push(acc);
487                }
488
489                let mined_transactions = blocks.transaction_hashes_vec();
490
491                // check if the range of the commit is canonical with the pool's block
492                if first_block.parent_hash() != pool_info.last_seen_block_hash {
493                    // we received a new canonical chain commit but the commit is not canonical with
494                    // the pool's block, this could happen after initial sync or
495                    // long re-sync
496                    maintained_state = MaintainedPoolState::Drifted;
497                }
498
499                // Canonical update
500                let update = CanonicalStateUpdate {
501                    new_tip: tip.sealed_block(),
502                    pending_block_base_fee,
503                    pending_block_blob_fee,
504                    changed_accounts,
505                    mined_transactions,
506                    update_kind: PoolUpdateKind::Commit,
507                };
508                pool.on_canonical_state_change(update);
509
510                // keep track of mined blob transactions
511                blob_store_tracker.add_new_chain_blocks(&blocks);
512
513                // If Osaka activates in 2 slots we need to convert blobs to new format.
514                if !chain_spec.is_osaka_active_at_timestamp(tip.timestamp()) &&
515                    !chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(12)) &&
516                    chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(24))
517                {
518                    let pool = pool.clone();
519                    let spawner = task_spawner.clone();
520                    let client = client.clone();
521                    task_spawner.spawn_task(async move {
522                        // Start converting not eaerlier than 4 seconds into current slot to ensure
523                        // that our pool only contains valid transactions for the next block (as
524                        // it's not Osaka yet).
525                        tokio::time::sleep(Duration::from_secs(4)).await;
526
527                        let mut interval = tokio::time::interval(Duration::from_secs(1));
528                        loop {
529                            // Loop and replace blob transactions until we reach Osaka transition
530                            // block after which no legacy blobs are going to be accepted.
531                            let last_iteration =
532                                client.latest_header().ok().flatten().is_none_or(|header| {
533                                    client
534                                        .chain_spec()
535                                        .is_osaka_active_at_timestamp(header.timestamp())
536                                });
537
538                            let AllPoolTransactions { pending, queued } = pool.all_transactions();
539                            for tx in pending.into_iter().chain(queued).filter(|tx| tx.is_eip4844())
540                            {
541                                let tx_hash = *tx.hash();
542
543                                // Fetch sidecar from the pool
544                                let Ok(Some(sidecar)) = pool.get_blob(tx_hash) else {
545                                    continue;
546                                };
547                                // Ensure it is a legacy blob
548                                if !sidecar.is_eip4844() {
549                                    continue;
550                                }
551                                // Remove transaction and sidecar from the pool, both are in memory
552                                // now
553                                let Some(tx) = pool.remove_transactions(vec![tx_hash]).pop() else {
554                                    continue;
555                                };
556                                pool.delete_blob(tx_hash);
557
558                                let BlobTransactionSidecarVariant::Eip4844(sidecar) =
559                                    Arc::unwrap_or_clone(sidecar)
560                                else {
561                                    continue;
562                                };
563
564                                let converter = BlobSidecarConverter::new();
565                                let pool = pool.clone();
566                                spawner.spawn_task(async move {
567                                    // Convert sidecar to EIP-7594 format
568                                    let Some(sidecar) = converter.convert(sidecar).await else {
569                                        return;
570                                    };
571
572                                    // Re-insert transaction with the new sidecar
573                                    let origin = tx.origin;
574                                    let Some(tx) = EthPoolTransaction::try_from_eip4844(
575                                        tx.to_consensus(),
576                                        sidecar.into(),
577                                    ) else {
578                                        return;
579                                    };
580                                    let _ = pool.add_transaction(origin, tx).await;
581                                });
582                            }
583
584                            if last_iteration {
585                                break;
586                            }
587
588                            interval.tick().await;
589                        }
590                    });
591                }
592            }
593        }
594    }
595}
596
597struct FinalizedBlockTracker {
598    last_finalized_block: Option<BlockNumber>,
599}
600
601impl FinalizedBlockTracker {
602    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
603        Self { last_finalized_block }
604    }
605
606    /// Updates the tracked finalized block and returns the new finalized block if it changed
607    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
608        let finalized = finalized_block?;
609        self.last_finalized_block.is_none_or(|last| last < finalized).then(|| {
610            self.last_finalized_block = Some(finalized);
611            finalized
612        })
613    }
614}
615
/// Keeps track of the pool's state, whether the accounts in the pool are in sync with the actual
/// state.
#[derive(Debug, PartialEq, Eq)]
enum MaintainedPoolState {
    /// Pool is assumed to be in sync with the current state
    InSync,
    /// Pool could be out of sync with the state
    Drifted,
}
625
626impl MaintainedPoolState {
627    /// Returns `true` if the pool is assumed to be out of sync with the current state.
628    #[inline]
629    const fn is_drifted(&self) -> bool {
630        matches!(self, Self::Drifted)
631    }
632}
633
634/// A unique [`ChangedAccount`] identified by its address that can be used for deduplication
635#[derive(Eq)]
636struct ChangedAccountEntry(ChangedAccount);
637
638impl PartialEq for ChangedAccountEntry {
639    fn eq(&self, other: &Self) -> bool {
640        self.0.address == other.0.address
641    }
642}
643
644impl Hash for ChangedAccountEntry {
645    fn hash<H: Hasher>(&self, state: &mut H) {
646        self.0.address.hash(state);
647    }
648}
649
650impl Borrow<Address> for ChangedAccountEntry {
651    fn borrow(&self) -> &Address {
652        &self.0.address
653    }
654}
655
656#[derive(Default)]
657struct LoadedAccounts {
658    /// All accounts that were loaded
659    accounts: Vec<ChangedAccount>,
660    /// All accounts that failed to load
661    failed_to_load: Vec<Address>,
662}
663
664/// Loads all accounts at the given state
665///
666/// Returns an error with all given addresses if the state is not available.
667///
668/// Note: this expects _unique_ addresses
669fn load_accounts<Client, I>(
670    client: Client,
671    at: BlockHash,
672    addresses: I,
673) -> Result<LoadedAccounts, Box<(AddressSet, ProviderError)>>
674where
675    I: IntoIterator<Item = Address>,
676    Client: StateProviderFactory,
677{
678    let addresses = addresses.into_iter();
679    let mut res = LoadedAccounts::default();
680    let state = match client.history_by_block_hash(at) {
681        Ok(state) => state,
682        Err(err) => return Err(Box::new((addresses.collect(), err))),
683    };
684    for addr in addresses {
685        if let Ok(maybe_acc) = state.basic_account(&addr) {
686            let acc = maybe_acc
687                .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
688                .unwrap_or_else(|| ChangedAccount::empty(addr));
689            res.accounts.push(acc)
690        } else {
691            // failed to load account.
692            res.failed_to_load.push(addr);
693        }
694    }
695    Ok(res)
696}
697
698/// Loads transactions from a file, decodes them from the JSON or RLP format, and
699/// inserts them into the transaction pool on node boot up.
700/// The file is removed after the transactions have been successfully processed.
701async fn load_and_reinsert_transactions<P>(
702    pool: P,
703    file_path: &Path,
704) -> Result<(), TransactionsBackupError>
705where
706    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
707{
708    if !file_path.exists() {
709        return Ok(())
710    }
711
712    debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
713    let data = reth_fs_util::read(file_path)?;
714
715    if data.is_empty() {
716        return Ok(())
717    }
718
719    let pool_transactions: Vec<(TransactionOrigin, <P as TransactionPool>::Transaction)> =
720        if let Ok(tx_backups) = serde_json::from_slice::<Vec<TxBackup>>(&data) {
721            tx_backups
722                .into_iter()
723                .filter_map(|backup| {
724                    let tx_signed =
725                        <P::Transaction as PoolTransaction>::Consensus::decode_2718_exact(
726                            backup.rlp.as_ref(),
727                        )
728                        .ok()?;
729                    let recovered = tx_signed.try_into_recovered().ok()?;
730                    let pool_tx =
731                        <P::Transaction as PoolTransaction>::try_from_consensus(recovered).ok()?;
732
733                    Some((backup.origin, pool_tx))
734                })
735                .collect()
736        } else {
737            let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
738                alloy_rlp::Decodable::decode(&mut data.as_slice())?;
739
740            txs_signed
741                .into_iter()
742                .filter_map(|tx| tx.try_into_recovered().ok())
743                .filter_map(|tx| {
744                    <P::Transaction as PoolTransaction>::try_from_consensus(tx)
745                        .ok()
746                        .map(|pool_tx| (TransactionOrigin::Local, pool_tx))
747                })
748                .collect()
749        };
750
751    let inserted = futures_util::future::join_all(
752        pool_transactions.into_iter().map(|(origin, tx)| pool.add_transaction(origin, tx)),
753    )
754    .await;
755
756    info!(target: "txpool", txs_file =?file_path, num_txs=%inserted.len(), "Successfully reinserted local transactions from file");
757    reth_fs_util::remove_file(file_path)?;
758    Ok(())
759}
760
761fn save_local_txs_backup<P>(pool: P, file_path: &Path)
762where
763    P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
764{
765    let local_transactions = pool.get_local_transactions();
766    if local_transactions.is_empty() {
767        trace!(target: "txpool", "no local transactions to save");
768        return
769    }
770
771    let local_transactions = local_transactions
772        .into_iter()
773        .map(|tx| {
774            let consensus_tx = tx.to_consensus().into_inner();
775            let rlp_data = consensus_tx.encoded_2718();
776
777            TxBackup { rlp: rlp_data.into(), origin: tx.origin }
778        })
779        .collect::<Vec<_>>();
780
781    let json_data = match serde_json::to_string(&local_transactions) {
782        Ok(data) => data,
783        Err(err) => {
784            warn!(target: "txpool", %err, txs_file=?file_path, "failed to serialize local transactions to json");
785            return
786        }
787    };
788
789    info!(target: "txpool", txs_file =?file_path, num_txs=%local_transactions.len(), "Saving current local transactions");
790    let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();
791
792    match parent_dir.map(|_| reth_fs_util::write(file_path, json_data)) {
793        Ok(_) => {
794            info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
795        }
796        Err(err) => {
797            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
798        }
799    }
800}
801
/// A transaction backup that is saved as json to a file for
/// reinsertion into the pool.
///
/// A list of these entries makes up the JSON backup file.
#[derive(Debug, Deserialize, Serialize)]
pub struct TxBackup {
    /// Encoded transaction.
    ///
    /// Written as the transaction's EIP-2718 encoding and decoded the same way when the backup
    /// is loaded.
    pub rlp: Bytes,
    /// The origin of the transaction, restored when the transaction is reinserted.
    pub origin: TransactionOrigin,
}
811
/// Errors possible while loading and decoding the local transactions backup.
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
    /// Error during RLP decoding of transactions
    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
    Decode(#[from] alloy_rlp::Error),
    /// Error during json decoding of transactions
    ///
    /// NOTE(review): the loader in this file falls back to RLP decoding when JSON parsing fails
    /// instead of returning this variant — confirm whether other callers produce it.
    #[error("failed to apply transactions backup. Encountered JSON decode error: {0}")]
    Json(#[from] serde_json::Error),
    /// Error while accessing the backup file on disk (e.g. reading or removing it)
    #[error("failed to apply transactions backup. Encountered file error: {0}")]
    FsPath(#[from] FsPathError),
    /// Error adding transactions to the transaction pool
    #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
    Pool(#[from] PoolError),
}
828
829/// Task which manages saving local transactions to the persistent file in case of shutdown.
830/// Reloads the transactions from the file on the boot up and inserts them into the pool.
831pub async fn backup_local_transactions_task<P>(
832    shutdown: reth_tasks::shutdown::GracefulShutdown,
833    pool: P,
834    config: LocalTransactionBackupConfig,
835) where
836    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
837{
838    let Some(transactions_path) = config.transactions_path else {
839        // nothing to do
840        return
841    };
842
843    if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
844        error!(target: "txpool", "{}", err)
845    }
846
847    let graceful_guard = shutdown.await;
848
849    // write transactions to disk
850    save_local_txs_backup(pool, &transactions_path);
851
852    drop(graceful_guard)
853}
854
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
        CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
    };
    use alloy_eips::eip2718::Decodable2718;
    use alloy_primitives::{hex, U256};
    use reth_ethereum_primitives::PooledTransactionVariant;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_fs_util as fs;
    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
    use reth_tasks::Runtime;

    /// Two entries for the same address compare equal even if their nonces differ.
    #[test]
    fn changed_acc_entry() {
        let original = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
        let mut bumped = original.0;
        bumped.nonce = 10;
        let bumped = ChangedAccountEntry(bumped);
        assert!(original.eq(&bumped));
    }

    const EXTENSION: &str = "json";
    const FILENAME: &str = "test_transactions_backup";

    /// A local transaction in the pool is written to the backup file on graceful shutdown.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_save_local_txs_backup() {
        let backup_dir = tempfile::tempdir().unwrap();
        let backup_file = backup_dir.path().join(FILENAME).with_extension(EXTENSION);
        let raw_tx = hex!(
            "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
        );
        let decoded_tx = PooledTransactionVariant::decode_2718(&mut &raw_tx[..]).unwrap();
        let provider = MockEthProvider::default().with_genesis_block();
        let pooled_tx = EthPooledTransaction::from_pooled(decoded_tx.try_into_recovered().unwrap());
        let expected_tx = pooled_tx.clone();

        // fund the sender so the transaction passes validation
        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));

        let blob_store = InMemoryBlobStore::default();
        let validator = EthTransactionValidatorBuilder::new(provider, EthEvmConfig::mainnet())
            .build(blob_store.clone());

        let txpool = Pool::new(
            validator,
            CoinbaseTipOrdering::default(),
            blob_store.clone(),
            Default::default(),
        );

        txpool.add_transaction(TransactionOrigin::Local, pooled_tx.clone()).await.unwrap();

        // run the backup task on a test runtime so it can be shut down gracefully below
        let rt = Runtime::test();
        let config = LocalTransactionBackupConfig::with_local_txs_backup(backup_file.clone());
        rt.spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
            backup_local_transactions_task(shutdown, txpool.clone(), config)
        });

        let mut local_txs = txpool.get_local_transactions();
        let pooled_local_tx = local_txs.pop().expect("there should be 1 transaction");

        assert_eq!(*expected_tx.hash(), *pooled_local_tx.hash());

        rt.graceful_shutdown();

        // the task should have written exactly one backup entry
        let data = fs::read(backup_file).unwrap();

        let backups: Vec<TxBackup> = serde_json::from_slice::<Vec<TxBackup>>(&data).unwrap();
        assert_eq!(backups.len(), 1);

        backup_dir.close().unwrap();
    }

    /// A higher finalized block is reported and stored.
    #[test]
    fn test_update_with_higher_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        let reported = tracker.update(Some(15));
        assert_eq!(reported, Some(15));
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    /// A lower finalized block is not reported and does not replace the stored one.
    #[test]
    fn test_update_with_lower_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        let reported = tracker.update(Some(15));
        assert_eq!(reported, None);
        // finalized block should NOT go backwards
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    /// An unchanged finalized block is not reported.
    #[test]
    fn test_update_with_equal_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        let reported = tracker.update(Some(20));
        assert_eq!(reported, None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    /// The first finalized block seen is reported and stored.
    #[test]
    fn test_update_with_no_last_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(None);
        let reported = tracker.update(Some(10));
        assert_eq!(reported, Some(10));
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    /// A missing new finalized block leaves the stored one untouched.
    #[test]
    fn test_update_with_no_new_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        let reported = tracker.update(None);
        assert_eq!(reported, None);
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    /// With no finalized block before or after, nothing is reported or stored.
    #[test]
    fn test_update_with_no_finalized_blocks() {
        let mut tracker = FinalizedBlockTracker::new(None);
        let reported = tracker.update(None);
        assert_eq!(reported, None);
        assert_eq!(tracker.last_finalized_block, None);
    }
}