1use crate::metrics::PersistenceMetrics;
2use alloy_consensus::BlockHeader;
3use alloy_eips::BlockNumHash;
4use reth_chain_state::ExecutedBlock;
5use reth_errors::ProviderError;
6use reth_ethereum_primitives::EthPrimitives;
7use reth_primitives_traits::NodePrimitives;
8use reth_provider::{
9 providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
10 DBProvider, DatabaseProviderFactory, ProviderFactory,
11};
12use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
13use reth_stages_api::{MetricEvent, MetricEventsSender};
14use std::{
15 sync::mpsc::{Receiver, SendError, Sender},
16 time::Instant,
17};
18use thiserror::Error;
19use tokio::sync::oneshot;
20use tracing::{debug, error};
21
22#[derive(Debug)]
30pub struct PersistenceService<N>
31where
32 N: ProviderNodeTypes,
33{
34 provider: ProviderFactory<N>,
36 incoming: Receiver<PersistenceAction<N::Primitives>>,
38 pruner: PrunerWithFactory<ProviderFactory<N>>,
40 metrics: PersistenceMetrics,
42 sync_metrics_tx: MetricEventsSender,
44}
45
46impl<N> PersistenceService<N>
47where
48 N: ProviderNodeTypes,
49{
50 pub fn new(
52 provider: ProviderFactory<N>,
53 incoming: Receiver<PersistenceAction<N::Primitives>>,
54 pruner: PrunerWithFactory<ProviderFactory<N>>,
55 sync_metrics_tx: MetricEventsSender,
56 ) -> Self {
57 Self { provider, incoming, pruner, metrics: PersistenceMetrics::default(), sync_metrics_tx }
58 }
59
60 fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
63 debug!(target: "engine::persistence", ?block_num, "Running pruner");
64 let start_time = Instant::now();
65 let result = self.pruner.run(block_num);
67 self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
68 result
69 }
70}
71
72impl<N> PersistenceService<N>
73where
74 N: ProviderNodeTypes,
75{
76 pub fn run(mut self) -> Result<(), PersistenceError> {
79 while let Ok(action) = self.incoming.recv() {
81 match action {
82 PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
83 let result = self.on_remove_blocks_above(new_tip_num)?;
84 let _ =
86 self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
87 let _ = sender.send(result);
89 }
90 PersistenceAction::SaveBlocks(blocks, sender) => {
91 let result = self.on_save_blocks(blocks)?;
92 let result_number = result.map(|r| r.number);
93
94 let _ = sender.send(result);
96
97 if let Some(block_number) = result_number {
98 let _ = self
100 .sync_metrics_tx
101 .send(MetricEvent::SyncHeight { height: block_number });
102
103 if self.pruner.is_pruning_needed(block_number) {
104 let _ = self.prune_before(block_number)?;
106 }
107 }
108 }
109 PersistenceAction::SaveFinalizedBlock(finalized_block) => {
110 let provider = self.provider.database_provider_rw()?;
111 provider.save_finalized_block_number(finalized_block)?;
112 provider.commit()?;
113 }
114 PersistenceAction::SaveSafeBlock(safe_block) => {
115 let provider = self.provider.database_provider_rw()?;
116 provider.save_safe_block_number(safe_block)?;
117 provider.commit()?;
118 }
119 }
120 }
121 Ok(())
122 }
123
124 fn on_remove_blocks_above(
125 &self,
126 new_tip_num: u64,
127 ) -> Result<Option<BlockNumHash>, PersistenceError> {
128 debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
129 let start_time = Instant::now();
130 let provider_rw = self.provider.database_provider_rw()?;
131
132 let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
133 provider_rw.remove_block_and_execution_above(new_tip_num)?;
134 provider_rw.commit()?;
135
136 debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
137 self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
138 Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
139 }
140
141 fn on_save_blocks(
142 &self,
143 blocks: Vec<ExecutedBlock<N::Primitives>>,
144 ) -> Result<Option<BlockNumHash>, PersistenceError> {
145 let first_block_hash = blocks.first().map(|b| b.recovered_block.num_hash());
146 let last_block_hash = blocks.last().map(|b| b.recovered_block.num_hash());
147 debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saving range of blocks");
148
149 let start_time = Instant::now();
150 let last_block_hash_num = blocks.last().map(|block| BlockNumHash {
151 hash: block.recovered_block().hash(),
152 number: block.recovered_block().header().number(),
153 });
154
155 if last_block_hash_num.is_some() {
156 let provider_rw = self.provider.database_provider_rw()?;
157
158 provider_rw.save_blocks(blocks)?;
159 provider_rw.commit()?;
160 }
161
162 debug!(target: "engine::persistence", first=?first_block_hash, last=?last_block_hash, "Saved range of blocks");
163
164 self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
165 Ok(last_block_hash_num)
166 }
167}
168
169#[derive(Debug, Error)]
171pub enum PersistenceError {
172 #[error(transparent)]
174 PrunerError(#[from] PrunerError),
175
176 #[error(transparent)]
178 ProviderError(#[from] ProviderError),
179}
180
181#[derive(Debug)]
183pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
184 SaveBlocks(Vec<ExecutedBlock<N>>, oneshot::Sender<Option<BlockNumHash>>),
190
191 RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),
196
197 SaveFinalizedBlock(u64),
199
200 SaveSafeBlock(u64),
202}
203
204#[derive(Debug, Clone)]
206pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
207 sender: Sender<PersistenceAction<N>>,
209}
210
211impl<T: NodePrimitives> PersistenceHandle<T> {
212 pub const fn new(sender: Sender<PersistenceAction<T>>) -> Self {
214 Self { sender }
215 }
216
217 pub fn spawn_service<N>(
219 provider_factory: ProviderFactory<N>,
220 pruner: PrunerWithFactory<ProviderFactory<N>>,
221 sync_metrics_tx: MetricEventsSender,
222 ) -> PersistenceHandle<N::Primitives>
223 where
224 N: ProviderNodeTypes,
225 {
226 let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
228
229 let persistence_handle = PersistenceHandle::new(db_service_tx);
231
232 let db_service =
234 PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
235 std::thread::Builder::new()
236 .name("Persistence Service".to_string())
237 .spawn(|| {
238 if let Err(err) = db_service.run() {
239 error!(target: "engine::persistence", ?err, "Persistence service failed");
240 }
241 })
242 .unwrap();
243
244 persistence_handle
245 }
246
247 pub fn send_action(
250 &self,
251 action: PersistenceAction<T>,
252 ) -> Result<(), SendError<PersistenceAction<T>>> {
253 self.sender.send(action)
254 }
255
256 pub fn save_blocks(
265 &self,
266 blocks: Vec<ExecutedBlock<T>>,
267 tx: oneshot::Sender<Option<BlockNumHash>>,
268 ) -> Result<(), SendError<PersistenceAction<T>>> {
269 self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
270 }
271
272 pub fn save_finalized_block_number(
274 &self,
275 finalized_block: u64,
276 ) -> Result<(), SendError<PersistenceAction<T>>> {
277 self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
278 }
279
280 pub fn save_safe_block_number(
282 &self,
283 safe_block: u64,
284 ) -> Result<(), SendError<PersistenceAction<T>>> {
285 self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
286 }
287
288 pub fn remove_blocks_above(
294 &self,
295 block_num: u64,
296 tx: oneshot::Sender<Option<BlockNumHash>>,
297 ) -> Result<(), SendError<PersistenceAction<T>>> {
298 self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
299 }
300}
301
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use reth_chain_state::test_utils::TestBlockBuilder;
    use reth_exex_types::FinishedExExHeight;
    use reth_provider::test_utils::create_test_provider_factory;
    use reth_prune::Pruner;
    use tokio::sync::mpsc::unbounded_channel;

    /// Spawns a persistence service on top of a fresh test provider factory and returns the
    /// handle connected to it.
    fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
        let factory = create_test_provider_factory();

        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);

        let pruner =
            Pruner::new_with_factory(factory.clone(), vec![], 5, 0, None, finished_exex_height_rx);

        // The metrics receiver is dropped when this function returns; the service ignores
        // failed metric sends.
        let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
        PersistenceHandle::<EthPrimitives>::spawn_service(factory, pruner, sync_metrics_tx)
    }

    /// Saving an empty batch replies with `None`.
    #[tokio::test]
    async fn test_save_blocks_empty() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let (tx, rx) = oneshot::channel();
        handle.save_blocks(Vec::new(), tx).unwrap();

        let reply = rx.await.unwrap();
        assert_eq!(reply, None);
    }

    /// Saving one block replies with that block's hash.
    #[tokio::test]
    async fn test_save_blocks_single_block() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut builder = TestBlockBuilder::eth();
        let executed = builder.get_executed_block_with_number(0, B256::random());
        let expected_hash = executed.recovered_block().hash();

        let (tx, rx) = oneshot::channel();
        handle.save_blocks(vec![executed], tx).unwrap();

        let wait_limit = std::time::Duration::from_secs(10);
        let persisted = tokio::time::timeout(wait_limit, rx)
            .await
            .expect("test timed out")
            .expect("channel closed unexpectedly")
            .expect("no hash returned");

        assert_eq!(expected_hash, persisted.hash);
    }

    /// Saving several blocks in one batch replies with the hash of the last one.
    #[tokio::test]
    async fn test_save_blocks_multiple_blocks() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut builder = TestBlockBuilder::eth();
        let blocks: Vec<_> = builder.get_executed_blocks(0..5).collect();
        let expected_hash = blocks.last().unwrap().recovered_block().hash();

        let (tx, rx) = oneshot::channel();
        handle.save_blocks(blocks, tx).unwrap();

        let persisted = rx.await.unwrap().unwrap();
        assert_eq!(expected_hash, persisted.hash);
    }

    /// Consecutive batches each reply with the hash of their own last block.
    #[tokio::test]
    async fn test_save_blocks_multiple_calls() {
        reth_tracing::init_test_tracing();
        let handle = default_persistence_handle();

        let mut builder = TestBlockBuilder::eth();
        for range in [0..1, 1..2, 2..4, 4..5] {
            let blocks: Vec<_> = builder.get_executed_blocks(range).collect();
            let expected_hash = blocks.last().unwrap().recovered_block().hash();

            let (tx, rx) = oneshot::channel();
            handle.save_blocks(blocks, tx).unwrap();

            let persisted = rx.await.unwrap().unwrap();
            assert_eq!(expected_hash, persisted.hash);
        }
    }
}