reth_transaction_pool/maintain.rs

//! Support for maintaining the state of the transaction pool

use crate::{
    blobstore::{BlobSidecarConverter, BlobStoreCanonTracker, BlobStoreUpdates},
    error::PoolError,
    metrics::MaintainPoolMetrics,
    traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
    AllPoolTransactions, BlobTransactionSidecarVariant, BlockInfo, PoolTransaction, PoolUpdateKind,
    TransactionOrigin,
};
use alloy_consensus::{transaction::TxHashRef, BlockHeader, Typed2718};
use alloy_eips::{BlockNumberOrTag, Decodable2718, Encodable2718};
use alloy_primitives::{
    map::{AddressSet, HashSet},
    Address, BlockHash, BlockNumber, Bytes,
};
use alloy_rlp::Encodable;
use futures_util::{
    future::{BoxFuture, Fuse, FusedFuture},
    FutureExt, Stream, StreamExt,
};
use reth_chain_state::CanonStateNotification;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_execution_types::ChangedAccount;
use reth_fs_util::FsPathError;
use reth_primitives_traits::{
    transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
};
use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskSpawner;
use serde::{Deserialize, Serialize};
use std::{
    borrow::Borrow,
    hash::{Hash, Hasher},
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::{
    sync::oneshot,
    time::{self, Duration},
};
use tracing::{debug, error, info, trace, warn};
/// Maximum amount of time non-executable transactions are queued.
pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);

/// Additional settings for maintaining the transaction pool
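///
/// # Example
///
/// A minimal sketch of overriding a single setting (the value here is illustrative):
///
/// ```ignore
/// let config = MaintainPoolConfig {
///     // tolerate deeper canonical updates before treating the pool as drifted
///     max_update_depth: 128,
///     ..Default::default()
/// };
/// assert_eq!(config.max_reload_accounts, 100);
/// ```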
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Maximum (reorg) depth we handle when updating the transaction pool: `new.number -
    /// last_seen.number`
    ///
    /// Default: 64 (2 epochs)
    pub max_update_depth: u64,
    /// Maximum number of accounts to reload from state at once when updating the transaction pool.
    ///
    /// Default: 100
    pub max_reload_accounts: usize,

    /// Maximum amount of time non-executable, non-local transactions are queued.
    /// Default: 3 hours
    pub max_tx_lifetime: Duration,

    /// Apply no exemptions to locally received transactions.
    ///
    /// This includes:
    ///   - no price exemptions
    ///   - no eviction exemptions
    pub no_local_exemptions: bool,
}

impl Default for MaintainPoolConfig {
    fn default() -> Self {
        Self {
            max_update_depth: 64,
            max_reload_accounts: 100,
            max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
            no_local_exemptions: false,
        }
    }
}

/// Settings for the local transaction backup task
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
    /// Path to the transactions backup file
    pub transactions_path: Option<PathBuf>,
}

impl LocalTransactionBackupConfig {
    /// Receives the path to the transactions backup file and returns the initialized config
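    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the backup file lives in the node's data directory:
    ///
    /// ```ignore
    /// use std::path::PathBuf;
    ///
    /// let config = LocalTransactionBackupConfig::with_local_txs_backup(
    ///     PathBuf::from("datadir/txpool-backup.json"),
    /// );
    /// assert!(config.transactions_path.is_some());
    /// ```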
    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
        Self { transactions_path: Some(transactions_path) }
    }
}

/// Returns a spawnable future for maintaining the state of the transaction pool.
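///
/// A hedged sketch of spawning the returned future, assuming `client`, `pool`, `events`, and a
/// task `executor` implementing [`TaskSpawner`] are already constructed:
///
/// ```ignore
/// let fut = maintain_transaction_pool_future(
///     client,
///     pool,
///     events, // stream of canonical state notifications
///     executor.clone(),
///     MaintainPoolConfig::default(),
/// );
/// executor.spawn_critical("txpool maintenance task", fut);
/// ```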
pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) -> BoxFuture<'static, ()>
where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>, Block = N::Block>
        + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    async move {
        maintain_transaction_pool(client, pool, events, task_spawner, config).await;
    }
    .boxed()
}

/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    mut events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>, Block = N::Block>
        + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    let metrics = MaintainPoolMetrics::default();
    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
    // ensure the pool points to the latest state
    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
        let latest = SealedHeader::seal_slow(latest);
        let chain_spec = client.chain_spec();
        let info = BlockInfo {
            block_gas_limit: latest.gas_limit(),
            last_seen_block_hash: latest.hash(),
            last_seen_block_number: latest.number(),
            pending_basefee: chain_spec
                .next_block_base_fee(latest.header(), latest.timestamp())
                .unwrap_or_default(),
            pending_blob_fee: latest
                .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
        };
        pool.set_block_info(info);
    }

    // keeps track of mined blob transactions so we can clean up finalized blob transactions
    let mut blob_store_tracker = BlobStoreCanonTracker::default();

    // keeps track of the latest finalized block
    let mut last_finalized_block =
        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());

    // keeps track of any dirty accounts that we know are out of sync with the pool
    let mut dirty_addresses = HashSet::default();

    // keeps track of the state of the pool with respect to blocks
    let mut maintained_state = MaintainedPoolState::InSync;

    // the future that reloads accounts from state
    let mut reload_accounts_fut = Fuse::terminated();

    // eviction interval for stale non-local txs
    let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);

    // toggle for the first notification
    let mut first_event = true;

    // The update loop that waits for new blocks and reorgs and performs pool updates.
    // Listen for new chain events and derive the update action for the pool.
    loop {
        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");

        metrics.set_dirty_accounts_len(dirty_addresses.len());
        let pool_info = pool.block_info();

        // After performing a pool update for a new block we have some time to properly update
        // dirty accounts and correct the pool if it drifted from the current state, for example
        // after a restart or a pipeline run.
        if maintained_state.is_drifted() {
            metrics.inc_drift();
            // assuming all senders are dirty
            dirty_addresses = pool.unique_senders();
            // make sure we toggle the state back to in sync
            maintained_state = MaintainedPoolState::InSync;
        }

        // if we have accounts that are out of sync with the pool, we reload them in chunks
        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
            let (tx, rx) = oneshot::channel();
            let c = client.clone();
            let at = pool_info.last_seen_block_hash;
            let fut = if dirty_addresses.len() > max_reload_accounts {
                // need to chunk accounts to reload
                let accs_to_reload =
                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
                for acc in &accs_to_reload {
                    // make sure we remove them from the dirty set
                    dirty_addresses.remove(acc);
                }
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            } else {
                // can fetch all dirty accounts at once
                let accs_to_reload = std::mem::take(&mut dirty_addresses);
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            };
            reload_accounts_fut = rx.fuse();
            task_spawner.spawn_blocking_task(fut);
        }
234
235        // check if we have a new finalized block
236        if let Some(finalized) =
237            last_finalized_block.update(client.finalized_block_number().ok().flatten()) &&
238            let BlobStoreUpdates::Finalized(blobs) =
239                blob_store_tracker.on_finalized_block(finalized)
240        {
241            metrics.inc_deleted_tracked_blobs(blobs.len());
242            // remove all finalized blobs from the blob store
243            pool.delete_blobs(blobs);
244            // and also do periodic cleanup
245            let pool = pool.clone();
246            task_spawner.spawn_blocking_task(Box::pin(async move {
247                debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
248                pool.cleanup_blobs();
249            }));
250        }
251
252        // outcomes of the futures we are waiting on
253        let mut event = None;
254        let mut reloaded = None;
255
256        // select of account reloads and new canonical state updates which should arrive at the rate
257        // of the block time
258        tokio::select! {
259            res = &mut reload_accounts_fut =>  {
260                reloaded = Some(res);
261            }
262            ev = events.next() =>  {
263                 if ev.is_none() {
264                    // the stream ended, we are done
265                    break;
266                }
267                event = ev;
268                // on receiving the first event on start up, mark the pool as drifted to explicitly
269                // trigger revalidation and clear out outdated txs.
270                if first_event {
271                    maintained_state = MaintainedPoolState::Drifted;
272                    first_event = false
273                }
274            }
275            _ = stale_eviction_interval.tick() => {
276                let queued = pool
277                    .queued_transactions();
278                let mut stale_blobs = Vec::new();
279                let now = std::time::Instant::now();
280                let stale_txs: Vec<_> = queued
281                    .into_iter()
282                    .filter(|tx| {
283                        // filter stale transactions based on config
284                        (tx.origin.is_external() || config.no_local_exemptions) && now - tx.timestamp > config.max_tx_lifetime
285                    })
286                    .map(|tx| {
287                        if tx.is_eip4844() {
288                            stale_blobs.push(*tx.hash());
289                        }
290                        *tx.hash()
291                    })
292                    .collect();
293                debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
294                pool.remove_transactions(stale_txs);
295                pool.delete_blobs(stale_blobs);
296            }
297        }

        // handle the result of the account reload
        match reloaded {
            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
                // reloaded accounts successfully
                // extend with the accounts we failed to load from the database
                dirty_addresses.extend(failed_to_load);
                // update the pool with the loaded accounts
                pool.update_accounts(accounts);
            }
            Some(Ok(Err(res))) => {
                // failed to load accounts from state
                let (accs, err) = *res;
                debug!(target: "txpool", %err, "failed to load accounts");
                dirty_addresses.extend(accs);
            }
            Some(Err(_)) => {
                // failed to receive the accounts, sender dropped, only possible if the task panicked
                maintained_state = MaintainedPoolState::Drifted;
            }
            None => {}
        }

        // handle the new block or reorg
        let Some(event) = event else { continue };

        match event {
            CanonStateNotification::Reorg { old, new } => {
                let (old_blocks, old_state) = old.inner();
                let (new_blocks, new_state) = new.inner();
                let new_tip = new_blocks.tip();
                let new_first = new_blocks.first();
                let old_first = old_blocks.first();

                // check if the reorg is not canonical with the pool's block
                if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
                    new_first.parent_hash() == pool_info.last_seen_block_hash)
                {
                    // the new block points to a higher block than the oldest block in the old chain
                    maintained_state = MaintainedPoolState::Drifted;
                }

                let chain_spec = client.chain_spec();

                // fees for the next block: `new_tip+1`
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(new_tip.header(), new_tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
                );

                // we know all changed accounts in the new chain
                let new_changed_accounts: HashSet<_> =
                    new_state.changed_accounts().map(ChangedAccountEntry).collect();

                // find all accounts that were changed in the old chain but _not_ in the new chain
                let missing_changed_acc = old_state
                    .accounts_iter()
                    .map(|(a, _)| a)
                    .filter(|addr| !new_changed_accounts.contains(addr));

                // for these we need to fetch the nonce+balance from the db at the new tip
                let mut changed_accounts =
                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
                            // extend with the accounts we failed to load from the database
                            dirty_addresses.extend(failed_to_load);

                            accounts
                        }
                        Err(err) => {
                            let (addresses, err) = *err;
                            debug!(
                                target: "txpool",
                                %err,
                                "failed to load missing changed accounts at new tip: {:?}",
                                new_tip.hash()
                            );
                            dirty_addresses.extend(addresses);
                            vec![]
                        }
                    };

                // also include all accounts from the new chain
                // we can use extend here because they are unique
                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));

                // all transactions mined in the new chain
                let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();

                // update the pool then re-inject the pruned transactions
                // find all transactions that were mined in the old chain but not in the new chain
                let pruned_old_transactions = old_blocks
                    .transactions_ecrecovered()
                    .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
                    .filter_map(|tx| {
                        if tx.is_eip4844() {
                            // Reorged blob transactions no longer include the blob, which is
                            // necessary for validating the transaction. Even though the
                            // transaction could have been validated previously, we still need the
                            // blob in order to accurately set the transaction's encoded length,
                            // which is propagated over the network.
                            pool.get_blob(*tx.tx_hash())
                                .ok()
                                .flatten()
                                .map(Arc::unwrap_or_clone)
                                .and_then(|sidecar| {
                                    <P as TransactionPool>::Transaction::try_from_eip4844(
                                        tx, sidecar,
                                    )
                                })
                        } else {
                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
                        }
                    })
                    .collect::<Vec<_>>();

                // update the pool first
                let update = CanonicalStateUpdate {
                    new_tip: new_tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    // all transactions mined in the new chain need to be removed from the pool
                    mined_transactions: new_blocks.transaction_hashes().collect(),
                    update_kind: PoolUpdateKind::Reorg,
                };
                pool.on_canonical_state_change(update);

                // All transactions that were mined in the old chain but not in the new chain need
                // to be re-injected.
                //
                // Note: we no longer know whether the tx was local or external.
                // Because the transactions are not finalized, the corresponding blobs are still in
                // the blob store (if we previously received them from the network).
                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
                let _ = pool.add_external_transactions(pruned_old_transactions).await;

                // keep track of new mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&new_blocks);
            }
            CanonStateNotification::Commit { new } => {
                let (blocks, state) = new.inner();
                let tip = blocks.tip();
                let chain_spec = client.chain_spec();

                // fees for the next block: `tip+1`
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(tip.header(), tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(tip.timestamp()),
                );

                let first_block = blocks.first();
                trace!(
                    target: "txpool",
                    first = first_block.number(),
                    tip = tip.number(),
                    pool_block = pool_info.last_seen_block_number,
                    "update pool on new commit"
                );

                // check if the depth is too large and should be skipped; this could happen after
                // an initial sync or a long re-sync
                let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
                if depth > max_update_depth {
                    maintained_state = MaintainedPoolState::Drifted;
                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
                    let info = BlockInfo {
                        block_gas_limit: tip.header().gas_limit(),
                        last_seen_block_hash: tip.hash(),
                        last_seen_block_number: tip.number(),
                        pending_basefee: pending_block_base_fee,
                        pending_blob_fee: pending_block_blob_fee,
                    };
                    pool.set_block_info(info);

                    // keep track of mined blob transactions
                    blob_store_tracker.add_new_chain_blocks(&blocks);

                    continue
                }

                let mut changed_accounts = Vec::with_capacity(state.state().len());
                for acc in state.changed_accounts() {
                    // we can always clear the dirty flag for this account
                    dirty_addresses.remove(&acc.address);
                    changed_accounts.push(acc);
                }

                let mined_transactions = blocks.transaction_hashes().collect();

                // check if the range of the commit is canonical with the pool's block
                if first_block.parent_hash() != pool_info.last_seen_block_hash {
                    // we received a new canonical chain commit but the commit is not canonical
                    // with the pool's block; this could happen after an initial sync or a long
                    // re-sync
                    maintained_state = MaintainedPoolState::Drifted;
                }

                // canonical update
                let update = CanonicalStateUpdate {
                    new_tip: tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions,
                    update_kind: PoolUpdateKind::Commit,
                };
                pool.on_canonical_state_change(update);

                // keep track of mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&blocks);

                // If Osaka activates within the next two slots (assuming 12-second slots) we need
                // to convert blobs to the new format.
                if !chain_spec.is_osaka_active_at_timestamp(tip.timestamp()) &&
                    !chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(12)) &&
                    chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(24))
                {
                    let pool = pool.clone();
                    let spawner = task_spawner.clone();
                    let client = client.clone();
                    task_spawner.spawn_task(Box::pin(async move {
                        // Start converting no earlier than 4 seconds into the current slot to
                        // ensure that our pool only contains valid transactions for the next
                        // block (as it's not Osaka yet).
                        tokio::time::sleep(Duration::from_secs(4)).await;

                        let mut interval = tokio::time::interval(Duration::from_secs(1));
                        loop {
                            // Loop and replace blob transactions until we reach the Osaka
                            // transition block, after which no legacy blobs are going to be
                            // accepted.
                            let last_iteration =
                                client.latest_header().ok().flatten().is_none_or(|header| {
                                    client
                                        .chain_spec()
                                        .is_osaka_active_at_timestamp(header.timestamp())
                                });

                            let AllPoolTransactions { pending, queued } = pool.all_transactions();
                            for tx in pending
                                .into_iter()
                                .chain(queued)
                                .filter(|tx| tx.transaction.is_eip4844())
                            {
                                let tx_hash = *tx.transaction.hash();

                                // Fetch the sidecar from the pool
                                let Ok(Some(sidecar)) = pool.get_blob(tx_hash) else {
                                    continue;
                                };
                                // Ensure it is a legacy blob
                                if !sidecar.is_eip4844() {
                                    continue;
                                }
                                // Remove the transaction and sidecar from the pool; both are in
                                // memory now
                                let Some(tx) = pool.remove_transactions(vec![tx_hash]).pop() else {
                                    continue;
                                };
                                pool.delete_blob(tx_hash);

                                let BlobTransactionSidecarVariant::Eip4844(sidecar) =
                                    Arc::unwrap_or_clone(sidecar)
                                else {
                                    continue;
                                };

                                let converter = BlobSidecarConverter::new();
                                let pool = pool.clone();
                                spawner.spawn_task(Box::pin(async move {
                                    // Convert the sidecar to the EIP-7594 format
                                    let Some(sidecar) = converter.convert(sidecar).await else {
                                        return;
                                    };

                                    // Re-insert the transaction with the new sidecar
                                    let origin = tx.origin;
                                    let Some(tx) = EthPoolTransaction::try_from_eip4844(
                                        tx.transaction.clone_into_consensus(),
                                        sidecar.into(),
                                    ) else {
                                        return;
                                    };
                                    let _ = pool.add_transaction(origin, tx).await;
                                }));
                            }

                            if last_iteration {
                                break;
                            }

                            interval.tick().await;
                        }
                    }));
                }
            }
        }
    }
}

struct FinalizedBlockTracker {
    last_finalized_block: Option<BlockNumber>,
}

impl FinalizedBlockTracker {
    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
        Self { last_finalized_block }
    }

    /// Updates the tracked finalized block and returns the new finalized block if it changed
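    ///
    /// A sketch of the monotonic behavior (mirroring the unit tests below):
    ///
    /// ```ignore
    /// let mut tracker = FinalizedBlockTracker::new(Some(10));
    /// assert_eq!(tracker.update(Some(15)), Some(15)); // advanced
    /// assert_eq!(tracker.update(Some(12)), None);     // never moves backwards
    /// assert_eq!(tracker.update(None), None);         // no finalized block reported
    /// ```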
    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
        let finalized = finalized_block?;
        self.last_finalized_block.is_none_or(|last| last < finalized).then(|| {
            self.last_finalized_block = Some(finalized);
            finalized
        })
    }
}

/// Keeps track of the pool's state, i.e. whether the accounts in the pool are in sync with the
/// actual state.
#[derive(Debug, PartialEq, Eq)]
enum MaintainedPoolState {
    /// Pool is assumed to be in sync with the current state
    InSync,
    /// Pool could be out of sync with the state
    Drifted,
}

impl MaintainedPoolState {
    /// Returns `true` if the pool is assumed to be out of sync with the current state.
    #[inline]
    const fn is_drifted(&self) -> bool {
        matches!(self, Self::Drifted)
    }
}

/// A unique [`ChangedAccount`] identified by its address that can be used for deduplication
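///
/// A sketch of the intended dedup usage, relying on the `Borrow<Address>` impl below:
///
/// ```ignore
/// let acc = ChangedAccount::empty(Address::random());
/// let address = acc.address;
/// let mut set: HashSet<ChangedAccountEntry> = HashSet::default();
/// set.insert(ChangedAccountEntry(acc));
/// // lookups work directly by address via `Borrow<Address>`
/// assert!(set.contains(&address));
/// ```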
#[derive(Eq)]
struct ChangedAccountEntry(ChangedAccount);

impl PartialEq for ChangedAccountEntry {
    fn eq(&self, other: &Self) -> bool {
        self.0.address == other.0.address
    }
}

impl Hash for ChangedAccountEntry {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.address.hash(state);
    }
}

impl Borrow<Address> for ChangedAccountEntry {
    fn borrow(&self) -> &Address {
        &self.0.address
    }
}

#[derive(Default)]
struct LoadedAccounts {
    /// All accounts that were loaded
    accounts: Vec<ChangedAccount>,
    /// All accounts that failed to load
    failed_to_load: Vec<Address>,
}

/// Loads all accounts at the given state
///
/// Returns an error with all given addresses if the state is not available.
///
/// Note: this expects _unique_ addresses
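///
/// A hedged sketch of handling the result, assuming `client`, `block_hash`, and `addresses` are
/// in scope:
///
/// ```ignore
/// match load_accounts(client.clone(), block_hash, addresses.into_iter()) {
///     Ok(LoadedAccounts { accounts, failed_to_load }) => {
///         // `accounts` carry nonce + balance at `block_hash`;
///         // `failed_to_load` should be marked dirty and retried later
///     }
///     Err(boxed) => {
///         // state at `block_hash` was unavailable; all addresses are handed back
///         let (addresses, err) = *boxed;
///     }
/// }
/// ```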
fn load_accounts<Client, I>(
    client: Client,
    at: BlockHash,
    addresses: I,
) -> Result<LoadedAccounts, Box<(AddressSet, ProviderError)>>
where
    I: IntoIterator<Item = Address>,
    Client: StateProviderFactory,
{
    let addresses = addresses.into_iter();
    let mut res = LoadedAccounts::default();
    let state = match client.history_by_block_hash(at) {
        Ok(state) => state,
        Err(err) => return Err(Box::new((addresses.collect(), err))),
    };
    for addr in addresses {
        if let Ok(maybe_acc) = state.basic_account(&addr) {
            let acc = maybe_acc
                .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
                .unwrap_or_else(|| ChangedAccount::empty(addr));
            res.accounts.push(acc)
        } else {
            // failed to load account.
            res.failed_to_load.push(addr);
        }
    }
    Ok(res)
}

/// Loads transactions from a file, decodes them from the JSON or RLP format, and
/// inserts them into the transaction pool on node boot up.
/// The file is removed after the transactions have been successfully processed.
async fn load_and_reinsert_transactions<P>(
    pool: P,
    file_path: &Path,
) -> Result<(), TransactionsBackupError>
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
{
    if !file_path.exists() {
        return Ok(())
    }

    debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
    let data = reth_fs_util::read(file_path)?;

    if data.is_empty() {
        return Ok(())
    }

    let pool_transactions: Vec<(TransactionOrigin, <P as TransactionPool>::Transaction)> =
        if let Ok(tx_backups) = serde_json::from_slice::<Vec<TxBackup>>(&data) {
            tx_backups
                .into_iter()
                .filter_map(|backup| {
                    let tx_signed =
                        <P::Transaction as PoolTransaction>::Consensus::decode_2718_exact(
                            backup.rlp.as_ref(),
                        )
                        .ok()?;
                    let recovered = tx_signed.try_into_recovered().ok()?;
                    let pool_tx =
                        <P::Transaction as PoolTransaction>::try_from_consensus(recovered).ok()?;

                    Some((backup.origin, pool_tx))
                })
                .collect()
        } else {
            let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
                alloy_rlp::Decodable::decode(&mut data.as_slice())?;

            txs_signed
                .into_iter()
                .filter_map(|tx| tx.try_into_recovered().ok())
                .filter_map(|tx| {
                    <P::Transaction as PoolTransaction>::try_from_consensus(tx)
                        .ok()
                        .map(|pool_tx| (TransactionOrigin::Local, pool_tx))
                })
                .collect()
        };

    let inserted = futures_util::future::join_all(
        pool_transactions.into_iter().map(|(origin, tx)| pool.add_transaction(origin, tx)),
    )
    .await;

    info!(target: "txpool", txs_file =?file_path, num_txs=%inserted.len(), "Successfully reinserted local transactions from file");
    reth_fs_util::remove_file(file_path)?;
    Ok(())
}

fn save_local_txs_backup<P>(pool: P, file_path: &Path)
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
{
    let local_transactions = pool.get_local_transactions();
    if local_transactions.is_empty() {
        trace!(target: "txpool", "no local transactions to save");
        return
    }

    let local_transactions = local_transactions
        .into_iter()
        .map(|tx| {
            let consensus_tx = tx.transaction.clone_into_consensus().into_inner();
            let rlp_data = consensus_tx.encoded_2718();

            TxBackup { rlp: rlp_data.into(), origin: tx.origin }
        })
        .collect::<Vec<_>>();

    let json_data = match serde_json::to_string(&local_transactions) {
        Ok(data) => data,
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "failed to serialize local transactions to json");
            return
        }
    };

    info!(target: "txpool", txs_file =?file_path, num_txs=%local_transactions.len(), "Saving current local transactions");
    let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();

    match parent_dir.map(|_| reth_fs_util::write(file_path, json_data)) {
        Ok(_) => {
            info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
        }
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
        }
    }
}

/// A transaction backup that is saved as JSON to a file for
/// reinsertion into the pool
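///
/// # Example
///
/// A minimal sketch of the JSON round-trip used by the backup file (the payload here is a dummy
/// byte string, not a real transaction):
///
/// ```ignore
/// let backup = TxBackup { rlp: Bytes::from_static(&[0x02]), origin: TransactionOrigin::Local };
/// let json = serde_json::to_string(&backup).unwrap();
/// let decoded: TxBackup = serde_json::from_str(&json).unwrap();
/// assert_eq!(decoded.rlp, backup.rlp);
/// ```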
#[derive(Debug, Deserialize, Serialize)]
pub struct TxBackup {
    /// Encoded transaction
    pub rlp: Bytes,
    /// The origin of the transaction
    pub origin: TransactionOrigin,
}

/// Errors possible during txs backup load and decode
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
    /// Error during RLP decoding of transactions
    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
    Decode(#[from] alloy_rlp::Error),
    /// Error during JSON decoding of transactions
    #[error("failed to apply transactions backup. Encountered JSON decode error: {0}")]
    Json(#[from] serde_json::Error),
    /// Error during file I/O
    #[error("failed to apply transactions backup. Encountered file error: {0}")]
    FsPath(#[from] FsPathError),
    /// Error adding transactions to the transaction pool
    #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
    Pool(#[from] PoolError),
}

/// Task which manages saving local transactions to a persistent file in case of shutdown.
/// Reloads the transactions from the file on boot up and inserts them into the pool.
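///
/// A hedged sketch of wiring the task to a graceful-shutdown signal, assuming a task executor
/// similar to the one used in the tests below:
///
/// ```ignore
/// executor.spawn_critical_with_graceful_shutdown_signal("local txs backup task", |shutdown| {
///     backup_local_transactions_task(shutdown, pool.clone(), config)
/// });
/// ```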
pub async fn backup_local_transactions_task<P>(
    shutdown: reth_tasks::shutdown::GracefulShutdown,
    pool: P,
    config: LocalTransactionBackupConfig,
) where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
{
    let Some(transactions_path) = config.transactions_path else {
        // nothing to do
        return
    };

    if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
        error!(target: "txpool", "{}", err)
    }

    let graceful_guard = shutdown.await;

    // write transactions to disk
    save_local_txs_backup(pool, &transactions_path);

    drop(graceful_guard)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
        CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
    };
    use alloy_eips::eip2718::Decodable2718;
    use alloy_primitives::{hex, U256};
    use reth_ethereum_primitives::PooledTransactionVariant;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_fs_util as fs;
    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
    use reth_tasks::Runtime;

    #[test]
    fn changed_acc_entry() {
        let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
        let mut copy = changed_acc.0;
        copy.nonce = 10;
        assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
    }

    const EXTENSION: &str = "json";
    const FILENAME: &str = "test_transactions_backup";

    #[tokio::test(flavor = "multi_thread")]
    async fn test_save_local_txs_backup() {
        let temp_dir = tempfile::tempdir().unwrap();
        let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
        let tx_bytes = hex!(
            "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
        );
        let tx = PooledTransactionVariant::decode_2718(&mut &tx_bytes[..]).unwrap();
        let provider = MockEthProvider::default().with_genesis_block();
        let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
        let tx_to_cmp = transaction.clone();
        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
        let blob_store = InMemoryBlobStore::default();
        let validator = EthTransactionValidatorBuilder::new(provider, EthEvmConfig::mainnet())
            .build(blob_store.clone());

        let txpool = Pool::new(
            validator,
            CoinbaseTipOrdering::default(),
            blob_store.clone(),
            Default::default(),
        );

        txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();

        let rt = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
        let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
        rt.spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
            backup_local_transactions_task(shutdown, txpool.clone(), config)
        });

        let mut txns = txpool.get_local_transactions();
        let tx_on_finish = txns.pop().expect("there should be 1 transaction");

        assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());

        rt.graceful_shutdown();

        let data = fs::read(transactions_path).unwrap();

        let txs: Vec<TxBackup> = serde_json::from_slice::<Vec<TxBackup>>(&data).unwrap();
        assert_eq!(txs.len(), 1);

        temp_dir.close().unwrap();
    }

    #[test]
    fn test_update_with_higher_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(Some(15)), Some(15));
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_lower_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(15)), None);
        // finalized block should NOT go backwards
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    #[test]
    fn test_update_with_equal_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(20)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    #[test]
    fn test_update_with_no_last_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(Some(10)), Some(10));
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_new_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_finalized_blocks() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, None);
    }
}