reth_stages/stages/
bodies.rs

1use super::missing_static_data_error;
2use futures_util::TryStreamExt;
3use reth_db_api::{
4    cursor::DbCursorRO,
5    tables,
6    transaction::{DbTx, DbTxMut},
7};
8use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse};
9use reth_provider::{
10    providers::StaticFileWriter, BlockReader, BlockWriter, DBProvider, ProviderError,
11    StaticFileProviderFactory, StatsReader, StorageLocation,
12};
13use reth_stages_api::{
14    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
15    UnwindInput, UnwindOutput,
16};
17use reth_static_file_types::StaticFileSegment;
18use reth_storage_errors::provider::ProviderResult;
19use std::{
20    cmp::Ordering,
21    task::{ready, Context, Poll},
22};
23use tracing::*;
24
25/// The body stage downloads block bodies.
26///
27/// The body stage downloads block bodies for all block headers stored locally in storage.
28///
29/// # Empty blocks
30///
31/// Blocks with an ommers hash corresponding to no ommers *and* a transaction root corresponding to
32/// no transactions will not have a block body downloaded for them, since it would be meaningless to
33/// do so.
34///
35/// This also means that if there is no body for the block in storage (assuming the
36/// block number <= the synced block of this stage), then the block can be considered empty.
37///
38/// # Tables
39///
40/// The bodies are processed and data is inserted into these tables:
41///
42/// - [`BlockOmmers`][reth_db_api::tables::BlockOmmers]
43/// - [`BlockBodies`][reth_db_api::tables::BlockBodyIndices]
44/// - [`Transactions`][reth_db_api::tables::Transactions]
45/// - [`TransactionBlocks`][reth_db_api::tables::TransactionBlocks]
46///
47/// # Genesis
48///
49/// This stage expects that the genesis has been inserted into the appropriate tables:
50///
51/// - The header tables (see [`HeaderStage`][crate::stages::HeaderStage])
52/// - The [`BlockOmmers`][reth_db_api::tables::BlockOmmers] table
53/// - The [`BlockBodies`][reth_db_api::tables::BlockBodyIndices] table
54/// - The [`Transactions`][reth_db_api::tables::Transactions] table
55#[derive(Debug)]
56pub struct BodyStage<D: BodyDownloader> {
57    /// The body downloader.
58    downloader: D,
59    /// Block response buffer.
60    buffer: Option<Vec<BlockResponse<D::Block>>>,
61}
62
63impl<D: BodyDownloader> BodyStage<D> {
64    /// Create new bodies stage from downloader.
65    pub const fn new(downloader: D) -> Self {
66        Self { downloader, buffer: None }
67    }
68}
69
70/// Ensures that static files and database are in sync.
71pub(crate) fn ensure_consistency<Provider>(
72    provider: &Provider,
73    unwind_block: Option<u64>,
74) -> Result<(), StageError>
75where
76    Provider: DBProvider<Tx: DbTxMut> + BlockReader + StaticFileProviderFactory,
77{
78    // Get id for the next tx_num of zero if there are no transactions.
79    let next_tx_num = provider
80        .tx_ref()
81        .cursor_read::<tables::TransactionBlocks>()?
82        .last()?
83        .map(|(id, _)| id + 1)
84        .unwrap_or_default();
85
86    let static_file_provider = provider.static_file_provider();
87
88    // Make sure Transactions static file is at the same height. If it's further, this
89    // input execution was interrupted previously and we need to unwind the static file.
90    let next_static_file_tx_num = static_file_provider
91        .get_highest_static_file_tx(StaticFileSegment::Transactions)
92        .map(|id| id + 1)
93        .unwrap_or_default();
94
95    match next_static_file_tx_num.cmp(&next_tx_num) {
96        // If static files are ahead, we are currently unwinding the stage or we didn't reach
97        // the database commit in a previous stage run. So, our only solution is to unwind the
98        // static files and proceed from the database expected height.
99        Ordering::Greater => {
100            let highest_db_block = provider.tx_ref().entries::<tables::BlockBodyIndices>()? as u64;
101            let mut static_file_producer =
102                static_file_provider.latest_writer(StaticFileSegment::Transactions)?;
103            static_file_producer
104                .prune_transactions(next_static_file_tx_num - next_tx_num, highest_db_block)?;
105            // Since this is a database <-> static file inconsistency, we commit the change
106            // straight away.
107            static_file_producer.commit()?;
108        }
109        // If static files are behind, then there was some corruption or loss of files. This
110        // error will trigger an unwind, that will bring the database to the same height as the
111        // static files.
112        Ordering::Less => {
113            // If we are already in the process of unwind, this might be fine because we will
114            // fix the inconsistency right away.
115            if let Some(unwind_to) = unwind_block {
116                let next_tx_num_after_unwind = provider
117                    .block_body_indices(unwind_to)?
118                    .map(|b| b.next_tx_num())
119                    .ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?;
120
121                // This means we need a deeper unwind.
122                if next_tx_num_after_unwind > next_static_file_tx_num {
123                    return Err(missing_static_data_error(
124                        next_static_file_tx_num.saturating_sub(1),
125                        &static_file_provider,
126                        provider,
127                        StaticFileSegment::Transactions,
128                    )?)
129                }
130            } else {
131                return Err(missing_static_data_error(
132                    next_static_file_tx_num.saturating_sub(1),
133                    &static_file_provider,
134                    provider,
135                    StaticFileSegment::Transactions,
136                )?)
137            }
138        }
139        Ordering::Equal => {}
140    }
141
142    Ok(())
143}
144
145impl<Provider, D> Stage<Provider> for BodyStage<D>
146where
147    Provider: DBProvider<Tx: DbTxMut>
148        + StaticFileProviderFactory
149        + StatsReader
150        + BlockReader
151        + BlockWriter<Block = D::Block>,
152    D: BodyDownloader,
153{
154    /// Return the id of the stage
155    fn id(&self) -> StageId {
156        StageId::Bodies
157    }
158
159    fn poll_execute_ready(
160        &mut self,
161        cx: &mut Context<'_>,
162        input: ExecInput,
163    ) -> Poll<Result<(), StageError>> {
164        if input.target_reached() || self.buffer.is_some() {
165            return Poll::Ready(Ok(()))
166        }
167
168        // Update the header range on the downloader
169        self.downloader.set_download_range(input.next_block_range())?;
170
171        // Poll next downloader item.
172        let maybe_next_result = ready!(self.downloader.try_poll_next_unpin(cx));
173
174        // Task downloader can return `None` only if the response relaying channel was closed. This
175        // is a fatal error to prevent the pipeline from running forever.
176        let response = match maybe_next_result {
177            Some(Ok(downloaded)) => {
178                self.buffer = Some(downloaded);
179                Ok(())
180            }
181            Some(Err(err)) => Err(err.into()),
182            None => Err(StageError::ChannelClosed),
183        };
184        Poll::Ready(response)
185    }
186
187    /// Download block bodies from the last checkpoint for this stage up until the latest synced
188    /// header, limited by the stage's batch size.
189    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
190        if input.target_reached() {
191            return Ok(ExecOutput::done(input.checkpoint()))
192        }
193        let (from_block, to_block) = input.next_block_range().into_inner();
194
195        ensure_consistency(provider, None)?;
196
197        debug!(target: "sync::stages::bodies", stage_progress = from_block, target = to_block, "Commencing sync");
198
199        let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?;
200        trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks");
201        let highest_block = buffer.last().map(|r| r.block_number()).unwrap_or(from_block);
202
203        // Write bodies to database.
204        provider.append_block_bodies(
205            buffer
206                .into_iter()
207                .map(|response| (response.block_number(), response.into_body()))
208                .collect(),
209            // We are writing transactions directly to static files.
210            StorageLocation::StaticFiles,
211        )?;
212
213        // The stage is "done" if:
214        // - We got fewer blocks than our target
215        // - We reached our target and the target was not limited by the batch size of the stage
216        let done = highest_block == to_block;
217        Ok(ExecOutput {
218            checkpoint: StageCheckpoint::new(highest_block)
219                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
220            done,
221        })
222    }
223
224    /// Unwind the stage.
225    fn unwind(
226        &mut self,
227        provider: &Provider,
228        input: UnwindInput,
229    ) -> Result<UnwindOutput, StageError> {
230        self.buffer.take();
231
232        ensure_consistency(provider, Some(input.unwind_to))?;
233        provider.remove_bodies_above(input.unwind_to, StorageLocation::Both)?;
234
235        Ok(UnwindOutput {
236            checkpoint: StageCheckpoint::new(input.unwind_to)
237                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
238        })
239    }
240}
241
242// TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know
243//  beforehand how many bytes we need to download. So the good solution would be to measure the
244//  progress in gas as a proxy to size. Execution stage uses a similar approach.
245fn stage_checkpoint<Provider>(provider: &Provider) -> ProviderResult<EntitiesCheckpoint>
246where
247    Provider: StatsReader + StaticFileProviderFactory,
248{
249    Ok(EntitiesCheckpoint {
250        processed: provider.count_entries::<tables::BlockBodyIndices>()? as u64,
251        // Count only static files entries. If we count the database entries too, we may have
252        // duplicates. We're sure that the static files have all entries that database has,
253        // because we run the `StaticFileProducer` before starting the pipeline.
254        total: provider.static_file_provider().count_entries::<tables::Headers>()? as u64,
255    })
256}
257
258#[cfg(test)]
259mod tests {
260    use super::*;
261    use crate::test_utils::{
262        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
263    };
264    use assert_matches::assert_matches;
265    use reth_provider::StaticFileProviderFactory;
266    use reth_stages_api::StageUnitCheckpoint;
267    use test_utils::*;
268
269    stage_test_suite_ext!(BodyTestRunner, body);
270
271    /// Checks that the stage downloads at most `batch_size` blocks.
272    #[tokio::test]
273    async fn partial_body_download() {
274        let (stage_progress, previous_stage) = (1, 200);
275
276        // Set up test runner
277        let mut runner = BodyTestRunner::default();
278        let input = ExecInput {
279            target: Some(previous_stage),
280            checkpoint: Some(StageCheckpoint::new(stage_progress)),
281        };
282        runner.seed_execution(input).expect("failed to seed execution");
283
284        // Set the batch size (max we sync per stage execution) to less than the number of blocks
285        // the previous stage synced (10 vs 20)
286        let batch_size = 10;
287        runner.set_batch_size(batch_size);
288
289        // Run the stage
290        let rx = runner.execute(input);
291
292        // Check that we only synced around `batch_size` blocks even though the number of blocks
293        // synced by the previous stage is higher
294        let output = rx.await.unwrap();
295        runner.db().factory.static_file_provider().commit().unwrap();
296        assert_matches!(
297            output,
298            Ok(ExecOutput { checkpoint: StageCheckpoint {
299                block_number,
300                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
301                    processed, // 1 seeded block body + batch size
302                    total // seeded headers
303                }))
304            }, done: false }) if block_number < 200 &&
305                processed == batch_size + 1 && total == previous_stage + 1
306        );
307        assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
308    }
309
310    /// Same as [`partial_body_download`] except the `batch_size` is not hit.
311    #[tokio::test]
312    async fn full_body_download() {
313        let (stage_progress, previous_stage) = (1, 20);
314
315        // Set up test runner
316        let mut runner = BodyTestRunner::default();
317        let input = ExecInput {
318            target: Some(previous_stage),
319            checkpoint: Some(StageCheckpoint::new(stage_progress)),
320        };
321        runner.seed_execution(input).expect("failed to seed execution");
322
323        // Set the batch size to more than what the previous stage synced (40 vs 20)
324        runner.set_batch_size(40);
325
326        // Run the stage
327        let rx = runner.execute(input);
328
329        // Check that we synced all blocks successfully, even though our `batch_size` allows us to
330        // sync more (if there were more headers)
331        let output = rx.await.unwrap();
332        runner.db().factory.static_file_provider().commit().unwrap();
333        assert_matches!(
334            output,
335            Ok(ExecOutput {
336                checkpoint: StageCheckpoint {
337                    block_number: 20,
338                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
339                        processed,
340                        total
341                    }))
342                },
343                done: true
344            }) if processed + 1 == total && total == previous_stage + 1
345        );
346        assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
347    }
348
349    /// Same as [`full_body_download`] except we have made progress before
350    #[tokio::test]
351    async fn sync_from_previous_progress() {
352        let (stage_progress, previous_stage) = (1, 21);
353
354        // Set up test runner
355        let mut runner = BodyTestRunner::default();
356        let input = ExecInput {
357            target: Some(previous_stage),
358            checkpoint: Some(StageCheckpoint::new(stage_progress)),
359        };
360        runner.seed_execution(input).expect("failed to seed execution");
361
362        let batch_size = 10;
363        runner.set_batch_size(batch_size);
364
365        // Run the stage
366        let rx = runner.execute(input);
367
368        // Check that we synced at least 10 blocks
369        let first_run = rx.await.unwrap();
370        runner.db().factory.static_file_provider().commit().unwrap();
371        assert_matches!(
372            first_run,
373            Ok(ExecOutput { checkpoint: StageCheckpoint {
374                block_number,
375                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
376                    processed,
377                    total
378                }))
379            }, done: false }) if block_number >= 10 &&
380                processed - 1 == batch_size && total == previous_stage + 1
381        );
382        let first_run_checkpoint = first_run.unwrap().checkpoint;
383
384        // Execute again on top of the previous run
385        let input =
386            ExecInput { target: Some(previous_stage), checkpoint: Some(first_run_checkpoint) };
387        let rx = runner.execute(input);
388
389        // Check that we synced more blocks
390        let output = rx.await.unwrap();
391        runner.db().factory.static_file_provider().commit().unwrap();
392        assert_matches!(
393            output,
394            Ok(ExecOutput { checkpoint: StageCheckpoint {
395                block_number,
396                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
397                    processed,
398                    total
399                }))
400            }, done: true }) if block_number > first_run_checkpoint.block_number &&
401                processed + 1 == total && total == previous_stage + 1
402        );
403        assert_matches!(
404            runner.validate_execution(input, output.ok()),
405            Ok(_),
406            "execution validation"
407        );
408    }
409
410    /// Checks that the stage unwinds correctly, even if a transaction in a block is missing.
411    #[tokio::test]
412    async fn unwind_missing_tx() {
413        let (stage_progress, previous_stage) = (1, 20);
414
415        // Set up test runner
416        let mut runner = BodyTestRunner::default();
417        let input = ExecInput {
418            target: Some(previous_stage),
419            checkpoint: Some(StageCheckpoint::new(stage_progress)),
420        };
421        runner.seed_execution(input).expect("failed to seed execution");
422
423        // Set the batch size to more than what the previous stage synced (40 vs 20)
424        runner.set_batch_size(40);
425
426        // Run the stage
427        let rx = runner.execute(input);
428
429        // Check that we synced all blocks successfully, even though our `batch_size` allows us to
430        // sync more (if there were more headers)
431        let output = rx.await.unwrap();
432        runner.db().factory.static_file_provider().commit().unwrap();
433        assert_matches!(
434            output,
435            Ok(ExecOutput { checkpoint: StageCheckpoint {
436                block_number,
437                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
438                    processed,
439                    total
440                }))
441            }, done: true }) if block_number == previous_stage &&
442                processed + 1 == total && total == previous_stage + 1
443        );
444        let checkpoint = output.unwrap().checkpoint;
445        runner
446            .validate_db_blocks(input.checkpoint().block_number, checkpoint.block_number)
447            .expect("Written block data invalid");
448
449        // Delete a transaction
450        let static_file_provider = runner.db().factory.static_file_provider();
451        {
452            let mut static_file_producer =
453                static_file_provider.latest_writer(StaticFileSegment::Transactions).unwrap();
454            static_file_producer.prune_transactions(1, checkpoint.block_number).unwrap();
455            static_file_producer.commit().unwrap();
456        }
457        // Unwind all of it
458        let unwind_to = 1;
459        let input = UnwindInput { bad_block: None, checkpoint, unwind_to };
460        let res = runner.unwind(input).await;
461        assert_matches!(
462            res,
463            Ok(UnwindOutput { checkpoint: StageCheckpoint {
464                block_number: 1,
465                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
466                    processed: 1,
467                    total
468                }))
469            }}) if total == previous_stage + 1
470        );
471
472        assert_matches!(runner.validate_unwind(input), Ok(_), "unwind validation");
473    }
474
475    mod test_utils {
476        use crate::{
477            stages::bodies::BodyStage,
478            test_utils::{
479                ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB,
480                UnwindStageTestRunner,
481            },
482        };
483        use alloy_consensus::{BlockHeader, Header};
484        use alloy_primitives::{BlockNumber, TxNumber, B256};
485        use futures_util::Stream;
486        use reth_db::{static_file::HeaderWithHashMask, tables};
487        use reth_db_api::{
488            cursor::DbCursorRO,
489            models::{StoredBlockBodyIndices, StoredBlockOmmers},
490            transaction::{DbTx, DbTxMut},
491        };
492        use reth_ethereum_primitives::{Block, BlockBody};
493        use reth_network_p2p::{
494            bodies::{
495                downloader::{BodyDownloader, BodyDownloaderResult},
496                response::BlockResponse,
497            },
498            error::DownloadResult,
499        };
500        use reth_primitives_traits::{SealedBlock, SealedHeader};
501        use reth_provider::{
502            providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, HeaderProvider,
503            ProviderFactory, StaticFileProviderFactory, TransactionsProvider,
504        };
505        use reth_stages_api::{ExecInput, ExecOutput, UnwindInput};
506        use reth_static_file_types::StaticFileSegment;
507        use reth_testing_utils::generators::{
508            self, random_block_range, random_signed_tx, BlockRangeParams,
509        };
510        use std::{
511            collections::{HashMap, VecDeque},
512            ops::RangeInclusive,
513            pin::Pin,
514            task::{Context, Poll},
515        };
516
517        /// The block hash of the genesis block.
518        pub(crate) const GENESIS_HASH: B256 = B256::ZERO;
519
520        /// A helper to create a collection of block bodies keyed by their hash.
521        pub(crate) fn body_by_hash(block: &SealedBlock<Block>) -> (B256, BlockBody) {
522            (block.hash(), block.body().clone())
523        }
524
525        /// A helper struct for running the [`BodyStage`].
526        pub(crate) struct BodyTestRunner {
527            responses: HashMap<B256, BlockBody>,
528            db: TestStageDB,
529            batch_size: u64,
530        }
531
532        impl Default for BodyTestRunner {
533            fn default() -> Self {
534                Self { responses: HashMap::default(), db: TestStageDB::default(), batch_size: 1000 }
535            }
536        }
537
538        impl BodyTestRunner {
539            pub(crate) fn set_batch_size(&mut self, batch_size: u64) {
540                self.batch_size = batch_size;
541            }
542
543            pub(crate) fn set_responses(&mut self, responses: HashMap<B256, BlockBody>) {
544                self.responses = responses;
545            }
546        }
547
548        impl StageTestRunner for BodyTestRunner {
549            type S = BodyStage<TestBodyDownloader>;
550
551            fn db(&self) -> &TestStageDB {
552                &self.db
553            }
554
555            fn stage(&self) -> Self::S {
556                BodyStage::new(TestBodyDownloader::new(
557                    self.db.factory.clone(),
558                    self.responses.clone(),
559                    self.batch_size,
560                ))
561            }
562        }
563
564        impl ExecuteStageTestRunner for BodyTestRunner {
565            type Seed = Vec<SealedBlock<Block>>;
566
567            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
568                let start = input.checkpoint().block_number;
569                let end = input.target();
570
571                let static_file_provider = self.db.factory.static_file_provider();
572
573                let mut rng = generators::rng();
574
575                // Static files do not support gaps in headers, so we need to generate 0 to end
576                let blocks = random_block_range(
577                    &mut rng,
578                    0..=end,
579                    BlockRangeParams {
580                        parent: Some(GENESIS_HASH),
581                        tx_count: 0..2,
582                        ..Default::default()
583                    },
584                );
585                self.db.insert_headers_with_td(blocks.iter().map(|block| block.sealed_header()))?;
586                if let Some(progress) = blocks.get(start as usize) {
587                    // Insert last progress data
588                    {
589                        let tx = self.db.factory.provider_rw()?.into_tx();
590                        let mut static_file_producer = static_file_provider
591                            .get_writer(start, StaticFileSegment::Transactions)?;
592
593                        let body = StoredBlockBodyIndices {
594                            first_tx_num: 0,
595                            tx_count: progress.transaction_count() as u64,
596                        };
597
598                        static_file_producer.set_block_range(0..=progress.number);
599
600                        body.tx_num_range().try_for_each(|tx_num| {
601                            let transaction = random_signed_tx(&mut rng);
602                            static_file_producer.append_transaction(tx_num, &transaction).map(drop)
603                        })?;
604
605                        if body.tx_count != 0 {
606                            tx.put::<tables::TransactionBlocks>(
607                                body.last_tx_num(),
608                                progress.number,
609                            )?;
610                        }
611
612                        tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
613
614                        if !progress.ommers_hash_is_empty() {
615                            tx.put::<tables::BlockOmmers>(
616                                progress.number,
617                                StoredBlockOmmers { ommers: progress.body().ommers.clone() },
618                            )?;
619                        }
620
621                        static_file_producer.commit()?;
622                        tx.commit()?;
623                    }
624                }
625                self.set_responses(blocks.iter().map(body_by_hash).collect());
626                Ok(blocks)
627            }
628
629            fn validate_execution(
630                &self,
631                input: ExecInput,
632                output: Option<ExecOutput>,
633            ) -> Result<(), TestRunnerError> {
634                let highest_block = match output.as_ref() {
635                    Some(output) => output.checkpoint,
636                    None => input.checkpoint(),
637                }
638                .block_number;
639                self.validate_db_blocks(highest_block, highest_block)
640            }
641        }
642
643        impl UnwindStageTestRunner for BodyTestRunner {
644            fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
645                self.db.ensure_no_entry_above::<tables::BlockBodyIndices, _>(
646                    input.unwind_to,
647                    |key| key,
648                )?;
649                self.db
650                    .ensure_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| key)?;
651                if let Some(last_tx_id) = self.get_last_tx_id()? {
652                    self.db
653                        .ensure_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?;
654                    self.db.ensure_no_entry_above::<tables::TransactionBlocks, _>(
655                        last_tx_id,
656                        |key| key,
657                    )?;
658                }
659                Ok(())
660            }
661        }
662
663        impl BodyTestRunner {
664            /// Get the last available tx id if any
665            pub(crate) fn get_last_tx_id(&self) -> Result<Option<TxNumber>, TestRunnerError> {
666                let last_body = self.db.query(|tx| {
667                    let v = tx.cursor_read::<tables::BlockBodyIndices>()?.last()?;
668                    Ok(v)
669                })?;
670                Ok(match last_body {
671                    Some((_, body)) if body.tx_count != 0 => {
672                        Some(body.first_tx_num + body.tx_count - 1)
673                    }
674                    _ => None,
675                })
676            }
677
678            /// Validate that the inserted block data is valid
679            pub(crate) fn validate_db_blocks(
680                &self,
681                prev_progress: BlockNumber,
682                highest_block: BlockNumber,
683            ) -> Result<(), TestRunnerError> {
684                let static_file_provider = self.db.factory.static_file_provider();
685
686                self.db.query(|tx| {
687                    // Acquire cursors on body related tables
688                    let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
689                    let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
690                    let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
691
692                    let first_body_key = match bodies_cursor.first()? {
693                        Some((key, _)) => key,
694                        None => return Ok(()),
695                    };
696
697                    let mut prev_number: Option<BlockNumber> = None;
698
699
700                    for entry in bodies_cursor.walk(Some(first_body_key))? {
701                        let (number, body) = entry?;
702
703                        // Validate sequentiality only after prev progress,
704                        // since the data before is mocked and can contain gaps
705                        if number > prev_progress {
706                            if let Some(prev_key) = prev_number {
707                                assert_eq!(prev_key + 1, number, "Body entries must be sequential");
708                            }
709                        }
710
711                        // Validate that the current entry is below or equals to the highest allowed block
712                        assert!(
713                            number <= highest_block,
714                            "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
715                        );
716
717                        let header = static_file_provider.header_by_number(number)?.expect("to be present");
718                        // Validate that ommers exist if any
719                        let stored_ommers =  ommers_cursor.seek_exact(number)?;
720                        if header.ommers_hash_is_empty() {
721                            assert!(stored_ommers.is_none(), "Unexpected ommers entry");
722                        } else {
723                            assert!(stored_ommers.is_some(), "Missing ommers entry");
724                        }
725
726                        let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
727                        if body.tx_count == 0 {
728                            assert_ne!(tx_block_id,Some(number));
729                        } else {
730                            assert_eq!(tx_block_id, Some(number));
731                        }
732
733                        for tx_id in body.tx_num_range() {
734                            assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
735                        }
736
737                        prev_number = Some(number);
738                    }
739                    Ok(())
740                })?;
741                Ok(())
742            }
743        }
744
745        /// A [`BodyDownloader`] that is backed by an internal [`HashMap`] for testing.
746        #[derive(Debug)]
747        pub(crate) struct TestBodyDownloader {
748            provider_factory: ProviderFactory<MockNodeTypesWithDB>,
749            responses: HashMap<B256, BlockBody>,
750            headers: VecDeque<SealedHeader>,
751            batch_size: u64,
752        }
753
754        impl TestBodyDownloader {
755            pub(crate) fn new(
756                provider_factory: ProviderFactory<MockNodeTypesWithDB>,
757                responses: HashMap<B256, BlockBody>,
758                batch_size: u64,
759            ) -> Self {
760                Self { provider_factory, responses, headers: VecDeque::default(), batch_size }
761            }
762        }
763
764        impl BodyDownloader for TestBodyDownloader {
765            type Block = Block;
766
767            fn set_download_range(
768                &mut self,
769                range: RangeInclusive<BlockNumber>,
770            ) -> DownloadResult<()> {
771                let static_file_provider = self.provider_factory.static_file_provider();
772
773                for header in static_file_provider.fetch_range_iter(
774                    StaticFileSegment::Headers,
775                    *range.start()..*range.end() + 1,
776                    |cursor, number| cursor.get_two::<HeaderWithHashMask<Header>>(number.into()),
777                )? {
778                    let (header, hash) = header?;
779                    self.headers.push_back(SealedHeader::new(header, hash));
780                }
781
782                Ok(())
783            }
784        }
785
786        impl Stream for TestBodyDownloader {
787            type Item = BodyDownloaderResult<Block>;
788            fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
789                let this = self.get_mut();
790
791                if this.headers.is_empty() {
792                    return Poll::Ready(None)
793                }
794
795                let mut response =
796                    Vec::with_capacity(std::cmp::min(this.headers.len(), this.batch_size as usize));
797                while let Some(header) = this.headers.pop_front() {
798                    if header.is_empty() {
799                        response.push(BlockResponse::Empty(header))
800                    } else {
801                        let body =
802                            this.responses.remove(&header.hash()).expect("requested unknown body");
803                        response.push(BlockResponse::Full(SealedBlock::from_sealed_parts(
804                            header, body,
805                        )));
806                    }
807
808                    if response.len() as u64 >= this.batch_size {
809                        break
810                    }
811                }
812
813                if !response.is_empty() {
814                    return Poll::Ready(Some(Ok(response)))
815                }
816
817                panic!("requested bodies without setting headers")
818            }
819        }
820    }
821}