1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10 providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader,
11 ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
12 StageCheckpointWriter,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::pin::Pin;
18use tokio::sync::watch;
19use tracing::*;
20
21mod builder;
22mod progress;
23mod set;
24
25use crate::{
26 BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
27 StageError, StageExt, UnwindInput,
28};
29pub use builder::*;
30use progress::*;
31use reth_errors::RethResult;
32pub use set::*;
33
/// A heap-allocated, dynamically dispatched [`Stage`] that operates on a provider of type `DB`.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
36
/// A pinned, boxed, `Send` future that resolves to a [`PipelineWithResult`].
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;
40
/// The pipeline paired with the outcome of running it: the resulting [`ControlFlow`] on success
/// or a [`PipelineError`] on failure.
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
43
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A staged sync pipeline.
///
/// Owns an ordered list of stages together with the storage handles needed to run them.
/// [`Pipeline::run_loop`] executes the stages front to back, while [`Pipeline::unwind`] walks
/// them back to front.
pub struct Pipeline<N: ProviderNodeTypes> {
    /// Factory used to open database providers for reading and writing stage checkpoints.
    provider_factory: ProviderFactory<N>,
    /// All configured stages, in execution order.
    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
    /// Optional upper bound on the block number. A stage whose checkpoint has reached it is
    /// skipped, and [`Pipeline::run`] returns once the minimum progress has reached it.
    max_block: Option<BlockNumber>,
    /// Producer used by [`Pipeline::move_to_static_files`]; it is also locked for the duration
    /// of [`Pipeline::unwind`].
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    /// Sender through which [`PipelineEvent`]s are delivered to listeners created by
    /// [`Pipeline::events`].
    event_sender: EventSender<PipelineEvent>,
    /// Progress tracker that is updated with the block number reached by each stage.
    progress: PipelineProgress,
    /// Optional watch channel on which [`Pipeline::set_tip`] publishes the chain tip.
    tip_tx: Option<watch::Sender<B256>>,
    /// Optional channel on which stage checkpoint metric events are sent.
    metrics_tx: Option<MetricEventsSender>,
    /// When `true`, [`Pipeline::run`] fails with [`PipelineError::UnexpectedUnwind`] if a loop
    /// iteration ends in an unwind.
    fail_on_unwind: bool,
}
85
86impl<N: ProviderNodeTypes> Pipeline<N> {
87 pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
89 {
90 PipelineBuilder::default()
91 }
92
93 pub const fn minimum_block_number(&self) -> Option<u64> {
96 self.progress.minimum_block_number
97 }
98
99 #[track_caller]
101 pub fn set_tip(&self, tip: B256) {
102 let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
103 warn!(target: "sync::pipeline", "Chain tip channel closed");
104 });
105 }
106
107 pub fn events(&self) -> EventStream<PipelineEvent> {
109 self.event_sender.new_listener()
110 }
111}
112
113impl<N: ProviderNodeTypes> Pipeline<N> {
114 pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
116 let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
117 let provider = self.provider_factory.provider()?;
118
119 for stage in &self.stages {
120 let stage_id = stage.id();
121 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
122 stage_id,
123 checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
124 max_block_number: None,
125 });
126 }
127 Ok(())
128 }
129
130 #[track_caller]
133 pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
134 let _ = self.register_metrics();
135 Box::pin(async move {
136 if let Some(target) = target {
138 match target {
139 PipelineTarget::Sync(tip) => self.set_tip(tip),
140 PipelineTarget::Unwind(target) => {
141 if let Err(err) = self.move_to_static_files() {
142 return (self, Err(err.into()))
143 }
144 if let Err(err) = self.unwind(target, None) {
145 return (self, Err(err))
146 }
147 self.progress.update(target);
148
149 return (self, Ok(ControlFlow::Continue { block_number: target }))
150 }
151 }
152 }
153
154 let result = self.run_loop().await;
155 trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
156 (self, result)
157 })
158 }
159
160 pub async fn run(&mut self) -> Result<(), PipelineError> {
163 let _ = self.register_metrics(); loop {
166 let next_action = self.run_loop().await?;
167
168 if next_action.is_unwind() && self.fail_on_unwind {
169 return Err(PipelineError::UnexpectedUnwind)
170 }
171
172 if next_action.should_continue() &&
175 self.progress
176 .minimum_block_number
177 .zip(self.max_block)
178 .is_some_and(|(progress, target)| progress >= target)
179 {
180 trace!(
181 target: "sync::pipeline",
182 ?next_action,
183 minimum_block_number = ?self.progress.minimum_block_number,
184 max_block = ?self.max_block,
185 "Terminating pipeline."
186 );
187 return Ok(())
188 }
189 }
190 }
191
192 pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
204 self.move_to_static_files()?;
205
206 let mut previous_stage = None;
207 for stage_index in 0..self.stages.len() {
208 let stage = &self.stages[stage_index];
209 let stage_id = stage.id();
210
211 trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
212 let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
213
214 trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
215
216 match next {
217 ControlFlow::NoProgress { block_number } => {
218 if let Some(block_number) = block_number {
219 self.progress.update(block_number);
220 }
221 }
222 ControlFlow::Continue { block_number } => self.progress.update(block_number),
223 ControlFlow::Unwind { target, bad_block } => {
224 self.unwind(target, Some(bad_block.block.number))?;
225 return Ok(ControlFlow::Unwind { target, bad_block })
226 }
227 }
228
229 previous_stage = Some(
230 self.provider_factory
231 .provider()?
232 .get_stage_checkpoint(stage_id)?
233 .unwrap_or_default()
234 .block_number,
235 );
236 }
237
238 Ok(self.progress.next_ctrl())
239 }
240
241 pub fn move_to_static_files(&self) -> RethResult<()> {
255 let lowest_static_file_height =
257 self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
258
259 if let Some(prune_tip) = lowest_static_file_height {
261 let mut pruner = PrunerBuilder::new(Default::default())
264 .delete_limit(usize::MAX)
265 .build_with_provider_factory(self.provider_factory.clone());
266
267 pruner.run(prune_tip)?;
268 }
269
270 Ok(())
271 }
272
    /// Unwinds the stages, last stage first, down to block `to`.
    ///
    /// A stage whose stored checkpoint is below `to` is skipped and a
    /// [`PipelineEvent::Skipped`] is emitted for it. Every other stage is unwound repeatedly
    /// until its checkpoint is no longer above `to`; each step is committed on its own.
    ///
    /// `bad_block` is forwarded to the stages as part of the [`UnwindInput`] and should be set
    /// when the unwind was triggered by a bad block.
    ///
    /// # Errors
    ///
    /// Returns an error if a provider operation fails, or
    /// `PipelineError::Stage(StageError::Fatal(..))` wrapping the error of a stage that failed
    /// to unwind.
    pub fn unwind(
        &mut self,
        to: BlockNumber,
        bad_block: Option<BlockNumber>,
    ) -> Result<(), PipelineError> {
        // Stages are unwound in the reverse of their execution order.
        let unwind_pipeline = self.stages.iter_mut().rev();

        // The guard lives until the end of this function, so the static file producer stays
        // locked for the whole unwind.
        let _locked_sf_producer = self.static_file_producer.lock();

        let mut provider_rw = self.provider_factory.database_provider_rw()?;

        for stage in unwind_pipeline {
            let stage_id = stage.id();
            let span = info_span!("Unwinding", stage = %stage_id);
            let _enter = span.enter();

            let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
            // The stage never got as far as the unwind target, so there is nothing to undo.
            if checkpoint.block_number < to {
                debug!(
                    target: "sync::pipeline",
                    from = %checkpoint.block_number,
                    %to,
                    "Unwind point too far for stage"
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                continue
            }

            info!(
                target: "sync::pipeline",
                from = %checkpoint.block_number,
                %to,
                ?bad_block,
                "Starting unwind"
            );
            // A single `Stage::unwind` call may stop short of `to`, hence the loop.
            while checkpoint.block_number > to {
                let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
                self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });

                let output = stage.unwind(&provider_rw, input);
                match output {
                    Ok(unwind_output) => {
                        checkpoint = unwind_output.checkpoint;
                        info!(
                            target: "sync::pipeline",
                            stage = %stage_id,
                            unwind_to = to,
                            progress = checkpoint.block_number,
                            done = checkpoint.block_number == to,
                            "Stage unwound"
                        );
                        if let Some(metrics_tx) = &mut self.metrics_tx {
                            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                                stage_id,
                                checkpoint,
                                max_block_number: None,
                            });
                        }
                        provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                        self.event_sender
                            .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

                        let last_saved_finalized_block_number =
                            provider_rw.last_finalized_block_number()?;

                        // Store the finalized block number when none is stored yet, or when
                        // the stage has been unwound below the stored one.
                        if last_saved_finalized_block_number.is_none() ||
                            Some(checkpoint.block_number) < last_saved_finalized_block_number
                        {
                            provider_rw.save_finalized_block_number(BlockNumber::from(
                                checkpoint.block_number,
                            ))?;
                        }

                        // Committing consumes the provider; a fresh one is opened below for
                        // the next step.
                        UnifiedStorageWriter::commit_unwind(provider_rw)?;

                        stage.post_unwind_commit()?;

                        provider_rw = self.provider_factory.database_provider_rw()?;
                    }
                    Err(err) => {
                        self.event_sender.notify(PipelineEvent::Error { stage_id });

                        // Any unwind failure is reported as fatal.
                        return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
                    }
                }
            }
        }

        Ok(())
    }
376
    /// Executes the stage at `stage_index` repeatedly until it reports that it is done, is
    /// skipped, or hits an error that yields a control flow decision.
    ///
    /// The execution target is `max_block` if set, otherwise `previous_stage`.
    ///
    /// Returns:
    /// - `ControlFlow::NoProgress` if the stage's checkpoint had already reached `max_block`, or
    ///   if it finished without its checkpoint block number ever changing;
    /// - `ControlFlow::Continue` if it finished and its checkpoint block number changed;
    /// - whatever control flow [`on_stage_error`] derives from a stage error.
    ///
    /// An error for which [`on_stage_error`] returns `None` causes the stage to be retried.
    async fn execute_stage_to_completion(
        &mut self,
        previous_stage: Option<BlockNumber>,
        stage_index: usize,
    ) -> Result<ControlFlow, PipelineError> {
        let total_stages = self.stages.len();

        let stage = &mut self.stages[stage_index];
        let stage_id = stage.id();
        let mut made_progress = false;
        let target = self.max_block.or(previous_stage);

        loop {
            let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;

            // False when either the checkpoint or the max block is missing.
            let stage_reached_max_block = prev_checkpoint
                .zip(self.max_block)
                .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
            if stage_reached_max_block {
                warn!(
                    target: "sync::pipeline",
                    stage = %stage_id,
                    max_block = self.max_block,
                    prev_block = prev_checkpoint.map(|progress| progress.block_number),
                    "Stage reached target block, skipping."
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                return Ok(ControlFlow::NoProgress {
                    block_number: prev_checkpoint.map(|progress| progress.block_number),
                })
            }

            let exec_input = ExecInput { target, checkpoint: prev_checkpoint };

            self.event_sender.notify(PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            // Wait for the stage to become ready before a read-write provider is opened.
            if let Err(err) = stage.execute_ready(exec_input).await {
                self.event_sender.notify(PipelineEvent::Error { stage_id });

                match on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)? {
                    Some(ctrl) => return Ok(ctrl),
                    // No control flow decision: start the loop over and try again.
                    None => continue,
                };
            }

            let provider_rw = self.provider_factory.database_provider_rw()?;

            self.event_sender.notify(PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            match stage.execute(&provider_rw, exec_input) {
                Ok(out @ ExecOutput { checkpoint, done }) => {
                    // Progress is sticky across loop iterations: once the block number has
                    // changed the stage counts as having made progress.
                    made_progress |=
                        checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number;

                    if let Some(metrics_tx) = &mut self.metrics_tx {
                        let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                            stage_id,
                            checkpoint,
                            max_block_number: target,
                        });
                    }
                    provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                    self.event_sender.notify(PipelineEvent::Ran {
                        pipeline_stages_progress: PipelineStagesProgress {
                            current: stage_index + 1,
                            total: total_stages,
                        },
                        stage_id,
                        result: out.clone(),
                    });

                    // The checkpoint is committed together with the stage's writes.
                    UnifiedStorageWriter::commit(provider_rw)?;

                    stage.post_execute_commit()?;

                    if done {
                        let block_number = checkpoint.block_number;
                        return Ok(if made_progress {
                            ControlFlow::Continue { block_number }
                        } else {
                            ControlFlow::NoProgress { block_number: Some(block_number) }
                        })
                    }
                }
                Err(err) => {
                    // The provider is dropped without committing before the error is handled.
                    drop(provider_rw);
                    self.event_sender.notify(PipelineEvent::Error { stage_id });

                    if let Some(ctrl) =
                        on_stage_error(&self.provider_factory, stage_id, prev_checkpoint, err)?
                    {
                        return Ok(ctrl)
                    }
                }
            }
        }
    }
493}
494
495fn on_stage_error<N: ProviderNodeTypes>(
496 factory: &ProviderFactory<N>,
497 stage_id: StageId,
498 prev_checkpoint: Option<StageCheckpoint>,
499 err: StageError,
500) -> Result<Option<ControlFlow>, PipelineError> {
501 if let StageError::DetachedHead { local_head, header, error } = err {
502 warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
503
504 let unwind_to =
506 local_head.block.number.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH).max(1);
507 Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
508 } else if let StageError::Block { block, error } = err {
509 match error {
510 BlockErrorKind::Validation(validation_error) => {
511 error!(
512 target: "sync::pipeline",
513 stage = %stage_id,
514 bad_block = %block.block.number,
515 "Stage encountered a validation error: {validation_error}"
516 );
517
518 let provider_rw = factory.database_provider_rw()?;
522 provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
523 provider_rw.save_stage_checkpoint(
524 StageId::MerkleExecute,
525 prev_checkpoint.unwrap_or_default(),
526 )?;
527
528 UnifiedStorageWriter::commit(provider_rw)?;
529
530 Ok(Some(ControlFlow::Unwind {
535 target: prev_checkpoint.unwrap_or_default().block_number,
536 bad_block: block,
537 }))
538 }
539 BlockErrorKind::Execution(execution_error) => {
540 error!(
541 target: "sync::pipeline",
542 stage = %stage_id,
543 bad_block = %block.block.number,
544 "Stage encountered an execution error: {execution_error}"
545 );
546
547 Ok(Some(ControlFlow::Unwind {
552 target: prev_checkpoint.unwrap_or_default().block_number,
553 bad_block: block,
554 }))
555 }
556 }
557 } else if let StageError::MissingStaticFileData { block, segment } = err {
558 error!(
559 target: "sync::pipeline",
560 stage = %stage_id,
561 bad_block = %block.block.number,
562 segment = %segment,
563 "Stage is missing static file data."
564 );
565
566 Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
567 } else if err.is_fatal() {
568 error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
569 Err(err.into())
570 } else {
571 warn!(
574 target: "sync::pipeline",
575 stage = %stage_id,
576 "Stage encountered a non-fatal error: {err}. Retrying..."
577 );
578 Ok(None)
579 }
580}
581
582impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
583 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
584 f.debug_struct("Pipeline")
585 .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
586 .field("max_block", &self.max_block)
587 .field("event_sender", &self.event_sender)
588 .field("fail_on_unwind", &self.fail_on_unwind)
589 .finish()
590 }
591}
592
593#[cfg(test)]
594mod tests {
595 use std::sync::atomic::Ordering;
596
597 use super::*;
598 use crate::{test_utils::TestStage, UnwindOutput};
599 use assert_matches::assert_matches;
600 use reth_consensus::ConsensusError;
601 use reth_errors::ProviderError;
602 use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
603 use reth_prune::PruneModes;
604 use reth_testing_utils::generators::{self, random_block_with_parent};
605 use tokio_stream::StreamExt;
606
607 #[test]
608 fn record_progress_calculates_outliers() {
609 let mut progress = PipelineProgress::default();
610
611 progress.update(10);
612 assert_eq!(progress.minimum_block_number, Some(10));
613 assert_eq!(progress.maximum_block_number, Some(10));
614
615 progress.update(20);
616 assert_eq!(progress.minimum_block_number, Some(10));
617 assert_eq!(progress.maximum_block_number, Some(20));
618
619 progress.update(1);
620 assert_eq!(progress.minimum_block_number, Some(1));
621 assert_eq!(progress.maximum_block_number, Some(20));
622 }
623
624 #[test]
625 fn progress_ctrl_flow() {
626 let mut progress = PipelineProgress::default();
627
628 assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });
629
630 progress.update(1);
631 assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
632 }
633
    /// Runs a two-stage pipeline with `max_block` 10 and checks the emitted events and the
    /// post-commit hook counters.
    #[tokio::test]
    async fn run_pipeline() {
        let provider_factory = create_test_provider_factory();

        // Stage A reports checkpoint 20, stage B reports checkpoint 10; both finish in one run.
        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run the pipeline in a separate task; the pipeline is moved into it.
        tokio::spawn(async move {
            pipeline.run().await.unwrap();
        });

        // Each stage must go through Prepare, Run and Ran, in stage order.
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );

        // Every stage committed one execution and no unwind.
        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
    }
711
    /// Runs a three-stage pipeline and then unwinds it to block 1, checking that the stages
    /// are executed in order and unwound in reverse order.
    #[tokio::test]
    async fn unwind_pipeline() {
        let provider_factory = create_test_provider_factory();

        // Every stage executes once (to 100, 10 and 20 respectively) and unwinds to 1.
        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let stage_c = TestStage::new(StageId::Other("C"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
        let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .add_stage(stage_c)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        tokio::spawn(async move {
            // Sync first
            pipeline.run().await.expect("Could not run pipeline");

            // Then unwind everything back to block 1
            pipeline.unwind(1, None).expect("Could not unwind pipeline");
        });

        // Execution events come in stage order A, B, C; unwind events in order C, B, A.
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Executing
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                // Unwinding
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("C"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(20),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("C"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("B"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("B"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
            ]
        );

        // Every stage committed exactly one execution and one unwind.
        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
    }
860
    /// Unwinds a pipeline to a block that lies between the checkpoints of its two stages and
    /// checks that only the stage above the target is unwound.
    #[tokio::test]
    async fn unwind_pipeline_with_intermediate_progress() {
        let provider_factory = create_test_provider_factory();

        // Stage A ends at block 100 and can unwind to 50; stage B ends at block 10 and has no
        // unwind result queued.
        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
            )
            .add_stage(
                TestStage::new(StageId::Other("B"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        tokio::spawn(async move {
            // Sync first
            pipeline.run().await.expect("Could not run pipeline");

            // Then unwind to block 50
            pipeline.unwind(50, None).expect("Could not unwind pipeline");
        });

        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Executing
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                // Unwinding: stage B's checkpoint (10) is below the unwind target (50), so it
                // is skipped rather than unwound.
                PipelineEvent::Skipped { stage_id: StageId::Other("B") },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 50,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
                },
            ]
        );
    }
949
    /// Runs a pipeline in which a stage triggers an unwind during sync.
    ///
    /// The expected flow is:
    ///
    /// - Stage A syncs to block 10.
    /// - Stage B fails with a validation error for block 5, which triggers an unwind to block 0.
    /// - Stage B emits no unwind events, as it never stored a checkpoint above block 0.
    /// - Stage A is unwound to block 0.
    /// - Stage A syncs back up to block 10.
    /// - Stage B syncs to block 10.
    /// - The pipeline finishes.
    #[tokio::test]
    async fn run_pipeline_with_unwind() {
        let provider_factory = create_test_provider_factory();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .add_stage(
                TestStage::new(StageId::Other("B"))
                    .add_exec(Err(StageError::Block {
                        block: Box::new(random_block_with_parent(
                            &mut generators::rng(),
                            5,
                            Default::default(),
                        )),
                        error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
                    }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // Run the pipeline in a separate task; the pipeline is moved into it.
        tokio::spawn(async move {
            pipeline.run().await.expect("Could not run pipeline");
        });

        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // First pass: A succeeds, B fails.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Error { stage_id: StageId::Other("B") },
                // Unwind: only A has anything to undo.
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 0,
                        bad_block: Some(5)
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
                },
                // Second pass: both stages succeed.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );
    }
1081
    /// Checks that the pipeline retries a stage after a recoverable error and stops with the
    /// stage's error after a database integrity error.
    #[tokio::test]
    async fn pipeline_error_handling() {
        // Non-fatal: the first execution fails with a recoverable error, the retry succeeds.
        let provider_factory = create_test_provider_factory();
        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("NonFatal"))
                    .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let result = pipeline.run().await;
        assert_matches!(result, Ok(()));

        // Fatal: the error is returned from `run` unchanged.
        let provider_factory = create_test_provider_factory();
        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
                StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
            )))
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let result = pipeline.run().await;
        assert_matches!(
            result,
            Err(PipelineError::Stage(StageError::DatabaseIntegrity(
                ProviderError::BlockBodyIndicesNotFound(5)
            )))
        );
    }
1119}