reth_engine_tree/
backfill.rs

1//! It is expected that the node has two sync modes:
2//!
3//!  - Backfill sync: Sync to a certain block height in stages, e.g. download data from p2p then
4//!    execute that range.
5//!  - Live sync: In this mode the node is keeping up with the latest tip and listens for new
6//!    requests from the consensus client.
7//!
8//! These modes are mutually exclusive and the node can only be in one mode at a time.
9
10use futures::FutureExt;
11use reth_provider::providers::ProviderNodeTypes;
12use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
13use reth_tasks::TaskSpawner;
14use std::task::{ready, Context, Poll};
15use tokio::sync::oneshot;
16use tracing::trace;
17
/// Lifecycle of a backfill sync as tracked by its owner.
///
/// The variants form a simple progression: [`Self::Idle`] -> [`Self::Pending`] ->
/// [`Self::Active`], with [`Self::Idle`] being the value produced by [`Default`].
#[derive(Debug, Default, PartialEq, Eq)]
pub enum BackfillSyncState {
    /// No backfill sync is in progress.
    ///
    /// This is the initial (default) state.
    #[default]
    Idle,
    /// A backfill sync was requested or scheduled, but work on it has not begun yet.
    Pending,
    /// A backfill sync is currently being carried out.
    Active,
}
31
32impl BackfillSyncState {
33    /// Returns true if the state is idle.
34    pub const fn is_idle(&self) -> bool {
35        matches!(self, Self::Idle)
36    }
37
38    /// Returns true if the state is pending.
39    pub const fn is_pending(&self) -> bool {
40        matches!(self, Self::Pending)
41    }
42
43    /// Returns true if the state is active.
44    pub const fn is_active(&self) -> bool {
45        matches!(self, Self::Active)
46    }
47}
48
49/// Backfill sync mode functionality.
50pub trait BackfillSync: Send + Sync {
51    /// Performs a backfill action.
52    fn on_action(&mut self, action: BackfillAction);
53
54    /// Polls the pipeline for completion.
55    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent>;
56}
57
58/// The backfill actions that can be performed.
59#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum BackfillAction {
61    /// Start backfilling with the given target.
62    Start(PipelineTarget),
63}
64
65/// The events that can be emitted on backfill sync.
66#[derive(Debug)]
67pub enum BackfillEvent {
68    /// Backfill sync started.
69    Started(PipelineTarget),
70    /// Backfill sync finished.
71    ///
72    /// If this is returned, backfill sync is idle.
73    Finished(Result<ControlFlow, PipelineError>),
74    /// Sync task was dropped after it was started, unable to receive it because
75    /// channel closed. This would indicate a panicked task.
76    TaskDropped(String),
77}
78
79/// Pipeline sync.
80#[derive(Debug)]
81pub struct PipelineSync<N: ProviderNodeTypes> {
82    /// The type that can spawn the pipeline task.
83    pipeline_task_spawner: Box<dyn TaskSpawner>,
84    /// The current state of the pipeline.
85    /// The pipeline is used for large ranges.
86    pipeline_state: PipelineState<N>,
87    /// Pending target block for the pipeline to sync
88    pending_pipeline_target: Option<PipelineTarget>,
89}
90
91impl<N: ProviderNodeTypes> PipelineSync<N> {
92    /// Create a new instance.
93    pub fn new(pipeline: Pipeline<N>, pipeline_task_spawner: Box<dyn TaskSpawner>) -> Self {
94        Self {
95            pipeline_task_spawner,
96            pipeline_state: PipelineState::Idle(Some(pipeline)),
97            pending_pipeline_target: None,
98        }
99    }
100
101    /// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
102    #[allow(dead_code)]
103    const fn is_pipeline_sync_pending(&self) -> bool {
104        self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
105    }
106
107    /// Returns `true` if the pipeline is idle.
108    const fn is_pipeline_idle(&self) -> bool {
109        self.pipeline_state.is_idle()
110    }
111
112    /// Returns `true` if the pipeline is active.
113    const fn is_pipeline_active(&self) -> bool {
114        !self.is_pipeline_idle()
115    }
116
117    /// Sets a new target to sync the pipeline to.
118    ///
119    /// But ensures the target is not the zero hash.
120    fn set_pipeline_sync_target(&mut self, target: PipelineTarget) {
121        if target.sync_target().is_some_and(|target| target.is_zero()) {
122            trace!(
123                target: "consensus::engine::sync",
124                "Pipeline target cannot be zero hash."
125            );
126            // precaution to never sync to the zero hash
127            return
128        }
129        self.pending_pipeline_target = Some(target);
130    }
131
132    /// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to
133    /// run continuously.
134    fn try_spawn_pipeline(&mut self) -> Option<BackfillEvent> {
135        match &mut self.pipeline_state {
136            PipelineState::Idle(pipeline) => {
137                let target = self.pending_pipeline_target.take()?;
138                let (tx, rx) = oneshot::channel();
139
140                let pipeline = pipeline.take().expect("exists");
141                self.pipeline_task_spawner.spawn_critical_blocking(
142                    "pipeline task",
143                    Box::pin(async move {
144                        let result = pipeline.run_as_fut(Some(target)).await;
145                        let _ = tx.send(result);
146                    }),
147                );
148                self.pipeline_state = PipelineState::Running(rx);
149
150                Some(BackfillEvent::Started(target))
151            }
152            PipelineState::Running(_) => None,
153        }
154    }
155
156    /// Advances the pipeline state.
157    ///
158    /// This checks for the result in the channel, or returns pending if the pipeline is idle.
159    fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
160        let res = match self.pipeline_state {
161            PipelineState::Idle(_) => return Poll::Pending,
162            PipelineState::Running(ref mut fut) => {
163                ready!(fut.poll_unpin(cx))
164            }
165        };
166        let ev = match res {
167            Ok((pipeline, result)) => {
168                self.pipeline_state = PipelineState::Idle(Some(pipeline));
169                BackfillEvent::Finished(result)
170            }
171            Err(why) => {
172                // failed to receive the pipeline
173                BackfillEvent::TaskDropped(why.to_string())
174            }
175        };
176        Poll::Ready(ev)
177    }
178}
179
180impl<N: ProviderNodeTypes> BackfillSync for PipelineSync<N> {
181    fn on_action(&mut self, event: BackfillAction) {
182        match event {
183            BackfillAction::Start(target) => self.set_pipeline_sync_target(target),
184        }
185    }
186
187    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<BackfillEvent> {
188        // try to spawn a pipeline if a target is set
189        if let Some(event) = self.try_spawn_pipeline() {
190            return Poll::Ready(event)
191        }
192
193        // make sure we poll the pipeline if it's active, and return any ready pipeline events
194        if self.is_pipeline_active() {
195            // advance the pipeline
196            if let Poll::Ready(event) = self.poll_pipeline(cx) {
197                return Poll::Ready(event)
198            }
199        }
200
201        Poll::Pending
202    }
203}
204
205/// The possible pipeline states within the sync controller.
206///
207/// [`PipelineState::Idle`] means that the pipeline is currently idle.
208/// [`PipelineState::Running`] means that the pipeline is currently running.
209///
210/// NOTE: The differentiation between these two states is important, because when the pipeline is
211/// running, it acquires the write lock over the database. This means that we cannot forward to the
212/// blockchain tree any messages that would result in database writes, since it would result in a
213/// deadlock.
214#[derive(Debug)]
215#[allow(clippy::large_enum_variant)]
216enum PipelineState<N: ProviderNodeTypes> {
217    /// Pipeline is idle.
218    Idle(Option<Pipeline<N>>),
219    /// Pipeline is running and waiting for a response
220    Running(oneshot::Receiver<PipelineWithResult<N>>),
221}
222
223impl<N: ProviderNodeTypes> PipelineState<N> {
224    /// Returns `true` if the state matches idle.
225    const fn is_idle(&self) -> bool {
226        matches!(self, Self::Idle(_))
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::{insert_headers_into_client, TestPipelineBuilder};
    use alloy_consensus::Header;
    use alloy_eips::eip1559::ETHEREUM_BLOCK_GAS_LIMIT_30M;
    use alloy_primitives::{BlockNumber, B256};
    use assert_matches::assert_matches;
    use futures::poll;
    use reth_chainspec::{ChainSpecBuilder, MAINNET};
    use reth_network_p2p::test_utils::TestFullBlockClient;
    use reth_primitives_traits::SealedHeader;
    use reth_provider::test_utils::MockNodeTypesWithDB;
    use reth_stages::ExecOutput;
    use reth_stages_api::StageCheckpoint;
    use reth_tasks::TokioTaskExecutor;
    use std::{collections::VecDeque, future::poll_fn, sync::Arc};

    /// Fixture: a [`PipelineSync`] over a mocked pipeline plus the hash of the highest block
    /// inserted into the test client.
    struct TestHarness {
        pipeline_sync: PipelineSync<MockNodeTypesWithDB>,
        tip: B256,
    }

    impl TestHarness {
        /// Inserts headers for `0..total_blocks` into a test client and builds a pipeline whose
        /// single exec output is "done" at checkpoint `pipeline_done_after`.
        fn new(total_blocks: usize, pipeline_done_after: u64) -> Self {
            // headers the tip is taken from
            let client = TestFullBlockClient::default();
            let first_header = SealedHeader::seal_slow(Header {
                base_fee_per_gas: Some(7),
                gas_limit: ETHEREUM_BLOCK_GAS_LIMIT_30M,
                ..Default::default()
            });
            insert_headers_into_client(&client, first_header, 0..total_blocks);
            let tip = client.highest_block().expect("there should be blocks here").hash();

            let chain_spec = Arc::new(
                ChainSpecBuilder::default()
                    .chain(MAINNET.chain)
                    .genesis(MAINNET.genesis.clone())
                    .paris_activated()
                    .build(),
            );

            // force the pipeline to be "done" after `pipeline_done_after` blocks
            let pipeline = TestPipelineBuilder::new()
                .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
                    checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)),
                    done: true,
                })]))
                .build(chain_spec);

            let spawner = Box::new(TokioTaskExecutor::default());
            Self { pipeline_sync: PipelineSync::new(pipeline, spawner), tip }
        }
    }

    #[tokio::test]
    async fn pipeline_started_and_finished() {
        const TOTAL_BLOCKS: usize = 10;
        const PIPELINE_DONE_AFTER: u64 = 5;

        let harness = TestHarness::new(TOTAL_BLOCKS, PIPELINE_DONE_AFTER);
        let tip = harness.tip;
        let mut pipeline_sync = harness.pipeline_sync;

        // sync target not set, pipeline not started
        let before_target = poll!(poll_fn(|cx| pipeline_sync.poll(cx)));
        assert_matches!(before_target, Poll::Pending);

        pipeline_sync.on_action(BackfillAction::Start(PipelineTarget::Sync(tip)));

        // sync target set, pipeline started
        let after_target = poll!(poll_fn(|cx| pipeline_sync.poll(cx)));
        assert_matches!(after_target, Poll::Ready(BackfillEvent::Started(target)) => {
            assert_eq!(target.sync_target(), Some(tip));
        });

        // the next event should be the pipeline finishing in a good state
        let finished = poll_fn(|cx| pipeline_sync.poll(cx)).await;
        assert_matches!(
            finished,
            BackfillEvent::Finished(Ok(ControlFlow::Continue {
                block_number: PIPELINE_DONE_AFTER
            }))
        );
    }
}