1use crate::{segments, segments::Segment, StaticFileProducerEvent};
4use alloy_primitives::BlockNumber;
5use parking_lot::Mutex;
6use rayon::prelude::*;
7use reth_codecs::Compact;
8use reth_db_api::table::Value;
9use reth_primitives_traits::{FastInstant as Instant, NodePrimitives};
10use reth_provider::{
11 providers::StaticFileWriter, BlockReader, ChainStateBlockReader, DBProvider,
12 DatabaseProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
13};
14use reth_prune_types::PruneModes;
15use reth_stages_types::StageId;
16use reth_static_file_types::{HighestStaticFiles, StaticFileTargets};
17use reth_storage_errors::provider::ProviderResult;
18use reth_tokio_util::{EventSender, EventStream};
19use std::{
20 ops::{Deref, RangeInclusive},
21 sync::Arc,
22};
23use tracing::{debug, trace};
24
25pub type StaticFileProducerResult = ProviderResult<StaticFileTargets>;
27
28pub type StaticFileProducerWithResult<Provider> =
30 (StaticFileProducer<Provider>, StaticFileProducerResult);
31
32#[derive(Debug)]
35pub struct StaticFileProducer<Provider>(Arc<Mutex<StaticFileProducerInner<Provider>>>);
36
37impl<Provider> StaticFileProducer<Provider> {
38 pub fn new(provider: Provider, prune_modes: PruneModes) -> Self {
40 Self(Arc::new(Mutex::new(StaticFileProducerInner::new(provider, prune_modes))))
41 }
42}
43
44impl<Provider> Clone for StaticFileProducer<Provider> {
45 fn clone(&self) -> Self {
46 Self(self.0.clone())
47 }
48}
49
50impl<Provider> Deref for StaticFileProducer<Provider> {
51 type Target = Arc<Mutex<StaticFileProducerInner<Provider>>>;
52
53 fn deref(&self) -> &Self::Target {
54 &self.0
55 }
56}
57
58#[derive(Debug)]
61pub struct StaticFileProducerInner<Provider> {
62 provider: Provider,
64 prune_modes: PruneModes,
68 event_sender: EventSender<StaticFileProducerEvent>,
69}
70
71impl<Provider> StaticFileProducerInner<Provider> {
72 fn new(provider: Provider, prune_modes: PruneModes) -> Self {
73 Self { provider, prune_modes, event_sender: Default::default() }
74 }
75}
76
77impl<Provider> StaticFileProducerInner<Provider>
78where
79 Provider: StaticFileProviderFactory + DatabaseProviderFactory<Provider: ChainStateBlockReader>,
80{
81 pub fn last_finalized_block(&self) -> ProviderResult<Option<BlockNumber>> {
83 self.provider.database_provider_ro()?.last_finalized_block_number()
84 }
85}
86
87impl<Provider> StaticFileProducerInner<Provider>
88where
89 Provider: StaticFileProviderFactory
90 + DatabaseProviderFactory<
91 Provider: StaticFileProviderFactory<
92 Primitives: NodePrimitives<
93 SignedTx: Value + Compact,
94 BlockHeader: Value + Compact,
95 Receipt: Value + Compact,
96 >,
97 > + StageCheckpointReader
98 + BlockReader
99 + reth_provider::ChangeSetReader,
100 >,
101{
102 pub fn events(&self) -> EventStream<StaticFileProducerEvent> {
104 self.event_sender.new_listener()
105 }
106
107 pub fn run(&self, targets: StaticFileTargets) -> StaticFileProducerResult {
117 if !targets.any() {
119 return Ok(targets)
120 }
121
122 debug_assert!(targets.is_contiguous_to_highest_static_files(
123 self.provider.static_file_provider().get_highest_static_files()
124 ));
125
126 self.event_sender.notify(StaticFileProducerEvent::Started { targets: targets.clone() });
127
128 debug!(target: "static_file", ?targets, "StaticFileProducer started");
129 let start = Instant::now();
130
131 let mut segments =
132 Vec::<(Box<dyn Segment<Provider::Provider>>, RangeInclusive<BlockNumber>)>::new();
133
134 if let Some(block_range) = targets.receipts.clone() {
135 segments.push((Box::new(segments::Receipts), block_range));
136 }
137
138 segments.par_iter().try_for_each(|(segment, block_range)| -> ProviderResult<()> {
139 debug!(target: "static_file", segment = %segment.segment(), ?block_range, "StaticFileProducer segment");
140 let start = Instant::now();
141
142 let provider = self.provider.database_provider_ro()?.disable_long_read_transaction_safety();
145 segment.copy_to_static_files(provider, block_range.clone())?;
146
147 let elapsed = start.elapsed(); debug!(target: "static_file", segment = %segment.segment(), ?block_range, ?elapsed, "Finished StaticFileProducer segment");
149
150 Ok(())
151 })?;
152
153 self.provider.static_file_provider().commit()?;
154 for (segment, block_range) in segments {
155 self.provider
156 .static_file_provider()
157 .update_index(segment.segment(), Some(*block_range.end()))?;
158 }
159
160 let elapsed = start.elapsed(); debug!(target: "static_file", ?targets, ?elapsed, "StaticFileProducer finished");
162
163 self.event_sender
164 .notify(StaticFileProducerEvent::Finished { targets: targets.clone(), elapsed });
165
166 Ok(targets)
167 }
168
169 pub fn copy_to_static_files(&self) -> ProviderResult<HighestStaticFiles> {
174 let provider = self.provider.database_provider_ro()?;
175 let execution_checkpoint =
176 provider.get_stage_checkpoint(StageId::Execution)?.map(|c| c.block_number);
177
178 let highest_static_files = HighestStaticFiles { receipts: execution_checkpoint };
179 let targets = self.get_static_file_targets(highest_static_files)?;
180 self.run(targets)?;
181
182 Ok(highest_static_files)
183 }
184
185 pub fn get_static_file_targets(
189 &self,
190 finalized_block_numbers: HighestStaticFiles,
191 ) -> ProviderResult<StaticFileTargets> {
192 let highest_static_files = self.provider.static_file_provider().get_highest_static_files();
193
194 let targets = StaticFileTargets {
195 receipts: if self.prune_modes.receipts.is_none() &&
197 self.prune_modes.receipts_log_filter.is_empty()
198 {
199 finalized_block_numbers.receipts.and_then(|finalized_block_number| {
200 self.get_static_file_target(
201 highest_static_files.receipts,
202 finalized_block_number,
203 )
204 })
205 } else {
206 None
207 },
208 };
209
210 trace!(
211 target: "static_file",
212 ?finalized_block_numbers,
213 ?highest_static_files,
214 ?targets,
215 any = %targets.any(),
216 "StaticFile targets"
217 );
218
219 Ok(targets)
220 }
221
222 fn get_static_file_target(
223 &self,
224 highest_static_file: Option<BlockNumber>,
225 finalized_block_number: BlockNumber,
226 ) -> Option<RangeInclusive<BlockNumber>> {
227 let range = highest_static_file.map_or(0, |block| block + 1)..=finalized_block_number;
228 (!range.is_empty()).then_some(range)
229 }
230}
231
#[cfg(test)]
mod tests {
    use crate::static_file_producer::{
        StaticFileProducer, StaticFileProducerInner, StaticFileTargets,
    };
    use alloy_primitives::B256;
    use assert_matches::assert_matches;
    use reth_provider::{
        providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, ProviderError,
        ProviderFactory, StaticFileProviderFactory,
    };
    use reth_prune_types::PruneModes;
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_static_file_types::{HighestStaticFiles, StaticFileSegment};
    use reth_testing_utils::generators::{
        self, random_block_range, random_receipt, BlockRangeParams,
    };
    use std::{sync::mpsc::channel, time::Duration};
    use tempfile::TempDir;

    /// Builds a provider factory backed by a test database holding blocks `0..=3` (two
    /// transactions each) together with one receipt per transaction.
    ///
    /// The returned [`TempDir`] must be kept alive by the caller for as long as the factory
    /// is in use.
    fn setup() -> (ProviderFactory<MockNodeTypesWithDB>, TempDir) {
        let mut rng = generators::rng();
        let db = TestStageDB::default();

        let blocks = random_block_range(
            &mut rng,
            0..=3,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
        );
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Prune the headers from the static files again and re-insert the blocks afterwards.
        let static_file_provider = db.factory.static_file_provider();
        let mut static_file_writer = static_file_provider
            .latest_writer(StaticFileSegment::Headers)
            .expect("get static file writer for headers");
        static_file_writer.prune_headers(blocks.len() as u64).unwrap();
        static_file_writer.commit().expect("prune headers");
        drop(static_file_writer);

        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // One receipt per transaction, numbered consecutively across all blocks.
        let mut receipts = Vec::new();
        for block in &blocks {
            for transaction in &block.body().transactions {
                receipts.push((
                    receipts.len() as u64,
                    random_receipt(&mut rng, transaction, Some(0), None),
                ));
            }
        }
        db.insert_receipts(receipts).expect("insert receipts");

        let provider_factory = db.factory;
        (provider_factory, db.temp_static_files_dir)
    }

    /// Runs the producer three times: two successful incremental runs, then one past the last
    /// available block, which must fail and leave the highest static files untouched.
    #[test]
    fn run() {
        let (provider_factory, _temp_static_files_dir) = setup();

        let static_file_producer =
            StaticFileProducerInner::new(provider_factory.clone(), PruneModes::default());

        let targets = static_file_producer
            .get_static_file_targets(HighestStaticFiles { receipts: Some(1) })
            .expect("get static file targets");
        assert_eq!(targets, StaticFileTargets { receipts: Some(0..=1) });
        assert_matches!(static_file_producer.run(targets), Ok(_));
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles { receipts: Some(1) }
        );

        let targets = static_file_producer
            .get_static_file_targets(HighestStaticFiles { receipts: Some(3) })
            .expect("get static file targets");
        assert_eq!(targets, StaticFileTargets { receipts: Some(2..=3) });
        assert_matches!(static_file_producer.run(targets), Ok(_));
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles { receipts: Some(3) }
        );

        // Block 4 was never inserted, so the run must fail without advancing anything.
        let targets = static_file_producer
            .get_static_file_targets(HighestStaticFiles { receipts: Some(4) })
            .expect("get static file targets");
        assert_eq!(targets, StaticFileTargets { receipts: Some(4..=4) });
        assert_matches!(
            static_file_producer.run(targets),
            Err(ProviderError::BlockBodyIndicesNotFound(4))
        );
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles { receipts: Some(3) }
        );
    }

    /// Checks that when several clones of a [`StaticFileProducer`] race for the same work,
    /// only the first reported run has non-empty targets.
    #[test]
    fn only_one() {
        let (provider_factory, _temp_static_files_dir) = setup();

        let static_file_producer = StaticFileProducer::new(provider_factory, PruneModes::default());

        let (tx, rx) = channel();

        // Keep the join handles: a panic (e.g. a failed assertion) inside a worker would
        // otherwise just drop its sender and go completely unnoticed by the test.
        let mut handles = Vec::new();
        for i in 0..5 {
            let producer = static_file_producer.clone();
            let tx = tx.clone();

            handles.push(std::thread::spawn(move || {
                let locked_producer = producer.lock();
                if i == 0 {
                    // Sleep while holding the lock so the other threads have time to spawn
                    // and contend for it.
                    std::thread::sleep(Duration::from_millis(100));
                }
                let targets = locked_producer
                    .get_static_file_targets(HighestStaticFiles { receipts: Some(1) })
                    .expect("get static file targets");
                assert_matches!(locked_producer.run(targets.clone()), Ok(_));
                tx.send(targets).unwrap();
            }));
        }

        // Drop the original sender so the receiver loop ends once all workers are done.
        drop(tx);

        let mut only_one = Some(());
        for target in rx {
            // Only the very first message may carry targets; all later ones must be empty.
            assert!(only_one.take().is_some_and(|_| target.any()) || !target.any())
        }

        // Surface any panic that happened inside a worker thread.
        for handle in handles {
            handle.join().expect("producer thread panicked");
        }
    }
}