reth_engine_tree/
backfill.rsuse futures::FutureExt;
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
use tracing::trace;
#[derive(Debug, PartialEq, Eq, Default)]
pub enum BackfillSyncState {
#[default]
Idle,
Pending,
Active,
}
impl BackfillSyncState {
pub const fn is_idle(&self) -> bool {
matches!(self, Self::Idle)
}
pub const fn is_pending(&self) -> bool {
matches!(self, Self::Pending)
}
pub const fn is_active(&self) -> bool {
matches!(self, Self::Active)
}
}
pub trait BackfillSync: Send + Sync {
fn on_action(&mut self, action: BackfillAction);
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BackfillAction {
Start(PipelineTarget),
}
#[derive(Debug)]
pub enum BackfillEvent {
Started(PipelineTarget),
Finished(Result<ControlFlow, PipelineError>),
TaskDropped(String),
}
#[derive(Debug)]
pub struct PipelineSync<N: ProviderNodeTypes> {
pipeline_task_spawner: Box<dyn TaskSpawner>,
pipeline_state: PipelineState<N>,
pending_pipeline_target: Option<PipelineTarget>,
}
impl<N: ProviderNodeTypes> PipelineSync<N> {
pub fn new(pipeline: Pipeline<N>, pipeline_task_spawner: Box<dyn TaskSpawner>) -> Self {
Self {
pipeline_task_spawner,
pipeline_state: PipelineState::Idle(Some(pipeline)),
pending_pipeline_target: None,
}
}
#[allow(dead_code)]
const fn is_pipeline_sync_pending(&self) -> bool {
self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
}
const fn is_pipeline_idle(&self) -> bool {
self.pipeline_state.is_idle()
}
const fn is_pipeline_active(&self) -> bool {
!self.is_pipeline_idle()
}
fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
if target.sync_target().is_some_and(|target| target.is_zero()) {
trace!(
target: "consensus::engine::sync",
"Pipeline target cannot be zero hash."
);
return
}
self.pending_pipeline_target = Some(target);
}
fn try_spawn_pipeline(&mut self) -> Option<BackfillEvent> {
match &mut self.pipeline_state {
PipelineState::Idle(pipeline) => {
let target = self.pending_pipeline_target.take()?;
let (tx, rx) = oneshot::channel();
let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking(
"pipeline task",
Box::pin(async move {
let result = pipeline.run_as_fut(Some(target)).await;
let _ = tx.send(result);
}),
);
self.pipeline_state = PipelineState::Running(rx);
Some(BackfillEvent::Started(target))
}
PipelineState::Running(_) => None,
}
}
fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
let res = match self.pipeline_state {
PipelineState::Idle(_) => return Poll::Pending,
PipelineState::Running(ref mut fut) => {
ready!(fut.poll_unpin(cx))
}
};
let ev = match res {
Ok((pipeline, result)) => {
self.pipeline_state = PipelineState::Idle(Some(pipeline));
BackfillEvent::Finished(result)
}
Err(why) => {
BackfillEvent::TaskDropped(why.to_string())
}
};
Poll::Ready(ev)
}
}
impl<N: ProviderNodeTypes> BackfillSync for PipelineSync<N> {
fn on_action(&mut self, event: BackfillAction) {
match event {
BackfillAction::Start(target) => self.set_pipeline_sync_target(target),
}
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
if let Some(event) = self.try_spawn_pipeline() {
return Poll::Ready(event)
}
if self.is_pipeline_active() {
if let Poll::Ready(event) = self.poll_pipeline(cx) {
return Poll::Ready(event)
}
}
Poll::Pending
}
}
#[derive(Debug)]
enum PipelineState<N: ProviderNodeTypes> {
Idle(Option<Pipeline<N>>),
Running(oneshot::Receiver<PipelineWithResult<N>>),
}
impl<N: ProviderNodeTypes> PipelineState<N> {
const fn is_idle(&self) -> bool {
matches!(self, Self::Idle(_))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{insert_headers_into_client, TestPipelineBuilder};
use alloy_consensus::Header;
use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT;
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use futures::poll;
use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_network_p2p::test_utils::TestFullBlockClient;
use reth_primitives::SealedHeader;
use reth_provider::test_utils::MockNodeTypesWithDB;
use reth_stages::ExecOutput;
use reth_stages_api::StageCheckpoint;
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, future::poll_fn, sync::Arc};
struct TestHarness {
pipeline_sync: PipelineSync<MockNodeTypesWithDB>,
tip: B256,
}
impl TestHarness {
fn new(total_blocks: usize, pipeline_done_after: u64) -> Self {
let chain_spec = Arc::new(
ChainSpecBuilder::default()
.chain(MAINNET.chain)
.genesis(MAINNET.genesis.clone())
.paris_activated()
.build(),
);
let pipeline = TestPipelineBuilder::new()
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)),
done: true,
})]))
.build(chain_spec);
let pipeline_sync = PipelineSync::new(pipeline, Box::<TokioTaskExecutor>::default());
let client = TestFullBlockClient::default();
let header = Header {
base_fee_per_gas: Some(7),
gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
..Default::default()
};
let header = SealedHeader::seal(header);
insert_headers_into_client(&client, header, 0..total_blocks);
let tip = client.highest_block().expect("there should be blocks here").hash();
Self { pipeline_sync, tip }
}
}
#[tokio::test]
async fn pipeline_started_and_finished() {
const TOTAL_BLOCKS: usize = 10;
const PIPELINE_DONE_AFTER: u64 = 5;
let TestHarness { mut pipeline_sync, tip } =
TestHarness::new(TOTAL_BLOCKS, PIPELINE_DONE_AFTER);
let sync_future = poll_fn(|cx| pipeline_sync.poll(cx));
let next_event = poll!(sync_future);
assert_matches!(next_event, Poll::Pending);
pipeline_sync.on_action(BackfillAction::Start(PipelineTarget::Sync(tip)));
let sync_future = poll_fn(|cx| pipeline_sync.poll(cx));
let next_event = poll!(sync_future);
assert_matches!(next_event, Poll::Ready(BackfillEvent::Started(target)) => {
assert_eq!(target.sync_target().unwrap(), tip);
});
let sync_future = poll_fn(|cx| pipeline_sync.poll(cx));
let next_ready = sync_future.await;
assert_matches!(next_ready, BackfillEvent::Finished(result) => {
assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: PIPELINE_DONE_AFTER }));
});
}
}