use crate::{
engine::hooks::{EngineHook, EngineHookContext, EngineHookError, EngineHookEvent},
hooks::EngineHookDBAccessLevel,
};
use futures::FutureExt;
use metrics::Counter;
use reth_db_api::database::Database;
use reth_errors::{RethError, RethResult};
use reth_primitives::BlockNumber;
use reth_provider::ProviderFactory;
use reth_prune::{Pruner, PrunerError, PrunerWithResult};
use reth_tasks::TaskSpawner;
use std::{
fmt,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
pub struct PruneHook<DB> {
pruner_state: PrunerState<DB>,
pruner_task_spawner: Box<dyn TaskSpawner>,
metrics: Metrics,
}
impl<DB: fmt::Debug> fmt::Debug for PruneHook<DB> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PruneHook")
.field("pruner_state", &self.pruner_state)
.field("metrics", &self.metrics)
.finish()
}
}
impl<DB: Database + 'static> PruneHook<DB> {
pub fn new(
pruner: Pruner<DB, ProviderFactory<DB>>,
pruner_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self {
pruner_state: PrunerState::Idle(Some(pruner)),
pruner_task_spawner,
metrics: Metrics::default(),
}
}
fn poll_pruner(&mut self, cx: &mut Context<'_>) -> Poll<RethResult<EngineHookEvent>> {
let result = match self.pruner_state {
PrunerState::Idle(_) => return Poll::Pending,
PrunerState::Running(ref mut fut) => {
ready!(fut.poll_unpin(cx))
}
};
let event = match result {
Ok((pruner, result)) => {
self.pruner_state = PrunerState::Idle(Some(pruner));
match result {
Ok(_) => EngineHookEvent::Finished(Ok(())),
Err(err) => EngineHookEvent::Finished(Err(err.into())),
}
}
Err(_) => {
EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
}
};
Poll::Ready(Ok(event))
}
fn try_spawn_pruner(&mut self, tip_block_number: BlockNumber) -> Option<EngineHookEvent> {
match &mut self.pruner_state {
PrunerState::Idle(pruner) => {
let mut pruner = pruner.take()?;
if pruner.is_pruning_needed(tip_block_number) {
let (tx, rx) = oneshot::channel();
self.pruner_task_spawner.spawn_critical_blocking(
"pruner task",
Box::pin(async move {
let result = pruner.run(tip_block_number);
let _ = tx.send((pruner, result));
}),
);
self.metrics.runs_total.increment(1);
self.pruner_state = PrunerState::Running(rx);
Some(EngineHookEvent::Started)
} else {
self.pruner_state = PrunerState::Idle(Some(pruner));
Some(EngineHookEvent::NotReady)
}
}
PrunerState::Running(_) => None,
}
}
}
impl<DB: Database + 'static> EngineHook for PruneHook<DB> {
fn name(&self) -> &'static str {
"Prune"
}
fn poll(
&mut self,
cx: &mut Context<'_>,
ctx: EngineHookContext,
) -> Poll<RethResult<EngineHookEvent>> {
match self.try_spawn_pruner(ctx.tip_block_number) {
Some(EngineHookEvent::NotReady) => return Poll::Pending,
Some(event) => return Poll::Ready(Ok(event)),
None => (),
}
self.poll_pruner(cx)
}
fn db_access_level(&self) -> EngineHookDBAccessLevel {
EngineHookDBAccessLevel::ReadWrite
}
}
#[derive(Debug)]
enum PrunerState<DB> {
Idle(Option<Pruner<DB, ProviderFactory<DB>>>),
Running(oneshot::Receiver<PrunerWithResult<DB, ProviderFactory<DB>>>),
}
#[derive(reth_metrics::Metrics)]
#[metrics(scope = "consensus.engine.prune")]
struct Metrics {
runs_total: Counter,
}
impl From<PrunerError> for EngineHookError {
fn from(err: PrunerError) -> Self {
match err {
PrunerError::PruneSegment(_) | PrunerError::InconsistentData(_) => {
Self::Internal(Box::new(err))
}
PrunerError::Database(err) => RethError::Database(err).into(),
PrunerError::Provider(err) => RethError::Provider(err).into(),
}
}
}