1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10 providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader,
11 ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory,
12 PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::{
18 pin::Pin,
19 time::{Duration, Instant},
20};
21use tokio::sync::watch;
22use tracing::*;
23
24mod builder;
25mod progress;
26mod set;
27
28use crate::{
29 BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
30 StageError, StageExt, UnwindInput,
31};
32pub use builder::*;
33use progress::*;
34use reth_errors::RethResult;
35pub use set::*;
36
/// A boxed, dynamically dispatched [`Stage`], which is how the pipeline stores its stages.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;

/// The pinned, boxed, `Send` future returned by [`Pipeline::run_as_fut`]. It resolves to a
/// [`PipelineWithResult`].
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;

/// The owned [`Pipeline`] handed back together with the result of running it.
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
46
/// A staged pipeline: an ordered list of stages that are executed front to back by
/// [`Pipeline::run_loop`] and unwound back to front by [`Pipeline::unwind`].
#[cfg_attr(doc, aquamarine::aquamarine)]
pub struct Pipeline<N: ProviderNodeTypes> {
    /// Factory used to open read and read-write providers for checkpoints and stage data.
    provider_factory: ProviderFactory<N>,
    /// The stages, in execution order. Each stage operates on the factory's read-write provider.
    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
    /// Optional upper block bound. When set it is used as the execution target of every stage
    /// and [`Pipeline::run`] terminates once the minimum progress has reached it.
    max_block: Option<BlockNumber>,
    /// Producer that is locked to copy data to static files and locked for the duration of an
    /// unwind.
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    /// Sender used to broadcast [`PipelineEvent`]s to listeners created via
    /// [`Pipeline::events`].
    event_sender: EventSender<PipelineEvent>,
    /// Progress tracker updated with the block numbers reported by the stages.
    progress: PipelineProgress,
    /// Watch channel sender through which [`Pipeline::set_tip`] publishes the chain tip.
    /// `set_tip` panics if this is `None`.
    tip_tx: Option<watch::Sender<B256>>,
    /// Optional sender for [`MetricEvent`]s describing stage checkpoints.
    metrics_tx: Option<MetricEventsSender>,
    /// When `true`, [`Pipeline::run`] returns [`PipelineError::UnexpectedUnwind`] as soon as a
    /// loop iteration ends in an unwind.
    fail_on_unwind: bool,
    /// Hash of the block that the most recent detached-head unwind targeted, as returned by the
    /// provider (`None` before the first such unwind or when the hash lookup returned nothing).
    last_detached_head_unwind_target: Option<B256>,
    /// Number of consecutive detached-head errors attributed to the same unwind target; it
    /// multiplies the unwind depth on every repeated attempt.
    detached_head_attempts: u64,
}
96
97impl<N: ProviderNodeTypes> Pipeline<N> {
98 pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
100 {
101 PipelineBuilder::default()
102 }
103
104 pub const fn minimum_block_number(&self) -> Option<u64> {
107 self.progress.minimum_block_number
108 }
109
110 #[track_caller]
112 pub fn set_tip(&self, tip: B256) {
113 let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
114 warn!(target: "sync::pipeline", "Chain tip channel closed");
115 });
116 }
117
118 pub fn events(&self) -> EventStream<PipelineEvent> {
120 self.event_sender.new_listener()
121 }
122
123 pub fn stage(
125 &mut self,
126 idx: usize,
127 ) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
128 &mut self.stages[idx]
129 }
130}
131
132impl<N: ProviderNodeTypes> Pipeline<N> {
133 pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
135 let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
136 let provider = self.provider_factory.provider()?;
137
138 for stage in &self.stages {
139 let stage_id = stage.id();
140 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
141 stage_id,
142 checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
143 max_block_number: None,
144 elapsed: Duration::default(),
145 });
146 }
147 Ok(())
148 }
149
150 #[track_caller]
153 pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
154 let _ = self.register_metrics();
155 Box::pin(async move {
156 if let Some(target) = target {
158 match target {
159 PipelineTarget::Sync(tip) => self.set_tip(tip),
160 PipelineTarget::Unwind(target) => {
161 if let Err(err) = self.move_to_static_files() {
162 return (self, Err(err.into()))
163 }
164 if let Err(err) = self.unwind(target, None) {
165 return (self, Err(err))
166 }
167 self.progress.update(target);
168
169 return (self, Ok(ControlFlow::Continue { block_number: target }))
170 }
171 }
172 }
173
174 let result = self.run_loop().await;
175 trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
176 (self, result)
177 })
178 }
179
180 pub async fn run(&mut self) -> Result<(), PipelineError> {
183 let _ = self.register_metrics(); loop {
186 let next_action = self.run_loop().await?;
187
188 if next_action.is_unwind() && self.fail_on_unwind {
189 return Err(PipelineError::UnexpectedUnwind)
190 }
191
192 if next_action.should_continue() &&
195 self.progress
196 .minimum_block_number
197 .zip(self.max_block)
198 .is_some_and(|(progress, target)| progress >= target)
199 {
200 trace!(
201 target: "sync::pipeline",
202 ?next_action,
203 minimum_block_number = ?self.progress.minimum_block_number,
204 max_block = ?self.max_block,
205 "Terminating pipeline."
206 );
207 return Ok(())
208 }
209 }
210 }
211
212 pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
224 self.move_to_static_files()?;
225
226 let mut previous_stage = None;
227 for stage_index in 0..self.stages.len() {
228 let stage = &self.stages[stage_index];
229 let stage_id = stage.id();
230
231 trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
232 let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
233
234 trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
235
236 match next {
237 ControlFlow::NoProgress { block_number } => {
238 if let Some(block_number) = block_number {
239 self.progress.update(block_number);
240 }
241 }
242 ControlFlow::Continue { block_number } => self.progress.update(block_number),
243 ControlFlow::Unwind { target, bad_block } => {
244 self.unwind(target, Some(bad_block.block.number))?;
245 return Ok(ControlFlow::Unwind { target, bad_block })
246 }
247 }
248
249 previous_stage = Some(
250 self.provider_factory
251 .provider()?
252 .get_stage_checkpoint(stage_id)?
253 .unwrap_or_default()
254 .block_number,
255 );
256 }
257
258 Ok(self.progress.next_ctrl())
259 }
260
261 pub fn move_to_static_files(&self) -> RethResult<()> {
275 let lowest_static_file_height =
277 self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
278
279 if let Some(prune_tip) = lowest_static_file_height {
281 let mut pruner = PrunerBuilder::new(Default::default())
284 .delete_limit(usize::MAX)
285 .build_with_provider_factory(self.provider_factory.clone());
286
287 pruner.run(prune_tip)?;
288 }
289
290 Ok(())
291 }
292
    /// Unwinds the stages to block `to`, visiting them in reverse order of execution.
    ///
    /// `bad_block` is forwarded to every stage through [`UnwindInput`] and is only logged here.
    ///
    /// For each stage whose stored checkpoint is above `to`, the stage's `unwind` is called
    /// repeatedly until the returned checkpoint is no longer above `to`. Every successful step
    /// is committed on its own.
    ///
    /// # Errors
    ///
    /// Returns an error if the prune configuration rejects the unwind target, if a provider
    /// operation fails, or if a stage fails to unwind. A stage failure is wrapped in
    /// [`StageError::Fatal`].
    pub fn unwind(
        &mut self,
        to: BlockNumber,
        bad_block: Option<BlockNumber>,
    ) -> Result<(), PipelineError> {
        // Gather everything needed for validation in a scope of its own, so the read provider is
        // dropped before the read-write provider below is opened.
        let (latest_block, prune_modes, checkpoints) = {
            let provider = self.provider_factory.provider()?;
            (
                provider.last_block_number()?,
                provider.prune_modes_ref().clone(),
                provider.get_prune_checkpoints()?,
            )
        };
        // Bail out before touching any stage if the prune configuration rejects the target.
        prune_modes.ensure_unwind_target_unpruned(latest_block, to, &checkpoints)?;

        // Stages are unwound in reverse order of execution.
        let unwind_pipeline = self.stages.iter_mut().rev();

        // The guard is held until the function returns.
        // NOTE(review): presumably this keeps the static file producer from running during the
        // unwind; confirm.
        let _locked_sf_producer = self.static_file_producer.lock();

        let mut provider_rw =
            self.provider_factory.unwind_provider_rw()?.disable_long_read_transaction_safety();

        for stage in unwind_pipeline {
            let stage_id = stage.id();
            let span = info_span!("Unwinding", stage = %stage_id);
            let _enter = span.enter();

            // A stage without a stored checkpoint is treated as being at the default checkpoint.
            let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
            if checkpoint.block_number < to {
                // The stage never got past the unwind target, so there is nothing to undo.
                debug!(
                    target: "sync::pipeline",
                    from = %checkpoint.block_number,
                    %to,
                    "Unwind point too far for stage"
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                continue
            }

            info!(
                target: "sync::pipeline",
                from = %checkpoint.block_number,
                %to,
                ?bad_block,
                "Starting unwind"
            );
            // A stage may unwind in several steps; keep going until it reaches the target.
            while checkpoint.block_number > to {
                let unwind_started_at = Instant::now();
                let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
                self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });

                let output = stage.unwind(&provider_rw, input);
                match output {
                    Ok(unwind_output) => {
                        checkpoint = unwind_output.checkpoint;
                        info!(
                            target: "sync::pipeline",
                            stage = %stage_id,
                            unwind_to = to,
                            progress = checkpoint.block_number,
                            done = checkpoint.block_number == to,
                            "Stage unwound"
                        );

                        // Persist the new checkpoint in the same transaction as the unwound data.
                        provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                        self.event_sender
                            .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

                        if let Some(metrics_tx) = &mut self.metrics_tx {
                            let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                                stage_id,
                                checkpoint,
                                max_block_number: None,
                                elapsed: unwind_started_at.elapsed(),
                            });
                        }

                        let last_saved_finalized_block_number =
                            provider_rw.last_finalized_block_number()?;

                        // Write the finalized block number if none is stored yet, or lower it
                        // when the stored one is above the new checkpoint.
                        if last_saved_finalized_block_number.is_none() ||
                            Some(checkpoint.block_number) < last_saved_finalized_block_number
                        {
                            provider_rw.save_finalized_block_number(BlockNumber::from(
                                checkpoint.block_number,
                            ))?;
                        }

                        let last_saved_safe_block_number = provider_rw.last_safe_block_number()?;

                        // Same rule for the safe block number.
                        if last_saved_safe_block_number.is_none() ||
                            Some(checkpoint.block_number) < last_saved_safe_block_number
                        {
                            provider_rw.save_safe_block_number(BlockNumber::from(
                                checkpoint.block_number,
                            ))?;
                        }

                        provider_rw.commit()?;

                        // Runs only after the unwind step has been committed.
                        stage.post_unwind_commit()?;

                        // `commit` consumed the provider, so open a fresh one for the next step.
                        // NOTE(review): unlike the initial provider, this one is not created with
                        // `disable_long_read_transaction_safety()`; confirm that is intended.
                        provider_rw = self.provider_factory.unwind_provider_rw()?;
                    }
                    Err(err) => {
                        self.event_sender.notify(PipelineEvent::Error { stage_id });

                        // Any unwind failure is reported as fatal.
                        return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
                    }
                }
            }
        }

        Ok(())
    }
423
    /// Runs the stage at `stage_index` repeatedly until it reports `done`, is skipped, or an
    /// error is turned into a control flow decision.
    ///
    /// `previous_stage` is the stored checkpoint block of the preceding stage. It is used as the
    /// execution target only when no `max_block` is configured.
    ///
    /// Returns:
    /// * `ControlFlow::NoProgress` if the stage already reached `max_block`, or finished without
    ///   its checkpoint block ever differing from the one read at the start of an iteration;
    /// * `ControlFlow::Continue` if the stage finished and made progress;
    /// * whatever [`Pipeline::on_stage_error`] decides for a handled error.
    ///
    /// # Errors
    ///
    /// Returns provider errors and the errors that [`Pipeline::on_stage_error`] treats as fatal.
    async fn execute_stage_to_completion(
        &mut self,
        previous_stage: Option<BlockNumber>,
        stage_index: usize,
    ) -> Result<ControlFlow, PipelineError> {
        let total_stages = self.stages.len();

        let stage_id = self.stage(stage_index).id();
        let mut made_progress = false;
        // The configured maximum block takes precedence over the previous stage's checkpoint.
        let target = self.max_block.or(previous_stage);

        loop {
            // Re-read the checkpoint on every iteration: a previous iteration may have advanced it.
            let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;

            // Only true when both a checkpoint and a maximum block exist.
            let stage_reached_max_block = prev_checkpoint
                .zip(self.max_block)
                .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
            if stage_reached_max_block {
                warn!(
                    target: "sync::pipeline",
                    stage = %stage_id,
                    max_block = self.max_block,
                    prev_block = prev_checkpoint.map(|progress| progress.block_number),
                    "Stage reached target block, skipping."
                );
                self.event_sender.notify(PipelineEvent::Skipped { stage_id });

                return Ok(ControlFlow::NoProgress {
                    block_number: prev_checkpoint.map(|progress| progress.block_number),
                })
            }

            let exec_input = ExecInput { target, checkpoint: prev_checkpoint };

            self.event_sender.notify(PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            // Wait for the stage to become ready before a read-write provider is opened.
            if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
                self.event_sender.notify(PipelineEvent::Error { stage_id });
                match self.on_stage_error(stage_id, prev_checkpoint, err)? {
                    Some(ctrl) => return Ok(ctrl),
                    // Non-fatal error: start the iteration over.
                    None => continue,
                };
            }

            let stage_started_at = Instant::now();
            let provider_rw = self.provider_factory.database_provider_rw()?;

            self.event_sender.notify(PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress {
                    current: stage_index + 1,
                    total: total_stages,
                },
                stage_id,
                checkpoint: prev_checkpoint,
                target,
            });

            match self.stage(stage_index).execute(&provider_rw, exec_input) {
                Ok(out @ ExecOutput { checkpoint, done }) => {
                    // Save the checkpoint through the same provider the stage wrote to, then
                    // commit.
                    provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                    provider_rw.commit()?;

                    // Runs only after the stage's output has been committed.
                    self.stage(stage_index).post_execute_commit()?;

                    self.event_sender.notify(PipelineEvent::Ran {
                        pipeline_stages_progress: PipelineStagesProgress {
                            current: stage_index + 1,
                            total: total_stages,
                        },
                        stage_id,
                        result: out.clone(),
                    });
                    if let Some(metrics_tx) = &mut self.metrics_tx {
                        let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                            stage_id,
                            checkpoint,
                            max_block_number: target,
                            elapsed: stage_started_at.elapsed(),
                        });
                    }

                    let block_number = checkpoint.block_number;
                    let prev_block_number = prev_checkpoint.unwrap_or_default().block_number;
                    // Progress is sticky across iterations of this loop.
                    made_progress |= block_number != prev_block_number;
                    if done {
                        return Ok(if made_progress {
                            ControlFlow::Continue { block_number }
                        } else {
                            ControlFlow::NoProgress { block_number: Some(block_number) }
                        })
                    }
                    // Not done yet: loop and run the stage again from the new checkpoint.
                }
                Err(err) => {
                    // Drop the provider without committing before the error is handled.
                    drop(provider_rw);
                    self.event_sender.notify(PipelineEvent::Error { stage_id });

                    // `None` means the error is non-fatal, so the loop retries the stage.
                    if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
                        return Ok(ctrl)
                    }
                }
            }
        }
    }
541
542 fn on_stage_error(
543 &mut self,
544 stage_id: StageId,
545 prev_checkpoint: Option<StageCheckpoint>,
546 err: StageError,
547 ) -> Result<Option<ControlFlow>, PipelineError> {
548 if let StageError::DetachedHead { local_head, header, error } = err {
549 warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
550
551 if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
552 if local_head.block.hash == last_detached_head_unwind_target &&
553 header.block.number == local_head.block.number + 1
554 {
555 self.detached_head_attempts += 1;
556 } else {
557 self.detached_head_attempts = 1;
558 }
559 } else {
560 self.detached_head_attempts = 1;
561 }
562
563 let unwind_to = local_head
565 .block
566 .number
567 .saturating_sub(
568 BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
569 )
570 .max(1);
571
572 self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
573 Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
574 } else if let StageError::Block { block, error } = err {
575 match error {
576 BlockErrorKind::Validation(validation_error) => {
577 error!(
578 target: "sync::pipeline",
579 stage = %stage_id,
580 bad_block = %block.block.number,
581 "Stage encountered a validation error: {validation_error}"
582 );
583
584 if stage_id == StageId::MerkleExecute {
589 let provider_rw = self.provider_factory.database_provider_rw()?;
590 provider_rw
591 .save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
592 provider_rw.save_stage_checkpoint(
593 StageId::MerkleExecute,
594 prev_checkpoint.unwrap_or_default(),
595 )?;
596
597 provider_rw.commit()?;
598 }
599
600 Ok(Some(ControlFlow::Unwind {
605 target: prev_checkpoint.unwrap_or_default().block_number,
606 bad_block: block,
607 }))
608 }
609 BlockErrorKind::Execution(execution_error) => {
610 error!(
611 target: "sync::pipeline",
612 stage = %stage_id,
613 bad_block = %block.block.number,
614 "Stage encountered an execution error: {execution_error}"
615 );
616
617 Ok(Some(ControlFlow::Unwind {
622 target: prev_checkpoint.unwrap_or_default().block_number,
623 bad_block: block,
624 }))
625 }
626 }
627 } else if let StageError::MissingStaticFileData { block, segment } = err {
628 error!(
629 target: "sync::pipeline",
630 stage = %stage_id,
631 bad_block = %block.block.number,
632 segment = %segment,
633 "Stage is missing static file data."
634 );
635
636 Ok(Some(ControlFlow::Unwind {
637 target: block.block.number.saturating_sub(1),
638 bad_block: block,
639 }))
640 } else if err.is_fatal() {
641 error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
642 Err(err.into())
643 } else {
644 warn!(
647 target: "sync::pipeline",
648 stage = %stage_id,
649 "Stage encountered a non-fatal error: {err}. Retrying..."
650 );
651 Ok(None)
652 }
653 }
654}
655
656impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
657 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
658 f.debug_struct("Pipeline")
659 .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
660 .field("max_block", &self.max_block)
661 .field("event_sender", &self.event_sender)
662 .field("fail_on_unwind", &self.fail_on_unwind)
663 .finish()
664 }
665}
666
667#[cfg(test)]
668mod tests {
669 use std::sync::atomic::Ordering;
670
671 use super::*;
672 use crate::{test_utils::TestStage, UnwindOutput};
673 use assert_matches::assert_matches;
674 use reth_consensus::ConsensusError;
675 use reth_errors::ProviderError;
676 use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
677 use reth_prune::PruneModes;
678 use reth_testing_utils::generators::{self, random_block_with_parent};
679 use tokio_stream::StreamExt;
680
681 #[test]
682 fn record_progress_calculates_outliers() {
683 let mut progress = PipelineProgress::default();
684
685 progress.update(10);
686 assert_eq!(progress.minimum_block_number, Some(10));
687 assert_eq!(progress.maximum_block_number, Some(10));
688
689 progress.update(20);
690 assert_eq!(progress.minimum_block_number, Some(10));
691 assert_eq!(progress.maximum_block_number, Some(20));
692
693 progress.update(1);
694 assert_eq!(progress.minimum_block_number, Some(1));
695 assert_eq!(progress.maximum_block_number, Some(20));
696 }
697
698 #[test]
699 fn progress_ctrl_flow() {
700 let mut progress = PipelineProgress::default();
701
702 assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });
703
704 progress.update(1);
705 assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
706 }
707
    /// Runs two stages (A then B) with a maximum block of 10 and checks that the events are
    /// emitted in stage order and that each stage's post-execute hook ran exactly once.
    #[tokio::test]
    async fn run_pipeline() {
        let provider_factory = create_test_provider_factory();

        // Stage A reports checkpoint 20 and completion on its only execution.
        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        // Stage B reports checkpoint 10 and completion on its only execution.
        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        // The pipeline is moved into the task. Collecting the event stream below waits for the
        // stream to end.
        tokio::spawn(async move {
            pipeline.run().await.unwrap();
        });

        // Each stage emits Prepare, Run and Ran, in stage order.
        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );

        // One execution per stage and no unwinds.
        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
    }
785
    /// Runs three stages (A, B, C), then unwinds to block 1 and checks that execution happens in
    /// order A, B, C while unwinding happens in reverse order C, B, A.
    #[tokio::test]
    async fn unwind_pipeline() {
        let provider_factory = create_test_provider_factory();

        // Every stage executes once and unwinds to checkpoint 1 in a single step.
        let stage_a = TestStage::new(StageId::Other("A"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
        let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

        let stage_b = TestStage::new(StageId::Other("B"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
        let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

        let stage_c = TestStage::new(StageId::Other("C"))
            .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
            .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
        let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
        let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(stage_a)
            .add_stage(stage_b)
            .add_stage(stage_c)
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        tokio::spawn(async move {
            // Sync first...
            pipeline.run().await.expect("Could not run pipeline");

            // ...then unwind everything back to block 1.
            pipeline.unwind(1, None).expect("Could not unwind pipeline");
        });

        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Execution, in stage order.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                    stage_id: StageId::Other("C"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
                },
                // Unwinding, in reverse stage order.
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("C"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(20),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("C"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("B"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("B"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 1,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
                },
            ]
        );

        // Each stage committed exactly one execution and one unwind.
        assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);

        assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
        assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
    }
934
    /// Unwinds to block 50 when stage A is at 100 and stage B only at 10: B is below the unwind
    /// target and must be skipped, while A is unwound to 50.
    #[tokio::test]
    async fn unwind_pipeline_with_intermediate_progress() {
        let provider_factory = create_test_provider_factory();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
            )
            .add_stage(
                // No unwind result is queued for B, because it is expected to be skipped.
                TestStage::new(StageId::Other("B"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        tokio::spawn(async move {
            // Sync first...
            pipeline.run().await.expect("Could not run pipeline");

            // ...then unwind to a block between the two stages' checkpoints.
            pipeline.unwind(50, None).expect("Could not unwind pipeline");
        });

        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // Execution, in stage order.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                // B's checkpoint (10) is below the unwind target (50), so it is skipped.
                PipelineEvent::Skipped { stage_id: StageId::Other("B") },
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(100),
                        unwind_to: 50,
                        bad_block: None
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
                },
            ]
        );
    }
1023
    /// Stage B fails with a validation error on its first execution. The pipeline is expected
    /// to unwind stage A to block 0 and then run both stages again successfully.
    ///
    /// Stage B never stored a checkpoint before the failure, so it is neither unwound nor
    /// reported as skipped, and its checkpoint is still `None` on the second pass.
    #[tokio::test]
    async fn run_pipeline_with_unwind() {
        let provider_factory = create_test_provider_factory();

        let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
            .add_stage(
                // A: executes, is unwound to 0, then executes again.
                TestStage::new(StageId::Other("A"))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .add_stage(
                // B: fails validation on block 5 first, then succeeds.
                TestStage::new(StageId::Other("B"))
                    .add_exec(Err(StageError::Block {
                        block: Box::new(random_block_with_parent(
                            &mut generators::rng(),
                            5,
                            Default::default(),
                        )),
                        error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
                    }))
                    .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                    .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
            )
            .with_max_block(10)
            .build(
                provider_factory.clone(),
                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
            );
        let events = pipeline.events();

        tokio::spawn(async move {
            pipeline.run().await.expect("Could not run pipeline");
        });

        assert_eq!(
            events.collect::<Vec<PipelineEvent>>().await,
            vec![
                // First pass: A succeeds, B errors.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Error { stage_id: StageId::Other("B") },
                // Unwind: only A has a checkpoint above the target.
                PipelineEvent::Unwind {
                    stage_id: StageId::Other("A"),
                    input: UnwindInput {
                        checkpoint: StageCheckpoint::new(10),
                        unwind_to: 0,
                        bad_block: Some(5)
                    }
                },
                PipelineEvent::Unwound {
                    stage_id: StageId::Other("A"),
                    result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
                },
                // Second pass: both stages succeed.
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    checkpoint: Some(StageCheckpoint::new(0)),
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                    stage_id: StageId::Other("A"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
                PipelineEvent::Prepare {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Run {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    checkpoint: None,
                    target: Some(10),
                },
                PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                    stage_id: StageId::Other("B"),
                    result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
                },
            ]
        );
    }
1155
1156 #[tokio::test]
1158 async fn pipeline_error_handling() {
1159 let provider_factory = create_test_provider_factory();
1161 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1162 .add_stage(
1163 TestStage::new(StageId::Other("NonFatal"))
1164 .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
1165 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1166 )
1167 .with_max_block(10)
1168 .build(
1169 provider_factory.clone(),
1170 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1171 );
1172 let result = pipeline.run().await;
1173 assert_matches!(result, Ok(()));
1174
1175 let provider_factory = create_test_provider_factory();
1177 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1178 .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
1179 StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
1180 )))
1181 .build(
1182 provider_factory.clone(),
1183 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1184 );
1185 let result = pipeline.run().await;
1186 assert_matches!(
1187 result,
1188 Err(PipelineError::Stage(StageError::DatabaseIntegrity(
1189 ProviderError::BlockBodyIndicesNotFound(5)
1190 )))
1191 );
1192 }
1193}