//! Support for maintaining the state of the transaction pool

use crate::{
    blobstore::{BlobSidecarConverter, BlobStoreCanonTracker, BlobStoreUpdates},
    error::PoolError,
    metrics::MaintainPoolMetrics,
    traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
    AllPoolTransactions, BlobTransactionSidecarVariant, BlockInfo, PoolTransaction, PoolUpdateKind,
    TransactionOrigin,
};
use alloy_consensus::{transaction::TxHashRef, BlockHeader, Typed2718};
use alloy_eips::{BlockNumberOrTag, Decodable2718, Encodable2718};
use alloy_primitives::{
    map::{AddressSet, HashSet},
    Address, BlockHash, BlockNumber, Bytes,
};
use alloy_rlp::Encodable;
use futures_util::{
    future::{BoxFuture, Fuse, FusedFuture},
    FutureExt, Stream, StreamExt,
};
use reth_chain_state::CanonStateNotification;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_execution_types::ChangedAccount;
use reth_fs_util::FsPathError;
use reth_primitives_traits::{
    transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
};
use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskSpawner;
use serde::{Deserialize, Serialize};
use std::{
    borrow::Borrow,
    hash::{Hash, Hasher},
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::{
    sync::oneshot,
    time::{self, Duration},
};
use tracing::{debug, error, info, trace, warn};

/// Maximum amount of time a transaction may stay in the queued sub-pool before it is evicted;
/// used as the default for [`MaintainPoolConfig::max_tx_lifetime`].
pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);

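/// Settings for how [`maintain_transaction_pool`] keeps the pool in sync with the canonical
/// chain.
///
/// A minimal sketch of overriding individual settings via struct update syntax (values are
/// illustrative only):
///
/// ```ignore
/// let config = MaintainPoolConfig {
///     // treat canonical updates deeper than 32 blocks as a drift and resync instead
///     max_update_depth: 32,
///     ..Default::default()
/// };
/// ```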
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Maximum depth of a canonical update the pool handles incrementally; deeper updates mark
    /// the pool as drifted and only move its block info to the new tip.
    pub max_update_depth: u64,
    /// Maximum number of dirty (out-of-sync) accounts to reload from state in a single batch.
    pub max_reload_accounts: usize,
    /// Maximum amount of time a transaction may stay in the queued sub-pool before it is
    /// evicted.
    pub max_tx_lifetime: Duration,
    /// Whether to also evict local transactions that exceed `max_tx_lifetime` (by default local
    /// transactions are exempt from eviction).
    pub no_local_exemptions: bool,
}

impl Default for MaintainPoolConfig {
    fn default() -> Self {
        Self {
            max_update_depth: 64,
            max_reload_accounts: 100,
            max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
            no_local_exemptions: false,
        }
    }
}

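/// Settings for backing up local transactions to disk so they can be restored after a restart.
///
/// A minimal sketch of enabling the backup (the path is illustrative only):
///
/// ```ignore
/// use std::path::PathBuf;
///
/// let config =
///     LocalTransactionBackupConfig::with_local_txs_backup(PathBuf::from("local-txs.json"));
/// ```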
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
    /// Path to the file where local transactions are backed up, if enabled.
    pub transactions_path: Option<PathBuf>,
}

impl LocalTransactionBackupConfig {
    /// Returns a configuration with local transaction backup enabled at the given path.
    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
        Self { transactions_path: Some(transactions_path) }
    }
}

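/// Returns a spawnable future that runs [`maintain_transaction_pool`] to keep the given pool in
/// sync with the canonical chain.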
pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) -> BoxFuture<'static, ()>
where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>, Block = N::Block>
        + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    async move {
        maintain_transaction_pool(client, pool, events, task_spawner, config).await;
    }
    .boxed()
}

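/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for canonical state notifications on `events` and performs the pool updates they
/// require: adjusting the pending base fee and blob fee, pruning mined transactions, re-injecting
/// transactions from reorged-out blocks, reloading out-of-sync accounts, evicting stale queued
/// transactions, and cleaning up the blob store as blocks are finalized. The future runs until
/// the `events` stream ends.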
pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    mut events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>, Block = N::Block>
        + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    let metrics = MaintainPoolMetrics::default();
    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
    // ensure the pool points to the latest block
    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
        let latest = SealedHeader::seal_slow(latest);
        let chain_spec = client.chain_spec();
        let info = BlockInfo {
            block_gas_limit: latest.gas_limit(),
            last_seen_block_hash: latest.hash(),
            last_seen_block_number: latest.number(),
            pending_basefee: chain_spec
                .next_block_base_fee(latest.header(), latest.timestamp())
                .unwrap_or_default(),
            pending_blob_fee: latest
                .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
        };
        pool.set_block_info(info);
    }

    // keeps track of mined blob transactions so the blob store can be cleaned up once they are
    // finalized
    let mut blob_store_tracker = BlobStoreCanonTracker::default();

    // keeps track of the latest finalized block
    let mut last_finalized_block =
        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());

    // accounts that are believed to be out of sync with the pool
    let mut dirty_addresses = HashSet::default();

    // keeps track of whether the pool's view is still in sync with the canonical chain
    let mut maintained_state = MaintainedPoolState::InSync;

    // the future that reloads accounts from state, if a reload is currently in progress
    let mut reload_accounts_fut = Fuse::terminated();

    // interval for evicting stale transactions from the queued sub-pool
    let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);

    // toggle for the first canonical state notification
    let mut first_event = true;

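    // The main maintenance loop: each iteration first takes care of pending follow-up work
    // (reloading dirty accounts, cleaning up finalized blobs), then waits for either a new
    // canonical state notification, the result of an account reload, or the stale-transaction
    // eviction tick.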
    loop {
        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");

        metrics.set_dirty_accounts_len(dirty_addresses.len());
        let pool_info = pool.block_info();

        // after a drift all of the pool's tracked senders are considered dirty and need to be
        // reloaded from state
        if maintained_state.is_drifted() {
            metrics.inc_drift();
            dirty_addresses = pool.unique_senders();
            maintained_state = MaintainedPoolState::InSync;
        }

        // spawn a blocking task that reloads a batch of dirty accounts, at most
        // `max_reload_accounts` at a time
        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
            let (tx, rx) = oneshot::channel();
            let c = client.clone();
            let at = pool_info.last_seen_block_hash;
            let fut = if dirty_addresses.len() > max_reload_accounts {
                // only reload a subset and keep the rest for the next iteration
                let accs_to_reload =
                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
                for acc in &accs_to_reload {
                    dirty_addresses.remove(acc);
                }
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            } else {
                // reload all remaining dirty accounts
                let accs_to_reload = std::mem::take(&mut dirty_addresses);
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            };
            reload_accounts_fut = rx.fuse();
            task_spawner.spawn_blocking_task(fut);
        }

        // once a new block is finalized, delete the blobs of the now finalized transactions and
        // clean up the blob store
        if let Some(finalized) =
            last_finalized_block.update(client.finalized_block_number().ok().flatten()) &&
            let BlobStoreUpdates::Finalized(blobs) =
                blob_store_tracker.on_finalized_block(finalized)
        {
            metrics.inc_deleted_tracked_blobs(blobs.len());
            pool.delete_blobs(blobs);
            let pool = pool.clone();
            task_spawner.spawn_blocking_task(Box::pin(async move {
                debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
                pool.cleanup_blobs();
            }));
        }

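        // wait for one of three things: the result of an in-flight account reload, the next
        // canonical state notification, or the stale-transaction eviction tick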
        let mut event = None;
        let mut reloaded = None;

        tokio::select! {
            res = &mut reload_accounts_fut => {
                reloaded = Some(res);
            }
            ev = events.next() => {
                if ev.is_none() {
                    // the stream ended, we are done
                    break;
                }
                event = ev;
                // on the first event, mark the pool as drifted so that all tracked senders are
                // reloaded against the new chain state
                if first_event {
                    maintained_state = MaintainedPoolState::Drifted;
                    first_event = false;
                }
            }
            _ = stale_eviction_interval.tick() => {
                let queued = pool.queued_transactions();
                let mut stale_blobs = Vec::new();
                let now = std::time::Instant::now();
                let stale_txs: Vec<_> = queued
                    .into_iter()
                    .filter(|tx| {
                        // external transactions are always subject to eviction; local ones only
                        // if exemptions are disabled
                        (tx.origin.is_external() || config.no_local_exemptions) &&
                            now - tx.timestamp > config.max_tx_lifetime
                    })
                    .map(|tx| {
                        if tx.is_eip4844() {
                            stale_blobs.push(*tx.hash());
                        }
                        *tx.hash()
                    })
                    .collect();
                debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
                pool.remove_transactions(stale_txs);
                pool.delete_blobs(stale_blobs);
            }
        }
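        // integrate the result of an account reload: successfully loaded accounts are pushed into
        // the pool, accounts that failed to load stay dirty, and a dropped reload task marks the
        // pool as drifted again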
        match reloaded {
            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
                dirty_addresses.extend(failed_to_load);
                pool.update_accounts(accounts);
            }
            Some(Ok(Err(res))) => {
                let (accs, err) = *res;
                debug!(target: "txpool", %err, "failed to load accounts");
                dirty_addresses.extend(accs);
            }
            Some(Err(_)) => {
                maintained_state = MaintainedPoolState::Drifted;
            }
            None => {}
        }

        let Some(event) = event else { continue };
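        // a canonical state notification arrived: a reorg swaps the pool over to the new chain
        // and re-injects transactions that were only mined on the old one, while a commit
        // advances the pool's view of the canonical chain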
        match event {
            CanonStateNotification::Reorg { old, new } => {
                let (old_blocks, old_state) = old.inner();
                let (new_blocks, new_state) = new.inner();
                let new_tip = new_blocks.tip();
                let new_first = new_blocks.first();
                let old_first = old_blocks.first();

                // if neither the old nor the new chain attaches to the pool's last seen block,
                // the pool has drifted
                if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
                    new_first.parent_hash() == pool_info.last_seen_block_hash)
                {
                    maintained_state = MaintainedPoolState::Drifted;
                }

                let chain_spec = client.chain_spec();

                // fees for the next block, derived from the new tip
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(new_tip.header(), new_tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
                );

                // all accounts changed on the new chain
                let new_changed_accounts: HashSet<_> =
                    new_state.changed_accounts().map(ChangedAccountEntry).collect();

                // accounts that were only changed on the old chain need to be fetched from the
                // new state
                let missing_changed_acc = old_state
                    .accounts_iter()
                    .map(|(a, _)| a)
                    .filter(|addr| !new_changed_accounts.contains(addr));

                let mut changed_accounts =
                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
                            // keep track of the accounts we failed to load so they are retried
                            dirty_addresses.extend(failed_to_load);

                            accounts
                        }
                        Err(err) => {
                            let (addresses, err) = *err;
                            debug!(
                                target: "txpool",
                                %err,
                                "failed to load missing changed accounts at new tip: {:?}",
                                new_tip.hash()
                            );
                            dirty_addresses.extend(addresses);
                            vec![]
                        }
                    };

                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));

                let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();

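                // transactions that were mined on the old chain but not on the new one need to be
                // re-injected into the pool; EIP-4844 transactions must first have their blob
                // sidecar fetched back from the blob store before they can be turned into pool
                // transactions again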
                let pruned_old_transactions = old_blocks
                    .transactions_ecrecovered()
                    .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
                    .filter_map(|tx| {
                        if tx.is_eip4844() {
                            pool.get_blob(*tx.tx_hash())
                                .ok()
                                .flatten()
                                .map(Arc::unwrap_or_clone)
                                .and_then(|sidecar| {
                                    <P as TransactionPool>::Transaction::try_from_eip4844(
                                        tx, sidecar,
                                    )
                                })
                        } else {
                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
                        }
                    })
                    .collect::<Vec<_>>();

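                // update the pool to the new chain first, then re-inject the transactions that
                // were dropped by the reorg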
                let update = CanonicalStateUpdate {
                    new_tip: new_tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions: new_blocks.transaction_hashes().collect(),
                    update_kind: PoolUpdateKind::Reorg,
                };
                pool.on_canonical_state_change(update);

                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
                let _ = pool.add_external_transactions(pruned_old_transactions).await;

                // keep track of new mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&new_blocks);
            }
            CanonStateNotification::Commit { new } => {
                let (blocks, state) = new.inner();
                let tip = blocks.tip();
                let chain_spec = client.chain_spec();

                // fees for the next block, derived from the committed tip
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(tip.header(), tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(tip.timestamp()),
                );

                let first_block = blocks.first();
                trace!(
                    target: "txpool",
                    first = first_block.number(),
                    tip = tip.number(),
                    pool_block = pool_info.last_seen_block_number,
                    "update pool on new commit"
                );

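                // if the pool is too far behind the committed tip, applying every intermediate
                // change is skipped; instead the pool is marked as drifted and its block info is
                // moved to the new tip so the drift handling reloads the tracked accounts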
                let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
                if depth > max_update_depth {
                    maintained_state = MaintainedPoolState::Drifted;
                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
                    let info = BlockInfo {
                        block_gas_limit: tip.header().gas_limit(),
                        last_seen_block_hash: tip.hash(),
                        last_seen_block_number: tip.number(),
                        pending_basefee: pending_block_base_fee,
                        pending_blob_fee: pending_block_blob_fee,
                    };
                    pool.set_block_info(info);

                    // keep track of new mined blob transactions
                    blob_store_tracker.add_new_chain_blocks(&blocks);

                    continue
                }

                let mut changed_accounts = Vec::with_capacity(state.state().len());
                for acc in state.changed_accounts() {
                    // the account is updated by this commit, so it is no longer dirty
                    dirty_addresses.remove(&acc.address);
                    changed_accounts.push(acc);
                }

                let mined_transactions = blocks.transaction_hashes().collect();

                // check that the committed range attaches to the pool's last seen block
                if first_block.parent_hash() != pool_info.last_seen_block_hash {
                    // the commit is not canonical with the pool's block, so the pool has drifted
                    maintained_state = MaintainedPoolState::Drifted;
                }

                let update = CanonicalStateUpdate {
                    new_tip: tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions,
                    update_kind: PoolUpdateKind::Commit,
                };
                pool.on_canonical_state_change(update);

                // keep track of new mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&blocks);

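                // if the Osaka hardfork activates within roughly the next two slots, EIP-4844
                // transactions in the pool still carry the legacy blob sidecar format; spawn a
                // task that removes them, converts their sidecars with `BlobSidecarConverter`,
                // and re-adds them so they remain valid once Osaka is active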
                if !chain_spec.is_osaka_active_at_timestamp(tip.timestamp()) &&
                    !chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(12)) &&
                    chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(24))
                {
                    let pool = pool.clone();
                    let spawner = task_spawner.clone();
                    let client = client.clone();
                    task_spawner.spawn_task(Box::pin(async move {
                        // give the pool a moment before starting the conversion
                        tokio::time::sleep(Duration::from_secs(4)).await;

                        let mut interval = tokio::time::interval(Duration::from_secs(1));
                        loop {
                            // stop once the latest header is Osaka-active (or unavailable)
                            let last_iteration =
                                client.latest_header().ok().flatten().is_none_or(|header| {
                                    client
                                        .chain_spec()
                                        .is_osaka_active_at_timestamp(header.timestamp())
                                });

                            let AllPoolTransactions { pending, queued } = pool.all_transactions();
                            for tx in pending
                                .into_iter()
                                .chain(queued)
                                .filter(|tx| tx.transaction.is_eip4844())
                            {
                                let tx_hash = *tx.transaction.hash();

                                // only convert transactions that still carry a legacy sidecar
                                let Ok(Some(sidecar)) = pool.get_blob(tx_hash) else {
                                    continue;
                                };
                                if !sidecar.is_eip4844() {
                                    continue;
                                }
                                // remove the transaction and its blob before re-inserting the
                                // converted version
                                let Some(tx) = pool.remove_transactions(vec![tx_hash]).pop() else {
                                    continue;
                                };
                                pool.delete_blob(tx_hash);

                                let BlobTransactionSidecarVariant::Eip4844(sidecar) =
                                    Arc::unwrap_or_clone(sidecar)
                                else {
                                    continue;
                                };

                                let converter = BlobSidecarConverter::new();
                                let pool = pool.clone();
                                spawner.spawn_task(Box::pin(async move {
                                    let Some(sidecar) = converter.convert(sidecar).await else {
                                        return;
                                    };

                                    // re-add the transaction with its converted sidecar, keeping
                                    // the original origin
                                    let origin = tx.origin;
                                    let Some(tx) = EthPoolTransaction::try_from_eip4844(
                                        tx.transaction.clone_into_consensus(),
                                        sidecar.into(),
                                    ) else {
                                        return;
                                    };
                                    let _ = pool.add_transaction(origin, tx).await;
                                }));
                            }

                            if last_iteration {
                                break;
                            }

                            interval.tick().await;
                        }
                    }));
                }
            }
        }
    }
}

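/// Keeps track of the latest finalized block so that blob store cleanup only runs when a new,
/// higher block has been finalized.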
struct FinalizedBlockTracker {
    last_finalized_block: Option<BlockNumber>,
}

impl FinalizedBlockTracker {
    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
        Self { last_finalized_block }
    }

    /// Updates the tracker and returns the new finalized block if it is higher than the
    /// previously tracked one.
    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
        let finalized = finalized_block?;
        self.last_finalized_block.is_none_or(|last| last < finalized).then(|| {
            self.last_finalized_block = Some(finalized);
            finalized
        })
    }
}

/// Tracks whether the pool's view of accounts is assumed to be in sync with the canonical chain.
#[derive(Debug, PartialEq, Eq)]
enum MaintainedPoolState {
    /// Pool is assumed to be in sync with the current state.
    InSync,
    /// Pool could be out of sync with the state, e.g. after a deep update or a missed
    /// notification.
    Drifted,
}

impl MaintainedPoolState {
    /// Returns `true` if the pool is considered out of sync with the chain state.
    #[inline]
    const fn is_drifted(&self) -> bool {
        matches!(self, Self::Drifted)
    }
}

/// Wrapper around [`ChangedAccount`] that compares and hashes by address only, so changed
/// accounts can be deduplicated and looked up by address in a set.
#[derive(Eq)]
struct ChangedAccountEntry(ChangedAccount);

impl PartialEq for ChangedAccountEntry {
    fn eq(&self, other: &Self) -> bool {
        self.0.address == other.0.address
    }
}

impl Hash for ChangedAccountEntry {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.address.hash(state);
    }
}

impl Borrow<Address> for ChangedAccountEntry {
    fn borrow(&self) -> &Address {
        &self.0.address
    }
}

/// Result of [`load_accounts`].
#[derive(Default)]
struct LoadedAccounts {
    /// All accounts that were loaded.
    accounts: Vec<ChangedAccount>,
    /// All accounts that failed to load.
    failed_to_load: Vec<Address>,
}

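/// Loads the accounts for the given addresses at the state of the given block hash.
///
/// Accounts that cannot be found in state are returned as empty accounts; addresses whose lookup
/// fails are collected in [`LoadedAccounts::failed_to_load`]. If the state for the block is not
/// available at all, the whole address set is returned alongside the error.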
fn load_accounts<Client, I>(
    client: Client,
    at: BlockHash,
    addresses: I,
) -> Result<LoadedAccounts, Box<(AddressSet, ProviderError)>>
where
    I: IntoIterator<Item = Address>,
    Client: StateProviderFactory,
{
    let addresses = addresses.into_iter();
    let mut res = LoadedAccounts::default();
    let state = match client.history_by_block_hash(at) {
        Ok(state) => state,
        Err(err) => return Err(Box::new((addresses.collect(), err))),
    };
    for addr in addresses {
        if let Ok(maybe_acc) = state.basic_account(&addr) {
            let acc = maybe_acc
                .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
                .unwrap_or_else(|| ChangedAccount::empty(addr));
            res.accounts.push(acc)
        } else {
            // failed to load the account, try to reload it again later
            res.failed_to_load.push(addr);
        }
    }
    Ok(res)
}

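/// Loads transactions from the given backup file and reinserts them into the pool.
///
/// The file is expected to contain either the JSON backup format written by
/// [`save_local_txs_backup`] or, as a fallback, an RLP-encoded list of signed transactions (the
/// legacy backup format, which is reinserted with [`TransactionOrigin::Local`]). The file is
/// deleted after the transactions have been reinserted.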
async fn load_and_reinsert_transactions<P>(
    pool: P,
    file_path: &Path,
) -> Result<(), TransactionsBackupError>
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
{
    if !file_path.exists() {
        return Ok(())
    }

    debug!(target: "txpool", txs_file = ?file_path, "Check local persistent storage for saved transactions");
    let data = reth_fs_util::read(file_path)?;

    if data.is_empty() {
        return Ok(())
    }

    let pool_transactions: Vec<(TransactionOrigin, <P as TransactionPool>::Transaction)> =
        if let Ok(tx_backups) = serde_json::from_slice::<Vec<TxBackup>>(&data) {
            // current backup format: JSON entries with the raw transaction and its origin
            tx_backups
                .into_iter()
                .filter_map(|backup| {
                    let tx_signed =
                        <P::Transaction as PoolTransaction>::Consensus::decode_2718_exact(
                            backup.rlp.as_ref(),
                        )
                        .ok()?;
                    let recovered = tx_signed.try_into_recovered().ok()?;
                    let pool_tx =
                        <P::Transaction as PoolTransaction>::try_from_consensus(recovered).ok()?;

                    Some((backup.origin, pool_tx))
                })
                .collect()
        } else {
            // legacy backup format: an RLP list of signed transactions, all treated as local
            let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
                alloy_rlp::Decodable::decode(&mut data.as_slice())?;

            txs_signed
                .into_iter()
                .filter_map(|tx| tx.try_into_recovered().ok())
                .filter_map(|tx| {
                    <P::Transaction as PoolTransaction>::try_from_consensus(tx)
                        .ok()
                        .map(|pool_tx| (TransactionOrigin::Local, pool_tx))
                })
                .collect()
        };

    let inserted = futures_util::future::join_all(
        pool_transactions.into_iter().map(|(origin, tx)| pool.add_transaction(origin, tx)),
    )
    .await;

    info!(target: "txpool", txs_file = ?file_path, num_txs = %inserted.len(), "Successfully reinserted local transactions from file");
    reth_fs_util::remove_file(file_path)?;
    Ok(())
}

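/// Serializes the pool's current local transactions to JSON and writes them to the given file.
///
/// Any serialization or I/O error is logged and otherwise ignored, so a failing backup never
/// aborts shutdown.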
fn save_local_txs_backup<P>(pool: P, file_path: &Path)
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
{
    let local_transactions = pool.get_local_transactions();
    if local_transactions.is_empty() {
        trace!(target: "txpool", "no local transactions to save");
        return
    }

    let local_transactions = local_transactions
        .into_iter()
        .map(|tx| {
            let consensus_tx = tx.transaction.clone_into_consensus().into_inner();
            let rlp_data = consensus_tx.encoded_2718();

            TxBackup { rlp: rlp_data.into(), origin: tx.origin }
        })
        .collect::<Vec<_>>();

    let json_data = match serde_json::to_string(&local_transactions) {
        Ok(data) => data,
        Err(err) => {
            warn!(target: "txpool", %err, txs_file = ?file_path, "failed to serialize local transactions to json");
            return
        }
    };

    info!(target: "txpool", txs_file = ?file_path, num_txs = %local_transactions.len(), "Saving current local transactions");
    let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();

    match parent_dir.map(|_| reth_fs_util::write(file_path, json_data)) {
        Ok(_) => {
            info!(target: "txpool", txs_file = ?file_path, "Wrote local transactions to file");
        }
        Err(err) => {
            warn!(target: "txpool", %err, txs_file = ?file_path, "Failed to write local transactions to file");
        }
    }
}

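/// A single backed-up transaction as stored in the local transactions backup file.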
#[derive(Debug, Deserialize, Serialize)]
pub struct TxBackup {
    /// The EIP-2718 encoded raw transaction.
    pub rlp: Bytes,
    /// The origin the transaction was originally added with.
    pub origin: TransactionOrigin,
}

/// Errors possible while loading and applying the transactions backup.
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
    /// RLP decoding of the legacy backup format failed.
    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
    Decode(#[from] alloy_rlp::Error),
    /// JSON decoding of the backup file failed.
    #[error("failed to apply transactions backup. Encountered JSON decode error: {0}")]
    Json(#[from] serde_json::Error),
    /// Reading, writing, or removing the backup file failed.
    #[error("failed to apply transactions backup. Encountered file error: {0}")]
    FsPath(#[from] FsPathError),
    /// Inserting a restored transaction into the pool failed.
    #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
    Pool(#[from] PoolError),
}

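/// A task that on startup reinserts transactions from the configured backup file into the pool,
/// then waits for a graceful shutdown signal and writes the pool's local transactions back to
/// that file.
///
/// A minimal sketch of wiring this up on a task executor with a graceful shutdown signal,
/// mirroring the test below (`executor`, `pool`, and `config` are assumed to exist):
///
/// ```ignore
/// executor.spawn_critical_with_graceful_shutdown_signal("local transactions backup task", |shutdown| {
///     backup_local_transactions_task(shutdown, pool.clone(), config)
/// });
/// ```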
pub async fn backup_local_transactions_task<P>(
    shutdown: reth_tasks::shutdown::GracefulShutdown,
    pool: P,
    config: LocalTransactionBackupConfig,
) where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
{
    let Some(transactions_path) = config.transactions_path else {
        // nothing to do
        return
    };

    if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
        error!(target: "txpool", "{}", err)
    }

    let graceful_guard = shutdown.await;

    // write the pool's local transactions to disk before shutting down
    save_local_txs_backup(pool, &transactions_path);

    drop(graceful_guard)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
        CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
    };
    use alloy_eips::eip2718::Decodable2718;
    use alloy_primitives::{hex, U256};
    use reth_ethereum_primitives::PooledTransactionVariant;
    use reth_evm_ethereum::EthEvmConfig;
    use reth_fs_util as fs;
    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
    use reth_tasks::Runtime;

    #[test]
    fn changed_acc_entry() {
        let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
        let mut copy = changed_acc.0;
        copy.nonce = 10;
        assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
    }

    const EXTENSION: &str = "json";
    const FILENAME: &str = "test_transactions_backup";

    #[tokio::test(flavor = "multi_thread")]
    async fn test_save_local_txs_backup() {
        let temp_dir = tempfile::tempdir().unwrap();
        let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
        let tx_bytes = hex!(
            "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
        );
        let tx = PooledTransactionVariant::decode_2718(&mut &tx_bytes[..]).unwrap();
        let provider = MockEthProvider::default().with_genesis_block();
        let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
        let tx_to_cmp = transaction.clone();
        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
        let blob_store = InMemoryBlobStore::default();
        let validator = EthTransactionValidatorBuilder::new(provider, EthEvmConfig::mainnet())
            .build(blob_store.clone());

        let txpool = Pool::new(
            validator,
            CoinbaseTipOrdering::default(),
            blob_store.clone(),
            Default::default(),
        );

        txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();

        let rt = Runtime::with_existing_handle(tokio::runtime::Handle::current()).unwrap();
        let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
        rt.spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
            backup_local_transactions_task(shutdown, txpool.clone(), config)
        });

        let mut txns = txpool.get_local_transactions();
        let tx_on_finish = txns.pop().expect("there should be 1 transaction");

        assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());

        rt.graceful_shutdown();

        let data = fs::read(transactions_path).unwrap();

        let txs: Vec<TxBackup> = serde_json::from_slice::<Vec<TxBackup>>(&data).unwrap();
        assert_eq!(txs.len(), 1);

        temp_dir.close().unwrap();
    }

    #[test]
    fn test_update_with_higher_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(Some(15)), Some(15));
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_lower_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(15)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    #[test]
    fn test_update_with_equal_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(20)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    #[test]
    fn test_update_with_no_last_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(Some(10)), Some(10));
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_new_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_finalized_blocks() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, None);
    }
}