// reth_engine_tree/persistence.rs

use crate::metrics::PersistenceMetrics;
use alloy_eips::BlockNumHash;
use crossbeam_channel::Sender as CrossbeamSender;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
use reth_provider::{
    providers::ProviderNodeTypes, BalProvider, BlockExecutionWriter, BlockHashReader,
    ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
};
use reth_prune::{PrunerError, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_tasks::spawn_os_thread;
use std::{
    sync::{
        mpsc::{Receiver, SendError, Sender},
        Arc,
    },
    thread::JoinHandle,
    time::Duration,
};
use thiserror::Error;
use tracing::{debug, error, instrument, warn};

/// Unified result of any persistence operation.
#[derive(Debug)]
pub struct PersistenceResult {
    /// The last block that was persisted, if any.
    pub last_block: Option<BlockNumHash>,
    /// The commit duration, only available for save-blocks operations.
    pub commit_duration: Option<Duration>,
}

/// Writes parts of reth's in-memory tree state to the database and static files.
///
/// This is meant to be a spawned service that listens for incoming persistence operations,
/// performs those actions on disk, and returns the result over a channel.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since it performs
/// blocking I/O operations in an endless loop.
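///
/// A minimal spawn sketch (illustrative only; assumes a configured `ProviderFactory`, a pruner,
/// and a sync-metrics sender are already in scope as `provider_factory`, `pruner`, and
/// `sync_metrics_tx`):
///
/// ```ignore
/// // Channel the service listens on for incoming `PersistenceAction`s.
/// let (tx, rx) = std::sync::mpsc::channel();
/// let service = PersistenceService::new(provider_factory, rx, pruner, sync_metrics_tx);
///
/// // The service performs blocking I/O in a loop, so give it a dedicated thread.
/// let join_handle = std::thread::spawn(move || service.run());
///
/// // `tx` can be wrapped in a `PersistenceHandle` or used directly to submit actions.
/// ```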
#[derive(Debug)]
pub struct PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// The provider factory to use
    provider: ProviderFactory<N>,
    /// Incoming requests
    incoming: Receiver<PersistenceAction<N::Primitives>>,
    /// The pruner
    pruner: PrunerWithFactory<ProviderFactory<N>>,
    /// metrics
    metrics: PersistenceMetrics,
    /// Sender for sync metrics - we only submit sync metrics for persisted blocks
    sync_metrics_tx: MetricEventsSender,
    /// Pending finalized block number to be committed with the next block save.
    /// This avoids triggering a separate fsync for each finalized block update.
    pending_finalized_block: Option<u64>,
    /// Pending safe block number to be committed with the next block save.
    /// This avoids triggering a separate fsync for each safe block update.
    pending_safe_block: Option<u64>,
}

impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// Create a new persistence service
    pub fn new(
        provider: ProviderFactory<N>,
        incoming: Receiver<PersistenceAction<N::Primitives>>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        sync_metrics_tx: MetricEventsSender,
    ) -> Self {
        Self {
            provider,
            incoming,
            pruner,
            metrics: PersistenceMetrics::default(),
            sync_metrics_tx,
            pending_finalized_block: None,
            pending_safe_block: None,
        }
    }
}

impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// This is the main loop that listens for database events and performs the requested
    /// database actions.
    pub fn run(mut self) -> Result<(), PersistenceError> {
        // If the receiver errors, all senders have disconnected, so the loop should end.
        while let Ok(action) = self.incoming.recv() {
            match action {
                PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
                    let last_block = self.on_remove_blocks_above(new_tip_num)?;
                    // send new sync metrics based on removed blocks
                    let _ =
                        self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
                    let _ = sender.send(PersistenceResult { last_block, commit_duration: None });
                }
                PersistenceAction::SaveBlocks(blocks, sender) => {
                    let result = self.on_save_blocks(blocks)?;
                    let result_number = result.last_block.map(|b| b.number);

                    let _ = sender.send(result);

                    if let Some(block_number) = result_number {
                        // send new sync metrics based on saved blocks
                        let _ = self
                            .sync_metrics_tx
                            .send(MetricEvent::SyncHeight { height: block_number });
                        self.maybe_run_pruner(block_number)?;
                    }
                }
                PersistenceAction::SaveFinalizedBlock(finalized_block) => {
                    self.pending_finalized_block = Some(finalized_block);
                }
                PersistenceAction::SaveSafeBlock(safe_block) => {
                    self.pending_safe_block = Some(safe_block);
                }
            }
        }
        Ok(())
    }

    #[instrument(level = "debug", target = "engine::persistence", skip_all, fields(%new_tip_num))]
    fn on_remove_blocks_above(
        &self,
        new_tip_num: u64,
    ) -> Result<Option<BlockNumHash>, PersistenceError> {
        debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
        let start_time = Instant::now();
        let provider_rw = self.provider.database_provider_rw()?;

        let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
        provider_rw.remove_block_and_execution_above(new_tip_num)?;
        provider_rw.commit()?;

        debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
        self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
        Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
    }

    #[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_count = blocks.len()))]
    fn on_save_blocks(
        &mut self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
    ) -> Result<PersistenceResult, PersistenceError> {
        let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
        let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
        let block_count = blocks.len();

        let pending_finalized = self.pending_finalized_block.take();
        let pending_safe = self.pending_safe_block.take();

        debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks");

        let start_time = Instant::now();

        if let Some(last) = last_block {
            let provider_rw = self.provider.database_provider_rw()?;
            provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;

            if let Some(finalized) = pending_finalized {
                provider_rw.save_finalized_block_number(finalized.min(last.number))?;
                if finalized > last.number {
                    self.pending_finalized_block = Some(finalized);
                }
            }
            if let Some(safe) = pending_safe {
                provider_rw.save_safe_block_number(safe.min(last.number))?;
                if safe > last.number {
                    self.pending_safe_block = Some(safe);
                }
            }

            provider_rw.commit()?;
            let _ = self.provider.bal_store().flush().inspect_err(|err| {
                warn!(target: "engine::persistence", last=?last_block, ?err, "Failed to flush BAL store");
            });
            debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
        }

        let elapsed = start_time.elapsed();
        self.metrics.save_blocks_batch_size.record(block_count as f64);
        self.metrics.save_blocks_duration_seconds.record(elapsed);

        Ok(PersistenceResult { last_block, commit_duration: Some(elapsed) })
    }

    fn maybe_run_pruner(&mut self, block_number: u64) -> Result<(), PersistenceError> {
        // The durable save is already committed at this point, so pruning can happen after we
        // acknowledge the save without extending the synchronous persistence wait.
        if self.pruner.is_pruning_needed(block_number) {
            debug!(target: "engine::persistence", block_num=?block_number, "Running pruner");
            let prune_start = Instant::now();
            let provider_rw = self.provider.database_provider_rw()?;
            let _ = self.pruner.run_with_provider(&provider_rw, block_number)?;
            provider_rw.commit()?;
            let pruned_bals = self
                .provider
                .bal_store()
                .prune(block_number)
                .inspect_err(|err| {
                    warn!(target: "engine::persistence", tip=?block_number, ?err, "Failed to prune BAL store");
                })
                .unwrap_or_default();
            debug!(target: "engine::persistence", tip=?block_number, pruned_bals, "Finished pruning after saving blocks");
            self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
        }

        Ok(())
    }
}

/// One of the errors that can happen when using the persistence service.
#[derive(Debug, Error)]
pub enum PersistenceError {
    /// A pruner error
    #[error(transparent)]
    PrunerError(#[from] PrunerError),

    /// A provider error
    #[error(transparent)]
    ProviderError(#[from] ProviderError),
}

/// A signal to the persistence service that part of the tree state can be persisted.
#[derive(Debug)]
pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
    /// The section of tree state that should be persisted. These blocks are expected in order of
    /// increasing block number.
    ///
    /// First, header, transaction, and receipt-related data should be written to static files.
    /// Then the execution history-related data will be written to the database.
    SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<PersistenceResult>),

    /// Removes block data above the given block number from the database.
    ///
    /// This will first update checkpoints from the database, then remove actual block data from
    /// static files.
    RemoveBlocksAbove(u64, CrossbeamSender<PersistenceResult>),

    /// Update the persisted finalized block on disk
    SaveFinalizedBlock(u64),

    /// Update the persisted safe block on disk
    SaveSafeBlock(u64),
}

/// A handle to the persistence service
#[derive(Debug, Clone)]
pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
    /// The channel used to communicate with the persistence service
    sender: Sender<PersistenceAction<N>>,
    /// Guard that joins the service thread when all handles are dropped.
    /// Uses `Arc` so the handle remains `Clone`.
    _service_guard: Arc<ServiceGuard>,
}

impl<T: NodePrimitives> PersistenceHandle<T> {
    /// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
    ///
    /// This is intended for testing purposes where you want to mock the persistence service.
    /// For production use, prefer [`spawn_service`](Self::spawn_service).
    pub fn new(sender: Sender<PersistenceAction<T>>) -> Self {
        Self { sender, _service_guard: Arc::new(ServiceGuard(None)) }
    }

    /// Create a new [`PersistenceHandle`], and spawn the persistence service.
    ///
    /// The returned handle can be cloned and shared. When all clones are dropped, the service
    /// thread will be joined, ensuring graceful shutdown before resources (like `RocksDB`) are
    /// released.
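    ///
    /// A usage sketch (illustrative; assumes `provider_factory`, `pruner`, and `sync_metrics_tx`
    /// are already constructed):
    ///
    /// ```ignore
    /// let handle = PersistenceHandle::spawn_service(provider_factory, pruner, sync_metrics_tx);
    ///
    /// // Clones share the same service thread; it is joined once the last clone is dropped.
    /// let handle_for_other_task = handle.clone();
    /// ```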
    pub fn spawn_service<N>(
        provider_factory: ProviderFactory<N>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        sync_metrics_tx: MetricEventsSender,
    ) -> PersistenceHandle<N::Primitives>
    where
        N: ProviderNodeTypes,
    {
        // create the initial channels
        let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();

        // spawn the persistence service
        let db_service =
            PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
        let join_handle = spawn_os_thread("persistence", || {
            if let Err(err) = db_service.run() {
                error!(target: "engine::persistence", ?err, "Persistence service failed");
            }
        });

        PersistenceHandle {
            sender: db_service_tx,
            _service_guard: Arc::new(ServiceGuard(Some(join_handle))),
        }
    }

    /// Sends a specific [`PersistenceAction`] over the contained channel. The caller is
    /// responsible for creating any channels required by the given action.
    pub fn send_action(
        &self,
        action: PersistenceAction<T>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.sender.send(action)
    }

    /// Tells the persistence service to save the given list of executed blocks. The blocks are
    /// assumed to be ordered by block number.
    ///
    /// This returns the latest hash that has been saved, allowing removal of that block and any
    /// previous blocks from in-memory data structures. This value is returned in the receiver end
    /// of the sender argument.
    ///
    /// If there are no blocks to persist, then `None` is sent in the sender.
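    ///
    /// A usage sketch (illustrative; assumes `handle` and a `Vec` of executed `blocks` are in
    /// scope):
    ///
    /// ```ignore
    /// let (tx, rx) = crossbeam_channel::bounded(1);
    /// handle.save_blocks(blocks, tx)?;
    ///
    /// // Blocks until the service has committed the batch; `last_block` is `None` for an
    /// // empty batch.
    /// let result = rx.recv()?;
    /// let persisted_tip = result.last_block;
    /// ```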
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<T>>,
        tx: CrossbeamSender<PersistenceResult>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
    }

    /// Queues the finalized block number to be persisted on disk.
    ///
    /// The update is deferred and will be committed together with the next [`Self::save_blocks`]
    /// call to avoid triggering a separate fsync for each update.
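    ///
    /// A sketch of the deferred commit (illustrative; assumes `handle`, `blocks`, and a
    /// `finalized` block number are in scope):
    ///
    /// ```ignore
    /// // Queued only; nothing is written to disk yet.
    /// handle.save_finalized_block_number(finalized)?;
    ///
    /// // The queued finalized (and safe) block numbers are committed together with this save.
    /// let (tx, rx) = crossbeam_channel::bounded(1);
    /// handle.save_blocks(blocks, tx)?;
    /// ```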
    pub fn save_finalized_block_number(
        &self,
        finalized_block: u64,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
    }

    /// Queues the safe block number to be persisted on disk.
    ///
    /// The update is deferred and will be committed together with the next [`Self::save_blocks`]
    /// call to avoid triggering a separate fsync for each update.
    pub fn save_safe_block_number(
        &self,
        safe_block: u64,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
    }

    /// Tells the persistence service to remove block data above a certain block number from the
    /// database and static files.
    ///
    /// When the operation completes, the new tip hash is returned in the receiver end of the
    /// sender argument.
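    ///
    /// A usage sketch (illustrative; assumes `handle` and a `new_tip` block number are in scope):
    ///
    /// ```ignore
    /// let (tx, rx) = crossbeam_channel::bounded(1);
    /// handle.remove_blocks_above(new_tip, tx)?;
    ///
    /// // `last_block` is the new tip (number and hash) after the removal.
    /// let new_tip_num_hash = rx.recv()?.last_block;
    /// ```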
    pub fn remove_blocks_above(
        &self,
        block_num: u64,
        tx: CrossbeamSender<PersistenceResult>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
    }
}

/// Guard that joins the persistence service thread when dropped.
///
/// This ensures graceful shutdown - the service thread completes before resources like
/// `RocksDB` are released. Stored in an `Arc` inside [`PersistenceHandle`] so the handle
/// can be cloned while sharing the same guard.
struct ServiceGuard(Option<JoinHandle<()>>);

impl std::fmt::Debug for ServiceGuard {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("ServiceGuard").field(&self.0.as_ref().map(|_| "...")).finish()
    }
}

impl Drop for ServiceGuard {
    fn drop(&mut self) {
        if let Some(join_handle) = self.0.take() {
            let _ = join_handle.join();
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::NumHash;
    use alloy_primitives::{keccak256, BlockHash, BlockNumber, Bytes, Sealed, B256, U256};
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::{
        providers::{ProviderFactoryBuilder, ReadOnlyConfig},
        test_utils::{create_test_provider_factory, MockNodeTypes},
        AccountReader, BalConfig, BalNotificationStream, BalStore, BalStoreHandle,
        ChainSpecProvider, HeaderProvider, InMemoryBalStore, ProviderError, ProviderResult,
        SealedBal, StorageSettingsCache, TryIntoHistoricalStateProvider,
    };
    use reth_prune::Pruner;
    use reth_prune_types::PruneMode;
    use tokio::sync::mpsc::unbounded_channel;

    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let provider = create_test_provider_factory();

        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
    }

    #[test]
    fn test_pruner_prunes_bal_store() {
        reth_tracing::init_test_tracing();

        let old_hash = B256::random();
        let retained_hash = B256::random();
        let old_bal = Bytes::from_static(b"old");
        let retained_bal = Bytes::from_static(b"retained");
        let bal_store = BalStoreHandle::new(InMemoryBalStore::new(
            BalConfig::with_in_memory_retention(PruneMode::Before(2)),
        ));

        bal_store
            .insert(
                NumHash::new(1, old_hash),
                Sealed::new_unchecked(old_bal.clone(), keccak256(&old_bal)),
            )
            .unwrap();
        bal_store
            .insert(
                NumHash::new(2, retained_hash),
                Sealed::new_unchecked(retained_bal.clone(), keccak256(&retained_bal)),
            )
            .unwrap();

        let provider = create_test_provider_factory().with_bal_store(bal_store.clone());
        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 0, 0, None, finished_exex_height_rx);
        let (_db_service_tx, db_service_rx) = std::sync::mpsc::channel();
        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        let mut service = PersistenceService::new(provider, db_service_rx, pruner, sync_metrics_tx);

        service.maybe_run_pruner(2).unwrap();

        assert_eq!(
            bal_store.get_by_hashes(&[old_hash, retained_hash]).unwrap(),
            vec![None, Some(retained_bal)]
        );
    }

    #[test]
    fn test_pruner_ignores_bal_store_prune_error() {
        reth_tracing::init_test_tracing();

        let provider = create_test_provider_factory()
            .with_bal_store(BalStoreHandle::new(FailingPruneBalStore));
        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 0, 0, None, finished_exex_height_rx);
        let (_db_service_tx, db_service_rx) = std::sync::mpsc::channel();
        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        let mut service = PersistenceService::new(provider, db_service_rx, pruner, sync_metrics_tx);

        service.maybe_run_pruner(2).unwrap();
    }

    #[derive(Debug)]
    struct FailingPruneBalStore;

    impl BalStore for FailingPruneBalStore {
        fn insert(&self, _num_hash: NumHash, _bal: SealedBal) -> ProviderResult<()> {
            Ok(())
        }

        fn prune(&self, _tip: BlockNumber) -> ProviderResult<usize> {
            Err(ProviderError::other(std::io::Error::other("BAL store prune failed")))
        }

        fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> ProviderResult<Vec<Option<Bytes>>> {
            Ok(vec![None; block_hashes.len()])
        }

        fn get_by_range(&self, _start: BlockNumber, _count: u64) -> ProviderResult<Vec<Bytes>> {
            Ok(Vec::new())
        }

        fn bal_stream(&self) -> BalNotificationStream {
            BalStoreHandle::noop().bal_stream()
        }
    }

    #[test]
    fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let blocks = vec![];
        let (tx, rx) = crossbeam_channel::bounded(1);

        handle.save_blocks(blocks, tx).unwrap();

        let result = rx.recv().unwrap();
        assert!(result.last_block.is_none());
    }

    #[test]
    fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();
        let block_number = 0;
        let mut test_block_builder = TestBlockBuilder::eth();
        let executed =
            test_block_builder.get_executed_block_with_number(block_number, B256::random());
        let block_hash = executed.recovered_block().hash();

        let blocks = vec![executed];
        let (tx, rx) = crossbeam_channel::bounded(1);

        handle.save_blocks(blocks, tx).unwrap();

        let result = rx.recv_timeout(std::time::Duration::from_secs(10)).expect("test timed out");

        assert_eq!(block_hash, result.last_block.unwrap().hash);
    }

    #[test]
    fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut test_block_builder = TestBlockBuilder::eth();
        let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
        let last_hash = blocks.last().unwrap().recovered_block().hash();
        let (tx, rx) = crossbeam_channel::bounded(1);

        handle.save_blocks(blocks, tx).unwrap();
        let result = rx.recv().unwrap();
        assert_eq!(last_hash, result.last_block.unwrap().hash);
    }

    #[test]
    fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let ranges = [0..1, 1..2, 2..4, 4..5];
        let mut test_block_builder = TestBlockBuilder::eth();
        for range in ranges {
            let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
            let last_hash = blocks.last().unwrap().recovered_block().hash();
            let (tx, rx) = crossbeam_channel::bounded(1);

            handle.save_blocks(blocks, tx).unwrap();

            let result = rx.recv().unwrap();
            assert_eq!(last_hash, result.last_block.unwrap().hash);
        }
    }

    /// Verifies that committing `save_blocks` history before running the pruner
    /// prevents the pruner from overwriting new entries.
    ///
    /// Previously, both `save_blocks` and the pruner pushed `RocksDB` batches before
    /// a single commit. Both read committed state, so the pruner didn't see the
    /// new entries and its batch overwrote them. The fix commits `save_blocks`
    /// first, then runs the pruner against committed state in a separate provider.
    #[test]
    fn test_save_blocks_then_prune_preserves_new_history() {
        use reth_db::{models::ShardedKey, tables, BlockNumberList};
        use reth_provider::RocksDBProviderFactory;

        reth_tracing::init_test_tracing();

        let provider_factory = create_test_provider_factory();
        let tracked_addr = alloy_primitives::Address::from([0xBE; 20]);

        // Phase 1: Establish baseline history for blocks 0..20.
        let rocksdb = provider_factory.rocksdb_provider();
        {
            let mut batch = rocksdb.batch();
            let initial_blocks: Vec<u64> = (0..20).collect();
            let shard = BlockNumberList::new_pre_sorted(initial_blocks.iter().copied());
            batch
                .put::<tables::AccountsHistory>(ShardedKey::new(tracked_addr, u64::MAX), &shard)
                .unwrap();
            batch.commit().unwrap();
        }

        // Phase 2: Simulate the fixed on_save_blocks flow.
        // Step 1: save_blocks appends new entries 20..25 and commits immediately.
        let mut batch1 = rocksdb.batch();
        batch1.append_account_history_shard(tracked_addr, 20..25u64).unwrap();
        batch1.commit().unwrap();

        // Step 2: Pruner runs AFTER commit, so it reads state that includes 20..25.
        // Prunes entries ≤ 14, leaving [15..25).
        let mut batch2 = rocksdb.batch();
        batch2.prune_account_history_to(tracked_addr, 14).unwrap();
        batch2.commit().unwrap();

        // Verify new entries survived pruning.
        let shards = rocksdb.account_history_shards(tracked_addr).unwrap();
        let entries: Vec<u64> = shards.iter().flat_map(|(_, list)| list.iter()).collect();
        let expected: Vec<u64> = (15..25).collect();
        assert_eq!(entries, expected, "new entries 20..25 must survive pruning");
    }

    #[test]
    fn test_read_only_consistency_across_reorg() {
        reth_tracing::init_test_tracing();

        // Allow opening the same MDBX env twice in-process
        reth_db::test_utils::enable_legacy_multiopen();

        let provider_factory = create_test_provider_factory();
        provider_factory.set_storage_settings_cache(reth_provider::StorageSettings::v2());

        // Open the secondary provider concurrently with the primary.
        let secondary = ProviderFactoryBuilder::<MockNodeTypes>::default()
            .open_read_only(
                provider_factory.chain_spec(),
                ReadOnlyConfig::from_datadir(provider_factory.db_ref().path()),
                reth_tasks::Runtime::test(),
            )
            .expect("failed to open read-only provider factory");
        secondary.set_storage_settings_cache(reth_provider::StorageSettings::v2());

        // --- Phase 1: Write blocks 0..3 via the primary ---
        let mut test_block_builder = TestBlockBuilder::eth().with_state();
        let signer = test_block_builder.signer;
        let blocks_a: Vec<_> = test_block_builder.get_executed_blocks(0..3).collect();
        let hash_a1 = blocks_a[1].recovered_block().hash();
        let hash_a2 = blocks_a[2].recovered_block().hash();

        // Compute expected signer state after each block from tx counts.
        let single_cost = TestBlockBuilder::<EthPrimitives>::single_tx_cost();
        let initial_balance = U256::from(10).pow(U256::from(18));
        let txs_in_block0 = blocks_a[0].recovered_block().body().transactions.len() as u64;
        let txs_in_block1 = blocks_a[1].recovered_block().body().transactions.len() as u64;

        let balance_after_block0 = initial_balance - single_cost * U256::from(txs_in_block0);
        let nonce_after_block0 = txs_in_block0;
        let balance_after_block1 = balance_after_block0 - single_cost * U256::from(txs_in_block1);
        let nonce_after_block1 = nonce_after_block0 + txs_in_block1;

        {
            let provider_rw = provider_factory.database_provider_rw().unwrap();
            provider_rw.save_blocks(blocks_a, SaveBlocksMode::Full).unwrap();
            provider_rw.commit().unwrap();
        }

        // Secondary catches up and sees all 3 blocks.
        // Hold this provider (and its MDBX RO tx) across the reorg to test snapshot isolation.
        let pre_reorg_provider = secondary.provider().unwrap();
        assert_eq!(
            pre_reorg_provider.sealed_header(2).unwrap().as_ref().map(|h| h.hash()),
            Some(hash_a2),
            "secondary must see block 2 after initial append"
        );

        // Check the primary can read its own historical state.
        {
            let primary_state_at_1 = provider_factory.history_by_block_number(1).unwrap();
            let primary_account = primary_state_at_1.basic_account(&signer).unwrap();
            assert!(primary_account.is_some(), "primary: signer must exist at block 1");
        }

        // Verify historical state at block 1 is accessible via changesets on the secondary.
        {
            let state_at_1 = secondary.history_by_block_number(1).unwrap();
            let account_at_1 = state_at_1.basic_account(&signer).unwrap();
            assert!(account_at_1.is_some(), "signer account must exist at block 1");
            let account_at_1 = account_at_1.unwrap();
            assert_eq!(account_at_1.balance, balance_after_block1, "signer balance at block 1");
            assert_eq!(account_at_1.nonce, nonce_after_block1, "signer nonce at block 1");
        }

        // --- Phase 2: Reorg — remove block 2 and append a different block 2 ---
        // Build the reorg block before starting the commit so we can write it in the
        // same thread after the unwind.
        let block_b2 = test_block_builder.get_executed_block_with_number(2, hash_a1);
        let hash_b2 = block_b2.recovered_block().hash();
        let txs_in_block_b2 = block_b2.recovered_block().body().transactions.len() as u64;
        assert_ne!(hash_a2, hash_b2, "reorg block must differ");

        // Expected signer state after the reorged block 2.
        let balance_after_reorg_block2 =
            balance_after_block1 - single_cost * U256::from(txs_in_block_b2);
        let nonce_after_reorg_block2 = nonce_after_block1 + txs_in_block_b2;

        // Spawn the reorg on a background thread because `commit_unwind` calls
        // `wait_for_pre_commit_readers()` which blocks until the secondary's held
        // RO tx is dropped.
        //
        // We want to keep the provider factory around; otherwise the MDBX env would be dropped
        // before the reorg thread is done with it.
        #[expect(clippy::redundant_clone)]
        let pf = provider_factory.clone();
        let reorg_handle = std::thread::spawn(move || {
            let provider_rw = pf.database_provider_rw().unwrap();
            provider_rw.remove_block_and_execution_above(1).unwrap();
            provider_rw.commit().unwrap();

            let provider_rw = pf.database_provider_rw().unwrap();
            provider_rw.save_blocks(vec![block_b2], SaveBlocksMode::Full).unwrap();
            provider_rw.commit().unwrap();
        });

        // Give the reorg thread time to start and block on wait_for_pre_commit_readers.
        std::thread::sleep(std::time::Duration::from_millis(100));

        // The pre-reorg provider still holds its MDBX snapshot — it must still see
        // the OLD block 2 from before the reorg.
        assert_eq!(
            pre_reorg_provider.sealed_header(2).unwrap().as_ref().map(|h| h.hash()),
            Some(hash_a2),
            "pre-reorg provider must still see the original block 2"
        );
        assert_eq!(
            pre_reorg_provider.sealed_header(1).unwrap().as_ref().map(|h| h.hash()),
            Some(hash_a1),
            "pre-reorg provider must still see block 1"
        );

        // The held RO tx must still be able to read historical state at block 1 via
        // changesets, even though the reorg thread is about to rewrite block 2's data.
        // Consuming pre_reorg_provider here also unblocks the reorg commit.
        let state_at_1 = pre_reorg_provider.try_into_history_at_block(1).unwrap();
        let account = state_at_1.basic_account(&signer).unwrap();
        assert!(
            account.is_some(),
            "pre-reorg RO tx must still read signer at block 1 during reorg"
        );
        let account = account.unwrap();
        assert_eq!(
            account.balance, balance_after_block1,
            "pre-reorg RO tx: signer balance at block 1 during reorg"
        );
        assert_eq!(
            account.nonce, nonce_after_block1,
            "pre-reorg RO tx: signer nonce at block 1 during reorg"
        );
        drop(state_at_1);
        reorg_handle.join().expect("reorg thread panicked");

        // A new provider catches up and sees the reorged chain.
        let obs_header = secondary.provider().unwrap().sealed_header(2).unwrap();
        assert_eq!(
            obs_header.as_ref().map(|h| h.hash()),
            Some(hash_b2),
            "secondary must see the reorged block 2, not the old one"
        );

        // Block 1 should still be the original.
        let obs_header = secondary.provider().unwrap().sealed_header(1).unwrap();
        assert_eq!(
            obs_header.as_ref().map(|h| h.hash()),
            Some(hash_a1),
            "secondary must still see block 1"
        );

        // Verify historical state at block 1 is still accessible after the reorg.
        let state_at_1 = secondary.history_by_block_number(1).unwrap();
        let account_at_1 = state_at_1.basic_account(&signer).unwrap();
        assert!(account_at_1.is_some(), "signer account must exist at block 1 after reorg");
        let account_at_1 = account_at_1.unwrap();
        assert_eq!(
            account_at_1.balance, balance_after_block1,
            "signer balance at block 1 must survive reorg"
        );
        assert_eq!(
            account_at_1.nonce, nonce_after_block1,
            "signer nonce at block 1 must survive reorg"
        );

        // Verify the latest state (at block 2) reflects the reorged execution.
        let state_at_2 = secondary.history_by_block_number(2).unwrap();
        let account_at_2 = state_at_2.basic_account(&signer).unwrap();
        assert!(account_at_2.is_some(), "signer account must exist at block 2 after reorg");
        let account_at_2 = account_at_2.unwrap();
        assert_eq!(
            account_at_2.balance, balance_after_reorg_block2,
            "signer balance at block 2 must reflect reorged execution"
        );
        assert_eq!(
            account_at_2.nonce, nonce_after_reorg_block2,
            "signer nonce at block 2 must reflect reorged execution"
        );
    }
}