1use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
10use reth_db_api::models::CompactU256;
11use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
12use reth_node_types::NodePrimitives;
13use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
14use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
15use std::{
16 borrow::Borrow,
17 cmp::Ordering,
18 fmt::Debug,
19 path::{Path, PathBuf},
20 sync::{Arc, Weak},
21 time::Instant,
22};
23use tracing::debug;
24
/// A prune operation queued on a writer and executed by its next `commit`.
///
/// Each variant is dispatched by `StaticFileProviderRW::commit` to the matching
/// segment-specific prune routine.
#[derive(Debug, Clone, Copy)]
enum PruneStrategy {
    /// Prune the headers segment.
    Headers {
        /// Number of rows (one per block) to remove from the end of the segment.
        num_blocks: u64,
    },
    /// Prune the transactions segment.
    Transactions {
        /// Number of rows to remove from the end of the segment.
        num_rows: u64,
        /// Block that becomes the end of the segment's block range after pruning.
        last_block: BlockNumber,
    },
    /// Prune the receipts segment.
    Receipts {
        /// Number of rows to remove from the end of the segment.
        num_rows: u64,
        /// Block that becomes the end of the segment's block range after pruning.
        last_block: BlockNumber,
    },
    /// Prune the transaction senders segment.
    TransactionSenders {
        /// Number of rows to remove from the end of the segment.
        num_rows: u64,
        /// Block that becomes the end of the segment's block range after pruning.
        last_block: BlockNumber,
    },
    /// Prune the account changesets segment.
    AccountChangeSets {
        /// Last block to keep; changes of later blocks are removed.
        last_block: BlockNumber,
    },
    /// Prune the storage changesets segment.
    StorageChangeSets {
        /// Last block to keep; changes of later blocks are removed.
        last_block: BlockNumber,
    },
}
65
/// One optional [`StaticFileProviderRW`] per static file segment, each behind its own lock.
///
/// A slot stays `None` until a writer for that segment is first requested through
/// `get_or_create`.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    /// Writer slot for the headers segment.
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for the transactions segment.
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for the receipts segment.
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for the transaction senders segment.
    transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for the account changesets segment.
    account_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for the storage changesets segment.
    storage_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
}
79
80impl<N> Default for StaticFileWriters<N> {
81 fn default() -> Self {
82 Self {
83 headers: Default::default(),
84 transactions: Default::default(),
85 receipts: Default::default(),
86 transaction_senders: Default::default(),
87 account_change_sets: Default::default(),
88 storage_change_sets: Default::default(),
89 }
90 }
91}
92
93impl<N: NodePrimitives> StaticFileWriters<N> {
94 pub(crate) fn get_or_create(
95 &self,
96 segment: StaticFileSegment,
97 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
98 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
99 let mut write_guard = match segment {
100 StaticFileSegment::Headers => self.headers.write(),
101 StaticFileSegment::Transactions => self.transactions.write(),
102 StaticFileSegment::Receipts => self.receipts.write(),
103 StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
104 StaticFileSegment::AccountChangeSets => self.account_change_sets.write(),
105 StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(),
106 };
107
108 if write_guard.is_none() {
109 *write_guard = Some(create_fn()?);
110 }
111
112 Ok(StaticFileProviderRWRefMut(write_guard))
113 }
114
115 pub(crate) fn commit(&self) -> ProviderResult<()> {
116 debug!(target: "provider::static_file", "Committing all static file segments");
117
118 for writer_lock in [
119 &self.headers,
120 &self.transactions,
121 &self.receipts,
122 &self.transaction_senders,
123 &self.account_change_sets,
124 &self.storage_change_sets,
125 ] {
126 let mut writer = writer_lock.write();
127 if let Some(writer) = writer.as_mut() {
128 writer.commit()?;
129 }
130 }
131
132 debug!(target: "provider::static_file", "Committed all static file segments");
133 Ok(())
134 }
135
136 pub(crate) fn has_unwind_queued(&self) -> bool {
137 for writer_lock in [
138 &self.headers,
139 &self.transactions,
140 &self.receipts,
141 &self.transaction_senders,
142 &self.account_change_sets,
143 &self.storage_change_sets,
144 ] {
145 let writer = writer_lock.read();
146 if let Some(writer) = writer.as_ref() &&
147 writer.will_prune_on_commit()
148 {
149 return true
150 }
151 }
152 false
153 }
154
155 pub(crate) fn finalize(&self) -> ProviderResult<()> {
160 debug!(target: "provider::static_file", "Finalizing all static file segments into disk");
161
162 for writer_lock in [
163 &self.headers,
164 &self.transactions,
165 &self.receipts,
166 &self.transaction_senders,
167 &self.account_change_sets,
168 &self.storage_change_sets,
169 ] {
170 let mut writer = writer_lock.write();
171 if let Some(writer) = writer.as_mut() {
172 writer.finalize()?;
173 }
174 }
175
176 debug!(target: "provider::static_file", "Finalized all static file segments into disk");
177 Ok(())
178 }
179}
180
/// Write-locked handle to a segment's [`StaticFileProviderRW`].
///
/// Wraps the write guard of the segment's slot and dereferences to the writer inside it.
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
186
187impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
188 fn deref_mut(&mut self) -> &mut Self::Target {
189 self.0.as_mut().expect("static file writer provider should be init")
191 }
192}
193
194impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
195 type Target = StaticFileProviderRW<N>;
196
197 fn deref(&self) -> &Self::Target {
198 self.0.as_ref().expect("static file writer provider should be init")
200 }
201}
202
/// Writer for a single static file segment.
#[derive(Debug)]
pub struct StaticFileProviderRW<N> {
    /// Weak handle to the static file provider; upgraded on demand (see `reader`).
    reader: Weak<StaticFileProviderInner<N>>,
    /// Jar writer of the static file that is currently open.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path of the data file that is currently open.
    data_path: PathBuf,
    /// Scratch buffer reused for encoding each appended column.
    buf: Vec<u8>,
    /// Optional metrics sink for writer operations.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Prune queued to run on the next commit, if any.
    prune_on_commit: Option<PruneStrategy>,
    /// Set by `sync_all` so that the following `finalize` can skip syncing again; reset by
    /// `finalize`.
    synced: bool,
}
223
224impl<N: NodePrimitives> StaticFileProviderRW<N> {
225 pub fn new(
230 segment: StaticFileSegment,
231 block: BlockNumber,
232 reader: Weak<StaticFileProviderInner<N>>,
233 metrics: Option<Arc<StaticFileProviderMetrics>>,
234 ) -> ProviderResult<Self> {
235 let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
236 let mut writer = Self {
237 writer,
238 data_path,
239 buf: Vec::with_capacity(100),
240 reader,
241 metrics,
242 prune_on_commit: None,
243 synced: false,
244 };
245
246 writer.ensure_end_range_consistency()?;
247
248 Ok(writer)
249 }
250
251 fn open(
252 segment: StaticFileSegment,
253 block: u64,
254 reader: Weak<StaticFileProviderInner<N>>,
255 metrics: Option<Arc<StaticFileProviderMetrics>>,
256 ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
257 let start = Instant::now();
258
259 let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
260
261 let block_range = static_file_provider.find_fixed_range(segment, block);
262 let (jar, path) = match static_file_provider.get_segment_provider_for_block(
263 segment,
264 block_range.start(),
265 None,
266 ) {
267 Ok(provider) => (
268 NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
269 provider.data_path().into(),
270 ),
271 Err(ProviderError::MissingStaticFileBlock(_, _)) => {
272 let path = static_file_provider.directory().join(segment.filename(&block_range));
273 (create_jar(segment, &path, block_range), path)
274 }
275 Err(err) => return Err(err),
276 };
277
278 let result = match NippyJarWriter::new(jar) {
279 Ok(writer) => Ok((writer, path)),
280 Err(NippyJarError::FrozenJar) => {
281 Err(ProviderError::FinalizedStaticFile(segment, block))
283 }
284 Err(e) => Err(ProviderError::other(e)),
285 }?;
286
287 if let Some(metrics) = &metrics {
288 metrics.record_segment_operation(
289 segment,
290 StaticFileProviderOperation::OpenWriter,
291 Some(start.elapsed()),
292 );
293 }
294
295 Ok(result)
296 }
297
298 fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
307 let expected_rows = if self.user_header().segment().is_headers() {
309 self.user_header().block_len().unwrap_or_default()
310 } else {
311 self.user_header().tx_len().unwrap_or_default()
312 };
313 let actual_rows = self.writer.rows() as u64;
314 let pruned_rows = expected_rows.saturating_sub(actual_rows);
315 if pruned_rows > 0 {
316 self.user_header_mut().prune(pruned_rows);
317 }
318
319 debug!(
320 target: "provider::static_file",
321 segment = ?self.writer.user_header().segment(),
322 path = ?self.data_path,
323 pruned_rows,
324 "Ensuring end range consistency"
325 );
326
327 self.writer.commit().map_err(ProviderError::other)?;
328
329 self.update_index()?;
331 Ok(())
332 }
333
334 pub const fn will_prune_on_commit(&self) -> bool {
336 self.prune_on_commit.is_some()
337 }
338
339 pub fn sync_all(&mut self) -> ProviderResult<()> {
346 if self.prune_on_commit.is_some() {
347 return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
348 }
349 if self.writer.is_dirty() {
350 self.writer.sync_all().map_err(ProviderError::other)?;
351 }
352 self.synced = true;
353 Ok(())
354 }
355
356 pub fn finalize(&mut self) -> ProviderResult<()> {
362 if self.prune_on_commit.is_some() {
363 return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
364 }
365 if self.writer.is_dirty() {
366 if !self.synced {
367 self.writer.sync_all().map_err(ProviderError::other)?;
368 }
369
370 self.writer.finalize().map_err(ProviderError::other)?;
371 self.update_index()?;
372 }
373 self.synced = false;
374 Ok(())
375 }
376
377 pub fn commit(&mut self) -> ProviderResult<()> {
379 let start = Instant::now();
380
381 if let Some(strategy) = self.prune_on_commit.take() {
383 debug!(
384 target: "provider::static_file",
385 segment = ?self.writer.user_header().segment(),
386 "Pruning data on commit"
387 );
388 match strategy {
389 PruneStrategy::Headers { num_blocks } => self.prune_header_data(num_blocks)?,
390 PruneStrategy::Transactions { num_rows, last_block } => {
391 self.prune_transaction_data(num_rows, last_block)?
392 }
393 PruneStrategy::Receipts { num_rows, last_block } => {
394 self.prune_receipt_data(num_rows, last_block)?
395 }
396 PruneStrategy::TransactionSenders { num_rows, last_block } => {
397 self.prune_transaction_sender_data(num_rows, last_block)?
398 }
399 PruneStrategy::AccountChangeSets { last_block } => {
400 self.prune_account_changeset_data(last_block)?
401 }
402 PruneStrategy::StorageChangeSets { last_block } => {
403 self.prune_storage_changeset_data(last_block)?
404 }
405 }
406 }
407
408 if self.writer.is_dirty() {
409 debug!(
410 target: "provider::static_file",
411 segment = ?self.writer.user_header().segment(),
412 "Committing writer to disk"
413 );
414
415 self.writer.commit().map_err(ProviderError::other)?;
417
418 if let Some(metrics) = &self.metrics {
419 metrics.record_segment_operation(
420 self.writer.user_header().segment(),
421 StaticFileProviderOperation::CommitWriter,
422 Some(start.elapsed()),
423 );
424 }
425
426 debug!(
427 target: "provider::static_file",
428 segment = ?self.writer.user_header().segment(),
429 path = ?self.data_path,
430 duration = ?start.elapsed(),
431 "Committed writer to disk"
432 );
433
434 self.update_index()?;
435 }
436
437 Ok(())
438 }
439
440 #[cfg(feature = "test-utils")]
444 pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
445 let start = Instant::now();
446
447 debug!(
448 target: "provider::static_file",
449 segment = ?self.writer.user_header().segment(),
450 "Committing writer to disk (without sync)"
451 );
452
453 self.writer.commit_without_sync_all().map_err(ProviderError::other)?;
455
456 if let Some(metrics) = &self.metrics {
457 metrics.record_segment_operation(
458 self.writer.user_header().segment(),
459 StaticFileProviderOperation::CommitWriter,
460 Some(start.elapsed()),
461 );
462 }
463
464 debug!(
465 target: "provider::static_file",
466 segment = ?self.writer.user_header().segment(),
467 path = ?self.data_path,
468 duration = ?start.elapsed(),
469 "Committed writer to disk (without sync)"
470 );
471
472 self.update_index()?;
473
474 Ok(())
475 }
476
477 fn update_index(&self) -> ProviderResult<()> {
479 let segment_max_block = self
488 .writer
489 .user_header()
490 .block_range()
491 .as_ref()
492 .map(|block_range| block_range.end())
493 .or_else(|| {
494 (self.writer.user_header().expected_block_start() >
495 self.reader().genesis_block_number())
496 .then(|| self.writer.user_header().expected_block_start() - 1)
497 });
498
499 self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
500 }
501
502 pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
507 let current_block = if let Some(current_block_number) = self.current_block_number() {
508 current_block_number
509 } else {
510 self.increment_block(0)?;
511 0
512 };
513
514 match current_block.cmp(&advance_to) {
515 Ordering::Less => {
516 for block in current_block + 1..=advance_to {
517 self.increment_block(block)?;
518 }
519 }
520 Ordering::Equal => {}
521 Ordering::Greater => {
522 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
523 self.writer.user_header().segment(),
524 current_block,
525 advance_to,
526 ));
527 }
528 }
529
530 Ok(())
531 }
532
533 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
536 let segment = self.writer.user_header().segment();
537
538 self.check_next_block_number(expected_block_number)?;
539
540 let start = Instant::now();
541 if let Some(last_block) = self.writer.user_header().block_end() {
542 if last_block == self.writer.user_header().expected_block_end() {
544 self.commit()?;
546
547 let (writer, data_path) =
549 Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
550 self.writer = writer;
551 self.data_path = data_path;
552
553 *self.writer.user_header_mut() = SegmentHeader::new(
554 self.reader().find_fixed_range(segment, last_block + 1),
555 None,
556 None,
557 segment,
558 );
559 }
560 }
561
562 self.writer.user_header_mut().increment_block();
563 if let Some(metrics) = &self.metrics {
564 metrics.record_segment_operation(
565 segment,
566 StaticFileProviderOperation::IncrementBlock,
567 Some(start.elapsed()),
568 );
569 }
570
571 Ok(())
572 }
573
574 pub fn current_block_number(&self) -> Option<u64> {
576 self.writer.user_header().block_end()
577 }
578
579 pub fn next_block_number(&self) -> u64 {
581 self.writer
585 .user_header()
586 .block_end()
587 .map(|b| b + 1)
588 .unwrap_or_else(|| self.writer.user_header().expected_block_start())
589 }
590
591 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
594 let next_static_file_block = self.next_block_number();
595
596 if expected_block_number != next_static_file_block {
597 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
598 self.writer.user_header().segment(),
599 expected_block_number,
600 next_static_file_block,
601 ))
602 }
603 Ok(())
604 }
605
    /// Truncates a change-based segment so that `last_block` becomes the end of its block range,
    /// then commits.
    ///
    /// Does nothing when the segment already ends at or before `last_block`.
    ///
    /// # Errors
    ///
    /// Returns [`ProviderError::MissingStaticFileBlock`] if the header has no block end, an
    /// error if the header carries no changeset offsets, and propagates errors from deleting
    /// files, pruning rows and committing.
    fn truncate_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();
        debug_assert!(segment.is_change_based());

        let current_block_end = self
            .writer
            .user_header()
            .block_end()
            .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;

        // Already at or before the target block: nothing to prune.
        if current_block_end <= last_block {
            return Ok(())
        }

        // Walk back, deleting whole files, until the open file's expected start is not above
        // `last_block` (or the file starting at block 0 is reached).
        let mut expected_block_start = self.writer.user_header().expected_block_start();
        while last_block < expected_block_start && expected_block_start > 0 {
            self.delete_current_and_open_previous()?;
            expected_block_start = self.writer.user_header().expected_block_start();
        }

        let changeset_offsets = self.writer.user_header().changeset_offsets().ok_or_else(|| {
            ProviderError::other(StaticFileWriterError::new("Missing changeset offsets"))
        })?;

        // Number of blocks of this file to keep, counting `last_block` itself.
        let blocks_to_keep = if last_block >= expected_block_start {
            last_block - expected_block_start + 1
        } else {
            0
        };

        // Rows to keep: every row when the kept blocks cover all recorded offsets, otherwise the
        // offset recorded at index `blocks_to_keep`.
        // NOTE(review): presumably that offset is the first row of the first removed block —
        // confirm against the offsets layout.
        let rows_to_keep = if blocks_to_keep == 0 {
            0
        } else if blocks_to_keep as usize > changeset_offsets.len() {
            self.writer.rows() as u64
        } else if blocks_to_keep as usize == changeset_offsets.len() {
            self.writer.rows() as u64
        } else {
            changeset_offsets[blocks_to_keep as usize].offset()
        };

        let total_rows = self.writer.rows() as u64;
        let rows_to_delete = total_rows.saturating_sub(rows_to_keep);

        if rows_to_delete > 0 {
            // Re-read the block end: the open file may have changed in the loop above.
            let current_block_end = self
                .writer
                .user_header()
                .block_end()
                .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
            let blocks_to_remove = current_block_end - last_block;

            // The header is pruned by a block count, the jar by a row count.
            self.writer.user_header_mut().prune(blocks_to_remove);

            self.writer.prune_rows(rows_to_delete as usize).map_err(ProviderError::other)?;
        }

        self.writer.user_header_mut().set_block_range(expected_block_start, last_block);

        // Bring the header's changeset offsets in line with the new block range.
        self.writer.user_header_mut().sync_changeset_offsets();

        self.commit()?;

        Ok(())
    }
692
    /// Removes `num_rows` rows from the end of the segment, walking back across static files
    /// when needed, then commits.
    ///
    /// When `last_block` is `Some`, the block range of the file left open is set to end at that
    /// block.
    ///
    /// # Errors
    ///
    /// Propagates errors from deleting files, pruning rows and committing.
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            // Number of rows the header accounts for in the open file.
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                let block_start = self.writer.user_header().expected_block_start();

                // The whole file is deleted only if it does not start at block 0 AND either it
                // is a headers segment or `last_block` lies before this file's first block.
                // Otherwise the file is emptied but kept, and the walk stops here.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Fewer rows to remove than the file holds: prune header and jar by that amount.
                self.writer.user_header_mut().prune(remaining_rows);

                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            // With no rows to remove the loop above never ran, so files that start after
            // `last_block` still have to be dropped here.
            if num_rows == 0 {
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        self.commit()?;

        Ok(())
    }
763
764 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
767 let current_path = self.data_path.clone();
768 let (previous_writer, data_path) = Self::open(
769 self.user_header().segment(),
770 self.writer.user_header().expected_block_start() - 1,
771 self.reader.clone(),
772 self.metrics.clone(),
773 )?;
774 self.writer = previous_writer;
775 self.writer.set_dirty();
776 self.data_path = data_path;
777 NippyJar::<SegmentHeader>::load(¤t_path)
778 .map_err(ProviderError::other)?
779 .delete()
780 .map_err(ProviderError::other)?;
781 Ok(())
782 }
783
784 fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
786 self.buf.clear();
787 column.to_compact(&mut self.buf);
788
789 self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
790 Ok(())
791 }
792
793 fn append_with_tx_number<V: Compact>(
795 &mut self,
796 tx_num: TxNumber,
797 value: V,
798 ) -> ProviderResult<()> {
799 if let Some(range) = self.writer.user_header().tx_range() {
800 let next_tx = range.end() + 1;
801 if next_tx != tx_num {
802 return Err(ProviderError::UnexpectedStaticFileTxNumber(
803 self.writer.user_header().segment(),
804 tx_num,
805 next_tx,
806 ))
807 }
808 self.writer.user_header_mut().increment_tx();
809 } else {
810 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
811 }
812
813 self.append_column(value)?;
814
815 Ok(())
816 }
817
818 fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
820 if self.writer.user_header().changeset_offsets().is_some() {
821 self.writer.user_header_mut().increment_block_changes();
822 }
823
824 self.append_column(change)?;
825 Ok(())
826 }
827
828 pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
833 where
834 N::BlockHeader: Compact,
835 {
836 self.append_header_with_td(header, U256::ZERO, hash)
837 }
838
839 pub fn append_header_with_td(
844 &mut self,
845 header: &N::BlockHeader,
846 total_difficulty: U256,
847 hash: &BlockHash,
848 ) -> ProviderResult<()>
849 where
850 N::BlockHeader: Compact,
851 {
852 let start = Instant::now();
853 self.ensure_no_queued_prune()?;
854
855 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
856
857 self.increment_block(header.number())?;
858
859 self.append_column(header)?;
860 self.append_column(CompactU256::from(total_difficulty))?;
861 self.append_column(hash)?;
862
863 if let Some(metrics) = &self.metrics {
864 metrics.record_segment_operation(
865 StaticFileSegment::Headers,
866 StaticFileProviderOperation::Append,
867 Some(start.elapsed()),
868 );
869 }
870
871 Ok(())
872 }
873
874 pub fn append_header_direct(
877 &mut self,
878 header: &N::BlockHeader,
879 total_difficulty: U256,
880 hash: &BlockHash,
881 ) -> ProviderResult<()>
882 where
883 N::BlockHeader: Compact,
884 {
885 let start = Instant::now();
886 self.ensure_no_queued_prune()?;
887
888 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
889
890 self.append_column(header)?;
891 self.append_column(CompactU256::from(total_difficulty))?;
892 self.append_column(hash)?;
893
894 if let Some(metrics) = &self.metrics {
895 metrics.record_segment_operation(
896 StaticFileSegment::Headers,
897 StaticFileProviderOperation::Append,
898 Some(start.elapsed()),
899 );
900 }
901
902 Ok(())
903 }
904
905 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
910 where
911 N::SignedTx: Compact,
912 {
913 let start = Instant::now();
914 self.ensure_no_queued_prune()?;
915
916 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
917 self.append_with_tx_number(tx_num, tx)?;
918
919 if let Some(metrics) = &self.metrics {
920 metrics.record_segment_operation(
921 StaticFileSegment::Transactions,
922 StaticFileProviderOperation::Append,
923 Some(start.elapsed()),
924 );
925 }
926
927 Ok(())
928 }
929
930 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
935 where
936 N::Receipt: Compact,
937 {
938 let start = Instant::now();
939 self.ensure_no_queued_prune()?;
940
941 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
942 self.append_with_tx_number(tx_num, receipt)?;
943
944 if let Some(metrics) = &self.metrics {
945 metrics.record_segment_operation(
946 StaticFileSegment::Receipts,
947 StaticFileProviderOperation::Append,
948 Some(start.elapsed()),
949 );
950 }
951
952 Ok(())
953 }
954
955 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
957 where
958 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
959 R: Borrow<N::Receipt>,
960 N::Receipt: Compact,
961 {
962 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
963
964 let mut receipts_iter = receipts.into_iter().peekable();
965 if receipts_iter.peek().is_none() {
967 return Ok(());
968 }
969
970 let start = Instant::now();
971 self.ensure_no_queued_prune()?;
972
973 let mut count: u64 = 0;
975
976 for receipt_result in receipts_iter {
977 let (tx_num, receipt) = receipt_result?;
978 self.append_with_tx_number(tx_num, receipt.borrow())?;
979 count += 1;
980 }
981
982 if let Some(metrics) = &self.metrics {
983 metrics.record_segment_operations(
984 StaticFileSegment::Receipts,
985 StaticFileProviderOperation::Append,
986 count,
987 Some(start.elapsed()),
988 );
989 }
990
991 Ok(())
992 }
993
994 pub fn append_transaction_sender(
999 &mut self,
1000 tx_num: TxNumber,
1001 sender: &alloy_primitives::Address,
1002 ) -> ProviderResult<()> {
1003 let start = Instant::now();
1004 self.ensure_no_queued_prune()?;
1005
1006 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1007 self.append_with_tx_number(tx_num, sender)?;
1008
1009 if let Some(metrics) = &self.metrics {
1010 metrics.record_segment_operation(
1011 StaticFileSegment::TransactionSenders,
1012 StaticFileProviderOperation::Append,
1013 Some(start.elapsed()),
1014 );
1015 }
1016
1017 Ok(())
1018 }
1019
1020 pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
1022 where
1023 I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
1024 {
1025 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1026
1027 let mut senders_iter = senders.into_iter().peekable();
1028 if senders_iter.peek().is_none() {
1030 return Ok(());
1031 }
1032
1033 let start = Instant::now();
1034 self.ensure_no_queued_prune()?;
1035
1036 let mut count: u64 = 0;
1038 for (tx_num, sender) in senders_iter {
1039 self.append_with_tx_number(tx_num, sender)?;
1040 count += 1;
1041 }
1042
1043 if let Some(metrics) = &self.metrics {
1044 metrics.record_segment_operations(
1045 StaticFileSegment::TransactionSenders,
1046 StaticFileProviderOperation::Append,
1047 count,
1048 Some(start.elapsed()),
1049 );
1050 }
1051
1052 Ok(())
1053 }
1054
1055 pub fn append_account_changeset(
1061 &mut self,
1062 mut changeset: Vec<AccountBeforeTx>,
1063 block_number: u64,
1064 ) -> ProviderResult<()> {
1065 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1066 let start = Instant::now();
1067
1068 self.increment_block(block_number)?;
1069 self.ensure_no_queued_prune()?;
1070
1071 changeset.sort_by_key(|change| change.address);
1073
1074 let mut count: u64 = 0;
1075
1076 for change in changeset {
1077 self.append_change(&change)?;
1078 count += 1;
1079 }
1080
1081 if let Some(metrics) = &self.metrics {
1082 metrics.record_segment_operations(
1083 StaticFileSegment::AccountChangeSets,
1084 StaticFileProviderOperation::Append,
1085 count,
1086 Some(start.elapsed()),
1087 );
1088 }
1089
1090 Ok(())
1091 }
1092
1093 pub fn append_storage_changeset(
1097 &mut self,
1098 mut changeset: Vec<StorageBeforeTx>,
1099 block_number: u64,
1100 ) -> ProviderResult<()> {
1101 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1102 let start = Instant::now();
1103
1104 self.increment_block(block_number)?;
1105 self.ensure_no_queued_prune()?;
1106
1107 changeset.sort_by_key(|change| (change.address, change.key));
1109
1110 let mut count: u64 = 0;
1111 for change in changeset {
1112 self.append_change(&change)?;
1113 count += 1;
1114 }
1115
1116 if let Some(metrics) = &self.metrics {
1117 metrics.record_segment_operations(
1118 StaticFileSegment::StorageChangeSets,
1119 StaticFileProviderOperation::Append,
1120 count,
1121 Some(start.elapsed()),
1122 );
1123 }
1124
1125 Ok(())
1126 }
1127
1128 pub fn prune_transactions(
1132 &mut self,
1133 to_delete: u64,
1134 last_block: BlockNumber,
1135 ) -> ProviderResult<()> {
1136 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
1137 self.queue_prune(PruneStrategy::Transactions { num_rows: to_delete, last_block })
1138 }
1139
1140 pub fn prune_receipts(
1144 &mut self,
1145 to_delete: u64,
1146 last_block: BlockNumber,
1147 ) -> ProviderResult<()> {
1148 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
1149 self.queue_prune(PruneStrategy::Receipts { num_rows: to_delete, last_block })
1150 }
1151
1152 pub fn prune_transaction_senders(
1156 &mut self,
1157 to_delete: u64,
1158 last_block: BlockNumber,
1159 ) -> ProviderResult<()> {
1160 debug_assert_eq!(
1161 self.writer.user_header().segment(),
1162 StaticFileSegment::TransactionSenders
1163 );
1164 self.queue_prune(PruneStrategy::TransactionSenders { num_rows: to_delete, last_block })
1165 }
1166
1167 pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
1169 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
1170 self.queue_prune(PruneStrategy::Headers { num_blocks: to_delete })
1171 }
1172
1173 pub fn prune_account_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1175 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::AccountChangeSets);
1176 self.queue_prune(PruneStrategy::AccountChangeSets { last_block })
1177 }
1178
1179 pub fn prune_storage_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1181 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::StorageChangeSets);
1182 self.queue_prune(PruneStrategy::StorageChangeSets { last_block })
1183 }
1184
1185 fn queue_prune(&mut self, strategy: PruneStrategy) -> ProviderResult<()> {
1187 self.ensure_no_queued_prune()?;
1188 self.prune_on_commit = Some(strategy);
1189 Ok(())
1190 }
1191
1192 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
1194 if self.prune_on_commit.is_some() {
1195 return Err(ProviderError::other(StaticFileWriterError::new(
1196 "Pruning should be committed before appending or pruning more data",
1197 )));
1198 }
1199 Ok(())
1200 }
1201
1202 fn prune_transaction_data(
1204 &mut self,
1205 to_delete: u64,
1206 last_block: BlockNumber,
1207 ) -> ProviderResult<()> {
1208 let start = Instant::now();
1209
1210 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1211
1212 self.truncate(to_delete, Some(last_block))?;
1213
1214 if let Some(metrics) = &self.metrics {
1215 metrics.record_segment_operation(
1216 StaticFileSegment::Transactions,
1217 StaticFileProviderOperation::Prune,
1218 Some(start.elapsed()),
1219 );
1220 }
1221
1222 Ok(())
1223 }
1224
1225 fn prune_account_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1227 let start = Instant::now();
1228
1229 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1230
1231 self.truncate_changesets(last_block)?;
1232
1233 if let Some(metrics) = &self.metrics {
1234 metrics.record_segment_operation(
1235 StaticFileSegment::AccountChangeSets,
1236 StaticFileProviderOperation::Prune,
1237 Some(start.elapsed()),
1238 );
1239 }
1240
1241 Ok(())
1242 }
1243
1244 fn prune_storage_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1246 let start = Instant::now();
1247
1248 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1249
1250 self.truncate_changesets(last_block)?;
1251
1252 if let Some(metrics) = &self.metrics {
1253 metrics.record_segment_operation(
1254 StaticFileSegment::StorageChangeSets,
1255 StaticFileProviderOperation::Prune,
1256 Some(start.elapsed()),
1257 );
1258 }
1259
1260 Ok(())
1261 }
1262
1263 fn prune_receipt_data(
1265 &mut self,
1266 to_delete: u64,
1267 last_block: BlockNumber,
1268 ) -> ProviderResult<()> {
1269 let start = Instant::now();
1270
1271 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1272
1273 self.truncate(to_delete, Some(last_block))?;
1274
1275 if let Some(metrics) = &self.metrics {
1276 metrics.record_segment_operation(
1277 StaticFileSegment::Receipts,
1278 StaticFileProviderOperation::Prune,
1279 Some(start.elapsed()),
1280 );
1281 }
1282
1283 Ok(())
1284 }
1285
1286 fn prune_transaction_sender_data(
1288 &mut self,
1289 to_delete: u64,
1290 last_block: BlockNumber,
1291 ) -> ProviderResult<()> {
1292 let start = Instant::now();
1293
1294 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1295
1296 self.truncate(to_delete, Some(last_block))?;
1297
1298 if let Some(metrics) = &self.metrics {
1299 metrics.record_segment_operation(
1300 StaticFileSegment::TransactionSenders,
1301 StaticFileProviderOperation::Prune,
1302 Some(start.elapsed()),
1303 );
1304 }
1305
1306 Ok(())
1307 }
1308
1309 fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
1311 let start = Instant::now();
1312
1313 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1314
1315 self.truncate(to_delete, None)?;
1316
1317 if let Some(metrics) = &self.metrics {
1318 metrics.record_segment_operation(
1319 StaticFileSegment::Headers,
1320 StaticFileProviderOperation::Prune,
1321 Some(start.elapsed()),
1322 );
1323 }
1324
1325 Ok(())
1326 }
1327
1328 pub fn reader(&self) -> StaticFileProvider<N> {
1330 Self::upgrade_provider_to_strong_reference(&self.reader)
1331 }
1332
1333 fn upgrade_provider_to_strong_reference(
1342 provider: &Weak<StaticFileProviderInner<N>>,
1343 ) -> StaticFileProvider<N> {
1344 provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
1345 }
1346
1347 pub const fn user_header(&self) -> &SegmentHeader {
1349 self.writer.user_header()
1350 }
1351
1352 pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
1354 self.writer.user_header_mut()
1355 }
1356
1357 #[cfg(any(test, feature = "test-utils"))]
1359 pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
1360 self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
1361 }
1362
1363 #[cfg(any(test, feature = "test-utils"))]
1365 pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
1366 &mut self.writer
1367 }
1368}
1369
1370fn create_jar(
1371 segment: StaticFileSegment,
1372 path: &Path,
1373 expected_block_range: SegmentRangeInclusive,
1374) -> NippyJar<SegmentHeader> {
1375 let mut jar = NippyJar::new(
1376 segment.columns(),
1377 path,
1378 SegmentHeader::new(expected_block_range, None, None, segment),
1379 );
1380
1381 if segment.is_headers() {
1384 jar = jar.with_lz4();
1385 }
1386
1387 jar
1388}