1use crate::{
4 blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
5 error::PoolError,
6 metrics::MaintainPoolMetrics,
7 traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
8 BlockInfo, PoolTransaction, PoolUpdateKind,
9};
10use alloy_consensus::{BlockHeader, Typed2718};
11use alloy_eips::BlockNumberOrTag;
12use alloy_primitives::{Address, BlockHash, BlockNumber};
13use alloy_rlp::Encodable;
14use futures_util::{
15 future::{BoxFuture, Fuse, FusedFuture},
16 FutureExt, Stream, StreamExt,
17};
18use reth_chain_state::CanonStateNotification;
19use reth_chainspec::{ChainSpecProvider, EthChainSpec};
20use reth_execution_types::ChangedAccount;
21use reth_fs_util::FsPathError;
22use reth_primitives_traits::{
23 transaction::signed::SignedTransaction, NodePrimitives, SealedHeader,
24};
25use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
26use reth_tasks::TaskSpawner;
27use std::{
28 borrow::Borrow,
29 collections::HashSet,
30 hash::{Hash, Hasher},
31 path::{Path, PathBuf},
32 sync::Arc,
33};
34use tokio::{
35 sync::oneshot,
36 time::{self, Duration},
37};
38use tracing::{debug, error, info, trace, warn};
39
40pub const MAX_QUEUED_TRANSACTION_LIFETIME: Duration = Duration::from_secs(3 * 60 * 60);
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub struct MaintainPoolConfig {
46 pub max_update_depth: u64,
51 pub max_reload_accounts: usize,
55
56 pub max_tx_lifetime: Duration,
59
60 pub no_local_exemptions: bool,
66}
67
68impl Default for MaintainPoolConfig {
69 fn default() -> Self {
70 Self {
71 max_update_depth: 64,
72 max_reload_accounts: 100,
73 max_tx_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
74 no_local_exemptions: false,
75 }
76 }
77}
78
79#[derive(Debug, Clone, Default)]
81pub struct LocalTransactionBackupConfig {
82 pub transactions_path: Option<PathBuf>,
84}
85
86impl LocalTransactionBackupConfig {
87 pub const fn with_local_txs_backup(transactions_path: PathBuf) -> Self {
89 Self { transactions_path: Some(transactions_path) }
90 }
91}
92
93pub fn maintain_transaction_pool_future<N, Client, P, St, Tasks>(
95 client: Client,
96 pool: P,
97 events: St,
98 task_spawner: Tasks,
99 config: MaintainPoolConfig,
100) -> BoxFuture<'static, ()>
101where
102 N: NodePrimitives,
103 Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
104 P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
105 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
106 Tasks: TaskSpawner + 'static,
107{
108 async move {
109 maintain_transaction_pool(client, pool, events, task_spawner, config).await;
110 }
111 .boxed()
112}
113
114pub async fn maintain_transaction_pool<N, Client, P, St, Tasks>(
118 client: Client,
119 pool: P,
120 mut events: St,
121 task_spawner: Tasks,
122 config: MaintainPoolConfig,
123) where
124 N: NodePrimitives,
125 Client: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
126 P: TransactionPoolExt<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
127 St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
128 Tasks: TaskSpawner + 'static,
129{
130 let metrics = MaintainPoolMetrics::default();
131 let MaintainPoolConfig { max_update_depth, max_reload_accounts, .. } = config;
132 if let Ok(Some(latest)) = client.header_by_number_or_tag(BlockNumberOrTag::Latest) {
134 let latest = SealedHeader::seal_slow(latest);
135 let chain_spec = client.chain_spec();
136 let info = BlockInfo {
137 block_gas_limit: latest.gas_limit(),
138 last_seen_block_hash: latest.hash(),
139 last_seen_block_number: latest.number(),
140 pending_basefee: latest
141 .next_block_base_fee(chain_spec.base_fee_params_at_timestamp(latest.timestamp()))
142 .unwrap_or_default(),
143 pending_blob_fee: latest
144 .maybe_next_block_blob_fee(chain_spec.blob_params_at_timestamp(latest.timestamp())),
145 };
146 pool.set_block_info(info);
147 }
148
149 let mut blob_store_tracker = BlobStoreCanonTracker::default();
151
152 let mut last_finalized_block =
154 FinalizedBlockTracker::new(client.finalized_block_number().ok().flatten());
155
156 let mut dirty_addresses = HashSet::default();
158
159 let mut maintained_state = MaintainedPoolState::InSync;
161
162 let mut reload_accounts_fut = Fuse::terminated();
164
165 let mut stale_eviction_interval = time::interval(config.max_tx_lifetime);
167
168 let mut first_event = true;
170
171 loop {
174 trace!(target: "txpool", state=?maintained_state, "awaiting new block or reorg");
175
176 metrics.set_dirty_accounts_len(dirty_addresses.len());
177 let pool_info = pool.block_info();
178
179 if maintained_state.is_drifted() {
183 metrics.inc_drift();
184 dirty_addresses = pool.unique_senders();
186 maintained_state = MaintainedPoolState::InSync;
188 }
189
190 if !dirty_addresses.is_empty() && reload_accounts_fut.is_terminated() {
192 let (tx, rx) = oneshot::channel();
193 let c = client.clone();
194 let at = pool_info.last_seen_block_hash;
195 let fut = if dirty_addresses.len() > max_reload_accounts {
196 let accs_to_reload =
198 dirty_addresses.iter().copied().take(max_reload_accounts).collect::<Vec<_>>();
199 for acc in &accs_to_reload {
200 dirty_addresses.remove(acc);
202 }
203 async move {
204 let res = load_accounts(c, at, accs_to_reload.into_iter());
205 let _ = tx.send(res);
206 }
207 .boxed()
208 } else {
209 let accs_to_reload = std::mem::take(&mut dirty_addresses);
211 async move {
212 let res = load_accounts(c, at, accs_to_reload.into_iter());
213 let _ = tx.send(res);
214 }
215 .boxed()
216 };
217 reload_accounts_fut = rx.fuse();
218 task_spawner.spawn_blocking(fut);
219 }
220
221 if let Some(finalized) =
223 last_finalized_block.update(client.finalized_block_number().ok().flatten())
224 {
225 if let BlobStoreUpdates::Finalized(blobs) =
226 blob_store_tracker.on_finalized_block(finalized)
227 {
228 metrics.inc_deleted_tracked_blobs(blobs.len());
229 pool.delete_blobs(blobs);
231 let pool = pool.clone();
233 task_spawner.spawn_blocking(Box::pin(async move {
234 debug!(target: "txpool", finalized_block = %finalized, "cleaning up blob store");
235 pool.cleanup_blobs();
236 }));
237 }
238 }
239
240 let mut event = None;
242 let mut reloaded = None;
243
244 tokio::select! {
247 res = &mut reload_accounts_fut => {
248 reloaded = Some(res);
249 }
250 ev = events.next() => {
251 if ev.is_none() {
252 break;
254 }
255 event = ev;
256 if first_event {
259 maintained_state = MaintainedPoolState::Drifted;
260 first_event = false
261 }
262 }
263 _ = stale_eviction_interval.tick() => {
264 let stale_txs: Vec<_> = pool
265 .queued_transactions()
266 .into_iter()
267 .filter(|tx| {
268 (tx.origin.is_external() || config.no_local_exemptions) && tx.timestamp.elapsed() > config.max_tx_lifetime
270 })
271 .map(|tx| *tx.hash())
272 .collect();
273 debug!(target: "txpool", count=%stale_txs.len(), "removing stale transactions");
274 pool.remove_transactions(stale_txs);
275 }
276 }
277 match reloaded {
279 Some(Ok(Ok(LoadedAccounts { accounts, failed_to_load }))) => {
280 dirty_addresses.extend(failed_to_load);
283 pool.update_accounts(accounts);
285 }
286 Some(Ok(Err(res))) => {
287 let (accs, err) = *res;
289 debug!(target: "txpool", %err, "failed to load accounts");
290 dirty_addresses.extend(accs);
291 }
292 Some(Err(_)) => {
293 maintained_state = MaintainedPoolState::Drifted;
295 }
296 None => {}
297 }
298
299 let Some(event) = event else { continue };
301 match event {
302 CanonStateNotification::Reorg { old, new } => {
303 let (old_blocks, old_state) = old.inner();
304 let (new_blocks, new_state) = new.inner();
305 let new_tip = new_blocks.tip();
306 let new_first = new_blocks.first();
307 let old_first = old_blocks.first();
308
309 if !(old_first.parent_hash() == pool_info.last_seen_block_hash ||
311 new_first.parent_hash() == pool_info.last_seen_block_hash)
312 {
313 maintained_state = MaintainedPoolState::Drifted;
315 }
316
317 let chain_spec = client.chain_spec();
318
319 let pending_block_base_fee = new_tip
321 .header()
322 .next_block_base_fee(
323 chain_spec.base_fee_params_at_timestamp(new_tip.timestamp()),
324 )
325 .unwrap_or_default();
326 let pending_block_blob_fee = new_tip.header().maybe_next_block_blob_fee(
327 chain_spec.blob_params_at_timestamp(new_tip.timestamp()),
328 );
329
330 let new_changed_accounts: HashSet<_> =
332 new_state.changed_accounts().map(ChangedAccountEntry).collect();
333
334 let missing_changed_acc = old_state
336 .accounts_iter()
337 .map(|(a, _)| a)
338 .filter(|addr| !new_changed_accounts.contains(addr));
339
340 let mut changed_accounts =
342 match load_accounts(client.clone(), new_tip.hash(), missing_changed_acc) {
343 Ok(LoadedAccounts { accounts, failed_to_load }) => {
344 dirty_addresses.extend(failed_to_load);
346
347 accounts
348 }
349 Err(err) => {
350 let (addresses, err) = *err;
351 debug!(
352 target: "txpool",
353 %err,
354 "failed to load missing changed accounts at new tip: {:?}",
355 new_tip.hash()
356 );
357 dirty_addresses.extend(addresses);
358 vec![]
359 }
360 };
361
362 changed_accounts.extend(new_changed_accounts.into_iter().map(|entry| entry.0));
365
366 let new_mined_transactions: HashSet<_> = new_blocks.transaction_hashes().collect();
368
369 let pruned_old_transactions = old_blocks
372 .transactions_ecrecovered()
373 .filter(|tx| !new_mined_transactions.contains(tx.tx_hash()))
374 .filter_map(|tx| {
375 if tx.is_eip4844() {
376 pool.get_blob(*tx.tx_hash())
382 .ok()
383 .flatten()
384 .map(Arc::unwrap_or_clone)
385 .and_then(|sidecar| {
386 <P as TransactionPool>::Transaction::try_from_eip4844(
387 tx, sidecar,
388 )
389 })
390 } else {
391 <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()
392 }
393 })
394 .collect::<Vec<_>>();
395
396 let update = CanonicalStateUpdate {
398 new_tip: new_tip.sealed_block(),
399 pending_block_base_fee,
400 pending_block_blob_fee,
401 changed_accounts,
402 mined_transactions: new_blocks.transaction_hashes().collect(),
404 update_kind: PoolUpdateKind::Reorg,
405 };
406 pool.on_canonical_state_change(update);
407
408 metrics.inc_reinserted_transactions(pruned_old_transactions.len());
415 let _ = pool.add_external_transactions(pruned_old_transactions).await;
416
417 blob_store_tracker.add_new_chain_blocks(&new_blocks);
419 }
420 CanonStateNotification::Commit { new } => {
421 let (blocks, state) = new.inner();
422 let tip = blocks.tip();
423 let chain_spec = client.chain_spec();
424
425 let pending_block_base_fee = tip
427 .header()
428 .next_block_base_fee(chain_spec.base_fee_params_at_timestamp(tip.timestamp()))
429 .unwrap_or_default();
430 let pending_block_blob_fee = tip.header().maybe_next_block_blob_fee(
431 chain_spec.blob_params_at_timestamp(tip.timestamp()),
432 );
433
434 let first_block = blocks.first();
435 trace!(
436 target: "txpool",
437 first = first_block.number(),
438 tip = tip.number(),
439 pool_block = pool_info.last_seen_block_number,
440 "update pool on new commit"
441 );
442
443 let depth = tip.number().abs_diff(pool_info.last_seen_block_number);
446 if depth > max_update_depth {
447 maintained_state = MaintainedPoolState::Drifted;
448 debug!(target: "txpool", ?depth, "skipping deep canonical update");
449 let info = BlockInfo {
450 block_gas_limit: tip.header().gas_limit(),
451 last_seen_block_hash: tip.hash(),
452 last_seen_block_number: tip.number(),
453 pending_basefee: pending_block_base_fee,
454 pending_blob_fee: pending_block_blob_fee,
455 };
456 pool.set_block_info(info);
457
458 blob_store_tracker.add_new_chain_blocks(&blocks);
460
461 continue
462 }
463
464 let mut changed_accounts = Vec::with_capacity(state.state().len());
465 for acc in state.changed_accounts() {
466 dirty_addresses.remove(&acc.address);
468 changed_accounts.push(acc);
469 }
470
471 let mined_transactions = blocks.transaction_hashes().collect();
472
473 if first_block.parent_hash() != pool_info.last_seen_block_hash {
475 maintained_state = MaintainedPoolState::Drifted;
479 }
480
481 let update = CanonicalStateUpdate {
483 new_tip: tip.sealed_block(),
484 pending_block_base_fee,
485 pending_block_blob_fee,
486 changed_accounts,
487 mined_transactions,
488 update_kind: PoolUpdateKind::Commit,
489 };
490 pool.on_canonical_state_change(update);
491
492 blob_store_tracker.add_new_chain_blocks(&blocks);
494 }
495 }
496 }
497}
498
499struct FinalizedBlockTracker {
500 last_finalized_block: Option<BlockNumber>,
501}
502
503impl FinalizedBlockTracker {
504 const fn new(last_finalized_block: Option<BlockNumber>) -> Self {
505 Self { last_finalized_block }
506 }
507
508 fn update(&mut self, finalized_block: Option<BlockNumber>) -> Option<BlockNumber> {
510 let finalized = finalized_block?;
511 self.last_finalized_block
512 .replace(finalized)
513 .is_none_or(|last| last < finalized)
514 .then_some(finalized)
515 }
516}
517
518#[derive(Debug, PartialEq, Eq)]
521enum MaintainedPoolState {
522 InSync,
524 Drifted,
526}
527
528impl MaintainedPoolState {
529 #[inline]
531 const fn is_drifted(&self) -> bool {
532 matches!(self, Self::Drifted)
533 }
534}
535
536#[derive(Eq)]
538struct ChangedAccountEntry(ChangedAccount);
539
540impl PartialEq for ChangedAccountEntry {
541 fn eq(&self, other: &Self) -> bool {
542 self.0.address == other.0.address
543 }
544}
545
546impl Hash for ChangedAccountEntry {
547 fn hash<H: Hasher>(&self, state: &mut H) {
548 self.0.address.hash(state);
549 }
550}
551
552impl Borrow<Address> for ChangedAccountEntry {
553 fn borrow(&self) -> &Address {
554 &self.0.address
555 }
556}
557
558#[derive(Default)]
559struct LoadedAccounts {
560 accounts: Vec<ChangedAccount>,
562 failed_to_load: Vec<Address>,
564}
565
566fn load_accounts<Client, I>(
572 client: Client,
573 at: BlockHash,
574 addresses: I,
575) -> Result<LoadedAccounts, Box<(HashSet<Address>, ProviderError)>>
576where
577 I: IntoIterator<Item = Address>,
578 Client: StateProviderFactory,
579{
580 let addresses = addresses.into_iter();
581 let mut res = LoadedAccounts::default();
582 let state = match client.history_by_block_hash(at) {
583 Ok(state) => state,
584 Err(err) => return Err(Box::new((addresses.collect(), err))),
585 };
586 for addr in addresses {
587 if let Ok(maybe_acc) = state.basic_account(&addr) {
588 let acc = maybe_acc
589 .map(|acc| ChangedAccount { address: addr, nonce: acc.nonce, balance: acc.balance })
590 .unwrap_or_else(|| ChangedAccount::empty(addr));
591 res.accounts.push(acc)
592 } else {
593 res.failed_to_load.push(addr);
595 }
596 }
597 Ok(res)
598}
599
600async fn load_and_reinsert_transactions<P>(
604 pool: P,
605 file_path: &Path,
606) -> Result<(), TransactionsBackupError>
607where
608 P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>>,
609{
610 if !file_path.exists() {
611 return Ok(())
612 }
613
614 debug!(target: "txpool", txs_file =?file_path, "Check local persistent storage for saved transactions");
615 let data = reth_fs_util::read(file_path)?;
616
617 if data.is_empty() {
618 return Ok(())
619 }
620
621 let txs_signed: Vec<<P::Transaction as PoolTransaction>::Consensus> =
622 alloy_rlp::Decodable::decode(&mut data.as_slice())?;
623
624 let pool_transactions = txs_signed
625 .into_iter()
626 .filter_map(|tx| tx.try_clone_into_recovered().ok())
627 .filter_map(|tx| {
628 <P::Transaction as PoolTransaction>::try_from_consensus(tx).ok()
630 })
631 .collect();
632
633 let outcome = pool.add_transactions(crate::TransactionOrigin::Local, pool_transactions).await;
634
635 info!(target: "txpool", txs_file =?file_path, num_txs=%outcome.len(), "Successfully reinserted local transactions from file");
636 reth_fs_util::remove_file(file_path)?;
637 Ok(())
638}
639
640fn save_local_txs_backup<P>(pool: P, file_path: &Path)
641where
642 P: TransactionPool<Transaction: PoolTransaction<Consensus: Encodable>>,
643{
644 let local_transactions = pool.get_local_transactions();
645 if local_transactions.is_empty() {
646 trace!(target: "txpool", "no local transactions to save");
647 return
648 }
649
650 let local_transactions = local_transactions
651 .into_iter()
652 .map(|tx| tx.transaction.clone_into_consensus().into_inner())
653 .collect::<Vec<_>>();
654
655 let num_txs = local_transactions.len();
656 let mut buf = Vec::new();
657 alloy_rlp::encode_list(&local_transactions, &mut buf);
658 info!(target: "txpool", txs_file =?file_path, num_txs=%num_txs, "Saving current local transactions");
659 let parent_dir = file_path.parent().map(std::fs::create_dir_all).transpose();
660
661 match parent_dir.map(|_| reth_fs_util::write(file_path, buf)) {
662 Ok(_) => {
663 info!(target: "txpool", txs_file=?file_path, "Wrote local transactions to file");
664 }
665 Err(err) => {
666 warn!(target: "txpool", %err, txs_file=?file_path, "Failed to write local transactions to file");
667 }
668 }
669}
670
671#[derive(thiserror::Error, Debug)]
673pub enum TransactionsBackupError {
674 #[error("failed to apply transactions backup. Encountered RLP decode error: {0}")]
676 Decode(#[from] alloy_rlp::Error),
677 #[error("failed to apply transactions backup. Encountered file error: {0}")]
679 FsPath(#[from] FsPathError),
680 #[error("failed to insert transactions to the transactions pool. Encountered pool error: {0}")]
682 Pool(#[from] PoolError),
683}
684
685pub async fn backup_local_transactions_task<P>(
688 shutdown: reth_tasks::shutdown::GracefulShutdown,
689 pool: P,
690 config: LocalTransactionBackupConfig,
691) where
692 P: TransactionPool<Transaction: PoolTransaction<Consensus: SignedTransaction>> + Clone,
693{
694 let Some(transactions_path) = config.transactions_path else {
695 return
697 };
698
699 if let Err(err) = load_and_reinsert_transactions(pool.clone(), &transactions_path).await {
700 error!(target: "txpool", "{}", err)
701 }
702
703 let graceful_guard = shutdown.await;
704
705 save_local_txs_backup(pool, &transactions_path);
707
708 drop(graceful_guard)
709}
710
711#[cfg(test)]
712mod tests {
713 use super::*;
714 use crate::{
715 blobstore::InMemoryBlobStore, validate::EthTransactionValidatorBuilder,
716 CoinbaseTipOrdering, EthPooledTransaction, Pool, TransactionOrigin,
717 };
718 use alloy_consensus::transaction::PooledTransaction;
719 use alloy_eips::eip2718::Decodable2718;
720 use alloy_primitives::{hex, U256};
721 use reth_ethereum_primitives::TransactionSigned;
722 use reth_fs_util as fs;
723 use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
724 use reth_tasks::TaskManager;
725
726 #[test]
727 fn changed_acc_entry() {
728 let changed_acc = ChangedAccountEntry(ChangedAccount::empty(Address::random()));
729 let mut copy = changed_acc.0;
730 copy.nonce = 10;
731 assert!(changed_acc.eq(&ChangedAccountEntry(copy)));
732 }
733
734 const EXTENSION: &str = "rlp";
735 const FILENAME: &str = "test_transactions_backup";
736
737 #[tokio::test(flavor = "multi_thread")]
738 async fn test_save_local_txs_backup() {
739 let temp_dir = tempfile::tempdir().unwrap();
740 let transactions_path = temp_dir.path().join(FILENAME).with_extension(EXTENSION);
741 let tx_bytes = hex!(
742 "02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507"
743 );
744 let tx = PooledTransaction::decode_2718(&mut &tx_bytes[..]).unwrap();
745 let provider = MockEthProvider::default();
746 let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
747 let tx_to_cmp = transaction.clone();
748 let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
749 provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));
750 let blob_store = InMemoryBlobStore::default();
751 let validator = EthTransactionValidatorBuilder::new(provider).build(blob_store.clone());
752
753 let txpool = Pool::new(
754 validator.clone(),
755 CoinbaseTipOrdering::default(),
756 blob_store.clone(),
757 Default::default(),
758 );
759
760 txpool.add_transaction(TransactionOrigin::Local, transaction.clone()).await.unwrap();
761
762 let handle = tokio::runtime::Handle::current();
763 let manager = TaskManager::new(handle);
764 let config = LocalTransactionBackupConfig::with_local_txs_backup(transactions_path.clone());
765 manager.executor().spawn_critical_with_graceful_shutdown_signal("test task", |shutdown| {
766 backup_local_transactions_task(shutdown, txpool.clone(), config)
767 });
768
769 let mut txns = txpool.get_local_transactions();
770 let tx_on_finish = txns.pop().expect("there should be 1 transaction");
771
772 assert_eq!(*tx_to_cmp.hash(), *tx_on_finish.hash());
773
774 manager.graceful_shutdown();
776
777 let data = fs::read(transactions_path).unwrap();
778
779 let txs: Vec<TransactionSigned> =
780 alloy_rlp::Decodable::decode(&mut data.as_slice()).unwrap();
781 assert_eq!(txs.len(), 1);
782
783 temp_dir.close().unwrap();
784 }
785
786 #[test]
787 fn test_update_with_higher_finalized_block() {
788 let mut tracker = FinalizedBlockTracker::new(Some(10));
789 assert_eq!(tracker.update(Some(15)), Some(15));
790 assert_eq!(tracker.last_finalized_block, Some(15));
791 }
792
793 #[test]
794 fn test_update_with_lower_finalized_block() {
795 let mut tracker = FinalizedBlockTracker::new(Some(20));
796 assert_eq!(tracker.update(Some(15)), None);
797 assert_eq!(tracker.last_finalized_block, Some(15));
798 }
799
800 #[test]
801 fn test_update_with_equal_finalized_block() {
802 let mut tracker = FinalizedBlockTracker::new(Some(20));
803 assert_eq!(tracker.update(Some(20)), None);
804 assert_eq!(tracker.last_finalized_block, Some(20));
805 }
806
807 #[test]
808 fn test_update_with_no_last_finalized_block() {
809 let mut tracker = FinalizedBlockTracker::new(None);
810 assert_eq!(tracker.update(Some(10)), Some(10));
811 assert_eq!(tracker.last_finalized_block, Some(10));
812 }
813
814 #[test]
815 fn test_update_with_no_new_finalized_block() {
816 let mut tracker = FinalizedBlockTracker::new(Some(10));
817 assert_eq!(tracker.update(None), None);
818 assert_eq!(tracker.last_finalized_block, Some(10));
819 }
820
821 #[test]
822 fn test_update_with_no_finalized_blocks() {
823 let mut tracker = FinalizedBlockTracker::new(None);
824 assert_eq!(tracker.update(None), None);
825 assert_eq!(tracker.last_finalized_block, None);
826 }
827}