reth_engine_tree/tree/
persistence_state.rs

1//! Persistence state management for background database operations.
2//!
3//! This module manages the state of background tasks that persist cached data
4//! to the database. The persistence system works asynchronously to avoid blocking
5//! block execution while ensuring data durability.
6//!
7//! ## Background Persistence
8//!
9//! The execution engine maintains an in-memory cache of state changes that need
10//! to be persisted to disk. Rather than writing synchronously (which would slow
11//! down block processing), persistence happens in background tasks.
12//!
13//! ## Persistence Actions
14//!
15//! - **Saving Blocks**: Persist newly executed blocks and their state changes
16//! - **Removing Blocks**: Remove invalid blocks during chain reorganizations
17//!
18//! ## Coordination
19//!
20//! The [`PersistenceState`] tracks ongoing persistence operations and coordinates
21//! between the main execution thread and background persistence workers.
22
23use alloy_eips::BlockNumHash;
24use alloy_primitives::B256;
25use std::time::Instant;
26use tokio::sync::oneshot;
27use tracing::trace;
28
29/// The state of the persistence task.
30#[derive(Default, Debug)]
31pub struct PersistenceState {
32    /// Hash and number of the last block persisted.
33    ///
34    /// This tracks the chain height that is persisted on disk
35    pub(crate) last_persisted_block: BlockNumHash,
36    /// Receiver end of channel where the result of the persistence task will be
37    /// sent when done. A None value means there's no persistence task in progress.
38    pub(crate) rx:
39        Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant, CurrentPersistenceAction)>,
40}
41
42impl PersistenceState {
43    /// Determines if there is a persistence task in progress by checking if the
44    /// receiver is set.
45    pub(crate) const fn in_progress(&self) -> bool {
46        self.rx.is_some()
47    }
48
49    /// Sets the state for a block removal operation.
50    pub(crate) fn start_remove(
51        &mut self,
52        new_tip_num: u64,
53        rx: oneshot::Receiver<Option<BlockNumHash>>,
54    ) {
55        self.rx =
56            Some((rx, Instant::now(), CurrentPersistenceAction::RemovingBlocks { new_tip_num }));
57    }
58
59    /// Sets the state for a block save operation.
60    pub(crate) fn start_save(
61        &mut self,
62        highest: BlockNumHash,
63        rx: oneshot::Receiver<Option<BlockNumHash>>,
64    ) {
65        self.rx = Some((rx, Instant::now(), CurrentPersistenceAction::SavingBlocks { highest }));
66    }
67
68    /// Returns the current persistence action. If there is no persistence task in progress, then
69    /// this returns `None`.
70    pub(crate) fn current_action(&self) -> Option<&CurrentPersistenceAction> {
71        self.rx.as_ref().map(|rx| &rx.2)
72    }
73
74    /// Sets state for a finished persistence task.
75    pub(crate) fn finish(
76        &mut self,
77        last_persisted_block_hash: B256,
78        last_persisted_block_number: u64,
79    ) {
80        trace!(target: "engine::tree", block= %last_persisted_block_number, hash=%last_persisted_block_hash, "updating persistence state");
81        self.rx = None;
82        self.last_persisted_block =
83            BlockNumHash::new(last_persisted_block_number, last_persisted_block_hash);
84    }
85}
86
87/// The currently running persistence action.
88#[derive(Debug, Clone, PartialEq, Eq)]
89pub(crate) enum CurrentPersistenceAction {
90    /// The persistence task is saving blocks.
91    SavingBlocks {
92        /// The highest block being saved.
93        highest: BlockNumHash,
94    },
95    /// The persistence task is removing blocks.
96    RemovingBlocks {
97        /// The tip, above which we are removing blocks.
98        new_tip_num: u64,
99    },
100}