// reth_beacon_consensus/engine/hooks/static_file.rs
use crate::{
engine::hooks::{EngineHook, EngineHookContext, EngineHookError, EngineHookEvent},
hooks::EngineHookDBAccessLevel,
};
use alloy_primitives::BlockNumber;
use futures::FutureExt;
use reth_codecs::Compact;
use reth_db_api::table::Value;
use reth_errors::RethResult;
use reth_primitives::{static_file::HighestStaticFiles, NodePrimitives};
use reth_provider::{
BlockReader, ChainStateBlockReader, DatabaseProviderFactory, StageCheckpointReader,
StaticFileProviderFactory,
};
use reth_static_file::{StaticFileProducer, StaticFileProducerWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
use tracing::trace;
/// Engine hook that runs a [`StaticFileProducer`] in a spawned task and reports its progress
/// as [`EngineHookEvent`]s.
#[derive(Debug)]
pub struct StaticFileHook<Provider> {
/// Whether the producer is currently idle or has a run in flight.
state: StaticFileProducerState<Provider>,
/// Spawner used to launch the producer run as a critical blocking task.
task_spawner: Box<dyn TaskSpawner>,
}
impl<Provider> StaticFileHook<Provider>
where
    Provider: StaticFileProviderFactory
        + DatabaseProviderFactory<
            Provider: StaticFileProviderFactory<
                    Primitives: NodePrimitives<
                        SignedTx: Value + Compact,
                        BlockHeader: Value + Compact,
                        Receipt: Value + Compact,
                    >,
                > + StageCheckpointReader
                + BlockReader
                + ChainStateBlockReader,
        > + 'static,
{
    /// Creates a hook that starts out idle and owns `static_file_producer`.
    ///
    /// Producer runs are launched through `task_spawner`.
    pub fn new(
        static_file_producer: StaticFileProducer<Provider>,
        task_spawner: Box<dyn TaskSpawner>,
    ) -> Self {
        Self { state: StaticFileProducerState::Idle(Some(static_file_producer)), task_spawner }
    }

    /// Polls the in-flight producer run, if there is one.
    ///
    /// Returns [`Poll::Pending`] when the hook is idle or the spawned task has not reported
    /// back yet. Once the task delivers its result, the producer is put back into the idle
    /// state and an [`EngineHookEvent::Finished`] event carrying the run's outcome is returned.
    /// If the channel yields an error instead of a value, the event carries
    /// [`EngineHookError::ChannelClosed`].
    fn poll_static_file_producer(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<RethResult<EngineHookEvent>> {
        let StaticFileProducerState::Running(receiver) = &mut self.state else {
            return Poll::Pending
        };

        let event = match ready!(receiver.poll_unpin(cx)) {
            Ok((static_file_producer, result)) => {
                // The task hands the producer back together with its result; store it so the
                // next run can be spawned.
                self.state = StaticFileProducerState::Idle(Some(static_file_producer));

                match result {
                    Ok(_) => EngineHookEvent::Finished(Ok(())),
                    Err(err) => EngineHookEvent::Finished(Err(EngineHookError::Common(err.into()))),
                }
            }
            Err(_) => {
                // The receiving half reported an error instead of a value.
                EngineHookEvent::Finished(Err(EngineHookError::ChannelClosed))
            }
        };

        Poll::Ready(Ok(event))
    }

    /// Tries to start a producer run targeting `finalized_block_number`.
    ///
    /// Returns:
    /// - `Ok(Some(EngineHookEvent::Started))` when a run was spawned,
    /// - `Ok(Some(EngineHookEvent::NotReady))` when the producer reports no targets,
    /// - `Ok(None)` when a run is already in flight, the producer is missing from the idle
    ///   state, or its lock is currently held elsewhere,
    /// - `Err(_)` when querying the producer fails.
    ///
    /// The producer is only removed from the idle state right before the task is spawned.
    /// On every other path (lock contention, errors, no targets) it stays in place, so a
    /// transient failure does not leave the hook permanently without a producer.
    fn try_spawn_static_file_producer(
        &mut self,
        finalized_block_number: BlockNumber,
    ) -> RethResult<Option<EngineHookEvent>> {
        let StaticFileProducerState::Idle(slot) = &mut self.state else {
            // A run is already in flight; the caller polls it instead.
            return Ok(None)
        };

        // Only borrow the producer for now: it must remain in the slot unless we really spawn.
        let Some(producer) = slot.as_ref() else {
            trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer is already running but the state is idle");
            return Ok(None)
        };

        let Some(locked_static_file_producer) = producer.try_lock_arc() else {
            trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken");
            return Ok(None)
        };

        // Never target a block beyond the finalized block the producer itself reports.
        let finalized_block_number = locked_static_file_producer
            .last_finalized_block()?
            .map_or(finalized_block_number, |on_disk| finalized_block_number.min(on_disk));

        let targets = locked_static_file_producer.get_static_file_targets(HighestStaticFiles {
            headers: Some(finalized_block_number),
            receipts: Some(finalized_block_number),
            transactions: Some(finalized_block_number),
        })?;

        if !targets.any() {
            // Nothing to do; the producer is still in the idle slot.
            return Ok(Some(EngineHookEvent::NotReady))
        }

        // The slot was verified to be occupied above and has not been modified since, so this
        // always succeeds; the fallback only exists to avoid a panic path.
        let Some(static_file_producer) = slot.take() else { return Ok(None) };

        let (tx, rx) = oneshot::channel();
        self.task_spawner.spawn_critical_blocking(
            "static_file_producer task",
            Box::pin(async move {
                let result = locked_static_file_producer.run(targets);
                // Send the producer back with the result; a dropped receiver is ignored.
                let _ = tx.send((static_file_producer, result));
            }),
        );
        self.state = StaticFileProducerState::Running(rx);

        Ok(Some(EngineHookEvent::Started))
    }
}
impl<Provider> EngineHook for StaticFileHook<Provider>
where
    Provider: StaticFileProviderFactory
        + DatabaseProviderFactory<
            Provider: StaticFileProviderFactory<
                    Primitives: NodePrimitives<
                        SignedTx: Value + Compact,
                        BlockHeader: Value + Compact,
                        Receipt: Value + Compact,
                    >,
                > + StageCheckpointReader
                + BlockReader
                + ChainStateBlockReader,
        > + 'static,
{
    /// Name under which this hook identifies itself.
    fn name(&self) -> &'static str {
        "StaticFile"
    }

    /// Advances the hook.
    ///
    /// Without a finalized block number in `ctx` the hook stays pending. Otherwise it first
    /// tries to spawn a producer run: a `NotReady` outcome is reported as pending, any other
    /// event is returned immediately. If no event was produced, the in-flight run (if any) is
    /// polled instead.
    fn poll(
        &mut self,
        cx: &mut Context<'_>,
        ctx: EngineHookContext,
    ) -> Poll<RethResult<EngineHookEvent>> {
        let finalized_block_number = match ctx.finalized_block_number {
            Some(number) => number,
            None => {
                trace!(target: "consensus::engine::hooks::static_file", ?ctx, "Finalized block number is not available");
                return Poll::Pending
            }
        };

        if let Some(event) = self.try_spawn_static_file_producer(finalized_block_number)? {
            return match event {
                // Nothing to move right now; wait for the next poll.
                EngineHookEvent::NotReady => Poll::Pending,
                other => Poll::Ready(Ok(other)),
            }
        }

        self.poll_static_file_producer(cx)
    }

    /// This hook declares read-only database access.
    fn db_access_level(&self) -> EngineHookDBAccessLevel {
        EngineHookDBAccessLevel::ReadOnly
    }
}
/// State of the static file producer as tracked by the hook.
#[derive(Debug)]
enum StaticFileProducerState<Provider> {
/// No run is in flight. Holds the producer; `None` means the producer has been taken out of
/// the slot and not put back.
Idle(Option<StaticFileProducer<Provider>>),
/// A run has been spawned. The receiver yields the producer together with the run's result
/// once the spawned task sends them.
Running(oneshot::Receiver<StaticFileProducerWithResult<Provider>>),
}