reth_static_file/
static_file_producer.rs

1//! Support for producing static files.
2
3use crate::{segments, segments::Segment, StaticFileProducerEvent};
4use alloy_primitives::BlockNumber;
5use parking_lot::Mutex;
6use rayon::prelude::*;
7use reth_codecs::Compact;
8use reth_db_api::table::Value;
9use reth_primitives_traits::NodePrimitives;
10use reth_provider::{
11    providers::StaticFileWriter, BlockReader, ChainStateBlockReader, DBProvider,
12    DatabaseProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
13};
14use reth_prune_types::PruneModes;
15use reth_stages_types::StageId;
16use reth_static_file_types::{HighestStaticFiles, StaticFileTargets};
17use reth_storage_errors::provider::ProviderResult;
18use reth_tokio_util::{EventSender, EventStream};
19use std::{
20    ops::{Deref, RangeInclusive},
21    sync::Arc,
22    time::Instant,
23};
24use tracing::{debug, trace};
25
26/// Result of [`StaticFileProducerInner::run`] execution.
27pub type StaticFileProducerResult = ProviderResult<StaticFileTargets>;
28
29/// The [`StaticFileProducer`] instance itself with the result of [`StaticFileProducerInner::run`]
30pub type StaticFileProducerWithResult<Provider> =
31    (StaticFileProducer<Provider>, StaticFileProducerResult);
32
33/// Static File producer. It's a wrapper around [`StaticFileProducer`] that allows to share it
34/// between threads.
35#[derive(Debug)]
36pub struct StaticFileProducer<Provider>(Arc<Mutex<StaticFileProducerInner<Provider>>>);
37
38impl<Provider> StaticFileProducer<Provider> {
39    /// Creates a new [`StaticFileProducer`].
40    pub fn new(provider: Provider, prune_modes: PruneModes) -> Self {
41        Self(Arc::new(Mutex::new(StaticFileProducerInner::new(provider, prune_modes))))
42    }
43}
44
45impl<Provider> Clone for StaticFileProducer<Provider> {
46    fn clone(&self) -> Self {
47        Self(self.0.clone())
48    }
49}
50
51impl<Provider> Deref for StaticFileProducer<Provider> {
52    type Target = Arc<Mutex<StaticFileProducerInner<Provider>>>;
53
54    fn deref(&self) -> &Self::Target {
55        &self.0
56    }
57}
58
59/// Static File producer routine. See [`StaticFileProducerInner::run`] for more detailed
60/// description.
61#[derive(Debug)]
62pub struct StaticFileProducerInner<Provider> {
63    /// Provider factory
64    provider: Provider,
65    /// Pruning configuration for every part of the data that can be pruned. Set by user, and
66    /// needed in [`StaticFileProducerInner`] to prevent attempting to move prunable data to static
67    /// files. See [`StaticFileProducerInner::get_static_file_targets`].
68    prune_modes: PruneModes,
69    event_sender: EventSender<StaticFileProducerEvent>,
70}
71
72impl<Provider> StaticFileProducerInner<Provider> {
73    fn new(provider: Provider, prune_modes: PruneModes) -> Self {
74        Self { provider, prune_modes, event_sender: Default::default() }
75    }
76}
77
78impl<Provider> StaticFileProducerInner<Provider>
79where
80    Provider: StaticFileProviderFactory + DatabaseProviderFactory<Provider: ChainStateBlockReader>,
81{
82    /// Returns the last finalized block number on disk.
83    pub fn last_finalized_block(&self) -> ProviderResult<Option<BlockNumber>> {
84        self.provider.database_provider_ro()?.last_finalized_block_number()
85    }
86}
87
88impl<Provider> StaticFileProducerInner<Provider>
89where
90    Provider: StaticFileProviderFactory
91        + DatabaseProviderFactory<
92            Provider: StaticFileProviderFactory<
93                Primitives: NodePrimitives<
94                    SignedTx: Value + Compact,
95                    BlockHeader: Value + Compact,
96                    Receipt: Value + Compact,
97                >,
98            > + StageCheckpointReader
99                          + BlockReader,
100        >,
101{
102    /// Listen for events on the `static_file_producer`.
103    pub fn events(&self) -> EventStream<StaticFileProducerEvent> {
104        self.event_sender.new_listener()
105    }
106
107    /// Run the `static_file_producer`.
108    ///
109    /// For each [Some] target in [`StaticFileTargets`], initializes a corresponding [Segment] and
110    /// runs it with the provided block range using [`reth_provider::providers::StaticFileProvider`]
111    /// and a read-only database transaction from [`DatabaseProviderFactory`]. All segments are run
112    /// in parallel.
113    ///
114    /// NOTE: it doesn't delete the data from database, and the actual deleting (aka pruning) logic
115    /// lives in the `prune` crate.
116    pub fn run(&self, targets: StaticFileTargets) -> StaticFileProducerResult {
117        // If there are no targets, do not produce any static files and return early
118        if !targets.any() {
119            return Ok(targets)
120        }
121
122        debug_assert!(targets.is_contiguous_to_highest_static_files(
123            self.provider.static_file_provider().get_highest_static_files()
124        ));
125
126        self.event_sender.notify(StaticFileProducerEvent::Started { targets: targets.clone() });
127
128        debug!(target: "static_file", ?targets, "StaticFileProducer started");
129        let start = Instant::now();
130
131        let mut segments =
132            Vec::<(Box<dyn Segment<Provider::Provider>>, RangeInclusive<BlockNumber>)>::new();
133
134        if let Some(block_range) = targets.transactions.clone() {
135            segments.push((Box::new(segments::Transactions), block_range));
136        }
137        if let Some(block_range) = targets.headers.clone() {
138            segments.push((Box::new(segments::Headers), block_range));
139        }
140        if let Some(block_range) = targets.receipts.clone() {
141            segments.push((Box::new(segments::Receipts), block_range));
142        }
143
144        segments.par_iter().try_for_each(|(segment, block_range)| -> ProviderResult<()> {
145            debug!(target: "static_file", segment = %segment.segment(), ?block_range, "StaticFileProducer segment");
146            let start = Instant::now();
147
148            // Create a new database transaction on every segment to prevent long-lived read-only
149            // transactions
150            let provider = self.provider.database_provider_ro()?.disable_long_read_transaction_safety();
151            segment.copy_to_static_files(provider,  block_range.clone())?;
152
153            let elapsed = start.elapsed(); // TODO(alexey): track in metrics
154            debug!(target: "static_file", segment = %segment.segment(), ?block_range, ?elapsed, "Finished StaticFileProducer segment");
155
156            Ok(())
157        })?;
158
159        self.provider.static_file_provider().commit()?;
160        for (segment, block_range) in segments {
161            self.provider
162                .static_file_provider()
163                .update_index(segment.segment(), Some(*block_range.end()))?;
164        }
165
166        let elapsed = start.elapsed(); // TODO(alexey): track in metrics
167        debug!(target: "static_file", ?targets, ?elapsed, "StaticFileProducer finished");
168
169        self.event_sender
170            .notify(StaticFileProducerEvent::Finished { targets: targets.clone(), elapsed });
171
172        Ok(targets)
173    }
174
175    /// Copies data from database to static files according to
176    /// [stage checkpoints](reth_stages_types::StageCheckpoint).
177    ///
178    /// Returns highest block numbers for all static file segments.
179    pub fn copy_to_static_files(&self) -> ProviderResult<HighestStaticFiles> {
180        let provider = self.provider.database_provider_ro()?;
181        let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies]
182            .into_iter()
183            .map(|stage| provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number)))
184            .collect::<Result<Vec<_>, _>>()?;
185
186        let highest_static_files = HighestStaticFiles {
187            headers: stages_checkpoints[0],
188            receipts: stages_checkpoints[1],
189            transactions: stages_checkpoints[2],
190            block_meta: stages_checkpoints[2],
191        };
192        let targets = self.get_static_file_targets(highest_static_files)?;
193        self.run(targets)?;
194
195        Ok(highest_static_files)
196    }
197
198    /// Returns a static file targets at the provided finalized block numbers per segment.
199    /// The target is determined by the check against highest `static_files` using
200    /// [`reth_provider::providers::StaticFileProvider::get_highest_static_files`].
201    pub fn get_static_file_targets(
202        &self,
203        finalized_block_numbers: HighestStaticFiles,
204    ) -> ProviderResult<StaticFileTargets> {
205        let highest_static_files = self.provider.static_file_provider().get_highest_static_files();
206
207        let targets = StaticFileTargets {
208            headers: finalized_block_numbers.headers.and_then(|finalized_block_number| {
209                self.get_static_file_target(highest_static_files.headers, finalized_block_number)
210            }),
211            // StaticFile receipts only if they're not pruned according to the user configuration
212            receipts: if self.prune_modes.receipts.is_none() &&
213                self.prune_modes.receipts_log_filter.is_empty()
214            {
215                finalized_block_numbers.receipts.and_then(|finalized_block_number| {
216                    self.get_static_file_target(
217                        highest_static_files.receipts,
218                        finalized_block_number,
219                    )
220                })
221            } else {
222                None
223            },
224            transactions: finalized_block_numbers.transactions.and_then(|finalized_block_number| {
225                self.get_static_file_target(
226                    highest_static_files.transactions,
227                    finalized_block_number,
228                )
229            }),
230            block_meta: finalized_block_numbers.block_meta.and_then(|finalized_block_number| {
231                self.get_static_file_target(highest_static_files.block_meta, finalized_block_number)
232            }),
233        };
234
235        trace!(
236            target: "static_file",
237            ?finalized_block_numbers,
238            ?highest_static_files,
239            ?targets,
240            any = %targets.any(),
241            "StaticFile targets"
242        );
243
244        Ok(targets)
245    }
246
247    fn get_static_file_target(
248        &self,
249        highest_static_file: Option<BlockNumber>,
250        finalized_block_number: BlockNumber,
251    ) -> Option<RangeInclusive<BlockNumber>> {
252        let range = highest_static_file.map_or(0, |block| block + 1)..=finalized_block_number;
253        (!range.is_empty()).then_some(range)
254    }
255}
256
#[cfg(test)]
mod tests {
    use crate::static_file_producer::{
        StaticFileProducer, StaticFileProducerInner, StaticFileTargets,
    };
    use alloy_primitives::{B256, U256};
    use assert_matches::assert_matches;
    use reth_db_api::{database::Database, transaction::DbTx};
    use reth_provider::{
        providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, ProviderError,
        ProviderFactory, StaticFileProviderFactory,
    };
    use reth_prune_types::PruneModes;
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_static_file_types::{HighestStaticFiles, StaticFileSegment};
    use reth_testing_utils::generators::{
        self, random_block_range, random_receipt, BlockRangeParams,
    };
    use std::{sync::mpsc::channel, time::Duration};
    use tempfile::TempDir;

    /// Prepares a test database with blocks `0..=3` (two transactions each), their headers stored
    /// in the database rather than in static files, and one random receipt per transaction.
    ///
    /// Returns the provider factory together with the temporary static files directory, which the
    /// caller holds on to for the duration of the test.
    fn setup() -> (ProviderFactory<MockNodeTypesWithDB>, TempDir) {
        let mut rng = generators::rng();
        let db = TestStageDB::default();

        let blocks = random_block_range(
            &mut rng,
            0..=3,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
        // Unwind headers from static_files and manually insert them into the database, so we're
        // able to check that static_file_producer works
        let static_file_provider = db.factory.static_file_provider();
        let mut static_file_writer = static_file_provider
            .latest_writer(StaticFileSegment::Headers)
            .expect("get static file writer for headers");
        static_file_writer.prune_headers(blocks.len() as u64).unwrap();
        static_file_writer.commit().expect("prune headers");

        let tx = db.factory.db_ref().tx_mut().expect("init tx");
        for block in &blocks {
            TestStageDB::insert_header(None, &tx, block.sealed_header(), U256::ZERO)
                .expect("insert block header");
        }
        tx.commit().expect("commit tx");

        // Receipts are numbered consecutively across all blocks.
        let mut receipts = Vec::new();
        for block in &blocks {
            for transaction in &block.body().transactions {
                receipts
                    .push((receipts.len() as u64, random_receipt(&mut rng, transaction, Some(0))));
            }
        }
        db.insert_receipts(receipts).expect("insert receipts");

        let provider_factory = db.factory;
        (provider_factory, db.temp_static_files_dir)
    }

    /// Runs the producer in three steps: blocks `0..=1`, blocks `2..=3`, and finally block `4`,
    /// which was never inserted and therefore must fail without advancing the static files.
    #[test]
    fn run() {
        let (provider_factory, _temp_static_files_dir) = setup();

        let static_file_producer =
            StaticFileProducerInner::new(provider_factory.clone(), PruneModes::default());

        // First run: nothing is in static files yet, so the targets start at block 0.
        let targets = static_file_producer
            .get_static_file_targets(HighestStaticFiles {
                headers: Some(1),
                receipts: Some(1),
                transactions: Some(1),
                block_meta: None,
            })
            .expect("get static file targets");
        assert_eq!(
            targets,
            StaticFileTargets {
                headers: Some(0..=1),
                receipts: Some(0..=1),
                transactions: Some(0..=1),
                block_meta: None
            }
        );
        assert_matches!(static_file_producer.run(targets), Ok(_));
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles {
                headers: Some(1),
                receipts: Some(1),
                transactions: Some(1),
                block_meta: None
            }
        );

        // Second run: the targets continue right after the previously copied blocks.
        let targets = static_file_producer
            .get_static_file_targets(HighestStaticFiles {
                headers: Some(3),
                receipts: Some(3),
                transactions: Some(3),
                block_meta: None,
            })
            .expect("get static file targets");
        assert_eq!(
            targets,
            StaticFileTargets {
                headers: Some(2..=3),
                receipts: Some(2..=3),
                transactions: Some(2..=3),
                block_meta: None
            }
        );
        assert_matches!(static_file_producer.run(targets), Ok(_));
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles {
                headers: Some(3),
                receipts: Some(3),
                transactions: Some(3),
                block_meta: None
            }
        );

        // Third run: block 4 does not exist, so the run fails and the highest static files stay
        // at block 3.
        let targets = static_file_producer
            .get_static_file_targets(HighestStaticFiles {
                headers: Some(4),
                receipts: Some(4),
                transactions: Some(4),
                block_meta: None,
            })
            .expect("get static file targets");
        assert_eq!(
            targets,
            StaticFileTargets {
                headers: Some(4..=4),
                receipts: Some(4..=4),
                transactions: Some(4..=4),
                block_meta: None
            }
        );
        assert_matches!(
            static_file_producer.run(targets),
            Err(ProviderError::BlockBodyIndicesNotFound(4))
        );
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles {
                headers: Some(3),
                receipts: Some(3),
                transactions: Some(3),
                block_meta: None
            }
        );
    }

    /// Tests that a cloneable [`StaticFileProducer`] type is not susceptible to any race condition.
    #[test]
    fn only_one() {
        let (provider_factory, _temp_static_files_dir) = setup();

        let static_file_producer = StaticFileProducer::new(provider_factory, PruneModes::default());

        let (tx, rx) = channel();

        let handles: Vec<_> = (0..5)
            .map(|i| {
                let producer = static_file_producer.clone();
                let tx = tx.clone();

                std::thread::spawn(move || {
                    let locked_producer = producer.lock();
                    if i == 0 {
                        // Let other threads spawn as well.
                        std::thread::sleep(Duration::from_millis(100));
                    }
                    let targets = locked_producer
                        .get_static_file_targets(HighestStaticFiles {
                            headers: Some(1),
                            receipts: Some(1),
                            transactions: Some(1),
                            block_meta: None,
                        })
                        .expect("get static file targets");
                    assert_matches!(locked_producer.run(targets.clone()), Ok(_));
                    tx.send(targets).unwrap();
                })
            })
            .collect();

        // Drop the original sender so that the receiving loop ends once all threads are done.
        drop(tx);

        let mut only_one = Some(());
        for target in rx {
            // Only the first spawn should have any meaningful target.
            assert!(only_one.take().is_some_and(|_| target.any()) || !target.any())
        }

        // A panic inside a spawned thread does not fail the test on its own, so join every thread
        // and surface failed assertions from the workers.
        for handle in handles {
            handle.join().expect("static file producer thread panicked");
        }
    }
}