//! It is expected that the node has two sync modes:
//!
//! - Backfill sync: Sync to a certain block height in stages, e.g. download data from p2p then
//!   execute that range.
//! - Live sync: In this mode the node is keeping up with the latest tip and listens for new
//!   requests from the consensus client.
//!
//! These modes are mutually exclusive and the node can only be in one mode at a time.
910use futures::FutureExt;
11use reth_provider::providers::ProviderNodeTypes;
12use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
13use reth_tasks::TaskSpawner;
14use std::task::{ready, Context, Poll};
15use tokio::sync::oneshot;
16use tracing::trace;
/// Represents the state of the backfill synchronization process.
///
/// The enum carries no data, so it is `Copy`: callers can snapshot or pass the state by value
/// without borrowing the owner.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum BackfillSyncState {
    /// The node is not performing any backfill synchronization.
    /// This is the initial or default state.
    #[default]
    Idle,
    /// A backfill synchronization has been requested or planned, but processing has not started
    /// yet.
    Pending,
    /// The node is actively engaged in backfill synchronization.
    Active,
}
3132impl BackfillSyncState {
33/// Returns true if the state is idle.
34pub const fn is_idle(&self) -> bool {
35matches!(self, Self::Idle)
36 }
3738/// Returns true if the state is pending.
39pub const fn is_pending(&self) -> bool {
40matches!(self, Self::Pending)
41 }
4243/// Returns true if the state is active.
44pub const fn is_active(&self) -> bool {
45matches!(self, Self::Active)
46 }
47}
4849/// Backfill sync mode functionality.
50pub trait BackfillSync: Send + Sync {
51/// Performs a backfill action.
52fn on_action(&mut self, action: BackfillAction);
5354/// Polls the pipeline for completion.
55fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent>;
56}
5758/// The backfill actions that can be performed.
59#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum BackfillAction {
61/// Start backfilling with the given target.
62Start(PipelineTarget),
63}
6465/// The events that can be emitted on backfill sync.
66#[derive(Debug)]
67pub enum BackfillEvent {
68/// Backfill sync started.
69Started(PipelineTarget),
70/// Backfill sync finished.
71 ///
72 /// If this is returned, backfill sync is idle.
73Finished(Result<ControlFlow, PipelineError>),
74/// Sync task was dropped after it was started, unable to receive it because
75 /// channel closed. This would indicate a panicked task.
76TaskDropped(String),
77}
7879/// Pipeline sync.
80#[derive(Debug)]
81pub struct PipelineSync<N: ProviderNodeTypes> {
82/// The type that can spawn the pipeline task.
83pipeline_task_spawner: Box<dyn TaskSpawner>,
84/// The current state of the pipeline.
85 /// The pipeline is used for large ranges.
86pipeline_state: PipelineState<N>,
87/// Pending target block for the pipeline to sync
88pending_pipeline_target: Option<PipelineTarget>,
89}
9091impl<N: ProviderNodeTypes> PipelineSync<N> {
92/// Create a new instance.
93pub fn new(pipeline: Pipeline<N>, pipeline_task_spawner: Box<dyn TaskSpawner>) -> Self {
94Self {
95pipeline_task_spawner,
96 pipeline_state: PipelineState::Idle(Some(pipeline)),
97 pending_pipeline_target: None,
98 }
99 }
100101/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
102#[allow(dead_code)]
103const fn is_pipeline_sync_pending(&self) -> bool {
104self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
105 }
106107/// Returns `true` if the pipeline is idle.
108const fn is_pipeline_idle(&self) -> bool {
109self.pipeline_state.is_idle()
110 }
111112/// Returns `true` if the pipeline is active.
113const fn is_pipeline_active(&self) -> bool {
114 !self.is_pipeline_idle()
115 }
116117/// Sets a new target to sync the pipeline to.
118 ///
119 /// But ensures the target is not the zero hash.
120fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
121if target.sync_target().is_some_and(|target| target.is_zero()) {
122trace!(
123 target: "consensus::engine::sync",
124"Pipeline target cannot be zero hash."
125);
126// precaution to never sync to the zero hash
127return
128}
129self.pending_pipeline_target = Some(target);
130 }
131132/// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to
133 /// run continuously.
134fn try_spawn_pipeline(&mut self) -> Option<BackfillEvent> {
135match &mut self.pipeline_state {
136PipelineState::Idle(pipeline) => {
137let target = self.pending_pipeline_target.take()?;
138let (tx, rx) = oneshot::channel();
139140let pipeline = pipeline.take().expect("exists");
141self.pipeline_task_spawner.spawn_critical_blocking(
142"pipeline task",
143Box::pin(async move {
144let result = pipeline.run_as_fut(Some(target)).await;
145let _ = tx.send(result);
146 }),
147 );
148self.pipeline_state = PipelineState::Running(rx);
149150Some(BackfillEvent::Started(target))
151 }
152PipelineState::Running(_) => None,
153 }
154 }
155156/// Advances the pipeline state.
157 ///
158 /// This checks for the result in the channel, or returns pending if the pipeline is idle.
159fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
160let res = match self.pipeline_state {
161PipelineState::Idle(_) => return Poll::Pending,
162PipelineState::Running(ref mut fut) => {
163ready!(fut.poll_unpin(cx))
164 }
165 };
166let ev = match res {
167Ok((pipeline, result)) => {
168self.pipeline_state = PipelineState::Idle(Some(pipeline));
169BackfillEvent::Finished(result)
170 }
171Err(why) => {
172// failed to receive the pipeline
173BackfillEvent::TaskDropped(why.to_string())
174 }
175 };
176Poll::Ready(ev)
177 }
178}
179180impl<N: ProviderNodeTypes> BackfillSyncfor PipelineSync<N> {
181fn on_action(&mut self, event: BackfillAction) {
182match event {
183BackfillAction::Start(target) => self.set_pipeline_sync_target(target),
184 }
185 }
186187fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
188// try to spawn a pipeline if a target is set
189if let Some(event) = self.try_spawn_pipeline() {
190return Poll::Ready(event)
191 }
192193// make sure we poll the pipeline if it's active, and return any ready pipeline events
194if self.is_pipeline_active() {
195// advance the pipeline
196if let Poll::Ready(event) = self.poll_pipeline(cx) {
197return Poll::Ready(event)
198 }
199 }
200201Poll::Pending202 }
203}
204205/// The possible pipeline states within the sync controller.
206///
207/// [`PipelineState::Idle`] means that the pipeline is currently idle.
208/// [`PipelineState::Running`] means that the pipeline is currently running.
209///
210/// NOTE: The differentiation between these two states is important, because when the pipeline is
211/// running, it acquires the write lock over the database. This means that we cannot forward to the
212/// blockchain tree any messages that would result in database writes, since it would result in a
213/// deadlock.
214#[derive(Debug)]
215#[allow(clippy::large_enum_variant)]
216enum PipelineState<N: ProviderNodeTypes> {
217/// Pipeline is idle.
218Idle(Option<Pipeline<N>>),
219/// Pipeline is running and waiting for a response
220Running(oneshot::Receiver<PipelineWithResult<N>>),
221}
222223impl<N: ProviderNodeTypes> PipelineState<N> {
224/// Returns `true` if the state matches idle.
225const fn is_idle(&self) -> bool {
226matches!(self, Self::Idle(_))
227 }
228}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{insert_headers_into_client, TestPipelineBuilder};
    use alloy_consensus::Header;
    use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT_30M;
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use futures::poll;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_primitives_traits::SealedHeader;
    use reth_provider::test_utils::MockNodeTypesWithDB;
    use reth_stages::ExecOutput;
    use reth_stages_api::StageCheckpoint;
    use reth_tasks::TokioTaskExecutor;
    use std::{collections::VecDeque, future::poll_fn, sync::Arc};

    /// Fixture bundling the pipeline sync under test with the hash of the highest block that was
    /// inserted into the test client.
    struct TestHarness {
        pipeline_sync: PipelineSync<MockNodeTypesWithDB>,
        tip: B256,
    }

    impl TestHarness {
        /// Builds a harness whose test client holds headers for `0..total_blocks` and whose
        /// pipeline reports completion at block `pipeline_done_after`.
        fn new(total_blocks: usize, pipeline_done_after: u64) -> Self {
            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            // force the pipeline to be "done" after `pipeline_done_after` blocks
            let exec_outputs = VecDeque::from([Ok(ExecOutput {
                checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)),
                done: true,
            })]);
            let pipeline = TestPipelineBuilder::new()
                .with_pipeline_exec_outputs(exec_outputs)
                .build(chain_spec);
            let pipeline_sync = PipelineSync::new(pipeline, Box::<TokioTaskExecutor>::default());

            let client = TestFullBlockClient::default();
            let first_header = SealedHeader::seal_slow(Header {
                base_fee_per_gas: Some(7),
                gas_limit: ETHEREUM_BLOCK_GAS_LIMIT_30M,
                ..Default::default()
            });
            insert_headers_into_client(&client, first_header, 0..total_blocks);

            let tip = client.highest_block().expect("there should be blocks here").hash();

            Self { pipeline_sync, tip }
        }
    }

    #[tokio::test]
    async fn pipeline_started_and_finished() {
        const TOTAL_BLOCKS: usize = 10;
        const PIPELINE_DONE_AFTER: u64 = 5;
        let TestHarness { mut pipeline_sync, tip } =
            TestHarness::new(TOTAL_BLOCKS, PIPELINE_DONE_AFTER);

        // sync target not set, pipeline not started
        let before_target = poll!(poll_fn(|cx| pipeline_sync.poll(cx)));
        assert_matches!(before_target, Poll::Pending);

        pipeline_sync.on_action(BackfillAction::Start(PipelineTarget::Sync(tip)));

        // sync target set, pipeline started
        let after_target = poll!(poll_fn(|cx| pipeline_sync.poll(cx)));
        assert_matches!(after_target, Poll::Ready(BackfillEvent::Started(target)) => {
            assert_eq!(target.sync_target().unwrap(), tip);
        });

        // the next event should be the pipeline finishing in a good state
        let finished = poll_fn(|cx| pipeline_sync.poll(cx)).await;
        assert_matches!(finished, BackfillEvent::Finished(result) => {
            assert_matches!(result, Ok(control_flow) => assert_eq!(control_flow, ControlFlow::Continue { block_number: PIPELINE_DONE_AFTER }));
        });
    }
}