Skip to main content

reth_static_file/
static_file_producer.rs

1//! Support for producing static files.
2
3use crate::{segments, segments::Segment, StaticFileProducerEvent};
4use alloy_primitives::BlockNumber;
5use parking_lot::Mutex;
6use rayon::prelude::*;
7use reth_codecs::Compact;
8use reth_db_api::table::Value;
9use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
10use reth_provider::{
11    providers::StaticFileWriter, BlockReader, ChainStateBlockReader, DBProvider,
12    DatabaseProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
13};
14use reth_prune_types::PruneModes;
15use reth_stages_types::StageId;
16use reth_static_file_types::{HighestStaticFiles, StaticFileTargets};
17use reth_storage_errors::provider::ProviderResult;
18use reth_tokio_util::{EventSender, EventStream};
19use std::{
20    ops::{Deref, RangeInclusive},
21    sync::Arc,
22};
23use tracing::{debug, trace};
24
/// Result of [`StaticFileProducerInner::run`] execution.
///
/// On success it carries the [`StaticFileTargets`] that were handed to the run; on failure it
/// carries the provider error that interrupted it.
pub type StaticFileProducerResult = ProviderResult<StaticFileTargets>;
27
/// The [`StaticFileProducer`] instance itself, paired with the result of
/// [`StaticFileProducerInner::run`].
pub type StaticFileProducerWithResult<Provider> =
    (StaticFileProducer<Provider>, StaticFileProducerResult);
31
/// Static File producer. It's a wrapper around [`StaticFileProducerInner`] that allows sharing it
/// between threads.
///
/// The inner producer is stored behind an [`Arc`] and a [`Mutex`], so it has to be locked before
/// any of its methods can be called.
#[derive(Debug)]
pub struct StaticFileProducer<Provider>(Arc<Mutex<StaticFileProducerInner<Provider>>>);
36
37impl<Provider> StaticFileProducer<Provider> {
38    /// Creates a new [`StaticFileProducer`].
39    pub fn new(provider: Provider, prune_modes: PruneModes) -> Self {
40        Self(Arc::new(Mutex::new(StaticFileProducerInner::new(provider, prune_modes))))
41    }
42}
43
44impl<Provider> Clone for StaticFileProducer<Provider> {
45    fn clone(&self) -> Self {
46        Self(self.0.clone())
47    }
48}
49
50impl<Provider> Deref for StaticFileProducer<Provider> {
51    type Target = Arc<Mutex<StaticFileProducerInner<Provider>>>;
52
53    fn deref(&self) -> &Self::Target {
54        &self.0
55    }
56}
57
/// Static File producer routine. See [`StaticFileProducerInner::run`] for more detailed
/// description.
#[derive(Debug)]
pub struct StaticFileProducerInner<Provider> {
    /// Provider factory
    provider: Provider,
    /// Pruning configuration for every part of the data that can be pruned. Set by user, and
    /// needed in [`StaticFileProducerInner`] to prevent attempting to move prunable data to static
    /// files. See [`StaticFileProducerInner::get_static_file_targets`].
    prune_modes: PruneModes,
    /// Sender used to notify listeners about [`StaticFileProducerEvent`]s, i.e. when a run starts
    /// and when it finishes.
    event_sender: EventSender<StaticFileProducerEvent>,
}
70
71impl<Provider> StaticFileProducerInner<Provider> {
72    fn new(provider: Provider, prune_modes: PruneModes) -> Self {
73        Self { provider, prune_modes, event_sender: Default::default() }
74    }
75}
76
77impl<Provider> StaticFileProducerInner<Provider>
78where
79    Provider: StaticFileProviderFactory + DatabaseProviderFactory<Provider: ChainStateBlockReader>,
80{
81    /// Returns the last finalized block number on disk.
82    pub fn last_finalized_block(&self) -> ProviderResult<Option<BlockNumber>> {
83        self.provider.database_provider_ro()?.last_finalized_block_number()
84    }
85}
86
87impl<Provider> StaticFileProducerInner<Provider>
88where
89    Provider: StaticFileProviderFactory
90        + DatabaseProviderFactory<
91            Provider: StaticFileProviderFactory<
92                Primitives: NodePrimitives<
93                    SignedTx: Value + Compact,
94                    BlockHeader: Value + Compact,
95                    Receipt: Value + Compact,
96                >,
97            > + StageCheckpointReader
98                          + BlockReader
99                          + reth_provider::ChangeSetReader,
100        >,
101{
102    /// Listen for events on the `static_file_producer`.
103    pub fn events(&self) -> EventStream<StaticFileProducerEvent> {
104        self.event_sender.new_listener()
105    }
106
107    /// Run the `static_file_producer`.
108    ///
109    /// For each [Some] target in [`StaticFileTargets`], initializes a corresponding [Segment] and
110    /// runs it with the provided block range using [`reth_provider::providers::StaticFileProvider`]
111    /// and a read-only database transaction from [`DatabaseProviderFactory`]. All segments are run
112    /// in parallel.
113    ///
114    /// NOTE: it doesn't delete the data from database, and the actual deleting (aka pruning) logic
115    /// lives in the `prune` crate.
116    pub fn run(&self, targets: StaticFileTargets) -> StaticFileProducerResult {
117        // If there are no targets, do not produce any static files and return early
118        if !targets.any() {
119            return Ok(targets)
120        }
121
122        debug_assert!(targets.is_contiguous_to_highest_static_files(
123            self.provider.static_file_provider().get_highest_static_files()
124        ));
125
126        self.event_sender.notify(StaticFileProducerEvent::Started { targets: targets.clone() });
127
128        debug!(target: "static_file", ?targets, "StaticFileProducer started");
129        let start = Instant::now();
130
131        let mut segments =
132            Vec::<(Box<dyn Segment<Provider::Provider>>, RangeInclusive<BlockNumber>)>::new();
133
134        if let Some(block_range) = targets.receipts.clone() {
135            segments.push((Box::new(segments::Receipts), block_range));
136        }
137
138        segments.par_iter().try_for_each(|(segment, block_range)| -> ProviderResult<()> {
139            debug!(target: "static_file", segment = %segment.segment(), ?block_range, "StaticFileProducer segment");
140            let start = Instant::now();
141
142            // Create a new database transaction on every segment to prevent long-lived read-only
143            // transactions
144            let provider = self.provider.database_provider_ro()?.disable_long_read_transaction_safety();
145            segment.copy_to_static_files(provider,  block_range.clone())?;
146
147            let elapsed = start.elapsed(); // TODO(alexey): track in metrics
148            debug!(target: "static_file", segment = %segment.segment(), ?block_range, ?elapsed, "Finished StaticFileProducer segment");
149
150            Ok(())
151        })?;
152
153        self.provider.static_file_provider().commit()?;
154        for (segment, block_range) in segments {
155            self.provider
156                .static_file_provider()
157                .update_index(segment.segment(), Some(*block_range.end()))?;
158        }
159
160        let elapsed = start.elapsed(); // TODO(alexey): track in metrics
161        debug!(target: "static_file", ?targets, ?elapsed, "StaticFileProducer finished");
162
163        self.event_sender
164            .notify(StaticFileProducerEvent::Finished { targets: targets.clone(), elapsed });
165
166        Ok(targets)
167    }
168
169    /// Copies data from database to static files according to
170    /// [stage checkpoints](reth_stages_types::StageCheckpoint).
171    ///
172    /// Returns highest block numbers for all static file segments.
173    pub fn copy_to_static_files(&self) -> ProviderResult<HighestStaticFiles> {
174        let provider = self.provider.database_provider_ro()?;
175        let execution_checkpoint =
176            provider.get_stage_checkpoint(StageId::Execution)?.map(|c| c.block_number);
177
178        let highest_static_files = HighestStaticFiles { receipts: execution_checkpoint };
179        let targets = self.get_static_file_targets(highest_static_files)?;
180        self.run(targets)?;
181
182        Ok(highest_static_files)
183    }
184
185    /// Returns a static file targets at the provided finalized block numbers per segment.
186    /// The target is determined by the check against highest `static_files` using
187    /// [`reth_provider::providers::StaticFileProvider::get_highest_static_files`].
188    pub fn get_static_file_targets(
189        &self,
190        finalized_block_numbers: HighestStaticFiles,
191    ) -> ProviderResult<StaticFileTargets> {
192        let highest_static_files = self.provider.static_file_provider().get_highest_static_files();
193
194        let targets = StaticFileTargets {
195            // StaticFile receipts only if they're not pruned according to the user configuration
196            receipts: if self.prune_modes.receipts.is_none() &&
197                self.prune_modes.receipts_log_filter.is_empty()
198            {
199                finalized_block_numbers.receipts.and_then(|finalized_block_number| {
200                    self.get_static_file_target(
201                        highest_static_files.receipts,
202                        finalized_block_number,
203                    )
204                })
205            } else {
206                None
207            },
208        };
209
210        trace!(
211            target: "static_file",
212            ?finalized_block_numbers,
213            ?highest_static_files,
214            ?targets,
215            any = %targets.any(),
216            "StaticFile targets"
217        );
218
219        Ok(targets)
220    }
221
222    fn get_static_file_target(
223        &self,
224        highest_static_file: Option<BlockNumber>,
225        finalized_block_number: BlockNumber,
226    ) -> Option<RangeInclusive<BlockNumber>> {
227        let range = highest_static_file.map_or(0, |block| block + 1)..=finalized_block_number;
228        (!range.is_empty()).then_some(range)
229    }
230}
231
#[cfg(test)]
mod tests {
    use crate::static_file_producer::{
        StaticFileProducer, StaticFileProducerInner, StaticFileTargets,
    };
    use alloy_primitives::B256;
    use assert_matches::assert_matches;
    use reth_provider::{
        providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, ProviderError,
        ProviderFactory, StaticFileProviderFactory,
    };
    use reth_prune_types::PruneModes;
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_static_file_types::{HighestStaticFiles, StaticFileSegment};
    use reth_testing_utils::generators::{
        self, random_block_range, random_receipt, BlockRangeParams,
    };
    use std::{sync::mpsc::channel, time::Duration};
    use tempfile::TempDir;

    /// Builds a provider factory over a test database holding blocks `0..=3` (two transactions
    /// each) plus one random receipt per transaction.
    ///
    /// The temporary static files directory is returned as well so that it outlives the test.
    fn setup() -> (ProviderFactory<MockNodeTypesWithDB>, TempDir) {
        let mut rng = generators::rng();
        let db = TestStageDB::default();

        let blocks = random_block_range(
            &mut rng,
            0..=3,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
        // Unwind headers from static_files and manually insert them into the database, so we're
        // able to check that static_file_producer works
        let static_file_provider = db.factory.static_file_provider();
        let mut static_file_writer = static_file_provider
            .latest_writer(StaticFileSegment::Headers)
            .expect("get static file writer for headers");
        static_file_writer.prune_headers(blocks.len() as u64).unwrap();
        static_file_writer.commit().expect("prune headers");
        drop(static_file_writer);

        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Receipts are keyed by a running transaction number across all blocks.
        let mut receipts = Vec::new();
        for block in &blocks {
            for transaction in &block.body().transactions {
                receipts.push((
                    receipts.len() as u64,
                    random_receipt(&mut rng, transaction, Some(0), None),
                ));
            }
        }
        db.insert_receipts(receipts).expect("insert receipts");

        let provider_factory = db.factory;
        (provider_factory, db.temp_static_files_dir)
    }

    /// Runs the producer in three steps: blocks `0..=1`, blocks `2..=3`, and finally block `4`,
    /// which does not exist and must fail without advancing the highest static file.
    #[test]
    fn run() {
        let (provider_factory, _temp_static_files_dir) = setup();

        let static_file_producer =
            StaticFileProducerInner::new(provider_factory.clone(), PruneModes::default());

        let targets = static_file_producer
            .get_static_file_targets(HighestStaticFiles { receipts: Some(1) })
            .expect("get static file targets");
        assert_eq!(targets, StaticFileTargets { receipts: Some(0..=1) });
        assert_matches!(static_file_producer.run(targets), Ok(_));
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles { receipts: Some(1) }
        );

        let targets = static_file_producer
            .get_static_file_targets(HighestStaticFiles { receipts: Some(3) })
            .expect("get static file targets");
        assert_eq!(targets, StaticFileTargets { receipts: Some(2..=3) });
        assert_matches!(static_file_producer.run(targets), Ok(_));
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles { receipts: Some(3) }
        );

        let targets = static_file_producer
            .get_static_file_targets(HighestStaticFiles { receipts: Some(4) })
            .expect("get static file targets");
        assert_eq!(targets, StaticFileTargets { receipts: Some(4..=4) });
        assert_matches!(
            static_file_producer.run(targets),
            Err(ProviderError::BlockBodyIndicesNotFound(4))
        );
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles { receipts: Some(3) }
        );
    }

    /// Tests that a cloneable [`StaticFileProducer`] type is not susceptible to any race condition.
    #[test]
    fn only_one() {
        let (provider_factory, _temp_static_files_dir) = setup();

        let static_file_producer = StaticFileProducer::new(provider_factory, PruneModes::default());

        let (tx, rx) = channel();

        // Keep the join handles: a panic inside a worker (e.g. a failed assertion) only drops
        // its sender, which the receiving loop below cannot tell apart from a clean exit.
        let mut handles = Vec::new();

        for i in 0..5 {
            let producer = static_file_producer.clone();
            let tx = tx.clone();

            handles.push(std::thread::spawn(move || {
                let locked_producer = producer.lock();
                if i == 0 {
                    // Let other threads spawn as well.
                    std::thread::sleep(Duration::from_millis(100));
                }
                let targets = locked_producer
                    .get_static_file_targets(HighestStaticFiles { receipts: Some(1) })
                    .expect("get static file targets");
                assert_matches!(locked_producer.run(targets.clone()), Ok(_));
                tx.send(targets).unwrap();
            }));
        }

        // Drop the original sender so the receiving loop ends once every worker is done.
        drop(tx);

        let mut only_one = Some(());
        for target in rx {
            // Only the first spawn should have any meaningful target.
            assert!(only_one.take().is_some_and(|_| target.any()) || !target.any())
        }

        // Surface worker panics as test failures instead of silently ignoring them.
        for handle in handles {
            handle.join().expect("static file producer thread panicked");
        }
    }
}