1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10 providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader,
11 ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
12 StageCheckpointWriter,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::pin::Pin;
18use tokio::sync::watch;
19use tracing::*;
20
21mod builder;
22mod progress;
23mod set;
24
25use crate::{
26 BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
27 StageError, StageExt, UnwindInput,
28};
29pub use builder::*;
30use progress::*;
31use reth_errors::RethResult;
32pub use set::*;
33
34pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
36
37pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;
40
41pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
43
44#[cfg_attr(doc, aquamarine::aquamarine)]
45pub struct Pipeline<N: ProviderNodeTypes> {
67 provider_factory: ProviderFactory<N>,
69 stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
71 max_block: Option<BlockNumber>,
73 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
74 event_sender: EventSender<PipelineEvent>,
76 progress: PipelineProgress,
78 tip_tx: Option<watch::Sender<B256>>,
82 metrics_tx: Option<MetricEventsSender>,
83 fail_on_unwind: bool,
86}
87
88impl<N: ProviderNodeTypes> Pipeline<N> {
89 pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
91 {
92 PipelineBuilder::default()
93 }
94
95 pub const fn minimum_block_number(&self) -> Option<u64> {
98 self.progress.minimum_block_number
99 }
100
101 #[track_caller]
103 pub fn set_tip(&self, tip: B256) {
104 let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
105 warn!(target: "sync::pipeline", "Chain tip channel closed");
106 });
107 }
108
109 pub fn events(&self) -> EventStream<PipelineEvent> {
111 self.event_sender.new_listener()
112 }
113}
114
115impl<N: ProviderNodeTypes> Pipeline<N> {
116 pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
118 let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
119 let provider = self.provider_factory.provider()?;
120
121 for stage in &self.stages {
122 let stage_id = stage.id();
123 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
124 stage_id,
125 checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
126 max_block_number: None,
127 });
128 }
129 Ok(())
130 }
131
132 #[track_caller]
135 pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
136 let _ = self.register_metrics();
137 Box::pin(async move {
138 if let Some(target) = target {
140 match target {
141 PipelineTarget::Sync(tip) => self.set_tip(tip),
142 PipelineTarget::Unwind(target) => {
143 if let Err(err) = self.move_to_static_files() {
144 return (self, Err(err.into()))
145 }
146 if let Err(err) = self.unwind(target, None) {
147 return (self, Err(err))
148 }
149 self.progress.update(target);
150
151 return (self, Ok(ControlFlow::Continue { block_number: target }))
152 }
153 }
154 }
155
156 let result = self.run_loop().await;
157 trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
158 (self, result)
159 })
160 }
161
162 pub async fn run(&mut self) -> Result<(), PipelineError> {
165 let _ = self.register_metrics(); loop {
168 let next_action = self.run_loop().await?;
169
170 if next_action.is_unwind() && self.fail_on_unwind {
171 return Err(PipelineError::UnexpectedUnwind)
172 }
173
174 if next_action.should_continue() &&
177 self.progress
178 .minimum_block_number
179 .zip(self.max_block)
180 .is_some_and(|(progress, target)| progress >= target)
181 {
182 trace!(
183 target: "sync::pipeline",
184 ?next_action,
185 minimum_block_number = ?self.progress.minimum_block_number,
186 max_block = ?self.max_block,
187 "Terminating pipeline."
188 );
189 return Ok(())
190 }
191 }
192 }
193
194 pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
206 self.move_to_static_files()?;
207
208 let mut previous_stage = None;
209 for stage_index in 0..self.stages.len() {
210 let stage = &self.stages[stage_index];
211 let stage_id = stage.id();
212
213 trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
214 let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
215
216 trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
217
218 match next {
219 ControlFlow::NoProgress { block_number } => {
220 if let Some(block_number) = block_number {
221 self.progress.update(block_number);
222 }
223 }
224 ControlFlow::Continue { block_number } => self.progress.update(block_number),
225 ControlFlow::Unwind { target, bad_block } => {
226 self.unwind(target, Some(bad_block.block.number))?;
227 return Ok(ControlFlow::Unwind { target, bad_block })
228 }
229 }
230
231 previous_stage = Some(
232 self.provider_factory
233 .provider()?
234 .get_stage_checkpoint(stage_id)?
235 .unwrap_or_default()
236 .block_number,
237 );
238 }
239
240 Ok(self.progress.next_ctrl())
241 }
242
243 pub fn move_to_static_files(&self) -> RethResult<()> {
257 let lowest_static_file_height =
259 self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
260
261 if let Some(prune_tip) = lowest_static_file_height {
263 let mut pruner = PrunerBuilder::new(Default::default())
266 .delete_limit(usize::MAX)
267 .build_with_provider_factory(self.provider_factory.clone());
268
269 pruner.run(prune_tip)?;
270 }
271
272 Ok(())
273 }
274
275 pub fn unwind(
279 &mut self,
280 to: BlockNumber,
281 bad_block: Option<BlockNumber>,
282 ) -> Result<(), PipelineError> {
283 let unwind_pipeline = self.stages.iter_mut().rev();
285
286 let _locked_sf_producer = self.static_file_producer.lock();
289
290 let mut provider_rw = self.provider_factory.database_provider_rw()?;
291
292 for stage in unwind_pipeline {
293 let stage_id = stage.id();
294 let span = info_span!("Unwinding", stage = %stage_id);
295 let _enter = span.enter();
296
297 let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
298 if checkpoint.block_number < to {
299 debug!(
300 target: "sync::pipeline",
301 from = %checkpoint.block_number,
302 %to,
303 "Unwind point too far for stage"
304 );
305 self.event_sender.notify(PipelineEvent::Skipped { stage_id });
306
307 continue
308 }
309
310 info!(
311 target: "sync::pipeline",
312 from = %checkpoint.block_number,
313 %to,
314 ?bad_block,
315 "Starting unwind"
316 );
317 while checkpoint.block_number > to {
318 let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
319 self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });
320
321 let output = stage.unwind(&provider_rw, input);
322 match output {
323 Ok(unwind_output) => {
324 checkpoint = unwind_output.checkpoint;
325 info!(
326 target: "sync::pipeline",
327 stage = %stage_id,
328 unwind_to = to,
329 progress = checkpoint.block_number,
330 done = checkpoint.block_number == to,
331 "Stage unwound"
332 );
333 if let Some(metrics_tx) = &mut self.metrics_tx {
334 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
335 stage_id,
336 checkpoint,
337 max_block_number: None,
340 });
341 }
342 provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
343
344 self.event_sender
345 .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
346
347 let last_saved_finalized_block_number =
349 provider_rw.last_finalized_block_number()?;
350
351 if last_saved_finalized_block_number.is_none() ||
354 Some(checkpoint.block_number) < last_saved_finalized_block_number
355 {
356 provider_rw.save_finalized_block_number(BlockNumber::from(
357 checkpoint.block_number,
358 ))?;
359 }
360
361 UnifiedStorageWriter::commit_unwind(provider_rw)?;
362
363 stage.post_unwind_commit()?;
364
365 provider_rw = self.provider_factory.database_provider_rw()?;
366 }
367 Err(err) => {
368 self.event_sender.notify(PipelineEvent::Error { stage_id });
369
370 return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
371 }
372 }
373 }
374 }
375
376 Ok(())
377 }
378
379 async fn execute_stage_to_completion(
380 &mut self,
381 previous_stage: Option<BlockNumber>,
382 stage_index: usize,
383 ) -> Result<ControlFlow, PipelineError> {
384 let total_stages = self.stages.len();
385
386 let stage = &mut self.stages[stage_index];
387 let stage_id = stage.id();
388 let mut made_progress = false;
389 let target = self.max_block.or(previous_stage);
390
391 loop {
392 let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;
393
394 let stage_reached_max_block = prev_checkpoint
395 .zip(self.max_block)
396 .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
397 if stage_reached_max_block {
398 warn!(
399 target: "sync::pipeline",
400 stage = %stage_id,
401 max_block = self.max_block,
402 prev_block = prev_checkpoint.map(|progress| progress.block_number),
403 "Stage reached target block, skipping."
404 );
405 self.event_sender.notify(PipelineEvent::Skipped { stage_id });
406
407 return Ok(ControlFlow::NoProgress {
409 block_number: prev_checkpoint.map(|progress| progress.block_number),
410 })
411 }
412
413 let exec_input = ExecInput { target, checkpoint: prev_checkpoint };
414
415 self.event_sender.notify(PipelineEvent::Prepare {
416 pipeline_stages_progress: PipelineStagesProgress {
417 current: stage_index + 1,
418 total: total_stages,
419 },
420 stage_id,
421 checkpoint: prev_checkpoint,
422 target,
423 });
424
425 if let Err(err) = stage.execute_ready(exec_input).await {
426 self.event_sender.notify(PipelineEvent::Error { stage_id });
427
428 match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? {
429 Some(ctrl) => return Ok(ctrl),
430 None => continue,
431 };
432 }
433
434 let provider_rw = self.provider_factory.database_provider_rw()?;
435
436 self.event_sender.notify(PipelineEvent::Run {
437 pipeline_stages_progress: PipelineStagesProgress {
438 current: stage_index + 1,
439 total: total_stages,
440 },
441 stage_id,
442 checkpoint: prev_checkpoint,
443 target,
444 });
445
446 match stage.execute(&provider_rw, exec_input) {
447 Ok(out @ ExecOutput { checkpoint, done }) => {
448 made_progress |=
449 checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;
450
451 if let Some(metrics_tx) = &mut self.metrics_tx {
452 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
453 stage_id,
454 checkpoint,
455 max_block_number: target,
456 });
457 }
458 provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
459
460 self.event_sender.notify(PipelineEvent::Ran {
461 pipeline_stages_progress: PipelineStagesProgress {
462 current: stage_index + 1,
463 total: total_stages,
464 },
465 stage_id,
466 result: out.clone(),
467 });
468
469 UnifiedStorageWriter::commit(provider_rw)?;
470
471 stage.post_execute_commit()?;
472
473 if done {
474 let block_number = checkpoint.block_number;
475 return Ok(if made_progress {
476 ControlFlow::Continue { block_number }
477 } else {
478 ControlFlow::NoProgress { block_number: Some(block_number) }
479 })
480 }
481 }
482 Err(err) => {
483 drop(provider_rw);
484 self.event_sender.notify(PipelineEvent::Error { stage_id });
485
486 if let Some(ctrl) =
487 on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)?
488 {
489 return Ok(ctrl)
490 }
491 }
492 }
493 }
494 }
495}
496
497fn on_stage_error<N: ProviderNodeTypes>(
498 factory: &ProviderFactory<N>,
499 stage_id: StageId,
500 prev_checkpoint: Option<StageCheckpoint>,
501 err: StageError,
502) -> Result<Option<ControlFlow>, PipelineError> {
503 if let StageError::DetachedHead { local_head, header, error } = err {
504 warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
505
506 let unwind_to =
508 local_head.block.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1);
509 Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
510 } else if let StageError::Block { block, error } = err {
511 match error {
512 BlockErrorKind::Validation(validation_error) => {
513 error!(
514 target: "sync::pipeline",
515 stage = %stage_id,
516 bad_block = %block.block.number,
517 "Stage encountered a validation error: {validation_error}"
518 );
519
520 let provider_rw = factory.database_provider_rw()?;
524 provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
525 provider_rw.save_stage_checkpoint(
526 StageId::MerkleExecute,
527 prev_checkpoint.unwrap_or_default(),
528 )?;
529
530 UnifiedStorageWriter::commit(provider_rw)?;
531
532 Ok(Some(ControlFlow::Unwind {
537 target: prev_checkpoint.unwrap_or_default().block_number,
538 bad_block: block,
539 }))
540 }
541 BlockErrorKind::Execution(execution_error) => {
542 error!(
543 target: "sync::pipeline",
544 stage = %stage_id,
545 bad_block = %block.block.number,
546 "Stage encountered an execution error: {execution_error}"
547 );
548
549 Ok(Some(ControlFlow::Unwind {
554 target: prev_checkpoint.unwrap_or_default().block_number,
555 bad_block: block,
556 }))
557 }
558 }
559 } else if let StageError::MissingStaticFileData { block, segment } = err {
560 error!(
561 target: "sync::pipeline",
562 stage = %stage_id,
563 bad_block = %block.block.number,
564 segment = %segment,
565 "Stage is missing static file data."
566 );
567
568 Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
569 } else if err.is_fatal() {
570 error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
571 Err(err.into())
572 } else {
573 warn!(
576 target: "sync::pipeline",
577 stage = %stage_id,
578 "Stage encountered a non-fatal error: {err}. Retrying..."
579 );
580 Ok(None)
581 }
582}
583
584impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
586 f.debug_struct("Pipeline")
587 .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
588 .field("max_block", &self.max_block)
589 .field("event_sender", &self.event_sender)
590 .field("fail_on_unwind", &self.fail_on_unwind)
591 .finish()
592 }
593}
594
595#[cfg(test)]
596mod tests {
597 use std::sync::atomic::Ordering;
598
599 use super::*;
600 use crate::{test_utils::TestStage, UnwindOutput};
601 use assert_matches::assert_matches;
602 use reth_consensus::ConsensusError;
603 use reth_errors::ProviderError;
604 use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
605 use reth_prune::PruneModes;
606 use reth_testing_utils::generators::{self, random_block_with_parent};
607 use tokio_stream::StreamExt;
608
609 #[test]
610 fn record_progress_calculates_outliers() {
611 let mut progress = PipelineProgress::default();
612
613 progress.update(10);
614 assert_eq!(progress.minimum_block_number, Some(10));
615 assert_eq!(progress.maximum_block_number, Some(10));
616
617 progress.update(20);
618 assert_eq!(progress.minimum_block_number, Some(10));
619 assert_eq!(progress.maximum_block_number, Some(20));
620
621 progress.update(1);
622 assert_eq!(progress.minimum_block_number, Some(1));
623 assert_eq!(progress.maximum_block_number, Some(20));
624 }
625
626 #[test]
627 fn progress_ctrl_flow() {
628 let mut progress = PipelineProgress::default();
629
630 assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });
631
632 progress.update(1);
633 assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
634 }
635
636 #[tokio::test]
638 async fn run_pipeline() {
639 let provider_factory = create_test_provider_factory();
640
641 let stage_a = TestStage::new(StageId::Other("A"))
642 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
643 let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
644 let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
645
646 let stage_b = TestStage::new(StageId::Other("B"))
647 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
648 let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
649 let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
650
651 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
652 .add_stage(stage_a)
653 .add_stage(stage_b)
654 .with_max_block(10)
655 .build(
656 provider_factory.clone(),
657 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
658 );
659 let events = pipeline.events();
660
661 tokio::spawn(async move {
663 pipeline.run().await.unwrap();
664 });
665
666 assert_eq!(
668 events.collect::<Vec<PipelineEvent>>().await,
669 vec![
670 PipelineEvent::Prepare {
671 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
672 stage_id: StageId::Other("A"),
673 checkpoint: None,
674 target: Some(10),
675 },
676 PipelineEvent::Run {
677 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
678 stage_id: StageId::Other("A"),
679 checkpoint: None,
680 target: Some(10),
681 },
682 PipelineEvent::Ran {
683 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
684 stage_id: StageId::Other("A"),
685 result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
686 },
687 PipelineEvent::Prepare {
688 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
689 stage_id: StageId::Other("B"),
690 checkpoint: None,
691 target: Some(10),
692 },
693 PipelineEvent::Run {
694 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
695 stage_id: StageId::Other("B"),
696 checkpoint: None,
697 target: Some(10),
698 },
699 PipelineEvent::Ran {
700 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
701 stage_id: StageId::Other("B"),
702 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
703 },
704 ]
705 );
706
707 assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
708 assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);
709
710 assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
711 assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
712 }
713
714 #[tokio::test]
716 async fn unwind_pipeline() {
717 let provider_factory = create_test_provider_factory();
718
719 let stage_a = TestStage::new(StageId::Other("A"))
720 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
721 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
722 let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
723 let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
724
725 let stage_b = TestStage::new(StageId::Other("B"))
726 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
727 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
728 let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
729 let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
730
731 let stage_c = TestStage::new(StageId::Other("C"))
732 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
733 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
734 let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
735 let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();
736
737 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
738 .add_stage(stage_a)
739 .add_stage(stage_b)
740 .add_stage(stage_c)
741 .with_max_block(10)
742 .build(
743 provider_factory.clone(),
744 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
745 );
746 let events = pipeline.events();
747
748 tokio::spawn(async move {
750 pipeline.run().await.expect("Could not run pipeline");
752
753 pipeline.unwind(1, None).expect("Could not unwind pipeline");
755 });
756
757 assert_eq!(
759 events.collect::<Vec<PipelineEvent>>().await,
760 vec![
761 PipelineEvent::Prepare {
763 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
764 stage_id: StageId::Other("A"),
765 checkpoint: None,
766 target: Some(10),
767 },
768 PipelineEvent::Run {
769 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
770 stage_id: StageId::Other("A"),
771 checkpoint: None,
772 target: Some(10),
773 },
774 PipelineEvent::Ran {
775 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
776 stage_id: StageId::Other("A"),
777 result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
778 },
779 PipelineEvent::Prepare {
780 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
781 stage_id: StageId::Other("B"),
782 checkpoint: None,
783 target: Some(10),
784 },
785 PipelineEvent::Run {
786 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
787 stage_id: StageId::Other("B"),
788 checkpoint: None,
789 target: Some(10),
790 },
791 PipelineEvent::Ran {
792 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
793 stage_id: StageId::Other("B"),
794 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
795 },
796 PipelineEvent::Prepare {
797 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
798 stage_id: StageId::Other("C"),
799 checkpoint: None,
800 target: Some(10),
801 },
802 PipelineEvent::Run {
803 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
804 stage_id: StageId::Other("C"),
805 checkpoint: None,
806 target: Some(10),
807 },
808 PipelineEvent::Ran {
809 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
810 stage_id: StageId::Other("C"),
811 result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
812 },
813 PipelineEvent::Unwind {
815 stage_id: StageId::Other("C"),
816 input: UnwindInput {
817 checkpoint: StageCheckpoint::new(20),
818 unwind_to: 1,
819 bad_block: None
820 }
821 },
822 PipelineEvent::Unwound {
823 stage_id: StageId::Other("C"),
824 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
825 },
826 PipelineEvent::Unwind {
827 stage_id: StageId::Other("B"),
828 input: UnwindInput {
829 checkpoint: StageCheckpoint::new(10),
830 unwind_to: 1,
831 bad_block: None
832 }
833 },
834 PipelineEvent::Unwound {
835 stage_id: StageId::Other("B"),
836 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
837 },
838 PipelineEvent::Unwind {
839 stage_id: StageId::Other("A"),
840 input: UnwindInput {
841 checkpoint: StageCheckpoint::new(100),
842 unwind_to: 1,
843 bad_block: None
844 }
845 },
846 PipelineEvent::Unwound {
847 stage_id: StageId::Other("A"),
848 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
849 },
850 ]
851 );
852
853 assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
854 assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);
855
856 assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
857 assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);
858
859 assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
860 assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
861 }
862
863 #[tokio::test]
865 async fn unwind_pipeline_with_intermediate_progress() {
866 let provider_factory = create_test_provider_factory();
867
868 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
869 .add_stage(
870 TestStage::new(StageId::Other("A"))
871 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
872 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
873 )
874 .add_stage(
875 TestStage::new(StageId::Other("B"))
876 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
877 )
878 .with_max_block(10)
879 .build(
880 provider_factory.clone(),
881 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
882 );
883 let events = pipeline.events();
884
885 tokio::spawn(async move {
887 pipeline.run().await.expect("Could not run pipeline");
889
890 pipeline.unwind(50, None).expect("Could not unwind pipeline");
892 });
893
894 assert_eq!(
896 events.collect::<Vec<PipelineEvent>>().await,
897 vec![
898 PipelineEvent::Prepare {
900 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
901 stage_id: StageId::Other("A"),
902 checkpoint: None,
903 target: Some(10),
904 },
905 PipelineEvent::Run {
906 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
907 stage_id: StageId::Other("A"),
908 checkpoint: None,
909 target: Some(10),
910 },
911 PipelineEvent::Ran {
912 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
913 stage_id: StageId::Other("A"),
914 result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
915 },
916 PipelineEvent::Prepare {
917 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
918 stage_id: StageId::Other("B"),
919 checkpoint: None,
920 target: Some(10),
921 },
922 PipelineEvent::Run {
923 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
924 stage_id: StageId::Other("B"),
925 checkpoint: None,
926 target: Some(10),
927 },
928 PipelineEvent::Ran {
929 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
930 stage_id: StageId::Other("B"),
931 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
932 },
933 PipelineEvent::Skipped { stage_id: StageId::Other("B") },
936 PipelineEvent::Unwind {
937 stage_id: StageId::Other("A"),
938 input: UnwindInput {
939 checkpoint: StageCheckpoint::new(100),
940 unwind_to: 50,
941 bad_block: None
942 }
943 },
944 PipelineEvent::Unwound {
945 stage_id: StageId::Other("A"),
946 result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
947 },
948 ]
949 );
950 }
951
952 #[tokio::test]
965 async fn run_pipeline_with_unwind() {
966 let provider_factory = create_test_provider_factory();
967
968 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
969 .add_stage(
970 TestStage::new(StageId::Other("A"))
971 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
972 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
973 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
974 )
975 .add_stage(
976 TestStage::new(StageId::Other("B"))
977 .add_exec(Err(StageError::Block {
978 block: Box::new(random_block_with_parent(
979 &mut generators::rng(),
980 5,
981 Default::default(),
982 )),
983 error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
984 }))
985 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
986 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
987 )
988 .with_max_block(10)
989 .build(
990 provider_factory.clone(),
991 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
992 );
993 let events = pipeline.events();
994
995 tokio::spawn(async move {
997 pipeline.run().await.expect("Could not run pipeline");
998 });
999
1000 assert_eq!(
1002 events.collect::<Vec<PipelineEvent>>().await,
1003 vec![
1004 PipelineEvent::Prepare {
1005 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1006 stage_id: StageId::Other("A"),
1007 checkpoint: None,
1008 target: Some(10),
1009 },
1010 PipelineEvent::Run {
1011 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1012 stage_id: StageId::Other("A"),
1013 checkpoint: None,
1014 target: Some(10),
1015 },
1016 PipelineEvent::Ran {
1017 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1018 stage_id: StageId::Other("A"),
1019 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1020 },
1021 PipelineEvent::Prepare {
1022 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1023 stage_id: StageId::Other("B"),
1024 checkpoint: None,
1025 target: Some(10),
1026 },
1027 PipelineEvent::Run {
1028 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1029 stage_id: StageId::Other("B"),
1030 checkpoint: None,
1031 target: Some(10),
1032 },
1033 PipelineEvent::Error { stage_id: StageId::Other("B") },
1034 PipelineEvent::Unwind {
1035 stage_id: StageId::Other("A"),
1036 input: UnwindInput {
1037 checkpoint: StageCheckpoint::new(10),
1038 unwind_to: 0,
1039 bad_block: Some(5)
1040 }
1041 },
1042 PipelineEvent::Unwound {
1043 stage_id: StageId::Other("A"),
1044 result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
1045 },
1046 PipelineEvent::Prepare {
1047 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1048 stage_id: StageId::Other("A"),
1049 checkpoint: Some(StageCheckpoint::new(0)),
1050 target: Some(10),
1051 },
1052 PipelineEvent::Run {
1053 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1054 stage_id: StageId::Other("A"),
1055 checkpoint: Some(StageCheckpoint::new(0)),
1056 target: Some(10),
1057 },
1058 PipelineEvent::Ran {
1059 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1060 stage_id: StageId::Other("A"),
1061 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1062 },
1063 PipelineEvent::Prepare {
1064 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1065 stage_id: StageId::Other("B"),
1066 checkpoint: None,
1067 target: Some(10),
1068 },
1069 PipelineEvent::Run {
1070 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1071 stage_id: StageId::Other("B"),
1072 checkpoint: None,
1073 target: Some(10),
1074 },
1075 PipelineEvent::Ran {
1076 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1077 stage_id: StageId::Other("B"),
1078 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1079 },
1080 ]
1081 );
1082 }
1083
1084 #[tokio::test]
1086 async fn pipeline_error_handling() {
1087 let provider_factory = create_test_provider_factory();
1089 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1090 .add_stage(
1091 TestStage::new(StageId::Other("NonFatal"))
1092 .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
1093 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1094 )
1095 .with_max_block(10)
1096 .build(
1097 provider_factory.clone(),
1098 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1099 );
1100 let result = pipeline.run().await;
1101 assert_matches!(result, Ok(()));
1102
1103 let provider_factory = create_test_provider_factory();
1105 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1106 .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
1107 StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
1108 )))
1109 .build(
1110 provider_factory.clone(),
1111 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1112 );
1113 let result = pipeline.run().await;
1114 assert_matches!(
1115 result,
1116 Err(PipelineError::Stage(StageError::DatabaseIntegrity(
1117 ProviderError::BlockBodyIndicesNotFound(5)
1118 )))
1119 );
1120 }
1121}