1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10 providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader,
11 ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory,
12 PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::{
18 pin::Pin,
19 time::{Duration, Instant},
20};
21use tokio::sync::watch;
22use tracing::*;
23
24mod builder;
25mod progress;
26mod set;
27
28use crate::{
29 BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
30 StageError, StageExt, UnwindInput,
31};
32pub use builder::*;
33use progress::*;
34use reth_errors::RethResult;
35pub use set::*;
36
/// A heap-allocated, dynamically dispatched [`Stage`] operating on a provider of type `DB`.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
39
/// A pinned, boxed, `Send` future that resolves to a [`PipelineWithResult`].
///
/// This is the type returned by [`Pipeline::run_as_fut`].
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;
43
/// The pipeline itself paired with the outcome of a run: either the resulting [`ControlFlow`]
/// or the [`PipelineError`] that stopped it.
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
46
#[cfg_attr(doc, aquamarine::aquamarine)]
/// A staged sync pipeline.
///
/// The pipeline owns an ordered list of [stages][Stage]. [`Pipeline::run_loop`] executes them
/// one after another in the order they are stored, and [`Pipeline::unwind`] walks them in the
/// reverse order.
pub struct Pipeline<N: ProviderNodeTypes> {
    /// Factory used to open the read-only and read-write providers the pipeline works with.
    provider_factory: ProviderFactory<N>,
    /// The stages, stored in execution order.
    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
    /// Optional upper block bound. A stage whose checkpoint has reached it is skipped, and
    /// [`Pipeline::run`] returns once the minimum progress has reached it.
    max_block: Option<BlockNumber>,
    /// Producer that is locked to copy data to static files and is kept locked during unwinds.
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    /// Sender used to notify listeners (see [`Pipeline::events`]) about [`PipelineEvent`]s.
    event_sender: EventSender<PipelineEvent>,
    /// Block-number progress recorded while running; also decides the [`ControlFlow`] returned
    /// at the end of [`Pipeline::run_loop`].
    progress: PipelineProgress,
    /// Channel through which [`Pipeline::set_tip`] publishes the chain tip. `set_tip` panics if
    /// this is `None`.
    tip_tx: Option<watch::Sender<B256>>,
    /// Optional channel for [`MetricEvent`]s; when `None` no metrics are sent.
    metrics_tx: Option<MetricEventsSender>,
    /// When `true`, [`Pipeline::run`] fails with [`PipelineError::UnexpectedUnwind`] as soon as a
    /// loop iteration ends in an unwind.
    fail_on_unwind: bool,
    /// Hash of the block that the most recent detached-head error unwound to, if it was found.
    last_detached_head_unwind_target: Option<B256>,
    /// Number of consecutive detached-head errors seen on top of
    /// `last_detached_head_unwind_target`; scales the unwind depth of the next attempt.
    detached_head_attempts: u64,
}
96
97impl<N: ProviderNodeTypes> Pipeline<N> {
98 pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
100 {
101 PipelineBuilder::default()
102 }
103
104 pub const fn minimum_block_number(&self) -> Option<u64> {
107 self.progress.minimum_block_number
108 }
109
110 #[track_caller]
112 pub fn set_tip(&self, tip: B256) {
113 let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
114 warn!(target: "sync::pipeline", "Chain tip channel closed");
115 });
116 }
117
118 pub fn events(&self) -> EventStream<PipelineEvent> {
120 self.event_sender.new_listener()
121 }
122
123 pub fn stage(
125 &mut self,
126 idx: usize,
127 ) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
128 &mut self.stages[idx]
129 }
130}
131
132impl<N: ProviderNodeTypes> Pipeline<N> {
133 pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
135 let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
136 let provider = self.provider_factory.provider()?;
137
138 for stage in &self.stages {
139 let stage_id = stage.id();
140 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
141 stage_id,
142 checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
143 max_block_number: None,
144 elapsed: Duration::default(),
145 });
146 }
147 Ok(())
148 }
149
150 #[track_caller]
153 pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
154 let _ = self.register_metrics();
155 Box::pin(async move {
156 if let Some(target) = target {
158 match target {
159 PipelineTarget::Sync(tip) => self.set_tip(tip),
160 PipelineTarget::Unwind(target) => {
161 if let Err(err) = self.move_to_static_files() {
162 return (self, Err(err.into()))
163 }
164 if let Err(err) = self.unwind(target, None) {
165 return (self, Err(err))
166 }
167 self.progress.update(target);
168
169 return (self, Ok(ControlFlow::Continue { block_number: target }))
170 }
171 }
172 }
173
174 let result = self.run_loop().await;
175 trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
176 (self, result)
177 })
178 }
179
180 pub async fn run(&mut self) -> Result<(), PipelineError> {
183 let _ = self.register_metrics(); loop {
186 let next_action = self.run_loop().await?;
187
188 if next_action.is_unwind() && self.fail_on_unwind {
189 return Err(PipelineError::UnexpectedUnwind)
190 }
191
192 if next_action.should_continue() &&
195 self.progress
196 .minimum_block_number
197 .zip(self.max_block)
198 .is_some_and(|(progress, target)| progress >= target)
199 {
200 trace!(
201 target: "sync::pipeline",
202 ?next_action,
203 minimum_block_number = ?self.progress.minimum_block_number,
204 max_block = ?self.max_block,
205 "Terminating pipeline."
206 );
207 return Ok(())
208 }
209 }
210 }
211
212 pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
224 self.move_to_static_files()?;
225
226 let mut previous_stage = None;
227 for stage_index in 0..self.stages.len() {
228 let stage = &self.stages[stage_index];
229 let stage_id = stage.id();
230
231 trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
232 let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
233
234 trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
235
236 match next {
237 ControlFlow::NoProgress { block_number } => {
238 if let Some(block_number) = block_number {
239 self.progress.update(block_number);
240 }
241 }
242 ControlFlow::Continue { block_number } => self.progress.update(block_number),
243 ControlFlow::Unwind { target, bad_block } => {
244 self.unwind(target, Some(bad_block.block.number))?;
245 return Ok(ControlFlow::Unwind { target, bad_block })
246 }
247 }
248
249 previous_stage = Some(
250 self.provider_factory
251 .provider()?
252 .get_stage_checkpoint(stage_id)?
253 .unwrap_or_default()
254 .block_number,
255 );
256 }
257
258 Ok(self.progress.next_ctrl())
259 }
260
261 pub fn move_to_static_files(&self) -> RethResult<()> {
275 let lowest_static_file_height =
277 self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
278
279 if let Some(prune_tip) = lowest_static_file_height {
281 let mut pruner = PrunerBuilder::new(Default::default())
284 .delete_limit(usize::MAX)
285 .build_with_provider_factory(self.provider_factory.clone());
286
287 pruner.run(prune_tip)?;
288 }
289
290 Ok(())
291 }
292
/// Unwinds the stages, last stage first, down to block `to`.
///
/// `bad_block` is passed through to every stage's [`UnwindInput`] and to the logs.
///
/// For each stage:
/// * if its stored checkpoint is below `to`, a [`PipelineEvent::Skipped`] is emitted and the
///   stage is left untouched;
/// * otherwise `Stage::unwind` is called repeatedly until the checkpoint is no longer above
///   `to`. Each step is committed on its own, followed by the stage's `post_unwind_commit`.
///
/// # Errors
///
/// Fails if the prune check rejects the target, if any provider operation fails, or if a stage
/// fails to unwind. A stage failure emits [`PipelineEvent::Error`] and is returned wrapped in
/// [`StageError::Fatal`].
pub fn unwind(
    &mut self,
    to: BlockNumber,
    bad_block: Option<BlockNumber>,
) -> Result<(), PipelineError> {
    // Validate the target against the prune configuration and prune checkpoints before any
    // stage is touched.
    let provider = self.provider_factory.provider()?;
    let latest_block = provider.last_block_number()?;

    let prune_modes = provider.prune_modes_ref();

    let checkpoints = provider.get_prune_checkpoints()?;
    // NOTE(review): presumably errors when data needed at `to` was already pruned — confirm.
    prune_modes.ensure_unwind_target_unpruned(latest_block, to, &checkpoints)?;

    // Stages are unwound in the reverse of their execution order.
    let unwind_pipeline = self.stages.iter_mut().rev();

    // Bound to a named variable so the producer lock is held until this function returns.
    let _locked_sf_producer = self.static_file_producer.lock();

    let mut provider_rw =
        self.provider_factory.unwind_provider_rw()?.disable_long_read_transaction_safety();

    for stage in unwind_pipeline {
        let stage_id = stage.id();
        let span = info_span!("Unwinding", stage = %stage_id);
        let _enter = span.enter();

        let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
        // The stage never got as far as the unwind target: nothing to undo.
        if checkpoint.block_number < to {
            debug!(
                target: "sync::pipeline",
                from = %checkpoint.block_number,
                %to,
                "Unwind point too far for stage"
            );
            self.event_sender.notify(PipelineEvent::Skipped { stage_id });

            continue
        }

        info!(
            target: "sync::pipeline",
            from = %checkpoint.block_number,
            %to,
            ?bad_block,
            "Starting unwind"
        );
        // A stage may unwind in several steps; a checkpoint already equal to `to` runs none.
        while checkpoint.block_number > to {
            let unwind_started_at = Instant::now();
            let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
            self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });

            let output = stage.unwind(&provider_rw, input);
            match output {
                Ok(unwind_output) => {
                    checkpoint = unwind_output.checkpoint;
                    info!(
                        target: "sync::pipeline",
                        stage = %stage_id,
                        unwind_to = to,
                        progress = checkpoint.block_number,
                        done = checkpoint.block_number == to,
                        "Stage unwound"
                    );

                    // Persist the new checkpoint in the same transaction as the unwound data.
                    provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                    self.event_sender
                        .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

                    if let Some(metrics_tx) = &mut self.metrics_tx {
                        let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                            stage_id,
                            checkpoint,
                            max_block_number: None,
                            elapsed: unwind_started_at.elapsed(),
                        });
                    }

                    let last_saved_finalized_block_number =
                        provider_rw.last_finalized_block_number()?;

                    // Store the checkpoint block as the finalized block when none is stored
                    // yet or the stored one is above the checkpoint.
                    if last_saved_finalized_block_number.is_none() ||
                        Some(checkpoint.block_number) < last_saved_finalized_block_number
                    {
                        provider_rw.save_finalized_block_number(BlockNumber::from(
                            checkpoint.block_number,
                        ))?;
                    }

                    let last_saved_safe_block_number = provider_rw.last_safe_block_number()?;

                    // Same rule for the safe block number.
                    if last_saved_safe_block_number.is_none() ||
                        Some(checkpoint.block_number) < last_saved_safe_block_number
                    {
                        provider_rw.save_safe_block_number(BlockNumber::from(
                            checkpoint.block_number,
                        ))?;
                    }

                    provider_rw.commit()?;

                    // Runs only after the commit above succeeded.
                    stage.post_unwind_commit()?;

                    // `commit` consumed the provider; open a new one for the next step.
                    provider_rw = self.provider_factory.unwind_provider_rw()?;
                }
                Err(err) => {
                    self.event_sender.notify(PipelineEvent::Error { stage_id });

                    // Any unwind failure is reported as fatal.
                    return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
                }
            }
        }
    }

    Ok(())
}
422
/// Runs the stage at `stage_index` repeatedly until it reports `done`, is skipped, or an error
/// decides the outcome.
///
/// `previous_stage` is the checkpoint block of the stage that ran before this one; it is used
/// as the execution target only when `max_block` is not set.
///
/// Returns:
/// * [`ControlFlow::NoProgress`] if the stage's checkpoint already reached `max_block`, or if
///   it finished without changing its checkpoint block in any iteration;
/// * [`ControlFlow::Continue`] if it finished and the checkpoint block changed at least once;
/// * whatever [`Self::on_stage_error`] decides when the stage fails. Non-fatal errors make
///   the loop retry.
async fn execute_stage_to_completion(
    &mut self,
    previous_stage: Option<BlockNumber>,
    stage_index: usize,
) -> Result<ControlFlow, PipelineError> {
    let total_stages = self.stages.len();

    let stage_id = self.stage(stage_index).id();
    let mut made_progress = false;
    // `max_block` takes precedence over the previous stage's checkpoint.
    let target = self.max_block.or(previous_stage);

    loop {
        // Re-read the checkpoint on every iteration: the previous one may have committed.
        let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;

        let stage_reached_max_block = prev_checkpoint
            .zip(self.max_block)
            .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
        if stage_reached_max_block {
            warn!(
                target: "sync::pipeline",
                stage = %stage_id,
                max_block = self.max_block,
                prev_block = prev_checkpoint.map(|progress| progress.block_number),
                "Stage reached target block, skipping."
            );
            self.event_sender.notify(PipelineEvent::Skipped { stage_id });

            return Ok(ControlFlow::NoProgress {
                block_number: prev_checkpoint.map(|progress| progress.block_number),
            })
        }

        let exec_input = ExecInput { target, checkpoint: prev_checkpoint };

        // Stage positions in events are 1-based.
        self.event_sender.notify(PipelineEvent::Prepare {
            pipeline_stages_progress: PipelineStagesProgress {
                current: stage_index + 1,
                total: total_stages,
            },
            stage_id,
            checkpoint: prev_checkpoint,
            target,
        });

        // Wait for the stage to become ready before a read-write provider is opened.
        if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
            self.event_sender.notify(PipelineEvent::Error { stage_id });
            match self.on_stage_error(stage_id, prev_checkpoint, err)? {
                Some(ctrl) => return Ok(ctrl),
                None => continue,
            };
        }

        let stage_started_at = Instant::now();
        let provider_rw = self.provider_factory.database_provider_rw()?;

        self.event_sender.notify(PipelineEvent::Run {
            pipeline_stages_progress: PipelineStagesProgress {
                current: stage_index + 1,
                total: total_stages,
            },
            stage_id,
            checkpoint: prev_checkpoint,
            target,
        });

        match self.stage(stage_index).execute(&provider_rw, exec_input) {
            Ok(out @ ExecOutput { checkpoint, done }) => {
                // The checkpoint is saved and committed together with the stage's writes.
                provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                provider_rw.commit()?;

                // Runs only after the commit above succeeded.
                self.stage(stage_index).post_execute_commit()?;

                self.event_sender.notify(PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress {
                        current: stage_index + 1,
                        total: total_stages,
                    },
                    stage_id,
                    result: out.clone(),
                });
                if let Some(metrics_tx) = &mut self.metrics_tx {
                    let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                        stage_id,
                        checkpoint,
                        max_block_number: target,
                        elapsed: stage_started_at.elapsed(),
                    });
                }

                let block_number = checkpoint.block_number;
                let prev_block_number = prev_checkpoint.unwrap_or_default().block_number;
                // Sticky across iterations: one changed checkpoint block is enough.
                made_progress |= block_number != prev_block_number;
                if done {
                    return Ok(if made_progress {
                        ControlFlow::Continue { block_number }
                    } else {
                        ControlFlow::NoProgress { block_number: Some(block_number) }
                    })
                }
            }
            Err(err) => {
                // Drop the uncommitted provider before the error handler runs; the handler
                // may open a read-write provider of its own.
                drop(provider_rw);
                self.event_sender.notify(PipelineEvent::Error { stage_id });

                if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
                    return Ok(ctrl)
                }
            }
        }
    }
}
540
541 fn on_stage_error(
542 &mut self,
543 stage_id: StageId,
544 prev_checkpoint: Option<StageCheckpoint>,
545 err: StageError,
546 ) -> Result<Option<ControlFlow>, PipelineError> {
547 if let StageError::DetachedHead { local_head, header, error } = err {
548 warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
549
550 if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
551 if local_head.block.hash == last_detached_head_unwind_target &&
552 header.block.number == local_head.block.number + 1
553 {
554 self.detached_head_attempts += 1;
555 } else {
556 self.detached_head_attempts = 1;
557 }
558 } else {
559 self.detached_head_attempts = 1;
560 }
561
562 let unwind_to = local_head
564 .block
565 .number
566 .saturating_sub(
567 BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
568 )
569 .max(1);
570
571 self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
572 Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
573 } else if let StageError::Block { block, error } = err {
574 match error {
575 BlockErrorKind::Validation(validation_error) => {
576 error!(
577 target: "sync::pipeline",
578 stage = %stage_id,
579 bad_block = %block.block.number,
580 "Stage encountered a validation error: {validation_error}"
581 );
582
583 if stage_id == StageId::MerkleExecute {
588 let provider_rw = self.provider_factory.database_provider_rw()?;
589 provider_rw
590 .save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
591 provider_rw.save_stage_checkpoint(
592 StageId::MerkleExecute,
593 prev_checkpoint.unwrap_or_default(),
594 )?;
595
596 provider_rw.commit()?;
597 }
598
599 Ok(Some(ControlFlow::Unwind {
604 target: prev_checkpoint.unwrap_or_default().block_number,
605 bad_block: block,
606 }))
607 }
608 BlockErrorKind::Execution(execution_error) => {
609 error!(
610 target: "sync::pipeline",
611 stage = %stage_id,
612 bad_block = %block.block.number,
613 "Stage encountered an execution error: {execution_error}"
614 );
615
616 Ok(Some(ControlFlow::Unwind {
621 target: prev_checkpoint.unwrap_or_default().block_number,
622 bad_block: block,
623 }))
624 }
625 }
626 } else if let StageError::MissingStaticFileData { block, segment } = err {
627 error!(
628 target: "sync::pipeline",
629 stage = %stage_id,
630 bad_block = %block.block.number,
631 segment = %segment,
632 "Stage is missing static file data."
633 );
634
635 Ok(Some(ControlFlow::Unwind {
636 target: block.block.number.saturating_sub(1),
637 bad_block: block,
638 }))
639 } else if err.is_fatal() {
640 error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
641 Err(err.into())
642 } else {
643 warn!(
646 target: "sync::pipeline",
647 stage = %stage_id,
648 "Stage encountered a non-fatal error: {err}. Retrying..."
649 );
650 Ok(None)
651 }
652 }
653}
654
655impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
656 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
657 f.debug_struct("Pipeline")
658 .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
659 .field("max_block", &self.max_block)
660 .field("event_sender", &self.event_sender)
661 .field("fail_on_unwind", &self.fail_on_unwind)
662 .finish()
663 }
664}
665
666#[cfg(test)]
667mod tests {
668 use std::sync::atomic::Ordering;
669
670 use super::*;
671 use crate::{test_utils::TestStage, UnwindOutput};
672 use assert_matches::assert_matches;
673 use reth_consensus::ConsensusError;
674 use reth_errors::ProviderError;
675 use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
676 use reth_prune::PruneModes;
677 use reth_testing_utils::generators::{self, random_block_with_parent};
678 use tokio_stream::StreamExt;
679
/// `PipelineProgress::update` keeps track of the lowest and highest block number it has seen.
#[test]
fn record_progress_calculates_outliers() {
    let mut progress = PipelineProgress::default();

    // (value passed to `update`, expected minimum, expected maximum)
    let steps = [(10, 10, 10), (20, 10, 20), (1, 1, 20)];
    for (block_number, expected_min, expected_max) in steps {
        progress.update(block_number);
        assert_eq!(progress.minimum_block_number, Some(expected_min));
        assert_eq!(progress.maximum_block_number, Some(expected_max));
    }
}
696
/// `PipelineProgress::next_ctrl` reports no progress until a block number has been recorded.
#[test]
fn progress_ctrl_flow() {
    let mut progress = PipelineProgress::default();

    let before_update = progress.next_ctrl();
    assert_eq!(before_update, ControlFlow::NoProgress { block_number: None });

    progress.update(1);
    let after_update = progress.next_ctrl();
    assert_eq!(after_update, ControlFlow::Continue { block_number: 1 });
}
706
#[tokio::test]
/// Runs a two-stage pipeline with `max_block = 10` and checks the emitted events and the
/// post-commit hook counters.
async fn run_pipeline() {
    let provider_factory = create_test_provider_factory();

    // Stage A finishes at block 20 and stage B at block 10. Both are wired to counters that
    // record how often their post-commit hooks ran.
    let stage_a = TestStage::new(StageId::Other("A"))
        .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
    let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
    let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

    let stage_b = TestStage::new(StageId::Other("B"))
        .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
    let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
    let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

    let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
        .add_stage(stage_a)
        .add_stage(stage_b)
        .with_max_block(10)
        .build(
            provider_factory.clone(),
            StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
        );
    let events = pipeline.events();

    // Run the pipeline on its own task; it is moved into the task.
    tokio::spawn(async move {
        pipeline.run().await.unwrap();
    });

    // Each stage must emit Prepare, Run and Ran, in stage order.
    assert_eq!(
        events.collect::<Vec<PipelineEvent>>().await,
        vec![
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Ran {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
            },
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Ran {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
            },
        ]
    );

    // One execute commit per stage and no unwind commits.
    assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
    assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);

    assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
    assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
}
784
#[tokio::test]
/// Runs a three-stage pipeline and then unwinds it to block 1, checking that the stages are
/// unwound in reverse order and that every post-commit hook ran exactly once.
async fn unwind_pipeline() {
    let provider_factory = create_test_provider_factory();

    // Each stage gets one execution result and one unwind result, plus hook counters.
    let stage_a = TestStage::new(StageId::Other("A"))
        .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
        .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
    let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
    let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();

    let stage_b = TestStage::new(StageId::Other("B"))
        .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
        .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
    let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
    let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();

    let stage_c = TestStage::new(StageId::Other("C"))
        .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
        .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
    let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
    let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();

    let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
        .add_stage(stage_a)
        .add_stage(stage_b)
        .add_stage(stage_c)
        .with_max_block(10)
        .build(
            provider_factory.clone(),
            StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
        );
    let events = pipeline.events();

    // Sync first, then unwind, on a separate task.
    tokio::spawn(async move {
        pipeline.run().await.expect("Could not run pipeline");

        pipeline.unwind(1, None).expect("Could not unwind pipeline");
    });

    // Execution events come in order A, B, C; unwind events in order C, B, A.
    assert_eq!(
        events.collect::<Vec<PipelineEvent>>().await,
        vec![
            // Executing
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                stage_id: StageId::Other("A"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                stage_id: StageId::Other("A"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Ran {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
                stage_id: StageId::Other("A"),
                result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
            },
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                stage_id: StageId::Other("B"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                stage_id: StageId::Other("B"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Ran {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
                stage_id: StageId::Other("B"),
                result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
            },
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                stage_id: StageId::Other("C"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                stage_id: StageId::Other("C"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Ran {
                pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
                stage_id: StageId::Other("C"),
                result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
            },
            // Unwinding
            PipelineEvent::Unwind {
                stage_id: StageId::Other("C"),
                input: UnwindInput {
                    checkpoint: StageCheckpoint::new(20),
                    unwind_to: 1,
                    bad_block: None
                }
            },
            PipelineEvent::Unwound {
                stage_id: StageId::Other("C"),
                result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
            },
            PipelineEvent::Unwind {
                stage_id: StageId::Other("B"),
                input: UnwindInput {
                    checkpoint: StageCheckpoint::new(10),
                    unwind_to: 1,
                    bad_block: None
                }
            },
            PipelineEvent::Unwound {
                stage_id: StageId::Other("B"),
                result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
            },
            PipelineEvent::Unwind {
                stage_id: StageId::Other("A"),
                input: UnwindInput {
                    checkpoint: StageCheckpoint::new(100),
                    unwind_to: 1,
                    bad_block: None
                }
            },
            PipelineEvent::Unwound {
                stage_id: StageId::Other("A"),
                result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
            },
        ]
    );

    // Every stage committed one execution and one unwind.
    assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
    assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);

    assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
    assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);

    assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
    assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
}
933
#[tokio::test]
/// Unwinds to block 50 when stage A is at block 100 and stage B only at block 10: B is skipped
/// because its checkpoint is below the target, and only A is unwound.
async fn unwind_pipeline_with_intermediate_progress() {
    let provider_factory = create_test_provider_factory();

    // Stage B has no unwind result configured; the expected events show it is never unwound.
    let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
        .add_stage(
            TestStage::new(StageId::Other("A"))
                .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
                .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
        )
        .add_stage(
            TestStage::new(StageId::Other("B"))
                .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
        )
        .with_max_block(10)
        .build(
            provider_factory.clone(),
            StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
        );
    let events = pipeline.events();

    // Sync first, then unwind, on a separate task.
    tokio::spawn(async move {
        pipeline.run().await.expect("Could not run pipeline");

        pipeline.unwind(50, None).expect("Could not unwind pipeline");
    });

    // Execution events for A and B, then B is skipped and A is unwound to 50.
    assert_eq!(
        events.collect::<Vec<PipelineEvent>>().await,
        vec![
            // Executing
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Ran {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
            },
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Ran {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
            },
            // Unwinding: B's checkpoint (10) is below the target (50), so it is skipped.
            PipelineEvent::Skipped { stage_id: StageId::Other("B") },
            PipelineEvent::Unwind {
                stage_id: StageId::Other("A"),
                input: UnwindInput {
                    checkpoint: StageCheckpoint::new(100),
                    unwind_to: 50,
                    bad_block: None
                }
            },
            PipelineEvent::Unwound {
                stage_id: StageId::Other("A"),
                result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
            },
        ]
    );
}
1022
#[tokio::test]
/// Runs a pipeline whose second stage fails with a validation error on its first execution.
///
/// Expected sequence: A runs to block 10, B fails on block 5, the pipeline unwinds to block 0
/// (B had no checkpoint yet), then both stages run again and succeed.
async fn run_pipeline_with_unwind() {
    let provider_factory = create_test_provider_factory();

    // Results are queued per stage in the order they are expected to be consumed.
    let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
        .add_stage(
            TestStage::new(StageId::Other("A"))
                .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
                .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
        )
        .add_stage(
            TestStage::new(StageId::Other("B"))
                .add_exec(Err(StageError::Block {
                    block: Box::new(random_block_with_parent(
                        &mut generators::rng(),
                        5,
                        Default::default(),
                    )),
                    error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
                }))
                .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
                .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
        )
        .with_max_block(10)
        .build(
            provider_factory.clone(),
            StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
        );
    let events = pipeline.events();

    // Run the pipeline on its own task.
    tokio::spawn(async move {
        pipeline.run().await.expect("Could not run pipeline");
    });

    // Note that the expected events contain no unwind events for stage B.
    assert_eq!(
        events.collect::<Vec<PipelineEvent>>().await,
        vec![
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Ran {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
            },
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                checkpoint: None,
                target: Some(10),
            },
            // B's first execution fails, which triggers the unwind of A.
            PipelineEvent::Error { stage_id: StageId::Other("B") },
            PipelineEvent::Unwind {
                stage_id: StageId::Other("A"),
                input: UnwindInput {
                    checkpoint: StageCheckpoint::new(10),
                    unwind_to: 0,
                    bad_block: Some(5)
                }
            },
            PipelineEvent::Unwound {
                stage_id: StageId::Other("A"),
                result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
            },
            // Second pass: A now starts from its unwound checkpoint.
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                checkpoint: Some(StageCheckpoint::new(0)),
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                checkpoint: Some(StageCheckpoint::new(0)),
                target: Some(10),
            },
            PipelineEvent::Ran {
                pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
                stage_id: StageId::Other("A"),
                result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
            },
            PipelineEvent::Prepare {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Run {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                checkpoint: None,
                target: Some(10),
            },
            PipelineEvent::Ran {
                pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
                stage_id: StageId::Other("B"),
                result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
            },
        ]
    );
}
1154
1155 #[tokio::test]
1157 async fn pipeline_error_handling() {
1158 let provider_factory = create_test_provider_factory();
1160 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1161 .add_stage(
1162 TestStage::new(StageId::Other("NonFatal"))
1163 .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
1164 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1165 )
1166 .with_max_block(10)
1167 .build(
1168 provider_factory.clone(),
1169 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1170 );
1171 let result = pipeline.run().await;
1172 assert_matches!(result, Ok(()));
1173
1174 let provider_factory = create_test_provider_factory();
1176 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1177 .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
1178 StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
1179 )))
1180 .build(
1181 provider_factory.clone(),
1182 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1183 );
1184 let result = pipeline.run().await;
1185 assert_matches!(
1186 result,
1187 Err(PipelineError::Stage(StageError::DatabaseIntegrity(
1188 ProviderError::BlockBodyIndicesNotFound(5)
1189 )))
1190 );
1191 }
1192}