reth_engine_tree/tree/persistence_state.rs
1//! Persistence state management for background database operations.
2//!
3//! This module manages the state of background tasks that persist cached data
4//! to the database. The persistence system works asynchronously to avoid blocking
5//! block execution while ensuring data durability.
6//!
7//! ## Background Persistence
8//!
9//! The execution engine maintains an in-memory cache of state changes that need
10//! to be persisted to disk. Rather than writing synchronously (which would slow
11//! down block processing), persistence happens in background tasks.
12//!
13//! ## Persistence Actions
14//!
15//! - **Saving Blocks**: Persist newly executed blocks and their state changes
16//! - **Removing Blocks**: Remove invalid blocks during chain reorganizations
17//!
18//! ## Coordination
19//!
20//! The [`PersistenceState`] tracks ongoing persistence operations and coordinates
21//! between the main execution thread and background persistence workers.
22
23use crate::persistence::PersistenceResult;
24use alloy_eips::BlockNumHash;
25use alloy_primitives::B256;
26use crossbeam_channel::Receiver as CrossbeamReceiver;
27use reth_primitives_traits::FastInstant as Instant;
28use tracing::trace;
29
30/// The state of the persistence task.
31#[derive(Debug)]
32pub struct PersistenceState {
33 /// Hash and number of the last block persisted.
34 ///
35 /// This tracks the chain height that is persisted on disk
36 pub(crate) last_persisted_block: BlockNumHash,
37 /// Receiver end of channel where the result of the persistence task will be
38 /// sent when done. A None value means there's no persistence task in progress.
39 pub(crate) rx:
40 Option<(CrossbeamReceiver<PersistenceResult>, Instant, CurrentPersistenceAction)>,
41}
42
43impl PersistenceState {
44 /// Determines if there is a persistence task in progress by checking if the
45 /// receiver is set.
46 pub(crate) const fn in_progress(&self) -> bool {
47 self.rx.is_some()
48 }
49
50 /// Sets the state for a block removal operation.
51 pub(crate) fn start_remove(
52 &mut self,
53 new_tip_num: u64,
54 rx: CrossbeamReceiver<PersistenceResult>,
55 ) {
56 self.rx =
57 Some((rx, Instant::now(), CurrentPersistenceAction::RemovingBlocks { new_tip_num }));
58 }
59
60 /// Sets the state for a block save operation.
61 pub(crate) fn start_save(
62 &mut self,
63 highest: BlockNumHash,
64 rx: CrossbeamReceiver<PersistenceResult>,
65 ) {
66 self.rx = Some((rx, Instant::now(), CurrentPersistenceAction::SavingBlocks { highest }));
67 }
68
69 /// Returns the current persistence action. If there is no persistence task in progress, then
70 /// this returns `None`.
71 #[cfg(test)]
72 pub(crate) fn current_action(&self) -> Option<&CurrentPersistenceAction> {
73 self.rx.as_ref().map(|rx| &rx.2)
74 }
75
76 /// Sets state for a finished persistence task.
77 pub(crate) fn finish(
78 &mut self,
79 last_persisted_block_hash: B256,
80 last_persisted_block_number: u64,
81 ) {
82 trace!(target: "engine::tree", block= %last_persisted_block_number, hash=%last_persisted_block_hash, "updating persistence state");
83 self.rx = None;
84 self.last_persisted_block =
85 BlockNumHash::new(last_persisted_block_number, last_persisted_block_hash);
86 }
87}
88
89/// The currently running persistence action.
90#[derive(Debug, Clone, PartialEq, Eq)]
91pub(crate) enum CurrentPersistenceAction {
92 /// The persistence task is saving blocks.
93 SavingBlocks {
94 /// The highest block being saved.
95 highest: BlockNumHash,
96 },
97 /// The persistence task is removing blocks.
98 RemovingBlocks {
99 /// The tip, above which we are removing blocks.
100 new_tip_num: u64,
101 },
102}