1use crate::{
4 blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
5 error::PoolError,
6 metrics::MaintainPoolMetrics,
7 traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
8 BlockInfo, PoolTransaction, PoolUpdateKind, TransactionOrigin,
9};
10use alloy_consensus::{transaction::TxHashRef, BlockHeader, Typed2718};
11use alloy_eips::{BlockNumberOrTag, Decodable2718, Encodable2718};
12use alloy_primitives::{Address, BlockHash, BlockNumber};
13use alloy_rlp::{Bytes, Encodable};
14use futures_util::{
15 future::{BoxFuture, Fuse, FusedFuture},
16 FutureExt, Stream, StreamExt,
17};
18use reth_chain_state::CanonStateNotification;
19use reth_chainspec::{ChainSpecProvider, EthChainSpec};
20use reth_execution_types::ChangedAccount;
21use reth_fs_util::FsPathError;
22use reth_primitives_traits::{
23 transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
24};
25use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
26use reth_tasks::TaskSpawner;
27use serde::{Deserialize, Serialize};
28use std::{
29 borrow::Borrow,
30 collections::HashSet,
31 hash::{Hash, Hasher},
32 path::{Path, PathBuf},
33 sync::Arc,
34};
35use tokio::{
36 sync::oneshot,
37 time::{self, Duration},
38};
39use tracing::{debug, error, info, trace, warn};
40
41pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub struct MaintainPoolConfig {
47 pub max_update_depth: u64,
52 pub max_reload_accounts: usize,
56
57 pub max_tx_lifetime: Duration,
60
61 pub no_local_exemptions: bool,
67}
68
69impl Default for MaintainPoolConfig {
70 fn default() -> Self {
71 Self {
72 max_update_depth: 64,
73 max_reload_accounts: 100,
74 max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
75 no_local_exemptions: false,
76 }
77 }
78}
79
80#[derive(Debug, Clone, Default)]
82pub struct LocalTransactionBackupConfig {
83 pub transactions_path: Option<PathBuf>,
85}
86
87impl LocalTransactionBackupConfig {
88 pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
90 Self { transactions_path: Some(transactions_path) }
91 }
92}
93
94pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
96 client: Client,
97 pool: P,
98 events: St,
99 task_spawner: Tasks,
100 config: MaintainPoolConfig,
101) -> BoxFuture<'static, ()>
102where
103 N: NodePrimitives,
104 Client: StateProviderFactory
105 + BlockReaderIdExt<Header = N::BlockHeader>
106 + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader>>
107 + Clone
108 + 'static,
109 P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
110 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
111 Tasks: TaskSpawner + 'static,
112{
113 async move {
114 maintain_transaction_pool(client, pool, events, task_spawner, config).await;
115 }
116 .boxed()
117}
118
119pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
123 client: Client,
124 pool: P,
125 mut events: St,
126 task_spawner: Tasks,
127 config: MaintainPoolConfig,
128) where
129 N: NodePrimitives,
130 Client: StateProviderFactory
131 + BlockReaderIdExt<Header = N::BlockHeader>
132 + ChainSpecProvider<ChainSpec: EthChainSpec<Header = N::BlockHeader>>
133 + Clone
134 + 'static,
135 P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
136 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
137 Tasks: TaskSpawner + 'static,
138{
139 let metrics = MaintainPoolMetrics::default();
140 let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
141 if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
143 let latest = SealedHeader::seal_slow(latest);
144 let chain_spec = client.chain_spec();
145 let info = BlockInfo {
146 block_gas_limit: latest.gas_limit(),
147 last_seen_block_hash: latest.hash(),
148 last_seen_block_number: latest.number(),
149 pending_basefee: chain_spec
150 .next_block_base_fee(latest.header(), latest.timestamp())
151 .unwrap_or_default(),
152 pending_blob_fee: latest
153 .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
154 };
155 pool.set_block_info(info);
156 }
157
158 let mut blob_store_tracker = BlobStoreCanonTracker::default();
160
161 let mut last_finalized_block =
163 FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());
164
165 let mut dirty_addresses = HashSet::default();
167
168 let mut maintained_state = MaintainedPoolState::InSync;
170
171 let mut reload_accounts_fut = Fuse::terminated();
173
174 let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);
176
177 let mut first_event = true;
179
180 loop {
183 trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");
184
185 metrics.set_dirty_accounts_len(dirty_addresses.len());
186 let pool_info = pool.block_info();
187
188 if maintained_state.is_drifted() {
192 metrics.inc_drift();
193 dirty_addresses = pool.unique_senders();
195 maintained_state = MaintainedPoolState::InSync;
197 }
198
199 if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
201 let (tx, rx) = oneshot::channel();
202 let c = client.clone();
203 let at = pool_info.last_seen_block_hash;
204 let fut = if dirty_addresses.len() > max_reload_accounts {
205 let accs_to_reload =
207 dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
208 for acc in &accs_to_reload {
209 dirty_addresses.remove(acc);
211 }
212 async move {
213 let res = load_accounts(c, at, accs_to_reload.into_iter());
214 let _ = tx.send(res);
215 }
216 .boxed()
217 } else {
218 let accs_to_reload = std::mem::take(&mut dirty_addresses);
220 async move {
221 let res = load_accounts(c, at, accs_to_reload.into_iter());
222 let _ = tx.send(res);
223 }
224 .boxed()
225 };
226 reload_accounts_fut = rx.fuse();
227 task_spawner.spawn_blocking(fut);
228 }
229
230 if let Some(finalized) =
232 last_finalized_block.update(client.finalized_block_number().ok().flatten()) &&
233 let BlobStoreUpdates::Finalized(blobs) =
234 blob_store_tracker.on_finalized_block(finalized)
235 {
236 metrics.inc_deleted_tracked_blobs(blobs.len());
237 pool.delete_blobs(blobs);
239 let pool = pool.clone();
241 task_spawner.spawn_blocking(Box::pin(async move {
242 debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
243 pool.cleanup_blobs();
244 }));
245 }
246
247 let mut event = None;
249 let mut reloaded = None;
250
251 tokio::select! {
254 res = &mut reload_accounts_fut => {
255 reloaded = Some(res);
256 }
257 ev = events.next() => {
258 if ev.is_none() {
259 break;
261 }
262 event = ev;
263 if first_event {
266 maintained_state = MaintainedPoolState::Drifted;
267 first_event = false
268 }
269 }
270 _ = stale_eviction_interval.tick() => {
271 let stale_txs: Vec<_> = pool
272 .queued_transactions()
273 .into_iter()
274 .filter(|tx| {
275 (tx.origin.is_external() || config.no_local_exemptions) && tx.timestamp.elapsed() > config.max_tx_lifetime
277 })
278 .map(|tx| *tx.hash())
279 .collect();
280 debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
281 pool.remove_transactions(stale_txs);
282 }
283 }
284 match reloaded {
286 Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
287 dirty_addresses.extend(failed_to_load);
290 pool.update_accounts(accounts);
292 }
293 Some(Ok(Err(res))) => {
294 let (accs, err) = *res;
296 debug!(target: "txpool", %err, "failed to load accounts");
297 dirty_addresses.extend(accs);
298 }
299 Some(Err(_)) => {
300 maintained_state = MaintainedPoolState::Drifted;
302 }
303 None => {}
304 }
305
306 let Some(event) = event else { continue };
308 match event {
309 CanonStateNotification::Reorg { old, new } => {
310 let (old_blocks, old_state) = old.inner();
311 let (new_blocks, new_state) = new.inner();
312 let new_tip = new_blocks.tip();
313 let new_first = new_blocks.first();
314 let old_first = old_blocks.first();
315
316 if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
318 new_first.parent_hash() == pool_info.last_seen_block_hash)
319 {
320 maintained_state = MaintainedPoolState::Drifted;
322 }
323
324 let chain_spec = client.chain_spec();
325
326 let pending_block_base_fee = chain_spec
328 .next_block_base_fee(new_tip.header(), new_tip.timestamp())
329 .unwrap_or_default();
330 let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
331 chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
332 );
333
334 let new_changed_accounts: HashSet<_> =
336 new_state.changed_accounts().map(ChangedAccountEntry).collect();
337
338 let missing_changed_acc = old_state
340 .accounts_iter()
341 .map(|(a, _)| a)
342 .filter(|addr| !new_changed_accounts.contains(addr));
343
344 let mut changed_accounts =
346 match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
347 Ok(LoadedAccounts { accounts, failed_to_load }) => {
348 dirty_addresses.extend(failed_to_load);
350
351 accounts
352 }
353 Err(err) => {
354 let (addresses, err) = *err;
355 debug!(
356 target: "txpool",
357 %err,
358 "failed to load missing changed accounts at new tip: {:?}",
359 new_tip.hash()
360 );
361 dirty_addresses.extend(addresses);
362 vec![]
363 }
364 };
365
366 changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));
369
370 let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();
372
373 let pruned_old_transactions = old_blocks
376 .transactions_ecrecovered()
377 .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
378 .filter_map(|tx| {
379 if tx.is_eip4844() {
380 pool.get_blob(*tx.tx_hash())
386 .ok()
387 .flatten()
388 .map(Arc::unwrap_or_clone)
389 .and_then(|sidecar| {
390 <P as TransactionPool>::Transaction::try_from_eip4844(
391 tx, sidecar,
392 )
393 })
394 } else {
395 <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
396 }
397 })
398 .collect::<Vec<_>>();
399
400 let update = CanonicalStateUpdate {
402 new_tip: new_tip.sealed_block(),
403 pending_block_base_fee,
404 pending_block_blob_fee,
405 changed_accounts,
406 mined_transactions: new_blocks.transaction_hashes().collect(),
408 update_kind: PoolUpdateKind::Reorg,
409 };
410 pool.on_canonical_state_change(update);
411
412 metrics.inc_reinserted_transactions(pruned_old_transactions.len());
419 let _ = pool.add_external_transactions(pruned_old_transactions).await;
420
421 blob_store_tracker.add_new_chain_blocks(&new_blocks);
423 }
424 CanonStateNotification::Commit { new } => {
425 let (blocks, state) = new.inner();
426 let tip = blocks.tip();
427 let chain_spec = client.chain_spec();
428
429 let pending_block_base_fee = chain_spec
431 .next_block_base_fee(tip.header(), tip.timestamp())
432 .unwrap_or_default();
433 let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
434 chain_spec.blob_params_at_timestamp(tip.timestamp()),
435 );
436
437 let first_block = blocks.first();
438 trace!(
439 target: "txpool",
440 first = first_block.number(),
441 tip = tip.number(),
442 pool_block = pool_info.last_seen_block_number,
443 "update pool on new commit"
444 );
445
446 let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
449 if depth > max_update_depth {
450 maintained_state = MaintainedPoolState::Drifted;
451 debug!(target: "txpool", ?depth, "skipping deep canonical update");
452 let info = BlockInfo {
453 block_gas_limit: tip.header().gas_limit(),
454 last_seen_block_hash: tip.hash(),
455 last_seen_block_number: tip.number(),
456 pending_basefee: pending_block_base_fee,
457 pending_blob_fee: pending_block_blob_fee,
458 };
459 pool.set_block_info(info);
460
461 blob_store_tracker.add_new_chain_blocks(&blocks);
463
464 continue
465 }
466
467 let mut changed_accounts = Vec::with_capacity(state.state().len());
468 for acc in state.changed_accounts() {
469 dirty_addresses.remove(&acc.address);
471 changed_accounts.push(acc);
472 }
473
474 let mined_transactions = blocks.transaction_hashes().collect();
475
476 if first_block.parent_hash() != pool_info.last_seen_block_hash {
478 maintained_state = MaintainedPoolState::Drifted;
482 }
483
484 let update = CanonicalStateUpdate {
486 new_tip: tip.sealed_block(),
487 pending_block_base_fee,
488 pending_block_blob_fee,
489 changed_accounts,
490 mined_transactions,
491 update_kind: PoolUpdateKind::Commit,
492 };
493 pool.on_canonical_state_change(update);
494
495 blob_store_tracker.add_new_chain_blocks(&blocks);
497 }
498 }
499 }
500}
501
502struct FinalizedBlockTracker {
503 last_finalized_block: Option<BlockNumber>,
504}
505
506impl FinalizedBlockTracker {
507 const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
508 Self { last_finalized_block }
509 }
510
511 fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
513 let finalized = finalized_block?;
514 self.last_finalized_block
515 .replace(finalized)
516 .is_none_or(|last| last < finalized)
517 .then_some(finalized)
518 }
519}
520
521#[derive(Debug, PartialEq, Eq)]
524enum MaintainedPoolState {
525 InSync,
527 Drifted,
529}
530
531impl MaintainedPoolState {
532 #[inline]
534 const fn is_drifted(&self) -> bool {
535 matches!(self, Self::Drifted)
536 }
537}
538
539#[derive(Eq)]
541struct ChangedAccountEntry(ChangedAccount);
542
543impl PartialEq for ChangedAccountEntry {
544 fn eq(&self, other: &Self) -> bool {
545 self.0.address == other.0.address
546 }
547}
548
549impl Hash for ChangedAccountEntry {
550 fn hash<H: Hasher>(&self, state: &mut H) {
551 self.0.address.hash(state);
552 }
553}
554
555impl Borrow<Address> for ChangedAccountEntry {
556 fn borrow(&self) -> &Address {
557 &self.0.address
558 }
559}
560
561#[derive(Default)]
562struct LoadedAccounts {
563 accounts: Vec<ChangedAccount>,
565 failed_to_load: Vec<Address>,
567}
568
569fn load_accounts<Client, I>(
575 client: Client,
576 at: BlockHash,
577 addresses: I,
578) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
579where
580 I: IntoIterator<Item = Address>,
581 Client: StateProviderFactory,
582{
583 let addresses = addresses.into_iter();
584 let mut res = LoadedAccounts::default();
585 let state = match client.history_by_block_hash(at) {
586 Ok(state) => state,
587 Err(err) => return Err(Box::new((addresses.collect(), err))),
588 };
589 for addr in addresses {
590 if let Ok(maybe_acc) = state.basic_account(&addr) {
591 let acc = maybe_acc
592 .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
593 .unwrap_or_else(|| ChangedAccount::empty(addr));
594 res.accounts.push(acc)
595 } else {
596 res.failed_to_load.push(addr);
598 }
599 }
600 Ok(res)
601}
602
603async fn load_and_reinsert_transactions<P>(
607 pool: P,
608 file_path: &Path,
609) -> Result<(), TransactionsBackupError>
610where
611 P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
612{
613 if !file_path.exists() {
614 return Ok(())
615 }
616
617 debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
618 let data = reth_fs_util::read(file_path)?;
619
620 if data.is_empty() {
621 return Ok(())
622 }
623
624 let pool_transactions: Vec<(TransactionOrigin, <P as TransactionPool>::Transaction)> =
625 if let Ok(tx_backups) = serde_json::from_slice::<Vec<TxBackup>>(&data) {
626 tx_backups
627 .into_iter()
628 .filter_map(|backup| {
629 let tx_signed =
630 <P::Transaction as PoolTransaction>::Consensus::decode_2718_exact(
631 backup.rlp.as_ref(),
632 )
633 .ok()?;
634 let recovered = tx_signed.try_into_recovered().ok()?;
635 let pool_tx =
636 <P::Transaction as PoolTransaction>::try_from_consensus(recovered).ok()?;
637
638 Some((backup.origin, pool_tx))
639 })
640 .collect()
641 } else {
642 let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
643 alloy_rlp::Decodable::decode(&mut data.as_slice())?;
644
645 txs_signed
646 .into_iter()
647 .filter_map(|tx| tx.try_into_recovered().ok())
648 .filter_map(|tx| {
649 <P::Transaction as PoolTransaction>::try_from_consensus(tx)
650 .ok()
651 .map(|pool_tx| (TransactionOrigin::Local, pool_tx))
652 })
653 .collect()
654 };
655
656 let inserted = futures_util::future::join_all(
657 pool_transactions.into_iter().map(|(origin, tx)| pool.add_transaction(origin, tx)),
658 )
659 .await;
660
661 info!(target: "txpool", txs_file =?file_path, num_txs=%inserted.len(), "Successfully reinserted local transactions from file");
662 reth_fs_util::remove_file(file_path)?;
663 Ok(())
664}
665
666fn save_local_txs_backup<P>(pool: P, file_path: &Path)
667where
668 P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
669{
670 let local_transactions = pool.get_local_transactions();
671 if local_transactions.is_empty() {
672 trace!(target: "txpool", "no local transactions to save");
673 return
674 }
675
676 let local_transactions = local_transactions
677 .into_iter()
678 .map(|tx| {
679 let consensus_tx = tx.transaction.clone_into_consensus().into_inner();
680 let rlp_data = consensus_tx.encoded_2718();
681
682 TxBackup { rlp: rlp_data.into(), origin: tx.origin }
683 })
684 .collect::<Vec<_>>();
685
686 let json_data = match serde_json::to_string(&local_transactions) {
687 Ok(data) => data,
688 Err(err) => {
689 warn!(target: "txpool", %err, txs_file=?file_path, "failed to serialize local transactions to json");
690 return
691 }
692 };
693
694 info!(target: "txpool", txs_file =?file_path, num_txs=%local_transactions.len(), "Saving current local transactions");
695 let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();
696
697 match parent_dir.map(|_| reth_fs_util::write(file_path, json_data)) {
698 Ok(_) => {
699 info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
700 }
701 Err(err) => {
702 warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
703 }
704 }
705}
706
707#[derive(Debug, Deserialize, Serialize)]
710pub struct TxBackup {
711 pub rlp: Bytes,
713 pub origin: TransactionOrigin,
715}
716
717#[derive(thiserror::Error, Debug)]
719pub enum TransactionsBackupError {
720 #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
722 Decode(#[from] alloy_rlp::Error),
723 #[error("failed to apply transactions backup. Encountered JSON decode error: {0}")]
725 Json(#[from] serde_json::Error),
726 #[error("failed to apply transactions backup. Encountered file error: {0}")]
728 FsPath(#[from] FsPathError),
729 #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
731 Pool(#[from] PoolError),
732}
733
734pub async fn backup_local_transactions_task<P>(
737 shutdown: reth_tasks::shutdown::GracefulShutdown,
738 pool: P,
739 config: LocalTransactionBackupConfig,
740) where
741 P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
742{
743 let Some(transactions_path) = config.transactions_path else {
744 return
746 };
747
748 if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
749 error!(target: "txpool", "{}", err)
750 }
751
752 let graceful_guard = shutdown.await;
753
754 save_local_txs_backup(pool, &transactions_path);
756
757 drop(graceful_guard)
758}
759
760#[cfg(test)]
761mod tests {
762 use super::*;
763 use crate::{
764 blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
765 CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
766 };
767 use alloy_eips::eip2718::Decodable2718;
768 use alloy_primitives::{hex, U256};
769 use reth_ethereum_primitives::PooledTransactionVariant;
770 use reth_fs_util as fs;
771 use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
772 use reth_tasks::TaskManager;
773
774 #[test]
775 fn changed_acc_entry() {
776 let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
777 let mut copy = changed_acc.0;
778 copy.nonce = 10;
779 assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
780 }
781
782 const EXTENSION: &str = "json";
783 const FILENAME: &str = "test_transactions_backup";
784
785 #[tokio::test(flavor = "multi_thread")]
786 async fn test_save_local_txs_backup() {
787 let temp_dir = tempfile::tempdir().unwrap();
788 let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
789 let tx_bytes = hex!(
790 "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
791 );
792 let tx = PooledTransactionVariant::decode_2718(&mut &tx_bytes[..]).unwrap();
793 let provider = MockEthProvider::default();
794 let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
795 let tx_to_cmp = transaction.clone();
796 let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
797 provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
798 let blob_store = InMemoryBlobStore::default();
799 let validator = EthTransactionValidatorBuilder::new(provider).build(blob_store.clone());
800
801 let txpool = Pool::new(
802 validator,
803 CoinbaseTipOrdering::default(),
804 blob_store.clone(),
805 Default::default(),
806 );
807
808 txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();
809
810 let handle = tokio::runtime::Handle::current();
811 let manager = TaskManager::new(handle);
812 let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
813 manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
814 backup_local_transactions_task(shutdown, txpool.clone(), config)
815 });
816
817 let mut txns = txpool.get_local_transactions();
818 let tx_on_finish = txns.pop().expect("there should be 1 transaction");
819
820 assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());
821
822 manager.graceful_shutdown();
824
825 let data = fs::read(transactions_path).unwrap();
826
827 let txs: Vec<TxBackup> = serde_json::from_slice::<Vec<TxBackup>>(&data).unwrap();
828 assert_eq!(txs.len(), 1);
829
830 temp_dir.close().unwrap();
831 }
832
833 #[test]
834 fn test_update_with_higher_finalized_block() {
835 let mut tracker = FinalizedBlockTracker::new(Some(10));
836 assert_eq!(tracker.update(Some(15)), Some(15));
837 assert_eq!(tracker.last_finalized_block, Some(15));
838 }
839
840 #[test]
841 fn test_update_with_lower_finalized_block() {
842 let mut tracker = FinalizedBlockTracker::new(Some(20));
843 assert_eq!(tracker.update(Some(15)), None);
844 assert_eq!(tracker.last_finalized_block, Some(15));
845 }
846
847 #[test]
848 fn test_update_with_equal_finalized_block() {
849 let mut tracker = FinalizedBlockTracker::new(Some(20));
850 assert_eq!(tracker.update(Some(20)), None);
851 assert_eq!(tracker.last_finalized_block, Some(20));
852 }
853
854 #[test]
855 fn test_update_with_no_last_finalized_block() {
856 let mut tracker = FinalizedBlockTracker::new(None);
857 assert_eq!(tracker.update(Some(10)), Some(10));
858 assert_eq!(tracker.last_finalized_block, Some(10));
859 }
860
861 #[test]
862 fn test_update_with_no_new_finalized_block() {
863 let mut tracker = FinalizedBlockTracker::new(Some(10));
864 assert_eq!(tracker.update(None), None);
865 assert_eq!(tracker.last_finalized_block, Some(10));
866 }
867
868 #[test]
869 fn test_update_with_no_finalized_blocks() {
870 let mut tracker = FinalizedBlockTracker::new(None);
871 assert_eq!(tracker.update(None), None);
872 assert_eq!(tracker.last_finalized_block, None);
873 }
874}