1use crate::{segments, segments::Segment, StaticFileProducerEvent};
4use alloy_primitives::BlockNumber;
5use parking_lot::Mutex;
6use rayon::prelude::*;
7use reth_codecs::Compact;
8use reth_db_api::table::Value;
9use reth_primitives_traits::NodePrimitives;
10use reth_provider::{
11 providers::StaticFileWriter, BlockReader, ChainStateBlockReader, DBProvider,
12 DatabaseProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
13};
14use reth_prune_types::PruneModes;
15use reth_stages_types::StageId;
16use reth_static_file_types::{HighestStaticFiles, StaticFileTargets};
17use reth_storage_errors::provider::ProviderResult;
18use reth_tokio_util::{EventSender, EventStream};
19use std::{
20 ops::{Deref, RangeInclusive},
21 sync::Arc,
22 time::Instant,
23};
24use tracing::{debug, trace};
25
26pub type StaticFileProducerResult = ProviderResult<StaticFileTargets>;
28
29pub type StaticFileProducerWithResult<Provider> =
31 (StaticFileProducer<Provider>, StaticFileProducerResult);
32
33#[derive(Debug)]
36pub struct StaticFileProducer<Provider>(Arc<Mutex<StaticFileProducerInner<Provider>>>);
37
38impl<Provider> StaticFileProducer<Provider> {
39 pub fn new(provider: Provider, prune_modes: PruneModes) -> Self {
41 Self(Arc::new(Mutex::new(StaticFileProducerInner::new(provider, prune_modes))))
42 }
43}
44
45impl<Provider> Clone for StaticFileProducer<Provider> {
46 fn clone(&self) -> Self {
47 Self(self.0.clone())
48 }
49}
50
51impl<Provider> Deref for StaticFileProducer<Provider> {
52 type Target = Arc<Mutex<StaticFileProducerInner<Provider>>>;
53
54 fn deref(&self) -> &Self::Target {
55 &self.0
56 }
57}
58
59#[derive(Debug)]
62pub struct StaticFileProducerInner<Provider> {
63 provider: Provider,
65 prune_modes: PruneModes,
69 event_sender: EventSender<StaticFileProducerEvent>,
70}
71
72impl<Provider> StaticFileProducerInner<Provider> {
73 fn new(provider: Provider, prune_modes: PruneModes) -> Self {
74 Self { provider, prune_modes, event_sender: Default::default() }
75 }
76}
77
78impl<Provider> StaticFileProducerInner<Provider>
79where
80 Provider: StaticFileProviderFactory + DatabaseProviderFactory<Provider: ChainStateBlockReader>,
81{
82 pub fn last_finalized_block(&self) -> ProviderResult<Option<BlockNumber>> {
84 self.provider.database_provider_ro()?.last_finalized_block_number()
85 }
86}
87
88impl<Provider> StaticFileProducerInner<Provider>
89where
90 Provider: StaticFileProviderFactory
91 + DatabaseProviderFactory<
92 Provider: StaticFileProviderFactory<
93 Primitives: NodePrimitives<
94 SignedTx: Value + Compact,
95 BlockHeader: Value + Compact,
96 Receipt: Value + Compact,
97 >,
98 > + StageCheckpointReader
99 + BlockReader,
100 >,
101{
102 pub fn events(&self) -> EventStream<StaticFileProducerEvent> {
104 self.event_sender.new_listener()
105 }
106
107 pub fn run(&self, targets: StaticFileTargets) -> StaticFileProducerResult {
117 if !targets.any() {
119 return Ok(targets)
120 }
121
122 debug_assert!(targets.is_contiguous_to_highest_static_files(
123 self.provider.static_file_provider().get_highest_static_files()
124 ));
125
126 self.event_sender.notify(StaticFileProducerEvent::Started { targets: targets.clone() });
127
128 debug!(target: "static_file", ?targets, "StaticFileProducer started");
129 let start = Instant::now();
130
131 let mut segments =
132 Vec::<(Box<dyn Segment<Provider::Provider>>, RangeInclusive<BlockNumber>)>::new();
133
134 if let Some(block_range) = targets.transactions.clone() {
135 segments.push((Box::new(segments::Transactions), block_range));
136 }
137 if let Some(block_range) = targets.headers.clone() {
138 segments.push((Box::new(segments::Headers), block_range));
139 }
140 if let Some(block_range) = targets.receipts.clone() {
141 segments.push((Box::new(segments::Receipts), block_range));
142 }
143
144 segments.par_iter().try_for_each(|(segment, block_range)| -> ProviderResult<()> {
145 debug!(target: "static_file", segment = %segment.segment(), ?block_range, "StaticFileProducer segment");
146 let start = Instant::now();
147
148 let provider = self.provider.database_provider_ro()?.disable_long_read_transaction_safety();
151 segment.copy_to_static_files(provider, block_range.clone())?;
152
153 let elapsed = start.elapsed(); debug!(target: "static_file", segment = %segment.segment(), ?block_range, ?elapsed, "Finished StaticFileProducer segment");
155
156 Ok(())
157 })?;
158
159 self.provider.static_file_provider().commit()?;
160 for (segment, block_range) in segments {
161 self.provider
162 .static_file_provider()
163 .update_index(segment.segment(), Some(*block_range.end()))?;
164 }
165
166 let elapsed = start.elapsed(); debug!(target: "static_file", ?targets, ?elapsed, "StaticFileProducer finished");
168
169 self.event_sender
170 .notify(StaticFileProducerEvent::Finished { targets: targets.clone(), elapsed });
171
172 Ok(targets)
173 }
174
175 pub fn copy_to_static_files(&self) -> ProviderResult<HighestStaticFiles> {
180 let provider = self.provider.database_provider_ro()?;
181 let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies]
182 .into_iter()
183 .map(|stage| provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number)))
184 .collect::<Result<Vec<_>, _>>()?;
185
186 let highest_static_files = HighestStaticFiles {
187 headers: stages_checkpoints[0],
188 receipts: stages_checkpoints[1],
189 transactions: stages_checkpoints[2],
190 block_meta: stages_checkpoints[2],
191 };
192 let targets = self.get_static_file_targets(highest_static_files)?;
193 self.run(targets)?;
194
195 Ok(highest_static_files)
196 }
197
198 pub fn get_static_file_targets(
202 &self,
203 finalized_block_numbers: HighestStaticFiles,
204 ) -> ProviderResult<StaticFileTargets> {
205 let highest_static_files = self.provider.static_file_provider().get_highest_static_files();
206
207 let targets = StaticFileTargets {
208 headers: finalized_block_numbers.headers.and_then(|finalized_block_number| {
209 self.get_static_file_target(highest_static_files.headers, finalized_block_number)
210 }),
211 receipts: if self.prune_modes.receipts.is_none() &&
213 self.prune_modes.receipts_log_filter.is_empty()
214 {
215 finalized_block_numbers.receipts.and_then(|finalized_block_number| {
216 self.get_static_file_target(
217 highest_static_files.receipts,
218 finalized_block_number,
219 )
220 })
221 } else {
222 None
223 },
224 transactions: finalized_block_numbers.transactions.and_then(|finalized_block_number| {
225 self.get_static_file_target(
226 highest_static_files.transactions,
227 finalized_block_number,
228 )
229 }),
230 block_meta: finalized_block_numbers.block_meta.and_then(|finalized_block_number| {
231 self.get_static_file_target(highest_static_files.block_meta, finalized_block_number)
232 }),
233 };
234
235 trace!(
236 target: "static_file",
237 ?finalized_block_numbers,
238 ?highest_static_files,
239 ?targets,
240 any = %targets.any(),
241 "StaticFile targets"
242 );
243
244 Ok(targets)
245 }
246
247 fn get_static_file_target(
248 &self,
249 highest_static_file: Option<BlockNumber>,
250 finalized_block_number: BlockNumber,
251 ) -> Option<RangeInclusive<BlockNumber>> {
252 let range = highest_static_file.map_or(0, |block| block + 1)..=finalized_block_number;
253 (!range.is_empty()).then_some(range)
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use crate::static_file_producer::{
260 StaticFileProducer, StaticFileProducerInner, StaticFileTargets,
261 };
262 use alloy_primitives::{B256, U256};
263 use assert_matches::assert_matches;
264 use reth_db_api::{database::Database, transaction::DbTx};
265 use reth_provider::{
266 providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, ProviderError,
267 ProviderFactory, StaticFileProviderFactory,
268 };
269 use reth_prune_types::PruneModes;
270 use reth_stages::test_utils::{StorageKind, TestStageDB};
271 use reth_static_file_types::{HighestStaticFiles, StaticFileSegment};
272 use reth_testing_utils::generators::{
273 self, random_block_range, random_receipt, BlockRangeParams,
274 };
275 use std::{sync::mpsc::channel, time::Duration};
276 use tempfile::TempDir;
277
278 fn setup() -> (ProviderFactory<MockNodeTypesWithDB>, TempDir) {
279 let mut rng = generators::rng();
280 let db = TestStageDB::default();
281
282 let blocks = random_block_range(
283 &mut rng,
284 0..=3,
285 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() },
286 );
287 db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
288 let static_file_provider = db.factory.static_file_provider();
291 let mut static_file_writer = static_file_provider
292 .latest_writer(StaticFileSegment::Headers)
293 .expect("get static file writer for headers");
294 static_file_writer.prune_headers(blocks.len() as u64).unwrap();
295 static_file_writer.commit().expect("prune headers");
296
297 let tx = db.factory.db_ref().tx_mut().expect("init tx");
298 for block in &blocks {
299 TestStageDB::insert_header(None, &tx, block.sealed_header(), U256::ZERO)
300 .expect("insert block header");
301 }
302 tx.commit().expect("commit tx");
303
304 let mut receipts = Vec::new();
305 for block in &blocks {
306 for transaction in &block.body().transactions {
307 receipts
308 .push((receipts.len() as u64, random_receipt(&mut rng, transaction, Some(0))));
309 }
310 }
311 db.insert_receipts(receipts).expect("insert receipts");
312
313 let provider_factory = db.factory;
314 (provider_factory, db.temp_static_files_dir)
315 }
316
317 #[test]
318 fn run() {
319 let (provider_factory, _temp_static_files_dir) = setup();
320
321 let static_file_producer =
322 StaticFileProducerInner::new(provider_factory.clone(), PruneModes::default());
323
324 let targets = static_file_producer
325 .get_static_file_targets(HighestStaticFiles {
326 headers: Some(1),
327 receipts: Some(1),
328 transactions: Some(1),
329 block_meta: None,
330 })
331 .expect("get static file targets");
332 assert_eq!(
333 targets,
334 StaticFileTargets {
335 headers: Some(0..=1),
336 receipts: Some(0..=1),
337 transactions: Some(0..=1),
338 block_meta: None
339 }
340 );
341 assert_matches!(static_file_producer.run(targets), Ok(_));
342 assert_eq!(
343 provider_factory.static_file_provider().get_highest_static_files(),
344 HighestStaticFiles {
345 headers: Some(1),
346 receipts: Some(1),
347 transactions: Some(1),
348 block_meta: None
349 }
350 );
351
352 let targets = static_file_producer
353 .get_static_file_targets(HighestStaticFiles {
354 headers: Some(3),
355 receipts: Some(3),
356 transactions: Some(3),
357 block_meta: None,
358 })
359 .expect("get static file targets");
360 assert_eq!(
361 targets,
362 StaticFileTargets {
363 headers: Some(2..=3),
364 receipts: Some(2..=3),
365 transactions: Some(2..=3),
366 block_meta: None
367 }
368 );
369 assert_matches!(static_file_producer.run(targets), Ok(_));
370 assert_eq!(
371 provider_factory.static_file_provider().get_highest_static_files(),
372 HighestStaticFiles {
373 headers: Some(3),
374 receipts: Some(3),
375 transactions: Some(3),
376 block_meta: None
377 }
378 );
379
380 let targets = static_file_producer
381 .get_static_file_targets(HighestStaticFiles {
382 headers: Some(4),
383 receipts: Some(4),
384 transactions: Some(4),
385 block_meta: None,
386 })
387 .expect("get static file targets");
388 assert_eq!(
389 targets,
390 StaticFileTargets {
391 headers: Some(4..=4),
392 receipts: Some(4..=4),
393 transactions: Some(4..=4),
394 block_meta: None
395 }
396 );
397 assert_matches!(
398 static_file_producer.run(targets),
399 Err(ProviderError::BlockBodyIndicesNotFound(4))
400 );
401 assert_eq!(
402 provider_factory.static_file_provider().get_highest_static_files(),
403 HighestStaticFiles {
404 headers: Some(3),
405 receipts: Some(3),
406 transactions: Some(3),
407 block_meta: None
408 }
409 );
410 }
411
412 #[test]
414 fn only_one() {
415 let (provider_factory, _temp_static_files_dir) = setup();
416
417 let static_file_producer = StaticFileProducer::new(provider_factory, PruneModes::default());
418
419 let (tx, rx) = channel();
420
421 for i in 0..5 {
422 let producer = static_file_producer.clone();
423 let tx = tx.clone();
424
425 std::thread::spawn(move || {
426 let locked_producer = producer.lock();
427 if i == 0 {
428 std::thread::sleep(Duration::from_millis(100));
430 }
431 let targets = locked_producer
432 .get_static_file_targets(HighestStaticFiles {
433 headers: Some(1),
434 receipts: Some(1),
435 transactions: Some(1),
436 block_meta: None,
437 })
438 .expect("get static file targets");
439 assert_matches!(locked_producer.run(targets.clone()), Ok(_));
440 tx.send(targets).unwrap();
441 });
442 }
443
444 drop(tx);
445
446 let mut only_one = Some(());
447 for target in rx {
448 assert!(only_one.take().is_some_and(|_| target.any()) || !target.any())
450 }
451 }
452}