use crate::{
    blobstore::{BlobSidecarConverter, BlobStoreCanonTracker, BlobStoreUpdates},
    error::PoolError,
    metrics::MaintainPoolMetrics,
    traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
    AllPoolTransactions, BlobTransactionSidecarVariant, BlockInfo, PoolTransaction, PoolUpdateKind,
    TransactionOrigin,
};
use alloy_consensus::{transaction::TxHashRef, BlockHeader, Typed2718};
use alloy_eips::{BlockNumberOrTag, Decodable2718, Encodable2718};
use alloy_primitives::{Address, BlockHash, BlockNumber, Bytes};
use alloy_rlp::Encodable;
use futures_util::{
    future::{BoxFuture, Fuse, FusedFuture},
    FutureExt, Stream, StreamExt,
};
use reth_chain_state::CanonStateNotification;
use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
use reth_execution_types::ChangedAccount;
use reth_fs_util::FsPathError;
use reth_primitives_traits::{
    transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
};
use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
use reth_tasks::TaskSpawner;
use serde::{Deserialize, Serialize};
use std::{
    borrow::Borrow,
    collections::HashSet,
    hash::{Hash, Hasher},
    path::{Path, PathBuf},
    sync::Arc,
};
use tokio::{
    sync::oneshot,
    time::{self, Duration},
};
use tracing::{debug, error, info, trace, warn};

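/// Maximum amount of time a non-executable (queued) transaction is kept in the pool before it is
/// evicted: three hours.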
pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);

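/// Settings for the [`maintain_transaction_pool`] task that keeps the pool in sync with canonical
/// chain updates.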
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MaintainPoolConfig {
    /// Maximum depth of a canonical state update the pool applies directly; deeper commits mark
    /// the pool as drifted instead.
    pub max_update_depth: u64,
    /// Maximum number of dirty (out of sync) accounts to reload from state at once.
    pub max_reload_accounts: usize,
    /// Maximum amount of time a queued transaction is kept in the pool before it is evicted as
    /// stale.
    pub max_tx_lifetime: Duration,
    /// If `true`, locally submitted transactions are also subject to stale eviction.
    pub no_local_exemptions: bool,
}

impl Default for MaintainPoolConfig {
    fn default() -> Self {
        Self {
            max_update_depth: 64,
            max_reload_accounts: 100,
            max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
            no_local_exemptions: false,
        }
    }
}

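/// Settings for the task that backs up local transactions to disk on shutdown
/// ([`backup_local_transactions_task`]).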
#[derive(Debug, Clone, Default)]
pub struct LocalTransactionBackupConfig {
    /// Path to the file where local transactions are saved to and restored from. Backups are
    /// disabled if this is `None`.
    pub transactions_path: Option<PathBuf>,
}

impl LocalTransactionBackupConfig {
    /// Returns a config with local transaction backups enabled at the given path.
    pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
        Self { transactions_path: Some(transactions_path) }
    }
}

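/// Returns a spawnable future that maintains the state of the transaction pool by handling new
/// blocks and reorgs.
///
/// Convenience wrapper around [`maintain_transaction_pool`].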
pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) -> BoxFuture<'static, ()>
where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>, Block = N::Block>
        + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    async move {
        maintain_transaction_pool(client, pool, events, task_spawner, config).await;
    }
    .boxed()
}

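/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for canonical state notifications and updates the pool accordingly: next block
/// base fee and blob fee, changed accounts, mined transactions, re-injected reorged transactions
/// and tracked blob sidecars.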
pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
    client: Client,
    pool: P,
    mut events: St,
    task_spawner: Tasks,
    config: MaintainPoolConfig,
) where
    N: NodePrimitives,
    Client: StateProviderFactory
        + BlockReaderIdExt<Header = N::BlockHeader>
        + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader> + EthereumHardforks>
        + Clone
        + 'static,
    P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>, Block = N::Block>
        + 'static,
    St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
    Tasks: TaskSpawner + Clone + 'static,
{
    let metrics = MaintainPoolMetrics::default();
    let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
    // ensure the pool points to the latest block
    if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
        let latest = SealedHeader::seal_slow(latest);
        let chain_spec = client.chain_spec();
        let info = BlockInfo {
            block_gas_limit: latest.gas_limit(),
            last_seen_block_hash: latest.hash(),
            last_seen_block_number: latest.number(),
            pending_basefee: chain_spec
                .next_block_base_fee(latest.header(), latest.timestamp())
                .unwrap_or_default(),
            pending_blob_fee: latest
                .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
        };
        pool.set_block_info(info);
    }

    // keeps track of mined blob transactions so their sidecars can be removed once finalized
    let mut blob_store_tracker = BlobStoreCanonTracker::default();

    // keeps track of the latest finalized block
    let mut last_finalized_block =
        FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());

    // accounts that are known to be out of sync with the pool
    let mut dirty_addresses = HashSet::default();

    // keeps track of whether the pool is in sync with the chain
    let mut maintained_state = MaintainedPoolState::InSync;

    // the future that reloads dirty accounts from state
    let mut reload_accounts_fut = Fuse::terminated();

    // eviction interval for stale non-executable transactions
    let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);

    // toggle for the first canonical state notification
    let mut first_event = true;

    // the main update loop: awaits new canonical state notifications and keeps the pool updated
    loop {
        trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");

        metrics.set_dirty_accounts_len(dirty_addresses.len());
        let pool_info = pool.block_info();

        // after a drift all unique senders need to be reloaded from state
        if maintained_state.is_drifted() {
            metrics.inc_drift();
            dirty_addresses = pool.unique_senders();
            maintained_state = MaintainedPoolState::InSync;
        }

        // spawn a reload for dirty accounts if none is in flight
        if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
            let (tx, rx) = oneshot::channel();
            let c = client.clone();
            let at = pool_info.last_seen_block_hash;
            let fut = if dirty_addresses.len() > max_reload_accounts {
                // only reload a chunk of accounts at a time
                let accs_to_reload =
                    dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
                for acc in &accs_to_reload {
                    dirty_addresses.remove(acc);
                }
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            } else {
                // all remaining dirty accounts can be reloaded at once
                let accs_to_reload = std::mem::take(&mut dirty_addresses);
                async move {
                    let res = load_accounts(c, at, accs_to_reload.into_iter());
                    let _ = tx.send(res);
                }
                .boxed()
            };
            reload_accounts_fut = rx.fuse();
            task_spawner.spawn_blocking(fut);
        }

        // check for a newly finalized block and clean up finalized blob sidecars
        if let Some(finalized) =
            last_finalized_block.update(client.finalized_block_number().ok().flatten()) &&
            let BlobStoreUpdates::Finalized(blobs) =
                blob_store_tracker.on_finalized_block(finalized)
        {
            metrics.inc_deleted_tracked_blobs(blobs.len());
            pool.delete_blobs(blobs);
            // also run periodic blob store cleanup
            let pool = pool.clone();
            task_spawner.spawn_blocking(Box::pin(async move {
                debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
                pool.cleanup_blobs();
            }));
        }

        // outcomes of the futures we are waiting on
        let mut event = None;
        let mut reloaded = None;

        tokio::select! {
            res = &mut reload_accounts_fut => {
                reloaded = Some(res);
            }
            ev = events.next() => {
                if ev.is_none() {
                    // the canonical state stream ended, we are done
                    break;
                }
                event = ev;
                // mark the pool as drifted on the first notification so all senders are reloaded
                if first_event {
                    maintained_state = MaintainedPoolState::Drifted;
                    first_event = false
                }
            }
            _ = stale_eviction_interval.tick() => {
                let queued = pool.queued_transactions();
                let mut stale_blobs = Vec::new();
                let now = std::time::Instant::now();
                let stale_txs: Vec<_> = queued
                    .into_iter()
                    .filter(|tx| {
                        (tx.origin.is_external() || config.no_local_exemptions) && now - tx.timestamp > config.max_tx_lifetime
                    })
                    .map(|tx| {
                        if tx.is_eip4844() {
                            stale_blobs.push(*tx.hash());
                        }
                        *tx.hash()
                    })
                    .collect();
                debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
                pool.remove_transactions(stale_txs);
                pool.delete_blobs(stale_blobs);
            }
        }

        // handle the result of the account reload, if any
        match reloaded {
            Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
                // keep accounts we failed to load marked as dirty
                dirty_addresses.extend(failed_to_load);
                pool.update_accounts(accounts);
            }
            Some(Ok(Err(res))) => {
                // failed to load accounts from state
                let (accs, err) = *res;
                debug!(target: "txpool", %err, "failed to load accounts");
                dirty_addresses.extend(accs);
            }
            Some(Err(_)) => {
                // the sender was dropped, the reload task must have failed
                maintained_state = MaintainedPoolState::Drifted;
            }
            None => {}
        }

        // handle the new block or reorg
        let Some(event) = event else { continue };
        match event {
            CanonStateNotification::Reorg { old, new } => {
                let (old_blocks, old_state) = old.inner();
                let (new_blocks, new_state) = new.inner();
                let new_tip = new_blocks.tip();
                let new_first = new_blocks.first();
                let old_first = old_blocks.first();

                // check if the pool's block is part of either chain; if not, the pool drifted
                if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
                    new_first.parent_hash() == pool_info.last_seen_block_hash)
                {
                    maintained_state = MaintainedPoolState::Drifted;
                }

                let chain_spec = client.chain_spec();

                // fees for the next block: `new_tip + 1`
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(new_tip.header(), new_tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
                );

                // all accounts changed in the new chain
                let new_changed_accounts: HashSet<_> =
                    new_state.changed_accounts().map(ChangedAccountEntry).collect();

                // accounts changed in the old chain but _not_ in the new chain must be reloaded
                let missing_changed_acc = old_state
                    .accounts_iter()
                    .map(|(a, _)| a)
                    .filter(|addr| !new_changed_accounts.contains(addr));

                let mut changed_accounts =
                    match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
                        Ok(LoadedAccounts { accounts, failed_to_load }) => {
                            // keep accounts we failed to load marked as dirty
                            dirty_addresses.extend(failed_to_load);

                            accounts
                        }
                        Err(err) => {
                            let (addresses, err) = *err;
                            debug!(
                                target: "txpool",
                                %err,
                                "failed to load missing changed accounts at new tip: {:?}",
                                new_tip.hash()
                            );
                            dirty_addresses.extend(addresses);
                            vec![]
                        }
                    };

                // also include all accounts changed in the new chain; they are unique
                changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));

                // all transactions mined in the new chain
                let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();

                // transactions mined in the old chain but not in the new chain need to be
                // re-injected into the pool
                let pruned_old_transactions = old_blocks
                    .transactions_ecrecovered()
                    .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
                    .filter_map(|tx| {
                        if tx.is_eip4844() {
                            // EIP-4844 transactions can only be re-injected together with their
                            // blob sidecar, which is still in the blob store if we received it
                            pool.get_blob(*tx.tx_hash())
                                .ok()
                                .flatten()
                                .map(Arc::unwrap_or_clone)
                                .and_then(|sidecar| {
                                    <P as TransactionPool>::Transaction::try_from_eip4844(
                                        tx, sidecar,
                                    )
                                })
                        } else {
                            <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
                        }
                    })
                    .collect::<Vec<_>>();

                // update the pool first
                let update = CanonicalStateUpdate {
                    new_tip: new_tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    // all transactions mined in the new chain need to be removed from the pool
                    mined_transactions: new_blocks.transaction_hashes().collect(),
                    update_kind: PoolUpdateKind::Reorg,
                };
                pool.on_canonical_state_change(update);

                // then re-inject the reorged-out transactions; their original origin is no longer
                // known, so they are treated as external
                metrics.inc_reinserted_transactions(pruned_old_transactions.len());
                let _ = pool.add_external_transactions(pruned_old_transactions).await;

                // keep track of mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&new_blocks);
            }
            CanonStateNotification::Commit { new } => {
                let (blocks, state) = new.inner();
                let tip = blocks.tip();
                let chain_spec = client.chain_spec();

                // fees for the next block: `tip + 1`
                let pending_block_base_fee = chain_spec
                    .next_block_base_fee(tip.header(), tip.timestamp())
                    .unwrap_or_default();
                let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
                    chain_spec.blob_params_at_timestamp(tip.timestamp()),
                );

                let first_block = blocks.first();
                trace!(
                    target: "txpool",
                    first = first_block.number(),
                    tip = tip.number(),
                    pool_block = pool_info.last_seen_block_number,
                    "update pool on new commit"
                );

                // if the update is too deep (e.g. after initial sync or a long re-sync), skip the
                // full update and only set the new block info; the drift handling reloads senders
                let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
                if depth > max_update_depth {
                    maintained_state = MaintainedPoolState::Drifted;
                    debug!(target: "txpool", ?depth, "skipping deep canonical update");
                    let info = BlockInfo {
                        block_gas_limit: tip.header().gas_limit(),
                        last_seen_block_hash: tip.hash(),
                        last_seen_block_number: tip.number(),
                        pending_basefee: pending_block_base_fee,
                        pending_blob_fee: pending_block_blob_fee,
                    };
                    pool.set_block_info(info);

                    // keep track of mined blob transactions
                    blob_store_tracker.add_new_chain_blocks(&blocks);

                    continue
                }

                let mut changed_accounts = Vec::with_capacity(state.state().len());
                for acc in state.changed_accounts() {
                    // the account is in sync again after this update
                    dirty_addresses.remove(&acc.address);
                    changed_accounts.push(acc);
                }

                let mined_transactions = blocks.transaction_hashes().collect();

                // check if the committed range connects to the pool's last seen block
                if first_block.parent_hash() != pool_info.last_seen_block_hash {
                    maintained_state = MaintainedPoolState::Drifted;
                }

                // canonical update
                let update = CanonicalStateUpdate {
                    new_tip: tip.sealed_block(),
                    pending_block_base_fee,
                    pending_block_blob_fee,
                    changed_accounts,
                    mined_transactions,
                    update_kind: PoolUpdateKind::Commit,
                };
                pool.on_canonical_state_change(update);

                // keep track of mined blob transactions
                blob_store_tracker.add_new_chain_blocks(&blocks);

                // when Osaka activates roughly two slots after this tip (not active at tip or
                // tip + 12s, but active at tip + 24s), start converting the pool's EIP-4844 blob
                // sidecars to the new sidecar format
                if !chain_spec.is_osaka_active_at_timestamp(tip.timestamp()) &&
                    !chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(12)) &&
                    chain_spec.is_osaka_active_at_timestamp(tip.timestamp().saturating_add(24))
                {
                    let pool = pool.clone();
                    let spawner = task_spawner.clone();
                    let client = client.clone();
                    task_spawner.spawn(Box::pin(async move {
                        // give the node some time to process the block first
                        tokio::time::sleep(Duration::from_secs(4)).await;

                        let mut interval = tokio::time::interval(Duration::from_secs(1));
                        loop {
                            // stop once Osaka is active at the latest header
                            let last_iteration =
                                client.latest_header().ok().flatten().is_none_or(|header| {
                                    client
                                        .chain_spec()
                                        .is_osaka_active_at_timestamp(header.timestamp())
                                });

                            let AllPoolTransactions { pending, queued } = pool.all_transactions();
                            for tx in pending
                                .into_iter()
                                .chain(queued)
                                .filter(|tx| tx.transaction.is_eip4844())
                            {
                                let tx_hash = *tx.transaction.hash();

                                let Ok(Some(sidecar)) = pool.get_blob(tx_hash) else {
                                    continue;
                                };
                                // skip sidecars that are already in the new format
                                if !sidecar.is_eip4844() {
                                    continue;
                                }
                                // remove the transaction and its blob before re-inserting the
                                // converted variant
                                let Some(tx) = pool.remove_transactions(vec![tx_hash]).pop() else {
                                    continue;
                                };
                                pool.delete_blob(tx_hash);

                                let BlobTransactionSidecarVariant::Eip4844(sidecar) =
                                    Arc::unwrap_or_clone(sidecar)
                                else {
                                    continue;
                                };

                                let converter = BlobSidecarConverter::new();
                                let pool = pool.clone();
                                spawner.spawn(Box::pin(async move {
                                    let Some(sidecar) = converter.convert(sidecar).await else {
                                        return;
                                    };

                                    // re-insert the transaction with the converted sidecar
                                    let origin = tx.origin;
                                    let Some(tx) = EthPoolTransaction::try_from_eip4844(
                                        tx.transaction.clone_into_consensus(),
                                        sidecar.into(),
                                    ) else {
                                        return;
                                    };
                                    let _ = pool.add_transaction(origin, tx).await;
                                }));
                            }

                            if last_iteration {
                                break;
                            }

                            interval.tick().await;
                        }
                    }));
                }
            }
        }
    }
}

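/// Keeps track of the latest seen finalized block number.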
struct FinalizedBlockTracker {
    last_finalized_block: Option<BlockNumber>,
}

impl FinalizedBlockTracker {
    const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
        Self { last_finalized_block }
    }

    /// Updates the tracked finalized block and returns the new finalized block if it advanced.
    fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
        let finalized = finalized_block?;
        self.last_finalized_block.is_none_or(|last| last < finalized).then(|| {
            self.last_finalized_block = Some(finalized);
            finalized
        })
    }
}

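/// Keeps track of whether the pool is considered in sync with the canonical chain state.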
#[derive(Debug, PartialEq, Eq)]
enum MaintainedPoolState {
    /// Pool is assumed to be in sync with the current state
    InSync,
    /// Pool could be out of sync with the current state
    Drifted,
}

impl MaintainedPoolState {
    /// Returns `true` if the pool is considered out of sync with the current state.
    #[inline]
    const fn is_drifted(&self) -> bool {
        matches!(self, Self::Drifted)
    }
}

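/// A [`ChangedAccount`] keyed only by its address, so it can be stored in a set and looked up by
/// address.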
#[derive(Eq)]
struct ChangedAccountEntry(ChangedAccount);

impl PartialEq for ChangedAccountEntry {
    fn eq(&self, other: &Self) -> bool {
        self.0.address == other.0.address
    }
}

impl Hash for ChangedAccountEntry {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.address.hash(state);
    }
}

impl Borrow<Address> for ChangedAccountEntry {
    fn borrow(&self) -> &Address {
        &self.0.address
    }
}

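/// Result of reloading accounts from state.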
#[derive(Default)]
struct LoadedAccounts {
    /// All accounts that were loaded
    accounts: Vec<ChangedAccount>,
    /// All accounts that failed to load
    failed_to_load: Vec<Address>,
}

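/// Loads all given accounts from the state at the given block hash.
///
/// Returns an error with all given addresses if the state is not available, and records addresses
/// that failed to load in [`LoadedAccounts::failed_to_load`].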
fn load_accounts<Client, I>(
    client: Client,
    at: BlockHash,
    addresses: I,
) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
where
    I: IntoIterator<Item = Address>,
    Client: StateProviderFactory,
{
    let addresses = addresses.into_iter();
    let mut res = LoadedAccounts::default();
    let state = match client.history_by_block_hash(at) {
        Ok(state) => state,
        Err(err) => return Err(Box::new((addresses.collect(), err))),
    };
    for addr in addresses {
        if let Ok(maybe_acc) = state.basic_account(&addr) {
            let acc = maybe_acc
                .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
                .unwrap_or_else(|| ChangedAccount::empty(addr));
            res.accounts.push(acc)
        } else {
            // failed to load the account, keep track of it so it can be retried later
            res.failed_to_load.push(addr);
        }
    }
    Ok(res)
}

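/// Reads a transaction backup from the given file and reinserts the transactions into the pool.
///
/// Supports both the JSON [`TxBackup`] format and the legacy RLP-encoded transaction list; the
/// file is removed after a successful import.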
async fn load_and_reinsert_transactions<P>(
    pool: P,
    file_path: &Path,
) -> Result<(), TransactionsBackupError>
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
{
    if !file_path.exists() {
        return Ok(())
    }

    debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
    let data = reth_fs_util::read(file_path)?;

    if data.is_empty() {
        return Ok(())
    }

    let pool_transactions: Vec<(TransactionOrigin, <P as TransactionPool>::Transaction)> =
        if let Ok(tx_backups) = serde_json::from_slice::<Vec<TxBackup>>(&data) {
            // current JSON backup format: EIP-2718 encoded bytes plus the transaction origin
            tx_backups
                .into_iter()
                .filter_map(|backup| {
                    let tx_signed =
                        <P::Transaction as PoolTransaction>::Consensus::decode_2718_exact(
                            backup.rlp.as_ref(),
                        )
                        .ok()?;
                    let recovered = tx_signed.try_into_recovered().ok()?;
                    let pool_tx =
                        <P::Transaction as PoolTransaction>::try_from_consensus(recovered).ok()?;

                    Some((backup.origin, pool_tx))
                })
                .collect()
        } else {
            // fall back to the legacy format: an RLP list of transactions, all treated as local
            let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
                alloy_rlp::Decodable::decode(&mut data.as_slice())?;

            txs_signed
                .into_iter()
                .filter_map(|tx| tx.try_into_recovered().ok())
                .filter_map(|tx| {
                    <P::Transaction as PoolTransaction>::try_from_consensus(tx)
                        .ok()
                        .map(|pool_tx| (TransactionOrigin::Local, pool_tx))
                })
                .collect()
        };

    let inserted = futures_util::future::join_all(
        pool_transactions.into_iter().map(|(origin, tx)| pool.add_transaction(origin, tx)),
    )
    .await;

    info!(target: "txpool", txs_file =?file_path, num_txs=%inserted.len(), "Successfully reinserted local transactions from file");
    reth_fs_util::remove_file(file_path)?;
    Ok(())
}

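/// Saves all local transactions from the pool to the given file as JSON-encoded [`TxBackup`]
/// entries.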
fn save_local_txs_backup<P>(pool: P, file_path: &Path)
where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
{
    let local_transactions = pool.get_local_transactions();
    if local_transactions.is_empty() {
        trace!(target: "txpool", "no local transactions to save");
        return
    }

    let local_transactions = local_transactions
        .into_iter()
        .map(|tx| {
            let consensus_tx = tx.transaction.clone_into_consensus().into_inner();
            let rlp_data = consensus_tx.encoded_2718();

            TxBackup { rlp: rlp_data.into(), origin: tx.origin }
        })
        .collect::<Vec<_>>();

    let json_data = match serde_json::to_string(&local_transactions) {
        Ok(data) => data,
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "failed to serialize local transactions to json");
            return
        }
    };

    info!(target: "txpool", txs_file =?file_path, num_txs=%local_transactions.len(), "Saving current local transactions");
    // ensure the parent directory exists before writing the backup file
    let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();

    match parent_dir.map(|_| reth_fs_util::write(file_path, json_data)) {
        Ok(_) => {
            info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
        }
        Err(err) => {
            warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
        }
    }
}

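/// A single backed-up transaction: its EIP-2718 encoded bytes and the origin it was received with.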
#[derive(Debug, Deserialize, Serialize)]
pub struct TxBackup {
    /// The EIP-2718 encoded transaction bytes
    pub rlp: Bytes,
    /// The origin the transaction was received with
    pub origin: TransactionOrigin,
}

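/// Errors possible while loading and applying a transaction backup.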
#[derive(thiserror::Error, Debug)]
pub enum TransactionsBackupError {
    #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
    Decode(#[from] alloy_rlp::Error),
    #[error("failed to apply transactions backup. Encountered JSON decode error: {0}")]
    Json(#[from] serde_json::Error),
    #[error("failed to apply transactions backup. Encountered file error: {0}")]
    FsPath(#[from] FsPathError),
    #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
    Pool(#[from] PoolError),
}

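/// A task that reinserts any backed-up local transactions into the pool on startup and, on
/// graceful shutdown, saves the pool's current local transactions to disk.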
pub async fn backup_local_transactions_task<P>(
    shutdown: reth_tasks::shutdown::GracefulShutdown,
    pool: P,
    config: LocalTransactionBackupConfig,
) where
    P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
{
    let Some(transactions_path) = config.transactions_path else {
        // nothing to do
        return
    };

    if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
        error!(target: "txpool", "{}", err)
    }

    let graceful_guard = shutdown.await;

    // write the pool's local transactions to disk before shutting down
    save_local_txs_backup(pool, &transactions_path);

    drop(graceful_guard)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
        CoinbaseTipOrdering, EthPooledTransaction, EthTransactionValidator, Pool,
        TransactionOrigin,
    };
    use alloy_eips::eip2718::Decodable2718;
    use alloy_primitives::{hex, U256};
    use reth_ethereum_primitives::PooledTransactionVariant;
    use reth_fs_util as fs;
    use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
    use reth_tasks::TaskManager;

    #[test]
    fn changed_acc_entry() {
        let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
        let mut copy = changed_acc.0;
        copy.nonce = 10;
        assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
    }

    const EXTENSION: &str = "json";
    const FILENAME: &str = "test_transactions_backup";

    #[tokio::test(flavor = "multi_thread")]
    async fn test_save_local_txs_backup() {
        let temp_dir = tempfile::tempdir().unwrap();
        let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
        let tx_bytes = hex!(
            "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
        );
        let tx = PooledTransactionVariant::decode_2718(&mut &tx_bytes[..]).unwrap();
        let provider = MockEthProvider::default();
        let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
        let tx_to_cmp = transaction.clone();
        let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
        provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
        let blob_store = InMemoryBlobStore::default();
        let validator: EthTransactionValidator<_, _, reth_ethereum_primitives::Block> =
            EthTransactionValidatorBuilder::new(provider).build(blob_store.clone());

        let txpool = Pool::new(
            validator,
            CoinbaseTipOrdering::default(),
            blob_store.clone(),
            Default::default(),
        );

        txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();

        let handle = tokio::runtime::Handle::current();
        let manager = TaskManager::new(handle);
        let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
        manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
            backup_local_transactions_task(shutdown, txpool.clone(), config)
        });

        let mut txns = txpool.get_local_transactions();
        let tx_on_finish = txns.pop().expect("there should be 1 transaction");

        assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());

        manager.graceful_shutdown();

        let data = fs::read(transactions_path).unwrap();

        let txs: Vec<TxBackup> = serde_json::from_slice::<Vec<TxBackup>>(&data).unwrap();
        assert_eq!(txs.len(), 1);

        temp_dir.close().unwrap();
    }

    #[test]
    fn test_update_with_higher_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(Some(15)), Some(15));
        assert_eq!(tracker.last_finalized_block, Some(15));
    }

    #[test]
    fn test_update_with_lower_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(15)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    #[test]
    fn test_update_with_equal_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(20));
        assert_eq!(tracker.update(Some(20)), None);
        assert_eq!(tracker.last_finalized_block, Some(20));
    }

    #[test]
    fn test_update_with_no_last_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(Some(10)), Some(10));
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_new_finalized_block() {
        let mut tracker = FinalizedBlockTracker::new(Some(10));
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, Some(10));
    }

    #[test]
    fn test_update_with_no_finalized_blocks() {
        let mut tracker = FinalizedBlockTracker::new(None);
        assert_eq!(tracker.update(None), None);
        assert_eq!(tracker.last_finalized_block, None);
    }
}