1use crate::metrics::PersistenceMetrics;
2use alloy_eips::BlockNumHash;
3use crossbeam_channel::Sender as CrossbeamSender;
4use reth_chain_state::ExecutedBlock;
5use reth_errors::ProviderError;
6use reth_ethereum_primitives::EthPrimitives;
7use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
8use reth_provider::{
9 providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter,
10 DBProvider, DatabaseProviderFactory, ProviderFactory, SaveBlocksMode,
11};
12use reth_prune::{PrunerError, PrunerWithFactory};
13use reth_stages_api::{MetricEvent, MetricEventsSender};
14use reth_tasks::spawn_os_thread;
15use std::{
16 sync::{
17 mpsc::{Receiver, SendError, Sender},
18 Arc,
19 },
20 thread::JoinHandle,
21};
22use thiserror::Error;
23use tracing::{debug, error, instrument};
24
25#[derive(Debug)]
33pub struct PersistenceService<N>
34where
35 N: ProviderNodeTypes,
36{
37 provider: ProviderFactory<N>,
39 incoming: Receiver<PersistenceAction<N::Primitives>>,
41 pruner: PrunerWithFactory<ProviderFactory<N>>,
43 metrics: PersistenceMetrics,
45 sync_metrics_tx: MetricEventsSender,
47 pending_finalized_block: Option<u64>,
50 pending_safe_block: Option<u64>,
53}
54
55impl<N> PersistenceService<N>
56where
57 N: ProviderNodeTypes,
58{
59 pub fn new(
61 provider: ProviderFactory<N>,
62 incoming: Receiver<PersistenceAction<N::Primitives>>,
63 pruner: PrunerWithFactory<ProviderFactory<N>>,
64 sync_metrics_tx: MetricEventsSender,
65 ) -> Self {
66 Self {
67 provider,
68 incoming,
69 pruner,
70 metrics: PersistenceMetrics::default(),
71 sync_metrics_tx,
72 pending_finalized_block: None,
73 pending_safe_block: None,
74 }
75 }
76}
77
78impl<N> PersistenceService<N>
79where
80 N: ProviderNodeTypes,
81{
82 pub fn run(mut self) -> Result<(), PersistenceError> {
85 while let Ok(action) = self.incoming.recv() {
87 match action {
88 PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
89 let result = self.on_remove_blocks_above(new_tip_num)?;
90 let _ =
92 self.sync_metrics_tx.send(MetricEvent::SyncHeight { height: new_tip_num });
93 let _ = sender.send(result);
95 }
96 PersistenceAction::SaveBlocks(blocks, sender) => {
97 let result = self.on_save_blocks(blocks)?;
98 let result_number = result.map(|r| r.number);
99
100 let _ = sender.send(result);
102
103 if let Some(block_number) = result_number {
104 let _ = self
106 .sync_metrics_tx
107 .send(MetricEvent::SyncHeight { height: block_number });
108 }
109 }
110 PersistenceAction::SaveFinalizedBlock(finalized_block) => {
111 self.pending_finalized_block = Some(finalized_block);
112 }
113 PersistenceAction::SaveSafeBlock(safe_block) => {
114 self.pending_safe_block = Some(safe_block);
115 }
116 }
117 }
118 Ok(())
119 }
120
121 #[instrument(level = "debug", target = "engine::persistence", skip_all, fields(%new_tip_num))]
122 fn on_remove_blocks_above(
123 &self,
124 new_tip_num: u64,
125 ) -> Result<Option<BlockNumHash>, PersistenceError> {
126 debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
127 let start_time = Instant::now();
128 let provider_rw = self.provider.database_provider_rw()?;
129
130 let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
131 provider_rw.remove_block_and_execution_above(new_tip_num)?;
132 provider_rw.commit()?;
133
134 debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
135 self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
136 Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
137 }
138
139 #[instrument(level = "debug", target = "engine::persistence", skip_all, fields(block_count = blocks.len()))]
140 fn on_save_blocks(
141 &mut self,
142 blocks: Vec<ExecutedBlock<N::Primitives>>,
143 ) -> Result<Option<BlockNumHash>, PersistenceError> {
144 let first_block = blocks.first().map(|b| b.recovered_block.num_hash());
145 let last_block = blocks.last().map(|b| b.recovered_block.num_hash());
146 let block_count = blocks.len();
147
148 let pending_finalized = self.pending_finalized_block.take();
149 let pending_safe = self.pending_safe_block.take();
150
151 debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks");
152
153 let start_time = Instant::now();
154
155 if let Some(last) = last_block {
156 let provider_rw = self.provider.database_provider_rw()?;
157 provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?;
158
159 if let Some(finalized) = pending_finalized {
160 provider_rw.save_finalized_block_number(finalized.min(last.number))?;
163 if finalized > last.number {
164 self.pending_finalized_block = Some(finalized);
165 }
166 }
167 if let Some(safe) = pending_safe {
168 provider_rw.save_safe_block_number(safe.min(last.number))?;
169 if safe > last.number {
170 self.pending_safe_block = Some(safe);
171 }
172 }
173
174 if self.pruner.is_pruning_needed(last.number) {
175 debug!(target: "engine::persistence", block_num=?last.number, "Running pruner");
176 let prune_start = Instant::now();
177 let _ = self.pruner.run_with_provider(&provider_rw, last.number)?;
178 self.metrics.prune_before_duration_seconds.record(prune_start.elapsed());
179 }
180
181 provider_rw.commit()?;
182 }
183
184 debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks");
185
186 self.metrics.save_blocks_batch_size.record(block_count as f64);
187 self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
188
189 Ok(last_block)
190 }
191}
192
193#[derive(Debug, Error)]
195pub enum PersistenceError {
196 #[error(transparent)]
198 PrunerError(#[from] PrunerError),
199
200 #[error(transparent)]
202 ProviderError(#[from] ProviderError),
203}
204
205#[derive(Debug)]
207pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
208 SaveBlocks(Vec<ExecutedBlock<N>>, CrossbeamSender<Option<BlockNumHash>>),
214
215 RemoveBlocksAbove(u64, CrossbeamSender<Option<BlockNumHash>>),
220
221 SaveFinalizedBlock(u64),
223
224 SaveSafeBlock(u64),
226}
227
228#[derive(Debug, Clone)]
230pub struct PersistenceHandle<N: NodePrimitives = EthPrimitives> {
231 sender: Sender<PersistenceAction<N>>,
233 _service_guard: Arc<ServiceGuard>,
236}
237
238impl<T: NodePrimitives> PersistenceHandle<T> {
239 pub fn new(sender: Sender<PersistenceAction<T>>) -> Self {
244 Self { sender, _service_guard: Arc::new(ServiceGuard(None)) }
245 }
246
247 pub fn spawn_service<N>(
253 provider_factory: ProviderFactory<N>,
254 pruner: PrunerWithFactory<ProviderFactory<N>>,
255 sync_metrics_tx: MetricEventsSender,
256 ) -> PersistenceHandle<N::Primitives>
257 where
258 N: ProviderNodeTypes,
259 {
260 let (db_service_tx, db_service_rx) = std::sync::mpsc::channel();
262
263 let db_service =
265 PersistenceService::new(provider_factory, db_service_rx, pruner, sync_metrics_tx);
266 let join_handle = spawn_os_thread("persistence", || {
267 if let Err(err) = db_service.run() {
268 error!(target: "engine::persistence", ?err, "Persistence service failed");
269 }
270 });
271
272 PersistenceHandle {
273 sender: db_service_tx,
274 _service_guard: Arc::new(ServiceGuard(Some(join_handle))),
275 }
276 }
277
278 pub fn send_action(
281 &self,
282 action: PersistenceAction<T>,
283 ) -> Result<(), SendError<PersistenceAction<T>>> {
284 self.sender.send(action)
285 }
286
287 pub fn save_blocks(
296 &self,
297 blocks: Vec<ExecutedBlock<T>>,
298 tx: CrossbeamSender<Option<BlockNumHash>>,
299 ) -> Result<(), SendError<PersistenceAction<T>>> {
300 self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
301 }
302
303 pub fn save_finalized_block_number(
308 &self,
309 finalized_block: u64,
310 ) -> Result<(), SendError<PersistenceAction<T>>> {
311 self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
312 }
313
314 pub fn save_safe_block_number(
319 &self,
320 safe_block: u64,
321 ) -> Result<(), SendError<PersistenceAction<T>>> {
322 self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
323 }
324
325 pub fn remove_blocks_above(
331 &self,
332 block_num: u64,
333 tx: CrossbeamSender<Option<BlockNumHash>>,
334 ) -> Result<(), SendError<PersistenceAction<T>>> {
335 self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
336 }
337}
338
339struct ServiceGuard(Option<JoinHandle<()>>);
345
346impl std::fmt::Debug for ServiceGuard {
347 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348 f.debug_tuple("ServiceGuard").field(&self.0.as_ref().map(|_| "...")).finish()
349 }
350}
351
352impl Drop for ServiceGuard {
353 fn drop(&mut self) {
354 if let Some(join_handle) = self.0.take() {
355 let _ = join_handle.join();
356 }
357 }
358}
359
360#[cfg(test)]
361mod tests {
362 use super::*;
363 use alloy_primitives::B256;
364 use reth_chain_state::test_utils::TestBlockBuilder;
365 use reth_exex_types::FinishedExExHeight;
366 use reth_provider::test_utils::create_test_provider_factory;
367 use reth_prune::Pruner;
368 use tokio::sync::mpsc::unbounded_channel;
369
370 fn default_persistence_handle() -> PersistenceHandle<EthPrimitives> {
371 let provider = create_test_provider_factory();
372
373 let (_finished_exex_height_tx, finished_exex_height_rx) =
374 tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
375
376 let pruner =
377 Pruner::new_with_factory(provider.clone(), vec![], 5, 0, None, finished_exex_height_rx);
378
379 let (sync_metrics_tx, _sync_metrics_rx) = unbounded_channel();
380 PersistenceHandle::<EthPrimitives>::spawn_service(provider, pruner, sync_metrics_tx)
381 }
382
383 #[test]
384 fn test_save_blocks_empty() {
385 reth_tracing::init_test_tracing();
386 let handle = default_persistence_handle();
387
388 let blocks = vec![];
389 let (tx, rx) = crossbeam_channel::bounded(1);
390
391 handle.save_blocks(blocks, tx).unwrap();
392
393 let hash = rx.recv().unwrap();
394 assert_eq!(hash, None);
395 }
396
397 #[test]
398 fn test_save_blocks_single_block() {
399 reth_tracing::init_test_tracing();
400 let handle = default_persistence_handle();
401 let block_number = 0;
402 let mut test_block_builder = TestBlockBuilder::eth();
403 let executed =
404 test_block_builder.get_executed_block_with_number(block_number, B256::random());
405 let block_hash = executed.recovered_block().hash();
406
407 let blocks = vec![executed];
408 let (tx, rx) = crossbeam_channel::bounded(1);
409
410 handle.save_blocks(blocks, tx).unwrap();
411
412 let BlockNumHash { hash: actual_hash, number: _ } = rx
413 .recv_timeout(std::time::Duration::from_secs(10))
414 .expect("test timed out")
415 .expect("no hash returned");
416
417 assert_eq!(block_hash, actual_hash);
418 }
419
420 #[test]
421 fn test_save_blocks_multiple_blocks() {
422 reth_tracing::init_test_tracing();
423 let handle = default_persistence_handle();
424
425 let mut test_block_builder = TestBlockBuilder::eth();
426 let blocks = test_block_builder.get_executed_blocks(0..5).collect::<Vec<_>>();
427 let last_hash = blocks.last().unwrap().recovered_block().hash();
428 let (tx, rx) = crossbeam_channel::bounded(1);
429
430 handle.save_blocks(blocks, tx).unwrap();
431 let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
432 assert_eq!(last_hash, actual_hash);
433 }
434
435 #[test]
436 fn test_save_blocks_multiple_calls() {
437 reth_tracing::init_test_tracing();
438 let handle = default_persistence_handle();
439
440 let ranges = [0..1, 1..2, 2..4, 4..5];
441 let mut test_block_builder = TestBlockBuilder::eth();
442 for range in ranges {
443 let blocks = test_block_builder.get_executed_blocks(range).collect::<Vec<_>>();
444 let last_hash = blocks.last().unwrap().recovered_block().hash();
445 let (tx, rx) = crossbeam_channel::bounded(1);
446
447 handle.save_blocks(blocks, tx).unwrap();
448
449 let BlockNumHash { hash: actual_hash, number: _ } = rx.recv().unwrap().unwrap();
450 assert_eq!(last_hash, actual_hash);
451 }
452 }
453}