1use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::CompactU256;
10use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
11use reth_node_types::NodePrimitives;
12use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
14use std::{
15 borrow::Borrow,
16 cmp::Ordering,
17 fmt::Debug,
18 path::{Path, PathBuf},
19 sync::{Arc, Weak},
20 time::Instant,
21};
22use tracing::debug;
23
24#[derive(Debug)]
29pub(crate) struct StaticFileWriters<N> {
30 headers: RwLock<Option<StaticFileProviderRW<N>>>,
31 transactions: RwLock<Option<StaticFileProviderRW<N>>>,
32 receipts: RwLock<Option<StaticFileProviderRW<N>>>,
33 transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
34}
35
36impl<N> Default for StaticFileWriters<N> {
37 fn default() -> Self {
38 Self {
39 headers: Default::default(),
40 transactions: Default::default(),
41 receipts: Default::default(),
42 transaction_senders: Default::default(),
43 }
44 }
45}
46
47impl<N: NodePrimitives> StaticFileWriters<N> {
48 pub(crate) fn get_or_create(
49 &self,
50 segment: StaticFileSegment,
51 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
52 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
53 let mut write_guard = match segment {
54 StaticFileSegment::Headers => self.headers.write(),
55 StaticFileSegment::Transactions => self.transactions.write(),
56 StaticFileSegment::Receipts => self.receipts.write(),
57 StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
58 };
59
60 if write_guard.is_none() {
61 *write_guard = Some(create_fn()?);
62 }
63
64 Ok(StaticFileProviderRWRefMut(write_guard))
65 }
66
67 pub(crate) fn commit(&self) -> ProviderResult<()> {
68 debug!(target: "provider::static_file", "Committing all static file segments");
69
70 for writer_lock in
71 [&self.headers, &self.transactions, &self.receipts, &self.transaction_senders]
72 {
73 let mut writer = writer_lock.write();
74 if let Some(writer) = writer.as_mut() {
75 writer.commit()?;
76 }
77 }
78
79 debug!(target: "provider::static_file", "Committed all static file segments");
80 Ok(())
81 }
82
83 pub(crate) fn has_unwind_queued(&self) -> bool {
84 for writer_lock in
85 [&self.headers, &self.transactions, &self.receipts, &self.transaction_senders]
86 {
87 let writer = writer_lock.read();
88 if let Some(writer) = writer.as_ref() &&
89 writer.will_prune_on_commit()
90 {
91 return true
92 }
93 }
94 false
95 }
96}
97
98#[derive(Debug)]
100pub struct StaticFileProviderRWRefMut<'a, N>(
101 pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
102);
103
104impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
105 fn deref_mut(&mut self) -> &mut Self::Target {
106 self.0.as_mut().expect("static file writer provider should be init")
108 }
109}
110
111impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
112 type Target = StaticFileProviderRW<N>;
113
114 fn deref(&self) -> &Self::Target {
115 self.0.as_ref().expect("static file writer provider should be init")
117 }
118}
119
120#[derive(Debug)]
121pub struct StaticFileProviderRW<N> {
123 reader: Weak<StaticFileProviderInner<N>>,
127 writer: NippyJarWriter<SegmentHeader>,
129 data_path: PathBuf,
131 buf: Vec<u8>,
133 metrics: Option<Arc<StaticFileProviderMetrics>>,
135 prune_on_commit: Option<(u64, Option<BlockNumber>)>,
138}
139
140impl<N: NodePrimitives> StaticFileProviderRW<N> {
141 pub fn new(
146 segment: StaticFileSegment,
147 block: BlockNumber,
148 reader: Weak<StaticFileProviderInner<N>>,
149 metrics: Option<Arc<StaticFileProviderMetrics>>,
150 ) -> ProviderResult<Self> {
151 let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
152 let mut writer = Self {
153 writer,
154 data_path,
155 buf: Vec::with_capacity(100),
156 reader,
157 metrics,
158 prune_on_commit: None,
159 };
160
161 writer.ensure_end_range_consistency()?;
162
163 Ok(writer)
164 }
165
166 fn open(
167 segment: StaticFileSegment,
168 block: u64,
169 reader: Weak<StaticFileProviderInner<N>>,
170 metrics: Option<Arc<StaticFileProviderMetrics>>,
171 ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
172 let start = Instant::now();
173
174 let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
175
176 let block_range = static_file_provider.find_fixed_range(segment, block);
177 let (jar, path) = match static_file_provider.get_segment_provider_for_block(
178 segment,
179 block_range.start(),
180 None,
181 ) {
182 Ok(provider) => (
183 NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
184 provider.data_path().into(),
185 ),
186 Err(ProviderError::MissingStaticFileBlock(_, _)) => {
187 let path = static_file_provider.directory().join(segment.filename(&block_range));
188 (create_jar(segment, &path, block_range), path)
189 }
190 Err(err) => return Err(err),
191 };
192
193 let result = match NippyJarWriter::new(jar) {
194 Ok(writer) => Ok((writer, path)),
195 Err(NippyJarError::FrozenJar) => {
196 Err(ProviderError::FinalizedStaticFile(segment, block))
198 }
199 Err(e) => Err(ProviderError::other(e)),
200 }?;
201
202 if let Some(metrics) = &metrics {
203 metrics.record_segment_operation(
204 segment,
205 StaticFileProviderOperation::OpenWriter,
206 Some(start.elapsed()),
207 );
208 }
209
210 Ok(result)
211 }
212
213 fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
222 let expected_rows = if self.user_header().segment().is_headers() {
224 self.user_header().block_len().unwrap_or_default()
225 } else {
226 self.user_header().tx_len().unwrap_or_default()
227 };
228 let actual_rows = self.writer.rows() as u64;
229 let pruned_rows = expected_rows.saturating_sub(actual_rows);
230 if pruned_rows > 0 {
231 self.user_header_mut().prune(pruned_rows);
232 }
233
234 debug!(
235 target: "provider::static_file",
236 segment = ?self.writer.user_header().segment(),
237 path = ?self.data_path,
238 pruned_rows,
239 "Ensuring end range consistency"
240 );
241
242 self.writer.commit().map_err(ProviderError::other)?;
243
244 self.update_index()?;
246 Ok(())
247 }
248
249 pub const fn will_prune_on_commit(&self) -> bool {
251 self.prune_on_commit.is_some()
252 }
253
254 pub fn commit(&mut self) -> ProviderResult<()> {
256 let start = Instant::now();
257
258 if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
260 debug!(
261 target: "provider::static_file",
262 segment = ?self.writer.user_header().segment(),
263 to_delete,
264 "Pruning data on commit"
265 );
266 match self.writer.user_header().segment() {
267 StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
268 StaticFileSegment::Transactions => self
269 .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
270 StaticFileSegment::Receipts => {
271 self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
272 }
273 StaticFileSegment::TransactionSenders => self.prune_transaction_sender_data(
274 to_delete,
275 last_block_number.expect("should exist"),
276 )?,
277 }
278 }
279
280 if self.writer.is_dirty() {
281 debug!(
282 target: "provider::static_file",
283 segment = ?self.writer.user_header().segment(),
284 "Committing writer to disk"
285 );
286
287 self.writer.commit().map_err(ProviderError::other)?;
289
290 if let Some(metrics) = &self.metrics {
291 metrics.record_segment_operation(
292 self.writer.user_header().segment(),
293 StaticFileProviderOperation::CommitWriter,
294 Some(start.elapsed()),
295 );
296 }
297
298 debug!(
299 target: "provider::static_file",
300 segment = ?self.writer.user_header().segment(),
301 path = ?self.data_path,
302 duration = ?start.elapsed(),
303 "Committed writer to disk"
304 );
305
306 self.update_index()?;
307 }
308
309 Ok(())
310 }
311
312 #[cfg(feature = "test-utils")]
316 pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
317 let start = Instant::now();
318
319 debug!(
320 target: "provider::static_file",
321 segment = ?self.writer.user_header().segment(),
322 "Committing writer to disk (without sync)"
323 );
324
325 self.writer.commit_without_sync_all().map_err(ProviderError::other)?;
327
328 if let Some(metrics) = &self.metrics {
329 metrics.record_segment_operation(
330 self.writer.user_header().segment(),
331 StaticFileProviderOperation::CommitWriter,
332 Some(start.elapsed()),
333 );
334 }
335
336 debug!(
337 target: "provider::static_file",
338 segment = ?self.writer.user_header().segment(),
339 path = ?self.data_path,
340 duration = ?start.elapsed(),
341 "Committed writer to disk (without sync)"
342 );
343
344 self.update_index()?;
345
346 Ok(())
347 }
348
349 fn update_index(&self) -> ProviderResult<()> {
351 let segment_max_block = self
360 .writer
361 .user_header()
362 .block_range()
363 .as_ref()
364 .map(|block_range| block_range.end())
365 .or_else(|| {
366 (self.writer.user_header().expected_block_start() >
367 self.reader().genesis_block_number())
368 .then(|| self.writer.user_header().expected_block_start() - 1)
369 });
370
371 self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
372 }
373
374 pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
379 let current_block = if let Some(current_block_number) = self.current_block_number() {
380 current_block_number
381 } else {
382 self.increment_block(0)?;
383 0
384 };
385
386 match current_block.cmp(&advance_to) {
387 Ordering::Less => {
388 for block in current_block + 1..=advance_to {
389 self.increment_block(block)?;
390 }
391 }
392 Ordering::Equal => {}
393 Ordering::Greater => {
394 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
395 self.writer.user_header().segment(),
396 current_block,
397 advance_to,
398 ));
399 }
400 }
401
402 Ok(())
403 }
404
405 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
408 let segment = self.writer.user_header().segment();
409
410 self.check_next_block_number(expected_block_number)?;
411
412 let start = Instant::now();
413 if let Some(last_block) = self.writer.user_header().block_end() {
414 if last_block == self.writer.user_header().expected_block_end() {
416 self.commit()?;
418
419 let (writer, data_path) =
421 Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
422 self.writer = writer;
423 self.data_path = data_path;
424
425 *self.writer.user_header_mut() = SegmentHeader::new(
426 self.reader().find_fixed_range(segment, last_block + 1),
427 None,
428 None,
429 segment,
430 );
431 }
432 }
433
434 self.writer.user_header_mut().increment_block();
435 if let Some(metrics) = &self.metrics {
436 metrics.record_segment_operation(
437 segment,
438 StaticFileProviderOperation::IncrementBlock,
439 Some(start.elapsed()),
440 );
441 }
442
443 Ok(())
444 }
445
446 pub fn current_block_number(&self) -> Option<u64> {
448 self.writer.user_header().block_end()
449 }
450
451 pub fn next_block_number(&self) -> u64 {
453 self.writer
457 .user_header()
458 .block_end()
459 .map(|b| b + 1)
460 .unwrap_or_else(|| self.writer.user_header().expected_block_start())
461 }
462
463 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
466 let next_static_file_block = self.next_block_number();
467
468 if expected_block_number != next_static_file_block {
469 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
470 self.writer.user_header().segment(),
471 expected_block_number,
472 next_static_file_block,
473 ))
474 }
475 Ok(())
476 }
477
478 fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
486 let mut remaining_rows = num_rows;
487 let segment = self.writer.user_header().segment();
488 while remaining_rows > 0 {
489 let len = if segment.is_block_based() {
490 self.writer.user_header().block_len().unwrap_or_default()
491 } else {
492 self.writer.user_header().tx_len().unwrap_or_default()
493 };
494
495 if remaining_rows >= len {
496 let block_start = self.writer.user_header().expected_block_start();
499
500 if block_start != 0 &&
506 (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
507 {
508 self.delete_current_and_open_previous()?;
509 } else {
510 self.writer.user_header_mut().prune(len);
512 self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
513 break
514 }
515
516 remaining_rows -= len;
517 } else {
518 self.writer.user_header_mut().prune(remaining_rows);
520
521 self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
523 remaining_rows = 0;
524 }
525 }
526
527 if let Some(last_block) = last_block {
529 let mut expected_block_start = self.writer.user_header().expected_block_start();
530
531 if num_rows == 0 {
532 while last_block < expected_block_start {
536 self.delete_current_and_open_previous()?;
537 expected_block_start = self.writer.user_header().expected_block_start();
538 }
539 }
540 self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
541 }
542
543 self.commit()?;
545
546 Ok(())
547 }
548
549 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
552 let current_path = self.data_path.clone();
553 let (previous_writer, data_path) = Self::open(
554 self.user_header().segment(),
555 self.writer.user_header().expected_block_start() - 1,
556 self.reader.clone(),
557 self.metrics.clone(),
558 )?;
559 self.writer = previous_writer;
560 self.writer.set_dirty();
561 self.data_path = data_path;
562 NippyJar::<SegmentHeader>::load(¤t_path)
563 .map_err(ProviderError::other)?
564 .delete()
565 .map_err(ProviderError::other)?;
566 Ok(())
567 }
568
569 fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
571 self.buf.clear();
572 column.to_compact(&mut self.buf);
573
574 self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
575 Ok(())
576 }
577
578 fn append_with_tx_number<V: Compact>(
580 &mut self,
581 tx_num: TxNumber,
582 value: V,
583 ) -> ProviderResult<()> {
584 if let Some(range) = self.writer.user_header().tx_range() {
585 let next_tx = range.end() + 1;
586 if next_tx != tx_num {
587 return Err(ProviderError::UnexpectedStaticFileTxNumber(
588 self.writer.user_header().segment(),
589 tx_num,
590 next_tx,
591 ))
592 }
593 self.writer.user_header_mut().increment_tx();
594 } else {
595 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
596 }
597
598 self.append_column(value)?;
599
600 Ok(())
601 }
602
603 pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
608 where
609 N::BlockHeader: Compact,
610 {
611 self.append_header_with_td(header, U256::ZERO, hash)
612 }
613
614 pub fn append_header_with_td(
619 &mut self,
620 header: &N::BlockHeader,
621 total_difficulty: U256,
622 hash: &BlockHash,
623 ) -> ProviderResult<()>
624 where
625 N::BlockHeader: Compact,
626 {
627 let start = Instant::now();
628 self.ensure_no_queued_prune()?;
629
630 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
631
632 self.increment_block(header.number())?;
633
634 self.append_column(header)?;
635 self.append_column(CompactU256::from(total_difficulty))?;
636 self.append_column(hash)?;
637
638 if let Some(metrics) = &self.metrics {
639 metrics.record_segment_operation(
640 StaticFileSegment::Headers,
641 StaticFileProviderOperation::Append,
642 Some(start.elapsed()),
643 );
644 }
645
646 Ok(())
647 }
648
649 pub fn append_header_direct(
652 &mut self,
653 header: &N::BlockHeader,
654 total_difficulty: U256,
655 hash: &BlockHash,
656 ) -> ProviderResult<()>
657 where
658 N::BlockHeader: Compact,
659 {
660 let start = Instant::now();
661 self.ensure_no_queued_prune()?;
662
663 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
664
665 self.append_column(header)?;
666 self.append_column(CompactU256::from(total_difficulty))?;
667 self.append_column(hash)?;
668
669 if let Some(metrics) = &self.metrics {
670 metrics.record_segment_operation(
671 StaticFileSegment::Headers,
672 StaticFileProviderOperation::Append,
673 Some(start.elapsed()),
674 );
675 }
676
677 Ok(())
678 }
679
680 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
685 where
686 N::SignedTx: Compact,
687 {
688 let start = Instant::now();
689 self.ensure_no_queued_prune()?;
690
691 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
692 self.append_with_tx_number(tx_num, tx)?;
693
694 if let Some(metrics) = &self.metrics {
695 metrics.record_segment_operation(
696 StaticFileSegment::Transactions,
697 StaticFileProviderOperation::Append,
698 Some(start.elapsed()),
699 );
700 }
701
702 Ok(())
703 }
704
705 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
710 where
711 N::Receipt: Compact,
712 {
713 let start = Instant::now();
714 self.ensure_no_queued_prune()?;
715
716 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
717 self.append_with_tx_number(tx_num, receipt)?;
718
719 if let Some(metrics) = &self.metrics {
720 metrics.record_segment_operation(
721 StaticFileSegment::Receipts,
722 StaticFileProviderOperation::Append,
723 Some(start.elapsed()),
724 );
725 }
726
727 Ok(())
728 }
729
730 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
732 where
733 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
734 R: Borrow<N::Receipt>,
735 N::Receipt: Compact,
736 {
737 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
738
739 let mut receipts_iter = receipts.into_iter().peekable();
740 if receipts_iter.peek().is_none() {
742 return Ok(());
743 }
744
745 let start = Instant::now();
746 self.ensure_no_queued_prune()?;
747
748 let mut count: u64 = 0;
750
751 for receipt_result in receipts_iter {
752 let (tx_num, receipt) = receipt_result?;
753 self.append_with_tx_number(tx_num, receipt.borrow())?;
754 count += 1;
755 }
756
757 if let Some(metrics) = &self.metrics {
758 metrics.record_segment_operations(
759 StaticFileSegment::Receipts,
760 StaticFileProviderOperation::Append,
761 count,
762 Some(start.elapsed()),
763 );
764 }
765
766 Ok(())
767 }
768
769 pub fn append_transaction_sender(
774 &mut self,
775 tx_num: TxNumber,
776 sender: &alloy_primitives::Address,
777 ) -> ProviderResult<()> {
778 let start = Instant::now();
779 self.ensure_no_queued_prune()?;
780
781 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
782 self.append_with_tx_number(tx_num, sender)?;
783
784 if let Some(metrics) = &self.metrics {
785 metrics.record_segment_operation(
786 StaticFileSegment::TransactionSenders,
787 StaticFileProviderOperation::Append,
788 Some(start.elapsed()),
789 );
790 }
791
792 Ok(())
793 }
794
795 pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
797 where
798 I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
799 {
800 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
801
802 let mut senders_iter = senders.into_iter().peekable();
803 if senders_iter.peek().is_none() {
805 return Ok(());
806 }
807
808 let start = Instant::now();
809 self.ensure_no_queued_prune()?;
810
811 let mut count: u64 = 0;
813 for (tx_num, sender) in senders_iter {
814 self.append_with_tx_number(tx_num, sender)?;
815 count += 1;
816 }
817
818 if let Some(metrics) = &self.metrics {
819 metrics.record_segment_operations(
820 StaticFileSegment::TransactionSenders,
821 StaticFileProviderOperation::Append,
822 count,
823 Some(start.elapsed()),
824 );
825 }
826
827 Ok(())
828 }
829
830 pub fn prune_transactions(
834 &mut self,
835 to_delete: u64,
836 last_block: BlockNumber,
837 ) -> ProviderResult<()> {
838 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
839 self.queue_prune(to_delete, Some(last_block))
840 }
841
842 pub fn prune_receipts(
846 &mut self,
847 to_delete: u64,
848 last_block: BlockNumber,
849 ) -> ProviderResult<()> {
850 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
851 self.queue_prune(to_delete, Some(last_block))
852 }
853
854 pub fn prune_transaction_senders(
858 &mut self,
859 to_delete: u64,
860 last_block: BlockNumber,
861 ) -> ProviderResult<()> {
862 debug_assert_eq!(
863 self.writer.user_header().segment(),
864 StaticFileSegment::TransactionSenders
865 );
866 self.queue_prune(to_delete, Some(last_block))
867 }
868
869 pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
871 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
872 self.queue_prune(to_delete, None)
873 }
874
875 fn queue_prune(
880 &mut self,
881 to_delete: u64,
882 last_block: Option<BlockNumber>,
883 ) -> ProviderResult<()> {
884 self.ensure_no_queued_prune()?;
885 self.prune_on_commit = Some((to_delete, last_block));
886 Ok(())
887 }
888
889 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
891 if self.prune_on_commit.is_some() {
892 return Err(ProviderError::other(StaticFileWriterError::new(
893 "Pruning should be committed before appending or pruning more data",
894 )));
895 }
896 Ok(())
897 }
898
899 fn prune_transaction_data(
901 &mut self,
902 to_delete: u64,
903 last_block: BlockNumber,
904 ) -> ProviderResult<()> {
905 let start = Instant::now();
906
907 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
908
909 self.truncate(to_delete, Some(last_block))?;
910
911 if let Some(metrics) = &self.metrics {
912 metrics.record_segment_operation(
913 StaticFileSegment::Transactions,
914 StaticFileProviderOperation::Prune,
915 Some(start.elapsed()),
916 );
917 }
918
919 Ok(())
920 }
921
922 fn prune_receipt_data(
924 &mut self,
925 to_delete: u64,
926 last_block: BlockNumber,
927 ) -> ProviderResult<()> {
928 let start = Instant::now();
929
930 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
931
932 self.truncate(to_delete, Some(last_block))?;
933
934 if let Some(metrics) = &self.metrics {
935 metrics.record_segment_operation(
936 StaticFileSegment::Receipts,
937 StaticFileProviderOperation::Prune,
938 Some(start.elapsed()),
939 );
940 }
941
942 Ok(())
943 }
944
945 fn prune_transaction_sender_data(
947 &mut self,
948 to_delete: u64,
949 last_block: BlockNumber,
950 ) -> ProviderResult<()> {
951 let start = Instant::now();
952
953 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
954
955 self.truncate(to_delete, Some(last_block))?;
956
957 if let Some(metrics) = &self.metrics {
958 metrics.record_segment_operation(
959 StaticFileSegment::TransactionSenders,
960 StaticFileProviderOperation::Prune,
961 Some(start.elapsed()),
962 );
963 }
964
965 Ok(())
966 }
967
968 fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
970 let start = Instant::now();
971
972 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
973
974 self.truncate(to_delete, None)?;
975
976 if let Some(metrics) = &self.metrics {
977 metrics.record_segment_operation(
978 StaticFileSegment::Headers,
979 StaticFileProviderOperation::Prune,
980 Some(start.elapsed()),
981 );
982 }
983
984 Ok(())
985 }
986
987 pub fn reader(&self) -> StaticFileProvider<N> {
989 Self::upgrade_provider_to_strong_reference(&self.reader)
990 }
991
992 fn upgrade_provider_to_strong_reference(
1001 provider: &Weak<StaticFileProviderInner<N>>,
1002 ) -> StaticFileProvider<N> {
1003 provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
1004 }
1005
1006 pub const fn user_header(&self) -> &SegmentHeader {
1008 self.writer.user_header()
1009 }
1010
1011 pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
1013 self.writer.user_header_mut()
1014 }
1015
1016 #[cfg(any(test, feature = "test-utils"))]
1018 pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
1019 self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
1020 }
1021
1022 #[cfg(any(test, feature = "test-utils"))]
1024 pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
1025 &mut self.writer
1026 }
1027}
1028
1029fn create_jar(
1030 segment: StaticFileSegment,
1031 path: &Path,
1032 expected_block_range: SegmentRangeInclusive,
1033) -> NippyJar<SegmentHeader> {
1034 let mut jar = NippyJar::new(
1035 segment.columns(),
1036 path,
1037 SegmentHeader::new(expected_block_range, None, None, segment),
1038 );
1039
1040 if segment.is_headers() {
1043 jar = jar.with_lz4();
1044 }
1045
1046 jar
1047}