// reth_engine_tree/persistence.rs

1use crate::metrics::PersistenceMetrics;
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use reth_chain_state::ExecutedBlockWithTrieUpdates;
5use reth_errors::ProviderError;
6use reth_ethereum_primitives::EthPrimitives;
7use reth_primitives_traits::NodePrimitives;
8use reth_provider::{
9    providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
10    ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
11};
12use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
13use reth_stages_api::{MetricEvent, MetricEventsSender};
14use std::{
15    sync::mpsc::{Receiver, SendError, Sender},
16    time::Instant,
17};
18use thiserror::Error;
19use tokio::sync::oneshot;
20use tracing::{debug, error};
21
/// Writes parts of reth's in memory tree state to the database and static files.
///
/// This is meant to be a spawned service that listens for various incoming persistence operations,
/// performing those actions on disk, and returning the result in a channel.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since this performs
/// blocking I/O operations in an endless loop.
#[derive(Debug)]
pub struct PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// The provider factory used to open read-write database providers and the static file
    /// provider for every persistence operation.
    provider: ProviderFactory<N>,
    /// Receiving end of the channel on which [`PersistenceAction`]s arrive; the service loop
    /// exits once all senders are dropped.
    incoming: Receiver<PersistenceAction<N::Primitives>>,
    /// The pruner, run after blocks are saved whenever it reports pruning is needed.
    pruner: PrunerWithFactory<ProviderFactory<N>>,
    /// Metrics recording the duration of each persistence operation.
    metrics: PersistenceMetrics,
    /// Sender for sync metrics - we only submit sync metrics for persisted blocks
    sync_metrics_tx: MetricEventsSender,
}
45
46impl<N> PersistenceService<N>
47where
48    N: ProviderNodeTypes,
49{
50    /// Create a new persistence service
51    pub fn new(
52        provider: ProviderFactory<N>,
53        incoming: Receiver<PersistenceAction<N::Primitives>>,
54        pruner: PrunerWithFactory<ProviderFactory<N>>,
55        sync_metrics_tx: MetricEventsSender,
56    ) -> Self {
57        Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
58    }
59
60    /// Prunes block data before the given block hash according to the configured prune
61    /// configuration.
62    fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
63        debug!(target: "engine::persistence", ?block_num, "Running pruner");
64        let start_time = Instant::now();
65        // TODO: doing this properly depends on pruner segment changes
66        let result = self.pruner.run(block_num);
67        self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
68        result
69    }
70}
71
impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// This is the main loop, that will listen to database events and perform the requested
    /// database actions
    ///
    /// Returns `Ok(())` once every sender side of the channel is dropped; returns an error only
    /// if a database, provider, or pruner operation fails, which terminates the service.
    pub fn run(mut self) -> Result<(), PersistenceError> {
        // If the receiver errors then senders have disconnected, so the loop should then end.
        while let Ok(action) = self.incoming.recv() {
            match action {
                PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
                    let result = self.on_remove_blocks_above(new_tip_num)?;
                    // send new sync metrics based on removed blocks
                    let _ =
                        self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(result);
                }
                PersistenceAction::SaveBlocks(blocks, sender) => {
                    let result = self.on_save_blocks(blocks)?;
                    // capture the block number before `result` is moved into the response
                    let result_number = result.map(|r| r.number);

                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(result);

                    // the response is sent before pruning so the caller is not blocked on it
                    if let Some(block_number) = result_number {
                        // send new sync metrics based on saved blocks
                        let _ = self
                            .sync_metrics_tx
                            .send(MetricEvent::SyncHeight { height: block_number });

                        if self.pruner.is_pruning_needed(block_number) {
                            // We log `PrunerOutput` inside the `Pruner`
                            let _ = self.prune_before(block_number)?;
                        }
                    }
                }
                PersistenceAction::SaveFinalizedBlock(finalized_block) => {
                    // single read-write transaction: write the number, then commit
                    let provider = self.provider.database_provider_rw()?;
                    provider.save_finalized_block_number(finalized_block)?;
                    provider.commit()?;
                }
                PersistenceAction::SaveSafeBlock(safe_block) => {
                    // single read-write transaction: write the number, then commit
                    let provider = self.provider.database_provider_rw()?;
                    provider.save_safe_block_number(safe_block)?;
                    provider.commit()?;
                }
            }
        }
        Ok(())
    }

    /// Removes all block data above `new_tip_num` from the database and static files.
    ///
    /// Returns the number and hash of the new tip, or `None` if the hash for `new_tip_num` is
    /// not known to the provider.
    fn on_remove_blocks_above(
        &self,
        new_tip_num: u64,
    ) -> Result<Option<BlockNumHash>, PersistenceError> {
        debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
        let start_time = Instant::now();
        let provider_rw = self.provider.database_provider_rw()?;
        let sf_provider = self.provider.static_file_provider();

        // look up the new tip hash before removal, while the data is still present
        let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
        UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
        // unwinds use a dedicated commit path
        UnifiedStorageWriter::commit_unwind(provider_rw)?;

        debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
        self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
        Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
    }

    /// Writes the given blocks (expected in order of increasing block number) to the database
    /// and static files in one committed write.
    ///
    /// Returns the number and hash of the last block saved, or `None` if `blocks` is empty, in
    /// which case nothing is written.
    fn on_save_blocks(
        &self,
        blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
    ) -> Result<Option<BlockNumHash>, PersistenceError> {
        debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block.num_hash()), last=?blocks.last().map(|b| b.recovered_block.num_hash()), "Saving range of blocks");
        let start_time = Instant::now();
        // capture the last block's num/hash before `blocks` is consumed by the writer
        let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
            hash: block.recovered_block().hash(),
            number: block.recovered_block().header().number(),
        });

        if last_block_hash_num.is_some() {
            let provider_rw = self.provider.database_provider_rw()?;
            let static_file_provider = self.provider.static_file_provider();

            UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(blocks)?;
            UnifiedStorageWriter::commit(provider_rw)?;
        }
        // recorded even for the empty no-op case
        self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
        Ok(last_block_hash_num)
    }
}
164
/// One of the errors that can happen when using the persistence service.
#[derive(Debug, Error)]
pub enum PersistenceError {
    /// A pruner error, raised when running the pruner after saving blocks.
    #[error(transparent)]
    PrunerError(#[from] PrunerError),

    /// A provider (database / static file) error.
    #[error(transparent)]
    ProviderError(#[from] ProviderError),
}
176
/// A signal to the persistence service that part of the tree state can be persisted.
#[derive(Debug)]
pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
    /// The section of tree state that should be persisted. These blocks are expected in order of
    /// increasing block number.
    ///
    /// First, header, transaction, and receipt-related data should be written to static files.
    /// Then the execution history-related data will be written to the database.
    ///
    /// The sender receives the num/hash of the last persisted block, or `None` if the list of
    /// blocks was empty.
    SaveBlocks(Vec<ExecutedBlockWithTrieUpdates<N>>, oneshot::Sender<Option<BlockNumHash>>),

    /// Removes block data above the given block number from the database.
    ///
    /// This will first update checkpoints from the database, then remove actual block data from
    /// static files.
    ///
    /// The sender receives the num/hash of the new tip block, if known.
    RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),

    /// Update the persisted finalized block on disk
    SaveFinalizedBlock(u64),

    /// Update the persisted safe block on disk
    SaveSafeBlock(u64),
}
199
/// A handle to the persistence service.
///
/// Cloneable; all clones send actions to the same spawned service.
#[derive(Debug, Clone)]
pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
    /// The channel used to communicate with the persistence service
    sender: Sender<PersistenceAction<N>>,
}
206
207impl<T: NodePrimitives> PersistenceHandle<T> {
208    /// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
209    pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
210        Self { sender }
211    }
212
213    /// Create a new [`PersistenceHandle`], and spawn the persistence service.
214    pub fn spawn_service<N>(
215        provider_factory: ProviderFactory<N>,
216        pruner: PrunerWithFactory<ProviderFactory<N>>,
217        sync_metrics_tx: MetricEventsSender,
218    ) -> PersistenceHandle<N::Primitives>
219    where
220        N: ProviderNodeTypes,
221    {
222        // create the initial channels
223        let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
224
225        // construct persistence handle
226        let persistence_handle = PersistenceHandle::new(db_service_tx);
227
228        // spawn the persistence service
229        let db_service =
230            PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
231        std::thread::Builder::new()
232            .name("Persistence Service".to_string())
233            .spawn(|| {
234                if let Err(err) = db_service.run() {
235                    error!(target: "engine::persistence", ?err, "Persistence service failed");
236                }
237            })
238            .unwrap();
239
240        persistence_handle
241    }
242
243    /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible
244    /// for creating any channels for the given action.
245    pub fn send_action(
246        &self,
247        action: PersistenceAction<T>,
248    ) -> Result<(), SendError<PersistenceAction<T>>> {
249        self.sender.send(action)
250    }
251
252    /// Tells the persistence service to save a certain list of finalized blocks. The blocks are
253    /// assumed to be ordered by block number.
254    ///
255    /// This returns the latest hash that has been saved, allowing removal of that block and any
256    /// previous blocks from in-memory data structures. This value is returned in the receiver end
257    /// of the sender argument.
258    ///
259    /// If there are no blocks to persist, then `None` is sent in the sender.
260    pub fn save_blocks(
261        &self,
262        blocks: Vec<ExecutedBlockWithTrieUpdates<T>>,
263        tx: oneshot::Sender<Option<BlockNumHash>>,
264    ) -> Result<(), SendError<PersistenceAction<T>>> {
265        self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
266    }
267
268    /// Persists the finalized block number on disk.
269    pub fn save_finalized_block_number(
270        &self,
271        finalized_block: u64,
272    ) -> Result<(), SendError<PersistenceAction<T>>> {
273        self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
274    }
275
276    /// Persists the finalized block number on disk.
277    pub fn save_safe_block_number(
278        &self,
279        safe_block: u64,
280    ) -> Result<(), SendError<PersistenceAction<T>>> {
281        self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
282    }
283
284    /// Tells the persistence service to remove blocks above a certain block number. The removed
285    /// blocks are returned by the service.
286    ///
287    /// When the operation completes, the new tip hash is returned in the receiver end of the sender
288    /// argument.
289    pub fn remove_blocks_above(
290        &self,
291        block_num: u64,
292        tx: oneshot::Sender<Option<BlockNumHash>>,
293    ) -> Result<(), SendError<PersistenceAction<T>>> {
294        self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;
    use reth_prune::Pruner;
    use tokio::sync::mpsc::unbounded_channel;

    /// Spawns a persistence service backed by an in-memory test provider factory and a pruner
    /// with no segments, returning the handle used to drive it in tests.
    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let provider = create_test_provider_factory();

        // no ExExes are running, so the pruner is never gated on an ExEx height
        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        // sync metrics receiver is dropped; the service ignores send errors on this channel
        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
    }

    /// Saving an empty block list must write nothing and respond with `None`.
    #[tokio::test]
    async fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let blocks = vec![];
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();

        let hash = rx.await.unwrap();
        assert_eq!(hash, None);
    }

    /// Saving a single block must respond with that block's hash.
    #[tokio::test]
    async fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();
        let block_number = 0;
        let mut test_block_builder = TestBlockBuilder::eth();
        let executed =
            test_block_builder.get_executed_block_with_number(block_number, B256::random());
        let block_hash = executed.recovered_block().hash();

        let blocks = vec![executed];
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();

        // bounded wait so a wedged service fails the test instead of hanging it
        let BlockNumHash { hash: actual_hash, number: _ } =
            tokio::time::timeout(std::time::Duration::from_secs(10), rx)
                .await
                .expect("test timed out")
                .expect("channel closed unexpectedly")
                .expect("no hash returned");

        assert_eq!(block_hash, actual_hash);
    }

    /// Saving a range of blocks must respond with the hash of the last (highest) block.
    #[tokio::test]
    async fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let mut test_block_builder = TestBlockBuilder::eth();
        let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
        let last_hash = blocks.last().unwrap().recovered_block().hash();
        let (tx, rx) = oneshot::channel();

        persistence_handle.save_blocks(blocks, tx).unwrap();
        let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
        assert_eq!(last_hash, actual_hash);
    }

    /// Successive save calls over contiguous ranges must each respond with their own last hash.
    #[tokio::test]
    async fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let ranges = [0..1, 1..2, 2..4, 4..5];
        let mut test_block_builder = TestBlockBuilder::eth();
        for range in ranges {
            let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
            let last_hash = blocks.last().unwrap().recovered_block().hash();
            let (tx, rx) = oneshot::channel();

            persistence_handle.save_blocks(blocks, tx).unwrap();

            let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
            assert_eq!(last_hash, actual_hash);
        }
    }
}