reth_transaction_pool/maintain.rs

//! Support for maintaining the state of the transaction pool

use crate::{
    blobstore::{BlobSidecarConverter, BlobStoreCanonTracker, BlobStoreUpdates},
    error::PoolError,
    metrics::MaintainPoolMetrics,
    traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
    AllPoolTransactions, BlobTransactionSidecarVariant, BlockInfo, PoolTransaction, PoolUpdateKind,
    TransactionOrigin,
};
use alloy_consensus::{transaction::TxHashRef, BlockHeader, Typed2718};
use alloy_eips::{BlockNumberOrTag, Decodable2718, Encodable2718};
use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes};
use alloy_rlp::Encodable;
use futures_util::{
    future::{BoxFuture, Fuse, FusedFuture},
    FutureExt, Stream, StreamExt,
};
use reth_chain_state::CanonStateNotification;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_execution_types::ChangedAccount;
use reth_fs_util::FsPathError;
use reth_primitives_traits::{
    transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
};
use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskSpawner;
use serde::{Deserialize, Serialize};
use std::{
    borrow::Borrow,
    collections::HashSet,
    hash::{Hash, Hasher},
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::{
    sync::oneshot,
    time::{self, Duration},
};
use tracing::{debug, error, info, trace, warn};

/// Maximum amount of time non-executable transactions are queued.
pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);

/// Additional settings for maintaining the transaction pool
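///
/// # Example
///
/// A sketch of overriding individual settings (the values are illustrative):
///
/// ```
/// use reth_transaction_pool::maintain::MaintainPoolConfig;
/// use std::time::Duration;
///
/// let config = MaintainPoolConfig {
///     // evict stale transactions after one hour instead of the default three
///     max_tx_lifetime: Duration::from_secs(60 * 60),
///     ..Default::default()
/// };
/// ```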
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Maximum (reorg) depth we handle when updating the transaction pool: `new.number -
    /// last_seen.number`
    ///
    /// Default: 64 (2 epochs)
    pub max_update_depth: u64,
    /// Maximum number of accounts to reload from state at once when updating the transaction pool.
    ///
    /// Default: 100
    pub max_reload_accounts: usize,

    /// Maximum amount of time non-executable, non-local transactions are queued.
    ///
    /// Default: 3 hours
    pub max_tx_lifetime: Duration,

    /// Apply no exemptions to locally received transactions.
    ///
    /// This includes:
    ///   - no price exemptions
    ///   - no eviction exemptions
    pub no_local_exemptions: bool,
}

impl Default for MaintainPoolConfig {
    fn default() -> Self {
        Self {
            max_update_depth: 64,
            max_reload_accounts: 100,
            max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
            no_local_exemptions: false,
        }
    }
}

/// Settings for the local transaction backup task
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
    /// Path to the transactions backup file
    pub transactions_path: Option<PathBuf>,
}

impl LocalTransactionBackupConfig {
    /// Returns a config initialized with the given path to the transactions backup file
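    ///
    /// # Example
    ///
    /// A sketch (the path is illustrative):
    ///
    /// ```
    /// use reth_transaction_pool::maintain::LocalTransactionBackupConfig;
    /// use std::path::PathBuf;
    ///
    /// let config = LocalTransactionBackupConfig::with_local_txs_backup(PathBuf::from(
    ///     "datadir/txpool-transactions-backup.json",
    /// ));
    /// assert!(config.transactions_path.is_some());
    /// ```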
    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
        Self { transactions_path: Some(transactions_path) }
    }
}

/// Returns a spawnable future for maintaining the state of the transaction pool.
pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) -> BoxFuture<'static, ()>
where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    async move {
        maintain_transaction_pool(client, pool, events, task_spawner, config).await;
    }
    .boxed()
}

/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    mut events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    let metrics = MaintainPoolMetrics::default();
    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
    // ensure the pool points to latest state
    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
        let latest = SealedHeader::seal_slow(latest);
        let chain_spec = client.chain_spec();
        let info = BlockInfo {
            block_gas_limit: latest.gas_limit(),
            last_seen_block_hash: latest.hash(),
            last_seen_block_number: latest.number(),
            pending_basefee: chain_spec
                .next_block_base_fee(latest.header(), latest.timestamp())
                .unwrap_or_default(),
            pending_blob_fee: latest
                .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
        };
        pool.set_block_info(info);
    }

    // keeps track of mined blob transactions so we can clean up finalized transactions
    let mut blob_store_tracker = BlobStoreCanonTracker::default();

    // keeps track of the latest finalized block
    let mut last_finalized_block =
        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());

    // keeps track of any dirty accounts that we know are out of sync with the pool
    let mut dirty_addresses = HashSet::default();

    // keeps track of the state of the pool with respect to blocks
    let mut maintained_state = MaintainedPoolState::InSync;

    // the future that reloads accounts from state
    let mut reload_accounts_fut = Fuse::terminated();

    // eviction interval for stale non-local txs
    let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);
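    // note: a tokio interval yields immediately on its first poll, so an eviction pass also
    // runs on the first iteration of the loop below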

    // toggle for the first notification
    let mut first_event = true;

    // The update loop that waits for new blocks and reorgs and performs pool updates.
    // Listen for new chain events and derive the update action for the pool.
    loop {
        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");

        metrics.set_dirty_accounts_len(dirty_addresses.len());
        let pool_info = pool.block_info();

        // after a pool update triggered by a new block we have some time to update dirty
        // accounts and correct the pool if it drifted from the current state, for example
        // after a restart or a pipeline run
        if maintained_state.is_drifted() {
            metrics.inc_drift();
            // assuming all senders are dirty
            dirty_addresses = pool.unique_senders();
            // make sure we toggle the state back to in sync
            maintained_state = MaintainedPoolState::InSync;
        }

        // if we have accounts that are out of sync with the pool, we reload them in chunks
        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
            let (tx, rx) = oneshot::channel();
            let c = client.clone();
            let at = pool_info.last_seen_block_hash;
            let fut = if dirty_addresses.len() > max_reload_accounts {
                // need to chunk accounts to reload
                let accs_to_reload =
                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
                for acc in &accs_to_reload {
                    // make sure we remove them from the dirty set
                    dirty_addresses.remove(acc);
                }
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            } else {
                // can fetch all dirty accounts at once
                let accs_to_reload = std::mem::take(&mut dirty_addresses);
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            };
            reload_accounts_fut = rx.fuse();
            task_spawner.spawn_blocking(fut);
        }

        // check if we have a new finalized block
        if let Some(finalized) =
            last_finalized_block.update(client.finalized_block_number().ok().flatten()) &&
            let BlobStoreUpdates::Finalized(blobs) =
                blob_store_tracker.on_finalized_block(finalized)
        {
            metrics.inc_deleted_tracked_blobs(blobs.len());
            // remove all finalized blobs from the blob store
            pool.delete_blobs(blobs);
            // and also do periodic cleanup
            let pool = pool.clone();
            task_spawner.spawn_blocking(Box::pin(async move {
                debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
                pool.cleanup_blobs();
            }));
        }

        // outcomes of the futures we are waiting on
        let mut event = None;
        let mut reloaded = None;

        // select over account reloads and new canonical state updates, which should arrive at
        // the rate of the block time
        tokio::select! {
            res = &mut reload_accounts_fut => {
                reloaded = Some(res);
            }
            ev = events.next() => {
                if ev.is_none() {
                    // the stream ended, we are done
                    break;
                }
                event = ev;
                // on receiving the first event on start up, mark the pool as drifted to explicitly
                // trigger revalidation and clear out outdated txs.
                if first_event {
                    maintained_state = MaintainedPoolState::Drifted;
                    first_event = false;
                }
            }
            _ = stale_eviction_interval.tick() => {
                let stale_txs: Vec<_> = pool
                    .queued_transactions()
                    .into_iter()
                    .filter(|tx| {
                        // filter stale transactions based on config
                        (tx.origin.is_external() || config.no_local_exemptions) &&
                            tx.timestamp.elapsed() > config.max_tx_lifetime
                    })
                    .map(|tx| *tx.hash())
                    .collect();
                debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
                pool.remove_transactions(stale_txs);
            }
        }
        // handle the result of the account reload
        match reloaded {
            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
                // reloaded accounts successfully
                // extend accounts we failed to load from database
                dirty_addresses.extend(failed_to_load);
                // update the pool with the loaded accounts
                pool.update_accounts(accounts);
            }
            Some(Ok(Err(res))) => {
                // failed to load accounts from state
                let (accs, err) = *res;
                debug!(target: "txpool", %err, "failed to load accounts");
                dirty_addresses.extend(accs);
            }
            Some(Err(_)) => {
                // failed to receive the accounts; the sender was dropped, which is only possible
                // if the task panicked
                maintained_state = MaintainedPoolState::Drifted;
            }
            None => {}
        }

        // handle the new block or reorg
        let Some(event) = event else { continue };
        match event {
            CanonStateNotification::Reorg { old, new } => {
                let (old_blocks, old_state) = old.inner();
                let (new_blocks, new_state) = new.inner();
                let new_tip = new_blocks.tip();
                let new_first = new_blocks.first();
                let old_first = old_blocks.first();

                // check if the reorg is canonical with the pool's block
                if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
                    new_first.parent_hash() == pool_info.last_seen_block_hash)
                {
                    // neither the old nor the new chain attaches to the pool's last seen block,
                    // so the pool has drifted from the canonical state
                    maintained_state = MaintainedPoolState::Drifted;
                }

                let chain_spec = client.chain_spec();

                // fees for the next block: `new_tip+1`
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(new_tip.header(), new_tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
                );

                // we know all changed accounts in the new chain
                let new_changed_accounts: HashSet<_> =
                    new_state.changed_accounts().map(ChangedAccountEntry).collect();

                // find all accounts that were changed in the old chain but _not_ in the new chain
                let missing_changed_acc = old_state
                    .accounts_iter()
                    .map(|(a, _)| a)
                    .filter(|addr| !new_changed_accounts.contains(addr));

                // for these we need to fetch the nonce+balance from the db at the new tip
                let mut changed_accounts =
                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
                            // extend accounts we failed to load from database
                            dirty_addresses.extend(failed_to_load);

                            accounts
                        }
                        Err(err) => {
                            let (addresses, err) = *err;
                            debug!(
                                target: "txpool",
                                %err,
                                "failed to load missing changed accounts at new tip: {:?}",
                                new_tip.hash()
                            );
                            dirty_addresses.extend(addresses);
                            vec![]
                        }
                    };

                // also include all accounts from the new chain
                // we can use extend here because they are unique
                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));

                // all transactions mined in the new chain
                let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();

                // update the pool then re-inject the pruned transactions
                // find all transactions that were mined in the old chain but not in the new chain
                let pruned_old_transactions = old_blocks
                    .transactions_ecrecovered()
                    .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
                    .filter_map(|tx| {
                        if tx.is_eip4844() {
                            // a reorged EIP-4844 transaction no longer carries its blob sidecar,
                            // which is required to re-validate it. Even though the transaction
                            // could have been validated previously, we still need the blob to
                            // accurately set the transaction's encoded length, which is
                            // propagated over the network.
                            pool.get_blob(*tx.tx_hash())
                                .ok()
                                .flatten()
                                .map(Arc::unwrap_or_clone)
                                .and_then(|sidecar| {
                                    <P as TransactionPool>::Transaction::try_from_eip4844(
                                        tx, sidecar,
                                    )
                                })
                        } else {
                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
                        }
                    })
                    .collect::<Vec<_>>();

                // update the pool first
                let update = CanonicalStateUpdate {
                    new_tip: new_tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    // all transactions mined in the new chain need to be removed from the pool
                    mined_transactions: new_blocks.transaction_hashes().collect(),
                    update_kind: PoolUpdateKind::Reorg,
                };
                pool.on_canonical_state_change(update);

                // all transactions that were mined in the old chain but not in the new chain need
                // to be re-injected
                //
                // Note: we no longer know if the tx was local or external.
                // Because the transactions are not finalized, the corresponding blobs are still in
                // the blob store (if we previously received them from the network).
                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
                let _ = pool.add_external_transactions(pruned_old_transactions).await;

                // keep track of new mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&new_blocks);
            }
            CanonStateNotification::Commit { new } => {
                let (blocks, state) = new.inner();
                let tip = blocks.tip();
                let chain_spec = client.chain_spec();

                // fees for the next block: `tip+1`
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(tip.header(), tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(tip.timestamp()),
                );

                let first_block = blocks.first();
                trace!(
                    target: "txpool",
                    first = first_block.number(),
                    tip = tip.number(),
                    pool_block = pool_info.last_seen_block_number,
                    "update pool on new commit"
                );

                // check if the depth is too large and should be skipped, this could happen after
                // initial sync or a long re-sync
                let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
                if depth > max_update_depth {
                    maintained_state = MaintainedPoolState::Drifted;
                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
                    let info = BlockInfo {
                        block_gas_limit: tip.header().gas_limit(),
                        last_seen_block_hash: tip.hash(),
                        last_seen_block_number: tip.number(),
                        pending_basefee: pending_block_base_fee,
                        pending_blob_fee: pending_block_blob_fee,
                    };
                    pool.set_block_info(info);

                    // keep track of mined blob transactions
                    blob_store_tracker.add_new_chain_blocks(&blocks);

                    continue
                }

                let mut changed_accounts = Vec::with_capacity(state.state().len());
                for acc in state.changed_accounts() {
                    // we can always clear the dirty flag for this account
                    dirty_addresses.remove(&acc.address);
                    changed_accounts.push(acc);
                }

                let mined_transactions = blocks.transaction_hashes().collect();

                // check if the range of the commit is canonical with the pool's block
                if first_block.parent_hash() != pool_info.last_seen_block_hash {
                    // we received a new canonical chain commit but the commit is not canonical
                    // with the pool's block, this could happen after initial sync or a long
                    // re-sync
                    maintained_state = MaintainedPoolState::Drifted;
                }

                // Canonical update
                let update = CanonicalStateUpdate {
                    new_tip: tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions,
                    update_kind: PoolUpdateKind::Commit,
                };
                pool.on_canonical_state_change(update);

                // keep track of mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&blocks);

                // If Osaka activates two slots from now (the checks below hardcode 12-second
                // slots) we need to convert blobs to the new format.
                if !chain_spec.is_osaka_active_at_timestamp(tip.timestamp()) &&
                    !chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(12)) &&
                    chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(24))
                {
                    let pool = pool.clone();
                    let spawner = task_spawner.clone();
                    let client = client.clone();
                    task_spawner.spawn(Box::pin(async move {
                        // Start converting no earlier than 4 seconds into the current slot to
                        // ensure that our pool only contains valid transactions for the next
                        // block (as it's not Osaka yet).
                        tokio::time::sleep(Duration::from_secs(4)).await;

                        let mut interval = tokio::time::interval(Duration::from_secs(1));
                        loop {
                            // Loop and replace blob transactions until we reach the Osaka
                            // transition block, after which no legacy blobs will be accepted.
                            let last_iteration =
                                client.latest_header().ok().flatten().is_none_or(|header| {
                                    client
                                        .chain_spec()
                                        .is_osaka_active_at_timestamp(header.timestamp())
                                });

                            let AllPoolTransactions { pending, queued } = pool.all_transactions();
                            for tx in pending
                                .into_iter()
                                .chain(queued)
                                .filter(|tx| tx.transaction.is_eip4844())
                            {
                                let tx_hash = *tx.transaction.hash();

                                // Fetch the sidecar from the pool
                                let Ok(Some(sidecar)) = pool.get_blob(tx_hash) else {
                                    continue;
                                };
                                // Ensure it is a legacy blob
                                if !sidecar.is_eip4844() {
                                    continue;
                                }
                                // Remove the transaction and sidecar from the pool; both are in
                                // memory now
                                let Some(tx) = pool.remove_transactions(vec![tx_hash]).pop() else {
                                    continue;
                                };
                                pool.delete_blob(tx_hash);

                                let BlobTransactionSidecarVariant::Eip4844(sidecar) =
                                    Arc::unwrap_or_clone(sidecar)
                                else {
                                    continue;
                                };

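                                // Converting the sidecar to the EIP-7594 cell format requires
                                // recomputing proofs, which is expected to be CPU-heavy, so each
                                // conversion is offloaded to its own spawned task.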
                                let converter = BlobSidecarConverter::new();
                                let pool = pool.clone();
                                spawner.spawn(Box::pin(async move {
                                    // Convert the sidecar to the EIP-7594 format
                                    let Some(sidecar) = converter.convert(sidecar).await else {
                                        return;
                                    };

                                    // Re-insert the transaction with the new sidecar
                                    let origin = tx.origin;
                                    let Some(tx) = EthPoolTransaction::try_from_eip4844(
                                        tx.transaction.clone_into_consensus(),
                                        sidecar.into(),
                                    ) else {
                                        return;
                                    };
                                    let _ = pool.add_transaction(origin, tx).await;
                                }));
                            }

                            if last_iteration {
                                break;
                            }

                            interval.tick().await;
                        }
                    }));
                }
            }
        }
    }
}

struct FinalizedBlockTracker {
    last_finalized_block: Option<BlockNumber>,
}

impl FinalizedBlockTracker {
    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
        Self { last_finalized_block }
    }

    /// Updates the tracked finalized block and returns the new finalized block if it advanced
    /// past the previously tracked one
    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
        let finalized = finalized_block?;
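        // `Option::replace` stores the new block unconditionally and returns the previous
        // value; the update is only reported if the finalized block actually advanced, i.e.
        // there was no previously tracked block or the previous one was lower.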
        self.last_finalized_block
            .replace(finalized)
            .is_none_or(|last| last < finalized)
            .then_some(finalized)
    }
}

/// Keeps track of the pool's state, i.e. whether the accounts in the pool are in sync with the
/// actual state.
#[derive(Debug, PartialEq, Eq)]
enum MaintainedPoolState {
    /// Pool is assumed to be in sync with the current state
    InSync,
    /// Pool could be out of sync with the state
    Drifted,
}

impl MaintainedPoolState {
    /// Returns `true` if the pool is assumed to be out of sync with the current state.
    #[inline]
    const fn is_drifted(&self) -> bool {
        matches!(self, Self::Drifted)
    }
}
/// A unique [`ChangedAccount`] identified by its address that can be used for deduplication
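///
/// The `Borrow<Address>` impl allows a `HashSet<ChangedAccountEntry>` to be queried directly
/// by address, e.g. `set.contains(&address)`, which the reorg handling above relies on.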
#[derive(Eq)]
struct ChangedAccountEntry(ChangedAccount);

impl PartialEq for ChangedAccountEntry {
    fn eq(&self, other: &Self) -> bool {
        self.0.address == other.0.address
    }
}

impl Hash for ChangedAccountEntry {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.address.hash(state);
    }
}

impl Borrow<Address> for ChangedAccountEntry {
    fn borrow(&self) -> &Address {
        &self.0.address
    }
}

#[derive(Default)]
struct LoadedAccounts {
    /// All accounts that were loaded
    accounts: Vec<ChangedAccount>,
    /// All accounts that failed to load
    failed_to_load: Vec<Address>,
}

/// Loads all accounts at the given state
///
/// Returns an error with all given addresses if the state is not available.
///
/// Note: this expects _unique_ addresses
fn load_accounts<Client, I>(
    client: Client,
    at: BlockHash,
    addresses: I,
) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
where
    I: IntoIterator<Item = Address>,
    Client: StateProviderFactory,
{
    let addresses = addresses.into_iter();
    let mut res = LoadedAccounts::default();
    let state = match client.history_by_block_hash(at) {
        Ok(state) => state,
        Err(err) => return Err(Box::new((addresses.collect(), err))),
    };
    for addr in addresses {
        if let Ok(maybe_acc) = state.basic_account(&addr) {
            let acc = maybe_acc
                .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
                .unwrap_or_else(|| ChangedAccount::empty(addr));
            res.accounts.push(acc);
        } else {
            // failed to load account.
            res.failed_to_load.push(addr);
        }
    }
    Ok(res)
}

/// Loads transactions from a file, decodes them from the JSON or RLP format, and
/// inserts them into the transaction pool on node startup.
/// The file is removed after the transactions have been successfully processed.
async fn load_and_reinsert_transactions<P>(
    pool: P,
    file_path: &Path,
) -> Result<(), TransactionsBackupError>
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
{
    if !file_path.exists() {
        return Ok(())
    }

    debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
    let data = reth_fs_util::read(file_path)?;

    if data.is_empty() {
        return Ok(())
    }

    let pool_transactions: Vec<(TransactionOrigin, <P as TransactionPool>::Transaction)> =
        if let Ok(tx_backups) = serde_json::from_slice::<Vec<TxBackup>>(&data) {
            tx_backups
                .into_iter()
                .filter_map(|backup| {
                    let tx_signed =
                        <P::Transaction as PoolTransaction>::Consensus::decode_2718_exact(
                            backup.rlp.as_ref(),
                        )
                        .ok()?;
                    let recovered = tx_signed.try_into_recovered().ok()?;
                    let pool_tx =
                        <P::Transaction as PoolTransaction>::try_from_consensus(recovered).ok()?;

                    Some((backup.origin, pool_tx))
                })
                .collect()
        } else {
            // not JSON: decode as an RLP list of signed transactions and treat them as local
            let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
                alloy_rlp::Decodable::decode(&mut data.as_slice())?;

            txs_signed
                .into_iter()
                .filter_map(|tx| tx.try_into_recovered().ok())
                .filter_map(|tx| {
                    <P::Transaction as PoolTransaction>::try_from_consensus(tx)
                        .ok()
                        .map(|pool_tx| (TransactionOrigin::Local, pool_tx))
                })
                .collect()
        };

    let inserted = futures_util::future::join_all(
        pool_transactions.into_iter().map(|(origin, tx)| pool.add_transaction(origin, tx)),
    )
    .await;

    info!(target: "txpool", txs_file =?file_path, num_txs=%inserted.len(), "Successfully reinserted local transactions from file");
    reth_fs_util::remove_file(file_path)?;
    Ok(())
}

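/// Saves all local transactions currently in the pool to the given file as JSON so they can be
/// reinserted on the next startup.
///
/// Failures are logged rather than returned, since this runs as part of shutdown.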
fn save_local_txs_backup<P>(pool: P, file_path: &Path)
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
{
    let local_transactions = pool.get_local_transactions();
    if local_transactions.is_empty() {
        trace!(target: "txpool", "no local transactions to save");
        return
    }

    let local_transactions = local_transactions
        .into_iter()
        .map(|tx| {
            let consensus_tx = tx.transaction.clone_into_consensus().into_inner();
            let rlp_data = consensus_tx.encoded_2718();

            TxBackup { rlp: rlp_data.into(), origin: tx.origin }
        })
        .collect::<Vec<_>>();

    let json_data = match serde_json::to_string(&local_transactions) {
        Ok(data) => data,
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "failed to serialize local transactions to json");
            return
        }
    };

    info!(target: "txpool", txs_file =?file_path, num_txs=%local_transactions.len(), "Saving current local transactions");

    // ensure the parent directory exists before writing
    if let Some(parent_dir) = file_path.parent() {
        if let Err(err) = std::fs::create_dir_all(parent_dir) {
            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to create directory for local transactions file");
            return
        }
    }

    match reth_fs_util::write(file_path, json_data) {
        Ok(()) => {
            info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
        }
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
        }
    }
}

/// A transaction backup that is saved as JSON to a file for
/// reinsertion into the pool
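///
/// Serialized as a JSON array; an illustrative entry (assuming serde's default representations)
/// looks like:
///
/// ```json
/// [{ "rlp": "0x02f87201…", "origin": "Local" }]
/// ```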
#[derive(Debug, Deserialize, Serialize)]
pub struct TxBackup {
    /// The EIP-2718 encoded transaction
    pub rlp: Bytes,
    /// The origin of the transaction
    pub origin: TransactionOrigin,
}

/// Errors possible during txs backup load and decode
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
    /// Error during RLP decoding of transactions
    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
    Decode(#[from] alloy_rlp::Error),
    /// Error during JSON decoding of transactions
    #[error("failed to apply transactions backup. Encountered JSON decode error: {0}")]
    Json(#[from] serde_json::Error),
    /// Error during file I/O
    #[error("failed to apply transactions backup. Encountered file error: {0}")]
    FsPath(#[from] FsPathError),
    /// Error adding transactions to the transaction pool
    #[error("failed to insert transactions into the transaction pool. Encountered pool error: {0}")]
    Pool(#[from] PoolError),
}

/// Task which manages saving local transactions to a persistent file in case of shutdown.
/// Reloads the transactions from the file on boot and inserts them into the pool.
pub async fn backup_local_transactions_task<P>(
    shutdown: reth_tasks::shutdown::GracefulShutdown,
    pool: P,
    config: LocalTransactionBackupConfig,
) where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
{
    let Some(transactions_path) = config.transactions_path else {
        // nothing to do
        return
    };

    if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
        error!(target: "txpool", "{}", err)
    }

    let graceful_guard = shutdown.await;

    // write transactions to disk
    save_local_txs_backup(pool, &transactions_path);

    drop(graceful_guard)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
        CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
    };
    use alloy_eips::eip2718::Decodable2718;
    use alloy_primitives::{hex, U256};
    use reth_ethereum_primitives::PooledTransactionVariant;
    use reth_fs_util as fs;
    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
    use reth_tasks::TaskManager;

    #[test]
    fn changed_acc_entry() {
        let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
        let mut copy = changed_acc.0;
        copy.nonce = 10;
        assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
    }

    const EXTENSION: &str = "json";
    const FILENAME: &str = "test_transactions_backup";

    #[tokio::test(flavor = "multi_thread")]
    async fn test_save_local_txs_backup() {
        let temp_dir = tempfile::tempdir().unwrap();
        let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
        let tx_bytes = hex!(
            "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
        );
        let tx = PooledTransactionVariant::decode_2718(&mut &tx_bytes[..]).unwrap();
        let provider = MockEthProvider::default();
        let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
        let tx_to_cmp = transaction.clone();
        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
        let blob_store = InMemoryBlobStore::default();
        let validator = EthTransactionValidatorBuilder::new(provider).build(blob_store.clone());

        let txpool = Pool::new(
            validator,
            CoinbaseTipOrdering::default(),
            blob_store.clone(),
            Default::default(),
        );

        txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();

        let handle = tokio::runtime::Handle::current();
        let manager = TaskManager::new(handle);
        let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
        manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
            backup_local_transactions_task(shutdown, txpool.clone(), config)
        });

        let mut txns = txpool.get_local_transactions();
        let tx_on_finish = txns.pop().expect("there should be 1 transaction");

        assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());

        // shutdown the executor
        manager.graceful_shutdown();

        let data = fs::read(transactions_path).unwrap();

        let txs = serde_json::from_slice::<Vec<TxBackup>>(&data).unwrap();
        assert_eq!(txs.len(), 1);

        temp_dir.close().unwrap();
    }

    #[test]
    fn test_update_with_higher_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(Some(15)), Some(15));
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_lower_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(15)), None);
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_equal_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(20)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    #[test]
    fn test_update_with_no_last_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(Some(10)), Some(10));
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_new_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_finalized_blocks() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, None);
    }
}