1mod ctrl;
2mod event;
3pub use crate::pipeline::ctrl::ControlFlow;
4use crate::{PipelineTarget, StageCheckpoint, StageId};
5use alloy_primitives::{BlockNumber, B256};
6pub use event::*;
7use futures_util::Future;
8use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
9use reth_provider::{
10 providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader,
11 ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory,
12 PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter, StorageSettingsCache,
13};
14use reth_prune::PrunerBuilder;
15use reth_static_file::StaticFileProducer;
16use reth_tokio_util::{EventSender, EventStream};
17use std::{
18 pin::Pin,
19 time::{Duration, Instant},
20};
21use tokio::sync::watch;
22use tracing::*;
23
24mod builder;
25mod progress;
26mod set;
27
28use crate::{
29 BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage,
30 StageError, StageExt, UnwindInput,
31};
32pub use builder::*;
33use progress::*;
34use reth_errors::RethResult;
35pub use set::*;
36
37pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
39
40pub type PipelineFut<N> = Pin<Box<dyn Future<Output = PipelineWithResult<N>> + Send>>;
43
44pub type PipelineWithResult<N> = (Pipeline<N>, Result<ControlFlow, PipelineError>);
46
47#[cfg_attr(doc, aquamarine::aquamarine)]
48pub struct Pipeline<N: ProviderNodeTypes> {
70 provider_factory: ProviderFactory<N>,
72 stages: Vec<BoxedStage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>>,
74 max_block: Option<BlockNumber>,
76 static_file_producer: StaticFileProducer<ProviderFactory<N>>,
77 event_sender: EventSender<PipelineEvent>,
79 progress: PipelineProgress,
81 tip_tx: Option<watch::Sender<B256>>,
85 metrics_tx: Option<MetricEventsSender>,
86 fail_on_unwind: bool,
89 last_detached_head_unwind_target: Option<B256>,
92 detached_head_attempts: u64,
95}
96
97impl<N: ProviderNodeTypes> Pipeline<N> {
98 pub fn builder() -> PipelineBuilder<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW>
100 {
101 PipelineBuilder::default()
102 }
103
104 pub const fn minimum_block_number(&self) -> Option<u64> {
107 self.progress.minimum_block_number
108 }
109
110 #[track_caller]
112 pub fn set_tip(&self, tip: B256) {
113 let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
114 warn!(target: "sync::pipeline", "Chain tip channel closed");
115 });
116 }
117
118 pub fn events(&self) -> EventStream<PipelineEvent> {
120 self.event_sender.new_listener()
121 }
122
123 pub fn stage(
125 &mut self,
126 idx: usize,
127 ) -> &mut dyn Stage<<ProviderFactory<N> as DatabaseProviderFactory>::ProviderRW> {
128 &mut self.stages[idx]
129 }
130}
131
132impl<N: ProviderNodeTypes> Pipeline<N> {
133 pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
135 let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
136 let provider = self.provider_factory.provider()?;
137
138 for stage in &self.stages {
139 let stage_id = stage.id();
140 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
141 stage_id,
142 checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
143 max_block_number: None,
144 elapsed: Duration::default(),
145 });
146 }
147 Ok(())
148 }
149
150 #[track_caller]
153 pub fn run_as_fut(mut self, target: Option<PipelineTarget>) -> PipelineFut<N> {
154 let _ = self.register_metrics();
155 Box::pin(async move {
156 if let Some(target) = target {
158 match target {
159 PipelineTarget::Sync(tip) => self.set_tip(tip),
160 PipelineTarget::Unwind(target) => {
161 if let Err(err) = self.move_to_static_files() {
162 return (self, Err(err.into()))
163 }
164 if let Err(err) = self.unwind(target, None) {
165 return (self, Err(err))
166 }
167 self.progress.update(target);
168
169 return (self, Ok(ControlFlow::Continue { block_number: target }))
170 }
171 }
172 }
173
174 let result = self.run_loop().await;
175 trace!(target: "sync::pipeline", ?target, ?result, "Pipeline finished");
176 (self, result)
177 })
178 }
179
180 pub async fn run(&mut self) -> Result<(), PipelineError> {
183 let _ = self.register_metrics(); loop {
186 let next_action = self.run_loop().await?;
187
188 if next_action.is_unwind() && self.fail_on_unwind {
189 return Err(PipelineError::UnexpectedUnwind)
190 }
191
192 if next_action.should_continue() &&
195 self.progress
196 .minimum_block_number
197 .zip(self.max_block)
198 .is_some_and(|(progress, target)| progress >= target)
199 {
200 trace!(
201 target: "sync::pipeline",
202 ?next_action,
203 minimum_block_number = ?self.progress.minimum_block_number,
204 max_block = ?self.max_block,
205 "Terminating pipeline."
206 );
207 return Ok(())
208 }
209 }
210 }
211
212 pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> {
224 self.move_to_static_files()?;
225
226 let mut previous_stage = None;
227 for stage_index in 0..self.stages.len() {
228 let stage = &self.stages[stage_index];
229 let stage_id = stage.id();
230
231 trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage");
232 let next = self.execute_stage_to_completion(previous_stage, stage_index).await?;
233
234 trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
235
236 match next {
237 ControlFlow::NoProgress { block_number } => {
238 if let Some(block_number) = block_number {
239 self.progress.update(block_number);
240 }
241 }
242 ControlFlow::Continue { block_number } => self.progress.update(block_number),
243 ControlFlow::Unwind { target, bad_block } => {
244 self.unwind(target, Some(bad_block.block.number))?;
245 return Ok(ControlFlow::Unwind { target, bad_block })
246 }
247 }
248
249 previous_stage = Some(
250 self.provider_factory
251 .provider()?
252 .get_stage_checkpoint(stage_id)?
253 .unwrap_or_default()
254 .block_number,
255 );
256 }
257
258 Ok(self.progress.next_ctrl())
259 }
260
261 pub fn move_to_static_files(&self) -> RethResult<()> {
278 if self.provider_factory.cached_storage_settings().is_v2() {
279 return Ok(())
280 }
281
282 let lowest_static_file_height =
284 self.static_file_producer.lock().copy_to_static_files()?.min_block_num();
285
286 if let Some(prune_tip) = lowest_static_file_height {
288 let mut pruner = PrunerBuilder::new(Default::default())
291 .delete_limit(usize::MAX)
292 .build_with_provider_factory(self.provider_factory.clone());
293
294 pruner.run(prune_tip)?;
295 }
296
297 Ok(())
298 }
299
300 pub fn unwind(
304 &mut self,
305 to: BlockNumber,
306 bad_block: Option<BlockNumber>,
307 ) -> Result<(), PipelineError> {
308 let (latest_block, prune_modes, checkpoints) = {
310 let provider = self.provider_factory.provider()?;
311 (
312 provider.last_block_number()?,
313 provider.prune_modes_ref().clone(),
314 provider.get_prune_checkpoints()?,
315 )
316 };
317 prune_modes.ensure_unwind_target_unpruned(latest_block, to, &checkpoints)?;
318
319 let unwind_pipeline = self.stages.iter_mut().rev();
321
322 let _locked_sf_producer = self.static_file_producer.lock();
325
326 let mut provider_rw =
327 self.provider_factory.unwind_provider_rw()?.disable_long_read_transaction_safety();
328
329 for stage in unwind_pipeline {
330 let stage_id = stage.id();
331 let span = info_span!("Unwinding", stage = %stage_id);
332 let _enter = span.enter();
333
334 let mut checkpoint = provider_rw.get_stage_checkpoint(stage_id)?.unwrap_or_default();
335 if checkpoint.block_number < to {
336 debug!(
337 target: "sync::pipeline",
338 from = %checkpoint.block_number,
339 %to,
340 "Unwind point too far for stage"
341 );
342 self.event_sender.notify(PipelineEvent::Skipped { stage_id });
343
344 continue
345 }
346
347 info!(
348 target: "sync::pipeline",
349 from = %checkpoint.block_number,
350 %to,
351 ?bad_block,
352 "Starting unwind"
353 );
354 while checkpoint.block_number > to {
355 let unwind_started_at = Instant::now();
356 let input = UnwindInput { checkpoint, unwind_to: to, bad_block };
357 self.event_sender.notify(PipelineEvent::Unwind { stage_id, input });
358
359 let output = stage.unwind(&provider_rw, input);
360 match output {
361 Ok(unwind_output) => {
362 checkpoint = unwind_output.checkpoint;
363 info!(
364 target: "sync::pipeline",
365 stage = %stage_id,
366 unwind_to = to,
367 progress = checkpoint.block_number,
368 done = checkpoint.block_number == to,
369 "Stage unwound"
370 );
371
372 provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
373
374 self.event_sender
376 .notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
377
378 if let Some(metrics_tx) = &mut self.metrics_tx {
379 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
380 stage_id,
381 checkpoint,
382 max_block_number: None,
385 elapsed: unwind_started_at.elapsed(),
386 });
387 }
388
389 let last_saved_finalized_block_number =
391 provider_rw.last_finalized_block_number()?;
392
393 if last_saved_finalized_block_number.is_none() ||
396 Some(checkpoint.block_number) < last_saved_finalized_block_number
397 {
398 provider_rw.save_finalized_block_number(BlockNumber::from(
399 checkpoint.block_number,
400 ))?;
401 }
402
403 let last_saved_safe_block_number = provider_rw.last_safe_block_number()?;
404
405 if last_saved_safe_block_number.is_none() ||
406 Some(checkpoint.block_number) < last_saved_safe_block_number
407 {
408 provider_rw.save_safe_block_number(BlockNumber::from(
409 checkpoint.block_number,
410 ))?;
411 }
412
413 provider_rw.commit()?;
414
415 stage.post_unwind_commit()?;
416
417 provider_rw = self.provider_factory.unwind_provider_rw()?;
418 }
419 Err(err) => {
420 self.event_sender.notify(PipelineEvent::Error { stage_id });
421
422 return Err(PipelineError::Stage(StageError::Fatal(Box::new(err))))
423 }
424 }
425 }
426 }
427
428 Ok(())
429 }
430
431 async fn execute_stage_to_completion(
432 &mut self,
433 previous_stage: Option<BlockNumber>,
434 stage_index: usize,
435 ) -> Result<ControlFlow, PipelineError> {
436 let total_stages = self.stages.len();
437
438 let stage_id = self.stage(stage_index).id();
439 let mut made_progress = false;
440 let target = self.max_block.or(previous_stage);
441
442 loop {
443 let prev_checkpoint = self.provider_factory.get_stage_checkpoint(stage_id)?;
444
445 let stage_reached_max_block = prev_checkpoint
446 .zip(self.max_block)
447 .is_some_and(|(prev_progress, target)| prev_progress.block_number >= target);
448 if stage_reached_max_block {
449 warn!(
450 target: "sync::pipeline",
451 stage = %stage_id,
452 max_block = self.max_block,
453 prev_block = prev_checkpoint.map(|progress| progress.block_number),
454 "Stage reached target block, skipping."
455 );
456 self.event_sender.notify(PipelineEvent::Skipped { stage_id });
457
458 return Ok(ControlFlow::NoProgress {
460 block_number: prev_checkpoint.map(|progress| progress.block_number),
461 })
462 }
463
464 let exec_input = ExecInput { target, checkpoint: prev_checkpoint };
465
466 self.event_sender.notify(PipelineEvent::Prepare {
467 pipeline_stages_progress: PipelineStagesProgress {
468 current: stage_index + 1,
469 total: total_stages,
470 },
471 stage_id,
472 checkpoint: prev_checkpoint,
473 target,
474 });
475
476 if let Err(err) = self.stage(stage_index).execute_ready(exec_input).await {
477 self.event_sender.notify(PipelineEvent::Error { stage_id });
478 match self.on_stage_error(stage_id, prev_checkpoint, err)? {
479 Some(ctrl) => return Ok(ctrl),
480 None => continue,
481 };
482 }
483
484 let stage_started_at = Instant::now();
485 let provider_rw = self.provider_factory.database_provider_rw()?;
486
487 self.event_sender.notify(PipelineEvent::Run {
488 pipeline_stages_progress: PipelineStagesProgress {
489 current: stage_index + 1,
490 total: total_stages,
491 },
492 stage_id,
493 checkpoint: prev_checkpoint,
494 target,
495 });
496
497 match self.stage(stage_index).execute(&provider_rw, exec_input) {
498 Ok(out @ ExecOutput { checkpoint, done }) => {
499 provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
501
502 provider_rw.commit()?;
504
505 self.stage(stage_index).post_execute_commit()?;
507
508 self.event_sender.notify(PipelineEvent::Ran {
510 pipeline_stages_progress: PipelineStagesProgress {
511 current: stage_index + 1,
512 total: total_stages,
513 },
514 stage_id,
515 result: out.clone(),
516 });
517 if let Some(metrics_tx) = &mut self.metrics_tx {
518 let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
519 stage_id,
520 checkpoint,
521 max_block_number: target,
522 elapsed: stage_started_at.elapsed(),
523 });
524 }
525
526 let block_number = checkpoint.block_number;
527 let prev_block_number = prev_checkpoint.unwrap_or_default().block_number;
528 made_progress |= block_number != prev_block_number;
529 if done {
530 return Ok(if made_progress {
531 ControlFlow::Continue { block_number }
532 } else {
533 ControlFlow::NoProgress { block_number: Some(block_number) }
534 })
535 }
536 }
537 Err(err) => {
538 drop(provider_rw);
539 self.event_sender.notify(PipelineEvent::Error { stage_id });
540
541 if let Some(ctrl) = self.on_stage_error(stage_id, prev_checkpoint, err)? {
542 return Ok(ctrl)
543 }
544 }
545 }
546 }
547 }
548
549 fn on_stage_error(
550 &mut self,
551 stage_id: StageId,
552 prev_checkpoint: Option<StageCheckpoint>,
553 err: StageError,
554 ) -> Result<Option<ControlFlow>, PipelineError> {
555 if let StageError::DetachedHead { local_head, header, error } = err {
556 warn!(target: "sync::pipeline", stage = %stage_id, ?local_head, ?header, %error, "Stage encountered detached head");
557
558 if let Some(last_detached_head_unwind_target) = self.last_detached_head_unwind_target {
559 if local_head.block.hash == last_detached_head_unwind_target &&
560 header.block.number == local_head.block.number + 1
561 {
562 self.detached_head_attempts += 1;
563 } else {
564 self.detached_head_attempts = 1;
565 }
566 } else {
567 self.detached_head_attempts = 1;
568 }
569
570 let unwind_to = local_head
572 .block
573 .number
574 .saturating_sub(
575 BEACON_CONSENSUS_REORG_UNWIND_DEPTH.saturating_mul(self.detached_head_attempts),
576 )
577 .max(1);
578
579 self.last_detached_head_unwind_target = self.provider_factory.block_hash(unwind_to)?;
580 Ok(Some(ControlFlow::Unwind { target: unwind_to, bad_block: local_head }))
581 } else if let StageError::Block { block, error } = err {
582 match error {
583 BlockErrorKind::Validation(validation_error) => {
584 error!(
585 target: "sync::pipeline",
586 stage = %stage_id,
587 bad_block = %block.block.number,
588 "Stage encountered a validation error: {validation_error}"
589 );
590
591 if stage_id == StageId::MerkleExecute {
596 let provider_rw = self.provider_factory.database_provider_rw()?;
597 provider_rw
598 .save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
599 provider_rw.save_stage_checkpoint(
600 StageId::MerkleExecute,
601 prev_checkpoint.unwrap_or_default(),
602 )?;
603
604 provider_rw.commit()?;
605 }
606
607 Ok(Some(ControlFlow::Unwind {
612 target: prev_checkpoint.unwrap_or_default().block_number,
613 bad_block: block,
614 }))
615 }
616 BlockErrorKind::Execution(execution_error) => {
617 error!(
618 target: "sync::pipeline",
619 stage = %stage_id,
620 bad_block = %block.block.number,
621 "Stage encountered an execution error: {execution_error}"
622 );
623
624 Ok(Some(ControlFlow::Unwind {
629 target: prev_checkpoint.unwrap_or_default().block_number,
630 bad_block: block,
631 }))
632 }
633 }
634 } else if let StageError::MissingStaticFileData { block, segment } = err {
635 error!(
636 target: "sync::pipeline",
637 stage = %stage_id,
638 bad_block = %block.block.number,
639 segment = %segment,
640 "Stage is missing static file data."
641 );
642
643 Ok(Some(ControlFlow::Unwind {
644 target: block.block.number.saturating_sub(1),
645 bad_block: block,
646 }))
647 } else if err.is_fatal() {
648 error!(target: "sync::pipeline", stage = %stage_id, "Stage encountered a fatal error: {err}");
649 Err(err.into())
650 } else {
651 warn!(
654 target: "sync::pipeline",
655 stage = %stage_id,
656 "Stage encountered a non-fatal error: {err}. Retrying..."
657 );
658 Ok(None)
659 }
660 }
661}
662
663impl<N: ProviderNodeTypes> std::fmt::Debug for Pipeline<N> {
664 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
665 f.debug_struct("Pipeline")
666 .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::<Vec<StageId>>())
667 .field("max_block", &self.max_block)
668 .field("event_sender", &self.event_sender)
669 .field("fail_on_unwind", &self.fail_on_unwind)
670 .finish()
671 }
672}
673
674#[cfg(test)]
675mod tests {
676 use std::sync::atomic::Ordering;
677
678 use super::*;
679 use crate::{test_utils::TestStage, UnwindOutput};
680 use assert_matches::assert_matches;
681 use reth_consensus::ConsensusError;
682 use reth_errors::ProviderError;
683 use reth_provider::test_utils::{create_test_provider_factory, MockNodeTypesWithDB};
684 use reth_prune::PruneModes;
685 use reth_testing_utils::generators::{self, random_block_with_parent};
686 use tokio_stream::StreamExt;
687
688 #[test]
689 fn record_progress_calculates_outliers() {
690 let mut progress = PipelineProgress::default();
691
692 progress.update(10);
693 assert_eq!(progress.minimum_block_number, Some(10));
694 assert_eq!(progress.maximum_block_number, Some(10));
695
696 progress.update(20);
697 assert_eq!(progress.minimum_block_number, Some(10));
698 assert_eq!(progress.maximum_block_number, Some(20));
699
700 progress.update(1);
701 assert_eq!(progress.minimum_block_number, Some(1));
702 assert_eq!(progress.maximum_block_number, Some(20));
703 }
704
705 #[test]
706 fn progress_ctrl_flow() {
707 let mut progress = PipelineProgress::default();
708
709 assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });
710
711 progress.update(1);
712 assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
713 }
714
715 #[tokio::test]
717 async fn run_pipeline() {
718 let provider_factory = create_test_provider_factory();
719
720 let stage_a = TestStage::new(StageId::Other("A"))
721 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }));
722 let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
723 let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
724
725 let stage_b = TestStage::new(StageId::Other("B"))
726 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }));
727 let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
728 let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
729
730 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
731 .add_stage(stage_a)
732 .add_stage(stage_b)
733 .with_max_block(10)
734 .build(
735 provider_factory.clone(),
736 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
737 );
738 let events = pipeline.events();
739
740 tokio::spawn(async move {
742 pipeline.run().await.unwrap();
743 });
744
745 assert_eq!(
747 events.collect::<Vec<PipelineEvent>>().await,
748 vec![
749 PipelineEvent::Prepare {
750 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
751 stage_id: StageId::Other("A"),
752 checkpoint: None,
753 target: Some(10),
754 },
755 PipelineEvent::Run {
756 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
757 stage_id: StageId::Other("A"),
758 checkpoint: None,
759 target: Some(10),
760 },
761 PipelineEvent::Ran {
762 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
763 stage_id: StageId::Other("A"),
764 result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
765 },
766 PipelineEvent::Prepare {
767 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
768 stage_id: StageId::Other("B"),
769 checkpoint: None,
770 target: Some(10),
771 },
772 PipelineEvent::Run {
773 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
774 stage_id: StageId::Other("B"),
775 checkpoint: None,
776 target: Some(10),
777 },
778 PipelineEvent::Ran {
779 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
780 stage_id: StageId::Other("B"),
781 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
782 },
783 ]
784 );
785
786 assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
787 assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 0);
788
789 assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
790 assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 0);
791 }
792
793 #[tokio::test]
795 async fn unwind_pipeline() {
796 let provider_factory = create_test_provider_factory();
797
798 let stage_a = TestStage::new(StageId::Other("A"))
799 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
800 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
801 let (stage_a, post_execute_commit_counter_a) = stage_a.with_post_execute_commit_counter();
802 let (stage_a, post_unwind_commit_counter_a) = stage_a.with_post_unwind_commit_counter();
803
804 let stage_b = TestStage::new(StageId::Other("B"))
805 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
806 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
807 let (stage_b, post_execute_commit_counter_b) = stage_b.with_post_execute_commit_counter();
808 let (stage_b, post_unwind_commit_counter_b) = stage_b.with_post_unwind_commit_counter();
809
810 let stage_c = TestStage::new(StageId::Other("C"))
811 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }))
812 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) }));
813 let (stage_c, post_execute_commit_counter_c) = stage_c.with_post_execute_commit_counter();
814 let (stage_c, post_unwind_commit_counter_c) = stage_c.with_post_unwind_commit_counter();
815
816 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
817 .add_stage(stage_a)
818 .add_stage(stage_b)
819 .add_stage(stage_c)
820 .with_max_block(10)
821 .build(
822 provider_factory.clone(),
823 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
824 );
825 let events = pipeline.events();
826
827 tokio::spawn(async move {
829 pipeline.run().await.expect("Could not run pipeline");
831
832 pipeline.unwind(1, None).expect("Could not unwind pipeline");
834 });
835
836 assert_eq!(
838 events.collect::<Vec<PipelineEvent>>().await,
839 vec![
840 PipelineEvent::Prepare {
842 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
843 stage_id: StageId::Other("A"),
844 checkpoint: None,
845 target: Some(10),
846 },
847 PipelineEvent::Run {
848 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
849 stage_id: StageId::Other("A"),
850 checkpoint: None,
851 target: Some(10),
852 },
853 PipelineEvent::Ran {
854 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 3 },
855 stage_id: StageId::Other("A"),
856 result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
857 },
858 PipelineEvent::Prepare {
859 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
860 stage_id: StageId::Other("B"),
861 checkpoint: None,
862 target: Some(10),
863 },
864 PipelineEvent::Run {
865 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
866 stage_id: StageId::Other("B"),
867 checkpoint: None,
868 target: Some(10),
869 },
870 PipelineEvent::Ran {
871 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 3 },
872 stage_id: StageId::Other("B"),
873 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
874 },
875 PipelineEvent::Prepare {
876 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
877 stage_id: StageId::Other("C"),
878 checkpoint: None,
879 target: Some(10),
880 },
881 PipelineEvent::Run {
882 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
883 stage_id: StageId::Other("C"),
884 checkpoint: None,
885 target: Some(10),
886 },
887 PipelineEvent::Ran {
888 pipeline_stages_progress: PipelineStagesProgress { current: 3, total: 3 },
889 stage_id: StageId::Other("C"),
890 result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
891 },
892 PipelineEvent::Unwind {
894 stage_id: StageId::Other("C"),
895 input: UnwindInput {
896 checkpoint: StageCheckpoint::new(20),
897 unwind_to: 1,
898 bad_block: None
899 }
900 },
901 PipelineEvent::Unwound {
902 stage_id: StageId::Other("C"),
903 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
904 },
905 PipelineEvent::Unwind {
906 stage_id: StageId::Other("B"),
907 input: UnwindInput {
908 checkpoint: StageCheckpoint::new(10),
909 unwind_to: 1,
910 bad_block: None
911 }
912 },
913 PipelineEvent::Unwound {
914 stage_id: StageId::Other("B"),
915 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
916 },
917 PipelineEvent::Unwind {
918 stage_id: StageId::Other("A"),
919 input: UnwindInput {
920 checkpoint: StageCheckpoint::new(100),
921 unwind_to: 1,
922 bad_block: None
923 }
924 },
925 PipelineEvent::Unwound {
926 stage_id: StageId::Other("A"),
927 result: UnwindOutput { checkpoint: StageCheckpoint::new(1) },
928 },
929 ]
930 );
931
932 assert_eq!(post_execute_commit_counter_a.load(Ordering::Relaxed), 1);
933 assert_eq!(post_unwind_commit_counter_a.load(Ordering::Relaxed), 1);
934
935 assert_eq!(post_execute_commit_counter_b.load(Ordering::Relaxed), 1);
936 assert_eq!(post_unwind_commit_counter_b.load(Ordering::Relaxed), 1);
937
938 assert_eq!(post_execute_commit_counter_c.load(Ordering::Relaxed), 1);
939 assert_eq!(post_unwind_commit_counter_c.load(Ordering::Relaxed), 1);
940 }
941
942 #[tokio::test]
944 async fn unwind_pipeline_with_intermediate_progress() {
945 let provider_factory = create_test_provider_factory();
946
947 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
948 .add_stage(
949 TestStage::new(StageId::Other("A"))
950 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }))
951 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })),
952 )
953 .add_stage(
954 TestStage::new(StageId::Other("B"))
955 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
956 )
957 .with_max_block(10)
958 .build(
959 provider_factory.clone(),
960 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
961 );
962 let events = pipeline.events();
963
964 tokio::spawn(async move {
966 pipeline.run().await.expect("Could not run pipeline");
968
969 pipeline.unwind(50, None).expect("Could not unwind pipeline");
971 });
972
973 assert_eq!(
975 events.collect::<Vec<PipelineEvent>>().await,
976 vec![
977 PipelineEvent::Prepare {
979 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
980 stage_id: StageId::Other("A"),
981 checkpoint: None,
982 target: Some(10),
983 },
984 PipelineEvent::Run {
985 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
986 stage_id: StageId::Other("A"),
987 checkpoint: None,
988 target: Some(10),
989 },
990 PipelineEvent::Ran {
991 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
992 stage_id: StageId::Other("A"),
993 result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
994 },
995 PipelineEvent::Prepare {
996 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
997 stage_id: StageId::Other("B"),
998 checkpoint: None,
999 target: Some(10),
1000 },
1001 PipelineEvent::Run {
1002 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1003 stage_id: StageId::Other("B"),
1004 checkpoint: None,
1005 target: Some(10),
1006 },
1007 PipelineEvent::Ran {
1008 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1009 stage_id: StageId::Other("B"),
1010 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1011 },
1012 PipelineEvent::Skipped { stage_id: StageId::Other("B") },
1015 PipelineEvent::Unwind {
1016 stage_id: StageId::Other("A"),
1017 input: UnwindInput {
1018 checkpoint: StageCheckpoint::new(100),
1019 unwind_to: 50,
1020 bad_block: None
1021 }
1022 },
1023 PipelineEvent::Unwound {
1024 stage_id: StageId::Other("A"),
1025 result: UnwindOutput { checkpoint: StageCheckpoint::new(50) },
1026 },
1027 ]
1028 );
1029 }
1030
1031 #[tokio::test]
1044 async fn run_pipeline_with_unwind() {
1045 let provider_factory = create_test_provider_factory();
1046
1047 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1048 .add_stage(
1049 TestStage::new(StageId::Other("A"))
1050 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }))
1051 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
1052 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1053 )
1054 .add_stage(
1055 TestStage::new(StageId::Other("B"))
1056 .add_exec(Err(StageError::Block {
1057 block: Box::new(random_block_with_parent(
1058 &mut generators::rng(),
1059 5,
1060 Default::default(),
1061 )),
1062 error: BlockErrorKind::Validation(ConsensusError::BaseFeeMissing),
1063 }))
1064 .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
1065 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1066 )
1067 .with_max_block(10)
1068 .build(
1069 provider_factory.clone(),
1070 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1071 );
1072 let events = pipeline.events();
1073
1074 tokio::spawn(async move {
1076 pipeline.run().await.expect("Could not run pipeline");
1077 });
1078
1079 assert_eq!(
1081 events.collect::<Vec<PipelineEvent>>().await,
1082 vec![
1083 PipelineEvent::Prepare {
1084 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1085 stage_id: StageId::Other("A"),
1086 checkpoint: None,
1087 target: Some(10),
1088 },
1089 PipelineEvent::Run {
1090 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1091 stage_id: StageId::Other("A"),
1092 checkpoint: None,
1093 target: Some(10),
1094 },
1095 PipelineEvent::Ran {
1096 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1097 stage_id: StageId::Other("A"),
1098 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1099 },
1100 PipelineEvent::Prepare {
1101 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1102 stage_id: StageId::Other("B"),
1103 checkpoint: None,
1104 target: Some(10),
1105 },
1106 PipelineEvent::Run {
1107 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1108 stage_id: StageId::Other("B"),
1109 checkpoint: None,
1110 target: Some(10),
1111 },
1112 PipelineEvent::Error { stage_id: StageId::Other("B") },
1113 PipelineEvent::Unwind {
1114 stage_id: StageId::Other("A"),
1115 input: UnwindInput {
1116 checkpoint: StageCheckpoint::new(10),
1117 unwind_to: 0,
1118 bad_block: Some(5)
1119 }
1120 },
1121 PipelineEvent::Unwound {
1122 stage_id: StageId::Other("A"),
1123 result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
1124 },
1125 PipelineEvent::Prepare {
1126 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1127 stage_id: StageId::Other("A"),
1128 checkpoint: Some(StageCheckpoint::new(0)),
1129 target: Some(10),
1130 },
1131 PipelineEvent::Run {
1132 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1133 stage_id: StageId::Other("A"),
1134 checkpoint: Some(StageCheckpoint::new(0)),
1135 target: Some(10),
1136 },
1137 PipelineEvent::Ran {
1138 pipeline_stages_progress: PipelineStagesProgress { current: 1, total: 2 },
1139 stage_id: StageId::Other("A"),
1140 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1141 },
1142 PipelineEvent::Prepare {
1143 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1144 stage_id: StageId::Other("B"),
1145 checkpoint: None,
1146 target: Some(10),
1147 },
1148 PipelineEvent::Run {
1149 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1150 stage_id: StageId::Other("B"),
1151 checkpoint: None,
1152 target: Some(10),
1153 },
1154 PipelineEvent::Ran {
1155 pipeline_stages_progress: PipelineStagesProgress { current: 2, total: 2 },
1156 stage_id: StageId::Other("B"),
1157 result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
1158 },
1159 ]
1160 );
1161 }
1162
1163 #[tokio::test]
1165 async fn pipeline_error_handling() {
1166 let provider_factory = create_test_provider_factory();
1168 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1169 .add_stage(
1170 TestStage::new(StageId::Other("NonFatal"))
1171 .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
1172 .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),
1173 )
1174 .with_max_block(10)
1175 .build(
1176 provider_factory.clone(),
1177 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1178 );
1179 let result = pipeline.run().await;
1180 assert_matches!(result, Ok(()));
1181
1182 let provider_factory = create_test_provider_factory();
1184 let mut pipeline = Pipeline::<MockNodeTypesWithDB>::builder()
1185 .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err(
1186 StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)),
1187 )))
1188 .build(
1189 provider_factory.clone(),
1190 StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
1191 );
1192 let result = pipeline.run().await;
1193 assert_matches!(
1194 result,
1195 Err(PipelineError::Stage(StageError::DatabaseIntegrity(
1196 ProviderError::BlockBodyIndicesNotFound(5)
1197 )))
1198 );
1199 }
1200}