1use crate::{segments, segments::Segment, StaticFileProducerEvent};
4use alloy_primitives::BlockNumber;
5use parking_lot::Mutex;
6use rayon::prelude::*;
7use reth_codecs::Compact;
8use reth_db_api::table::Value;
9use reth_primitives_traits::NodePrimitives;
10use reth_provider::{
11 providers::StaticFileWriter, BlockReader, ChainStateBlockReader, DBProvider,
12 DatabaseProviderFactory, StageCheckpointReader, StaticFileProviderFactory,
13};
14use reth_prune_types::PruneModes;
15use reth_stages_types::StageId;
16use reth_static_file_types::{HighestStaticFiles, StaticFileTargets};
17use reth_storage_errors::provider::ProviderResult;
18use reth_tokio_util::{EventSender, EventStream};
19use std::{
20 ops::{Deref, RangeInclusive},
21 sync::Arc,
22 time::Instant,
23};
24use tracing::{debug, trace};
25
26pub type StaticFileProducerResult = ProviderResult<StaticFileTargets>;
28
29pub type StaticFileProducerWithResult<Provider> =
31 (StaticFileProducer<Provider>, StaticFileProducerResult);
32
33#[derive(Debug)]
36pub struct StaticFileProducer<Provider>(Arc<Mutex<StaticFileProducerInner<Provider>>>);
37
38impl<Provider> StaticFileProducer<Provider> {
39 pub fn new(provider: Provider, prune_modes: PruneModes) -> Self {
41 Self(Arc::new(Mutex::new(StaticFileProducerInner::new(provider, prune_modes))))
42 }
43}
44
45impl<Provider> Clone for StaticFileProducer<Provider> {
46 fn clone(&self) -> Self {
47 Self(self.0.clone())
48 }
49}
50
51impl<Provider> Deref for StaticFileProducer<Provider> {
52 type Target = Arc<Mutex<StaticFileProducerInner<Provider>>>;
53
54 fn deref(&self) -> &Self::Target {
55 &self.0
56 }
57}
58
59#[derive(Debug)]
62pub struct StaticFileProducerInner<Provider> {
63 provider: Provider,
65 prune_modes: PruneModes,
69 event_sender: EventSender<StaticFileProducerEvent>,
70}
71
72impl<Provider> StaticFileProducerInner<Provider> {
73 fn new(provider: Provider, prune_modes: PruneModes) -> Self {
74 Self { provider, prune_modes, event_sender: Default::default() }
75 }
76}
77
78impl<Provider> StaticFileProducerInner<Provider>
79where
80 Provider: StaticFileProviderFactory + DatabaseProviderFactory<Provider: ChainStateBlockReader>,
81{
82 pub fn last_finalized_block(&self) -> ProviderResult<Option<BlockNumber>> {
84 self.provider.database_provider_ro()?.last_finalized_block_number()
85 }
86}
87
88impl<Provider> StaticFileProducerInner<Provider>
89where
90 Provider: StaticFileProviderFactory
91 + DatabaseProviderFactory<
92 Provider: StaticFileProviderFactory<
93 Primitives: NodePrimitives<
94 SignedTx: Value + Compact,
95 BlockHeader: Value + Compact,
96 Receipt: Value + Compact,
97 >,
98 > + StageCheckpointReader
99 + BlockReader
100 + reth_provider::ChangeSetReader,
101 >,
102{
103 pub fn events(&self) -> EventStream<StaticFileProducerEvent> {
105 self.event_sender.new_listener()
106 }
107
108 pub fn run(&self, targets: StaticFileTargets) -> StaticFileProducerResult {
118 if !targets.any() {
120 return Ok(targets)
121 }
122
123 debug_assert!(targets.is_contiguous_to_highest_static_files(
124 self.provider.static_file_provider().get_highest_static_files()
125 ));
126
127 self.event_sender.notify(StaticFileProducerEvent::Started { targets: targets.clone() });
128
129 debug!(target: "static_file", ?targets, "StaticFileProducer started");
130 let start = Instant::now();
131
132 let mut segments =
133 Vec::<(Box<dyn Segment<Provider::Provider>>, RangeInclusive<BlockNumber>)>::new();
134
135 if let Some(block_range) = targets.receipts.clone() {
136 segments.push((Box::new(segments::Receipts), block_range));
137 }
138
139 segments.par_iter().try_for_each(|(segment, block_range)| -> ProviderResult<()> {
140 debug!(target: "static_file", segment = %segment.segment(), ?block_range, "StaticFileProducer segment");
141 let start = Instant::now();
142
143 let provider = self.provider.database_provider_ro()?.disable_long_read_transaction_safety();
146 segment.copy_to_static_files(provider, block_range.clone())?;
147
148 let elapsed = start.elapsed(); debug!(target: "static_file", segment = %segment.segment(), ?block_range, ?elapsed, "Finished StaticFileProducer segment");
150
151 Ok(())
152 })?;
153
154 self.provider.static_file_provider().commit()?;
155 for (segment, block_range) in segments {
156 self.provider
157 .static_file_provider()
158 .update_index(segment.segment(), Some(*block_range.end()))?;
159 }
160
161 let elapsed = start.elapsed(); debug!(target: "static_file", ?targets, ?elapsed, "StaticFileProducer finished");
163
164 self.event_sender
165 .notify(StaticFileProducerEvent::Finished { targets: targets.clone(), elapsed });
166
167 Ok(targets)
168 }
169
170 pub fn copy_to_static_files(&self) -> ProviderResult<HighestStaticFiles> {
175 let provider = self.provider.database_provider_ro()?;
176 let stages_checkpoints = std::iter::once(StageId::Execution)
177 .map(|stage| provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number)))
178 .collect::<Result<Vec<_>, _>>()?;
179
180 let highest_static_files = HighestStaticFiles { receipts: stages_checkpoints[0] };
181 let targets = self.get_static_file_targets(highest_static_files)?;
182 self.run(targets)?;
183
184 Ok(highest_static_files)
185 }
186
187 pub fn get_static_file_targets(
191 &self,
192 finalized_block_numbers: HighestStaticFiles,
193 ) -> ProviderResult<StaticFileTargets> {
194 let highest_static_files = self.provider.static_file_provider().get_highest_static_files();
195
196 let targets = StaticFileTargets {
197 receipts: if self.prune_modes.receipts.is_none() &&
199 self.prune_modes.receipts_log_filter.is_empty()
200 {
201 finalized_block_numbers.receipts.and_then(|finalized_block_number| {
202 self.get_static_file_target(
203 highest_static_files.receipts,
204 finalized_block_number,
205 )
206 })
207 } else {
208 None
209 },
210 };
211
212 trace!(
213 target: "static_file",
214 ?finalized_block_numbers,
215 ?highest_static_files,
216 ?targets,
217 any = %targets.any(),
218 "StaticFile targets"
219 );
220
221 Ok(targets)
222 }
223
224 fn get_static_file_target(
225 &self,
226 highest_static_file: Option<BlockNumber>,
227 finalized_block_number: BlockNumber,
228 ) -> Option<RangeInclusive<BlockNumber>> {
229 let range = highest_static_file.map_or(0, |block| block + 1)..=finalized_block_number;
230 (!range.is_empty()).then_some(range)
231 }
232}
233
#[cfg(test)]
mod tests {
    use crate::static_file_producer::{
        StaticFileProducer, StaticFileProducerInner, StaticFileTargets,
    };
    use alloy_primitives::B256;
    use assert_matches::assert_matches;
    use reth_provider::{
        providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, ProviderError,
        ProviderFactory, StaticFileProviderFactory,
    };
    use reth_prune_types::PruneModes;
    use reth_stages::test_utils::{StorageKind, TestStageDB};
    use reth_static_file_types::{HighestStaticFiles, StaticFileSegment};
    use reth_testing_utils::generators::{
        self, random_block_range, random_receipt, BlockRangeParams,
    };
    use std::{sync::mpsc::channel, time::Duration};
    use tempfile::TempDir;

    /// Builds a test database holding blocks `0..=3` (two transactions each) and one receipt per
    /// transaction.
    ///
    /// Returns the provider factory together with the temporary static files directory, which
    /// the caller has to keep alive for the duration of the test.
    fn setup() -> (ProviderFactory<MockNodeTypesWithDB>, TempDir) {
        let mut rng = generators::rng();
        let db = TestStageDB::default();

        let params =
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() };
        let blocks = random_block_range(&mut rng, 0..=3, params);
        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Prune the headers from the static files again, then insert the blocks a second time.
        let static_file_provider = db.factory.static_file_provider();
        let mut headers_writer = static_file_provider
            .latest_writer(StaticFileSegment::Headers)
            .expect("get static file writer for headers");
        headers_writer.prune_headers(blocks.len() as u64).unwrap();
        headers_writer.commit().expect("prune headers");
        drop(headers_writer);

        db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

        // Receipts are numbered consecutively across all blocks, in transaction order.
        let mut receipts = Vec::new();
        for block in &blocks {
            for transaction in &block.body().transactions {
                let tx_number = receipts.len() as u64;
                let receipt = random_receipt(&mut rng, transaction, Some(0), None);
                receipts.push((tx_number, receipt));
            }
        }
        db.insert_receipts(receipts).expect("insert receipts");

        let provider_factory = db.factory;
        (provider_factory, db.temp_static_files_dir)
    }

    /// Runs the producer three times: up to block 1, up to block 3, and for the non-existent
    /// block 4, which must fail and leave the highest static file untouched.
    #[test]
    fn run() {
        let (provider_factory, _temp_static_files_dir) = setup();

        let producer =
            StaticFileProducerInner::new(provider_factory.clone(), PruneModes::default());

        // First run: nothing is in static files yet, so the range starts at block 0.
        let targets = producer
            .get_static_file_targets(HighestStaticFiles { receipts: Some(1) })
            .expect("get static file targets");
        assert_eq!(targets, StaticFileTargets { receipts: Some(0..=1) });
        assert_matches!(producer.run(targets), Ok(_));
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles { receipts: Some(1) }
        );

        // Second run: continues right after the previous highest block.
        let targets = producer
            .get_static_file_targets(HighestStaticFiles { receipts: Some(3) })
            .expect("get static file targets");
        assert_eq!(targets, StaticFileTargets { receipts: Some(2..=3) });
        assert_matches!(producer.run(targets), Ok(_));
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles { receipts: Some(3) }
        );

        // Third run: block 4 was never inserted, so the run has to fail.
        let targets = producer
            .get_static_file_targets(HighestStaticFiles { receipts: Some(4) })
            .expect("get static file targets");
        assert_eq!(targets, StaticFileTargets { receipts: Some(4..=4) });
        assert_matches!(
            producer.run(targets),
            Err(ProviderError::BlockBodyIndicesNotFound(4))
        );
        assert_eq!(
            provider_factory.static_file_provider().get_highest_static_files(),
            HighestStaticFiles { receipts: Some(3) }
        );
    }

    /// Spawns five threads that each lock a clone of the same producer, compute targets and run
    /// them, then checks the targets they report back.
    #[test]
    fn only_one() {
        let (provider_factory, _temp_static_files_dir) = setup();

        let shared_producer = StaticFileProducer::new(provider_factory, PruneModes::default());

        let (tx, rx) = channel();

        for i in 0..5 {
            let producer = shared_producer.clone();
            let tx = tx.clone();

            std::thread::spawn(move || {
                let guard = producer.lock();
                if i == 0 {
                    // Thread 0 sleeps while holding the lock.
                    std::thread::sleep(Duration::from_millis(100));
                }
                let targets = guard
                    .get_static_file_targets(HighestStaticFiles { receipts: Some(1) })
                    .expect("get static file targets");
                assert_matches!(guard.run(targets.clone()), Ok(_));
                tx.send(targets).unwrap();
            });
        }

        // Drop the original sender so the receiver loop ends once all threads are done.
        drop(tx);

        let mut only_one = Some(());
        for target in rx {
            // `take` runs on every iteration, so only the first received target may be non-empty.
            let first_with_work = only_one.take().is_some_and(|_| target.any());
            assert!(first_with_work || !target.any())
        }
    }
}