1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10 providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader,
11 ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory,
12 PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::{
18 pin::Pin,
19 time::{Duration, Instant},
20};
21use tokio::sync::watch;
22use tracing::*;
23
24mod builder;
25mod progress;
26mod set;
27
28use crate::{
29 BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
30 StageError, StageExt, UnwindInput,
31};
32pub use builder::*;
33use progress::*;
34use reth_errors::RethResult;
35pub use set::*;
36
/// A heap-allocated, dynamically dispatched [`Stage`] that operates on a provider of type `DB`.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;

/// A pinned, boxed, `Send` future resolving to a [`PipelineWithResult`].
///
/// This is the type returned by [`Pipeline::run_as_fut`].
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;

/// The pipeline itself paired with the outcome of running it.
///
/// [`Pipeline::run_as_fut`] consumes the pipeline, so the pipeline is handed back to the caller
/// together with the result once the future completes.
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
46
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A staged sync pipeline.
///
/// The pipeline owns an ordered list of [`Stage`]s. [`Pipeline::run_loop`] executes them front
/// to back, and [`Pipeline::unwind`] unwinds them back to front.
///
/// Progress and errors are reported to listeners as [`PipelineEvent`]s, see
/// [`Pipeline::events`]. Instances are configured through [`Pipeline::builder`].
pub struct Pipeline<N: ProviderNodeTypes> {
    /// Factory used to open read-only and read-write database providers.
    provider_factory: ProviderFactory<N>,
    /// All configured stages, in execution order.
    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
    /// Optional block number to sync to. When set it is used as the execution target of every
    /// stage, and [`Pipeline::run`] returns once the minimum progress reaches it.
    max_block: Option<BlockNumber>,
    /// Producer used to copy data to static files; it is locked for the duration of an unwind.
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    /// Sender used to broadcast [`PipelineEvent`]s to listeners.
    event_sender: EventSender<PipelineEvent>,
    /// Tracks the minimum and maximum block numbers reported by the stages.
    progress: PipelineProgress,
    /// Optional channel sender used by [`Pipeline::set_tip`] to publish the chain tip.
    tip_tx: Option<watch::Sender<B256>>,
    /// Optional sender for [`MetricEvent`]s describing stage checkpoints.
    metrics_tx: Option<MetricEventsSender>,
    /// Whether [`Pipeline::run`] fails with [`PipelineError::UnexpectedUnwind`] when an unwind
    /// happens.
    fail_on_unwind: bool,
    /// Hash of the block that was chosen as unwind target the last time a stage reported a
    /// detached head, if any.
    last_detached_head_unwind_target: Option<B256>,
    /// Number of consecutive detached head errors observed at the same unwind target. It scales
    /// how far back the next detached-head unwind goes.
    detached_head_attempts: u64,
}
96
97impl<N: ProviderNodeTypes> Pipeline<N> {
98 pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
100 {
101 PipelineBuilder::default()
102 }
103
104 pub const fn minimum_block_number(&self) -> Option<u64> {
107 self.progress.minimum_block_number
108 }
109
110 #[track_caller]
112 pub fn set_tip(&self, tip: B256) {
113 let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
114 warn!(target: "sync::pipeline", "Chain tip channel closed");
115 });
116 }
117
118 pub fn events(&self) -> EventStream<PipelineEvent> {
120 self.event_sender.new_listener()
121 }
122
123 pub fn stage(
125 &mut self,
126 idx: usize,
127 ) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
128 &mut self.stages[idx]
129 }
130}
131
132impl<N: ProviderNodeTypes> Pipeline<N> {
133 pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
135 let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
136 let provider = self.provider_factory.provider()?;
137
138 for stage in &self.stages {
139 let stage_id = stage.id();
140 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
141 stage_id,
142 checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
143 max_block_number: None,
144 elapsed: Duration::default(),
145 });
146 }
147 Ok(())
148 }
149
150 #[track_caller]
153 pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
154 let _ = self.register_metrics();
155 Box::pin(async move {
156 if let Some(target) = target {
158 match target {
159 PipelineTarget::Sync(tip) => self.set_tip(tip),
160 PipelineTarget::Unwind(target) => {
161 if let Err(err) = self.move_to_static_files() {
162 return (self, Err(err.into()))
163 }
164 if let Err(err) = self.unwind(target, None) {
165 return (self, Err(err))
166 }
167 self.progress.update(target);
168
169 return (self, Ok(ControlFlow::Continue { block_number: target }))
170 }
171 }
172 }
173
174 let result = self.run_loop().await;
175 trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
176 (self, result)
177 })
178 }
179
180 pub async fn run(&mut self) -> Result<(), PipelineError> {
183 let _ = self.register_metrics(); loop {
186 let next_action = self.run_loop().await?;
187
188 if next_action.is_unwind() && self.fail_on_unwind {
189 return Err(PipelineError::UnexpectedUnwind)
190 }
191
192 if next_action.should_continue() &&
195 self.progress
196 .minimum_block_number
197 .zip(self.max_block)
198 .is_some_and(|(progress, target)| progress >= target)
199 {
200 trace!(
201 target: "sync::pipeline",
202 ?next_action,
203 minimum_block_number = ?self.progress.minimum_block_number,
204 max_block = ?self.max_block,
205 "Terminating pipeline."
206 );
207 return Ok(())
208 }
209 }
210 }
211
212 pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
224 self.move_to_static_files()?;
225
226 let mut previous_stage = None;
227 for stage_index in 0..self.stages.len() {
228 let stage = &self.stages[stage_index];
229 let stage_id = stage.id();
230
231 trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
232 let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
233
234 trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
235
236 match next {
237 ControlFlow::NoProgress { block_number } => {
238 if let Some(block_number) = block_number {
239 self.progress.update(block_number);
240 }
241 }
242 ControlFlow::Continue { block_number } => self.progress.update(block_number),
243 ControlFlow::Unwind { target, bad_block } => {
244 self.unwind(target, Some(bad_block.block.number))?;
245 return Ok(ControlFlow::Unwind { target, bad_block })
246 }
247 }
248
249 previous_stage = Some(
250 self.provider_factory
251 .provider()?
252 .get_stage_checkpoint(stage_id)?
253 .unwrap_or_default()
254 .block_number,
255 );
256 }
257
258 Ok(self.progress.next_ctrl())
259 }
260
261 pub fn move_to_static_files(&self) -> RethResult<()> {
275 let lowest_static_file_height =
277 self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
278
279 if let Some(prune_tip) = lowest_static_file_height {
281 let mut pruner = PrunerBuilder::new(Default::default())
284 .delete_limit(usize::MAX)
285 .build_with_provider_factory(self.provider_factory.clone());
286
287 pruner.run(prune_tip)?;
288 }
289
290 Ok(())
291 }
292
293 pub fn unwind(
297 &mut self,
298 to: BlockNumber,
299 bad_block: Option<BlockNumber>,
300 ) -> Result<(), PipelineError> {
301 let provider = self.provider_factory.provider()?;
303 let latest_block = provider.last_block_number()?;
304
305 let prune_modes = provider.prune_modes_ref();
307
308 let checkpoints = provider.get_prune_checkpoints()?;
309 prune_modes.ensure_unwind_target_unpruned(latest_block, to, &checkpoints)?;
310
311 let unwind_pipeline = self.stages.iter_mut().rev();
313
314 let _locked_sf_producer = self.static_file_producer.lock();
317
318 let mut provider_rw = self.provider_factory.database_provider_rw()?;
319
320 for stage in unwind_pipeline {
321 let stage_id = stage.id();
322 let span = info_span!("Unwinding", stage = %stage_id);
323 let _enter = span.enter();
324
325 let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
326 if checkpoint.block_number < to {
327 debug!(
328 target: "sync::pipeline",
329 from = %checkpoint.block_number,
330 %to,
331 "Unwind point too far for stage"
332 );
333 self.event_sender.notify(PipelineEvent::Skipped { stage_id });
334
335 continue
336 }
337
338 info!(
339 target: "sync::pipeline",
340 from = %checkpoint.block_number,
341 %to,
342 ?bad_block,
343 "Starting unwind"
344 );
345 while checkpoint.block_number > to {
346 let unwind_started_at = Instant::now();
347 let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
348 self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });
349
350 let output = stage.unwind(&provider_rw, input);
351 match output {
352 Ok(unwind_output) => {
353 checkpoint = unwind_output.checkpoint;
354 info!(
355 target: "sync::pipeline",
356 stage = %stage_id,
357 unwind_to = to,
358 progress = checkpoint.block_number,
359 done = checkpoint.block_number == to,
360 "Stage unwound"
361 );
362
363 provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
364
365 self.event_sender
367 .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
368
369 if let Some(metrics_tx) = &mut self.metrics_tx {
370 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
371 stage_id,
372 checkpoint,
373 max_block_number: None,
376 elapsed: unwind_started_at.elapsed(),
377 });
378 }
379
380 let last_saved_finalized_block_number =
382 provider_rw.last_finalized_block_number()?;
383
384 if last_saved_finalized_block_number.is_none() ||
387 Some(checkpoint.block_number) < last_saved_finalized_block_number
388 {
389 provider_rw.save_finalized_block_number(BlockNumber::from(
390 checkpoint.block_number,
391 ))?;
392 }
393
394 provider_rw.commit()?;
395
396 stage.post_unwind_commit()?;
397
398 provider_rw = self.provider_factory.database_provider_rw()?;
399 }
400 Err(err) => {
401 self.event_sender.notify(PipelineEvent::Error { stage_id });
402
403 return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
404 }
405 }
406 }
407 }
408
409 Ok(())
410 }
411
412 async fn execute_stage_to_completion(
413 &mut self,
414 previous_stage: Option<BlockNumber>,
415 stage_index: usize,
416 ) -> Result<ControlFlow, PipelineError> {
417 let total_stages = self.stages.len();
418
419 let stage_id = self.stage(stage_index).id();
420 let mut made_progress = false;
421 let target = self.max_block.or(previous_stage);
422
423 loop {
424 let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;
425
426 let stage_reached_max_block = prev_checkpoint
427 .zip(self.max_block)
428 .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
429 if stage_reached_max_block {
430 warn!(
431 target: "sync::pipeline",
432 stage = %stage_id,
433 max_block = self.max_block,
434 prev_block = prev_checkpoint.map(|progress| progress.block_number),
435 "Stage reached target block, skipping."
436 );
437 self.event_sender.notify(PipelineEvent::Skipped { stage_id });
438
439 return Ok(ControlFlow::NoProgress {
441 block_number: prev_checkpoint.map(|progress| progress.block_number),
442 })
443 }
444
445 let exec_input = ExecInput { target, checkpoint: prev_checkpoint };
446
447 self.event_sender.notify(PipelineEvent::Prepare {
448 pipeline_stages_progress: PipelineStagesProgress {
449 current: stage_index + 1,
450 total: total_stages,
451 },
452 stage_id,
453 checkpoint: prev_checkpoint,
454 target,
455 });
456
457 if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
458 self.event_sender.notify(PipelineEvent::Error { stage_id });
459 match self.on_stage_error(stage_id, prev_checkpoint, err)? {
460 Some(ctrl) => return Ok(ctrl),
461 None => continue,
462 };
463 }
464
465 let stage_started_at = Instant::now();
466 let provider_rw = self.provider_factory.database_provider_rw()?;
467
468 self.event_sender.notify(PipelineEvent::Run {
469 pipeline_stages_progress: PipelineStagesProgress {
470 current: stage_index + 1,
471 total: total_stages,
472 },
473 stage_id,
474 checkpoint: prev_checkpoint,
475 target,
476 });
477
478 match self.stage(stage_index).execute(&provider_rw, exec_input) {
479 Ok(out @ ExecOutput { checkpoint, done }) => {
480 provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
482
483 provider_rw.commit()?;
485
486 self.stage(stage_index).post_execute_commit()?;
488
489 self.event_sender.notify(PipelineEvent::Ran {
491 pipeline_stages_progress: PipelineStagesProgress {
492 current: stage_index + 1,
493 total: total_stages,
494 },
495 stage_id,
496 result: out.clone(),
497 });
498 if let Some(metrics_tx) = &mut self.metrics_tx {
499 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
500 stage_id,
501 checkpoint,
502 max_block_number: target,
503 elapsed: stage_started_at.elapsed(),
504 });
505 }
506
507 let block_number = checkpoint.block_number;
508 let prev_block_number = prev_checkpoint.unwrap_or_default().block_number;
509 made_progress |= block_number != prev_block_number;
510 if done {
511 return Ok(if made_progress {
512 ControlFlow::Continue { block_number }
513 } else {
514 ControlFlow::NoProgress { block_number: Some(block_number) }
515 })
516 }
517 }
518 Err(err) => {
519 drop(provider_rw);
520 self.event_sender.notify(PipelineEvent::Error { stage_id });
521
522 if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
523 return Ok(ctrl)
524 }
525 }
526 }
527 }
528 }
529
530 fn on_stage_error(
531 &mut self,
532 stage_id: StageId,
533 prev_checkpoint: Option<StageCheckpoint>,
534 err: StageError,
535 ) -> Result<Option<ControlFlow>, PipelineError> {
536 if let StageError::DetachedHead { local_head, header, error } = err {
537 warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
538
539 if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
540 if local_head.block.hash == last_detached_head_unwind_target &&
541 header.block.number == local_head.block.number + 1
542 {
543 self.detached_head_attempts += 1;
544 } else {
545 self.detached_head_attempts = 1;
546 }
547 } else {
548 self.detached_head_attempts = 1;
549 }
550
551 let unwind_to = local_head
553 .block
554 .number
555 .saturating_sub(
556 BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
557 )
558 .max(1);
559
560 self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
561 Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
562 } else if let StageError::Block { block, error } = err {
563 match error {
564 BlockErrorKind::Validation(validation_error) => {
565 error!(
566 target: "sync::pipeline",
567 stage = %stage_id,
568 bad_block = %block.block.number,
569 "Stage encountered a validation error: {validation_error}"
570 );
571
572 if stage_id == StageId::MerkleExecute {
577 let provider_rw = self.provider_factory.database_provider_rw()?;
578 provider_rw
579 .save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
580 provider_rw.save_stage_checkpoint(
581 StageId::MerkleExecute,
582 prev_checkpoint.unwrap_or_default(),
583 )?;
584
585 provider_rw.commit()?;
586 }
587
588 Ok(Some(ControlFlow::Unwind {
593 target: prev_checkpoint.unwrap_or_default().block_number,
594 bad_block: block,
595 }))
596 }
597 BlockErrorKind::Execution(execution_error) => {
598 error!(
599 target: "sync::pipeline",
600 stage = %stage_id,
601 bad_block = %block.block.number,
602 "Stage encountered an execution error: {execution_error}"
603 );
604
605 Ok(Some(ControlFlow::Unwind {
610 target: prev_checkpoint.unwrap_or_default().block_number,
611 bad_block: block,
612 }))
613 }
614 }
615 } else if let StageError::MissingStaticFileData { block, segment } = err {
616 error!(
617 target: "sync::pipeline",
618 stage = %stage_id,
619 bad_block = %block.block.number,
620 segment = %segment,
621 "Stage is missing static file data."
622 );
623
624 Ok(Some(ControlFlow::Unwind {
625 target: block.block.number.saturating_sub(1),
626 bad_block: block,
627 }))
628 } else if err.is_fatal() {
629 error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
630 Err(err.into())
631 } else {
632 warn!(
635 target: "sync::pipeline",
636 stage = %stage_id,
637 "Stage encountered a non-fatal error: {err}. Retrying..."
638 );
639 Ok(None)
640 }
641 }
642}
643
644impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
645 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
646 f.debug_struct("Pipeline")
647 .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
648 .field("max_block", &self.max_block)
649 .field("event_sender", &self.event_sender)
650 .field("fail_on_unwind", &self.fail_on_unwind)
651 .finish()
652 }
653}
654
#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;

    use super::*;
    use crate::{test_utils::TestStage, UnwindOutput};
    use assert_matches::assert_matches;
    use reth_consensus::ConsensusError;
    use reth_errors::ProviderError;
    use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
    use reth_prune::PruneModes;
    use reth_testing_utils::generators::{self, random_block_with_parent};
    use tokio_stream::StreamExt;

    /// The `max_block` used by the pipelines in these tests, and therefore the target that
    /// shows up in the emitted `Prepare` and `Run` events.
    const MAX_BLOCK: BlockNumber = 10;

    /// A finished execution result with its checkpoint at `block`.
    fn done_at(block: BlockNumber) -> ExecOutput {
        ExecOutput { checkpoint: StageCheckpoint::new(block), done: true }
    }

    /// An unwind result with its checkpoint at `block`.
    fn unwound_to(block: BlockNumber) -> UnwindOutput {
        UnwindOutput { checkpoint: StageCheckpoint::new(block) }
    }

    /// The expected `Prepare` event for stage `name` at position `current` of `total`.
    fn prepare_event(
        current: usize,
        total: usize,
        name: &'static str,
        checkpoint: Option<StageCheckpoint>,
    ) -> PipelineEvent {
        PipelineEvent::Prepare {
            pipeline_stages_progress: PipelineStagesProgress { current, total },
            stage_id: StageId::Other(name),
            checkpoint,
            target: Some(MAX_BLOCK),
        }
    }

    /// The expected `Run` event for stage `name` at position `current` of `total`.
    fn run_event(
        current: usize,
        total: usize,
        name: &'static str,
        checkpoint: Option<StageCheckpoint>,
    ) -> PipelineEvent {
        PipelineEvent::Run {
            pipeline_stages_progress: PipelineStagesProgress { current, total },
            stage_id: StageId::Other(name),
            checkpoint,
            target: Some(MAX_BLOCK),
        }
    }

    /// The expected `Ran` event for stage `name` that finished at `block`.
    fn ran_event(
        current: usize,
        total: usize,
        name: &'static str,
        block: BlockNumber,
    ) -> PipelineEvent {
        PipelineEvent::Ran {
            pipeline_stages_progress: PipelineStagesProgress { current, total },
            stage_id: StageId::Other(name),
            result: done_at(block),
        }
    }

    /// The expected `Unwind` event for stage `name` going from checkpoint `from` down to `to`.
    fn unwind_event(
        name: &'static str,
        from: BlockNumber,
        to: BlockNumber,
        bad_block: Option<BlockNumber>,
    ) -> PipelineEvent {
        PipelineEvent::Unwind {
            stage_id: StageId::Other(name),
            input: UnwindInput { checkpoint: StageCheckpoint::new(from), unwind_to: to, bad_block },
        }
    }

    /// The expected `Unwound` event for stage `name` that ended up at `block`.
    fn unwound_event(name: &'static str, block: BlockNumber) -> PipelineEvent {
        PipelineEvent::Unwound { stage_id: StageId::Other(name), result: unwound_to(block) }
    }

    /// The progress tracker keeps the smallest and largest block number it was updated with.
    #[test]
    fn record_progress_calculates_outliers() {
        let mut progress = PipelineProgress::default();

        // (update, expected minimum, expected maximum)
        for (block, expected_min, expected_max) in [(10, 10, 10), (20, 10, 20), (1, 1, 20)] {
            progress.update(block);
            assert_eq!(progress.minimum_block_number, Some(expected_min));
            assert_eq!(progress.maximum_block_number, Some(expected_max));
        }
    }

    /// Without progress the tracker yields `NoProgress`; after an update it yields `Continue`.
    #[test]
    fn progress_ctrl_flow() {
        let mut progress = PipelineProgress::default();
        let before_update = progress.next_ctrl();
        assert_eq!(before_update, ControlFlow::NoProgress { block_number: None });

        progress.update(1);
        let after_update = progress.next_ctrl();
        assert_eq!(after_update, ControlFlow::Continue { block_number: 1 });
    }

    /// Runs a two-stage pipeline and checks the emitted events and commit hook counters.
    #[tokio::test]
    async fn run_pipeline() {
        let provider_factory = create_test_provider_factory();

        let (stage_a, post_execute_commit_counter_a) = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(done_at(20)))
            .with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let (stage_b, post_execute_commit_counter_b) = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(done_at(10)))
            .with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .with_max_block(MAX_BLOCK)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // The pipeline is moved into the task and dropped when it finishes.
        tokio::spawn(async move {
            pipeline.run().await.unwrap();
        });

        let received = events.collect::<Vec<PipelineEvent>>().await;
        assert_eq!(
            received,
            vec![
                prepare_event(1, 2, "A", None),
                run_event(1, 2, "A", None),
                ran_event(1, 2, "A", 20),
                prepare_event(2, 2, "B", None),
                run_event(2, 2, "B", None),
                ran_event(2, 2, "B", 10),
            ]
        );

        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
    }

    /// Runs a three-stage pipeline, unwinds it to block 1 and checks that the stages are unwound
    /// in reverse order.
    #[tokio::test]
    async fn unwind_pipeline() {
        let provider_factory = create_test_provider_factory();

        let (stage_a, post_execute_commit_counter_a) = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(done_at(100)))
            .add_unwind(Ok(unwound_to(1)))
            .with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let (stage_b, post_execute_commit_counter_b) = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(done_at(10)))
            .add_unwind(Ok(unwound_to(1)))
            .with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let (stage_c, post_execute_commit_counter_c) = TestStage::new(StageId::Other("C"))
            .add_exec(Ok(done_at(20)))
            .add_unwind(Ok(unwound_to(1)))
            .with_post_execute_commit_counter();
        let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .add_stage(stage_c)
            .with_max_block(MAX_BLOCK)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        tokio::spawn(async move {
            // Sync first ...
            pipeline.run().await.expect("Could not run pipeline");

            // ... then unwind everything back to block 1.
            pipeline.unwind(1, None).expect("Could not unwind pipeline");
        });

        let received = events.collect::<Vec<PipelineEvent>>().await;
        assert_eq!(
            received,
            vec![
                // Executing
                prepare_event(1, 3, "A", None),
                run_event(1, 3, "A", None),
                ran_event(1, 3, "A", 100),
                prepare_event(2, 3, "B", None),
                run_event(2, 3, "B", None),
                ran_event(2, 3, "B", 10),
                prepare_event(3, 3, "C", None),
                run_event(3, 3, "C", None),
                ran_event(3, 3, "C", 20),
                // Unwinding, last stage first
                unwind_event("C", 20, 1, None),
                unwound_event("C", 1),
                unwind_event("B", 10, 1, None),
                unwound_event("B", 1),
                unwind_event("A", 100, 1, None),
                unwound_event("A", 1),
            ]
        );

        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
    }

    /// Unwinds to a block above one stage's checkpoint and checks that this stage is skipped.
    #[tokio::test]
    async fn unwind_pipeline_with_intermediate_progress() {
        let provider_factory = create_test_provider_factory();

        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(done_at(100)))
            .add_unwind(Ok(unwound_to(50)));
        let stage_b = TestStage::new(StageId::Other("B")).add_exec(Ok(done_at(10)));

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .with_max_block(MAX_BLOCK)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        tokio::spawn(async move {
            // Sync first ...
            pipeline.run().await.expect("Could not run pipeline");

            // ... then unwind to block 50.
            pipeline.unwind(50, None).expect("Could not unwind pipeline");
        });

        let received = events.collect::<Vec<PipelineEvent>>().await;
        assert_eq!(
            received,
            vec![
                // Executing
                prepare_event(1, 2, "A", None),
                run_event(1, 2, "A", None),
                ran_event(1, 2, "A", 100),
                prepare_event(2, 2, "B", None),
                run_event(2, 2, "B", None),
                ran_event(2, 2, "B", 10),
                // B's checkpoint (10) is below the unwind target (50), so it is skipped.
                PipelineEvent::Skipped { stage_id: StageId::Other("B") },
                unwind_event("A", 100, 50, None),
                unwound_event("A", 50),
            ]
        );
    }

    /// A validation error in the second stage makes the pipeline unwind and then run again.
    #[tokio::test]
    async fn run_pipeline_with_unwind() {
        let provider_factory = create_test_provider_factory();

        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(done_at(10)))
            .add_unwind(Ok(unwound_to(0)))
            .add_exec(Ok(done_at(10)));

        // The first execution of B fails validation on block 5.
        let validation_failure = StageError::Block {
            block: Box::new(random_block_with_parent(
                &mut generators::rng(),
                5,
                Default::default(),
            )),
            error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
        };
        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Err(validation_failure))
            .add_unwind(Ok(unwound_to(0)))
            .add_exec(Ok(done_at(10)));

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .with_max_block(MAX_BLOCK)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        tokio::spawn(async move {
            pipeline.run().await.expect("Could not run pipeline");
        });

        let received = events.collect::<Vec<PipelineEvent>>().await;
        assert_eq!(
            received,
            vec![
                // First pass: A succeeds, B fails.
                prepare_event(1, 2, "A", None),
                run_event(1, 2, "A", None),
                ran_event(1, 2, "A", 10),
                prepare_event(2, 2, "B", None),
                run_event(2, 2, "B", None),
                PipelineEvent::Error { stage_id: StageId::Other("B") },
                // Only A is unwound.
                unwind_event("A", 10, 0, Some(5)),
                unwound_event("A", 0),
                // Second pass: both stages succeed.
                prepare_event(1, 2, "A", Some(StageCheckpoint::new(0))),
                run_event(1, 2, "A", Some(StageCheckpoint::new(0))),
                ran_event(1, 2, "A", 10),
                prepare_event(2, 2, "B", None),
                run_event(2, 2, "B", None),
                ran_event(2, 2, "B", 10),
            ]
        );
    }

    /// Recoverable stage errors are retried, fatal stage errors abort the pipeline.
    #[tokio::test]
    async fn pipeline_error_handling() {
        // Non-fatal: the stage fails once and succeeds on the retry.
        let provider_factory = create_test_provider_factory();
        let flaky_stage = TestStage::new(StageId::Other("NonFatal"))
            .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
            .add_exec(Ok(done_at(10)));
        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(flaky_stage)
            .with_max_block(MAX_BLOCK)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let result = pipeline.run().await;
        assert_matches!(result, Ok(()));

        // Fatal: the error is surfaced to the caller unchanged.
        let provider_factory = create_test_provider_factory();
        let failing_stage = TestStage::new(StageId::Other("Fatal")).add_exec(Err(
            StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
        ));
        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder().add_stage(failing_stage).build(
            provider_factory.clone(),
            StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
        );
        let result = pipeline.run().await;
        assert_matches!(
            result,
            Err(PipelineError::Stage(StageError::DatabaseIntegrity(
                ProviderError::BlockBodyIndicesNotFound(5)
            )))
        );
    }
}