reth_transaction_pool/maintain.rs

//! Support for maintaining the state of the transaction pool

use crate::{
    blobstore::{BlobSidecarConverter, BlobStoreCanonTracker, BlobStoreUpdates},
    error::PoolError,
    metrics::MaintainPoolMetrics,
    traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
    AllPoolTransactions, BlobTransactionSidecarVariant, BlockInfo, PoolTransaction, PoolUpdateKind,
    TransactionOrigin,
};
use alloy_consensus::{transaction::TxHashRef, BlockHeader, Typed2718};
use alloy_eips::{BlockNumberOrTag, Decodable2718, Encodable2718};
use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes};
use alloy_rlp::Encodable;
use futures_util::{
    future::{BoxFuture, Fuse, FusedFuture},
    FutureExt, Stream, StreamExt,
};
use reth_chain_state::CanonStateNotification;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_execution_types::ChangedAccount;
use reth_fs_util::FsPathError;
use reth_primitives_traits::{
    transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
};
use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskSpawner;
use serde::{Deserialize, Serialize};
use std::{
    borrow::Borrow,
    collections::HashSet,
    hash::{Hash, Hasher},
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::{
    sync::oneshot,
    time::{self, Duration},
};
use tracing::{debug, error, info, trace, warn};

/// Maximum amount of time non-executable transactions are queued.
pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);

/// Additional settings for maintaining the transaction pool
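///
/// # Example
///
/// A minimal sketch of overriding the defaults; the field names come from this struct,
/// while the crate path in the import is an assumption.
///
/// ```ignore
/// use reth_transaction_pool::maintain::MaintainPoolConfig;
///
/// // keep the defaults, but evict stale external transactions after one hour
/// let config = MaintainPoolConfig {
///     max_tx_lifetime: std::time::Duration::from_secs(60 * 60),
///     ..Default::default()
/// };
/// ```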
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Maximum (reorg) depth we handle when updating the transaction pool: `new.number -
    /// last_seen.number`
    ///
    /// Default: 64 (2 epochs)
    pub max_update_depth: u64,
    /// Maximum number of accounts to reload from state at once when updating the transaction pool.
    ///
    /// Default: 100
    pub max_reload_accounts: usize,

    /// Maximum amount of time non-executable, non-local transactions are queued.
    /// Default: 3 hours
    pub max_tx_lifetime: Duration,

    /// Apply no exemptions to the locally received transactions.
    ///
    /// This includes:
    ///   - no price exemptions
    ///   - no eviction exemptions
    pub no_local_exemptions: bool,
}

impl Default for MaintainPoolConfig {
    fn default() -> Self {
        Self {
            max_update_depth: 64,
            max_reload_accounts: 100,
            max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
            no_local_exemptions: false,
        }
    }
}

/// Settings for local transaction backup task
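///
/// # Example
///
/// A minimal sketch using the constructor defined below; the crate path in the import is
/// an assumption.
///
/// ```ignore
/// use reth_transaction_pool::maintain::LocalTransactionBackupConfig;
/// use std::path::PathBuf;
///
/// // persist local transactions to `local_txs.json` across restarts
/// let config =
///     LocalTransactionBackupConfig::with_local_txs_backup(PathBuf::from("local_txs.json"));
/// ```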
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
    /// Path to transactions backup file
    pub transactions_path: Option<PathBuf>,
}

impl LocalTransactionBackupConfig {
    /// Returns a config initialized with the given path to the transactions backup file
    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
        Self { transactions_path: Some(transactions_path) }
    }
}

/// Returns a spawnable future for maintaining the state of the transaction pool.
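///
/// # Example
///
/// A minimal sketch of driving the maintenance loop; `client`, `pool`,
/// `canon_state_notifications` and `executor` are assumed to exist in the caller's
/// context.
///
/// ```ignore
/// let fut = maintain_transaction_pool_future(
///     client,
///     pool,
///     canon_state_notifications,
///     executor.clone(),
///     MaintainPoolConfig::default(),
/// );
/// // the future runs for the lifetime of the node
/// executor.spawn_critical("txpool maintenance task", fut);
/// ```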
pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) -> BoxFuture<'static, ()>
where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    async move {
        maintain_transaction_pool(client, pool, events, task_spawner, config).await;
    }
    .boxed()
}

/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly.
pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    mut events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    let metrics = MaintainPoolMetrics::default();
    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
    // ensure the pool points to latest state
    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
        let latest = SealedHeader::seal_slow(latest);
        let chain_spec = client.chain_spec();
        let info = BlockInfo {
            block_gas_limit: latest.gas_limit(),
            last_seen_block_hash: latest.hash(),
            last_seen_block_number: latest.number(),
            pending_basefee: chain_spec
                .next_block_base_fee(latest.header(), latest.timestamp())
                .unwrap_or_default(),
            pending_blob_fee: latest
                .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
        };
        pool.set_block_info(info);
    }

    // keeps track of mined blob transactions so we can clean up finalized transactions
    let mut blob_store_tracker = BlobStoreCanonTracker::default();

    // keeps track of the latest finalized block
    let mut last_finalized_block =
        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());

    // keeps track of any dirty accounts that we know are out of sync with the pool
    let mut dirty_addresses = HashSet::default();

    // keeps track of the state of the pool with respect to blocks
    let mut maintained_state = MaintainedPoolState::InSync;

    // the future that reloads accounts from state
    let mut reload_accounts_fut = Fuse::terminated();

    // eviction interval for stale non-local txs
    let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);

    // toggle for the first notification
    let mut first_event = true;

    // The update loop that waits for new blocks and reorgs and performs pool updates.
    // Listen for new chain events and derive the update action for the pool.
    loop {
        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");

        metrics.set_dirty_accounts_len(dirty_addresses.len());
        let pool_info = pool.block_info();

        // after updating the pool for a new block we have some time to reload dirty accounts
        // and correct any drift from the current state, for example after a restart or a
        // pipeline run
        if maintained_state.is_drifted() {
            metrics.inc_drift();
            // assuming all senders are dirty
            dirty_addresses = pool.unique_senders();
            // make sure we toggle the state back to in sync
            maintained_state = MaintainedPoolState::InSync;
        }

        // if we have accounts that are out of sync with the pool, we reload them in chunks
        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
            let (tx, rx) = oneshot::channel();
            let c = client.clone();
            let at = pool_info.last_seen_block_hash;
            let fut = if dirty_addresses.len() > max_reload_accounts {
                // need to chunk accounts to reload
                let accs_to_reload =
                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
                for acc in &accs_to_reload {
                    // make sure we remove them from the dirty set
                    dirty_addresses.remove(acc);
                }
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            } else {
                // can fetch all dirty accounts at once
                let accs_to_reload = std::mem::take(&mut dirty_addresses);
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            };
            reload_accounts_fut = rx.fuse();
            task_spawner.spawn_blocking(fut);
        }

        // check if we have a new finalized block
        if let Some(finalized) =
            last_finalized_block.update(client.finalized_block_number().ok().flatten()) &&
            let BlobStoreUpdates::Finalized(blobs) =
                blob_store_tracker.on_finalized_block(finalized)
        {
            metrics.inc_deleted_tracked_blobs(blobs.len());
            // remove all finalized blobs from the blob store
            pool.delete_blobs(blobs);
            // and also do periodic cleanup
            let pool = pool.clone();
            task_spawner.spawn_blocking(Box::pin(async move {
                debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
                pool.cleanup_blobs();
            }));
        }

        // outcomes of the futures we are waiting on
        let mut event = None;
        let mut reloaded = None;

        // select over account reloads and new canonical state updates, which should arrive at
        // the rate of the block time
        tokio::select! {
            res = &mut reload_accounts_fut => {
                reloaded = Some(res);
            }
            ev = events.next() => {
                if ev.is_none() {
                    // the stream ended, we are done
                    break;
                }
                event = ev;
                // on receiving the first event on start up, mark the pool as drifted to explicitly
                // trigger revalidation and clear out outdated txs.
                if first_event {
                    maintained_state = MaintainedPoolState::Drifted;
                    first_event = false
                }
            }
            _ = stale_eviction_interval.tick() => {
                let queued = pool.queued_transactions();
                let mut stale_blobs = Vec::new();
                let now = std::time::Instant::now();
                let stale_txs: Vec<_> = queued
                    .into_iter()
                    .filter(|tx| {
                        // filter stale transactions based on config
                        (tx.origin.is_external() || config.no_local_exemptions) && now - tx.timestamp > config.max_tx_lifetime
                    })
                    .map(|tx| {
                        if tx.is_eip4844() {
                            stale_blobs.push(*tx.hash());
                        }
                        *tx.hash()
                    })
                    .collect();
                debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
                pool.remove_transactions(stale_txs);
                pool.delete_blobs(stale_blobs);
            }
        }
        // handle the result of the account reload
        match reloaded {
            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
                // reloaded accounts successfully
                // extend accounts we failed to load from database
                dirty_addresses.extend(failed_to_load);
                // update the pool with the loaded accounts
                pool.update_accounts(accounts);
            }
            Some(Ok(Err(res))) => {
                // Failed to load accounts from state
                let (accs, err) = *res;
                debug!(target: "txpool", %err, "failed to load accounts");
                dirty_addresses.extend(accs);
            }
            Some(Err(_)) => {
                // failed to receive the accounts, sender dropped, only possible if task panicked
                maintained_state = MaintainedPoolState::Drifted;
            }
            None => {}
        }

        // handle the new block or reorg
        let Some(event) = event else { continue };
        match event {
            CanonStateNotification::Reorg { old, new } => {
                let (old_blocks, old_state) = old.inner();
                let (new_blocks, new_state) = new.inner();
                let new_tip = new_blocks.tip();
                let new_first = new_blocks.first();
                let old_first = old_blocks.first();

                // check if the reorg is not canonical with the pool's block
                if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
                    new_first.parent_hash() == pool_info.last_seen_block_hash)
                {
                    // the new block points to a higher block than the oldest block in the old chain
                    maintained_state = MaintainedPoolState::Drifted;
                }

                let chain_spec = client.chain_spec();

                // fees for the next block: `new_tip+1`
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(new_tip.header(), new_tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
                );

                // we know all changed accounts in the new chain
                let new_changed_accounts: HashSet<_> =
                    new_state.changed_accounts().map(ChangedAccountEntry).collect();

                // find all accounts that were changed in the old chain but _not_ in the new chain
                let missing_changed_acc = old_state
                    .accounts_iter()
                    .map(|(a, _)| a)
                    .filter(|addr| !new_changed_accounts.contains(addr));

                // for these we need to fetch the nonce+balance from the db at the new tip
                let mut changed_accounts =
                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
                            // extend accounts we failed to load from database
                            dirty_addresses.extend(failed_to_load);

                            accounts
                        }
                        Err(err) => {
                            let (addresses, err) = *err;
                            debug!(
                                target: "txpool",
                                %err,
                                "failed to load missing changed accounts at new tip: {:?}",
                                new_tip.hash()
                            );
                            dirty_addresses.extend(addresses);
                            vec![]
                        }
                    };

                // also include all accounts from new chain
                // we can use extend here because they are unique
                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));

                // all transactions mined in the new chain
                let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();

                // update the pool then re-inject the pruned transactions
                // find all transactions that were mined in the old chain but not in the new chain
                let pruned_old_transactions = old_blocks
                    .transactions_ecrecovered()
                    .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
                    .filter_map(|tx| {
                        if tx.is_eip4844() {
                            // reorged EIP-4844 transactions no longer include the blob sidecar,
                            // which is necessary for validating the transaction. Even though the
                            // transaction could have been validated previously, we still need the
                            // blob in order to accurately set the transaction's encoded length,
                            // which is propagated over the network.
                            pool.get_blob(*tx.tx_hash())
                                .ok()
                                .flatten()
                                .map(Arc::unwrap_or_clone)
                                .and_then(|sidecar| {
                                    <P as TransactionPool>::Transaction::try_from_eip4844(
                                        tx, sidecar,
                                    )
                                })
                        } else {
                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
                        }
                    })
                    .collect::<Vec<_>>();

                // update the pool first
                let update = CanonicalStateUpdate {
                    new_tip: new_tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    // all transactions mined in the new chain need to be removed from the pool
                    mined_transactions: new_blocks.transaction_hashes().collect(),
                    update_kind: PoolUpdateKind::Reorg,
                };
                pool.on_canonical_state_change(update);

                // all transactions that were mined in the old chain but not in the new chain need
                // to be re-injected
                //
                // Note: we no longer know if the tx was local or external
                // Because the transactions are not finalized, the corresponding blobs are still in
                // the blob store (if we previously received them from the network)
                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
                let _ = pool.add_external_transactions(pruned_old_transactions).await;

                // keep track of new mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&new_blocks);
            }
            CanonStateNotification::Commit { new } => {
                let (blocks, state) = new.inner();
                let tip = blocks.tip();
                let chain_spec = client.chain_spec();

                // fees for the next block: `tip+1`
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(tip.header(), tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(tip.timestamp()),
                );

                let first_block = blocks.first();
                trace!(
                    target: "txpool",
                    first = first_block.number(),
                    tip = tip.number(),
                    pool_block = pool_info.last_seen_block_number,
                    "update pool on new commit"
                );

                // check if the depth is too large and should be skipped; this could happen after
                // initial sync or a long re-sync
                let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
                if depth > max_update_depth {
                    maintained_state = MaintainedPoolState::Drifted;
                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
                    let info = BlockInfo {
                        block_gas_limit: tip.header().gas_limit(),
                        last_seen_block_hash: tip.hash(),
                        last_seen_block_number: tip.number(),
                        pending_basefee: pending_block_base_fee,
                        pending_blob_fee: pending_block_blob_fee,
                    };
                    pool.set_block_info(info);

                    // keep track of mined blob transactions
                    blob_store_tracker.add_new_chain_blocks(&blocks);

                    continue
                }

                let mut changed_accounts = Vec::with_capacity(state.state().len());
                for acc in state.changed_accounts() {
                    // we can always clear the dirty flag for this account
                    dirty_addresses.remove(&acc.address);
                    changed_accounts.push(acc);
                }

                let mined_transactions = blocks.transaction_hashes().collect();

                // check if the range of the commit is canonical with the pool's block
                if first_block.parent_hash() != pool_info.last_seen_block_hash {
                    // we received a new canonical chain commit but the commit is not canonical
                    // with the pool's block; this could happen after initial sync or a long
                    // re-sync
                    maintained_state = MaintainedPoolState::Drifted;
                }

                // Canonical update
                let update = CanonicalStateUpdate {
                    new_tip: tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions,
                    update_kind: PoolUpdateKind::Commit,
                };
                pool.on_canonical_state_change(update);

                // keep track of mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&blocks);

                // If Osaka activates in 2 slots we need to convert blobs to the new format.
                if !chain_spec.is_osaka_active_at_timestamp(tip.timestamp()) &&
                    !chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(12)) &&
                    chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(24))
                {
                    let pool = pool.clone();
                    let spawner = task_spawner.clone();
                    let client = client.clone();
                    task_spawner.spawn(Box::pin(async move {
                        // Start converting no earlier than 4 seconds into the current slot to
                        // ensure that our pool only contains valid transactions for the next
                        // block (as it's not Osaka yet).
                        tokio::time::sleep(Duration::from_secs(4)).await;

                        let mut interval = tokio::time::interval(Duration::from_secs(1));
                        loop {
                            // Loop and replace blob transactions until we reach the Osaka
                            // transition block, after which no legacy blobs are going to be
                            // accepted.
                            let last_iteration =
                                client.latest_header().ok().flatten().is_none_or(|header| {
                                    client
                                        .chain_spec()
                                        .is_osaka_active_at_timestamp(header.timestamp())
                                });

                            let AllPoolTransactions { pending, queued } = pool.all_transactions();
                            for tx in pending
                                .into_iter()
                                .chain(queued)
                                .filter(|tx| tx.transaction.is_eip4844())
                            {
                                let tx_hash = *tx.transaction.hash();

                                // Fetch sidecar from the pool
                                let Ok(Some(sidecar)) = pool.get_blob(tx_hash) else {
                                    continue;
                                };
                                // Ensure it is a legacy blob
                                if !sidecar.is_eip4844() {
                                    continue;
                                }
                                // Remove transaction and sidecar from the pool, both are in memory
                                // now
                                let Some(tx) = pool.remove_transactions(vec![tx_hash]).pop() else {
                                    continue;
                                };
                                pool.delete_blob(tx_hash);

                                let BlobTransactionSidecarVariant::Eip4844(sidecar) =
                                    Arc::unwrap_or_clone(sidecar)
                                else {
                                    continue;
                                };

                                let converter = BlobSidecarConverter::new();
                                let pool = pool.clone();
                                spawner.spawn(Box::pin(async move {
                                    // Convert sidecar to EIP-7594 format
                                    let Some(sidecar) = converter.convert(sidecar).await else {
                                        return;
                                    };

                                    // Re-insert transaction with the new sidecar
                                    let origin = tx.origin;
                                    let Some(tx) = EthPoolTransaction::try_from_eip4844(
                                        tx.transaction.clone_into_consensus(),
                                        sidecar.into(),
                                    ) else {
                                        return;
                                    };
                                    let _ = pool.add_transaction(origin, tx).await;
                                }));
                            }

                            if last_iteration {
                                break;
                            }

                            interval.tick().await;
                        }
                    }));
                }
            }
        }
    }
}

struct FinalizedBlockTracker {
    last_finalized_block: Option<BlockNumber>,
}

impl FinalizedBlockTracker {
    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
        Self { last_finalized_block }
    }

    /// Updates the tracked finalized block and returns it if it advanced past the previously
    /// tracked one (or if none was tracked yet). The tracked value is always replaced with the
    /// latest observation.
    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
        let finalized = finalized_block?;
        self.last_finalized_block
            .replace(finalized)
            .is_none_or(|last| last < finalized)
            .then_some(finalized)
    }
}

/// Keeps track of the pool's state, whether the accounts in the pool are in sync with the actual
/// state.
#[derive(Debug, PartialEq, Eq)]
enum MaintainedPoolState {
    /// Pool is assumed to be in sync with the current state
    InSync,
    /// Pool could be out of sync with the state
    Drifted,
}

impl MaintainedPoolState {
    /// Returns `true` if the pool is assumed to be out of sync with the current state.
    #[inline]
    const fn is_drifted(&self) -> bool {
        matches!(self, Self::Drifted)
    }
}

/// A unique [`ChangedAccount`] identified by its address that can be used for deduplication
#[derive(Eq)]
struct ChangedAccountEntry(ChangedAccount);

impl PartialEq for ChangedAccountEntry {
    fn eq(&self, other: &Self) -> bool {
        self.0.address == other.0.address
    }
}

impl Hash for ChangedAccountEntry {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.address.hash(state);
    }
}

impl Borrow<Address> for ChangedAccountEntry {
    fn borrow(&self) -> &Address {
        &self.0.address
    }
}

#[derive(Default)]
struct LoadedAccounts {
    /// All accounts that were loaded
    accounts: Vec<ChangedAccount>,
    /// All accounts that failed to load
    failed_to_load: Vec<Address>,
}

/// Loads all accounts at the given state
///
/// Returns an error with all given addresses if the state is not available.
///
/// Note: this expects _unique_ addresses
fn load_accounts<Client, I>(
    client: Client,
    at: BlockHash,
    addresses: I,
) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
where
    I: IntoIterator<Item = Address>,
    Client: StateProviderFactory,
{
    let addresses = addresses.into_iter();
    let mut res = LoadedAccounts::default();
    let state = match client.history_by_block_hash(at) {
        Ok(state) => state,
        Err(err) => return Err(Box::new((addresses.collect(), err))),
    };
    for addr in addresses {
        if let Ok(maybe_acc) = state.basic_account(&addr) {
            let acc = maybe_acc
                .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
                .unwrap_or_else(|| ChangedAccount::empty(addr));
            res.accounts.push(acc)
        } else {
            // failed to load account.
            res.failed_to_load.push(addr);
        }
    }
    Ok(res)
}

/// Loads transactions from a file, decodes them from the JSON or RLP format, and
/// inserts them into the transaction pool on node boot up.
/// The file is removed after the transactions have been successfully processed.
async fn load_and_reinsert_transactions<P>(
    pool: P,
    file_path: &Path,
) -> Result<(), TransactionsBackupError>
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
{
    if !file_path.exists() {
        return Ok(())
    }

    debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
    let data = reth_fs_util::read(file_path)?;

    if data.is_empty() {
        return Ok(())
    }

    let pool_transactions: Vec<(TransactionOrigin, <P as TransactionPool>::Transaction)> =
        if let Ok(tx_backups) = serde_json::from_slice::<Vec<TxBackup>>(&data) {
            tx_backups
                .into_iter()
                .filter_map(|backup| {
                    let tx_signed =
                        <P::Transaction as PoolTransaction>::Consensus::decode_2718_exact(
                            backup.rlp.as_ref(),
                        )
                        .ok()?;
                    let recovered = tx_signed.try_into_recovered().ok()?;
                    let pool_tx =
                        <P::Transaction as PoolTransaction>::try_from_consensus(recovered).ok()?;

                    Some((backup.origin, pool_tx))
                })
                .collect()
        } else {
            let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
                alloy_rlp::Decodable::decode(&mut data.as_slice())?;

            txs_signed
                .into_iter()
                .filter_map(|tx| tx.try_into_recovered().ok())
                .filter_map(|tx| {
                    <P::Transaction as PoolTransaction>::try_from_consensus(tx)
                        .ok()
                        .map(|pool_tx| (TransactionOrigin::Local, pool_tx))
                })
                .collect()
        };

    let inserted = futures_util::future::join_all(
        pool_transactions.into_iter().map(|(origin, tx)| pool.add_transaction(origin, tx)),
    )
    .await;

    info!(target: "txpool", txs_file =?file_path, num_txs=%inserted.len(), "Successfully reinserted local transactions from file");
    reth_fs_util::remove_file(file_path)?;
    Ok(())
}

fn save_local_txs_backup<P>(pool: P, file_path: &Path)
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
{
    let local_transactions = pool.get_local_transactions();
    if local_transactions.is_empty() {
        trace!(target: "txpool", "no local transactions to save");
        return
    }

    let local_transactions = local_transactions
        .into_iter()
        .map(|tx| {
            let consensus_tx = tx.transaction.clone_into_consensus().into_inner();
            let rlp_data = consensus_tx.encoded_2718();

            TxBackup { rlp: rlp_data.into(), origin: tx.origin }
        })
        .collect::<Vec<_>>();

    let json_data = match serde_json::to_string(&local_transactions) {
        Ok(data) => data,
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "failed to serialize local transactions to json");
            return
        }
    };

    info!(target: "txpool", txs_file =?file_path, num_txs=%local_transactions.len(), "Saving current local transactions");
    let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();

    match parent_dir.map(|_| reth_fs_util::write(file_path, json_data)) {
        Ok(_) => {
            info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
        }
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
        }
    }
}

/// A transaction backup that is saved as json to a file for
/// reinsertion into the pool
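///
/// # Example
///
/// A minimal sketch of the on-disk JSON shape; the byte string is a placeholder rather
/// than a real transaction, and the exact serialized form of `origin` is an assumption.
///
/// ```ignore
/// use alloy_primitives::Bytes;
/// use reth_transaction_pool::{maintain::TxBackup, TransactionOrigin};
///
/// let backup = TxBackup { rlp: Bytes::from(vec![0x02, 0xf8]), origin: TransactionOrigin::Local };
/// // serializes to something like {"rlp":"0x02f8","origin":"Local"}
/// let json = serde_json::to_string(&backup).unwrap();
/// ```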
#[derive(Debug, Deserialize, Serialize)]
pub struct TxBackup {
    /// Encoded transaction
    pub rlp: Bytes,
    /// The origin of the transaction
    pub origin: TransactionOrigin,
}

/// Errors possible during txs backup load and decode
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
    /// Error during RLP decoding of transactions
    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
    Decode(#[from] alloy_rlp::Error),
    /// Error during JSON decoding of transactions
    #[error("failed to apply transactions backup. Encountered JSON decode error: {0}")]
    Json(#[from] serde_json::Error),
    /// Error during file read or write
    #[error("failed to apply transactions backup. Encountered file error: {0}")]
    FsPath(#[from] FsPathError),
    /// Error adding transactions to the transaction pool
    #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
    Pool(#[from] PoolError),
}

/// Task which manages saving local transactions to the persistent file in case of shutdown.
/// Reloads the transactions from the file on boot up and inserts them into the pool.
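///
/// # Example
///
/// A minimal sketch of spawning this task with a graceful shutdown signal (mirroring the
/// test below); `executor` and `pool` are assumed to exist in the caller's context.
///
/// ```ignore
/// let config = LocalTransactionBackupConfig::with_local_txs_backup("local_txs.json".into());
/// executor.spawn_critical_with_graceful_shutdown_signal("local txs backup task", |shutdown| {
///     backup_local_transactions_task(shutdown, pool.clone(), config)
/// });
/// ```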
pub async fn backup_local_transactions_task<P>(
    shutdown: reth_tasks::shutdown::GracefulShutdown,
    pool: P,
    config: LocalTransactionBackupConfig,
) where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
{
    let Some(transactions_path) = config.transactions_path else {
        // nothing to do
        return
    };

    if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
        error!(target: "txpool", "{}", err)
    }

    let graceful_guard = shutdown.await;

    // write transactions to disk
    save_local_txs_backup(pool, &transactions_path);

    drop(graceful_guard)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
        CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
    };
    use alloy_eips::eip2718::Decodable2718;
    use alloy_primitives::{hex, U256};
    use reth_ethereum_primitives::PooledTransactionVariant;
    use reth_fs_util as fs;
    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
    use reth_tasks::TaskManager;

    #[test]
    fn changed_acc_entry() {
        let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
        let mut copy = changed_acc.0;
        copy.nonce = 10;
        assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
    }

    const EXTENSION: &str = "json";
    const FILENAME: &str = "test_transactions_backup";

    #[tokio::test(flavor = "multi_thread")]
    async fn test_save_local_txs_backup() {
        let temp_dir = tempfile::tempdir().unwrap();
        let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
        let tx_bytes = hex!(
            "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
        );
        let tx = PooledTransactionVariant::decode_2718(&mut &tx_bytes[..]).unwrap();
        let provider = MockEthProvider::default();
        let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
        let tx_to_cmp = transaction.clone();
        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
        let blob_store = InMemoryBlobStore::default();
        let validator = EthTransactionValidatorBuilder::new(provider).build(blob_store.clone());

        let txpool = Pool::new(
            validator,
            CoinbaseTipOrdering::default(),
            blob_store.clone(),
            Default::default(),
        );

        txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();

        let handle = tokio::runtime::Handle::current();
        let manager = TaskManager::new(handle);
        let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
        manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
            backup_local_transactions_task(shutdown, txpool.clone(), config)
        });

        let mut txns = txpool.get_local_transactions();
        let tx_on_finish = txns.pop().expect("there should be 1 transaction");

        assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());

        // shutdown the executor
        manager.graceful_shutdown();

        let data = fs::read(transactions_path).unwrap();

        let txs = serde_json::from_slice::<Vec<TxBackup>>(&data).unwrap();
        assert_eq!(txs.len(), 1);

        temp_dir.close().unwrap();
    }

    #[test]
    fn test_update_with_higher_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(Some(15)), Some(15));
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_lower_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(15)), None);
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_equal_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(20)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    #[test]
    fn test_update_with_no_last_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(Some(10)), Some(10));
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_new_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_finalized_blocks() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, None);
    }
}
967}