//! Persistence service for the engine tree (`reth_engine_tree/persistence.rs`).
1use crate::metrics::PersistenceMetrics;
2use alloy_eips::BlockNumHash;
3use crossbeam_channel::Sender as CrossbeamSender;
4use reth_chain_state::ExecutedBlock;
5use reth_errors::ProviderError;
6use reth_ethereum_primitives::EthPrimitives;
7use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
8use reth_provider::{
9    providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
10    DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
11};
12use reth_prune::{PrunerError, PrunerWithFactory};
13use reth_stages_api::{MetricEvent, MetricEventsSender};
14use reth_tasks::spawn_os_thread;
15use std::{
16    sync::{
17        mpsc::{Receiver, SendError, Sender},
18        Arc,
19    },
20    thread::JoinHandle,
21};
22use thiserror::Error;
23use tracing::{debug, error, instrument};
24
/// Writes parts of reth's in memory tree state to the database and static files.
///
/// This is meant to be a spawned service that listens for various incoming persistence operations,
/// performing those actions on disk, and returning the result in a channel.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking I/O operations in an endless loop.
#[derive(Debug)]
pub struct PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// The provider factory used to open read-write database providers for each operation.
    provider: ProviderFactory<N>,
    /// Incoming persistence requests; the service loop exits once all senders are dropped.
    incoming: Receiver<PersistenceAction<N::Primitives>>,
    /// The pruner, run against the same provider transaction as a block save when needed.
    pruner: PrunerWithFactory<ProviderFactory<N>>,
    /// Metrics recorded for save / remove / prune durations and batch sizes.
    metrics: PersistenceMetrics,
    /// Sender for sync metrics - we only submit sync metrics for persisted blocks
    sync_metrics_tx: MetricEventsSender,
    /// Pending finalized block number to be committed with the next block save.
    /// This avoids triggering a separate fsync for each finalized block update.
    pending_finalized_block: Option<u64>,
    /// Pending safe block number to be committed with the next block save.
    /// This avoids triggering a separate fsync for each safe block update.
    pending_safe_block: Option<u64>,
}
54
55impl<N> PersistenceService<N>
56where
57    N: ProviderNodeTypes,
58{
59    /// Create a new persistence service
60    pub fn new(
61        provider: ProviderFactory<N>,
62        incoming: Receiver<PersistenceAction<N::Primitives>>,
63        pruner: PrunerWithFactory<ProviderFactory<N>>,
64        sync_metrics_tx: MetricEventsSender,
65    ) -> Self {
66        Self {
67            provider,
68            incoming,
69            pruner,
70            metrics: PersistenceMetrics::default(),
71            sync_metrics_tx,
72            pending_finalized_block: None,
73            pending_safe_block: None,
74        }
75    }
76}
77
78impl<N> PersistenceService<N>
79where
80    N: ProviderNodeTypes,
81{
82    /// This is the main loop, that will listen to database events and perform the requested
83    /// database actions
84    pub fn run(mut self) -> Result<(), PersistenceError> {
85        // If the receiver errors then senders have disconnected, so the loop should then end.
86        while let Ok(action) = self.incoming.recv() {
87            match action {
88                PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
89                    let result = self.on_remove_blocks_above(new_tip_num)?;
90                    // send new sync metrics based on removed blocks
91                    let _ =
92                        self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
93                    // we ignore the error because the caller may or may not care about the result
94                    let _ = sender.send(result);
95                }
96                PersistenceAction::SaveBlocks(blocks, sender) => {
97                    let result = self.on_save_blocks(blocks)?;
98                    let result_number = result.map(|r| r.number);
99
100                    // we ignore the error because the caller may or may not care about the result
101                    let _ = sender.send(result);
102
103                    if let Some(block_number) = result_number {
104                        // send new sync metrics based on saved blocks
105                        let _ = self
106                            .sync_metrics_tx
107                            .send(MetricEvent::SyncHeight { height: block_number });
108                    }
109                }
110                PersistenceAction::SaveFinalizedBlock(finalized_block) => {
111                    self.pending_finalized_block = Some(finalized_block);
112                }
113                PersistenceAction::SaveSafeBlock(safe_block) => {
114                    self.pending_safe_block = Some(safe_block);
115                }
116            }
117        }
118        Ok(())
119    }
120
121    #[instrument(level = "debug", target = "engine::persistence", skip_all, fields(%new_tip_num))]
122    fn on_remove_blocks_above(
123        &self,
124        new_tip_num: u64,
125    ) -> Result<Option<BlockNumHash>, PersistenceError> {
126        debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
127        let start_time = Instant::now();
128        let provider_rw = self.provider.database_provider_rw()?;
129
130        let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
131        provider_rw.remove_block_and_execution_above(new_tip_num)?;
132        provider_rw.commit()?;
133
134        debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
135        self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
136        Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
137    }
138
139    #[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_count = blocks.len()))]
140    fn on_save_blocks(
141        &mut self,
142        blocks: Vec<ExecutedBlock<N::Primitives>>,
143    ) -> Result<Option<BlockNumHash>, PersistenceError> {
144        let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
145        let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
146        let block_count = blocks.len();
147
148        let pending_finalized = self.pending_finalized_block.take();
149        let pending_safe = self.pending_safe_block.take();
150
151        debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks");
152
153        let start_time = Instant::now();
154
155        if let Some(last) = last_block {
156            let provider_rw = self.provider.database_provider_rw()?;
157            provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
158
159            if let Some(finalized) = pending_finalized {
160                // Clamp to the highest persisted block so that on restart
161                // `last_finalized_block_number` never points past available state.
162                provider_rw.save_finalized_block_number(finalized.min(last.number))?;
163                if finalized > last.number {
164                    self.pending_finalized_block = Some(finalized);
165                }
166            }
167            if let Some(safe) = pending_safe {
168                provider_rw.save_safe_block_number(safe.min(last.number))?;
169                if safe > last.number {
170                    self.pending_safe_block = Some(safe);
171                }
172            }
173
174            if self.pruner.is_pruning_needed(last.number) {
175                debug!(target: "engine::persistence", block_num=?last.number, "Running pruner");
176                let prune_start = Instant::now();
177                let _ = self.pruner.run_with_provider(&provider_rw, last.number)?;
178                self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
179            }
180
181            provider_rw.commit()?;
182        }
183
184        debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
185
186        self.metrics.save_blocks_batch_size.record(block_count as f64);
187        self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
188
189        Ok(last_block)
190    }
191}
192
/// One of the errors that can happen when using the persistence service.
///
/// Both variants wrap their source transparently, so `Display` shows the underlying error.
#[derive(Debug, Error)]
pub enum PersistenceError {
    /// A pruner error, raised while pruning during a block save.
    #[error(transparent)]
    PrunerError(#[from] PrunerError),

    /// A provider error, raised by any of the database read/write operations.
    #[error(transparent)]
    ProviderError(#[from] ProviderError),
}
204
/// A signal to the persistence service that part of the tree state can be persisted.
#[derive(Debug)]
pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
    /// The section of tree state that should be persisted. These blocks are expected in order of
    /// increasing block number.
    ///
    /// First, header, transaction, and receipt-related data should be written to static files.
    /// Then the execution history-related data will be written to the database.
    ///
    /// The num/hash of the last persisted block (or `None` for an empty batch) is sent back
    /// on the provided channel.
    SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<Option<BlockNumHash>>),

    /// Removes block data above the given block number from the database.
    ///
    /// This will first update checkpoints from the database, then remove actual block data from
    /// static files.
    ///
    /// The new tip's num/hash (if known) is sent back on the provided channel.
    RemoveBlocksAbove(u64, CrossbeamSender<Option<BlockNumHash>>),

    /// Update the persisted finalized block on disk.
    ///
    /// Deferred by the service and committed with the next block save.
    SaveFinalizedBlock(u64),

    /// Update the persisted safe block on disk.
    ///
    /// Deferred by the service and committed with the next block save.
    SaveSafeBlock(u64),
}
227
/// A handle to the persistence service.
///
/// Cloning is cheap: all clones share the same channel sender and service guard.
#[derive(Debug, Clone)]
pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
    /// The channel used to communicate with the persistence service
    sender: Sender<PersistenceAction<N>>,
    /// Guard that joins the service thread when all handles are dropped.
    /// Uses `Arc` so the handle remains `Clone`.
    _service_guard: Arc<ServiceGuard>,
}
237
238impl<T: NodePrimitives> PersistenceHandle<T> {
239    /// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
240    ///
241    /// This is intended for testing purposes where you want to mock the persistence service.
242    /// For production use, prefer [`spawn_service`](Self::spawn_service).
243    pub fn new(sender: Sender<PersistenceAction<T>>) -> Self {
244        Self { sender, _service_guard: Arc::new(ServiceGuard(None)) }
245    }
246
247    /// Create a new [`PersistenceHandle`], and spawn the persistence service.
248    ///
249    /// The returned handle can be cloned and shared. When all clones are dropped, the service
250    /// thread will be joined, ensuring graceful shutdown before resources (like `RocksDB`) are
251    /// released.
252    pub fn spawn_service<N>(
253        provider_factory: ProviderFactory<N>,
254        pruner: PrunerWithFactory<ProviderFactory<N>>,
255        sync_metrics_tx: MetricEventsSender,
256    ) -> PersistenceHandle<N::Primitives>
257    where
258        N: ProviderNodeTypes,
259    {
260        // create the initial channels
261        let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
262
263        // spawn the persistence service
264        let db_service =
265            PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
266        let join_handle = spawn_os_thread("persistence", || {
267            if let Err(err) = db_service.run() {
268                error!(target: "engine::persistence", ?err, "Persistence service failed");
269            }
270        });
271
272        PersistenceHandle {
273            sender: db_service_tx,
274            _service_guard: Arc::new(ServiceGuard(Some(join_handle))),
275        }
276    }
277
278    /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible
279    /// for creating any channels for the given action.
280    pub fn send_action(
281        &self,
282        action: PersistenceAction<T>,
283    ) -> Result<(), SendError<PersistenceAction<T>>> {
284        self.sender.send(action)
285    }
286
287    /// Tells the persistence service to save a certain list of finalized blocks. The blocks are
288    /// assumed to be ordered by block number.
289    ///
290    /// This returns the latest hash that has been saved, allowing removal of that block and any
291    /// previous blocks from in-memory data structures. This value is returned in the receiver end
292    /// of the sender argument.
293    ///
294    /// If there are no blocks to persist, then `None` is sent in the sender.
295    pub fn save_blocks(
296        &self,
297        blocks: Vec<ExecutedBlock<T>>,
298        tx: CrossbeamSender<Option<BlockNumHash>>,
299    ) -> Result<(), SendError<PersistenceAction<T>>> {
300        self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
301    }
302
303    /// Queues the finalized block number to be persisted on disk.
304    ///
305    /// The update is deferred and will be committed together with the next [`Self::save_blocks`]
306    /// call to avoid triggering a separate fsync for each update.
307    pub fn save_finalized_block_number(
308        &self,
309        finalized_block: u64,
310    ) -> Result<(), SendError<PersistenceAction<T>>> {
311        self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
312    }
313
314    /// Queues the safe block number to be persisted on disk.
315    ///
316    /// The update is deferred and will be committed together with the next [`Self::save_blocks`]
317    /// call to avoid triggering a separate fsync for each update.
318    pub fn save_safe_block_number(
319        &self,
320        safe_block: u64,
321    ) -> Result<(), SendError<PersistenceAction<T>>> {
322        self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
323    }
324
325    /// Tells the persistence service to remove blocks above a certain block number. The removed
326    /// blocks are returned by the service.
327    ///
328    /// When the operation completes, the new tip hash is returned in the receiver end of the sender
329    /// argument.
330    pub fn remove_blocks_above(
331        &self,
332        block_num: u64,
333        tx: CrossbeamSender<Option<BlockNumHash>>,
334    ) -> Result<(), SendError<PersistenceAction<T>>> {
335        self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
336    }
337}
338
/// Guard that joins the persistence service thread when dropped.
///
/// This ensures graceful shutdown - the service thread completes before resources like
/// `RocksDB` are released. Stored in an `Arc` inside [`PersistenceHandle`] so the handle
/// can be cloned while sharing the same guard.
struct ServiceGuard(Option<JoinHandle<()>>);

impl std::fmt::Debug for ServiceGuard {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `JoinHandle` is opaque; render a placeholder so the `Option` state is still visible.
        let handle_repr = match &self.0 {
            Some(_) => Some("..."),
            None => None,
        };
        f.debug_tuple("ServiceGuard").field(&handle_repr).finish()
    }
}

impl Drop for ServiceGuard {
    fn drop(&mut self) {
        // Best-effort join: a panic on the service thread surfaces as `Err`, which we ignore
        // here because destructors must not panic.
        if let Some(handle) = std::mem::take(&mut self.0) {
            let _ = handle.join();
        }
    }
}
359
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;
    use reth_prune::Pruner;
    use std::time::Duration;
    use tokio::sync::mpsc::unbounded_channel;

    /// Upper bound for waiting on the persistence service, so a wedged or crashed service
    /// fails the test instead of hanging it (a blocking `recv()` would wait forever).
    const RECV_TIMEOUT: Duration = Duration::from_secs(10);

    /// Spawns a real persistence service over an in-memory test provider with a pruner that
    /// never needs to run, and returns a handle to it.
    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let provider = create_test_provider_factory();

        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
    }

    #[test]
    fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let blocks = vec![];
        let (tx, rx) = crossbeam_channel::bounded(1);

        handle.save_blocks(blocks, tx).unwrap();

        // An empty batch persists nothing, so the service reports `None`.
        let hash = rx.recv_timeout(RECV_TIMEOUT).expect("test timed out");
        assert_eq!(hash, None);
    }

    #[test]
    fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();
        let block_number = 0;
        let mut test_block_builder = TestBlockBuilder::eth();
        let executed =
            test_block_builder.get_executed_block_with_number(block_number, B256::random());
        let block_hash = executed.recovered_block().hash();

        let blocks = vec![executed];
        let (tx, rx) = crossbeam_channel::bounded(1);

        handle.save_blocks(blocks, tx).unwrap();

        let BlockNumHash { hash: actual_hash, number: _ } =
            rx.recv_timeout(RECV_TIMEOUT).expect("test timed out").expect("no hash returned");

        assert_eq!(block_hash, actual_hash);
    }

    #[test]
    fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut test_block_builder = TestBlockBuilder::eth();
        let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
        let last_hash = blocks.last().unwrap().recovered_block().hash();
        let (tx, rx) = crossbeam_channel::bounded(1);

        handle.save_blocks(blocks, tx).unwrap();
        let BlockNumHash { hash: actual_hash, number: _ } =
            rx.recv_timeout(RECV_TIMEOUT).expect("test timed out").expect("no hash returned");
        assert_eq!(last_hash, actual_hash);
    }

    #[test]
    fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let ranges = [0..1, 1..2, 2..4, 4..5];
        let mut test_block_builder = TestBlockBuilder::eth();
        for range in ranges {
            let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
            let last_hash = blocks.last().unwrap().recovered_block().hash();
            let (tx, rx) = crossbeam_channel::bounded(1);

            handle.save_blocks(blocks, tx).unwrap();

            let BlockNumHash { hash: actual_hash, number: _ } =
                rx.recv_timeout(RECV_TIMEOUT).expect("test timed out").expect("no hash returned");
            assert_eq!(last_hash, actual_hash);
        }
    }
}
453}