1use crate::{
4 blobstore::{BlobSidecarConverter, BlobStoreCanonTracker, BlobStoreUpdates},
5 error::PoolError,
6 metrics::MaintainPoolMetrics,
7 traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
8 AllPoolTransactions, BlobTransactionSidecarVariant, BlockInfo, PoolTransaction, PoolUpdateKind,
9 TransactionOrigin,
10};
11use alloy_consensus::{transaction::TxHashRef, BlockHeader, Typed2718};
12use alloy_eips::{BlockNumberOrTag, Decodable2718, Encodable2718};
13use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes};
14use alloy_rlp::Encodable;
15use futures_util::{
16 future::{BoxFuture, Fuse, FusedFuture},
17 FutureExt, Stream, StreamExt,
18};
19use reth_chain_state::CanonStateNotification;
20use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
21use reth_execution_types::ChangedAccount;
22use reth_fs_util::FsPathError;
23use reth_primitives_traits::{
24 transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
25};
26use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
27use reth_tasks::TaskSpawner;
28use serde::{Deserialize, Serialize};
29use std::{
30 borrow::Borrow,
31 collections::HashSet,
32 hash::{Hash, Hasher},
33 path::{Path, PathBuf},
34 sync::Arc,
35};
36use tokio::{
37 sync::oneshot,
38 time::{self, Duration},
39};
40use tracing::{debug, error, info, trace, warn};
41
/// Default upper bound on how long a queued transaction may remain in the pool: three hours.
///
/// This is the value [`MaintainPoolConfig::default`] assigns to `max_tx_lifetime`.
pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);

/// Settings that tune the behaviour of the pool maintenance loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Largest distance, in blocks, between a newly committed tip and the pool's last seen block
    /// for which a regular pool update is performed. Deeper commits only refresh the pool's block
    /// info and mark the pool as drifted.
    ///
    /// Default: 64
    pub max_update_depth: u64,
    /// Maximum number of dirty accounts that are reloaded from state in a single batch.
    ///
    /// Default: 100
    pub max_reload_accounts: usize,

    /// Age after which a queued transaction is considered stale and evicted. The same duration is
    /// used as the period of the eviction timer.
    ///
    /// Default: [`MAX_QUEUED_TRANSACTION_LIFETIME`]
    pub max_tx_lifetime: Duration,

    /// When `true`, stale-transaction eviction also applies to transactions whose origin is not
    /// external. When `false`, only external transactions are evicted.
    ///
    /// Default: `false`
    pub no_local_exemptions: bool,
}

impl Default for MaintainPoolConfig {
    fn default() -> Self {
        const DEFAULT_MAX_UPDATE_DEPTH: u64 = 64;
        const DEFAULT_MAX_RELOAD_ACCOUNTS: usize = 100;

        Self {
            no_local_exemptions: false,
            max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
            max_reload_accounts: DEFAULT_MAX_RELOAD_ACCOUNTS,
            max_update_depth: DEFAULT_MAX_UPDATE_DEPTH,
        }
    }
}
80
/// Settings for backing up local transactions to disk.
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
    /// Location of the backup file. `None` (the default) disables the backup task.
    pub transactions_path: Option<PathBuf>,
}

impl LocalTransactionBackupConfig {
    /// Creates a config that uses `transactions_path` as the backup file location.
    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
        let backup_location = Some(transactions_path);
        Self { transactions_path: backup_location }
    }
}
94
95pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
97 client: Client,
98 pool: P,
99 events: St,
100 task_spawner: Tasks,
101 config: MaintainPoolConfig,
102) -> BoxFuture<'static, ()>
103where
104 N: NodePrimitives,
105 Client: StateProviderFactory
106 + BlockReaderIdExt<Header = N::BlockHeader>
107 + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
108 + Clone
109 + 'static,
110 P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
111 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
112 Tasks: TaskSpawner + Clone + 'static,
113{
114 async move {
115 maintain_transaction_pool(client, pool, events, task_spawner, config).await;
116 }
117 .boxed()
118}
119
/// Keeps the transaction pool in sync with the canonical chain.
///
/// The routine first seeds the pool's block info from the latest header (if one is available) and
/// then loops until the `events` stream ends. On every iteration it:
///
/// - marks all pool senders as dirty if the pool was flagged as drifted,
/// - starts a background reload of dirty accounts (at most `max_reload_accounts` per batch) if no
///   reload is in flight,
/// - deletes blobs that the blob tracker reports as finalized,
/// - waits for whichever comes first: a finished account reload, a new canonical state
///   notification, or the stale-transaction eviction timer,
/// - applies reloaded accounts and the commit/reorg notification to the pool.
///
/// # Parameters
///
/// - `client`: source of headers, finalized block numbers, historical state and the chain spec.
/// - `pool`: the pool being maintained.
/// - `events`: stream of canonical state notifications; the loop exits when it yields `None`.
/// - `task_spawner`: used to run account reloads, blob cleanup and blob sidecar conversion.
/// - `config`: maintenance settings, see [`MaintainPoolConfig`].
pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    mut events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    let metrics = MaintainPoolMetrics::default();
    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
    // Seed the pool's block info from the latest header; skipped if the lookup fails or the
    // client has no latest header.
    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
        let latest = SealedHeader::seal_slow(latest);
        let chain_spec = client.chain_spec();
        let info = BlockInfo {
            block_gas_limit: latest.gas_limit(),
            last_seen_block_hash: latest.hash(),
            last_seen_block_number: latest.number(),
            pending_basefee: chain_spec
                .next_block_base_fee(latest.header(), latest.timestamp())
                .unwrap_or_default(),
            pending_blob_fee: latest
                .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
        };
        pool.set_block_info(info);
    }

    // Tracks blobs of canonical blocks so they can be deleted once those blocks are finalized.
    let mut blob_store_tracker = BlobStoreCanonTracker::default();

    // Remembers the last finalized block number so blob deletion only runs when it advances.
    let mut last_finalized_block =
        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());

    // Addresses whose pool-side account info still has to be reloaded from state.
    let mut dirty_addresses = HashSet::default();

    // Whether the pool is believed to be in sync with the canonical chain.
    let mut maintained_state = MaintainedPoolState::InSync;

    // Receiver side of the in-flight account reload; starts out terminated (nothing in flight).
    let mut reload_accounts_fut = Fuse::terminated();

    // Timer for evicting stale queued transactions; its period equals the maximum lifetime.
    let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);

    // Set until the first notification arrives, see the `events.next()` branch below.
    let mut first_event = true;

    loop {
        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");

        metrics.set_dirty_accounts_len(dirty_addresses.len());
        let pool_info = pool.block_info();

        // A drifted pool is repaired by treating every sender in the pool as dirty.
        if maintained_state.is_drifted() {
            metrics.inc_drift();
            dirty_addresses = pool.unique_senders();
            // Reset the flag so the full reload is scheduled only once per drift.
            maintained_state = MaintainedPoolState::InSync;
        }

        // Start a new reload only if there is work and no reload is currently in flight.
        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
            let (tx, rx) = oneshot::channel();
            let c = client.clone();
            let at = pool_info.last_seen_block_hash;
            let fut = if dirty_addresses.len() > max_reload_accounts {
                // Too many dirty accounts: take one batch and leave the rest for later rounds.
                let accs_to_reload =
                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
                for acc in &accs_to_reload {
                    dirty_addresses.remove(acc);
                }
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            } else {
                // Few enough dirty accounts: reload all of them and clear the set.
                let accs_to_reload = std::mem::take(&mut dirty_addresses);
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            };
            reload_accounts_fut = rx.fuse();
            task_spawner.spawn_blocking(fut);
        }

        // If the finalized block advanced and the tracker reports finalized blobs, delete them
        // and trigger a blob store cleanup in the background.
        if let Some(finalized) =
            last_finalized_block.update(client.finalized_block_number().ok().flatten()) &&
            let BlobStoreUpdates::Finalized(blobs) =
                blob_store_tracker.on_finalized_block(finalized)
        {
            metrics.inc_deleted_tracked_blobs(blobs.len());
            pool.delete_blobs(blobs);
            let pool = pool.clone();
            task_spawner.spawn_blocking(Box::pin(async move {
                debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
                pool.cleanup_blobs();
            }));
        }

        // Outcome of the select below: at most one of these is set per iteration.
        let mut event = None;
        let mut reloaded = None;

        tokio::select! {
            res = &mut reload_accounts_fut => {
                reloaded = Some(res);
            }
            ev = events.next() => {
                if ev.is_none() {
                    // The notification stream ended: stop maintaining the pool.
                    break;
                }
                event = ev;
                if first_event {
                    // Flag the pool as drifted on the very first notification so that all
                    // senders are reloaded on the next iteration.
                    maintained_state = MaintainedPoolState::Drifted;
                    first_event = false
                }
            }
            _ = stale_eviction_interval.tick() => {
                let queued = pool
                    .queued_transactions();
                let mut stale_blobs = Vec::new();
                let now = std::time::Instant::now();
                let stale_txs: Vec<_> = queued
                    .into_iter()
                    .filter(|tx| {
                        // Only external transactions are evicted unless local exemptions are
                        // disabled; the transaction must also be older than the max lifetime.
                        (tx.origin.is_external() || config.no_local_exemptions) && now - tx.timestamp > config.max_tx_lifetime
                    })
                    .map(|tx| {
                        // Remember blob transactions so their blobs are deleted as well.
                        if tx.is_eip4844() {
                            stale_blobs.push(*tx.hash());
                        }
                        *tx.hash()
                    })
                    .collect();
                debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
                pool.remove_transactions(stale_txs);
                pool.delete_blobs(stale_blobs);
            }
        }
        // Apply the result of a finished account reload, if any.
        match reloaded {
            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
                // Accounts that could not be loaded stay dirty and are retried later.
                dirty_addresses.extend(failed_to_load);
                pool.update_accounts(accounts);
            }
            Some(Ok(Err(res))) => {
                // The state could not be opened at all: put every address back.
                let (accs, err) = *res;
                debug!(target: "txpool", %err, "failed to load accounts");
                dirty_addresses.extend(accs);
            }
            Some(Err(_)) => {
                // The reload task's sender went away without delivering a result, so the
                // addresses it took are unknown here; flag drift to reload all senders.
                maintained_state = MaintainedPoolState::Drifted;
            }
            None => {}
        }

        // Nothing else to do unless a canonical state notification arrived.
        let Some(event) = event else { continue };
        match event {
            CanonStateNotification::Reorg { old, new } => {
                let (old_blocks, old_state) = old.inner();
                let (new_blocks, new_state) = new.inner();
                let new_tip = new_blocks.tip();
                let new_first = new_blocks.first();
                let old_first = old_blocks.first();

                // If neither chain segment builds on the pool's last seen block, the pool has
                // missed blocks and is flagged as drifted.
                if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
                    new_first.parent_hash() == pool_info.last_seen_block_hash)
                {
                    maintained_state = MaintainedPoolState::Drifted;
                }

                let chain_spec = client.chain_spec();

                // Fees for the block that follows the new tip.
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(new_tip.header(), new_tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
                );

                // Accounts changed by the new chain, keyed by address.
                let new_changed_accounts: HashSet<_> =
                    new_state.changed_accounts().map(ChangedAccountEntry).collect();

                // Accounts touched by the old chain but not by the new one; their current values
                // are not part of the notification and must be read from state.
                let missing_changed_acc = old_state
                    .accounts_iter()
                    .map(|(a, _)| a)
                    .filter(|addr| !new_changed_accounts.contains(addr));

                // Load those accounts at the new tip; anything that fails is marked dirty.
                let mut changed_accounts =
                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
                            dirty_addresses.extend(failed_to_load);

                            accounts
                        }
                        Err(err) => {
                            let (addresses, err) = *err;
                            debug!(
                                target: "txpool",
                                %err,
                                "failed to load missing changed accounts at new tip: {:?}",
                                new_tip.hash()
                            );
                            dirty_addresses.extend(addresses);
                            vec![]
                        }
                    };

                // Combine the loaded accounts with the accounts changed by the new chain.
                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));

                // Hashes of all transactions included in the new chain.
                let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();

                // Transactions that were only in the old chain are converted back into pool
                // transactions so they can be re-submitted. Blob transactions need their sidecar
                // from the blob store and are dropped if it is unavailable or conversion fails.
                let pruned_old_transactions = old_blocks
                    .transactions_ecrecovered()
                    .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
                    .filter_map(|tx| {
                        if tx.is_eip4844() {
                            pool.get_blob(*tx.tx_hash())
                                .ok()
                                .flatten()
                                .map(Arc::unwrap_or_clone)
                                .and_then(|sidecar| {
                                    <P as TransactionPool>::Transaction::try_from_eip4844(
                                        tx, sidecar,
                                    )
                                })
                        } else {
                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
                        }
                    })
                    .collect::<Vec<_>>();

                // Apply the reorg to the pool.
                let update = CanonicalStateUpdate {
                    new_tip: new_tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions: new_blocks.transaction_hashes().collect(),
                    update_kind: PoolUpdateKind::Reorg,
                };
                pool.on_canonical_state_change(update);

                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
                // Re-submit the orphaned transactions as external; individual insertion results
                // are intentionally ignored.
                let _ = pool.add_external_transactions(pruned_old_transactions).await;

                // Track the blobs of the new chain for deletion on finalization.
                blob_store_tracker.add_new_chain_blocks(&new_blocks);
            }
            CanonStateNotification::Commit { new } => {
                let (blocks, state) = new.inner();
                let tip = blocks.tip();
                let chain_spec = client.chain_spec();

                // Fees for the block that follows the committed tip.
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(tip.header(), tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(tip.timestamp()),
                );

                let first_block = blocks.first();
                trace!(
                    target: "txpool",
                    first = first_block.number(),
                    tip = tip.number(),
                    pool_block = pool_info.last_seen_block_number,
                    "update pool on new commit"
                );

                // A commit too far from the pool's last seen block is not applied in full: only
                // the block info is refreshed and the pool is flagged as drifted.
                let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
                if depth > max_update_depth {
                    maintained_state = MaintainedPoolState::Drifted;
                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
                    let info = BlockInfo {
                        block_gas_limit: tip.header().gas_limit(),
                        last_seen_block_hash: tip.hash(),
                        last_seen_block_number: tip.number(),
                        pending_basefee: pending_block_base_fee,
                        pending_blob_fee: pending_block_blob_fee,
                    };
                    pool.set_block_info(info);

                    // Blobs of the skipped blocks are still tracked for later deletion.
                    blob_store_tracker.add_new_chain_blocks(&blocks);

                    continue
                }

                let mut changed_accounts = Vec::with_capacity(state.state().len());
                for acc in state.changed_accounts() {
                    // The notification carries fresh values for this account, so it no longer
                    // needs to be reloaded.
                    dirty_addresses.remove(&acc.address);
                    changed_accounts.push(acc);
                }

                let mined_transactions = blocks.transaction_hashes().collect();

                // The committed segment does not build on the pool's last seen block: flag drift.
                if first_block.parent_hash() != pool_info.last_seen_block_hash {
                    maintained_state = MaintainedPoolState::Drifted;
                }

                // Apply the commit to the pool.
                let update = CanonicalStateUpdate {
                    new_tip: tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions,
                    update_kind: PoolUpdateKind::Commit,
                };
                pool.on_canonical_state_change(update);

                // Track the blobs of the committed blocks for deletion on finalization.
                blob_store_tracker.add_new_chain_blocks(&blocks);

                // Runs only when Osaka is inactive at the tip timestamp and 12 seconds later but
                // active 24 seconds later. NOTE(review): 12 looks like the slot duration in
                // seconds, i.e. "Osaka activates in two slots" — confirm.
                if !chain_spec.is_osaka_active_at_timestamp(tip.timestamp()) &&
                    !chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(12)) &&
                    chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(24))
                {
                    let pool = pool.clone();
                    let spawner = task_spawner.clone();
                    let client = client.clone();
                    task_spawner.spawn(Box::pin(async move {
                        // Wait a few seconds before the first conversion pass.
                        tokio::time::sleep(Duration::from_secs(4)).await;

                        let mut interval = tokio::time::interval(Duration::from_secs(1));
                        loop {
                            // This is the final pass if Osaka is active at the latest header, or
                            // if the latest header cannot be retrieved.
                            let last_iteration =
                                client.latest_header().ok().flatten().is_none_or(|header| {
                                    client
                                        .chain_spec()
                                        .is_osaka_active_at_timestamp(header.timestamp())
                                });

                            let AllPoolTransactions { pending, queued } = pool.all_transactions();
                            for tx in pending
                                .into_iter()
                                .chain(queued)
                                .filter(|tx| tx.transaction.is_eip4844())
                            {
                                let tx_hash = *tx.transaction.hash();

                                // Skip blob transactions whose sidecar is not in the blob store.
                                let Ok(Some(sidecar)) = pool.get_blob(tx_hash) else {
                                    continue;
                                };
                                // Only sidecars still in the EIP-4844 format need converting.
                                if !sidecar.is_eip4844() {
                                    continue;
                                }
                                // Take the transaction out of the pool; skip it if it is gone.
                                let Some(tx) = pool.remove_transactions(vec![tx_hash]).pop() else {
                                    continue;
                                };
                                pool.delete_blob(tx_hash);

                                let BlobTransactionSidecarVariant::Eip4844(sidecar) =
                                    Arc::unwrap_or_clone(sidecar)
                                else {
                                    continue;
                                };

                                let converter = BlobSidecarConverter::new();
                                let pool = pool.clone();
                                // Convert the sidecar in its own task and re-add the transaction
                                // with its original origin; failures drop the transaction.
                                spawner.spawn(Box::pin(async move {
                                    let Some(sidecar) = converter.convert(sidecar).await else {
                                        return;
                                    };

                                    let origin = tx.origin;
                                    let Some(tx) = EthPoolTransaction::try_from_eip4844(
                                        tx.transaction.clone_into_consensus(),
                                        sidecar.into(),
                                    ) else {
                                        return;
                                    };
                                    let _ = pool.add_transaction(origin, tx).await;
                                }));
                            }

                            if last_iteration {
                                break;
                            }

                            interval.tick().await;
                        }
                    }));
                }
            }
        }
    }
}
594
/// Remembers the most recently observed finalized block number.
struct FinalizedBlockTracker {
    /// The finalized block number recorded by the last call that supplied one, if any.
    last_finalized_block: Option<BlockNumber>,
}

impl FinalizedBlockTracker {
    /// Creates a tracker that starts from `last_finalized_block`.
    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
        Self { last_finalized_block }
    }

    /// Records `finalized_block` and reports whether finalization advanced.
    ///
    /// Returns `Some(number)` if a number was supplied and it is greater than the previously
    /// recorded one (or nothing was recorded before). Returns `None` otherwise. A supplied number
    /// is always stored, even if it is not greater than the previous one; `None` leaves the
    /// tracker untouched.
    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
        let finalized = finalized_block?;
        let previous = std::mem::replace(&mut self.last_finalized_block, Some(finalized));
        match previous {
            Some(last) if last >= finalized => None,
            _ => Some(finalized),
        }
    }
}
613
/// Whether the pool is believed to reflect the current canonical state.
#[derive(Debug, PartialEq, Eq)]
enum MaintainedPoolState {
    /// The pool is in sync with the chain state it last processed.
    InSync,
    /// The pool may have missed updates and its senders need to be reloaded.
    Drifted,
}

impl MaintainedPoolState {
    /// Returns `true` if the state is [`MaintainedPoolState::Drifted`].
    #[inline]
    const fn is_drifted(&self) -> bool {
        match self {
            Self::Drifted => true,
            Self::InSync => false,
        }
    }
}
631
632#[derive(Eq)]
634struct ChangedAccountEntry(ChangedAccount);
635
636impl PartialEq for ChangedAccountEntry {
637 fn eq(&self, other: &Self) -> bool {
638 self.0.address == other.0.address
639 }
640}
641
642impl Hash for ChangedAccountEntry {
643 fn hash<H: Hasher>(&self, state: &mut H) {
644 self.0.address.hash(state);
645 }
646}
647
648impl Borrow<Address> for ChangedAccountEntry {
649 fn borrow(&self) -> &Address {
650 &self.0.address
651 }
652}
653
/// Result of an account reload performed by `load_accounts`.
#[derive(Default)]
struct LoadedAccounts {
    /// Accounts whose lookup succeeded; addresses without an account are stored as empty accounts.
    accounts: Vec<ChangedAccount>,
    /// Addresses whose lookup returned an error.
    failed_to_load: Vec<Address>,
}
661
662fn load_accounts<Client, I>(
668 client: Client,
669 at: BlockHash,
670 addresses: I,
671) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
672where
673 I: IntoIterator<Item = Address>,
674 Client: StateProviderFactory,
675{
676 let addresses = addresses.into_iter();
677 let mut res = LoadedAccounts::default();
678 let state = match client.history_by_block_hash(at) {
679 Ok(state) => state,
680 Err(err) => return Err(Box::new((addresses.collect(), err))),
681 };
682 for addr in addresses {
683 if let Ok(maybe_acc) = state.basic_account(&addr) {
684 let acc = maybe_acc
685 .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
686 .unwrap_or_else(|| ChangedAccount::empty(addr));
687 res.accounts.push(acc)
688 } else {
689 res.failed_to_load.push(addr);
691 }
692 }
693 Ok(res)
694}
695
696async fn load_and_reinsert_transactions<P>(
700 pool: P,
701 file_path: &Path,
702) -> Result<(), TransactionsBackupError>
703where
704 P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
705{
706 if !file_path.exists() {
707 return Ok(())
708 }
709
710 debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
711 let data = reth_fs_util::read(file_path)?;
712
713 if data.is_empty() {
714 return Ok(())
715 }
716
717 let pool_transactions: Vec<(TransactionOrigin, <P as TransactionPool>::Transaction)> =
718 if let Ok(tx_backups) = serde_json::from_slice::<Vec<TxBackup>>(&data) {
719 tx_backups
720 .into_iter()
721 .filter_map(|backup| {
722 let tx_signed =
723 <P::Transaction as PoolTransaction>::Consensus::decode_2718_exact(
724 backup.rlp.as_ref(),
725 )
726 .ok()?;
727 let recovered = tx_signed.try_into_recovered().ok()?;
728 let pool_tx =
729 <P::Transaction as PoolTransaction>::try_from_consensus(recovered).ok()?;
730
731 Some((backup.origin, pool_tx))
732 })
733 .collect()
734 } else {
735 let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
736 alloy_rlp::Decodable::decode(&mut data.as_slice())?;
737
738 txs_signed
739 .into_iter()
740 .filter_map(|tx| tx.try_into_recovered().ok())
741 .filter_map(|tx| {
742 <P::Transaction as PoolTransaction>::try_from_consensus(tx)
743 .ok()
744 .map(|pool_tx| (TransactionOrigin::Local, pool_tx))
745 })
746 .collect()
747 };
748
749 let inserted = futures_util::future::join_all(
750 pool_transactions.into_iter().map(|(origin, tx)| pool.add_transaction(origin, tx)),
751 )
752 .await;
753
754 info!(target: "txpool", txs_file =?file_path, num_txs=%inserted.len(), "Successfully reinserted local transactions from file");
755 reth_fs_util::remove_file(file_path)?;
756 Ok(())
757}
758
759fn save_local_txs_backup<P>(pool: P, file_path: &Path)
760where
761 P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
762{
763 let local_transactions = pool.get_local_transactions();
764 if local_transactions.is_empty() {
765 trace!(target: "txpool", "no local transactions to save");
766 return
767 }
768
769 let local_transactions = local_transactions
770 .into_iter()
771 .map(|tx| {
772 let consensus_tx = tx.transaction.clone_into_consensus().into_inner();
773 let rlp_data = consensus_tx.encoded_2718();
774
775 TxBackup { rlp: rlp_data.into(), origin: tx.origin }
776 })
777 .collect::<Vec<_>>();
778
779 let json_data = match serde_json::to_string(&local_transactions) {
780 Ok(data) => data,
781 Err(err) => {
782 warn!(target: "txpool", %err, txs_file=?file_path, "failed to serialize local transactions to json");
783 return
784 }
785 };
786
787 info!(target: "txpool", txs_file =?file_path, num_txs=%local_transactions.len(), "Saving current local transactions");
788 let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();
789
790 match parent_dir.map(|_| reth_fs_util::write(file_path, json_data)) {
791 Ok(_) => {
792 info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
793 }
794 Err(err) => {
795 warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
796 }
797 }
798}
799
/// A single transaction entry of the on-disk transaction backup.
#[derive(Debug, Deserialize, Serialize)]
pub struct TxBackup {
    /// The encoded transaction. The backup writer stores the EIP-2718 encoding here and the
    /// loader decodes it as such.
    pub rlp: Bytes,
    /// The origin the transaction had in the pool; used again when it is re-inserted.
    pub origin: TransactionOrigin,
}
809
/// Errors that can occur while restoring transactions from a backup file.
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
    /// The backup contents could not be decoded as RLP.
    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
    Decode(#[from] alloy_rlp::Error),
    /// A JSON (de)serialization error.
    #[error("failed to apply transactions backup. Encountered JSON decode error: {0}")]
    Json(#[from] serde_json::Error),
    /// A file system operation on the backup file failed.
    #[error("failed to apply transactions backup. Encountered file error: {0}")]
    FsPath(#[from] FsPathError),
    /// The pool rejected a transaction.
    #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
    Pool(#[from] PoolError),
}
826
827pub async fn backup_local_transactions_task<P>(
830 shutdown: reth_tasks::shutdown::GracefulShutdown,
831 pool: P,
832 config: LocalTransactionBackupConfig,
833) where
834 P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
835{
836 let Some(transactions_path) = config.transactions_path else {
837 return
839 };
840
841 if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
842 error!(target: "txpool", "{}", err)
843 }
844
845 let graceful_guard = shutdown.await;
846
847 save_local_txs_backup(pool, &transactions_path);
849
850 drop(graceful_guard)
851}
852
/// Unit tests for the address-keyed account entry, the local transaction backup task and the
/// finalized block tracker.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
        CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
    };
    use alloy_eips::eip2718::Decodable2718;
    use alloy_primitives::{hex, U256};
    use reth_ethereum_primitives::PooledTransactionVariant;
    use reth_fs_util as fs;
    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
    use reth_tasks::TaskManager;

    /// Entries with the same address are equal even if other account fields differ.
    #[test]
    fn changed_acc_entry() {
        let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
        let mut copy = changed_acc.0;
        copy.nonce = 10;
        assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
    }

    // File name pieces of the backup file used by the backup test.
    const EXTENSION: &str = "json";
    const FILENAME: &str = "test_transactions_backup";

    /// A local transaction in the pool is written to the backup file on graceful shutdown.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_save_local_txs_backup() {
        let temp_dir = tempfile::tempdir().unwrap();
        let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
        // Encoded transaction used as the single local transaction of this test.
        let tx_bytes = hex!(
            "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
        );
        let tx = PooledTransactionVariant::decode_2718(&mut &tx_bytes[..]).unwrap();
        let provider = MockEthProvider::default();
        let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
        let tx_to_cmp = transaction.clone();
        // Register the sender with the mock provider so the transaction can be validated.
        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
        let blob_store = InMemoryBlobStore::default();
        let validator = EthTransactionValidatorBuilder::new(provider).build(blob_store.clone());

        let txpool = Pool::new(
            validator,
            CoinbaseTipOrdering::default(),
            blob_store.clone(),
            Default::default(),
        );

        txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();

        // Run the backup task under a task manager so it receives the graceful shutdown signal.
        let handle = tokio::runtime::Handle::current();
        let manager = TaskManager::new(handle);
        let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
        manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
            backup_local_transactions_task(shutdown, txpool.clone(), config)
        });

        let mut txns = txpool.get_local_transactions();
        let tx_on_finish = txns.pop().expect("there should be 1 transaction");

        assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());

        // NOTE(review): presumably this blocks until the backup task released its shutdown
        // guard, so the file exists afterwards — confirm against `TaskManager`.
        manager.graceful_shutdown();

        let data = fs::read(transactions_path).unwrap();

        let txs: Vec<TxBackup> = serde_json::from_slice::<Vec<TxBackup>>(&data).unwrap();
        assert_eq!(txs.len(), 1);

        temp_dir.close().unwrap();
    }

    /// A higher finalized block is reported and stored.
    #[test]
    fn test_update_with_higher_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(Some(15)), Some(15));
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    /// A lower finalized block is not reported, but it is still stored.
    #[test]
    fn test_update_with_lower_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(15)), None);
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    /// An unchanged finalized block is not reported.
    #[test]
    fn test_update_with_equal_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(20)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    /// The first finalized block seen by an empty tracker is reported.
    #[test]
    fn test_update_with_no_last_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(Some(10)), Some(10));
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    /// A missing finalized block leaves the tracked value untouched.
    #[test]
    fn test_update_with_no_new_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    /// An empty tracker stays empty when no finalized block is supplied.
    #[test]
    fn test_update_with_no_finalized_blocks() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, None);
    }
}