1use crate::metrics::PersistenceMetrics;
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use reth_chain_state::ExecutedBlockWithTrieUpdates;
5use reth_errors::ProviderError;
6use reth_ethereum_primitives::EthPrimitives;
7use reth_primitives_traits::NodePrimitives;
8use reth_provider::{
9 providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
10 DBProvider, DatabaseProviderFactory, ProviderFactory,
11};
12use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
13use reth_stages_api::{MetricEvent, MetricEventsSender};
14use std::{
15 sync::mpsc::{Receiver, SendError, Sender},
16 time::Instant,
17};
18use thiserror::Error;
19use tokio::sync::oneshot;
20use tracing::{debug, error};
21
22#[derive(Debug)]
30pub struct PersistenceService<N>
31where
32 N: ProviderNodeTypes,
33{
34 provider: ProviderFactory<N>,
36 incoming: Receiver<PersistenceAction<N::Primitives>>,
38 pruner: PrunerWithFactory<ProviderFactory<N>>,
40 metrics: PersistenceMetrics,
42 sync_metrics_tx: MetricEventsSender,
44}
45
46impl<N> PersistenceService<N>
47where
48 N: ProviderNodeTypes,
49{
50 pub fn new(
52 provider: ProviderFactory<N>,
53 incoming: Receiver<PersistenceAction<N::Primitives>>,
54 pruner: PrunerWithFactory<ProviderFactory<N>>,
55 sync_metrics_tx: MetricEventsSender,
56 ) -> Self {
57 Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
58 }
59
60 fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
63 debug!(target: "engine::persistence", ?block_num, "Running pruner");
64 let start_time = Instant::now();
65 let result = self.pruner.run(block_num);
67 self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
68 result
69 }
70}
71
72impl<N> PersistenceService<N>
73where
74 N: ProviderNodeTypes,
75{
76 pub fn run(mut self) -> Result<(), PersistenceError> {
79 while let Ok(action) = self.incoming.recv() {
81 match action {
82 PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
83 let result = self.on_remove_blocks_above(new_tip_num)?;
84 let _ =
86 self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
87 let _ = sender.send(result);
89 }
90 PersistenceAction::SaveBlocks(blocks, sender) => {
91 let result = self.on_save_blocks(blocks)?;
92 let result_number = result.map(|r| r.number);
93
94 let _ = sender.send(result);
96
97 if let Some(block_number) = result_number {
98 let _ = self
100 .sync_metrics_tx
101 .send(MetricEvent::SyncHeight { height: block_number });
102
103 if self.pruner.is_pruning_needed(block_number) {
104 let _ = self.prune_before(block_number)?;
106 }
107 }
108 }
109 PersistenceAction::SaveFinalizedBlock(finalized_block) => {
110 let provider = self.provider.database_provider_rw()?;
111 provider.save_finalized_block_number(finalized_block)?;
112 provider.commit()?;
113 }
114 PersistenceAction::SaveSafeBlock(safe_block) => {
115 let provider = self.provider.database_provider_rw()?;
116 provider.save_safe_block_number(safe_block)?;
117 provider.commit()?;
118 }
119 }
120 }
121 Ok(())
122 }
123
124 fn on_remove_blocks_above(
125 &self,
126 new_tip_num: u64,
127 ) -> Result<Option<BlockNumHash>, PersistenceError> {
128 debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
129 let start_time = Instant::now();
130 let provider_rw = self.provider.database_provider_rw()?;
131
132 let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
133 provider_rw.remove_block_and_execution_above(new_tip_num)?;
134 provider_rw.commit()?;
135
136 debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
137 self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
138 Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
139 }
140
141 fn on_save_blocks(
142 &self,
143 blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
144 ) -> Result<Option<BlockNumHash>, PersistenceError> {
145 debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block.num_hash()), last=?blocks.last().map(|b| b.recovered_block.num_hash()), "Saving range of blocks");
146 let start_time = Instant::now();
147 let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
148 hash: block.recovered_block().hash(),
149 number: block.recovered_block().header().number(),
150 });
151
152 if last_block_hash_num.is_some() {
153 let provider_rw = self.provider.database_provider_rw()?;
154
155 provider_rw.save_blocks(blocks)?;
156 provider_rw.commit()?;
157 }
158 self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
159 Ok(last_block_hash_num)
160 }
161}
162
163#[derive(Debug, Error)]
165pub enum PersistenceError {
166 #[error(transparent)]
168 PrunerError(#[from] PrunerError),
169
170 #[error(transparent)]
172 ProviderError(#[from] ProviderError),
173}
174
175#[derive(Debug)]
177pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
178 SaveBlocks(Vec<ExecutedBlockWithTrieUpdates<N>>, oneshot::Sender<Option<BlockNumHash>>),
184
185 RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),
190
191 SaveFinalizedBlock(u64),
193
194 SaveSafeBlock(u64),
196}
197
198#[derive(Debug, Clone)]
200pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
201 sender: Sender<PersistenceAction<N>>,
203}
204
205impl<T: NodePrimitives> PersistenceHandle<T> {
206 pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
208 Self { sender }
209 }
210
211 pub fn spawn_service<N>(
213 provider_factory: ProviderFactory<N>,
214 pruner: PrunerWithFactory<ProviderFactory<N>>,
215 sync_metrics_tx: MetricEventsSender,
216 ) -> PersistenceHandle<N::Primitives>
217 where
218 N: ProviderNodeTypes,
219 {
220 let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
222
223 let persistence_handle = PersistenceHandle::new(db_service_tx);
225
226 let db_service =
228 PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
229 std::thread::Builder::new()
230 .name("Persistence Service".to_string())
231 .spawn(|| {
232 if let Err(err) = db_service.run() {
233 error!(target: "engine::persistence", ?err, "Persistence service failed");
234 }
235 })
236 .unwrap();
237
238 persistence_handle
239 }
240
241 pub fn send_action(
244 &self,
245 action: PersistenceAction<T>,
246 ) -> Result<(), SendError<PersistenceAction<T>>> {
247 self.sender.send(action)
248 }
249
250 pub fn save_blocks(
259 &self,
260 blocks: Vec<ExecutedBlockWithTrieUpdates<T>>,
261 tx: oneshot::Sender<Option<BlockNumHash>>,
262 ) -> Result<(), SendError<PersistenceAction<T>>> {
263 self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
264 }
265
266 pub fn save_finalized_block_number(
268 &self,
269 finalized_block: u64,
270 ) -> Result<(), SendError<PersistenceAction<T>>> {
271 self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
272 }
273
274 pub fn save_safe_block_number(
276 &self,
277 safe_block: u64,
278 ) -> Result<(), SendError<PersistenceAction<T>>> {
279 self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
280 }
281
282 pub fn remove_blocks_above(
288 &self,
289 block_num: u64,
290 tx: oneshot::Sender<Option<BlockNumHash>>,
291 ) -> Result<(), SendError<PersistenceAction<T>>> {
292 self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use super::*;
299 use alloy_primitives::B256;
300 use reth_chain_state::test_utils::TestBlockBuilder;
301 use reth_exex_types::FinishedExExHeight;
302 use reth_provider::test_utils::create_test_provider_factory;
303 use reth_prune::Pruner;
304 use tokio::sync::mpsc::unbounded_channel;
305
306 fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
307 let provider = create_test_provider_factory();
308
309 let (_finished_exex_height_tx, finished_exex_height_rx) =
310 tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
311
312 let pruner =
313 Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);
314
315 let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
316 PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
317 }
318
319 #[tokio::test]
320 async fn test_save_blocks_empty() {
321 reth_tracing::init_test_tracing();
322 let persistence_handle = default_persistence_handle();
323
324 let blocks = vec![];
325 let (tx, rx) = oneshot::channel();
326
327 persistence_handle.save_blocks(blocks, tx).unwrap();
328
329 let hash = rx.await.unwrap();
330 assert_eq!(hash, None);
331 }
332
333 #[tokio::test]
334 async fn test_save_blocks_single_block() {
335 reth_tracing::init_test_tracing();
336 let persistence_handle = default_persistence_handle();
337 let block_number = 0;
338 let mut test_block_builder = TestBlockBuilder::eth();
339 let executed =
340 test_block_builder.get_executed_block_with_number(block_number, B256::random());
341 let block_hash = executed.recovered_block().hash();
342
343 let blocks = vec![executed];
344 let (tx, rx) = oneshot::channel();
345
346 persistence_handle.save_blocks(blocks, tx).unwrap();
347
348 let BlockNumHash { hash: actual_hash, number: _ } =
349 tokio::time::timeout(std::time::Duration::from_secs(10), rx)
350 .await
351 .expect("test timed out")
352 .expect("channel closed unexpectedly")
353 .expect("no hash returned");
354
355 assert_eq!(block_hash, actual_hash);
356 }
357
358 #[tokio::test]
359 async fn test_save_blocks_multiple_blocks() {
360 reth_tracing::init_test_tracing();
361 let persistence_handle = default_persistence_handle();
362
363 let mut test_block_builder = TestBlockBuilder::eth();
364 let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
365 let last_hash = blocks.last().unwrap().recovered_block().hash();
366 let (tx, rx) = oneshot::channel();
367
368 persistence_handle.save_blocks(blocks, tx).unwrap();
369 let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
370 assert_eq!(last_hash, actual_hash);
371 }
372
373 #[tokio::test]
374 async fn test_save_blocks_multiple_calls() {
375 reth_tracing::init_test_tracing();
376 let persistence_handle = default_persistence_handle();
377
378 let ranges = [0..1, 1..2, 2..4, 4..5];
379 let mut test_block_builder = TestBlockBuilder::eth();
380 for range in ranges {
381 let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
382 let last_hash = blocks.last().unwrap().recovered_block().hash();
383 let (tx, rx) = oneshot::channel();
384
385 persistence_handle.save_blocks(blocks, tx).unwrap();
386
387 let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
388 assert_eq!(last_hash, actual_hash);
389 }
390 }
391}