reth_stages/stages/
bodies.rs

1use super::missing_static_data_error;
2use futures_util::TryStreamExt;
3use reth_db_api::{
4    cursor::DbCursorRO,
5    tables,
6    transaction::{DbTx, DbTxMut},
7};
8use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse};
9use reth_provider::{
10    providers::StaticFileWriter, BlockReader, BlockWriter, DBProvider, ProviderError,
11    StaticFileProviderFactory, StatsReader,
12};
13use reth_stages_api::{
14    EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
15    UnwindInput, UnwindOutput,
16};
17use reth_static_file_types::StaticFileSegment;
18use reth_storage_errors::provider::ProviderResult;
19use std::{
20    cmp::Ordering,
21    task::{ready, Context, Poll},
22};
23use tracing::*;
24
25/// The body stage downloads block bodies.
26///
27/// The body stage downloads block bodies for all block headers stored locally in storage.
28///
29/// # Empty blocks
30///
31/// Blocks with an ommers hash corresponding to no ommers *and* a transaction root corresponding to
32/// no transactions will not have a block body downloaded for them, since it would be meaningless to
33/// do so.
34///
35/// This also means that if there is no body for the block in storage (assuming the
36/// block number <= the synced block of this stage), then the block can be considered empty.
37///
38/// # Tables
39///
40/// The bodies are processed and data is inserted into these tables:
41///
42/// - [`BlockOmmers`][reth_db_api::tables::BlockOmmers]
43/// - [`BlockBodies`][reth_db_api::tables::BlockBodyIndices]
44/// - [`Transactions`][reth_db_api::tables::Transactions]
45/// - [`TransactionBlocks`][reth_db_api::tables::TransactionBlocks]
46///
47/// # Genesis
48///
49/// This stage expects that the genesis has been inserted into the appropriate tables:
50///
51/// - The header tables (see [`HeaderStage`][crate::stages::HeaderStage])
52/// - The [`BlockOmmers`][reth_db_api::tables::BlockOmmers] table
53/// - The [`BlockBodies`][reth_db_api::tables::BlockBodyIndices] table
54/// - The [`Transactions`][reth_db_api::tables::Transactions] table
55#[derive(Debug)]
56pub struct BodyStage<D: BodyDownloader> {
57    /// The body downloader.
58    downloader: D,
59    /// Block response buffer.
60    buffer: Option<Vec<BlockResponse<D::Block>>>,
61}
62
63impl<D: BodyDownloader> BodyStage<D> {
64    /// Create new bodies stage from downloader.
65    pub const fn new(downloader: D) -> Self {
66        Self { downloader, buffer: None }
67    }
68}
69
70/// Ensures that static files and database are in sync.
71pub(crate) fn ensure_consistency<Provider>(
72    provider: &Provider,
73    unwind_block: Option<u64>,
74) -> Result<(), StageError>
75where
76    Provider: DBProvider<Tx: DbTxMut> + BlockReader + StaticFileProviderFactory,
77{
78    // Get id for the next tx_num of zero if there are no transactions.
79    let next_tx_num = provider
80        .tx_ref()
81        .cursor_read::<tables::TransactionBlocks>()?
82        .last()?
83        .map(|(id, _)| id + 1)
84        .unwrap_or_default();
85
86    let static_file_provider = provider.static_file_provider();
87
88    // Make sure Transactions static file is at the same height. If it's further, this
89    // input execution was interrupted previously and we need to unwind the static file.
90    let next_static_file_tx_num = static_file_provider
91        .get_highest_static_file_tx(StaticFileSegment::Transactions)
92        .map(|id| id + 1)
93        .unwrap_or_default();
94
95    match next_static_file_tx_num.cmp(&next_tx_num) {
96        // If static files are ahead, we are currently unwinding the stage or we didn't reach
97        // the database commit in a previous stage run. So, our only solution is to unwind the
98        // static files and proceed from the database expected height.
99        Ordering::Greater => {
100            let highest_db_block = provider.tx_ref().entries::<tables::BlockBodyIndices>()? as u64;
101            let mut static_file_producer =
102                static_file_provider.latest_writer(StaticFileSegment::Transactions)?;
103            static_file_producer
104                .prune_transactions(next_static_file_tx_num - next_tx_num, highest_db_block)?;
105            // Since this is a database <-> static file inconsistency, we commit the change
106            // straight away.
107            static_file_producer.commit()?;
108        }
109        // If static files are behind, then there was some corruption or loss of files. This
110        // error will trigger an unwind, that will bring the database to the same height as the
111        // static files.
112        Ordering::Less => {
113            // If we are already in the process of unwind, this might be fine because we will
114            // fix the inconsistency right away.
115            if let Some(unwind_to) = unwind_block {
116                let next_tx_num_after_unwind = provider
117                    .block_body_indices(unwind_to)?
118                    .map(|b| b.next_tx_num())
119                    .ok_or(ProviderError::BlockBodyIndicesNotFound(unwind_to))?;
120
121                // This means we need a deeper unwind.
122                if next_tx_num_after_unwind > next_static_file_tx_num {
123                    return Err(missing_static_data_error(
124                        next_static_file_tx_num.saturating_sub(1),
125                        &static_file_provider,
126                        provider,
127                        StaticFileSegment::Transactions,
128                    )?)
129                }
130            } else {
131                return Err(missing_static_data_error(
132                    next_static_file_tx_num.saturating_sub(1),
133                    &static_file_provider,
134                    provider,
135                    StaticFileSegment::Transactions,
136                )?)
137            }
138        }
139        Ordering::Equal => {}
140    }
141
142    Ok(())
143}
144
145impl<Provider, D> Stage<Provider> for BodyStage<D>
146where
147    Provider: DBProvider<Tx: DbTxMut>
148        + StaticFileProviderFactory
149        + StatsReader
150        + BlockReader
151        + BlockWriter<Block = D::Block>,
152    D: BodyDownloader,
153{
154    /// Return the id of the stage
155    fn id(&self) -> StageId {
156        StageId::Bodies
157    }
158
159    fn poll_execute_ready(
160        &mut self,
161        cx: &mut Context<'_>,
162        input: ExecInput,
163    ) -> Poll<Result<(), StageError>> {
164        if input.target_reached() || self.buffer.is_some() {
165            return Poll::Ready(Ok(()))
166        }
167
168        // Update the header range on the downloader
169        self.downloader.set_download_range(input.next_block_range())?;
170
171        // Poll next downloader item.
172        let maybe_next_result = ready!(self.downloader.try_poll_next_unpin(cx));
173
174        // Task downloader can return `None` only if the response relaying channel was closed. This
175        // is a fatal error to prevent the pipeline from running forever.
176        let response = match maybe_next_result {
177            Some(Ok(downloaded)) => {
178                self.buffer = Some(downloaded);
179                Ok(())
180            }
181            Some(Err(err)) => Err(err.into()),
182            None => Err(StageError::ChannelClosed),
183        };
184        Poll::Ready(response)
185    }
186
187    /// Download block bodies from the last checkpoint for this stage up until the latest synced
188    /// header, limited by the stage's batch size.
189    fn execute(&mut self, provider: &Provider, input: ExecInput) -> Result<ExecOutput, StageError> {
190        if input.target_reached() {
191            return Ok(ExecOutput::done(input.checkpoint()))
192        }
193        let (from_block, to_block) = input.next_block_range().into_inner();
194
195        ensure_consistency(provider, None)?;
196
197        debug!(target: "sync::stages::bodies", stage_progress = from_block, target = to_block, "Commencing sync");
198
199        let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?;
200        trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks");
201        let highest_block = buffer.last().map(|r| r.block_number()).unwrap_or(from_block);
202
203        // Write bodies to database.
204        provider.append_block_bodies(
205            buffer.iter().map(|response| (response.block_number(), response.body())).collect(),
206        )?;
207
208        // The stage is "done" if:
209        // - We got fewer blocks than our target
210        // - We reached our target and the target was not limited by the batch size of the stage
211        let done = highest_block == to_block;
212        Ok(ExecOutput {
213            checkpoint: StageCheckpoint::new(highest_block)
214                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
215            done,
216        })
217    }
218
219    /// Unwind the stage.
220    fn unwind(
221        &mut self,
222        provider: &Provider,
223        input: UnwindInput,
224    ) -> Result<UnwindOutput, StageError> {
225        self.buffer.take();
226
227        ensure_consistency(provider, Some(input.unwind_to))?;
228        provider.remove_bodies_above(input.unwind_to)?;
229
230        Ok(UnwindOutput {
231            checkpoint: StageCheckpoint::new(input.unwind_to)
232                .with_entities_stage_checkpoint(stage_checkpoint(provider)?),
233        })
234    }
235}
236
237// TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know
238//  beforehand how many bytes we need to download. So the good solution would be to measure the
239//  progress in gas as a proxy to size. Execution stage uses a similar approach.
240fn stage_checkpoint<Provider>(provider: &Provider) -> ProviderResult<EntitiesCheckpoint>
241where
242    Provider: StatsReader + StaticFileProviderFactory,
243{
244    Ok(EntitiesCheckpoint {
245        processed: provider.count_entries::<tables::BlockBodyIndices>()? as u64,
246        // Count only static files entries. If we count the database entries too, we may have
247        // duplicates. We're sure that the static files have all entries that database has,
248        // because we run the `StaticFileProducer` before starting the pipeline.
249        total: provider.static_file_provider().count_entries::<tables::Headers>()? as u64,
250    })
251}
252
253#[cfg(test)]
254mod tests {
255    use super::*;
256    use crate::test_utils::{
257        stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
258    };
259    use assert_matches::assert_matches;
260    use reth_provider::StaticFileProviderFactory;
261    use reth_stages_api::StageUnitCheckpoint;
262    use test_utils::*;
263
264    stage_test_suite_ext!(BodyTestRunner, body);
265
266    /// Checks that the stage downloads at most `batch_size` blocks.
267    #[tokio::test]
268    async fn partial_body_download() {
269        let (stage_progress, previous_stage) = (1, 200);
270
271        // Set up test runner
272        let mut runner = BodyTestRunner::default();
273        let input = ExecInput {
274            target: Some(previous_stage),
275            checkpoint: Some(StageCheckpoint::new(stage_progress)),
276        };
277        runner.seed_execution(input).expect("failed to seed execution");
278
279        // Set the batch size (max we sync per stage execution) to less than the number of blocks
280        // the previous stage synced (10 vs 20)
281        let batch_size = 10;
282        runner.set_batch_size(batch_size);
283
284        // Run the stage
285        let rx = runner.execute(input);
286
287        // Check that we only synced around `batch_size` blocks even though the number of blocks
288        // synced by the previous stage is higher
289        let output = rx.await.unwrap();
290        runner.db().factory.static_file_provider().commit().unwrap();
291        assert_matches!(
292            output,
293            Ok(ExecOutput { checkpoint: StageCheckpoint {
294                block_number,
295                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
296                    processed, // 1 seeded block body + batch size
297                    total // seeded headers
298                }))
299            }, done: false }) if block_number < 200 &&
300                processed == batch_size + 1 && total == previous_stage + 1
301        );
302        assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
303    }
304
305    /// Same as [`partial_body_download`] except the `batch_size` is not hit.
306    #[tokio::test]
307    async fn full_body_download() {
308        let (stage_progress, previous_stage) = (1, 20);
309
310        // Set up test runner
311        let mut runner = BodyTestRunner::default();
312        let input = ExecInput {
313            target: Some(previous_stage),
314            checkpoint: Some(StageCheckpoint::new(stage_progress)),
315        };
316        runner.seed_execution(input).expect("failed to seed execution");
317
318        // Set the batch size to more than what the previous stage synced (40 vs 20)
319        runner.set_batch_size(40);
320
321        // Run the stage
322        let rx = runner.execute(input);
323
324        // Check that we synced all blocks successfully, even though our `batch_size` allows us to
325        // sync more (if there were more headers)
326        let output = rx.await.unwrap();
327        runner.db().factory.static_file_provider().commit().unwrap();
328        assert_matches!(
329            output,
330            Ok(ExecOutput {
331                checkpoint: StageCheckpoint {
332                    block_number: 20,
333                    stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
334                        processed,
335                        total
336                    }))
337                },
338                done: true
339            }) if processed + 1 == total && total == previous_stage + 1
340        );
341        assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
342    }
343
344    /// Same as [`full_body_download`] except we have made progress before
345    #[tokio::test]
346    async fn sync_from_previous_progress() {
347        let (stage_progress, previous_stage) = (1, 21);
348
349        // Set up test runner
350        let mut runner = BodyTestRunner::default();
351        let input = ExecInput {
352            target: Some(previous_stage),
353            checkpoint: Some(StageCheckpoint::new(stage_progress)),
354        };
355        runner.seed_execution(input).expect("failed to seed execution");
356
357        let batch_size = 10;
358        runner.set_batch_size(batch_size);
359
360        // Run the stage
361        let rx = runner.execute(input);
362
363        // Check that we synced at least 10 blocks
364        let first_run = rx.await.unwrap();
365        runner.db().factory.static_file_provider().commit().unwrap();
366        assert_matches!(
367            first_run,
368            Ok(ExecOutput { checkpoint: StageCheckpoint {
369                block_number,
370                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
371                    processed,
372                    total
373                }))
374            }, done: false }) if block_number >= 10 &&
375                processed - 1 == batch_size && total == previous_stage + 1
376        );
377        let first_run_checkpoint = first_run.unwrap().checkpoint;
378
379        // Execute again on top of the previous run
380        let input =
381            ExecInput { target: Some(previous_stage), checkpoint: Some(first_run_checkpoint) };
382        let rx = runner.execute(input);
383
384        // Check that we synced more blocks
385        let output = rx.await.unwrap();
386        runner.db().factory.static_file_provider().commit().unwrap();
387        assert_matches!(
388            output,
389            Ok(ExecOutput { checkpoint: StageCheckpoint {
390                block_number,
391                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
392                    processed,
393                    total
394                }))
395            }, done: true }) if block_number > first_run_checkpoint.block_number &&
396                processed + 1 == total && total == previous_stage + 1
397        );
398        assert_matches!(
399            runner.validate_execution(input, output.ok()),
400            Ok(_),
401            "execution validation"
402        );
403    }
404
405    /// Checks that the stage unwinds correctly, even if a transaction in a block is missing.
406    #[tokio::test]
407    async fn unwind_missing_tx() {
408        let (stage_progress, previous_stage) = (1, 20);
409
410        // Set up test runner
411        let mut runner = BodyTestRunner::default();
412        let input = ExecInput {
413            target: Some(previous_stage),
414            checkpoint: Some(StageCheckpoint::new(stage_progress)),
415        };
416        runner.seed_execution(input).expect("failed to seed execution");
417
418        // Set the batch size to more than what the previous stage synced (40 vs 20)
419        runner.set_batch_size(40);
420
421        // Run the stage
422        let rx = runner.execute(input);
423
424        // Check that we synced all blocks successfully, even though our `batch_size` allows us to
425        // sync more (if there were more headers)
426        let output = rx.await.unwrap();
427        runner.db().factory.static_file_provider().commit().unwrap();
428        assert_matches!(
429            output,
430            Ok(ExecOutput { checkpoint: StageCheckpoint {
431                block_number,
432                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
433                    processed,
434                    total
435                }))
436            }, done: true }) if block_number == previous_stage &&
437                processed + 1 == total && total == previous_stage + 1
438        );
439        let checkpoint = output.unwrap().checkpoint;
440        runner
441            .validate_db_blocks(input.checkpoint().block_number, checkpoint.block_number)
442            .expect("Written block data invalid");
443
444        // Delete a transaction
445        let static_file_provider = runner.db().factory.static_file_provider();
446        {
447            let mut static_file_producer =
448                static_file_provider.latest_writer(StaticFileSegment::Transactions).unwrap();
449            static_file_producer.prune_transactions(1, checkpoint.block_number).unwrap();
450            static_file_producer.commit().unwrap();
451        }
452        // Unwind all of it
453        let unwind_to = 1;
454        let input = UnwindInput { bad_block: None, checkpoint, unwind_to };
455        let res = runner.unwind(input).await;
456        assert_matches!(
457            res,
458            Ok(UnwindOutput { checkpoint: StageCheckpoint {
459                block_number: 1,
460                stage_checkpoint: Some(StageUnitCheckpoint::Entities(EntitiesCheckpoint {
461                    processed: 1,
462                    total
463                }))
464            }}) if total == previous_stage + 1
465        );
466
467        assert_matches!(runner.validate_unwind(input), Ok(_), "unwind validation");
468    }
469
470    mod test_utils {
471        use crate::{
472            stages::bodies::BodyStage,
473            test_utils::{
474                ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB,
475                UnwindStageTestRunner,
476            },
477        };
478        use alloy_consensus::{BlockHeader, Header};
479        use alloy_primitives::{BlockNumber, TxNumber, B256};
480        use futures_util::Stream;
481        use reth_db::{static_file::HeaderWithHashMask, tables};
482        use reth_db_api::{
483            cursor::DbCursorRO,
484            models::{StoredBlockBodyIndices, StoredBlockOmmers},
485            transaction::{DbTx, DbTxMut},
486        };
487        use reth_ethereum_primitives::{Block, BlockBody};
488        use reth_network_p2p::{
489            bodies::{
490                downloader::{BodyDownloader, BodyDownloaderResult},
491                response::BlockResponse,
492            },
493            error::DownloadResult,
494        };
495        use reth_primitives_traits::{SealedBlock, SealedHeader};
496        use reth_provider::{
497            providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, HeaderProvider,
498            ProviderFactory, StaticFileProviderFactory, TransactionsProvider,
499        };
500        use reth_stages_api::{ExecInput, ExecOutput, UnwindInput};
501        use reth_static_file_types::StaticFileSegment;
502        use reth_testing_utils::generators::{
503            self, random_block_range, random_signed_tx, BlockRangeParams,
504        };
505        use std::{
506            collections::{HashMap, VecDeque},
507            ops::RangeInclusive,
508            pin::Pin,
509            task::{Context, Poll},
510        };
511
512        /// The block hash of the genesis block.
513        pub(crate) const GENESIS_HASH: B256 = B256::ZERO;
514
515        /// A helper to create a collection of block bodies keyed by their hash.
516        pub(crate) fn body_by_hash(block: &SealedBlock<Block>) -> (B256, BlockBody) {
517            (block.hash(), block.body().clone())
518        }
519
520        /// A helper struct for running the [`BodyStage`].
521        pub(crate) struct BodyTestRunner {
522            responses: HashMap<B256, BlockBody>,
523            db: TestStageDB,
524            batch_size: u64,
525        }
526
527        impl Default for BodyTestRunner {
528            fn default() -> Self {
529                Self { responses: HashMap::default(), db: TestStageDB::default(), batch_size: 1000 }
530            }
531        }
532
533        impl BodyTestRunner {
534            pub(crate) fn set_batch_size(&mut self, batch_size: u64) {
535                self.batch_size = batch_size;
536            }
537
538            pub(crate) fn set_responses(&mut self, responses: HashMap<B256, BlockBody>) {
539                self.responses = responses;
540            }
541        }
542
543        impl StageTestRunner for BodyTestRunner {
544            type S = BodyStage<TestBodyDownloader>;
545
546            fn db(&self) -> &TestStageDB {
547                &self.db
548            }
549
550            fn stage(&self) -> Self::S {
551                BodyStage::new(TestBodyDownloader::new(
552                    self.db.factory.clone(),
553                    self.responses.clone(),
554                    self.batch_size,
555                ))
556            }
557        }
558
559        impl ExecuteStageTestRunner for BodyTestRunner {
560            type Seed = Vec<SealedBlock<Block>>;
561
562            fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
563                let start = input.checkpoint().block_number;
564                let end = input.target();
565
566                let static_file_provider = self.db.factory.static_file_provider();
567
568                let mut rng = generators::rng();
569
570                // Static files do not support gaps in headers, so we need to generate 0 to end
571                let blocks = random_block_range(
572                    &mut rng,
573                    0..=end,
574                    BlockRangeParams {
575                        parent: Some(GENESIS_HASH),
576                        tx_count: 0..2,
577                        ..Default::default()
578                    },
579                );
580                self.db.insert_headers(blocks.iter().map(|block| block.sealed_header()))?;
581                if let Some(progress) = blocks.get(start as usize) {
582                    // Insert last progress data
583                    {
584                        let tx = self.db.factory.provider_rw()?.into_tx();
585                        let mut static_file_producer = static_file_provider
586                            .get_writer(start, StaticFileSegment::Transactions)?;
587
588                        let body = StoredBlockBodyIndices {
589                            first_tx_num: 0,
590                            tx_count: progress.transaction_count() as u64,
591                        };
592
593                        static_file_producer.set_block_range(0..=progress.number);
594
595                        body.tx_num_range().try_for_each(|tx_num| {
596                            let transaction = random_signed_tx(&mut rng);
597                            static_file_producer.append_transaction(tx_num, &transaction).map(drop)
598                        })?;
599
600                        if body.tx_count != 0 {
601                            tx.put::<tables::TransactionBlocks>(
602                                body.last_tx_num(),
603                                progress.number,
604                            )?;
605                        }
606
607                        tx.put::<tables::BlockBodyIndices>(progress.number, body)?;
608
609                        if !progress.ommers_hash_is_empty() {
610                            tx.put::<tables::BlockOmmers>(
611                                progress.number,
612                                StoredBlockOmmers { ommers: progress.body().ommers.clone() },
613                            )?;
614                        }
615
616                        static_file_producer.commit()?;
617                        tx.commit()?;
618                    }
619                }
620                self.set_responses(blocks.iter().map(body_by_hash).collect());
621                Ok(blocks)
622            }
623
624            fn validate_execution(
625                &self,
626                input: ExecInput,
627                output: Option<ExecOutput>,
628            ) -> Result<(), TestRunnerError> {
629                let highest_block = match output.as_ref() {
630                    Some(output) => output.checkpoint,
631                    None => input.checkpoint(),
632                }
633                .block_number;
634                self.validate_db_blocks(highest_block, highest_block)
635            }
636        }
637
638        impl UnwindStageTestRunner for BodyTestRunner {
639            fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> {
640                self.db.ensure_no_entry_above::<tables::BlockBodyIndices, _>(
641                    input.unwind_to,
642                    |key| key,
643                )?;
644                self.db
645                    .ensure_no_entry_above::<tables::BlockOmmers, _>(input.unwind_to, |key| key)?;
646                if let Some(last_tx_id) = self.get_last_tx_id()? {
647                    self.db
648                        .ensure_no_entry_above::<tables::Transactions, _>(last_tx_id, |key| key)?;
649                    self.db.ensure_no_entry_above::<tables::TransactionBlocks, _>(
650                        last_tx_id,
651                        |key| key,
652                    )?;
653                }
654                Ok(())
655            }
656        }
657
658        impl BodyTestRunner {
659            /// Get the last available tx id if any
660            pub(crate) fn get_last_tx_id(&self) -> Result<Option<TxNumber>, TestRunnerError> {
661                let last_body = self.db.query(|tx| {
662                    let v = tx.cursor_read::<tables::BlockBodyIndices>()?.last()?;
663                    Ok(v)
664                })?;
665                Ok(match last_body {
666                    Some((_, body)) if body.tx_count != 0 => {
667                        Some(body.first_tx_num + body.tx_count - 1)
668                    }
669                    _ => None,
670                })
671            }
672
673            /// Validate that the inserted block data is valid
674            pub(crate) fn validate_db_blocks(
675                &self,
676                prev_progress: BlockNumber,
677                highest_block: BlockNumber,
678            ) -> Result<(), TestRunnerError> {
679                let static_file_provider = self.db.factory.static_file_provider();
680
681                self.db.query(|tx| {
682                    // Acquire cursors on body related tables
683                    let mut bodies_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
684                    let mut ommers_cursor = tx.cursor_read::<tables::BlockOmmers>()?;
685                    let mut tx_block_cursor = tx.cursor_read::<tables::TransactionBlocks>()?;
686
687                    let first_body_key = match bodies_cursor.first()? {
688                        Some((key, _)) => key,
689                        None => return Ok(()),
690                    };
691
692                    let mut prev_number: Option<BlockNumber> = None;
693
694
695                    for entry in bodies_cursor.walk(Some(first_body_key))? {
696                        let (number, body) = entry?;
697
698                        // Validate sequentiality only after prev progress,
699                        // since the data before is mocked and can contain gaps
700                        if number > prev_progress
701                            && let Some(prev_key) = prev_number {
702                                assert_eq!(prev_key + 1, number, "Body entries must be sequential");
703                            }
704
705                        // Validate that the current entry is below or equals to the highest allowed block
706                        assert!(
707                            number <= highest_block,
708                            "We wrote a block body outside of our synced range. Found block with number {number}, highest block according to stage is {highest_block}",
709                        );
710
711                        let header = static_file_provider.header_by_number(number)?.expect("to be present");
712                        // Validate that ommers exist if any
713                        let stored_ommers =  ommers_cursor.seek_exact(number)?;
714                        if header.ommers_hash_is_empty() {
715                            assert!(stored_ommers.is_none(), "Unexpected ommers entry");
716                        } else {
717                            assert!(stored_ommers.is_some(), "Missing ommers entry");
718                        }
719
720                        let tx_block_id = tx_block_cursor.seek_exact(body.last_tx_num())?.map(|(_,b)| b);
721                        if body.tx_count == 0 {
722                            assert_ne!(tx_block_id,Some(number));
723                        } else {
724                            assert_eq!(tx_block_id, Some(number));
725                        }
726
727                        for tx_id in body.tx_num_range() {
728                            assert!(static_file_provider.transaction_by_id(tx_id)?.is_some(), "Transaction is missing.");
729                        }
730
731                        prev_number = Some(number);
732                    }
733                    Ok(())
734                })?;
735                Ok(())
736            }
737        }
738
739        /// A [`BodyDownloader`] that is backed by an internal [`HashMap`] for testing.
740        #[derive(Debug)]
741        pub(crate) struct TestBodyDownloader {
742            provider_factory: ProviderFactory<MockNodeTypesWithDB>,
743            responses: HashMap<B256, BlockBody>,
744            headers: VecDeque<SealedHeader>,
745            batch_size: u64,
746        }
747
748        impl TestBodyDownloader {
749            pub(crate) fn new(
750                provider_factory: ProviderFactory<MockNodeTypesWithDB>,
751                responses: HashMap<B256, BlockBody>,
752                batch_size: u64,
753            ) -> Self {
754                Self { provider_factory, responses, headers: VecDeque::default(), batch_size }
755            }
756        }
757
758        impl BodyDownloader for TestBodyDownloader {
759            type Block = Block;
760
761            fn set_download_range(
762                &mut self,
763                range: RangeInclusive<BlockNumber>,
764            ) -> DownloadResult<()> {
765                let static_file_provider = self.provider_factory.static_file_provider();
766
767                for header in static_file_provider.fetch_range_iter(
768                    StaticFileSegment::Headers,
769                    *range.start()..*range.end() + 1,
770                    |cursor, number| cursor.get_two::<HeaderWithHashMask<Header>>(number.into()),
771                )? {
772                    if let Some((header, hash)) = header? {
773                        self.headers.push_back(SealedHeader::new(header, hash));
774                    }
775                }
776
777                Ok(())
778            }
779        }
780
781        impl Stream for TestBodyDownloader {
782            type Item = BodyDownloaderResult<Block>;
783            fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
784                let this = self.get_mut();
785
786                if this.headers.is_empty() {
787                    return Poll::Ready(None)
788                }
789
790                let mut response =
791                    Vec::with_capacity(std::cmp::min(this.headers.len(), this.batch_size as usize));
792                while let Some(header) = this.headers.pop_front() {
793                    if header.is_empty() {
794                        response.push(BlockResponse::Empty(header))
795                    } else {
796                        let body =
797                            this.responses.remove(&header.hash()).expect("requested unknown body");
798                        response.push(BlockResponse::Full(SealedBlock::from_sealed_parts(
799                            header, body,
800                        )));
801                    }
802
803                    if response.len() as u64 >= this.batch_size {
804                        break
805                    }
806                }
807
808                if !response.is_empty() {
809                    return Poll::Ready(Some(Ok(response)))
810                }
811
812                panic!("requested bodies without setting headers")
813            }
814        }
815    }
816}