1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10 providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader,
11 ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory,
12 PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::{
18 pin::Pin,
19 time::{Duration, Instant},
20};
21use tokio::sync::watch;
22use tracing::*;
23
24mod builder;
25mod progress;
26mod set;
27
28use crate::{
29 BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
30 StageError, StageExt, UnwindInput,
31};
32pub use builder::*;
33use progress::*;
34use reth_errors::RethResult;
35pub use set::*;
36
37pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
39
40pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;
43
44pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
46
47#[cfg_attr(doc, aquamarine::aquamarine)]
48pub struct Pipeline<N: ProviderNodeTypes> {
70 provider_factory: ProviderFactory<N>,
72 stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
74 max_block: Option<BlockNumber>,
76 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
77 event_sender: EventSender<PipelineEvent>,
79 progress: PipelineProgress,
81 tip_tx: Option<watch::Sender<B256>>,
85 metrics_tx: Option<MetricEventsSender>,
86 fail_on_unwind: bool,
89 last_detached_head_unwind_target: Option<B256>,
92 detached_head_attempts: u64,
95}
96
97impl<N: ProviderNodeTypes> Pipeline<N> {
98 pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
100 {
101 PipelineBuilder::default()
102 }
103
104 pub const fn minimum_block_number(&self) -> Option<u64> {
107 self.progress.minimum_block_number
108 }
109
110 #[track_caller]
112 pub fn set_tip(&self, tip: B256) {
113 let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
114 warn!(target: "sync::pipeline", "Chain tip channel closed");
115 });
116 }
117
118 pub fn events(&self) -> EventStream<PipelineEvent> {
120 self.event_sender.new_listener()
121 }
122
123 pub fn stage(
125 &mut self,
126 idx: usize,
127 ) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
128 &mut self.stages[idx]
129 }
130}
131
132impl<N: ProviderNodeTypes> Pipeline<N> {
133 pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
135 let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
136 let provider = self.provider_factory.provider()?;
137
138 for stage in &self.stages {
139 let stage_id = stage.id();
140 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
141 stage_id,
142 checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
143 max_block_number: None,
144 elapsed: Duration::default(),
145 });
146 }
147 Ok(())
148 }
149
150 #[track_caller]
153 pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
154 let _ = self.register_metrics();
155 Box::pin(async move {
156 if let Some(target) = target {
158 match target {
159 PipelineTarget::Sync(tip) => self.set_tip(tip),
160 PipelineTarget::Unwind(target) => {
161 if let Err(err) = self.move_to_static_files() {
162 return (self, Err(err.into()))
163 }
164 if let Err(err) = self.unwind(target, None) {
165 return (self, Err(err))
166 }
167 self.progress.update(target);
168
169 return (self, Ok(ControlFlow::Continue { block_number: target }))
170 }
171 }
172 }
173
174 let result = self.run_loop().await;
175 trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
176 (self, result)
177 })
178 }
179
180 pub async fn run(&mut self) -> Result<(), PipelineError> {
183 let _ = self.register_metrics(); loop {
186 let next_action = self.run_loop().await?;
187
188 if next_action.is_unwind() && self.fail_on_unwind {
189 return Err(PipelineError::UnexpectedUnwind)
190 }
191
192 if next_action.should_continue() &&
195 self.progress
196 .minimum_block_number
197 .zip(self.max_block)
198 .is_some_and(|(progress, target)| progress >= target)
199 {
200 trace!(
201 target: "sync::pipeline",
202 ?next_action,
203 minimum_block_number = ?self.progress.minimum_block_number,
204 max_block = ?self.max_block,
205 "Terminating pipeline."
206 );
207 return Ok(())
208 }
209 }
210 }
211
212 pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
224 self.move_to_static_files()?;
225
226 let mut previous_stage = None;
227 for stage_index in 0..self.stages.len() {
228 let stage = &self.stages[stage_index];
229 let stage_id = stage.id();
230
231 trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
232 let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
233
234 trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
235
236 match next {
237 ControlFlow::NoProgress { block_number } => {
238 if let Some(block_number) = block_number {
239 self.progress.update(block_number);
240 }
241 }
242 ControlFlow::Continue { block_number } => self.progress.update(block_number),
243 ControlFlow::Unwind { target, bad_block } => {
244 self.unwind(target, Some(bad_block.block.number))?;
245 return Ok(ControlFlow::Unwind { target, bad_block })
246 }
247 }
248
249 previous_stage = Some(
250 self.provider_factory
251 .provider()?
252 .get_stage_checkpoint(stage_id)?
253 .unwrap_or_default()
254 .block_number,
255 );
256 }
257
258 Ok(self.progress.next_ctrl())
259 }
260
261 pub fn move_to_static_files(&self) -> RethResult<()> {
275 let lowest_static_file_height =
277 self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
278
279 if let Some(prune_tip) = lowest_static_file_height {
281 let mut pruner = PrunerBuilder::new(Default::default())
284 .delete_limit(usize::MAX)
285 .build_with_provider_factory(self.provider_factory.clone());
286
287 pruner.run(prune_tip)?;
288 }
289
290 Ok(())
291 }
292
293 pub fn unwind(
297 &mut self,
298 to: BlockNumber,
299 bad_block: Option<BlockNumber>,
300 ) -> Result<(), PipelineError> {
301 let provider = self.provider_factory.provider()?;
303 let latest_block = provider.last_block_number()?;
304
305 let prune_modes = provider.prune_modes_ref();
307
308 let checkpoints = provider.get_prune_checkpoints()?;
309 prune_modes.ensure_unwind_target_unpruned(latest_block, to, &checkpoints)?;
310
311 let unwind_pipeline = self.stages.iter_mut().rev();
313
314 let _locked_sf_producer = self.static_file_producer.lock();
317
318 let mut provider_rw =
319 self.provider_factory.database_provider_rw()?.disable_long_read_transaction_safety();
320
321 for stage in unwind_pipeline {
322 let stage_id = stage.id();
323 let span = info_span!("Unwinding", stage = %stage_id);
324 let _enter = span.enter();
325
326 let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
327 if checkpoint.block_number < to {
328 debug!(
329 target: "sync::pipeline",
330 from = %checkpoint.block_number,
331 %to,
332 "Unwind point too far for stage"
333 );
334 self.event_sender.notify(PipelineEvent::Skipped { stage_id });
335
336 continue
337 }
338
339 info!(
340 target: "sync::pipeline",
341 from = %checkpoint.block_number,
342 %to,
343 ?bad_block,
344 "Starting unwind"
345 );
346 while checkpoint.block_number > to {
347 let unwind_started_at = Instant::now();
348 let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
349 self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });
350
351 let output = stage.unwind(&provider_rw, input);
352 match output {
353 Ok(unwind_output) => {
354 checkpoint = unwind_output.checkpoint;
355 info!(
356 target: "sync::pipeline",
357 stage = %stage_id,
358 unwind_to = to,
359 progress = checkpoint.block_number,
360 done = checkpoint.block_number == to,
361 "Stage unwound"
362 );
363
364 provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
365
366 self.event_sender
368 .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
369
370 if let Some(metrics_tx) = &mut self.metrics_tx {
371 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
372 stage_id,
373 checkpoint,
374 max_block_number: None,
377 elapsed: unwind_started_at.elapsed(),
378 });
379 }
380
381 let last_saved_finalized_block_number =
383 provider_rw.last_finalized_block_number()?;
384
385 if last_saved_finalized_block_number.is_none() ||
388 Some(checkpoint.block_number) < last_saved_finalized_block_number
389 {
390 provider_rw.save_finalized_block_number(BlockNumber::from(
391 checkpoint.block_number,
392 ))?;
393 }
394
395 provider_rw.commit()?;
396
397 stage.post_unwind_commit()?;
398
399 provider_rw = self.provider_factory.database_provider_rw()?;
400 }
401 Err(err) => {
402 self.event_sender.notify(PipelineEvent::Error { stage_id });
403
404 return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
405 }
406 }
407 }
408 }
409
410 Ok(())
411 }
412
413 async fn execute_stage_to_completion(
414 &mut self,
415 previous_stage: Option<BlockNumber>,
416 stage_index: usize,
417 ) -> Result<ControlFlow, PipelineError> {
418 let total_stages = self.stages.len();
419
420 let stage_id = self.stage(stage_index).id();
421 let mut made_progress = false;
422 let target = self.max_block.or(previous_stage);
423
424 loop {
425 let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;
426
427 let stage_reached_max_block = prev_checkpoint
428 .zip(self.max_block)
429 .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
430 if stage_reached_max_block {
431 warn!(
432 target: "sync::pipeline",
433 stage = %stage_id,
434 max_block = self.max_block,
435 prev_block = prev_checkpoint.map(|progress| progress.block_number),
436 "Stage reached target block, skipping."
437 );
438 self.event_sender.notify(PipelineEvent::Skipped { stage_id });
439
440 return Ok(ControlFlow::NoProgress {
442 block_number: prev_checkpoint.map(|progress| progress.block_number),
443 })
444 }
445
446 let exec_input = ExecInput { target, checkpoint: prev_checkpoint };
447
448 self.event_sender.notify(PipelineEvent::Prepare {
449 pipeline_stages_progress: PipelineStagesProgress {
450 current: stage_index + 1,
451 total: total_stages,
452 },
453 stage_id,
454 checkpoint: prev_checkpoint,
455 target,
456 });
457
458 if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
459 self.event_sender.notify(PipelineEvent::Error { stage_id });
460 match self.on_stage_error(stage_id, prev_checkpoint, err)? {
461 Some(ctrl) => return Ok(ctrl),
462 None => continue,
463 };
464 }
465
466 let stage_started_at = Instant::now();
467 let provider_rw = self.provider_factory.database_provider_rw()?;
468
469 self.event_sender.notify(PipelineEvent::Run {
470 pipeline_stages_progress: PipelineStagesProgress {
471 current: stage_index + 1,
472 total: total_stages,
473 },
474 stage_id,
475 checkpoint: prev_checkpoint,
476 target,
477 });
478
479 match self.stage(stage_index).execute(&provider_rw, exec_input) {
480 Ok(out @ ExecOutput { checkpoint, done }) => {
481 provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
483
484 provider_rw.commit()?;
486
487 self.stage(stage_index).post_execute_commit()?;
489
490 self.event_sender.notify(PipelineEvent::Ran {
492 pipeline_stages_progress: PipelineStagesProgress {
493 current: stage_index + 1,
494 total: total_stages,
495 },
496 stage_id,
497 result: out.clone(),
498 });
499 if let Some(metrics_tx) = &mut self.metrics_tx {
500 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
501 stage_id,
502 checkpoint,
503 max_block_number: target,
504 elapsed: stage_started_at.elapsed(),
505 });
506 }
507
508 let block_number = checkpoint.block_number;
509 let prev_block_number = prev_checkpoint.unwrap_or_default().block_number;
510 made_progress |= block_number != prev_block_number;
511 if done {
512 return Ok(if made_progress {
513 ControlFlow::Continue { block_number }
514 } else {
515 ControlFlow::NoProgress { block_number: Some(block_number) }
516 })
517 }
518 }
519 Err(err) => {
520 drop(provider_rw);
521 self.event_sender.notify(PipelineEvent::Error { stage_id });
522
523 if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
524 return Ok(ctrl)
525 }
526 }
527 }
528 }
529 }
530
531 fn on_stage_error(
532 &mut self,
533 stage_id: StageId,
534 prev_checkpoint: Option<StageCheckpoint>,
535 err: StageError,
536 ) -> Result<Option<ControlFlow>, PipelineError> {
537 if let StageError::DetachedHead { local_head, header, error } = err {
538 warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
539
540 if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
541 if local_head.block.hash == last_detached_head_unwind_target &&
542 header.block.number == local_head.block.number + 1
543 {
544 self.detached_head_attempts += 1;
545 } else {
546 self.detached_head_attempts = 1;
547 }
548 } else {
549 self.detached_head_attempts = 1;
550 }
551
552 let unwind_to = local_head
554 .block
555 .number
556 .saturating_sub(
557 BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
558 )
559 .max(1);
560
561 self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
562 Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
563 } else if let StageError::Block { block, error } = err {
564 match error {
565 BlockErrorKind::Validation(validation_error) => {
566 error!(
567 target: "sync::pipeline",
568 stage = %stage_id,
569 bad_block = %block.block.number,
570 "Stage encountered a validation error: {validation_error}"
571 );
572
573 if stage_id == StageId::MerkleExecute {
578 let provider_rw = self.provider_factory.database_provider_rw()?;
579 provider_rw
580 .save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
581 provider_rw.save_stage_checkpoint(
582 StageId::MerkleExecute,
583 prev_checkpoint.unwrap_or_default(),
584 )?;
585
586 provider_rw.commit()?;
587 }
588
589 Ok(Some(ControlFlow::Unwind {
594 target: prev_checkpoint.unwrap_or_default().block_number,
595 bad_block: block,
596 }))
597 }
598 BlockErrorKind::Execution(execution_error) => {
599 error!(
600 target: "sync::pipeline",
601 stage = %stage_id,
602 bad_block = %block.block.number,
603 "Stage encountered an execution error: {execution_error}"
604 );
605
606 Ok(Some(ControlFlow::Unwind {
611 target: prev_checkpoint.unwrap_or_default().block_number,
612 bad_block: block,
613 }))
614 }
615 }
616 } else if let StageError::MissingStaticFileData { block, segment } = err {
617 error!(
618 target: "sync::pipeline",
619 stage = %stage_id,
620 bad_block = %block.block.number,
621 segment = %segment,
622 "Stage is missing static file data."
623 );
624
625 Ok(Some(ControlFlow::Unwind {
626 target: block.block.number.saturating_sub(1),
627 bad_block: block,
628 }))
629 } else if err.is_fatal() {
630 error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
631 Err(err.into())
632 } else {
633 warn!(
636 target: "sync::pipeline",
637 stage = %stage_id,
638 "Stage encountered a non-fatal error: {err}. Retrying..."
639 );
640 Ok(None)
641 }
642 }
643}
644
645impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
646 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
647 f.debug_struct("Pipeline")
648 .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
649 .field("max_block", &self.max_block)
650 .field("event_sender", &self.event_sender)
651 .field("fail_on_unwind", &self.fail_on_unwind)
652 .finish()
653 }
654}
655
656#[cfg(test)]
657mod tests {
658 use std::sync::atomic::Ordering;
659
660 use super::*;
661 use crate::{test_utils::TestStage, UnwindOutput};
662 use assert_matches::assert_matches;
663 use reth_consensus::ConsensusError;
664 use reth_errors::ProviderError;
665 use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
666 use reth_prune::PruneModes;
667 use reth_testing_utils::generators::{self, random_block_with_parent};
668 use tokio_stream::StreamExt;
669
670 #[test]
671 fn record_progress_calculates_outliers() {
672 let mut progress = PipelineProgress::default();
673
674 progress.update(10);
675 assert_eq!(progress.minimum_block_number, Some(10));
676 assert_eq!(progress.maximum_block_number, Some(10));
677
678 progress.update(20);
679 assert_eq!(progress.minimum_block_number, Some(10));
680 assert_eq!(progress.maximum_block_number, Some(20));
681
682 progress.update(1);
683 assert_eq!(progress.minimum_block_number, Some(1));
684 assert_eq!(progress.maximum_block_number, Some(20));
685 }
686
687 #[test]
688 fn progress_ctrl_flow() {
689 let mut progress = PipelineProgress::default();
690
691 assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });
692
693 progress.update(1);
694 assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
695 }
696
697 #[tokio::test]
699 async fn run_pipeline() {
700 let provider_factory = create_test_provider_factory();
701
702 let stage_a = TestStage::new(StageId::Other("A"))
703 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
704 let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
705 let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
706
707 let stage_b = TestStage::new(StageId::Other("B"))
708 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
709 let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
710 let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
711
712 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
713 .add_stage(stage_a)
714 .add_stage(stage_b)
715 .with_max_block(10)
716 .build(
717 provider_factory.clone(),
718 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
719 );
720 let events = pipeline.events();
721
722 tokio::spawn(async move {
724 pipeline.run().await.unwrap();
725 });
726
727 assert_eq!(
729 events.collect::<Vec<PipelineEvent>>().await,
730 vec![
731 PipelineEvent::Prepare {
732 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
733 stage_id: StageId::Other("A"),
734 checkpoint: None,
735 target: Some(10),
736 },
737 PipelineEvent::Run {
738 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
739 stage_id: StageId::Other("A"),
740 checkpoint: None,
741 target: Some(10),
742 },
743 PipelineEvent::Ran {
744 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
745 stage_id: StageId::Other("A"),
746 result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
747 },
748 PipelineEvent::Prepare {
749 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
750 stage_id: StageId::Other("B"),
751 checkpoint: None,
752 target: Some(10),
753 },
754 PipelineEvent::Run {
755 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
756 stage_id: StageId::Other("B"),
757 checkpoint: None,
758 target: Some(10),
759 },
760 PipelineEvent::Ran {
761 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
762 stage_id: StageId::Other("B"),
763 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
764 },
765 ]
766 );
767
768 assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
769 assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);
770
771 assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
772 assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
773 }
774
775 #[tokio::test]
777 async fn unwind_pipeline() {
778 let provider_factory = create_test_provider_factory();
779
780 let stage_a = TestStage::new(StageId::Other("A"))
781 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
782 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
783 let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
784 let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
785
786 let stage_b = TestStage::new(StageId::Other("B"))
787 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
788 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
789 let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
790 let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
791
792 let stage_c = TestStage::new(StageId::Other("C"))
793 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
794 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
795 let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
796 let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();
797
798 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
799 .add_stage(stage_a)
800 .add_stage(stage_b)
801 .add_stage(stage_c)
802 .with_max_block(10)
803 .build(
804 provider_factory.clone(),
805 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
806 );
807 let events = pipeline.events();
808
809 tokio::spawn(async move {
811 pipeline.run().await.expect("Could not run pipeline");
813
814 pipeline.unwind(1, None).expect("Could not unwind pipeline");
816 });
817
818 assert_eq!(
820 events.collect::<Vec<PipelineEvent>>().await,
821 vec![
822 PipelineEvent::Prepare {
824 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
825 stage_id: StageId::Other("A"),
826 checkpoint: None,
827 target: Some(10),
828 },
829 PipelineEvent::Run {
830 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
831 stage_id: StageId::Other("A"),
832 checkpoint: None,
833 target: Some(10),
834 },
835 PipelineEvent::Ran {
836 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
837 stage_id: StageId::Other("A"),
838 result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
839 },
840 PipelineEvent::Prepare {
841 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
842 stage_id: StageId::Other("B"),
843 checkpoint: None,
844 target: Some(10),
845 },
846 PipelineEvent::Run {
847 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
848 stage_id: StageId::Other("B"),
849 checkpoint: None,
850 target: Some(10),
851 },
852 PipelineEvent::Ran {
853 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
854 stage_id: StageId::Other("B"),
855 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
856 },
857 PipelineEvent::Prepare {
858 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
859 stage_id: StageId::Other("C"),
860 checkpoint: None,
861 target: Some(10),
862 },
863 PipelineEvent::Run {
864 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
865 stage_id: StageId::Other("C"),
866 checkpoint: None,
867 target: Some(10),
868 },
869 PipelineEvent::Ran {
870 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
871 stage_id: StageId::Other("C"),
872 result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
873 },
874 PipelineEvent::Unwind {
876 stage_id: StageId::Other("C"),
877 input: UnwindInput {
878 checkpoint: StageCheckpoint::new(20),
879 unwind_to: 1,
880 bad_block: None
881 }
882 },
883 PipelineEvent::Unwound {
884 stage_id: StageId::Other("C"),
885 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
886 },
887 PipelineEvent::Unwind {
888 stage_id: StageId::Other("B"),
889 input: UnwindInput {
890 checkpoint: StageCheckpoint::new(10),
891 unwind_to: 1,
892 bad_block: None
893 }
894 },
895 PipelineEvent::Unwound {
896 stage_id: StageId::Other("B"),
897 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
898 },
899 PipelineEvent::Unwind {
900 stage_id: StageId::Other("A"),
901 input: UnwindInput {
902 checkpoint: StageCheckpoint::new(100),
903 unwind_to: 1,
904 bad_block: None
905 }
906 },
907 PipelineEvent::Unwound {
908 stage_id: StageId::Other("A"),
909 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
910 },
911 ]
912 );
913
914 assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
915 assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);
916
917 assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
918 assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);
919
920 assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
921 assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
922 }
923
924 #[tokio::test]
926 async fn unwind_pipeline_with_intermediate_progress() {
927 let provider_factory = create_test_provider_factory();
928
929 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
930 .add_stage(
931 TestStage::new(StageId::Other("A"))
932 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
933 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
934 )
935 .add_stage(
936 TestStage::new(StageId::Other("B"))
937 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
938 )
939 .with_max_block(10)
940 .build(
941 provider_factory.clone(),
942 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
943 );
944 let events = pipeline.events();
945
946 tokio::spawn(async move {
948 pipeline.run().await.expect("Could not run pipeline");
950
951 pipeline.unwind(50, None).expect("Could not unwind pipeline");
953 });
954
955 assert_eq!(
957 events.collect::<Vec<PipelineEvent>>().await,
958 vec![
959 PipelineEvent::Prepare {
961 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
962 stage_id: StageId::Other("A"),
963 checkpoint: None,
964 target: Some(10),
965 },
966 PipelineEvent::Run {
967 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
968 stage_id: StageId::Other("A"),
969 checkpoint: None,
970 target: Some(10),
971 },
972 PipelineEvent::Ran {
973 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
974 stage_id: StageId::Other("A"),
975 result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
976 },
977 PipelineEvent::Prepare {
978 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
979 stage_id: StageId::Other("B"),
980 checkpoint: None,
981 target: Some(10),
982 },
983 PipelineEvent::Run {
984 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
985 stage_id: StageId::Other("B"),
986 checkpoint: None,
987 target: Some(10),
988 },
989 PipelineEvent::Ran {
990 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
991 stage_id: StageId::Other("B"),
992 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
993 },
994 PipelineEvent::Skipped { stage_id: StageId::Other("B") },
997 PipelineEvent::Unwind {
998 stage_id: StageId::Other("A"),
999 input: UnwindInput {
1000 checkpoint: StageCheckpoint::new(100),
1001 unwind_to: 50,
1002 bad_block: None
1003 }
1004 },
1005 PipelineEvent::Unwound {
1006 stage_id: StageId::Other("A"),
1007 result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
1008 },
1009 ]
1010 );
1011 }
1012
1013 #[tokio::test]
1026 async fn run_pipeline_with_unwind() {
1027 let provider_factory = create_test_provider_factory();
1028
1029 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1030 .add_stage(
1031 TestStage::new(StageId::Other("A"))
1032 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
1033 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
1034 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1035 )
1036 .add_stage(
1037 TestStage::new(StageId::Other("B"))
1038 .add_exec(Err(StageError::Block {
1039 block: Box::new(random_block_with_parent(
1040 &mut generators::rng(),
1041 5,
1042 Default::default(),
1043 )),
1044 error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
1045 }))
1046 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
1047 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1048 )
1049 .with_max_block(10)
1050 .build(
1051 provider_factory.clone(),
1052 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1053 );
1054 let events = pipeline.events();
1055
1056 tokio::spawn(async move {
1058 pipeline.run().await.expect("Could not run pipeline");
1059 });
1060
1061 assert_eq!(
1063 events.collect::<Vec<PipelineEvent>>().await,
1064 vec![
1065 PipelineEvent::Prepare {
1066 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1067 stage_id: StageId::Other("A"),
1068 checkpoint: None,
1069 target: Some(10),
1070 },
1071 PipelineEvent::Run {
1072 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1073 stage_id: StageId::Other("A"),
1074 checkpoint: None,
1075 target: Some(10),
1076 },
1077 PipelineEvent::Ran {
1078 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1079 stage_id: StageId::Other("A"),
1080 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1081 },
1082 PipelineEvent::Prepare {
1083 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1084 stage_id: StageId::Other("B"),
1085 checkpoint: None,
1086 target: Some(10),
1087 },
1088 PipelineEvent::Run {
1089 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1090 stage_id: StageId::Other("B"),
1091 checkpoint: None,
1092 target: Some(10),
1093 },
1094 PipelineEvent::Error { stage_id: StageId::Other("B") },
1095 PipelineEvent::Unwind {
1096 stage_id: StageId::Other("A"),
1097 input: UnwindInput {
1098 checkpoint: StageCheckpoint::new(10),
1099 unwind_to: 0,
1100 bad_block: Some(5)
1101 }
1102 },
1103 PipelineEvent::Unwound {
1104 stage_id: StageId::Other("A"),
1105 result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
1106 },
1107 PipelineEvent::Prepare {
1108 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1109 stage_id: StageId::Other("A"),
1110 checkpoint: Some(StageCheckpoint::new(0)),
1111 target: Some(10),
1112 },
1113 PipelineEvent::Run {
1114 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1115 stage_id: StageId::Other("A"),
1116 checkpoint: Some(StageCheckpoint::new(0)),
1117 target: Some(10),
1118 },
1119 PipelineEvent::Ran {
1120 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1121 stage_id: StageId::Other("A"),
1122 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1123 },
1124 PipelineEvent::Prepare {
1125 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1126 stage_id: StageId::Other("B"),
1127 checkpoint: None,
1128 target: Some(10),
1129 },
1130 PipelineEvent::Run {
1131 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1132 stage_id: StageId::Other("B"),
1133 checkpoint: None,
1134 target: Some(10),
1135 },
1136 PipelineEvent::Ran {
1137 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1138 stage_id: StageId::Other("B"),
1139 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1140 },
1141 ]
1142 );
1143 }
1144
1145 #[tokio::test]
1147 async fn pipeline_error_handling() {
1148 let provider_factory = create_test_provider_factory();
1150 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1151 .add_stage(
1152 TestStage::new(StageId::Other("NonFatal"))
1153 .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
1154 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1155 )
1156 .with_max_block(10)
1157 .build(
1158 provider_factory.clone(),
1159 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1160 );
1161 let result = pipeline.run().await;
1162 assert_matches!(result, Ok(()));
1163
1164 let provider_factory = create_test_provider_factory();
1166 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1167 .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
1168 StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
1169 )))
1170 .build(
1171 provider_factory.clone(),
1172 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1173 );
1174 let result = pipeline.run().await;
1175 assert_matches!(
1176 result,
1177 Err(PipelineError::Stage(StageError::DatabaseIntegrity(
1178 ProviderError::BlockBodyIndicesNotFound(5)
1179 )))
1180 );
1181 }
1182}