1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10 providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, BlockNumReader,
11 ChainStateBlockReader, ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory,
12 StageCheckpointReader, StageCheckpointWriter,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::{
18 pin::Pin,
19 time::{Duration, Instant},
20};
21use tokio::sync::watch;
22use tracing::*;
23
24mod builder;
25mod progress;
26mod set;
27
28use crate::{
29 BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
30 StageError, StageExt, UnwindInput,
31};
32pub use builder::*;
33use progress::*;
34use reth_errors::RethResult;
35pub use set::*;
36
/// A heap-allocated, dynamically dispatched [`Stage`] that operates on a provider of type `DB`.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
39
/// A pinned, boxed, `Send` future that resolves to a [`PipelineWithResult`].
///
/// This is the type returned by [`Pipeline::run_as_fut`].
pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;
43
/// The pipeline paired with the outcome of running it: either the resulting [`ControlFlow`] or
/// the [`PipelineError`] that stopped the run.
pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
46
/// A staged sync pipeline.
///
/// The pipeline owns an ordered list of [`Stage`]s. [`Pipeline::run_loop`] executes them front
/// to back, and [`Pipeline::unwind`] unwinds them back to front.
#[cfg_attr(doc, aquamarine::aquamarine)]
pub struct Pipeline<N: ProviderNodeTypes> {
    /// Factory used to open the read-only and read-write providers the pipeline works with.
    provider_factory: ProviderFactory<N>,
    /// All configured stages, stored in execution order.
    stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
    /// Optional block number at which [`Pipeline::run`] stops and stages are skipped.
    max_block: Option<BlockNumber>,
    /// Producer that is locked to copy data to static files and for the duration of an unwind.
    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
    /// Sender used to notify listeners about [`PipelineEvent`]s.
    event_sender: EventSender<PipelineEvent>,
    /// Block-number progress recorded from the stages' results.
    progress: PipelineProgress,
    /// Channel through which [`Pipeline::set_tip`] publishes the tip hash.
    ///
    /// `set_tip` panics when this is `None`.
    tip_tx: Option<watch::Sender<B256>>,
    /// Optional channel on which stage checkpoint [`MetricEvent`]s are sent.
    metrics_tx: Option<MetricEventsSender>,
    /// When `true`, [`Pipeline::run`] fails with [`PipelineError::UnexpectedUnwind`] as soon as
    /// a pass over the stages ends in an unwind.
    fail_on_unwind: bool,
    /// Hash of the block that was the target of the most recent detached-head unwind.
    last_detached_head_unwind_target: Option<B256>,
    /// Number of consecutive detached-head errors seen for the same unwind target; used to
    /// scale the depth of the next detached-head unwind.
    detached_head_attempts: u64,
}
96
97impl<N: ProviderNodeTypes> Pipeline<N> {
98 pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
100 {
101 PipelineBuilder::default()
102 }
103
104 pub const fn minimum_block_number(&self) -> Option<u64> {
107 self.progress.minimum_block_number
108 }
109
110 #[track_caller]
112 pub fn set_tip(&self, tip: B256) {
113 let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
114 warn!(target: "sync::pipeline", "Chain tip channel closed");
115 });
116 }
117
118 pub fn events(&self) -> EventStream<PipelineEvent> {
120 self.event_sender.new_listener()
121 }
122
123 pub fn stage(
125 &mut self,
126 idx: usize,
127 ) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
128 &mut self.stages[idx]
129 }
130}
131
132impl<N: ProviderNodeTypes> Pipeline<N> {
133 pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
135 let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
136 let provider = self.provider_factory.provider()?;
137
138 for stage in &self.stages {
139 let stage_id = stage.id();
140 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
141 stage_id,
142 checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
143 max_block_number: None,
144 elapsed: Duration::default(),
145 });
146 }
147 Ok(())
148 }
149
150 #[track_caller]
153 pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
154 let _ = self.register_metrics();
155 Box::pin(async move {
156 if let Some(target) = target {
158 match target {
159 PipelineTarget::Sync(tip) => self.set_tip(tip),
160 PipelineTarget::Unwind(target) => {
161 if let Err(err) = self.move_to_static_files() {
162 return (self, Err(err.into()))
163 }
164 if let Err(err) = self.unwind(target, None) {
165 return (self, Err(err))
166 }
167 self.progress.update(target);
168
169 return (self, Ok(ControlFlow::Continue { block_number: target }))
170 }
171 }
172 }
173
174 let result = self.run_loop().await;
175 trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
176 (self, result)
177 })
178 }
179
180 pub async fn run(&mut self) -> Result<(), PipelineError> {
183 let _ = self.register_metrics(); loop {
186 let next_action = self.run_loop().await?;
187
188 if next_action.is_unwind() && self.fail_on_unwind {
189 return Err(PipelineError::UnexpectedUnwind)
190 }
191
192 if next_action.should_continue() &&
195 self.progress
196 .minimum_block_number
197 .zip(self.max_block)
198 .is_some_and(|(progress, target)| progress >= target)
199 {
200 trace!(
201 target: "sync::pipeline",
202 ?next_action,
203 minimum_block_number = ?self.progress.minimum_block_number,
204 max_block = ?self.max_block,
205 "Terminating pipeline."
206 );
207 return Ok(())
208 }
209 }
210 }
211
212 pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
224 self.move_to_static_files()?;
225
226 let mut previous_stage = None;
227 for stage_index in 0..self.stages.len() {
228 let stage = &self.stages[stage_index];
229 let stage_id = stage.id();
230
231 trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
232 let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
233
234 trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
235
236 match next {
237 ControlFlow::NoProgress { block_number } => {
238 if let Some(block_number) = block_number {
239 self.progress.update(block_number);
240 }
241 }
242 ControlFlow::Continue { block_number } => self.progress.update(block_number),
243 ControlFlow::Unwind { target, bad_block } => {
244 self.unwind(target, Some(bad_block.block.number))?;
245 return Ok(ControlFlow::Unwind { target, bad_block })
246 }
247 }
248
249 previous_stage = Some(
250 self.provider_factory
251 .provider()?
252 .get_stage_checkpoint(stage_id)?
253 .unwrap_or_default()
254 .block_number,
255 );
256 }
257
258 Ok(self.progress.next_ctrl())
259 }
260
261 pub fn move_to_static_files(&self) -> RethResult<()> {
275 let lowest_static_file_height =
277 self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
278
279 if let Some(prune_tip) = lowest_static_file_height {
281 let mut pruner = PrunerBuilder::new(Default::default())
284 .delete_limit(usize::MAX)
285 .build_with_provider_factory(self.provider_factory.clone());
286
287 pruner.run(prune_tip)?;
288 }
289
290 Ok(())
291 }
292
/// Unwinds every stage, in reverse execution order, down to block `to`.
///
/// `bad_block` is passed on to each stage's unwind input and is only logged here.
///
/// Each successful unwind step of a stage is committed on its own: the stage checkpoint is
/// saved, the finalized block number is lowered if necessary, and the read-write provider is
/// committed and replaced by a fresh one.
///
/// # Errors
///
/// Returns an error if the unwind target fails the prune-mode check, if any provider operation
/// fails, or if a stage fails to unwind. A stage unwind failure is always reported as
/// [`StageError::Fatal`].
pub fn unwind(
    &mut self,
    to: BlockNumber,
    bad_block: Option<BlockNumber>,
) -> Result<(), PipelineError> {
    // Validate the unwind target against the prune configuration before touching any stage.
    let provider = self.provider_factory.provider()?;
    let latest_block = provider.last_block_number()?;

    let prune_modes = provider.prune_modes_ref();

    prune_modes.ensure_unwind_target_unpruned(latest_block, to)?;

    // Stages are unwound in the reverse of their execution order.
    let unwind_pipeline = self.stages.iter_mut().rev();

    // The binding keeps the value returned by `lock()` alive until this function returns.
    // NOTE(review): presumably this keeps the static file producer from running while the
    // unwind is in progress — confirm against `StaticFileProducer`.
    let _locked_sf_producer = self.static_file_producer.lock();

    let mut provider_rw = self.provider_factory.database_provider_rw()?;

    for stage in unwind_pipeline {
        let stage_id = stage.id();
        let span = info_span!("Unwinding", stage = %stage_id);
        let _enter = span.enter();

        let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
        // A stage whose checkpoint is already below the target has nothing to unwind.
        if checkpoint.block_number < to {
            debug!(
                target: "sync::pipeline",
                from = %checkpoint.block_number,
                %to,
                "Unwind point too far for stage"
            );
            self.event_sender.notify(PipelineEvent::Skipped { stage_id });

            continue
        }

        info!(
            target: "sync::pipeline",
            from = %checkpoint.block_number,
            %to,
            ?bad_block,
            "Starting unwind"
        );
        // A stage may need several unwind calls; repeat until its checkpoint reaches `to`.
        while checkpoint.block_number > to {
            let unwind_started_at = Instant::now();
            let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
            self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });

            let output = stage.unwind(&provider_rw, input);
            match output {
                Ok(unwind_output) => {
                    checkpoint = unwind_output.checkpoint;
                    info!(
                        target: "sync::pipeline",
                        stage = %stage_id,
                        unwind_to = to,
                        progress = checkpoint.block_number,
                        done = checkpoint.block_number == to,
                        "Stage unwound"
                    );

                    provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                    self.event_sender
                        .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });

                    if let Some(metrics_tx) = &mut self.metrics_tx {
                        let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                            stage_id,
                            checkpoint,
                            max_block_number: None,
                            elapsed: unwind_started_at.elapsed(),
                        });
                    }

                    let last_saved_finalized_block_number =
                        provider_rw.last_finalized_block_number()?;

                    // Store the checkpoint as the finalized block number when none is stored
                    // yet, or when the stored one is above the block we just unwound to.
                    if last_saved_finalized_block_number.is_none() ||
                        Some(checkpoint.block_number) < last_saved_finalized_block_number
                    {
                        provider_rw.save_finalized_block_number(BlockNumber::from(
                            checkpoint.block_number,
                        ))?;
                    }

                    // Committing consumes the provider, so this unwind step is persisted before
                    // the stage's post-commit hook runs.
                    UnifiedStorageWriter::commit_unwind(provider_rw)?;

                    stage.post_unwind_commit()?;

                    // Open a fresh read-write provider for the next step or the next stage.
                    provider_rw = self.provider_factory.database_provider_rw()?;
                }
                Err(err) => {
                    self.event_sender.notify(PipelineEvent::Error { stage_id });

                    // Any failure to unwind is treated as fatal.
                    return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
                }
            }
        }
    }

    Ok(())
}
410
/// Executes the stage at `stage_index` repeatedly until it reports that it is done, is
/// skipped, or an error decides the control flow.
///
/// The execution target is the configured `max_block` if set, otherwise `previous_stage`.
///
/// Returns:
/// - [`ControlFlow::NoProgress`] if the stage already reached `max_block` (skipped), or if it
///   finished without changing its checkpoint block number,
/// - [`ControlFlow::Continue`] if the stage finished and its checkpoint block number changed
///   in at least one iteration,
/// - whatever [`Pipeline::on_stage_error`] decides when the stage fails.
///
/// # Errors
///
/// Returns an error if a provider operation fails, if a post-commit hook fails, or if
/// `on_stage_error` classifies a stage error as fatal.
async fn execute_stage_to_completion(
    &mut self,
    previous_stage: Option<BlockNumber>,
    stage_index: usize,
) -> Result<ControlFlow, PipelineError> {
    let total_stages = self.stages.len();

    let stage_id = self.stage(stage_index).id();
    // Set once any iteration changes the checkpoint block number.
    let mut made_progress = false;
    let target = self.max_block.or(previous_stage);

    loop {
        // Re-read the checkpoint on every iteration; the previous iteration may have moved it.
        let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;

        let stage_reached_max_block = prev_checkpoint
            .zip(self.max_block)
            .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
        if stage_reached_max_block {
            warn!(
                target: "sync::pipeline",
                stage = %stage_id,
                max_block = self.max_block,
                prev_block = prev_checkpoint.map(|progress| progress.block_number),
                "Stage reached target block, skipping."
            );
            self.event_sender.notify(PipelineEvent::Skipped { stage_id });

            return Ok(ControlFlow::NoProgress {
                block_number: prev_checkpoint.map(|progress| progress.block_number),
            })
        }

        let exec_input = ExecInput { target, checkpoint: prev_checkpoint };

        self.event_sender.notify(PipelineEvent::Prepare {
            pipeline_stages_progress: PipelineStagesProgress {
                current: stage_index + 1,
                total: total_stages,
            },
            stage_id,
            checkpoint: prev_checkpoint,
            target,
        });

        // Wait until the stage is ready to execute; this happens before a read-write provider
        // is opened.
        if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
            self.event_sender.notify(PipelineEvent::Error { stage_id });
            match self.on_stage_error(stage_id, prev_checkpoint, err)? {
                Some(ctrl) => return Ok(ctrl),
                // Non-fatal error: start the next iteration and try again.
                None => continue,
            };
        }

        let stage_started_at = Instant::now();
        let provider_rw = self.provider_factory.database_provider_rw()?;

        self.event_sender.notify(PipelineEvent::Run {
            pipeline_stages_progress: PipelineStagesProgress {
                current: stage_index + 1,
                total: total_stages,
            },
            stage_id,
            checkpoint: prev_checkpoint,
            target,
        });

        match self.stage(stage_index).execute(&provider_rw, exec_input) {
            Ok(out @ ExecOutput { checkpoint, done }) => {
                // Persist the new checkpoint through the same provider the stage wrote to,
                // then commit before running the stage's post-commit hook.
                provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;

                UnifiedStorageWriter::commit(provider_rw)?;

                self.stage(stage_index).post_execute_commit()?;

                self.event_sender.notify(PipelineEvent::Ran {
                    pipeline_stages_progress: PipelineStagesProgress {
                        current: stage_index + 1,
                        total: total_stages,
                    },
                    stage_id,
                    result: out.clone(),
                });
                if let Some(metrics_tx) = &mut self.metrics_tx {
                    let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
                        stage_id,
                        checkpoint,
                        max_block_number: target,
                        elapsed: stage_started_at.elapsed(),
                    });
                }

                let block_number = checkpoint.block_number;
                let prev_block_number = prev_checkpoint.unwrap_or_default().block_number;
                made_progress |= block_number != prev_block_number;
                // If the stage is not done yet, fall through and run another iteration.
                if done {
                    return Ok(if made_progress {
                        ControlFlow::Continue { block_number }
                    } else {
                        ControlFlow::NoProgress { block_number: Some(block_number) }
                    })
                }
            }
            Err(err) => {
                // Drop the read-write provider without committing it before handling the
                // error; the error handler may open its own read-write provider.
                drop(provider_rw);
                self.event_sender.notify(PipelineEvent::Error { stage_id });

                // `None` means the error is non-fatal: loop and retry the stage.
                if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
                    return Ok(ctrl)
                }
            }
        }
    }
}
528
529 fn on_stage_error(
530 &mut self,
531 stage_id: StageId,
532 prev_checkpoint: Option<StageCheckpoint>,
533 err: StageError,
534 ) -> Result<Option<ControlFlow>, PipelineError> {
535 if let StageError::DetachedHead { local_head, header, error } = err {
536 warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
537
538 if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
539 if local_head.block.hash == last_detached_head_unwind_target &&
540 header.block.number == local_head.block.number + 1
541 {
542 self.detached_head_attempts += 1;
543 } else {
544 self.detached_head_attempts = 1;
545 }
546 } else {
547 self.detached_head_attempts = 1;
548 }
549
550 let unwind_to = local_head
552 .block
553 .number
554 .saturating_sub(
555 BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
556 )
557 .max(1);
558
559 self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
560 Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
561 } else if let StageError::Block { block, error } = err {
562 match error {
563 BlockErrorKind::Validation(validation_error) => {
564 error!(
565 target: "sync::pipeline",
566 stage = %stage_id,
567 bad_block = %block.block.number,
568 "Stage encountered a validation error: {validation_error}"
569 );
570
571 let provider_rw = self.provider_factory.database_provider_rw()?;
575 provider_rw.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
576 provider_rw.save_stage_checkpoint(
577 StageId::MerkleExecute,
578 prev_checkpoint.unwrap_or_default(),
579 )?;
580
581 UnifiedStorageWriter::commit(provider_rw)?;
582
583 Ok(Some(ControlFlow::Unwind {
588 target: prev_checkpoint.unwrap_or_default().block_number,
589 bad_block: block,
590 }))
591 }
592 BlockErrorKind::Execution(execution_error) => {
593 error!(
594 target: "sync::pipeline",
595 stage = %stage_id,
596 bad_block = %block.block.number,
597 "Stage encountered an execution error: {execution_error}"
598 );
599
600 Ok(Some(ControlFlow::Unwind {
605 target: prev_checkpoint.unwrap_or_default().block_number,
606 bad_block: block,
607 }))
608 }
609 }
610 } else if let StageError::MissingStaticFileData { block, segment } = err {
611 error!(
612 target: "sync::pipeline",
613 stage = %stage_id,
614 bad_block = %block.block.number,
615 segment = %segment,
616 "Stage is missing static file data."
617 );
618
619 Ok(Some(ControlFlow::Unwind { target: block.block.number - 1, bad_block: block }))
620 } else if err.is_fatal() {
621 error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
622 Err(err.into())
623 } else {
624 warn!(
627 target: "sync::pipeline",
628 stage = %stage_id,
629 "Stage encountered a non-fatal error: {err}. Retrying..."
630 );
631 Ok(None)
632 }
633 }
634}
635
636impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
637 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
638 f.debug_struct("Pipeline")
639 .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
640 .field("max_block", &self.max_block)
641 .field("event_sender", &self.event_sender)
642 .field("fail_on_unwind", &self.fail_on_unwind)
643 .finish()
644 }
645}
646
647#[cfg(test)]
648mod tests {
649 use std::sync::atomic::Ordering;
650
651 use super::*;
652 use crate::{test_utils::TestStage, UnwindOutput};
653 use assert_matches::assert_matches;
654 use reth_consensus::ConsensusError;
655 use reth_errors::ProviderError;
656 use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
657 use reth_prune::PruneModes;
658 use reth_testing_utils::generators::{self, random_block_with_parent};
659 use tokio_stream::StreamExt;
660
/// `PipelineProgress::update` keeps track of the lowest and highest block number it was given.
#[test]
fn record_progress_calculates_outliers() {
    let mut progress = PipelineProgress::default();

    // (block passed to `update`, expected minimum, expected maximum)
    for (block, expected_min, expected_max) in [(10, 10, 10), (20, 10, 20), (1, 1, 20)] {
        progress.update(block);
        assert_eq!(progress.minimum_block_number, Some(expected_min));
        assert_eq!(progress.maximum_block_number, Some(expected_max));
    }
}
677
/// `PipelineProgress::next_ctrl` reports no progress until a block number has been recorded.
#[test]
fn progress_ctrl_flow() {
    let mut progress = PipelineProgress::default();

    let before_update = progress.next_ctrl();
    assert_eq!(before_update, ControlFlow::NoProgress { block_number: None });

    progress.update(1);
    let after_update = progress.next_ctrl();
    assert_eq!(after_update, ControlFlow::Continue { block_number: 1 });
}
687
688 #[tokio::test]
690 async fn run_pipeline() {
691 let provider_factory = create_test_provider_factory();
692
693 let stage_a = TestStage::new(StageId::Other("A"))
694 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
695 let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
696 let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
697
698 let stage_b = TestStage::new(StageId::Other("B"))
699 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
700 let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
701 let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
702
703 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
704 .add_stage(stage_a)
705 .add_stage(stage_b)
706 .with_max_block(10)
707 .build(
708 provider_factory.clone(),
709 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
710 );
711 let events = pipeline.events();
712
713 tokio::spawn(async move {
715 pipeline.run().await.unwrap();
716 });
717
718 assert_eq!(
720 events.collect::<Vec<PipelineEvent>>().await,
721 vec![
722 PipelineEvent::Prepare {
723 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
724 stage_id: StageId::Other("A"),
725 checkpoint: None,
726 target: Some(10),
727 },
728 PipelineEvent::Run {
729 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
730 stage_id: StageId::Other("A"),
731 checkpoint: None,
732 target: Some(10),
733 },
734 PipelineEvent::Ran {
735 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
736 stage_id: StageId::Other("A"),
737 result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
738 },
739 PipelineEvent::Prepare {
740 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
741 stage_id: StageId::Other("B"),
742 checkpoint: None,
743 target: Some(10),
744 },
745 PipelineEvent::Run {
746 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
747 stage_id: StageId::Other("B"),
748 checkpoint: None,
749 target: Some(10),
750 },
751 PipelineEvent::Ran {
752 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
753 stage_id: StageId::Other("B"),
754 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
755 },
756 ]
757 );
758
759 assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
760 assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);
761
762 assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
763 assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
764 }
765
766 #[tokio::test]
768 async fn unwind_pipeline() {
769 let provider_factory = create_test_provider_factory();
770
771 let stage_a = TestStage::new(StageId::Other("A"))
772 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
773 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
774 let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
775 let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
776
777 let stage_b = TestStage::new(StageId::Other("B"))
778 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
779 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
780 let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
781 let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
782
783 let stage_c = TestStage::new(StageId::Other("C"))
784 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
785 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
786 let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
787 let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();
788
789 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
790 .add_stage(stage_a)
791 .add_stage(stage_b)
792 .add_stage(stage_c)
793 .with_max_block(10)
794 .build(
795 provider_factory.clone(),
796 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
797 );
798 let events = pipeline.events();
799
800 tokio::spawn(async move {
802 pipeline.run().await.expect("Could not run pipeline");
804
805 pipeline.unwind(1, None).expect("Could not unwind pipeline");
807 });
808
809 assert_eq!(
811 events.collect::<Vec<PipelineEvent>>().await,
812 vec![
813 PipelineEvent::Prepare {
815 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
816 stage_id: StageId::Other("A"),
817 checkpoint: None,
818 target: Some(10),
819 },
820 PipelineEvent::Run {
821 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
822 stage_id: StageId::Other("A"),
823 checkpoint: None,
824 target: Some(10),
825 },
826 PipelineEvent::Ran {
827 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
828 stage_id: StageId::Other("A"),
829 result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
830 },
831 PipelineEvent::Prepare {
832 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
833 stage_id: StageId::Other("B"),
834 checkpoint: None,
835 target: Some(10),
836 },
837 PipelineEvent::Run {
838 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
839 stage_id: StageId::Other("B"),
840 checkpoint: None,
841 target: Some(10),
842 },
843 PipelineEvent::Ran {
844 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
845 stage_id: StageId::Other("B"),
846 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
847 },
848 PipelineEvent::Prepare {
849 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
850 stage_id: StageId::Other("C"),
851 checkpoint: None,
852 target: Some(10),
853 },
854 PipelineEvent::Run {
855 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
856 stage_id: StageId::Other("C"),
857 checkpoint: None,
858 target: Some(10),
859 },
860 PipelineEvent::Ran {
861 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
862 stage_id: StageId::Other("C"),
863 result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
864 },
865 PipelineEvent::Unwind {
867 stage_id: StageId::Other("C"),
868 input: UnwindInput {
869 checkpoint: StageCheckpoint::new(20),
870 unwind_to: 1,
871 bad_block: None
872 }
873 },
874 PipelineEvent::Unwound {
875 stage_id: StageId::Other("C"),
876 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
877 },
878 PipelineEvent::Unwind {
879 stage_id: StageId::Other("B"),
880 input: UnwindInput {
881 checkpoint: StageCheckpoint::new(10),
882 unwind_to: 1,
883 bad_block: None
884 }
885 },
886 PipelineEvent::Unwound {
887 stage_id: StageId::Other("B"),
888 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
889 },
890 PipelineEvent::Unwind {
891 stage_id: StageId::Other("A"),
892 input: UnwindInput {
893 checkpoint: StageCheckpoint::new(100),
894 unwind_to: 1,
895 bad_block: None
896 }
897 },
898 PipelineEvent::Unwound {
899 stage_id: StageId::Other("A"),
900 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
901 },
902 ]
903 );
904
905 assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
906 assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);
907
908 assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
909 assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);
910
911 assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
912 assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
913 }
914
915 #[tokio::test]
917 async fn unwind_pipeline_with_intermediate_progress() {
918 let provider_factory = create_test_provider_factory();
919
920 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
921 .add_stage(
922 TestStage::new(StageId::Other("A"))
923 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
924 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
925 )
926 .add_stage(
927 TestStage::new(StageId::Other("B"))
928 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
929 )
930 .with_max_block(10)
931 .build(
932 provider_factory.clone(),
933 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
934 );
935 let events = pipeline.events();
936
937 tokio::spawn(async move {
939 pipeline.run().await.expect("Could not run pipeline");
941
942 pipeline.unwind(50, None).expect("Could not unwind pipeline");
944 });
945
946 assert_eq!(
948 events.collect::<Vec<PipelineEvent>>().await,
949 vec![
950 PipelineEvent::Prepare {
952 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
953 stage_id: StageId::Other("A"),
954 checkpoint: None,
955 target: Some(10),
956 },
957 PipelineEvent::Run {
958 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
959 stage_id: StageId::Other("A"),
960 checkpoint: None,
961 target: Some(10),
962 },
963 PipelineEvent::Ran {
964 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
965 stage_id: StageId::Other("A"),
966 result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
967 },
968 PipelineEvent::Prepare {
969 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
970 stage_id: StageId::Other("B"),
971 checkpoint: None,
972 target: Some(10),
973 },
974 PipelineEvent::Run {
975 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
976 stage_id: StageId::Other("B"),
977 checkpoint: None,
978 target: Some(10),
979 },
980 PipelineEvent::Ran {
981 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
982 stage_id: StageId::Other("B"),
983 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
984 },
985 PipelineEvent::Skipped { stage_id: StageId::Other("B") },
988 PipelineEvent::Unwind {
989 stage_id: StageId::Other("A"),
990 input: UnwindInput {
991 checkpoint: StageCheckpoint::new(100),
992 unwind_to: 50,
993 bad_block: None
994 }
995 },
996 PipelineEvent::Unwound {
997 stage_id: StageId::Other("A"),
998 result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
999 },
1000 ]
1001 );
1002 }
1003
1004 #[tokio::test]
1017 async fn run_pipeline_with_unwind() {
1018 let provider_factory = create_test_provider_factory();
1019
1020 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1021 .add_stage(
1022 TestStage::new(StageId::Other("A"))
1023 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
1024 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
1025 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1026 )
1027 .add_stage(
1028 TestStage::new(StageId::Other("B"))
1029 .add_exec(Err(StageError::Block {
1030 block: Box::new(random_block_with_parent(
1031 &mut generators::rng(),
1032 5,
1033 Default::default(),
1034 )),
1035 error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
1036 }))
1037 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
1038 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1039 )
1040 .with_max_block(10)
1041 .build(
1042 provider_factory.clone(),
1043 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1044 );
1045 let events = pipeline.events();
1046
1047 tokio::spawn(async move {
1049 pipeline.run().await.expect("Could not run pipeline");
1050 });
1051
1052 assert_eq!(
1054 events.collect::<Vec<PipelineEvent>>().await,
1055 vec![
1056 PipelineEvent::Prepare {
1057 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1058 stage_id: StageId::Other("A"),
1059 checkpoint: None,
1060 target: Some(10),
1061 },
1062 PipelineEvent::Run {
1063 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1064 stage_id: StageId::Other("A"),
1065 checkpoint: None,
1066 target: Some(10),
1067 },
1068 PipelineEvent::Ran {
1069 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1070 stage_id: StageId::Other("A"),
1071 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1072 },
1073 PipelineEvent::Prepare {
1074 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1075 stage_id: StageId::Other("B"),
1076 checkpoint: None,
1077 target: Some(10),
1078 },
1079 PipelineEvent::Run {
1080 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1081 stage_id: StageId::Other("B"),
1082 checkpoint: None,
1083 target: Some(10),
1084 },
1085 PipelineEvent::Error { stage_id: StageId::Other("B") },
1086 PipelineEvent::Unwind {
1087 stage_id: StageId::Other("A"),
1088 input: UnwindInput {
1089 checkpoint: StageCheckpoint::new(10),
1090 unwind_to: 0,
1091 bad_block: Some(5)
1092 }
1093 },
1094 PipelineEvent::Unwound {
1095 stage_id: StageId::Other("A"),
1096 result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
1097 },
1098 PipelineEvent::Prepare {
1099 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1100 stage_id: StageId::Other("A"),
1101 checkpoint: Some(StageCheckpoint::new(0)),
1102 target: Some(10),
1103 },
1104 PipelineEvent::Run {
1105 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1106 stage_id: StageId::Other("A"),
1107 checkpoint: Some(StageCheckpoint::new(0)),
1108 target: Some(10),
1109 },
1110 PipelineEvent::Ran {
1111 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1112 stage_id: StageId::Other("A"),
1113 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1114 },
1115 PipelineEvent::Prepare {
1116 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1117 stage_id: StageId::Other("B"),
1118 checkpoint: None,
1119 target: Some(10),
1120 },
1121 PipelineEvent::Run {
1122 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1123 stage_id: StageId::Other("B"),
1124 checkpoint: None,
1125 target: Some(10),
1126 },
1127 PipelineEvent::Ran {
1128 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1129 stage_id: StageId::Other("B"),
1130 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1131 },
1132 ]
1133 );
1134 }
1135
1136 #[tokio::test]
1138 async fn pipeline_error_handling() {
1139 let provider_factory = create_test_provider_factory();
1141 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1142 .add_stage(
1143 TestStage::new(StageId::Other("NonFatal"))
1144 .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
1145 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1146 )
1147 .with_max_block(10)
1148 .build(
1149 provider_factory.clone(),
1150 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1151 );
1152 let result = pipeline.run().await;
1153 assert_matches!(result, Ok(()));
1154
1155 let provider_factory = create_test_provider_factory();
1157 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1158 .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
1159 StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
1160 )))
1161 .build(
1162 provider_factory.clone(),
1163 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1164 );
1165 let result = pipeline.run().await;
1166 assert_matches!(
1167 result,
1168 Err(PipelineError::Stage(StageError::DatabaseIntegrity(
1169 ProviderError::BlockBodyIndicesNotFound(5)
1170 )))
1171 );
1172 }
1173}