//! Staged sync pipeline that executes its configured [stages][`Stage`] serially.

mod ctrl;
mod event;
pub use crate::pipeline::ctrl::ControlFlow;
use crate::{PipelineTarget, StageCheckpoint, StageId};
use alloy_primitives::{BlockNumber, B256};
pub use event::*;
use futures_util::Future;
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader,
ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
StageCheckpointWriter,
};
use reth_prune::PrunerBuilder;
use reth_static_file::StaticFileProducer;
use reth_tokio_util::{EventSender, EventStream};
use std::pin::Pin;
use tokio::sync::watch;
use tracing::*;
mod builder;
mod progress;
mod set;
use crate::{
BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
StageError, StageExt, UnwindInput,
};
pub use builder::*;
use progress::*;
use reth_errors::RethResult;
pub use set::*;
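/// A container for a queued stage.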
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
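/// A future that resolves to the pipeline it was spawned from together with the result of the
/// run. See [`Pipeline::run_as_fut`].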
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;
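/// The owned pipeline paired with the result of [`Pipeline::run_as_fut`].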
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
#[cfg_attr(doc, aquamarine::aquamarine)]
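/// A staged sync pipeline.
///
/// The pipeline executes its queued [stages][`Stage`] serially. An external component determines
/// the tip of the chain and notifies the pipeline via [`Pipeline::set_tip`]; each stage is then
/// run in order from its local checkpoint towards that target. After every stage has completed,
/// [`Pipeline::run`] starts the next pass, until the minimum stage progress reaches the
/// configured maximum block, if any.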
pub struct Pipeline<N: ProviderNodeTypes> {
/// Factory for the database providers the stages run against.
provider_factory: ProviderFactory<N>,
/// All configured stages, in the order they will be executed.
stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
/// The maximum block number to sync to, if any.
max_block: Option<BlockNumber>,
/// Moves data from the database into static files between pipeline runs.
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
/// Sender for the events the pipeline emits.
event_sender: EventSender<PipelineEvent>,
/// Keeps track of the progress of the pipeline.
progress: PipelineProgress,
/// Sender half of the watch channel that broadcasts the chain tip; see [`Pipeline::set_tip`].
tip_tx: Option<watch::Sender<B256>>,
/// Optional sender for stage checkpoint metrics.
metrics_tx: Option<MetricEventsSender>,
/// Whether a pipeline run that ends in an unwind should be treated as a failure.
fail_on_unwind: bool,
}
impl<N: ProviderNodeTypes> Pipeline<N> {
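/// Constructs a pipeline using a [`PipelineBuilder`].
///
/// A minimal construction sketch, mirroring the test setup at the bottom of this module
/// (`TestStage`, `MockNodeTypesWithDB` and `create_test_provider_factory` are test-only
/// utilities):
///
/// ```ignore
/// let provider_factory = create_test_provider_factory();
/// let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
///     .add_stage(
///         TestStage::new(StageId::Other("A"))
///             .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
///     )
///     .with_max_block(10)
///     .build(
///         provider_factory.clone(),
///         StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
///     );
/// pipeline.run().await?;
/// ```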
pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
{
PipelineBuilder::default()
}
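/// Returns the minimum block number reached by any stage during the current execution of the
/// pipeline.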
pub const fn minimum_block_number(&self) -> Option<u64> {
self.progress.minimum_block_number
}
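/// Sets the chain tip for the sync pipeline.
///
/// # Panics
///
/// Panics if a tip sender was not configured on the pipeline.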
#[track_caller]
pub fn set_tip(&self, tip: B256) {
let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
warn!(target: "sync::pipeline", "Chain tip channel closed");
});
}
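/// Returns a new listener for events the pipeline emits.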
pub fn events(&self) -> EventStream<PipelineEvent> {
self.event_sender.new_listener()
}
}
impl<N: ProviderNodeTypes> Pipeline<N> {
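/// Registers the current checkpoint of every stage with the metrics sender, if one is
/// configured.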
pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
let provider = self.provider_factory.provider()?;
for stage in &self.stages {
let stage_id = stage.id();
let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
stage_id,
checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
max_block_number: None,
});
}
Ok(())
}
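/// Consumes the pipeline and runs it to completion for the given target, returning the
/// pipeline together with the run result so the caller can reuse it. A sync target sets the
/// chain tip before running; an unwind target unwinds all stages and returns immediately.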
#[track_caller]
pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
let _ = self.register_metrics();
Box::pin(async move {
if let Some(target) = target {
match target {
PipelineTarget::Sync(tip) => self.set_tip(tip),
PipelineTarget::Unwind(target) => {
if let Err(err) = self.move_to_static_files() {
return (self, Err(err.into()))
}
if let Err(err) = self.unwind(target, None) {
return (self, Err(err))
}
self.progress.update(target);
return (self, Ok(ControlFlow::Continue { block_number: target }))
}
}
}
let result = self.run_loop().await;
trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
(self, result)
})
}
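/// Runs the pipeline in a loop. The loop terminates with an error if a run fails or if an
/// unwind occurs while `fail_on_unwind` is set, and terminates successfully once the minimum
/// stage progress reaches the configured maximum block.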
pub async fn run(&mut self) -> Result<(), PipelineError> {
let _ = self.register_metrics();
loop {
let next_action = self.run_loop().await?;
if next_action.is_unwind() && self.fail_on_unwind {
return Err(PipelineError::UnexpectedUnwind)
}
if next_action.should_continue() &&
self.progress
.minimum_block_number
.zip(self.max_block)
.is_some_and(|(progress, target)| progress >= target)
{
trace!(
target: "sync::pipeline",
?next_action,
minimum_block_number = ?self.progress.minimum_block_number,
max_block = ?self.max_block,
"Terminating pipeline."
);
return Ok(())
}
}
}
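/// Performs one pass of the pipeline across all configured stages.
///
/// Data is first moved to static files via [`Pipeline::move_to_static_files`], then each stage
/// is executed to completion in order. If a stage reports a bad block, all stages are unwound
/// in reverse order and the unwind control flow is returned to the caller.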
pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
self.move_to_static_files()?;
let mut previous_stage = None;
for stage_index in 0..self.stages.len() {
let stage = &self.stages[stage_index];
let stage_id = stage.id();
trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
match next {
ControlFlow::NoProgress { block_number } => {
if let Some(block_number) = block_number {
self.progress.update(block_number);
}
}
ControlFlow::Continue { block_number } => self.progress.update(block_number),
ControlFlow::Unwind { target, bad_block } => {
self.unwind(target, Some(bad_block.block.number))?;
return Ok(ControlFlow::Unwind { target, bad_block })
}
}
previous_stage = Some(
self.provider_factory
.provider()?
.get_stage_checkpoint(stage_id)?
.unwrap_or_default()
.block_number,
);
}
Ok(self.progress.next_ctrl())
}
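/// Copies data from the database to static files for all configured segments, then prunes the
/// copied data from the database.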
pub fn move_to_static_files(&self) -> RethResult<()> {
let lowest_static_file_height =
self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
if let Some(prune_tip) = lowest_static_file_height {
let mut pruner = PrunerBuilder::new(Default::default())
.delete_limit(usize::MAX)
.build_with_provider_factory(self.provider_factory.clone());
pruner.run(prune_tip)?;
}
Ok(())
}
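/// Unwinds all stages, in reverse order, down to the `to` block.
///
/// If the unwind was caused by a bad block, its number can be supplied so stages receive it in
/// their [`UnwindInput`].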
pub fn unwind(
&mut self,
to: BlockNumber,
bad_block: Option<BlockNumber>,
) -> Result<(), PipelineError> {
let unwind_pipeline = self.stages.iter_mut().rev();
let _locked_sf_producer = self.static_file_producer.lock();
let mut provider_rw = self.provider_factory.database_provider_rw()?;
for stage in unwind_pipeline {
let stage_id = stage.id();
let span = info_span!("Unwinding", stage = %stage_id);
let _enter = span.enter();
let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
if checkpoint.block_number < to {
debug!(
target: "sync::pipeline",
from = %checkpoint.block_number,
%to,
"Unwind point too far for stage"
);
self.event_sender.notify(PipelineEvent::Skipped { stage_id });
continue
}
info!(
target: "sync::pipeline",
from = %checkpoint.block_number,
%to,
?bad_block,
"Starting unwind"
);
while checkpoint.block_number > to {
let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });
let output = stage.unwind(&provider_rw, input);
match output {
Ok(unwind_output) => {
checkpoint = unwind_output.checkpoint;
info!(
target: "sync::pipeline",
stage = %stage_id,
unwind_to = to,
progress = checkpoint.block_number,
done = checkpoint.block_number == to,
"Stage unwound"
);
if let Some(metrics_tx) = &mut self.metrics_tx {
let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
stage_id,
checkpoint,
max_block_number: None,
});
}
provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
self.event_sender
.notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
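// Initialize the saved finalized block number if it is unset, or lower it if the
// unwind has moved below the previously saved value.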
let last_saved_finalized_block_number =
provider_rw.last_finalized_block_number()?;
if last_saved_finalized_block_number.is_none() ||
Some(checkpoint.block_number) < last_saved_finalized_block_number
{
provider_rw.save_finalized_block_number(BlockNumber::from(
checkpoint.block_number,
))?;
}
UnifiedStorageWriter::commit_unwind(provider_rw)?;
stage.post_unwind_commit()?;
provider_rw = self.provider_factory.database_provider_rw()?;
}
Err(err) => {
self.event_sender.notify(PipelineEvent::Error { stage_id });
return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
}
}
}
}
Ok(())
}
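/// Runs a single stage to completion, re-executing it until it either reports `done`, reaches
/// the maximum configured block, or requests a control-flow change such as an unwind.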
async fn execute_stage_to_completion(
&mut self,
previous_stage: Option<BlockNumber>,
stage_index: usize,
) -> Result<ControlFlow, PipelineError> {
let total_stages = self.stages.len();
let stage = &mut self.stages[stage_index];
let stage_id = stage.id();
let mut made_progress = false;
let target = self.max_block.or(previous_stage);
loop {
let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;
let stage_reached_max_block = prev_checkpoint
.zip(self.max_block)
.is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
if stage_reached_max_block {
warn!(
target: "sync::pipeline",
stage = %stage_id,
max_block = self.max_block,
prev_block = prev_checkpoint.map(|progress| progress.block_number),
"Stage reached target block, skipping."
);
self.event_sender.notify(PipelineEvent::Skipped { stage_id });
return Ok(ControlFlow::NoProgress {
block_number: prev_checkpoint.map(|progress| progress.block_number),
})
}
let exec_input = ExecInput { target, checkpoint: prev_checkpoint };
self.event_sender.notify(PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress {
current: stage_index + 1,
total: total_stages,
},
stage_id,
checkpoint: prev_checkpoint,
target,
});
if let Err(err) = stage.execute_ready(exec_input).await {
self.event_sender.notify(PipelineEvent::Error { stage_id });
match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? {
Some(ctrl) => return Ok(ctrl),
None => continue,
};
}
let provider_rw = self.provider_factory.database_provider_rw()?;
self.event_sender.notify(PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress {
current: stage_index + 1,
total: total_stages,
},
stage_id,
checkpoint: prev_checkpoint,
target,
});
match stage.execute(&provider_rw, exec_input) {
Ok(out @ ExecOutput { checkpoint, done }) => {
made_progress |=
checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
if let Some(metrics_tx) = &mut self.metrics_tx {
let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
stage_id,
checkpoint,
max_block_number: target,
});
}
provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
self.event_sender.notify(PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress {
current: stage_index + 1,
total: total_stages,
},
stage_id,
result: out.clone(),
});
UnifiedStorageWriter::commit(provider_rw)?;
stage.post_execute_commit()?;
if done {
let block_number = checkpoint.block_number;
return Ok(if made_progress {
ControlFlow::Continue { block_number }
} else {
ControlFlow::NoProgress { block_number: Some(block_number) }
})
}
}
Err(err) => {
drop(provider_rw);
self.event_sender.notify(PipelineEvent::Error { stage_id });
if let Some(ctrl) =
on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)?
{
return Ok(ctrl)
}
}
}
}
}
}
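/// Maps a stage error to the pipeline's reaction: an unwind for detached heads, bad blocks and
/// missing static file data; a fatal pipeline error for unrecoverable failures; or `None` to
/// signal that the stage should simply be retried.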
fn on_stage_error<N: ProviderNodeTypes>(
factory: &ProviderFactory<N>,
stage_id: StageId,
prev_checkpoint: Option<StageCheckpoint>,
err: StageError,
) -> Result<Option<ControlFlow>, PipelineError> {
if let StageError::DetachedHead { local_head, header, error } = err {
warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
let unwind_to =
local_head.block.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1);
Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
} else if let StageError::Block { block, error } = err {
match error {
BlockErrorKind::Validation(validation_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.block.number,
"Stage encountered a validation error: {validation_error}"
);
let provider_rw = factory.database_provider_rw()?;
provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
provider_rw.save_stage_checkpoint(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;
UnifiedStorageWriter::commit(provider_rw)?;
Ok(Some(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
}))
}
BlockErrorKind::Execution(execution_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.block.number,
"Stage encountered an execution error: {execution_error}"
);
Ok(Some(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
}))
}
}
} else if let StageError::MissingStaticFileData { block, segment } = err {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.block.number,
segment = %segment,
"Stage is missing static file data."
);
Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
} else if err.is_fatal() {
error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
Err(err.into())
} else {
warn!(
target: "sync::pipeline",
stage = %stage_id,
"Stage encountered a non-fatal error: {err}. Retrying..."
);
Ok(None)
}
}
impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pipeline")
.field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
.field("max_block", &self.max_block)
.field("event_sender", &self.event_sender)
.field("fail_on_unwind", &self.fail_on_unwind)
.finish()
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::Ordering;
use super::*;
use crate::{test_utils::TestStage, UnwindOutput};
use assert_matches::assert_matches;
use reth_consensus::ConsensusError;
use reth_errors::ProviderError;
use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
use reth_prune::PruneModes;
use reth_testing_utils::generators::{self, random_block_with_parent};
use tokio_stream::StreamExt;
#[test]
fn record_progress_calculates_outliers() {
let mut progress = PipelineProgress::default();
progress.update(10);
assert_eq!(progress.minimum_block_number, Some(10));
assert_eq!(progress.maximum_block_number, Some(10));
progress.update(20);
assert_eq!(progress.minimum_block_number, Some(10));
assert_eq!(progress.maximum_block_number, Some(20));
progress.update(1);
assert_eq!(progress.minimum_block_number, Some(1));
assert_eq!(progress.maximum_block_number, Some(20));
}
#[test]
fn progress_ctrl_flow() {
let mut progress = PipelineProgress::default();
assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });
progress.update(1);
assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
}
#[tokio::test]
async fn run_pipeline() {
let provider_factory = create_test_provider_factory();
let stage_a = TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
let stage_b = TestStage::new(StageId::Other("B"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stage(stage_a)
.add_stage(stage_b)
.with_max_block(10)
.build(
provider_factory.clone(),
StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
);
let events = pipeline.events();
tokio::spawn(async move {
pipeline.run().await.unwrap();
});
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
]
);
assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);
assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn unwind_pipeline() {
let provider_factory = create_test_provider_factory();
let stage_a = TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
let stage_b = TestStage::new(StageId::Other("B"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
let stage_c = TestStage::new(StageId::Other("C"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stage(stage_a)
.add_stage(stage_b)
.add_stage(stage_c)
.with_max_block(10)
.build(
provider_factory.clone(),
StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
);
let events = pipeline.events();
tokio::spawn(async move {
pipeline.run().await.expect("Could not run pipeline");
pipeline.unwind(1, None).expect("Could not unwind pipeline");
});
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
stage_id: StageId::Other("C"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
stage_id: StageId::Other("C"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
stage_id: StageId::Other("C"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
PipelineEvent::Unwind {
stage_id: StageId::Other("C"),
input: UnwindInput {
checkpoint: StageCheckpoint::new(20),
unwind_to: 1,
bad_block: None
}
},
PipelineEvent::Unwound {
stage_id: StageId::Other("C"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
},
PipelineEvent::Unwind {
stage_id: StageId::Other("B"),
input: UnwindInput {
checkpoint: StageCheckpoint::new(10),
unwind_to: 1,
bad_block: None
}
},
PipelineEvent::Unwound {
stage_id: StageId::Other("B"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
},
PipelineEvent::Unwind {
stage_id: StageId::Other("A"),
input: UnwindInput {
checkpoint: StageCheckpoint::new(100),
unwind_to: 1,
bad_block: None
}
},
PipelineEvent::Unwound {
stage_id: StageId::Other("A"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
},
]
);
assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);
assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);
assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn unwind_pipeline_with_intermediate_progress() {
let provider_factory = create_test_provider_factory();
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stage(
TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
)
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(
provider_factory.clone(),
StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
);
let events = pipeline.events();
tokio::spawn(async move {
pipeline.run().await.expect("Could not run pipeline");
pipeline.unwind(50, None).expect("Could not unwind pipeline");
});
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Skipped { stage_id: StageId::Other("B") },
PipelineEvent::Unwind {
stage_id: StageId::Other("A"),
input: UnwindInput {
checkpoint: StageCheckpoint::new(100),
unwind_to: 50,
bad_block: None
}
},
PipelineEvent::Unwound {
stage_id: StageId::Other("A"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
},
]
);
}
#[tokio::test]
async fn run_pipeline_with_unwind() {
let provider_factory = create_test_provider_factory();
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stage(
TestStage::new(StageId::Other("A"))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Err(StageError::Block {
block: Box::new(random_block_with_parent(
&mut generators::rng(),
5,
Default::default(),
)),
error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
}))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(
provider_factory.clone(),
StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
);
let events = pipeline.events();
tokio::spawn(async move {
pipeline.run().await.expect("Could not run pipeline");
});
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Error { stage_id: StageId::Other("B") },
PipelineEvent::Unwind {
stage_id: StageId::Other("A"),
input: UnwindInput {
checkpoint: StageCheckpoint::new(10),
unwind_to: 0,
bad_block: Some(5)
}
},
PipelineEvent::Unwound {
stage_id: StageId::Other("A"),
result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: Some(StageCheckpoint::new(0)),
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
checkpoint: Some(StageCheckpoint::new(0)),
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Prepare {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Run {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
checkpoint: None,
target: Some(10),
},
PipelineEvent::Ran {
pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
]
);
}
#[tokio::test]
async fn pipeline_error_handling() {
let provider_factory = create_test_provider_factory();
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stage(
TestStage::new(StageId::Other("NonFatal"))
.add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
)
.with_max_block(10)
.build(
provider_factory.clone(),
StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
);
let result = pipeline.run().await;
assert_matches!(result, Ok(()));
let provider_factory = create_test_provider_factory();
let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
.add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
)))
.build(
provider_factory.clone(),
StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
);
let result = pipeline.run().await;
assert_matches!(
result,
Err(PipelineError::Stage(StageError::DatabaseIntegrity(
ProviderError::BlockBodyIndicesNotFound(5)
)))
);
}
}