reth_engine_tree/persistence.rs

use crate::metrics::PersistenceMetrics;
use alloy_eips::BlockNumHash;
use crossbeam_channel::Sender as CrossbeamSender;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_ethereum_primitives::EthPrimitives;
use reth_primitives_traits::NodePrimitives;
use reth_provider::{
    providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
    DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
use std::{
    sync::mpsc::{Receiver, SendError, Sender},
    time::Instant,
};
use thiserror::Error;
use tracing::{debug, error};

/// Writes parts of reth's in-memory tree state to the database and static files.
///
/// This is meant to be a spawned service that listens for incoming persistence operations,
/// performs those actions on disk, and returns the result through a channel.
///
/// This should be spawned in its own thread with [`std::thread::spawn`], since it performs
/// blocking I/O operations in an endless loop.
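///
/// # Example
///
/// A minimal sketch of the intended spawn pattern, assuming an already-configured `provider`,
/// `rx` (the action receiver), `pruner`, and `sync_metrics_tx` that are not shown here (the
/// example is marked `ignore` for that reason):
///
/// ```ignore
/// let service = PersistenceService::new(provider, rx, pruner, sync_metrics_tx);
/// std::thread::spawn(move || {
///     // `run` blocks until all senders are dropped or an error occurs
///     if let Err(err) = service.run() {
///         tracing::error!(?err, "persistence service failed");
///     }
/// });
/// ```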
#[derive(Debug)]
pub struct PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// The provider factory to use
    provider: ProviderFactory<N>,
    /// Incoming requests
    incoming: Receiver<PersistenceAction<N::Primitives>>,
    /// The pruner
    pruner: PrunerWithFactory<ProviderFactory<N>>,
    /// metrics
    metrics: PersistenceMetrics,
    /// Sender for sync metrics - we only submit sync metrics for persisted blocks
    sync_metrics_tx: MetricEventsSender,
}

impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// Create a new persistence service
    pub fn new(
        provider: ProviderFactory<N>,
        incoming: Receiver<PersistenceAction<N::Primitives>>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        sync_metrics_tx: MetricEventsSender,
    ) -> Self {
        Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
    }

    /// Prunes block data before the given block number according to the configured prune
    /// configuration.
    fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
        debug!(target: "engine::persistence", ?block_num, "Running pruner");
        let start_time = Instant::now();
        // TODO: doing this properly depends on pruner segment changes
        let result = self.pruner.run(block_num);
        self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
        result
    }
}

impl<N> PersistenceService<N>
where
    N: ProviderNodeTypes,
{
    /// This is the main loop that listens for incoming persistence actions and performs the
    /// requested database actions.
    pub fn run(mut self) -> Result<(), PersistenceError> {
        // If the receiver errors then senders have disconnected, so the loop should then end.
        while let Ok(action) = self.incoming.recv() {
            match action {
                PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
                    let result = self.on_remove_blocks_above(new_tip_num)?;
                    // send new sync metrics based on removed blocks
                    let _ =
                        self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(result);
                }
                PersistenceAction::SaveBlocks(blocks, sender) => {
                    let result = self.on_save_blocks(blocks)?;
                    let result_number = result.map(|r| r.number);

                    // we ignore the error because the caller may or may not care about the result
                    let _ = sender.send(result);

                    if let Some(block_number) = result_number {
                        // send new sync metrics based on saved blocks
                        let _ = self
                            .sync_metrics_tx
                            .send(MetricEvent::SyncHeight { height: block_number });

                        if self.pruner.is_pruning_needed(block_number) {
                            // We log `PrunerOutput` inside the `Pruner`
                            let _ = self.prune_before(block_number)?;
                        }
                    }
                }
                PersistenceAction::SaveFinalizedBlock(finalized_block) => {
                    let provider = self.provider.database_provider_rw()?;
                    provider.save_finalized_block_number(finalized_block)?;
                    provider.commit()?;
                }
                PersistenceAction::SaveSafeBlock(safe_block) => {
                    let provider = self.provider.database_provider_rw()?;
                    provider.save_safe_block_number(safe_block)?;
                    provider.commit()?;
                }
            }
        }
        Ok(())
    }

    fn on_remove_blocks_above(
        &self,
        new_tip_num: u64,
    ) -> Result<Option<BlockNumHash>, PersistenceError> {
        debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
        let start_time = Instant::now();
        let provider_rw = self.provider.database_provider_rw()?;

        let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
        provider_rw.remove_block_and_execution_above(new_tip_num)?;
        provider_rw.commit()?;

        debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
        self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
        Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
    }

    fn on_save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<N::Primitives>>,
    ) -> Result<Option<BlockNumHash>, PersistenceError> {
        let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
        let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
        let block_count = blocks.len();
        debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks");

        let start_time = Instant::now();

        if last_block.is_some() {
            let provider_rw = self.provider.database_provider_rw()?;

            provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
            provider_rw.commit()?;
        }

        debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");

        self.metrics.save_blocks_block_count.record(block_count as f64);
        self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());

        Ok(last_block)
    }
}

/// One of the errors that can happen when using the persistence service.
#[derive(Debug, Error)]
pub enum PersistenceError {
    /// A pruner error
    #[error(transparent)]
    PrunerError(#[from] PrunerError),

    /// A provider error
    #[error(transparent)]
    ProviderError(#[from] ProviderError),
}

/// A signal to the persistence service that part of the tree state can be persisted.
#[derive(Debug)]
pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
    /// The section of tree state that should be persisted. These blocks are expected in order of
    /// increasing block number.
    ///
    /// First, header, transaction, and receipt-related data should be written to static files.
    /// Then the execution history-related data will be written to the database.
    SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<Option<BlockNumHash>>),

    /// Removes block data above the given block number from the database.
    ///
    /// This will first update checkpoints from the database, then remove actual block data from
    /// static files.
    RemoveBlocksAbove(u64, CrossbeamSender<Option<BlockNumHash>>),

    /// Update the persisted finalized block on disk
    SaveFinalizedBlock(u64),

    /// Update the persisted safe block on disk
    SaveSafeBlock(u64),
}

/// A handle to the persistence service
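///
/// # Example
///
/// A minimal sketch of obtaining a handle, assuming a `provider_factory`, `pruner`, and
/// `sync_metrics_tx` configured elsewhere (marked `ignore` for that reason; the block number is
/// illustrative):
///
/// ```ignore
/// let handle = PersistenceHandle::spawn_service(provider_factory, pruner, sync_metrics_tx);
/// handle.save_finalized_block_number(42).unwrap();
/// ```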
#[derive(Debug, Clone)]
pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
    /// The channel used to communicate with the persistence service
    sender: Sender<PersistenceAction<N>>,
}

impl<T: NodePrimitives> PersistenceHandle<T> {
    /// Create a new [`PersistenceHandle`] from a [`Sender<PersistenceAction>`].
    pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
        Self { sender }
    }

    /// Create a new [`PersistenceHandle`], and spawn the persistence service.
    pub fn spawn_service<N>(
        provider_factory: ProviderFactory<N>,
        pruner: PrunerWithFactory<ProviderFactory<N>>,
        sync_metrics_tx: MetricEventsSender,
    ) -> PersistenceHandle<N::Primitives>
    where
        N: ProviderNodeTypes,
    {
        // create the initial channels
        let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();

        // construct persistence handle
        let persistence_handle = PersistenceHandle::new(db_service_tx);

        // spawn the persistence service
        let db_service =
            PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
        std::thread::Builder::new()
            .name("Persistence Service".to_string())
            .spawn(|| {
                if let Err(err) = db_service.run() {
                    error!(target: "engine::persistence", ?err, "Persistence service failed");
                }
            })
            .unwrap();

        persistence_handle
    }

    /// Sends a specific [`PersistenceAction`] through the contained channel. The caller is
    /// responsible for creating any channels for the given action.
    pub fn send_action(
        &self,
        action: PersistenceAction<T>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.sender.send(action)
    }

    /// Tells the persistence service to save a certain list of executed blocks. The blocks are
    /// assumed to be ordered by block number.
    ///
    /// This returns the latest hash that has been saved, allowing removal of that block and any
    /// previous blocks from in-memory data structures. This value is returned in the receiver end
    /// of the sender argument.
    ///
    /// If there are no blocks to persist, then `None` is sent in the sender.
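    ///
    /// # Example
    ///
    /// A minimal sketch of the call pattern, assuming `handle` is a [`PersistenceHandle`] and
    /// `blocks` is a `Vec<ExecutedBlock<_>>` obtained elsewhere (marked `ignore` for that reason):
    ///
    /// ```ignore
    /// let (tx, rx) = crossbeam_channel::bounded(1);
    /// handle.save_blocks(blocks, tx).unwrap();
    /// // the new persisted tip, or `None` if `blocks` was empty
    /// let persisted_tip = rx.recv().unwrap();
    /// ```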
    pub fn save_blocks(
        &self,
        blocks: Vec<ExecutedBlock<T>>,
        tx: CrossbeamSender<Option<BlockNumHash>>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
    }

    /// Persists the finalized block number on disk.
    pub fn save_finalized_block_number(
        &self,
        finalized_block: u64,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
    }

    /// Persists the safe block number on disk.
    pub fn save_safe_block_number(
        &self,
        safe_block: u64,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
    }

    /// Tells the persistence service to remove block data above the given block number from the
    /// database and static files.
    ///
    /// When the operation completes, the new tip hash is returned in the receiver end of the
    /// sender argument.
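    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `handle` is a [`PersistenceHandle`] (marked `ignore` because
    /// the handle setup is not shown; the block number is illustrative):
    ///
    /// ```ignore
    /// let (tx, rx) = crossbeam_channel::bounded(1);
    /// handle.remove_blocks_above(100, tx).unwrap();
    /// // hash and number of the new tip, or `None` if block 100 was not found on disk
    /// let new_tip = rx.recv().unwrap();
    /// ```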
    pub fn remove_blocks_above(
        &self,
        block_num: u64,
        tx: CrossbeamSender<Option<BlockNumHash>>,
    ) -> Result<(), SendError<PersistenceAction<T>>> {
        self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;
    use reth_prune::Pruner;
    use tokio::sync::mpsc::unbounded_channel;

    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let provider = create_test_provider_factory();

        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let pruner =
            Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
    }

    #[test]
    fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let blocks = vec![];
        let (tx, rx) = crossbeam_channel::bounded(1);

        persistence_handle.save_blocks(blocks, tx).unwrap();

        let hash = rx.recv().unwrap();
        assert_eq!(hash, None);
    }

    #[test]
    fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();
        let block_number = 0;
        let mut test_block_builder = TestBlockBuilder::eth();
        let executed =
            test_block_builder.get_executed_block_with_number(block_number, B256::random());
        let block_hash = executed.recovered_block().hash();

        let blocks = vec![executed];
        let (tx, rx) = crossbeam_channel::bounded(1);

        persistence_handle.save_blocks(blocks, tx).unwrap();

        let BlockNumHash { hash: actual_hash, number: _ } = rx
            .recv_timeout(std::time::Duration::from_secs(10))
            .expect("test timed out")
            .expect("no hash returned");

        assert_eq!(block_hash, actual_hash);
    }

    #[test]
    fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let mut test_block_builder = TestBlockBuilder::eth();
        let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
        let last_hash = blocks.last().unwrap().recovered_block().hash();
        let (tx, rx) = crossbeam_channel::bounded(1);

        persistence_handle.save_blocks(blocks, tx).unwrap();
        let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
        assert_eq!(last_hash, actual_hash);
    }

    #[test]
    fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let persistence_handle = default_persistence_handle();

        let ranges = [0..1, 1..2, 2..4, 4..5];
        let mut test_block_builder = TestBlockBuilder::eth();
        for range in ranges {
            let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
            let last_hash = blocks.last().unwrap().recovered_block().hash();
            let (tx, rx) = crossbeam_channel::bounded(1);

            persistence_handle.save_blocks(blocks, tx).unwrap();

            let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
            assert_eq!(last_hash, actual_hash);
        }
    }
}