1use crate::metrics::PersistenceMetrics;
2use alloy_eips::BlockNumHash;
3use crossbeam_channel::Sender as CrossbeamSender;
4use reth_chain_state::ExecutedBlock;
5use reth_errors::ProviderError;
6use reth_ethereum_primitives::EthPrimitives;
7use reth_primitives_traits::NodePrimitives;
8use reth_provider::{
9 providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
10 DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
11};
12use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
13use reth_stages_api::{MetricEvent, MetricEventsSender};
14use std::{
15 sync::mpsc::{Receiver, SendError, Sender},
16 time::Instant,
17};
18use thiserror::Error;
19use tracing::{debug, error};
20
21#[derive(Debug)]
29pub struct PersistenceService<N>
30where
31 N: ProviderNodeTypes,
32{
33 provider: ProviderFactory<N>,
35 incoming: Receiver<PersistenceAction<N::Primitives>>,
37 pruner: PrunerWithFactory<ProviderFactory<N>>,
39 metrics: PersistenceMetrics,
41 sync_metrics_tx: MetricEventsSender,
43}
44
45impl<N> PersistenceService<N>
46where
47 N: ProviderNodeTypes,
48{
49 pub fn new(
51 provider: ProviderFactory<N>,
52 incoming: Receiver<PersistenceAction<N::Primitives>>,
53 pruner: PrunerWithFactory<ProviderFactory<N>>,
54 sync_metrics_tx: MetricEventsSender,
55 ) -> Self {
56 Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
57 }
58
59 fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
62 debug!(target: "engine::persistence", ?block_num, "Running pruner");
63 let start_time = Instant::now();
64 let result = self.pruner.run(block_num);
66 self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
67 result
68 }
69}
70
71impl<N> PersistenceService<N>
72where
73 N: ProviderNodeTypes,
74{
75 pub fn run(mut self) -> Result<(), PersistenceError> {
78 while let Ok(action) = self.incoming.recv() {
80 match action {
81 PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
82 let result = self.on_remove_blocks_above(new_tip_num)?;
83 let _ =
85 self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
86 let _ = sender.send(result);
88 }
89 PersistenceAction::SaveBlocks(blocks, sender) => {
90 let result = self.on_save_blocks(blocks)?;
91 let result_number = result.map(|r| r.number);
92
93 let _ = sender.send(result);
95
96 if let Some(block_number) = result_number {
97 let _ = self
99 .sync_metrics_tx
100 .send(MetricEvent::SyncHeight { height: block_number });
101
102 if self.pruner.is_pruning_needed(block_number) {
103 let _ = self.prune_before(block_number)?;
105 }
106 }
107 }
108 PersistenceAction::SaveFinalizedBlock(finalized_block) => {
109 let provider = self.provider.database_provider_rw()?;
110 provider.save_finalized_block_number(finalized_block)?;
111 provider.commit()?;
112 }
113 PersistenceAction::SaveSafeBlock(safe_block) => {
114 let provider = self.provider.database_provider_rw()?;
115 provider.save_safe_block_number(safe_block)?;
116 provider.commit()?;
117 }
118 }
119 }
120 Ok(())
121 }
122
123 fn on_remove_blocks_above(
124 &self,
125 new_tip_num: u64,
126 ) -> Result<Option<BlockNumHash>, PersistenceError> {
127 debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
128 let start_time = Instant::now();
129 let provider_rw = self.provider.database_provider_rw()?;
130
131 let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
132 provider_rw.remove_block_and_execution_above(new_tip_num)?;
133 provider_rw.commit()?;
134
135 debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
136 self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
137 Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
138 }
139
140 fn on_save_blocks(
141 &self,
142 blocks: Vec<ExecutedBlock<N::Primitives>>,
143 ) -> Result<Option<BlockNumHash>, PersistenceError> {
144 let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
145 let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
146 let block_count = blocks.len();
147 debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks");
148
149 let start_time = Instant::now();
150
151 if last_block.is_some() {
152 let provider_rw = self.provider.database_provider_rw()?;
153
154 provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
155 provider_rw.commit()?;
156 }
157
158 debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
159
160 self.metrics.save_blocks_block_count.record(block_count as f64);
161 self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
162
163 Ok(last_block)
164 }
165}
166
167#[derive(Debug, Error)]
169pub enum PersistenceError {
170 #[error(transparent)]
172 PrunerError(#[from] PrunerError),
173
174 #[error(transparent)]
176 ProviderError(#[from] ProviderError),
177}
178
179#[derive(Debug)]
181pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
182 SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<Option<BlockNumHash>>),
188
189 RemoveBlocksAbove(u64, CrossbeamSender<Option<BlockNumHash>>),
194
195 SaveFinalizedBlock(u64),
197
198 SaveSafeBlock(u64),
200}
201
202#[derive(Debug, Clone)]
204pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
205 sender: Sender<PersistenceAction<N>>,
207}
208
209impl<T: NodePrimitives> PersistenceHandle<T> {
210 pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
212 Self { sender }
213 }
214
215 pub fn spawn_service<N>(
217 provider_factory: ProviderFactory<N>,
218 pruner: PrunerWithFactory<ProviderFactory<N>>,
219 sync_metrics_tx: MetricEventsSender,
220 ) -> PersistenceHandle<N::Primitives>
221 where
222 N: ProviderNodeTypes,
223 {
224 let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
226
227 let persistence_handle = PersistenceHandle::new(db_service_tx);
229
230 let db_service =
232 PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
233 std::thread::Builder::new()
234 .name("Persistence Service".to_string())
235 .spawn(|| {
236 if let Err(err) = db_service.run() {
237 error!(target: "engine::persistence", ?err, "Persistence service failed");
238 }
239 })
240 .unwrap();
241
242 persistence_handle
243 }
244
245 pub fn send_action(
248 &self,
249 action: PersistenceAction<T>,
250 ) -> Result<(), SendError<PersistenceAction<T>>> {
251 self.sender.send(action)
252 }
253
254 pub fn save_blocks(
263 &self,
264 blocks: Vec<ExecutedBlock<T>>,
265 tx: CrossbeamSender<Option<BlockNumHash>>,
266 ) -> Result<(), SendError<PersistenceAction<T>>> {
267 self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
268 }
269
270 pub fn save_finalized_block_number(
272 &self,
273 finalized_block: u64,
274 ) -> Result<(), SendError<PersistenceAction<T>>> {
275 self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
276 }
277
278 pub fn save_safe_block_number(
280 &self,
281 safe_block: u64,
282 ) -> Result<(), SendError<PersistenceAction<T>>> {
283 self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
284 }
285
286 pub fn remove_blocks_above(
292 &self,
293 block_num: u64,
294 tx: CrossbeamSender<Option<BlockNumHash>>,
295 ) -> Result<(), SendError<PersistenceAction<T>>> {
296 self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
297 }
298}
299
300#[cfg(test)]
301mod tests {
302 use super::*;
303 use alloy_primitives::B256;
304 use reth_chain_state::test_utils::TestBlockBuilder;
305 use reth_exex_types::FinishedExExHeight;
306 use reth_provider::test_utils::create_test_provider_factory;
307 use reth_prune::Pruner;
308 use tokio::sync::mpsc::unbounded_channel;
309
310 fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
311 let provider = create_test_provider_factory();
312
313 let (_finished_exex_height_tx, finished_exex_height_rx) =
314 tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
315
316 let pruner =
317 Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);
318
319 let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
320 PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
321 }
322
323 #[test]
324 fn test_save_blocks_empty() {
325 reth_tracing::init_test_tracing();
326 let persistence_handle = default_persistence_handle();
327
328 let blocks = vec![];
329 let (tx, rx) = crossbeam_channel::bounded(1);
330
331 persistence_handle.save_blocks(blocks, tx).unwrap();
332
333 let hash = rx.recv().unwrap();
334 assert_eq!(hash, None);
335 }
336
337 #[test]
338 fn test_save_blocks_single_block() {
339 reth_tracing::init_test_tracing();
340 let persistence_handle = default_persistence_handle();
341 let block_number = 0;
342 let mut test_block_builder = TestBlockBuilder::eth();
343 let executed =
344 test_block_builder.get_executed_block_with_number(block_number, B256::random());
345 let block_hash = executed.recovered_block().hash();
346
347 let blocks = vec![executed];
348 let (tx, rx) = crossbeam_channel::bounded(1);
349
350 persistence_handle.save_blocks(blocks, tx).unwrap();
351
352 let BlockNumHash { hash: actual_hash, number: _ } = rx
353 .recv_timeout(std::time::Duration::from_secs(10))
354 .expect("test timed out")
355 .expect("no hash returned");
356
357 assert_eq!(block_hash, actual_hash);
358 }
359
360 #[test]
361 fn test_save_blocks_multiple_blocks() {
362 reth_tracing::init_test_tracing();
363 let persistence_handle = default_persistence_handle();
364
365 let mut test_block_builder = TestBlockBuilder::eth();
366 let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
367 let last_hash = blocks.last().unwrap().recovered_block().hash();
368 let (tx, rx) = crossbeam_channel::bounded(1);
369
370 persistence_handle.save_blocks(blocks, tx).unwrap();
371 let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
372 assert_eq!(last_hash, actual_hash);
373 }
374
375 #[test]
376 fn test_save_blocks_multiple_calls() {
377 reth_tracing::init_test_tracing();
378 let persistence_handle = default_persistence_handle();
379
380 let ranges = [0..1, 1..2, 2..4, 4..5];
381 let mut test_block_builder = TestBlockBuilder::eth();
382 for range in ranges {
383 let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
384 let last_hash = blocks.last().unwrap().recovered_block().hash();
385 let (tx, rx) = crossbeam_channel::bounded(1);
386
387 persistence_handle.save_blocks(blocks, tx).unwrap();
388
389 let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
390 assert_eq!(last_hash, actual_hash);
391 }
392 }
393}