use crate::metrics::PersistenceMetrics;
use alloy_eips::BlockNumHash;
use crossbeam_channel::Sender as CrossbeamSender;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_provider::{
    providers::ProviderNodeTypes, BalProvider, BlockExecutionWriter, BlockHashReader,
    ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
};
use reth_prune::{PrunerError, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_tasks::spawn_os_thread;
use std::{
    sync::{
        mpsc::{Receiver, SendError, Sender},
        Arc,
    },
    thread::JoinHandle,
    time::Duration,
};
use thiserror::Error;
use tracing::{debug, error, instrument, warn};

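/// The result of a completed persistence operation, sent back to the requester once the
/// write transaction has committed.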
#[derive(Debug)]
pub struct PersistenceResult {
    /// The number and hash of the last block on disk after the operation, if any.
    pub last_block: Option<BlockNumHash>,
    /// Time taken by the save operation, through commit. `None` for block removals.
    pub commit_duration: Option<Duration>,
}

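/// A service that persists executed blocks to the database on its own thread.
///
/// It receives persistence actions over a channel and handles them one at a time:
/// saving ranges of executed blocks, removing blocks above a given height, and
/// recording finalized/safe block numbers. After saving blocks it may also run the
/// pruner.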
#[derive(Debug)]
pub struct PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// The provider factory used to open write transactions.
    provider: ProviderFactory<N>,
    /// Incoming persistence requests.
    incoming: Receiver<PersistenceAction<N::Primitives>>,
    /// The pruner to run after blocks have been saved.
    pruner: PrunerWithFactory<ProviderFactory<N>>,
    /// Metrics recorded by this service.
    metrics: PersistenceMetrics,
    /// Sender for sync metric events such as the current sync height.
    sync_metrics_tx: MetricEventsSender,
    /// A finalized block number that is still ahead of the last persisted block; it is
    /// written (clamped) together with the next saved range.
    pending_finalized_block: Option<u64>,
    /// A safe block number that is still ahead of the last persisted block; it is
    /// written (clamped) together with the next saved range.
    pending_safe_block: Option<u64>,
}

impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
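    /// Creates a new persistence service from the given provider factory, request
    /// receiver, pruner, and sync-metrics sender.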
    pub fn new(
        provider: ProviderFactory<N>,
        incoming: Receiver<PersistenceAction<N::Primitives>>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        sync_metrics_tx: MetricEventsSender,
    ) -> Self {
        Self {
            provider,
            incoming,
            pruner,
            metrics: PersistenceMetrics::default(),
            sync_metrics_tx,
            pending_finalized_block: None,
            pending_safe_block: None,
        }
    }
}

impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
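    /// The main service loop: receives [`PersistenceAction`]s and executes them in
    /// order until all senders have been dropped.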
    pub fn run(mut self) -> Result<(), PersistenceError> {
        // If receiving fails, all senders have been dropped and the service can shut down.
        while let Ok(action) = self.incoming.recv() {
            match action {
                PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
                    let last_block = self.on_remove_blocks_above(new_tip_num)?;
                    let _ =
                        self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
                    let _ = sender.send(PersistenceResult { last_block, commit_duration: None });
                }
                PersistenceAction::SaveBlocks(blocks, sender) => {
                    let result = self.on_save_blocks(blocks)?;
                    let result_number = result.last_block.map(|b| b.number);

                    // The error is ignored: the caller may have dropped the receiver.
                    let _ = sender.send(result);

                    if let Some(block_number) = result_number {
                        // Update the sync height metric based on the saved blocks.
                        let _ = self
                            .sync_metrics_tx
                            .send(MetricEvent::SyncHeight { height: block_number });
                        self.maybe_run_pruner(block_number)?;
                    }
                }
                PersistenceAction::SaveFinalizedBlock(finalized_block) => {
                    self.pending_finalized_block = Some(finalized_block);
                }
                PersistenceAction::SaveSafeBlock(safe_block) => {
                    self.pending_safe_block = Some(safe_block);
                }
            }
        }
        Ok(())
    }

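    /// Removes all blocks and execution data above `new_tip_num` from the database and
    /// returns the num/hash of the new tip.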
    #[instrument(level = "debug", target = "engine::persistence", skip_all, fields(%new_tip_num))]
    fn on_remove_blocks_above(
        &self,
        new_tip_num: u64,
    ) -> Result<Option<BlockNumHash>, PersistenceError> {
        debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
        let start_time = Instant::now();
        let provider_rw = self.provider.database_provider_rw()?;

        let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
        provider_rw.remove_block_and_execution_above(new_tip_num)?;
        provider_rw.commit()?;

        debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
        self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
        Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
    }

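    /// Saves the given range of executed blocks in one write transaction, also writing
    /// any pending finalized/safe block numbers (clamped to the last saved block).
    /// Returns the num/hash of the last saved block and the time spent, through commit.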
    #[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_count = blocks.len()))]
    fn on_save_blocks(
        &mut self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
    ) -> Result<PersistenceResult, PersistenceError> {
        let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
        let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
        let block_count = blocks.len();

        let pending_finalized = self.pending_finalized_block.take();
        let pending_safe = self.pending_safe_block.take();

        debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks");

        let start_time = Instant::now();

        if let Some(last) = last_block {
            let provider_rw = self.provider.database_provider_rw()?;
            provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;

            if let Some(finalized) = pending_finalized {
                provider_rw.save_finalized_block_number(finalized.min(last.number))?;
                if finalized > last.number {
                    self.pending_finalized_block = Some(finalized);
                }
            }
            if let Some(safe) = pending_safe {
                provider_rw.save_safe_block_number(safe.min(last.number))?;
                if safe > last.number {
                    self.pending_safe_block = Some(safe);
                }
            }

            provider_rw.commit()?;
            let _ = self.provider.bal_store().flush().inspect_err(|err| {
                warn!(target: "engine::persistence", last=?last_block, ?err, "Failed to flush BAL store");
            });
            debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
        }

        let elapsed = start_time.elapsed();
        self.metrics.save_blocks_batch_size.record(block_count as f64);
        self.metrics.save_blocks_duration_seconds.record(elapsed);

        Ok(PersistenceResult { last_block, commit_duration: Some(elapsed) })
    }

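    /// Runs the pruner (and best-effort BAL store pruning) if pruning is needed at the
    /// given block number.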
    fn maybe_run_pruner(&mut self, block_number: u64) -> Result<(), PersistenceError> {
        // Skip entirely if the pruner has nothing to do at this height.
        if self.pruner.is_pruning_needed(block_number) {
            debug!(target: "engine::persistence", block_num=?block_number, "Running pruner");
            let prune_start = Instant::now();
            let provider_rw = self.provider.database_provider_rw()?;
            let _ = self.pruner.run_with_provider(&provider_rw, block_number)?;
            provider_rw.commit()?;
            // BAL store pruning is best-effort: failures are logged but never abort the run.
            let pruned_bals = self
                .provider
                .bal_store()
                .prune(block_number)
                .inspect_err(|err| {
                    warn!(target: "engine::persistence", tip=?block_number, ?err, "Failed to prune BAL store");
                })
                .unwrap_or_default();
            debug!(target: "engine::persistence", tip=?block_number, pruned_bals, "Finished pruning after saving blocks");
            self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
        }

        Ok(())
    }
}

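/// Errors that can be returned by the persistence service.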
#[derive(Debug, Error)]
pub enum PersistenceError {
    /// A pruner error.
    #[error(transparent)]
    PrunerError(#[from] PrunerError),

    /// A provider error.
    #[error(transparent)]
    ProviderError(#[from] ProviderError),
}

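/// An action that the persistence service can perform.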
#[derive(Debug)]
pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
    /// Persist the given executed blocks and reply on the channel once the write
    /// transaction has committed.
    SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<PersistenceResult>),

    /// Remove all blocks above the given block number and reply with the new tip.
    RemoveBlocksAbove(u64, CrossbeamSender<PersistenceResult>),

    /// Record the given block number as finalized; it is written together with the next
    /// saved block range.
    SaveFinalizedBlock(u64),

    /// Record the given block number as safe; it is written together with the next
    /// saved block range.
    SaveSafeBlock(u64),
}

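/// A cloneable handle for sending actions to the persistence service.
///
/// When the last handle is dropped, the action channel closes, the service loop exits,
/// and the service thread is joined.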
#[derive(Debug, Clone)]
pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
    /// The channel used to send actions to the service.
    sender: Sender<PersistenceAction<N>>,
    /// Joins the service thread once the last clone of this handle is dropped.
    _service_guard: Arc<ServiceGuard>,
}

impl<T: NodePrimitives> PersistenceHandle<T> {
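    /// Creates a handle from an existing action sender. A handle created this way owns
    /// no service thread, so dropping it does not join anything.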
    pub fn new(sender: Sender<PersistenceAction<T>>) -> Self {
        Self { sender, _service_guard: Arc::new(ServiceGuard(None)) }
    }

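    /// Spawns the persistence service on a dedicated OS thread and returns a handle to
    /// it. The spawned thread is joined when the last clone of the handle is dropped.
    ///
    /// A minimal usage sketch; `provider_factory`, `pruner`, `sync_metrics_tx`, and
    /// `blocks` are assumed to be constructed elsewhere (see the tests below for a
    /// concrete setup):
    ///
    /// ```ignore
    /// let handle = PersistenceHandle::<EthPrimitives>::spawn_service(
    ///     provider_factory,
    ///     pruner,
    ///     sync_metrics_tx,
    /// );
    /// let (tx, rx) = crossbeam_channel::bounded(1);
    /// handle.save_blocks(blocks, tx)?;
    /// let result = rx.recv()?;
    /// ```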
    pub fn spawn_service<N>(
        provider_factory: ProviderFactory<N>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        sync_metrics_tx: MetricEventsSender,
    ) -> PersistenceHandle<N::Primitives>
    where
        N: ProviderNodeTypes,
    {
        // Create the channel that the service will receive actions on.
        let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();

        // Spawn the persistence service on its own OS thread.
        let db_service =
            PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
        let join_handle = spawn_os_thread("persistence", || {
            if let Err(err) = db_service.run() {
                error!(target: "engine::persistence", ?err, "Persistence service failed");
            }
        });

        PersistenceHandle {
            sender: db_service_tx,
            _service_guard: Arc::new(ServiceGuard(Some(join_handle))),
        }
    }

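    /// Sends an action to the persistence service, failing with a [`SendError`] if the
    /// service has already shut down.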
    pub fn send_action(
        &self,
        action: PersistenceAction<T>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.sender.send(action)
    }

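    /// Tells the persistence service to save the given executed blocks.
    ///
    /// The service replies on `tx` with the num/hash of the last persisted block once
    /// the write transaction has committed. An empty `blocks` vector is accepted and
    /// yields a result with no last block.
    ///
    /// A sketch, assuming `handle` and `executed_blocks` already exist:
    ///
    /// ```ignore
    /// let (tx, rx) = crossbeam_channel::bounded(1);
    /// handle.save_blocks(executed_blocks, tx)?;
    /// let persisted_tip = rx.recv()?.last_block;
    /// ```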
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<T>>,
        tx: CrossbeamSender<PersistenceResult>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
    }

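    /// Persists the finalized block number; it is written to disk together with the
    /// next saved block range.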
    pub fn save_finalized_block_number(
        &self,
        finalized_block: u64,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
    }

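    /// Persists the safe block number; it is written to disk together with the next
    /// saved block range.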
    pub fn save_safe_block_number(
        &self,
        safe_block: u64,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
    }

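    /// Tells the persistence service to remove all blocks above `block_num`. The
    /// service replies on `tx` with the new tip once the removal has committed.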
    pub fn remove_blocks_above(
        &self,
        block_num: u64,
        tx: CrossbeamSender<PersistenceResult>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
    }
}

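/// Guard that joins the persistence service thread when dropped.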
struct ServiceGuard(Option<JoinHandle<()>>);

impl std::fmt::Debug for ServiceGuard {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("ServiceGuard").field(&self.0.as_ref().map(|_| "...")).finish()
    }
}

impl Drop for ServiceGuard {
    fn drop(&mut self) {
        if let Some(join_handle) = self.0.take() {
            let _ = join_handle.join();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::NumHash;
    use alloy_primitives::{keccak256, BlockHash, BlockNumber, Bytes, Sealed, B256, U256};
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::{
        providers::{ProviderFactoryBuilder, ReadOnlyConfig},
        test_utils::{create_test_provider_factory, MockNodeTypes},
        AccountReader, BalConfig, BalNotificationStream, BalStore, BalStoreHandle,
        ChainSpecProvider, HeaderProvider, InMemoryBalStore, ProviderError, ProviderResult,
        SealedBal, StorageSettingsCache, TryIntoHistoricalStateProvider,
    };
    use reth_prune::Pruner;
    use reth_prune_types::PruneMode;
    use tokio::sync::mpsc::unbounded_channel;

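    /// Spawns a persistence service backed by a fresh test provider factory and a
    /// pruner with no segments, returning its handle.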
    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let provider = create_test_provider_factory();

        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
    }

    #[test]
    fn test_pruner_prunes_bal_store() {
        reth_tracing::init_test_tracing();

        let old_hash = B256::random();
        let retained_hash = B256::random();
        let old_bal = Bytes::from_static(b"old");
        let retained_bal = Bytes::from_static(b"retained");
        let bal_store = BalStoreHandle::new(InMemoryBalStore::new(
            BalConfig::with_in_memory_retention(PruneMode::Before(2)),
        ));

        bal_store
            .insert(
                NumHash::new(1, old_hash),
                Sealed::new_unchecked(old_bal.clone(), keccak256(&old_bal)),
            )
            .unwrap();
        bal_store
            .insert(
                NumHash::new(2, retained_hash),
                Sealed::new_unchecked(retained_bal.clone(), keccak256(&retained_bal)),
            )
            .unwrap();

        let provider = create_test_provider_factory().with_bal_store(bal_store.clone());
        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 0, 0, None, finished_exex_height_rx);
        let (_db_service_tx, db_service_rx) = std::sync::mpsc::channel();
        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        let mut service = PersistenceService::new(provider, db_service_rx, pruner, sync_metrics_tx);

        service.maybe_run_pruner(2).unwrap();

        // Block 1 is below the retention boundary and must be pruned; block 2 is retained.
        assert_eq!(
            bal_store.get_by_hashes(&[old_hash, retained_hash]).unwrap(),
            vec![None, Some(retained_bal)]
        );
    }

    #[test]
    fn test_pruner_ignores_bal_store_prune_error() {
        reth_tracing::init_test_tracing();

        let provider = create_test_provider_factory()
            .with_bal_store(BalStoreHandle::new(FailingPruneBalStore));
        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 0, 0, None, finished_exex_height_rx);
        let (_db_service_tx, db_service_rx) = std::sync::mpsc::channel();
        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        let mut service = PersistenceService::new(provider, db_service_rx, pruner, sync_metrics_tx);

        service.maybe_run_pruner(2).unwrap();
    }

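    /// A [`BalStore`] whose `prune` always fails, used to verify that BAL store prune
    /// errors are logged and swallowed instead of propagated.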
    #[derive(Debug)]
    struct FailingPruneBalStore;

    impl BalStore for FailingPruneBalStore {
        fn insert(&self, _num_hash: NumHash, _bal: SealedBal) -> ProviderResult<()> {
            Ok(())
        }

        fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
            Err(ProviderError::other(std::io::Error::other("BAL store prune failed")))
        }

        fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
            Ok(vec![None; block_hashes.len()])
        }

        fn get_by_range(&self, _start: BlockNumber, _count: u64) -> ProviderResult<Vec<Bytes>> {
            Ok(Vec::new())
        }

        fn bal_stream(&self) -> BalNotificationStream {
            BalStoreHandle::noop().bal_stream()
        }
    }

    #[test]
    fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let blocks = vec![];
        let (tx, rx) = crossbeam_channel::bounded(1);

        handle.save_blocks(blocks, tx).unwrap();

        let result = rx.recv().unwrap();
        assert!(result.last_block.is_none());
    }

    #[test]
    fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();
        let block_number = 0;
        let mut test_block_builder = TestBlockBuilder::eth();
        let executed =
            test_block_builder.get_executed_block_with_number(block_number, B256::random());
        let block_hash = executed.recovered_block().hash();

        let blocks = vec![executed];
        let (tx, rx) = crossbeam_channel::bounded(1);

        handle.save_blocks(blocks, tx).unwrap();

        let result = rx.recv_timeout(std::time::Duration::from_secs(10)).expect("test timed out");

        assert_eq!(block_hash, result.last_block.unwrap().hash);
    }

    #[test]
    fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut test_block_builder = TestBlockBuilder::eth();
        let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
        let last_hash = blocks.last().unwrap().recovered_block().hash();
        let (tx, rx) = crossbeam_channel::bounded(1);

        handle.save_blocks(blocks, tx).unwrap();
        let result = rx.recv().unwrap();
        assert_eq!(last_hash, result.last_block.unwrap().hash);
    }

    #[test]
    fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let ranges = [0..1, 1..2, 2..4, 4..5];
        let mut test_block_builder = TestBlockBuilder::eth();
        for range in ranges {
            let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
            let last_hash = blocks.last().unwrap().recovered_block().hash();
            let (tx, rx) = crossbeam_channel::bounded(1);

            handle.save_blocks(blocks, tx).unwrap();

            let result = rx.recv().unwrap();
            assert_eq!(last_hash, result.last_block.unwrap().hash);
        }
    }

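    /// Writes an initial account-history shard (blocks 0..20), appends entries for
    /// blocks 20..25, then prunes history up to block 14 and asserts that entries
    /// 15..25, including the newly appended shard, survive.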
    #[test]
    fn test_save_blocks_then_prune_preserves_new_history() {
        use reth_db::{models::ShardedKey, tables, BlockNumberList};
        use reth_provider::RocksDBProviderFactory;

        reth_tracing::init_test_tracing();

        let provider_factory = create_test_provider_factory();
        let tracked_addr = alloy_primitives::Address::from([0xBE; 20]);

        // Seed an initial account-history shard covering blocks 0..20.
        let rocksdb = provider_factory.rocksdb_provider();
        {
            let mut batch = rocksdb.batch();
            let initial_blocks: Vec<u64> = (0..20).collect();
            let shard = BlockNumberList::new_pre_sorted(initial_blocks.iter().copied());
            batch
                .put::<tables::AccountsHistory>(ShardedKey::new(tracked_addr, u64::MAX), &shard)
                .unwrap();
            batch.commit().unwrap();
        }

        // Append new history entries for blocks 20..25.
        let mut batch1 = rocksdb.batch();
        batch1.append_account_history_shard(tracked_addr, 20..25u64).unwrap();
        batch1.commit().unwrap();

        // Prune history up to and including block 14.
        let mut batch2 = rocksdb.batch();
        batch2.prune_account_history_to(tracked_addr, 14).unwrap();
        batch2.commit().unwrap();

        // Only entries above the prune target must remain, including the appended ones.
        let shards = rocksdb.account_history_shards(tracked_addr).unwrap();
        let entries: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
        let expected: Vec<u64> = (15..25).collect();
        assert_eq!(entries, expected, "new entries 20..25 must survive pruning");
    }

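    /// Opens a second, read-only provider factory on the same datadir and verifies
    /// that reads through it stay consistent while the primary factory performs a
    /// reorg (truncating to block 1, then writing a different block 2).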
    #[test]
    fn test_read_only_consistency_across_reorg() {
        reth_tracing::init_test_tracing();

        // Allow opening a second provider factory on the same database in this test.
        reth_db::test_utils::enable_legacy_multiopen();

        let provider_factory = create_test_provider_factory();
        provider_factory.set_storage_settings_cache(reth_provider::StorageSettings::v2());

        // Open a second, read-only provider factory on the same datadir.
        let secondary = ProviderFactoryBuilder::<MockNodeTypes>::default()
            .open_read_only(
                provider_factory.chain_spec(),
                ReadOnlyConfig::from_datadir(provider_factory.db_ref().path()),
                reth_tasks::Runtime::test(),
            )
            .expect("failed to open read-only provider factory");
        secondary.set_storage_settings_cache(reth_provider::StorageSettings::v2());

        // Build chain A (blocks 0, 1, 2) from a funded test signer.
        let mut test_block_builder = TestBlockBuilder::eth().with_state();
        let signer = test_block_builder.signer;
        let blocks_a: Vec<_> = test_block_builder.get_executed_blocks(0..3).collect();
        let hash_a1 = blocks_a[1].recovered_block().hash();
        let hash_a2 = blocks_a[2].recovered_block().hash();

        // Compute the signer's expected balance and nonce after blocks 0 and 1.
        let single_cost = TestBlockBuilder::<EthPrimitives>::single_tx_cost();
        let initial_balance = U256::from(10).pow(U256::from(18));
        let txs_in_block0 = blocks_a[0].recovered_block().body().transactions.len() as u64;
        let txs_in_block1 = blocks_a[1].recovered_block().body().transactions.len() as u64;

        let balance_after_block0 = initial_balance - single_cost * U256::from(txs_in_block0);
        let nonce_after_block0 = txs_in_block0;
        let balance_after_block1 = balance_after_block0 - single_cost * U256::from(txs_in_block1);
        let nonce_after_block1 = nonce_after_block0 + txs_in_block1;

        {
            let provider_rw = provider_factory.database_provider_rw().unwrap();
            provider_rw.save_blocks(blocks_a, SaveBlocksMode::Full).unwrap();
            provider_rw.commit().unwrap();
        }

        // Take a provider from the secondary factory before the reorg; it holds a read
        // snapshot of the pre-reorg state.
        let pre_reorg_provider = secondary.provider().unwrap();
        assert_eq!(
            pre_reorg_provider.sealed_header(2).unwrap().as_ref().map(|h| h.hash()),
            Some(hash_a2),
            "secondary must see block 2 after initial append"
        );

        // Sanity check: the primary factory reads historical state at block 1.
        {
            let primary_state_at_1 = provider_factory.history_by_block_number(1).unwrap();
            let primary_account = primary_state_at_1.basic_account(&signer).unwrap();
            assert!(primary_account.is_some(), "primary: signer must exist at block 1");
        }

        // The secondary must read the same pre-reorg state at block 1.
        {
            let state_at_1 = secondary.history_by_block_number(1).unwrap();
            let account_at_1 = state_at_1.basic_account(&signer).unwrap();
            assert!(account_at_1.is_some(), "signer account must exist at block 1");
            let account_at_1 = account_at_1.unwrap();
            assert_eq!(account_at_1.balance, balance_after_block1, "signer balance at block 1");
            assert_eq!(account_at_1.nonce, nonce_after_block1, "signer nonce at block 1");
        }

        // Build a competing block 2 (chain B) on top of block 1 to trigger a reorg.
        let block_b2 = test_block_builder.get_executed_block_with_number(2, hash_a1);
        let hash_b2 = block_b2.recovered_block().hash();
        let txs_in_block_b2 = block_b2.recovered_block().body().transactions.len() as u64;
        assert_ne!(hash_a2, hash_b2, "reorg block must differ");

        let balance_after_reorg_block2 =
            balance_after_block1 - single_cost * U256::from(txs_in_block_b2);
        let nonce_after_reorg_block2 = nonce_after_block1 + txs_in_block_b2;

        // Run the reorg on a separate thread (truncate to block 1, then write block B2)
        // while this thread keeps reading, exercising concurrent access to the same
        // database. The clone is deliberate, hence the lint expectation: the original
        // factory must stay alive on this thread.
        #[expect(clippy::redundant_clone)]
        let pf = provider_factory.clone();
        let reorg_handle = std::thread::spawn(move || {
            let provider_rw = pf.database_provider_rw().unwrap();
            provider_rw.remove_block_and_execution_above(1).unwrap();
            provider_rw.commit().unwrap();

            let provider_rw = pf.database_provider_rw().unwrap();
            provider_rw.save_blocks(vec![block_b2], SaveBlocksMode::Full).unwrap();
            provider_rw.commit().unwrap();
        });

        // Give the reorg thread time to start and make progress.
        std::thread::sleep(std::time::Duration::from_millis(100));

        // While the reorg is in flight, the pre-reorg snapshot must be unaffected.
        assert_eq!(
            pre_reorg_provider.sealed_header(2).unwrap().as_ref().map(|h| h.hash()),
            Some(hash_a2),
            "pre-reorg provider must still see the original block 2"
        );
        assert_eq!(
            pre_reorg_provider.sealed_header(1).unwrap().as_ref().map(|h| h.hash()),
            Some(hash_a1),
            "pre-reorg provider must still see block 1"
        );

        // Historical state reads through the pre-reorg snapshot must also be stable.
        let state_at_1 = pre_reorg_provider.try_into_history_at_block(1).unwrap();
        let account = state_at_1.basic_account(&signer).unwrap();
        assert!(
            account.is_some(),
            "pre-reorg RO tx must still read signer at block 1 during reorg"
        );
        let account = account.unwrap();
        assert_eq!(
            account.balance, balance_after_block1,
            "pre-reorg RO tx: signer balance at block 1 during reorg"
        );
        assert_eq!(
            account.nonce, nonce_after_block1,
            "pre-reorg RO tx: signer nonce at block 1 during reorg"
        );
        // Release the pre-reorg read transaction, then wait for the reorg to finish.
        drop(state_at_1);
        reorg_handle.join().expect("reorg thread panicked");

        // After the reorg, a fresh provider must see chain B's block 2.
        let obs_header = secondary.provider().unwrap().sealed_header(2).unwrap();
        assert_eq!(
            obs_header.as_ref().map(|h| h.hash()),
            Some(hash_b2),
            "secondary must see the reorged block 2, not the old one"
        );

        // Block 1 is on both chains and must be unchanged.
        let obs_header = secondary.provider().unwrap().sealed_header(1).unwrap();
        assert_eq!(
            obs_header.as_ref().map(|h| h.hash()),
            Some(hash_a1),
            "secondary must still see block 1"
        );

        // Historical state at block 1 must survive the reorg.
        let state_at_1 = secondary.history_by_block_number(1).unwrap();
        let account_at_1 = state_at_1.basic_account(&signer).unwrap();
        assert!(account_at_1.is_some(), "signer account must exist at block 1 after reorg");
        let account_at_1 = account_at_1.unwrap();
        assert_eq!(
            account_at_1.balance, balance_after_block1,
            "signer balance at block 1 must survive reorg"
        );
        assert_eq!(
            account_at_1.nonce, nonce_after_block1,
            "signer nonce at block 1 must survive reorg"
        );

        // Historical state at block 2 must reflect the reorged (chain B) execution.
        let state_at_2 = secondary.history_by_block_number(2).unwrap();
        let account_at_2 = state_at_2.basic_account(&signer).unwrap();
        assert!(account_at_2.is_some(), "signer account must exist at block 2 after reorg");
        let account_at_2 = account_at_2.unwrap();
        assert_eq!(
            account_at_2.balance, balance_after_reorg_block2,
            "signer balance at block 2 must reflect reorged execution"
        );
        assert_eq!(
            account_at_2.nonce, nonce_after_reorg_block2,
            "signer nonce at block 2 must reflect reorged execution"
        );
    }
}