1use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::CompactU256;
10use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
11use reth_node_types::NodePrimitives;
12use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
14use std::{
15 borrow::Borrow,
16 cmp::Ordering,
17 fmt::Debug,
18 path::{Path, PathBuf},
19 sync::{Arc, Weak},
20 time::Instant,
21};
22use tracing::debug;
23
/// Holds one lazily-created writer per [`StaticFileSegment`].
///
/// Each segment sits behind its own lock so different segments can be written
/// to concurrently.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    // `None` until the first `get_or_create` call for the segment.
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
    transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
}
35
36impl<N> Default for StaticFileWriters<N> {
37 fn default() -> Self {
38 Self {
39 headers: Default::default(),
40 transactions: Default::default(),
41 receipts: Default::default(),
42 transaction_senders: Default::default(),
43 }
44 }
45}
46
impl<N: NodePrimitives> StaticFileWriters<N> {
    /// Returns a mutable handle to the writer for `segment`, initializing the
    /// slot with `create_fn` if no writer exists yet.
    ///
    /// The returned guard holds the segment's write lock for its lifetime.
    pub(crate) fn get_or_create(
        &self,
        segment: StaticFileSegment,
        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
        let mut write_guard = match segment {
            StaticFileSegment::Headers => self.headers.write(),
            StaticFileSegment::Transactions => self.transactions.write(),
            StaticFileSegment::Receipts => self.receipts.write(),
            StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
        };

        // Lazily initialize while holding the write lock, so only one caller
        // ever runs `create_fn` for a given segment.
        if write_guard.is_none() {
            *write_guard = Some(create_fn()?);
        }

        Ok(StaticFileProviderRWRefMut(write_guard))
    }

    /// Commits every initialized segment writer to disk, in a fixed order.
    pub(crate) fn commit(&self) -> ProviderResult<()> {
        debug!(target: "provider::static_file", "Committing all static file segments");

        for writer_lock in
            [&self.headers, &self.transactions, &self.receipts, &self.transaction_senders]
        {
            let mut writer = writer_lock.write();
            if let Some(writer) = writer.as_mut() {
                writer.commit()?;
            }
        }

        debug!(target: "provider::static_file", "Committed all static file segments");
        Ok(())
    }

    /// Returns `true` if any initialized writer has a prune queued that will
    /// run on its next commit.
    pub(crate) fn has_unwind_queued(&self) -> bool {
        for writer_lock in
            [&self.headers, &self.transactions, &self.receipts, &self.transaction_senders]
        {
            let writer = writer_lock.read();
            if let Some(writer) = writer.as_ref() &&
                writer.will_prune_on_commit()
            {
                return true
            }
        }
        false
    }
}
97
/// Mutable-access wrapper around a locked, initialized segment writer.
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    // Write guard over the segment's writer slot. Constructed only after the
    // slot has been initialized, so the inner `Option` is always `Some`.
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
103
impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Invariant: this wrapper is only created after the writer slot has
        // been initialized, so `as_mut` cannot observe `None`.
        self.0.as_mut().expect("static file writer provider should be init")
    }
}
110
impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
    type Target = StaticFileProviderRW<N>;

    fn deref(&self) -> &Self::Target {
        // Invariant: the slot is initialized before this wrapper is handed out.
        self.0.as_ref().expect("static file writer provider should be init")
    }
}
119
/// Writer for a single static file segment.
#[derive(Debug)]
pub struct StaticFileProviderRW<N> {
    /// Weak back-reference to the read-side provider; upgraded on demand (and
    /// panics if the provider was dropped — see `upgrade_provider_to_strong_reference`).
    reader: Weak<StaticFileProviderInner<N>>,
    /// Underlying jar writer for the currently open file.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path of the data file currently being written.
    data_path: PathBuf,
    /// Reusable scratch buffer for `Compact`-encoding appended column values.
    buf: Vec<u8>,
    /// Optional metrics recorder for writer operations.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Queued prune request `(rows_to_delete, last_block)`, executed on the
    /// next commit. `last_block` is `None` for the headers segment.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}
139
140impl<N: NodePrimitives> StaticFileProviderRW<N> {
    /// Creates a writer for `segment` positioned on the file that contains
    /// `block`, repairing any header/row-count inconsistency left by a
    /// previous unclean shutdown before returning.
    pub fn new(
        segment: StaticFileSegment,
        block: BlockNumber,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<Self> {
        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
        let mut writer = Self {
            writer,
            data_path,
            // Scratch buffer reused across appends for Compact encoding.
            buf: Vec::with_capacity(100),
            reader,
            metrics,
            prune_on_commit: None,
        };

        writer.ensure_end_range_consistency()?;

        Ok(writer)
    }
165
    /// Opens (or creates) the static file covering `block` for `segment`,
    /// returning the jar writer together with the data file path.
    ///
    /// If no file exists yet for the fixed block range that `block` falls
    /// into, a fresh one is created. Returns
    /// [`ProviderError::FinalizedStaticFile`] if the file exists but is frozen.
    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        // Static files cover fixed-size block ranges; locate the one for `block`.
        let block_range = static_file_provider.find_fixed_range(segment, block);
        let (jar, path) = match static_file_provider.get_segment_provider_for_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                // No file for this range yet: create a fresh jar on disk.
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // The file was finalized; it can no longer be written to.
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::other(e)),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }
212
    /// Reconciles the segment header's row count with the rows actually
    /// present in the file, then commits and refreshes the reader index.
    ///
    /// If the header claims more rows than the file holds (e.g. after a crash
    /// between the data and config writes), the header is pruned down to match.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // Headers are block-indexed; all other segments are tx-indexed.
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let actual_rows = self.writer.rows() as u64;
        let pruned_rows = expected_rows.saturating_sub(actual_rows);
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            pruned_rows,
            "Ensuring end range consistency"
        );

        // Persist the (possibly adjusted) header so the on-disk config matches.
        self.writer.commit().map_err(ProviderError::other)?;

        // Keep the in-memory reader index in sync with the new end range.
        self.update_index()?;
        Ok(())
    }
248
249 pub const fn will_prune_on_commit(&self) -> bool {
251 self.prune_on_commit.is_some()
252 }
253
    /// Commits pending changes to disk: first executes any queued prune for
    /// this segment, then — only if the writer is dirty — flushes it and
    /// refreshes the reader index.
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Execute a queued prune (see `queue_prune`) before flushing. `take`
        // clears the request so it cannot run twice.
        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                to_delete,
                "Pruning data on commit"
            );
            match self.writer.user_header().segment() {
                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
                StaticFileSegment::Transactions => self
                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
                StaticFileSegment::Receipts => {
                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
                }
                StaticFileSegment::TransactionSenders => self.prune_transaction_sender_data(
                    to_delete,
                    last_block_number.expect("should exist"),
                )?,
            }
        }

        // Only touch disk when something actually changed.
        if self.writer.is_dirty() {
            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                "Committing writer to disk"
            );

            self.writer.commit().map_err(ProviderError::other)?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Committed writer to disk"
            );

            self.update_index()?;
        }

        Ok(())
    }
311
312 #[cfg(feature = "test-utils")]
316 pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
317 let start = Instant::now();
318
319 debug!(
320 target: "provider::static_file",
321 segment = ?self.writer.user_header().segment(),
322 "Committing writer to disk (without sync)"
323 );
324
325 self.writer.commit_without_sync_all().map_err(ProviderError::other)?;
327
328 if let Some(metrics) = &self.metrics {
329 metrics.record_segment_operation(
330 self.writer.user_header().segment(),
331 StaticFileProviderOperation::CommitWriter,
332 Some(start.elapsed()),
333 );
334 }
335
336 debug!(
337 target: "provider::static_file",
338 segment = ?self.writer.user_header().segment(),
339 path = ?self.data_path,
340 duration = ?start.elapsed(),
341 "Committed writer to disk (without sync)"
342 );
343
344 self.update_index()?;
345
346 Ok(())
347 }
348
349 fn update_index(&self) -> ProviderResult<()> {
351 let segment_max_block = self
360 .writer
361 .user_header()
362 .block_range()
363 .as_ref()
364 .map(|block_range| block_range.end())
365 .or_else(|| {
366 (self.writer.user_header().expected_block_start() > 0)
367 .then(|| self.writer.user_header().expected_block_start() - 1)
368 });
369
370 self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
371 }
372
373 pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
378 let current_block = if let Some(current_block_number) = self.current_block_number() {
379 current_block_number
380 } else {
381 self.increment_block(0)?;
382 0
383 };
384
385 match current_block.cmp(&advance_to) {
386 Ordering::Less => {
387 for block in current_block + 1..=advance_to {
388 self.increment_block(block)?;
389 }
390 }
391 Ordering::Equal => {}
392 Ordering::Greater => {
393 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
394 self.writer.user_header().segment(),
395 current_block,
396 advance_to,
397 ));
398 }
399 }
400
401 Ok(())
402 }
403
    /// Registers block `expected_block_number` in the segment header, rolling
    /// over to a new static file when the current one has reached the end of
    /// its fixed block range.
    ///
    /// Returns [`ProviderError::UnexpectedStaticFileBlockNumber`] if
    /// `expected_block_number` is not the next block for this writer.
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // The current file is full: commit it and open the next range.
            if last_block == self.writer.user_header().expected_block_end() {
                self.commit()?;

                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                // Fresh header describing the new file's fixed block range.
                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(segment, last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
444
445 pub fn current_block_number(&self) -> Option<u64> {
447 self.writer.user_header().block_end()
448 }
449
450 pub fn next_block_number(&self) -> u64 {
452 self.writer
456 .user_header()
457 .block_end()
458 .map(|b| b + 1)
459 .unwrap_or_else(|| self.writer.user_header().expected_block_start())
460 }
461
462 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
465 let next_static_file_block = self.next_block_number();
466
467 if expected_block_number != next_static_file_block {
468 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
469 self.writer.user_header().segment(),
470 expected_block_number,
471 next_static_file_block,
472 ))
473 }
474 Ok(())
475 }
476
    /// Removes the last `num_rows` rows from the segment, deleting whole files
    /// and reopening earlier ones when a file is emptied out, then commits.
    ///
    /// `last_block` is the highest block that should remain after truncation;
    /// it is `Some` for transaction-based segments and `None` for headers.
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            // Rows held by the current file: blocks for block-based segments,
            // transactions otherwise.
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // The entire file's contents must go.
                let block_start = self.writer.user_header().expected_block_start();

                // Delete the file only when the target block lies in an
                // earlier file (always the case for headers, where rows map
                // 1:1 to blocks); otherwise just empty this file in place and
                // keep it around for `last_block`.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Keep the file; drop all of its rows.
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Partial truncation within the current file.
                self.writer.user_header_mut().prune(remaining_rows);

                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            // Even with nothing to delete we may need to walk back across
            // files until one covers `last_block`.
            if num_rows == 0 {
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        self.commit()?;

        Ok(())
    }
547
548 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
551 let current_path = self.data_path.clone();
552 let (previous_writer, data_path) = Self::open(
553 self.user_header().segment(),
554 self.writer.user_header().expected_block_start() - 1,
555 self.reader.clone(),
556 self.metrics.clone(),
557 )?;
558 self.writer = previous_writer;
559 self.writer.set_dirty();
560 self.data_path = data_path;
561 NippyJar::<SegmentHeader>::load(¤t_path)
562 .map_err(ProviderError::other)?
563 .delete()
564 .map_err(ProviderError::other)?;
565 Ok(())
566 }
567
    /// Compact-encodes `column` into the reusable scratch buffer and appends
    /// it as the next column value of the current row.
    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
        self.buf.clear();
        column.to_compact(&mut self.buf);

        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
        Ok(())
    }
576
    /// Appends `value` for transaction `tx_num`, keeping the header's tx range
    /// in sync.
    ///
    /// Returns [`ProviderError::UnexpectedStaticFileTxNumber`] if `tx_num` is
    /// not the next expected transaction number.
    fn append_with_tx_number<V: Compact>(
        &mut self,
        tx_num: TxNumber,
        value: V,
    ) -> ProviderResult<()> {
        if let Some(range) = self.writer.user_header().tx_range() {
            let next_tx = range.end() + 1;
            if next_tx != tx_num {
                return Err(ProviderError::UnexpectedStaticFileTxNumber(
                    self.writer.user_header().segment(),
                    tx_num,
                    next_tx,
                ))
            }
            self.writer.user_header_mut().increment_tx();
        } else {
            // First transaction of this file starts the range.
            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
        }

        self.append_column(value)?;

        Ok(())
    }
601
    /// Appends `header` and its `hash` with a total difficulty of zero.
    pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        self.append_header_with_td(header, U256::ZERO, hash)
    }
612
    /// Appends `header`, `total_difficulty` and `hash` as one row of the
    /// `Headers` segment, advancing the block range to `header.number()`.
    ///
    /// Errors if a prune is queued or if `header.number()` is not the next
    /// expected block.
    pub fn append_header_with_td(
        &mut self,
        header: &N::BlockHeader,
        total_difficulty: U256,
        hash: &BlockHash,
    ) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.increment_block(header.number())?;

        // One row = (header, total difficulty, hash) columns.
        self.append_column(header)?;
        self.append_column(CompactU256::from(total_difficulty))?;
        self.append_column(hash)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
647
648 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
653 where
654 N::SignedTx: Compact,
655 {
656 let start = Instant::now();
657 self.ensure_no_queued_prune()?;
658
659 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
660 self.append_with_tx_number(tx_num, tx)?;
661
662 if let Some(metrics) = &self.metrics {
663 metrics.record_segment_operation(
664 StaticFileSegment::Transactions,
665 StaticFileProviderOperation::Append,
666 Some(start.elapsed()),
667 );
668 }
669
670 Ok(())
671 }
672
    /// Appends a receipt at `tx_num` to the `Receipts` segment.
    ///
    /// Errors if a prune is queued or `tx_num` is not the next expected
    /// transaction number.
    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
    where
        N::Receipt: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
        self.append_with_tx_number(tx_num, receipt)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
697
    /// Appends a batch of receipts, stopping at the first error from the input
    /// iterator or from writing.
    ///
    /// Does nothing (and skips the queued-prune check) for an empty iterator.
    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
    where
        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
        R: Borrow<N::Receipt>,
        N::Receipt: Compact,
    {
        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        let mut receipts_iter = receipts.into_iter().peekable();
        // Avoid touching writer state at all for an empty batch.
        if receipts_iter.peek().is_none() {
            return Ok(());
        }

        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        // Number of appended rows, for the batched metrics record below.
        let mut count: u64 = 0;

        for receipt_result in receipts_iter {
            let (tx_num, receipt) = receipt_result?;
            self.append_with_tx_number(tx_num, receipt.borrow())?;
            count += 1;
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operations(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                count,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
736
737 pub fn append_transaction_sender(
742 &mut self,
743 tx_num: TxNumber,
744 sender: &alloy_primitives::Address,
745 ) -> ProviderResult<()> {
746 let start = Instant::now();
747 self.ensure_no_queued_prune()?;
748
749 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
750 self.append_with_tx_number(tx_num, sender)?;
751
752 if let Some(metrics) = &self.metrics {
753 metrics.record_segment_operation(
754 StaticFileSegment::TransactionSenders,
755 StaticFileProviderOperation::Append,
756 Some(start.elapsed()),
757 );
758 }
759
760 Ok(())
761 }
762
    /// Appends a batch of sender addresses, stopping at the first write error.
    ///
    /// Does nothing (and skips the queued-prune check) for an empty iterator.
    pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
    where
        I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
    {
        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);

        let mut senders_iter = senders.into_iter().peekable();
        // Avoid touching writer state at all for an empty batch.
        if senders_iter.peek().is_none() {
            return Ok(());
        }

        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        // Number of appended rows, for the batched metrics record below.
        let mut count: u64 = 0;
        for (tx_num, sender) in senders_iter {
            self.append_with_tx_number(tx_num, sender)?;
            count += 1;
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operations(
                StaticFileSegment::TransactionSenders,
                StaticFileProviderOperation::Append,
                count,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
797
    /// Queues removal of the last `to_delete` transactions, executed on the
    /// next commit. `last_block` is the highest block remaining afterwards.
    pub fn prune_transactions(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
        self.queue_prune(to_delete, Some(last_block))
    }
809
    /// Queues removal of the last `to_delete` receipts, executed on the next
    /// commit. `last_block` is the highest block remaining afterwards.
    pub fn prune_receipts(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
        self.queue_prune(to_delete, Some(last_block))
    }
821
    /// Queues removal of the last `to_delete` sender entries, executed on the
    /// next commit. `last_block` is the highest block remaining afterwards.
    pub fn prune_transaction_senders(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(
            self.writer.user_header().segment(),
            StaticFileSegment::TransactionSenders
        );
        self.queue_prune(to_delete, Some(last_block))
    }
836
    /// Queues removal of the last `to_delete` headers, executed on the next
    /// commit. Headers need no `last_block` — rows map 1:1 to blocks.
    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
        self.queue_prune(to_delete, None)
    }
842
    /// Records a prune request to be executed by the next [`Self::commit`].
    ///
    /// `last_block` must be `Some` for transaction-based segments and `None`
    /// for headers. Errors if a request is already pending.
    fn queue_prune(
        &mut self,
        to_delete: u64,
        last_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        // Refuse to stack prune requests: the pending one must commit first.
        self.ensure_no_queued_prune()?;
        self.prune_on_commit = Some((to_delete, last_block));
        Ok(())
    }
856
857 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
859 if self.prune_on_commit.is_some() {
860 return Err(ProviderError::other(StaticFileWriterError::new(
861 "Pruning should be committed before appending or pruning more data",
862 )));
863 }
864 Ok(())
865 }
866
    /// Immediately truncates the last `to_delete` transactions down to
    /// `last_block` and records prune metrics.
    fn prune_transaction_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
889
    /// Immediately truncates the last `to_delete` receipts down to
    /// `last_block` and records prune metrics.
    fn prune_receipt_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
912
    /// Immediately truncates the last `to_delete` sender entries down to
    /// `last_block` and records prune metrics.
    fn prune_transaction_sender_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::TransactionSenders,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
935
    /// Immediately truncates the last `to_delete` headers and records prune
    /// metrics.
    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.truncate(to_delete, None)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
954
    /// Returns a strong reference to the read-side provider.
    ///
    /// # Panics
    /// Panics if the underlying provider has been dropped.
    pub fn reader(&self) -> StaticFileProvider<N> {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }
959
    /// Upgrades the weak provider reference to a strong [`StaticFileProvider`].
    ///
    /// # Panics
    /// Panics if the provider was already dropped; a writer must not outlive
    /// its provider.
    fn upgrade_provider_to_strong_reference(
        provider: &Weak<StaticFileProviderInner<N>>,
    ) -> StaticFileProvider<N> {
        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
    }
973
    /// Returns the segment header of the currently open file.
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }
978
    /// Returns a mutable reference to the segment header of the currently open
    /// file.
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }
983
984 #[cfg(any(test, feature = "test-utils"))]
986 pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
987 self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
988 }
989
990 #[cfg(any(test, feature = "test-utils"))]
992 pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
993 &mut self.writer
994 }
995}
996
997fn create_jar(
998 segment: StaticFileSegment,
999 path: &Path,
1000 expected_block_range: SegmentRangeInclusive,
1001) -> NippyJar<SegmentHeader> {
1002 let mut jar = NippyJar::new(
1003 segment.columns(),
1004 path,
1005 SegmentHeader::new(expected_block_range, None, None, segment),
1006 );
1007
1008 if segment.is_headers() {
1011 jar = jar.with_lz4();
1012 }
1013
1014 jar
1015}