1use crate::metrics::PersistenceMetrics;
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use reth_chain_state::ExecutedBlockWithTrieUpdates;
5use reth_errors::ProviderError;
6use reth_ethereum_primitives::EthPrimitives;
7use reth_primitives_traits::NodePrimitives;
8use reth_provider::{
9 providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
10 ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
11};
12use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
13use reth_stages_api::{MetricEvent, MetricEventsSender};
14use std::{
15 sync::mpsc::{Receiver, SendError, Sender},
16 time::Instant,
17};
18use thiserror::Error;
19use tokio::sync::oneshot;
20use tracing::{debug, error};
21
22#[derive(Debug)]
30pub struct PersistenceService<N>
31where
32 N: ProviderNodeTypes,
33{
34 provider: ProviderFactory<N>,
36 incoming: Receiver<PersistenceAction<N::Primitives>>,
38 pruner: PrunerWithFactory<ProviderFactory<N>>,
40 metrics: PersistenceMetrics,
42 sync_metrics_tx: MetricEventsSender,
44}
45
46impl<N> PersistenceService<N>
47where
48 N: ProviderNodeTypes,
49{
50 pub fn new(
52 provider: ProviderFactory<N>,
53 incoming: Receiver<PersistenceAction<N::Primitives>>,
54 pruner: PrunerWithFactory<ProviderFactory<N>>,
55 sync_metrics_tx: MetricEventsSender,
56 ) -> Self {
57 Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
58 }
59
60 fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
63 debug!(target: "engine::persistence", ?block_num, "Running pruner");
64 let start_time = Instant::now();
65 let result = self.pruner.run(block_num);
67 self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
68 result
69 }
70}
71
72impl<N> PersistenceService<N>
73where
74 N: ProviderNodeTypes,
75{
76 pub fn run(mut self) -> Result<(), PersistenceError> {
79 while let Ok(action) = self.incoming.recv() {
81 match action {
82 PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
83 let result = self.on_remove_blocks_above(new_tip_num)?;
84 let _ =
86 self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
87 let _ = sender.send(result);
89 }
90 PersistenceAction::SaveBlocks(blocks, sender) => {
91 let result = self.on_save_blocks(blocks)?;
92 let result_number = result.map(|r| r.number);
93
94 let _ = sender.send(result);
96
97 if let Some(block_number) = result_number {
98 let _ = self
100 .sync_metrics_tx
101 .send(MetricEvent::SyncHeight { height: block_number });
102
103 if self.pruner.is_pruning_needed(block_number) {
104 let _ = self.prune_before(block_number)?;
106 }
107 }
108 }
109 PersistenceAction::SaveFinalizedBlock(finalized_block) => {
110 let provider = self.provider.database_provider_rw()?;
111 provider.save_finalized_block_number(finalized_block)?;
112 provider.commit()?;
113 }
114 PersistenceAction::SaveSafeBlock(safe_block) => {
115 let provider = self.provider.database_provider_rw()?;
116 provider.save_safe_block_number(safe_block)?;
117 provider.commit()?;
118 }
119 }
120 }
121 Ok(())
122 }
123
124 fn on_remove_blocks_above(
125 &self,
126 new_tip_num: u64,
127 ) -> Result<Option<BlockNumHash>, PersistenceError> {
128 debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
129 let start_time = Instant::now();
130 let provider_rw = self.provider.database_provider_rw()?;
131 let sf_provider = self.provider.static_file_provider();
132
133 let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
134 UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
135 UnifiedStorageWriter::commit_unwind(provider_rw)?;
136
137 debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
138 self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
139 Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
140 }
141
142 fn on_save_blocks(
143 &self,
144 blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
145 ) -> Result<Option<BlockNumHash>, PersistenceError> {
146 debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block.num_hash()), last=?blocks.last().map(|b| b.recovered_block.num_hash()), "Saving range of blocks");
147 let start_time = Instant::now();
148 let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
149 hash: block.recovered_block().hash(),
150 number: block.recovered_block().header().number(),
151 });
152
153 if last_block_hash_num.is_some() {
154 let provider_rw = self.provider.database_provider_rw()?;
155 let static_file_provider = self.provider.static_file_provider();
156
157 UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(blocks)?;
158 UnifiedStorageWriter::commit(provider_rw)?;
159 }
160 self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
161 Ok(last_block_hash_num)
162 }
163}
164
165#[derive(Debug, Error)]
167pub enum PersistenceError {
168 #[error(transparent)]
170 PrunerError(#[from] PrunerError),
171
172 #[error(transparent)]
174 ProviderError(#[from] ProviderError),
175}
176
177#[derive(Debug)]
179pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
180 SaveBlocks(Vec<ExecutedBlockWithTrieUpdates<N>>, oneshot::Sender<Option<BlockNumHash>>),
186
187 RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),
192
193 SaveFinalizedBlock(u64),
195
196 SaveSafeBlock(u64),
198}
199
200#[derive(Debug, Clone)]
202pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
203 sender: Sender<PersistenceAction<N>>,
205}
206
207impl<T: NodePrimitives> PersistenceHandle<T> {
208 pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
210 Self { sender }
211 }
212
213 pub fn spawn_service<N>(
215 provider_factory: ProviderFactory<N>,
216 pruner: PrunerWithFactory<ProviderFactory<N>>,
217 sync_metrics_tx: MetricEventsSender,
218 ) -> PersistenceHandle<N::Primitives>
219 where
220 N: ProviderNodeTypes,
221 {
222 let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
224
225 let persistence_handle = PersistenceHandle::new(db_service_tx);
227
228 let db_service =
230 PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
231 std::thread::Builder::new()
232 .name("Persistence Service".to_string())
233 .spawn(|| {
234 if let Err(err) = db_service.run() {
235 error!(target: "engine::persistence", ?err, "Persistence service failed");
236 }
237 })
238 .unwrap();
239
240 persistence_handle
241 }
242
243 pub fn send_action(
246 &self,
247 action: PersistenceAction<T>,
248 ) -> Result<(), SendError<PersistenceAction<T>>> {
249 self.sender.send(action)
250 }
251
252 pub fn save_blocks(
261 &self,
262 blocks: Vec<ExecutedBlockWithTrieUpdates<T>>,
263 tx: oneshot::Sender<Option<BlockNumHash>>,
264 ) -> Result<(), SendError<PersistenceAction<T>>> {
265 self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
266 }
267
268 pub fn save_finalized_block_number(
270 &self,
271 finalized_block: u64,
272 ) -> Result<(), SendError<PersistenceAction<T>>> {
273 self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
274 }
275
276 pub fn save_safe_block_number(
278 &self,
279 safe_block: u64,
280 ) -> Result<(), SendError<PersistenceAction<T>>> {
281 self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
282 }
283
284 pub fn remove_blocks_above(
290 &self,
291 block_num: u64,
292 tx: oneshot::Sender<Option<BlockNumHash>>,
293 ) -> Result<(), SendError<PersistenceAction<T>>> {
294 self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;
    use reth_prune::Pruner;
    use tokio::sync::mpsc::unbounded_channel;

    /// Spawns a persistence service on top of a test provider factory and returns its handle.
    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let factory = create_test_provider_factory();

        // Named bindings (rather than `_`) keep the unused channel halves alive until this
        // function returns.
        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();

        let pruner =
            Pruner::new_with_factory(factory.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        PersistenceHandle::<EthPrimitives>::spawn_service(factory, pruner, sync_metrics_tx)
    }

    /// Saving an empty list yields `None`.
    #[tokio::test]
    async fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let (tx, rx) = oneshot::channel();
        handle.save_blocks(vec![], tx).unwrap();

        assert_eq!(rx.await.unwrap(), None);
    }

    /// Saving one block yields that block's hash.
    #[tokio::test]
    async fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let block_number = 0;
        let mut builder = TestBlockBuilder::eth();
        let executed = builder.get_executed_block_with_number(block_number, B256::random());
        let expected_hash = executed.recovered_block().hash();

        let (tx, rx) = oneshot::channel();
        handle.save_blocks(vec![executed], tx).unwrap();

        let reply = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
            .await
            .expect("test timed out")
            .expect("channel closed unexpectedly")
            .expect("no hash returned");

        assert_eq!(expected_hash, reply.hash);
    }

    /// Saving several blocks at once yields the hash of the last one.
    #[tokio::test]
    async fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut builder = TestBlockBuilder::eth();
        let blocks: Vec<_> = builder.get_executed_blocks(0..5).collect();
        let expected_hash = blocks.last().unwrap().recovered_block().hash();

        let (tx, rx) = oneshot::channel();
        handle.save_blocks(blocks, tx).unwrap();

        let reply = rx.await.unwrap().unwrap();
        assert_eq!(expected_hash, reply.hash);
    }

    /// Consecutive saves each yield the hash of the last block of their own batch.
    #[tokio::test]
    async fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut builder = TestBlockBuilder::eth();
        for range in [0..1, 1..2, 2..4, 4..5] {
            let blocks: Vec<_> = builder.get_executed_blocks(range).collect();
            let expected_hash = blocks.last().unwrap().recovered_block().hash();

            let (tx, rx) = oneshot::channel();
            handle.save_blocks(blocks, tx).unwrap();

            let reply = rx.await.unwrap().unwrap();
            assert_eq!(expected_hash, reply.hash);
        }
    }
}