1use crate::metrics::PersistenceMetrics;
2use alloy_eips::BlockNumHash;
3use reth_chain_state::ExecutedBlock;
4use reth_errors::ProviderError;
5use reth_ethereum_primitives::EthPrimitives;
6use reth_primitives_traits::NodePrimitives;
7use reth_provider::{
8 providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
9 DBProvider, DatabaseProviderFactory, ProviderFactory,
10};
11use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
12use reth_stages_api::{MetricEvent, MetricEventsSender};
13use std::{
14 sync::mpsc::{Receiver, SendError, Sender},
15 time::Instant,
16};
17use thiserror::Error;
18use tokio::sync::oneshot;
19use tracing::{debug, error};
20
21#[derive(Debug)]
29pub struct PersistenceService<N>
30where
31 N: ProviderNodeTypes,
32{
33 provider: ProviderFactory<N>,
35 incoming: Receiver<PersistenceAction<N::Primitives>>,
37 pruner: PrunerWithFactory<ProviderFactory<N>>,
39 metrics: PersistenceMetrics,
41 sync_metrics_tx: MetricEventsSender,
43}
44
45impl<N> PersistenceService<N>
46where
47 N: ProviderNodeTypes,
48{
49 pub fn new(
51 provider: ProviderFactory<N>,
52 incoming: Receiver<PersistenceAction<N::Primitives>>,
53 pruner: PrunerWithFactory<ProviderFactory<N>>,
54 sync_metrics_tx: MetricEventsSender,
55 ) -> Self {
56 Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
57 }
58
59 fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
62 debug!(target: "engine::persistence", ?block_num, "Running pruner");
63 let start_time = Instant::now();
64 let result = self.pruner.run(block_num);
66 self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
67 result
68 }
69}
70
71impl<N> PersistenceService<N>
72where
73 N: ProviderNodeTypes,
74{
75 pub fn run(mut self) -> Result<(), PersistenceError> {
78 while let Ok(action) = self.incoming.recv() {
80 match action {
81 PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
82 let result = self.on_remove_blocks_above(new_tip_num)?;
83 let _ =
85 self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
86 let _ = sender.send(result);
88 }
89 PersistenceAction::SaveBlocks(blocks, sender) => {
90 let result = self.on_save_blocks(blocks)?;
91 let result_number = result.map(|r| r.number);
92
93 let _ = sender.send(result);
95
96 if let Some(block_number) = result_number {
97 let _ = self
99 .sync_metrics_tx
100 .send(MetricEvent::SyncHeight { height: block_number });
101
102 if self.pruner.is_pruning_needed(block_number) {
103 let _ = self.prune_before(block_number)?;
105 }
106 }
107 }
108 PersistenceAction::SaveFinalizedBlock(finalized_block) => {
109 let provider = self.provider.database_provider_rw()?;
110 provider.save_finalized_block_number(finalized_block)?;
111 provider.commit()?;
112 }
113 PersistenceAction::SaveSafeBlock(safe_block) => {
114 let provider = self.provider.database_provider_rw()?;
115 provider.save_safe_block_number(safe_block)?;
116 provider.commit()?;
117 }
118 }
119 }
120 Ok(())
121 }
122
123 fn on_remove_blocks_above(
124 &self,
125 new_tip_num: u64,
126 ) -> Result<Option<BlockNumHash>, PersistenceError> {
127 debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
128 let start_time = Instant::now();
129 let provider_rw = self.provider.database_provider_rw()?;
130
131 let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
132 provider_rw.remove_block_and_execution_above(new_tip_num)?;
133 provider_rw.commit()?;
134
135 debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
136 self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
137 Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
138 }
139
140 fn on_save_blocks(
141 &self,
142 blocks: Vec<ExecutedBlock<N::Primitives>>,
143 ) -> Result<Option<BlockNumHash>, PersistenceError> {
144 let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
145 let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
146 let block_count = blocks.len();
147 debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks");
148
149 let start_time = Instant::now();
150
151 if last_block.is_some() {
152 let provider_rw = self.provider.database_provider_rw()?;
153
154 provider_rw.save_blocks(blocks)?;
155 provider_rw.commit()?;
156 }
157
158 debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
159
160 self.metrics.save_blocks_block_count.record(block_count as f64);
161 self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
162 Ok(last_block)
163 }
164}
165
166#[derive(Debug, Error)]
168pub enum PersistenceError {
169 #[error(transparent)]
171 PrunerError(#[from] PrunerError),
172
173 #[error(transparent)]
175 ProviderError(#[from] ProviderError),
176}
177
178#[derive(Debug)]
180pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
181 SaveBlocks(Vec<ExecutedBlock<N>>, oneshot::Sender<Option<BlockNumHash>>),
187
188 RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),
193
194 SaveFinalizedBlock(u64),
196
197 SaveSafeBlock(u64),
199}
200
201#[derive(Debug, Clone)]
203pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
204 sender: Sender<PersistenceAction<N>>,
206}
207
208impl<T: NodePrimitives> PersistenceHandle<T> {
209 pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
211 Self { sender }
212 }
213
214 pub fn spawn_service<N>(
216 provider_factory: ProviderFactory<N>,
217 pruner: PrunerWithFactory<ProviderFactory<N>>,
218 sync_metrics_tx: MetricEventsSender,
219 ) -> PersistenceHandle<N::Primitives>
220 where
221 N: ProviderNodeTypes,
222 {
223 let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
225
226 let persistence_handle = PersistenceHandle::new(db_service_tx);
228
229 let db_service =
231 PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
232 std::thread::Builder::new()
233 .name("Persistence Service".to_string())
234 .spawn(|| {
235 if let Err(err) = db_service.run() {
236 error!(target: "engine::persistence", ?err, "Persistence service failed");
237 }
238 })
239 .unwrap();
240
241 persistence_handle
242 }
243
244 pub fn send_action(
247 &self,
248 action: PersistenceAction<T>,
249 ) -> Result<(), SendError<PersistenceAction<T>>> {
250 self.sender.send(action)
251 }
252
253 pub fn save_blocks(
262 &self,
263 blocks: Vec<ExecutedBlock<T>>,
264 tx: oneshot::Sender<Option<BlockNumHash>>,
265 ) -> Result<(), SendError<PersistenceAction<T>>> {
266 self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
267 }
268
269 pub fn save_finalized_block_number(
271 &self,
272 finalized_block: u64,
273 ) -> Result<(), SendError<PersistenceAction<T>>> {
274 self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
275 }
276
277 pub fn save_safe_block_number(
279 &self,
280 safe_block: u64,
281 ) -> Result<(), SendError<PersistenceAction<T>>> {
282 self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
283 }
284
285 pub fn remove_blocks_above(
291 &self,
292 block_num: u64,
293 tx: oneshot::Sender<Option<BlockNumHash>>,
294 ) -> Result<(), SendError<PersistenceAction<T>>> {
295 self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
296 }
297}
298
299#[cfg(test)]
300mod tests {
301 use super::*;
302 use alloy_primitives::B256;
303 use reth_chain_state::test_utils::TestBlockBuilder;
304 use reth_exex_types::FinishedExExHeight;
305 use reth_provider::test_utils::create_test_provider_factory;
306 use reth_prune::Pruner;
307 use tokio::sync::mpsc::unbounded_channel;
308
309 fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
310 let provider = create_test_provider_factory();
311
312 let (_finished_exex_height_tx, finished_exex_height_rx) =
313 tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
314
315 let pruner =
316 Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);
317
318 let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
319 PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
320 }
321
322 #[tokio::test]
323 async fn test_save_blocks_empty() {
324 reth_tracing::init_test_tracing();
325 let persistence_handle = default_persistence_handle();
326
327 let blocks = vec![];
328 let (tx, rx) = oneshot::channel();
329
330 persistence_handle.save_blocks(blocks, tx).unwrap();
331
332 let hash = rx.await.unwrap();
333 assert_eq!(hash, None);
334 }
335
336 #[tokio::test]
337 async fn test_save_blocks_single_block() {
338 reth_tracing::init_test_tracing();
339 let persistence_handle = default_persistence_handle();
340 let block_number = 0;
341 let mut test_block_builder = TestBlockBuilder::eth();
342 let executed =
343 test_block_builder.get_executed_block_with_number(block_number, B256::random());
344 let block_hash = executed.recovered_block().hash();
345
346 let blocks = vec![executed];
347 let (tx, rx) = oneshot::channel();
348
349 persistence_handle.save_blocks(blocks, tx).unwrap();
350
351 let BlockNumHash { hash: actual_hash, number: _ } =
352 tokio::time::timeout(std::time::Duration::from_secs(10), rx)
353 .await
354 .expect("test timed out")
355 .expect("channel closed unexpectedly")
356 .expect("no hash returned");
357
358 assert_eq!(block_hash, actual_hash);
359 }
360
361 #[tokio::test]
362 async fn test_save_blocks_multiple_blocks() {
363 reth_tracing::init_test_tracing();
364 let persistence_handle = default_persistence_handle();
365
366 let mut test_block_builder = TestBlockBuilder::eth();
367 let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
368 let last_hash = blocks.last().unwrap().recovered_block().hash();
369 let (tx, rx) = oneshot::channel();
370
371 persistence_handle.save_blocks(blocks, tx).unwrap();
372 let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
373 assert_eq!(last_hash, actual_hash);
374 }
375
376 #[tokio::test]
377 async fn test_save_blocks_multiple_calls() {
378 reth_tracing::init_test_tracing();
379 let persistence_handle = default_persistence_handle();
380
381 let ranges = [0..1, 1..2, 2..4, 4..5];
382 let mut test_block_builder = TestBlockBuilder::eth();
383 for range in ranges {
384 let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
385 let last_hash = blocks.last().unwrap().recovered_block().hash();
386 let (tx, rx) = oneshot::channel();
387
388 persistence_handle.save_blocks(blocks, tx).unwrap();
389
390 let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
391 assert_eq!(last_hash, actual_hash);
392 }
393 }
394}