reth_engine_tree/persistence.rs

use crate::metrics::PersistenceMetrics;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use reth_chain_state::ExecutedBlockWithTrieUpdates;
use reth_errors::ProviderError;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
    providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
    DBProvider, DatabaseProviderFactory, ProviderFactory,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use std::{
    sync::mpsc::{Receiver, SendError, Sender},
    time::Instant,
};
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{debug, error};

/// Writes parts of reth's in-memory tree state to the database and static files.
///
/// This is meant to be a spawned service that listens for incoming persistence operations,
/// performs those actions on disk, and returns the result through a channel.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since it performs
/// blocking I/O operations in an endless loop.
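///
/// # Example
///
/// A minimal spawn sketch (mirrors what [`PersistenceHandle::spawn_service`] does;
/// `provider_factory`, `pruner`, and `sync_metrics_tx` are assumed to be constructed
/// elsewhere):
///
/// ```ignore
/// let (tx, rx) = std::sync::mpsc::channel();
/// let service = PersistenceService::new(provider_factory, rx, pruner, sync_metrics_tx);
/// std::thread::spawn(|| {
///     // `run` exits once all senders are dropped.
///     let _ = service.run();
/// });
/// let handle = PersistenceHandle::new(tx);
/// ```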
#[derive(Debug)]
pub struct PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// The provider factory to use
    provider: ProviderFactory<N>,
    /// Incoming requests
    incoming: Receiver<PersistenceAction<N::Primitives>>,
    /// The pruner
    pruner: PrunerWithFactory<ProviderFactory<N>>,
    /// Metrics for persistence operations
    metrics: PersistenceMetrics,
    /// Sender for sync metrics; we only submit sync metrics for persisted blocks
    sync_metrics_tx: MetricEventsSender,
}

impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// Create a new persistence service
    pub fn new(
        provider: ProviderFactory<N>,
        incoming: Receiver<PersistenceAction<N::Primitives>>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        sync_metrics_tx: MetricEventsSender,
    ) -> Self {
        Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
    }

    /// Prunes block data before the given block number, according to the configured prune
    /// settings.
    fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
        debug!(target: "engine::persistence", ?block_num, "Running pruner");
        let start_time = Instant::now();
        // TODO: doing this properly depends on pruner segment changes
        let result = self.pruner.run(block_num);
        self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
        result
    }
}

impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// This is the main loop that listens for incoming persistence actions and performs the
    /// requested database operations.
    pub fn run(mut self) -> Result<(), PersistenceError> {
        // If the receiver errors, all senders have disconnected, so the loop should end.
        while let Ok(action) = self.incoming.recv() {
            match action {
                PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
                    let result = self.on_remove_blocks_above(new_tip_num)?;
                    // send new sync metrics based on removed blocks
                    let _ =
                        self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(result);
                }
                PersistenceAction::SaveBlocks(blocks, sender) => {
                    let result = self.on_save_blocks(blocks)?;
                    let result_number = result.map(|r| r.number);

                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(result);

                    if let Some(block_number) = result_number {
                        // send new sync metrics based on saved blocks
                        let _ = self
                            .sync_metrics_tx
                            .send(MetricEvent::SyncHeight { height: block_number });

                        if self.pruner.is_pruning_needed(block_number) {
                            // We log `PrunerOutput` inside the `Pruner`
                            let _ = self.prune_before(block_number)?;
                        }
                    }
                }
                PersistenceAction::SaveFinalizedBlock(finalized_block) => {
                    let provider = self.provider.database_provider_rw()?;
                    provider.save_finalized_block_number(finalized_block)?;
                    provider.commit()?;
                }
                PersistenceAction::SaveSafeBlock(safe_block) => {
                    let provider = self.provider.database_provider_rw()?;
                    provider.save_safe_block_number(safe_block)?;
                    provider.commit()?;
                }
            }
        }
        Ok(())
    }

    fn on_remove_blocks_above(
        &self,
        new_tip_num: u64,
    ) -> Result<Option<BlockNumHash>, PersistenceError> {
        debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
        let start_time = Instant::now();
        let provider_rw = self.provider.database_provider_rw()?;

        let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
        provider_rw.remove_block_and_execution_above(new_tip_num)?;
        provider_rw.commit()?;

        debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
        self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
        Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
    }

    fn on_save_blocks(
        &self,
        blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
    ) -> Result<Option<BlockNumHash>, PersistenceError> {
        debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block().num_hash()), last=?blocks.last().map(|b| b.recovered_block().num_hash()), "Saving range of blocks");
        let start_time = Instant::now();
        let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
            hash: block.recovered_block().hash(),
            number: block.recovered_block().header().number(),
        });

        if last_block_hash_num.is_some() {
            let provider_rw = self.provider.database_provider_rw()?;

            provider_rw.save_blocks(blocks)?;
            provider_rw.commit()?;
        }
        self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
        Ok(last_block_hash_num)
    }
}

/// One of the errors that can happen when using the persistence service.
#[derive(Debug, Error)]
pub enum PersistenceError {
    /// A pruner error
    #[error(transparent)]
    PrunerError(#[from] PrunerError),

    /// A provider error
    #[error(transparent)]
    ProviderError(#[from] ProviderError),
}

/// A signal to the persistence service that part of the tree state can be persisted.
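///
/// Actions are typically sent through a [`PersistenceHandle`] rather than constructed
/// directly. A minimal direct-construction sketch (the oneshot channel and the `42` tip
/// are illustrative assumptions):
///
/// ```ignore
/// let (tx, _rx) = tokio::sync::oneshot::channel();
/// let action = PersistenceAction::<EthPrimitives>::RemoveBlocksAbove(42, tx);
/// ```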
#[derive(Debug)]
pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
    /// The section of tree state that should be persisted. These blocks are expected in order
    /// of increasing block number.
    ///
    /// First, header, transaction, and receipt-related data should be written to static files.
    /// Then the execution history-related data will be written to the database.
    SaveBlocks(Vec<ExecutedBlockWithTrieUpdates<N>>, oneshot::Sender<Option<BlockNumHash>>),

    /// Removes block data above the given block number from the database.
    ///
    /// This will first update checkpoints from the database, then remove actual block data
    /// from static files.
    RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),

    /// Update the persisted finalized block on disk
    SaveFinalizedBlock(u64),

    /// Update the persisted safe block on disk
    SaveSafeBlock(u64),
}

/// A handle to the persistence service
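///
/// # Example
///
/// A wiring sketch (assumes a provider factory, pruner, and sync-metrics sender
/// constructed as in the tests at the bottom of this file):
///
/// ```ignore
/// let handle = PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx);
/// handle.save_safe_block_number(100)?;
/// ```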
#[derive(Debug, Clone)]
pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
    /// The channel used to communicate with the persistence service
    sender: Sender<PersistenceAction<N>>,
}

impl<T: NodePrimitives> PersistenceHandle<T> {
    /// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
    pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
        Self { sender }
    }

    /// Create a new [`PersistenceHandle`], and spawn the persistence service.
    pub fn spawn_service<N>(
        provider_factory: ProviderFactory<N>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        sync_metrics_tx: MetricEventsSender,
    ) -> PersistenceHandle<N::Primitives>
    where
        N: ProviderNodeTypes,
    {
        // create the initial channels
        let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();

        // construct persistence handle
        let persistence_handle = PersistenceHandle::new(db_service_tx);

        // spawn the persistence service
        let db_service =
            PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
        std::thread::Builder::new()
            .name("Persistence Service".to_string())
            .spawn(|| {
                if let Err(err) = db_service.run() {
                    error!(target: "engine::persistence", ?err, "Persistence service failed");
                }
            })
            .unwrap();

        persistence_handle
    }

    /// Sends a specific [`PersistenceAction`] over the contained channel. The caller is
    /// responsible for creating any channels for the given action.
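    ///
    /// A minimal sketch (assumes an existing `handle`):
    ///
    /// ```ignore
    /// handle.send_action(PersistenceAction::SaveFinalizedBlock(100))?;
    /// ```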
    pub fn send_action(
        &self,
        action: PersistenceAction<T>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.sender.send(action)
    }

    /// Tells the persistence service to save a list of finalized blocks. The blocks are
    /// assumed to be ordered by block number.
    ///
    /// This returns the latest hash that has been saved, allowing removal of that block and
    /// any previous blocks from in-memory data structures. This value is returned in the
    /// receiver end of the sender argument.
    ///
    /// If there are no blocks to persist, then `None` is sent in the sender.
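    ///
    /// # Example
    ///
    /// A minimal sketch (assumes an existing `handle` and executed `blocks`, mirroring the
    /// tests below):
    ///
    /// ```ignore
    /// let (tx, rx) = tokio::sync::oneshot::channel();
    /// handle.save_blocks(blocks, tx)?;
    /// if let Some(BlockNumHash { number, hash }) = rx.await? {
    ///     // `number` and `hash` identify the highest block now persisted on disk.
    /// }
    /// ```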
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlockWithTrieUpdates<T>>,
        tx: oneshot::Sender<Option<BlockNumHash>>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
    }

    /// Persists the finalized block number on disk.
    pub fn save_finalized_block_number(
        &self,
        finalized_block: u64,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
    }

    /// Persists the safe block number on disk.
    pub fn save_safe_block_number(
        &self,
        safe_block: u64,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
    }

    /// Tells the persistence service to remove blocks above the given block number.
    ///
    /// When the operation completes, the new tip hash is returned in the receiver end of the
    /// sender argument.
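    ///
    /// A minimal sketch (assumes an existing `handle` and a `new_tip_num`):
    ///
    /// ```ignore
    /// let (tx, rx) = tokio::sync::oneshot::channel();
    /// handle.remove_blocks_above(new_tip_num, tx)?;
    /// let new_tip: Option<BlockNumHash> = rx.await?;
    /// ```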
    pub fn remove_blocks_above(
        &self,
        block_num: u64,
        tx: oneshot::Sender<Option<BlockNumHash>>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;
    use reth_prune::Pruner;
    use tokio::sync::mpsc::unbounded_channel;

    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let provider = create_test_provider_factory();

        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
    }

    #[tokio::test]
    async fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let blocks = vec![];
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();

        let hash = rx.await.unwrap();
        assert_eq!(hash, None);
    }

    #[tokio::test]
    async fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();
        let block_number = 0;
        let mut test_block_builder = TestBlockBuilder::eth();
        let executed =
            test_block_builder.get_executed_block_with_number(block_number, B256::random());
        let block_hash = executed.recovered_block().hash();

        let blocks = vec![executed];
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();

        let BlockNumHash { hash: actual_hash, number: _ } =
            tokio::time::timeout(std::time::Duration::from_secs(10), rx)
                .await
                .expect("test timed out")
                .expect("channel closed unexpectedly")
                .expect("no hash returned");

        assert_eq!(block_hash, actual_hash);
    }

    #[tokio::test]
    async fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let mut test_block_builder = TestBlockBuilder::eth();
        let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
        let last_hash = blocks.last().unwrap().recovered_block().hash();
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();
        let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
        assert_eq!(last_hash, actual_hash);
    }

    #[tokio::test]
    async fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let ranges = [0..1, 1..2, 2..4, 4..5];
        let mut test_block_builder = TestBlockBuilder::eth();
        for range in ranges {
            let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
            let last_hash = blocks.last().unwrap().recovered_block().hash();
            let (tx, rx) = oneshot::channel();

            persistence_handle.save_blocks(blocks, tx).unwrap();

            let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
            assert_eq!(last_hash, actual_hash);
        }
    }
}