1use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
10use reth_db_api::models::CompactU256;
11use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
12use reth_node_types::NodePrimitives;
13use reth_static_file_types::{
14 ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter, SegmentHeader,
15 SegmentRangeInclusive, StaticFileSegment,
16};
17use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
18use std::{
19 borrow::Borrow,
20 cmp::Ordering,
21 fmt::Debug,
22 path::{Path, PathBuf},
23 sync::{Arc, Weak},
24 time::Instant,
25};
26use tracing::{debug, instrument};
27
/// Pruning work queued on a writer and carried out the next time the writer is committed.
///
/// Each variant carries the arguments that are forwarded to the matching `prune_*_data`
/// method of the writer.
#[derive(Debug, Clone, Copy)]
enum PruneStrategy {
    /// Prune the headers segment.
    Headers {
        /// Number of blocks to prune.
        num_blocks: u64,
    },
    /// Prune the transactions segment.
    Transactions {
        /// Number of rows to prune.
        num_rows: u64,
        /// Block number forwarded alongside the row count.
        // NOTE(review): presumably the last block that remains after pruning — confirm.
        last_block: BlockNumber,
    },
    /// Prune the receipts segment.
    Receipts {
        /// Number of rows to prune.
        num_rows: u64,
        /// Block number forwarded alongside the row count.
        // NOTE(review): presumably the last block that remains after pruning — confirm.
        last_block: BlockNumber,
    },
    /// Prune the transaction senders segment.
    TransactionSenders {
        /// Number of rows to prune.
        num_rows: u64,
        /// Block number forwarded alongside the row count.
        // NOTE(review): presumably the last block that remains after pruning — confirm.
        last_block: BlockNumber,
    },
    /// Prune the account changesets segment.
    AccountChangeSets {
        /// Block number forwarded to the changeset pruning routine.
        // NOTE(review): presumably the last block that remains after pruning — confirm.
        last_block: BlockNumber,
    },
    /// Prune the storage changesets segment.
    StorageChangeSets {
        /// Block number forwarded to the changeset pruning routine.
        // NOTE(review): presumably the last block that remains after pruning — confirm.
        last_block: BlockNumber,
    },
}
68
/// Holds one optional writer per static file segment, each behind its own [`RwLock`].
///
/// A slot is `None` until a writer has been stored in it.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    /// Writer slot for [`StaticFileSegment::Headers`].
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::Transactions`].
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::Receipts`].
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::TransactionSenders`].
    transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::AccountChangeSets`].
    account_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::StorageChangeSets`].
    storage_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
}
82
83impl<N> Default for StaticFileWriters<N> {
84 fn default() -> Self {
85 Self {
86 headers: Default::default(),
87 transactions: Default::default(),
88 receipts: Default::default(),
89 transaction_senders: Default::default(),
90 account_change_sets: Default::default(),
91 storage_change_sets: Default::default(),
92 }
93 }
94}
95
96impl<N: NodePrimitives> StaticFileWriters<N> {
97 pub(crate) fn get_or_create(
98 &self,
99 segment: StaticFileSegment,
100 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
101 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
102 let mut write_guard = match segment {
103 StaticFileSegment::Headers => self.headers.write(),
104 StaticFileSegment::Transactions => self.transactions.write(),
105 StaticFileSegment::Receipts => self.receipts.write(),
106 StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
107 StaticFileSegment::AccountChangeSets => self.account_change_sets.write(),
108 StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(),
109 };
110
111 if write_guard.is_none() {
112 *write_guard = Some(create_fn()?);
113 }
114
115 Ok(StaticFileProviderRWRefMut(write_guard))
116 }
117
118 #[instrument(
119 name = "StaticFileWriters::commit",
120 level = "debug",
121 target = "providers::static_file",
122 skip_all
123 )]
124 pub(crate) fn commit(&self) -> ProviderResult<()> {
125 debug!(target: "providers::static_file", "Committing all static file segments");
126
127 for writer_lock in [
128 &self.headers,
129 &self.transactions,
130 &self.receipts,
131 &self.transaction_senders,
132 &self.account_change_sets,
133 &self.storage_change_sets,
134 ] {
135 let mut writer = writer_lock.write();
136 if let Some(writer) = writer.as_mut() {
137 writer.commit()?;
138 }
139 }
140
141 debug!(target: "providers::static_file", "Committed all static file segments");
142 Ok(())
143 }
144
145 pub(crate) fn has_unwind_queued(&self) -> bool {
146 for writer_lock in [
147 &self.headers,
148 &self.transactions,
149 &self.receipts,
150 &self.transaction_senders,
151 &self.account_change_sets,
152 &self.storage_change_sets,
153 ] {
154 let writer = writer_lock.read();
155 if let Some(writer) = writer.as_ref() &&
156 writer.will_prune_on_commit()
157 {
158 return true
159 }
160 }
161 false
162 }
163
164 #[instrument(
169 name = "StaticFileWriters::finalize",
170 level = "debug",
171 target = "providers::static_file",
172 skip_all
173 )]
174 pub(crate) fn finalize(&self) -> ProviderResult<()> {
175 debug!(target: "providers::static_file", "Finalizing all static file segments into disk");
176
177 for writer_lock in [
178 &self.headers,
179 &self.transactions,
180 &self.receipts,
181 &self.transaction_senders,
182 &self.account_change_sets,
183 &self.storage_change_sets,
184 ] {
185 let mut writer = writer_lock.write();
186 if let Some(writer) = writer.as_mut() {
187 writer.finalize()?;
188 }
189 }
190
191 debug!(target: "providers::static_file", "Finalized all static file segments into disk");
192 Ok(())
193 }
194}
195
/// Mutable handle to a [`StaticFileProviderRW`], backed by the write guard of its slot.
///
/// The wrapped guard protects an `Option`; the `Deref`/`DerefMut` implementations expect it
/// to be `Some`.
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
201
202impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
203 fn deref_mut(&mut self) -> &mut Self::Target {
204 self.0.as_mut().expect("static file writer provider should be init")
206 }
207}
208
209impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
210 type Target = StaticFileProviderRW<N>;
211
212 fn deref(&self) -> &Self::Target {
213 self.0.as_ref().expect("static file writer provider should be init")
215 }
216}
217
/// Read-write handle for a single static file segment.
#[derive(Debug)]
pub struct StaticFileProviderRW<N> {
    /// Weak reference back to the provider internals; it is handed to `open` whenever a
    /// different file has to be opened.
    reader: Weak<StaticFileProviderInner<N>>,
    /// Writer for the currently opened jar, carrying a [`SegmentHeader`] as user header.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path of the currently opened data file. The changeset sidecar path is derived from it
    /// by swapping the extension for `csoff`.
    data_path: PathBuf,
    /// Scratch buffer reused for the compact encoding of each appended column.
    buf: Vec<u8>,
    /// Optional metrics sink for segment operations.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Prune queued for execution on the next commit, if any.
    prune_on_commit: Option<PruneStrategy>,
    /// Set by `sync_all` and cleared by `finalize`.
    synced: bool,
    /// Writer for the changeset offset sidecar; populated for change-based segments.
    changeset_offsets: Option<ChangesetOffsetWriter>,
    /// Offset record of the block currently being written, not yet appended to the sidecar.
    current_changeset_offset: Option<ChangesetOffset>,
}
243
244impl<N: NodePrimitives> StaticFileProviderRW<N> {
245 pub fn new(
250 segment: StaticFileSegment,
251 block: BlockNumber,
252 reader: Weak<StaticFileProviderInner<N>>,
253 metrics: Option<Arc<StaticFileProviderMetrics>>,
254 ) -> ProviderResult<Self> {
255 let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
256
257 let mut writer = Self {
259 writer,
260 data_path,
261 buf: Vec::with_capacity(100),
262 reader,
263 metrics,
264 prune_on_commit: None,
265 synced: false,
266 changeset_offsets: None,
267 current_changeset_offset: None,
268 };
269
270 writer.ensure_end_range_consistency()?;
273
274 if segment.is_change_based() {
276 writer.heal_changeset_sidecar()?;
277 }
278
279 Ok(writer)
280 }
281
282 fn open(
283 segment: StaticFileSegment,
284 block: u64,
285 reader: Weak<StaticFileProviderInner<N>>,
286 metrics: Option<Arc<StaticFileProviderMetrics>>,
287 ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
288 let start = Instant::now();
289
290 let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
291
292 let block_range = static_file_provider.find_fixed_range(segment, block);
293 let (jar, path) = match static_file_provider.get_segment_provider_for_block(
294 segment,
295 block_range.start(),
296 None,
297 ) {
298 Ok(provider) => (
299 NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
300 provider.data_path().into(),
301 ),
302 Err(ProviderError::MissingStaticFileBlock(_, _)) => {
303 let path = static_file_provider.directory().join(segment.filename(&block_range));
304 (create_jar(segment, &path, block_range), path)
305 }
306 Err(err) => return Err(err),
307 };
308
309 let result = match NippyJarWriter::new(jar) {
310 Ok(writer) => Ok((writer, path)),
311 Err(NippyJarError::FrozenJar) => {
312 Err(ProviderError::FinalizedStaticFile(segment, block))
314 }
315 Err(e) => Err(ProviderError::other(e)),
316 }?;
317
318 if let Some(metrics) = &metrics {
319 metrics.record_segment_operation(
320 segment,
321 StaticFileProviderOperation::OpenWriter,
322 Some(start.elapsed()),
323 );
324 }
325
326 Ok(result)
327 }
328
329 fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
338 let expected_rows = if self.user_header().segment().is_headers() {
340 self.user_header().block_len().unwrap_or_default()
341 } else {
342 self.user_header().tx_len().unwrap_or_default()
343 };
344 let actual_rows = self.writer.rows() as u64;
345 let pruned_rows = expected_rows.saturating_sub(actual_rows);
346 if pruned_rows > 0 {
347 self.user_header_mut().prune(pruned_rows);
348 }
349
350 debug!(
351 target: "providers::static_file",
352 segment = ?self.writer.user_header().segment(),
353 path = ?self.data_path,
354 pruned_rows,
355 "Ensuring end range consistency"
356 );
357
358 self.writer.commit().map_err(ProviderError::other)?;
359
360 self.update_index()?;
362 Ok(())
363 }
364
365 pub const fn will_prune_on_commit(&self) -> bool {
367 self.prune_on_commit.is_some()
368 }
369
370 fn heal_changeset_sidecar(&mut self) -> ProviderResult<()> {
378 let csoff_path = self.data_path.with_extension("csoff");
379
380 let header_claims_blocks = self.writer.user_header().changeset_offsets_len();
382 let actual_nippy_rows = self.writer.rows() as u64;
383
384 let actual_sidecar_blocks = if csoff_path.exists() {
386 let file_len = reth_fs_util::metadata(&csoff_path).map_err(ProviderError::other)?.len();
387 let aligned_len = file_len - (file_len % 16);
389 aligned_len / 16
390 } else {
391 0
392 };
393
394 if header_claims_blocks == 0 && actual_sidecar_blocks == 0 {
396 self.changeset_offsets =
397 Some(ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?);
398 return Ok(());
399 }
400
401 let valid_blocks = if actual_sidecar_blocks > 0 {
403 let mut reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
404 .map_err(ProviderError::other)?;
405
406 let mut valid = 0u64;
409 for i in 0..actual_sidecar_blocks {
410 if let Some(offset) = reader.get(i).map_err(ProviderError::other)? {
411 if offset.offset() + offset.num_changes() <= actual_nippy_rows {
412 valid = i + 1;
413 } else {
414 break;
416 }
417 }
418 }
419 valid
420 } else {
421 0
422 };
423
424 let correct_blocks = valid_blocks.min(header_claims_blocks);
427
428 let mut needs_header_commit = false;
430
431 if correct_blocks != header_claims_blocks || actual_sidecar_blocks != correct_blocks {
432 tracing::warn!(
433 target: "reth::static_file",
434 path = %csoff_path.display(),
435 header_claims = header_claims_blocks,
436 sidecar_has = actual_sidecar_blocks,
437 valid_blocks = correct_blocks,
438 actual_rows = actual_nippy_rows,
439 "Three-way healing: syncing header, sidecar, and NippyJar state"
440 );
441
442 if actual_sidecar_blocks > correct_blocks {
444 use std::fs::OpenOptions;
445 let file = OpenOptions::new()
446 .write(true)
447 .open(&csoff_path)
448 .map_err(ProviderError::other)?;
449 file.set_len(correct_blocks * 16).map_err(ProviderError::other)?;
450 file.sync_all().map_err(ProviderError::other)?;
451
452 tracing::debug!(
453 target: "reth::static_file",
454 "Truncated sidecar from {} to {} blocks",
455 actual_sidecar_blocks,
456 correct_blocks
457 );
458 }
459
460 if correct_blocks < header_claims_blocks {
462 let blocks_removed = header_claims_blocks - correct_blocks;
465 self.writer.user_header_mut().prune(blocks_removed);
466
467 tracing::debug!(
468 target: "reth::static_file",
469 "Updated header: removed {} blocks (changeset_offsets_len: {} -> {})",
470 blocks_removed,
471 header_claims_blocks,
472 correct_blocks
473 );
474
475 needs_header_commit = true;
476 }
477 } else {
478 tracing::debug!(
479 target: "reth::static_file",
480 path = %csoff_path.display(),
481 blocks = correct_blocks,
482 "Changeset sidecar consistent, no healing needed"
483 );
484 }
485
486 let csoff_writer = ChangesetOffsetWriter::new(&csoff_path, correct_blocks)
488 .map_err(ProviderError::other)?;
489
490 self.changeset_offsets = Some(csoff_writer);
491
492 if needs_header_commit {
494 self.writer.commit().map_err(ProviderError::other)?;
495
496 tracing::info!(
497 target: "reth::static_file",
498 path = %csoff_path.display(),
499 blocks = correct_blocks,
500 "Committed healed changeset offset header"
501 );
502 }
503
504 Ok(())
505 }
506
507 fn flush_current_changeset_offset(&mut self) -> ProviderResult<()> {
515 if !self.writer.user_header().segment().is_change_based() {
516 return Ok(());
517 }
518
519 if let Some(offset) = self.current_changeset_offset.take() &&
520 let Some(writer) = &mut self.changeset_offsets
521 {
522 writer.append(&offset).map_err(ProviderError::other)?;
523 }
524 Ok(())
525 }
526
527 pub fn sync_all(&mut self) -> ProviderResult<()> {
534 if self.prune_on_commit.is_some() {
535 return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
536 }
537
538 self.flush_current_changeset_offset()?;
540 if let Some(writer) = &mut self.changeset_offsets {
541 writer.sync().map_err(ProviderError::other)?;
542 self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
544 }
545
546 if self.writer.is_dirty() {
547 self.writer.sync_all().map_err(ProviderError::other)?;
548 }
549 self.synced = true;
550 Ok(())
551 }
552
553 #[instrument(
559 name = "StaticFileProviderRW::finalize",
560 level = "debug",
561 target = "providers::static_file",
562 skip_all
563 )]
564 pub fn finalize(&mut self) -> ProviderResult<()> {
565 if self.prune_on_commit.is_some() {
566 return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
567 }
568 if self.writer.is_dirty() {
569 if !self.synced {
570 self.sync_all()?;
573 }
574
575 self.writer.finalize().map_err(ProviderError::other)?;
576 self.update_index()?;
577 }
578 self.synced = false;
579 Ok(())
580 }
581
/// Commits the pending state of this writer.
///
/// In order: runs the queued prune (if any), flushes and syncs the changeset offset sidecar
/// and records its length in the header, and then — only if the jar writer is dirty —
/// commits the jar writer, records a `CommitWriter` metric and updates the provider index.
///
/// # Errors
///
/// Propagates errors from pruning, from the sidecar, from the jar writer commit and from
/// the index update.
#[instrument(
    name = "StaticFileProviderRW::commit",
    level = "debug",
    target = "providers::static_file",
    skip_all
)]
pub fn commit(&mut self) -> ProviderResult<()> {
    let start = Instant::now();

    // `take()` clears the queued prune before it is executed.
    if let Some(strategy) = self.prune_on_commit.take() {
        debug!(
            target: "providers::static_file",
            segment = ?self.writer.user_header().segment(),
            "Pruning data on commit"
        );
        match strategy {
            PruneStrategy::Headers { num_blocks } => self.prune_header_data(num_blocks)?,
            PruneStrategy::Transactions { num_rows, last_block } => {
                self.prune_transaction_data(num_rows, last_block)?
            }
            PruneStrategy::Receipts { num_rows, last_block } => {
                self.prune_receipt_data(num_rows, last_block)?
            }
            PruneStrategy::TransactionSenders { num_rows, last_block } => {
                self.prune_transaction_sender_data(num_rows, last_block)?
            }
            PruneStrategy::AccountChangeSets { last_block } => {
                self.prune_account_changeset_data(last_block)?
            }
            PruneStrategy::StorageChangeSets { last_block } => {
                self.prune_storage_changeset_data(last_block)?
            }
        }
    }

    // The sidecar is flushed and synced before the jar writer is committed, and the header
    // is told how many offset records the sidecar now holds.
    self.flush_current_changeset_offset()?;
    if let Some(writer) = &mut self.changeset_offsets {
        writer.sync().map_err(ProviderError::other)?;
        self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
    }

    if self.writer.is_dirty() {
        debug!(
            target: "providers::static_file",
            segment = ?self.writer.user_header().segment(),
            "Committing writer to disk"
        );

        self.writer.commit().map_err(ProviderError::other)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "providers::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Committed writer to disk"
        );

        // Publish the new state to the provider index.
        self.update_index()?;
    }

    Ok(())
}
659
/// Test-only variant of commit that goes through the jar writer's
/// `commit_without_sync_all`, then records a `CommitWriter` metric and updates the
/// provider index.
#[cfg(feature = "test-utils")]
pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
    let started_at = Instant::now();

    debug!(
        target: "providers::static_file",
        segment = ?self.writer.user_header().segment(),
        "Committing writer to disk (without sync)"
    );

    self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

    if let Some(metrics) = self.metrics.as_ref() {
        let segment = self.writer.user_header().segment();
        metrics.record_segment_operation(
            segment,
            StaticFileProviderOperation::CommitWriter,
            Some(started_at.elapsed()),
        );
    }

    debug!(
        target: "providers::static_file",
        segment = ?self.writer.user_header().segment(),
        path = ?self.data_path,
        duration = ?started_at.elapsed(),
        "Committed writer to disk (without sync)"
    );

    self.update_index()
}
696
697 fn update_index(&self) -> ProviderResult<()> {
699 let segment_max_block = self
708 .writer
709 .user_header()
710 .block_range()
711 .as_ref()
712 .map(|block_range| block_range.end())
713 .or_else(|| {
714 (self.writer.user_header().expected_block_start() >
715 self.reader().genesis_block_number())
716 .then(|| self.writer.user_header().expected_block_start() - 1)
717 });
718
719 self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
720 }
721
722 pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
727 let current_block = if let Some(current_block_number) = self.current_block_number() {
728 current_block_number
729 } else {
730 self.increment_block(0)?;
731 0
732 };
733
734 match current_block.cmp(&advance_to) {
735 Ordering::Less => {
736 for block in current_block + 1..=advance_to {
737 self.increment_block(block)?;
738 }
739 }
740 Ordering::Equal => {}
741 Ordering::Greater => {
742 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
743 self.writer.user_header().segment(),
744 current_block,
745 advance_to,
746 ));
747 }
748 }
749
750 Ok(())
751 }
752
/// Registers `expected_block_number` as the next block of this segment.
///
/// When the current file already ends at its expected last block, the writer is committed
/// and the file for the following block is opened with a fresh header. For change-based
/// segments the previous block's offset record is appended to the sidecar and tracking of
/// a new record starts at the current row count.
///
/// # Errors
///
/// Fails if `expected_block_number` is not the next block of the segment, or if committing,
/// opening the next file or writing to the sidecar fails.
pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
    let segment = self.writer.user_header().segment();

    self.check_next_block_number(expected_block_number)?;

    let start = Instant::now();
    if let Some(last_block) = self.writer.user_header().block_end() {
        // The current file has reached the end of its expected range.
        if last_block == self.writer.user_header().expected_block_end() {
            // Persist the finished file before switching away from it.
            self.commit()?;

            let (writer, data_path) =
                Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
            self.writer = writer;
            self.data_path = data_path.clone();

            // The new file gets its own sidecar writer, opened with a length of 0.
            if segment.is_change_based() {
                let csoff_path = data_path.with_extension("csoff");
                self.changeset_offsets = Some(
                    ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?,
                );
            }

            // Replace the header of the newly opened writer with a fresh one for its range.
            *self.writer.user_header_mut() = SegmentHeader::new(
                self.reader().find_fixed_range(segment, last_block + 1),
                None,
                None,
                segment,
            );
        }
    }

    self.writer.user_header_mut().increment_block();

    if segment.is_change_based() {
        // Append the record of the block that was being tracked, if there is one.
        if let Some(offset) = self.current_changeset_offset.take() &&
            let Some(writer) = &mut self.changeset_offsets
        {
            writer.append(&offset).map_err(ProviderError::other)?;
        }
        // Track the new block: it starts at the current row count with zero changes.
        let new_offset = self.writer.rows() as u64;
        self.current_changeset_offset = Some(ChangesetOffset::new(new_offset, 0));
    }

    if let Some(metrics) = &self.metrics {
        metrics.record_segment_operation(
            segment,
            StaticFileProviderOperation::IncrementBlock,
            Some(start.elapsed()),
        );
    }

    Ok(())
}
815
816 pub fn current_block_number(&self) -> Option<u64> {
818 self.writer.user_header().block_end()
819 }
820
821 pub fn next_block_number(&self) -> u64 {
823 self.writer
827 .user_header()
828 .block_end()
829 .map(|b| b + 1)
830 .unwrap_or_else(|| self.writer.user_header().expected_block_start())
831 }
832
833 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
836 let next_static_file_block = self.next_block_number();
837
838 if expected_block_number != next_static_file_block {
839 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
840 self.writer.user_header().segment(),
841 expected_block_number,
842 next_static_file_block,
843 ))
844 }
845 Ok(())
846 }
847
/// Truncates a change-based segment so that `last_block` becomes its last block, then
/// commits.
///
/// Files that start after `last_block` are deleted while walking back to the file that
/// contains it. Within that file, the number of rows to keep is read from the sidecar
/// record of the first block being dropped.
///
/// # Errors
///
/// Returns [`ProviderError::MissingStaticFileBlock`] if the header has no block end, and
/// propagates sidecar, jar writer and commit errors.
fn truncate_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
    let segment = self.writer.user_header().segment();
    debug_assert!(segment.is_change_based());

    let current_block_end = self
        .writer
        .user_header()
        .block_end()
        .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;

    // Already at or before the target block: nothing to truncate.
    if current_block_end <= last_block {
        return Ok(())
    }

    // Walk back through files until the current one starts at or before `last_block`.
    let mut expected_block_start = self.writer.user_header().expected_block_start();
    while last_block < expected_block_start && expected_block_start > 0 {
        self.delete_current_and_open_previous()?;
        expected_block_start = self.writer.user_header().expected_block_start();
    }

    // Number of blocks of this file that survive the truncation.
    let blocks_to_keep = if last_block >= expected_block_start {
        last_block - expected_block_start + 1
    } else {
        0
    };

    let csoff_path = self.data_path.with_extension("csoff");
    // Captured before the flush below, so it reflects what the header currently records.
    let changeset_offsets_len = self.writer.user_header().changeset_offsets_len();

    // Write out any pending offset record before the sidecar is read.
    self.flush_current_changeset_offset()?;

    let rows_to_keep = if blocks_to_keep == 0 {
        0
    } else if blocks_to_keep >= changeset_offsets_len {
        // Every recorded block is kept, so every row is kept.
        self.writer.rows() as u64
    } else {
        let mut reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
            .map_err(ProviderError::other)?;
        // The record at index `blocks_to_keep` belongs to the first dropped block; its
        // offset is the number of rows that precede it.
        if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
            next_offset.offset()
        } else {
            self.writer.rows() as u64
        }
    };

    let total_rows = self.writer.rows() as u64;
    let rows_to_delete = total_rows.saturating_sub(rows_to_keep);

    if rows_to_delete > 0 {
        // Re-read the block end: the writer may now point at a different file.
        let current_block_end = self
            .writer
            .user_header()
            .block_end()
            .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
        let blocks_to_remove = current_block_end - last_block;

        // The header is pruned by a block count, the jar writer by a row count.
        self.writer.user_header_mut().prune(blocks_to_remove);

        self.writer.prune_rows(rows_to_delete as usize).map_err(ProviderError::other)?;
    }

    self.writer.user_header_mut().set_block_range(expected_block_start, last_block);

    self.writer.user_header_mut().sync_changeset_offsets();

    // Shrink the sidecar to the blocks that were kept.
    if let Some(writer) = &mut self.changeset_offsets {
        writer.truncate(blocks_to_keep).map_err(ProviderError::other)?;
    }

    // Any in-progress offset record is discarded.
    self.current_changeset_offset = None;

    self.commit()?;

    Ok(())
}
947
/// Removes `num_rows` rows from the end of the segment, deleting whole files where
/// required, then commits.
///
/// When `last_block` is provided, the header's block range is set to end at `last_block`
/// after the rows have been removed.
///
/// # Errors
///
/// Propagates errors from deleting files, pruning rows and committing.
fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
    let mut remaining_rows = num_rows;
    let segment = self.writer.user_header().segment();
    while remaining_rows > 0 {
        // Length of the current file, counted in blocks or in transactions by segment kind.
        let len = if segment.is_block_based() {
            self.writer.user_header().block_len().unwrap_or_default()
        } else {
            self.writer.user_header().tx_len().unwrap_or_default()
        };

        if remaining_rows >= len {
            let block_start = self.writer.user_header().expected_block_start();

            // The file is deleted only if it does not start at block 0 and either the
            // segment is headers or `last_block` lies before this file's first block.
            if block_start != 0 &&
                (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
            {
                self.delete_current_and_open_previous()?;
            } else {
                // Otherwise the file is emptied but kept, and the loop ends.
                self.writer.user_header_mut().prune(len);
                self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                break
            }

            remaining_rows -= len;
        } else {
            // The remaining rows all live in the current file.
            self.writer.user_header_mut().prune(remaining_rows);

            self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
            remaining_rows = 0;
        }
    }

    if let Some(last_block) = last_block {
        let mut expected_block_start = self.writer.user_header().expected_block_start();

        // With no rows to remove, `last_block` is the only guide to the right file: walk
        // back until the current file starts at or before it.
        if num_rows == 0 {
            while last_block < expected_block_start {
                self.delete_current_and_open_previous()?;
                expected_block_start = self.writer.user_header().expected_block_start();
            }
        }
        self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
    }

    self.commit()?;

    Ok(())
}
1018
1019 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
1022 let segment = self.user_header().segment();
1023 let current_path = self.data_path.clone();
1024 let (previous_writer, data_path) = Self::open(
1025 segment,
1026 self.writer.user_header().expected_block_start() - 1,
1027 self.reader.clone(),
1028 self.metrics.clone(),
1029 )?;
1030 self.writer = previous_writer;
1031 self.writer.set_dirty();
1032 self.data_path = data_path.clone();
1033
1034 if segment.is_change_based() {
1036 let csoff_path = current_path.with_extension("csoff");
1037 if csoff_path.exists() {
1038 std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
1039 }
1040 let new_csoff_path = data_path.with_extension("csoff");
1042 let committed_len = self.writer.user_header().changeset_offsets_len();
1043 self.changeset_offsets = Some(
1044 ChangesetOffsetWriter::new(&new_csoff_path, committed_len)
1045 .map_err(ProviderError::other)?,
1046 );
1047 }
1048
1049 self.current_changeset_offset = None;
1051
1052 NippyJar::<SegmentHeader>::load(¤t_path)
1053 .map_err(ProviderError::other)?
1054 .delete()
1055 .map_err(ProviderError::other)?;
1056 Ok(())
1057 }
1058
1059 fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
1061 self.buf.clear();
1062 column.to_compact(&mut self.buf);
1063
1064 self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
1065 Ok(())
1066 }
1067
1068 fn append_with_tx_number<V: Compact>(
1070 &mut self,
1071 tx_num: TxNumber,
1072 value: V,
1073 ) -> ProviderResult<()> {
1074 if let Some(range) = self.writer.user_header().tx_range() {
1075 let next_tx = range.end() + 1;
1076 if next_tx != tx_num {
1077 return Err(ProviderError::UnexpectedStaticFileTxNumber(
1078 self.writer.user_header().segment(),
1079 tx_num,
1080 next_tx,
1081 ))
1082 }
1083 self.writer.user_header_mut().increment_tx();
1084 } else {
1085 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
1086 }
1087
1088 self.append_column(value)?;
1089
1090 Ok(())
1091 }
1092
1093 fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
1095 if let Some(ref mut offset) = self.current_changeset_offset {
1096 offset.increment_num_changes();
1097 }
1098 self.append_column(change)?;
1099 Ok(())
1100 }
1101
1102 pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
1107 where
1108 N::BlockHeader: Compact,
1109 {
1110 self.append_header_with_td(header, U256::ZERO, hash)
1111 }
1112
1113 pub fn append_header_with_td(
1118 &mut self,
1119 header: &N::BlockHeader,
1120 total_difficulty: U256,
1121 hash: &BlockHash,
1122 ) -> ProviderResult<()>
1123 where
1124 N::BlockHeader: Compact,
1125 {
1126 let start = Instant::now();
1127 self.ensure_no_queued_prune()?;
1128
1129 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1130
1131 self.increment_block(header.number())?;
1132
1133 self.append_column(header)?;
1134 self.append_column(CompactU256::from(total_difficulty))?;
1135 self.append_column(hash)?;
1136
1137 if let Some(metrics) = &self.metrics {
1138 metrics.record_segment_operation(
1139 StaticFileSegment::Headers,
1140 StaticFileProviderOperation::Append,
1141 Some(start.elapsed()),
1142 );
1143 }
1144
1145 Ok(())
1146 }
1147
1148 pub fn append_header_direct(
1151 &mut self,
1152 header: &N::BlockHeader,
1153 total_difficulty: U256,
1154 hash: &BlockHash,
1155 ) -> ProviderResult<()>
1156 where
1157 N::BlockHeader: Compact,
1158 {
1159 let start = Instant::now();
1160 self.ensure_no_queued_prune()?;
1161
1162 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1163
1164 self.append_column(header)?;
1165 self.append_column(CompactU256::from(total_difficulty))?;
1166 self.append_column(hash)?;
1167
1168 if let Some(metrics) = &self.metrics {
1169 metrics.record_segment_operation(
1170 StaticFileSegment::Headers,
1171 StaticFileProviderOperation::Append,
1172 Some(start.elapsed()),
1173 );
1174 }
1175
1176 Ok(())
1177 }
1178
1179 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
1184 where
1185 N::SignedTx: Compact,
1186 {
1187 let start = Instant::now();
1188 self.ensure_no_queued_prune()?;
1189
1190 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1191 self.append_with_tx_number(tx_num, tx)?;
1192
1193 if let Some(metrics) = &self.metrics {
1194 metrics.record_segment_operation(
1195 StaticFileSegment::Transactions,
1196 StaticFileProviderOperation::Append,
1197 Some(start.elapsed()),
1198 );
1199 }
1200
1201 Ok(())
1202 }
1203
1204 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
1209 where
1210 N::Receipt: Compact,
1211 {
1212 let start = Instant::now();
1213 self.ensure_no_queued_prune()?;
1214
1215 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1216 self.append_with_tx_number(tx_num, receipt)?;
1217
1218 if let Some(metrics) = &self.metrics {
1219 metrics.record_segment_operation(
1220 StaticFileSegment::Receipts,
1221 StaticFileProviderOperation::Append,
1222 Some(start.elapsed()),
1223 );
1224 }
1225
1226 Ok(())
1227 }
1228
1229 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
1231 where
1232 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
1233 R: Borrow<N::Receipt>,
1234 N::Receipt: Compact,
1235 {
1236 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1237
1238 let mut receipts_iter = receipts.into_iter().peekable();
1239 if receipts_iter.peek().is_none() {
1241 return Ok(());
1242 }
1243
1244 let start = Instant::now();
1245 self.ensure_no_queued_prune()?;
1246
1247 let mut count: u64 = 0;
1249
1250 for receipt_result in receipts_iter {
1251 let (tx_num, receipt) = receipt_result?;
1252 self.append_with_tx_number(tx_num, receipt.borrow())?;
1253 count += 1;
1254 }
1255
1256 if let Some(metrics) = &self.metrics {
1257 metrics.record_segment_operations(
1258 StaticFileSegment::Receipts,
1259 StaticFileProviderOperation::Append,
1260 count,
1261 Some(start.elapsed()),
1262 );
1263 }
1264
1265 Ok(())
1266 }
1267
1268 pub fn append_transaction_sender(
1273 &mut self,
1274 tx_num: TxNumber,
1275 sender: &alloy_primitives::Address,
1276 ) -> ProviderResult<()> {
1277 let start = Instant::now();
1278 self.ensure_no_queued_prune()?;
1279
1280 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1281 self.append_with_tx_number(tx_num, sender)?;
1282
1283 if let Some(metrics) = &self.metrics {
1284 metrics.record_segment_operation(
1285 StaticFileSegment::TransactionSenders,
1286 StaticFileProviderOperation::Append,
1287 Some(start.elapsed()),
1288 );
1289 }
1290
1291 Ok(())
1292 }
1293
1294 pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
1296 where
1297 I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
1298 {
1299 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1300
1301 let mut senders_iter = senders.into_iter().peekable();
1302 if senders_iter.peek().is_none() {
1304 return Ok(());
1305 }
1306
1307 let start = Instant::now();
1308 self.ensure_no_queued_prune()?;
1309
1310 let mut count: u64 = 0;
1312 for (tx_num, sender) in senders_iter {
1313 self.append_with_tx_number(tx_num, sender)?;
1314 count += 1;
1315 }
1316
1317 if let Some(metrics) = &self.metrics {
1318 metrics.record_segment_operations(
1319 StaticFileSegment::TransactionSenders,
1320 StaticFileProviderOperation::Append,
1321 count,
1322 Some(start.elapsed()),
1323 );
1324 }
1325
1326 Ok(())
1327 }
1328
1329 pub fn append_account_changeset(
1335 &mut self,
1336 mut changeset: Vec<AccountBeforeTx>,
1337 block_number: u64,
1338 ) -> ProviderResult<()> {
1339 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1340 let start = Instant::now();
1341
1342 self.increment_block(block_number)?;
1343 self.ensure_no_queued_prune()?;
1344
1345 changeset.sort_by_key(|change| change.address);
1347
1348 let mut count: u64 = 0;
1349
1350 for change in changeset {
1351 self.append_change(&change)?;
1352 count += 1;
1353 }
1354
1355 if let Some(metrics) = &self.metrics {
1356 metrics.record_segment_operations(
1357 StaticFileSegment::AccountChangeSets,
1358 StaticFileProviderOperation::Append,
1359 count,
1360 Some(start.elapsed()),
1361 );
1362 }
1363
1364 Ok(())
1365 }
1366
1367 pub fn append_storage_changeset(
1371 &mut self,
1372 mut changeset: Vec<StorageBeforeTx>,
1373 block_number: u64,
1374 ) -> ProviderResult<()> {
1375 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1376 let start = Instant::now();
1377
1378 self.increment_block(block_number)?;
1379 self.ensure_no_queued_prune()?;
1380
1381 changeset.sort_by_key(|change| (change.address, change.key));
1383
1384 let mut count: u64 = 0;
1385 for change in changeset {
1386 self.append_change(&change)?;
1387 count += 1;
1388 }
1389
1390 if let Some(metrics) = &self.metrics {
1391 metrics.record_segment_operations(
1392 StaticFileSegment::StorageChangeSets,
1393 StaticFileProviderOperation::Append,
1394 count,
1395 Some(start.elapsed()),
1396 );
1397 }
1398
1399 Ok(())
1400 }
1401
1402 pub fn prune_transactions(
1406 &mut self,
1407 to_delete: u64,
1408 last_block: BlockNumber,
1409 ) -> ProviderResult<()> {
1410 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
1411 self.queue_prune(PruneStrategy::Transactions { num_rows: to_delete, last_block })
1412 }
1413
1414 pub fn prune_receipts(
1418 &mut self,
1419 to_delete: u64,
1420 last_block: BlockNumber,
1421 ) -> ProviderResult<()> {
1422 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
1423 self.queue_prune(PruneStrategy::Receipts { num_rows: to_delete, last_block })
1424 }
1425
1426 pub fn prune_transaction_senders(
1430 &mut self,
1431 to_delete: u64,
1432 last_block: BlockNumber,
1433 ) -> ProviderResult<()> {
1434 debug_assert_eq!(
1435 self.writer.user_header().segment(),
1436 StaticFileSegment::TransactionSenders
1437 );
1438 self.queue_prune(PruneStrategy::TransactionSenders { num_rows: to_delete, last_block })
1439 }
1440
1441 pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
1443 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
1444 self.queue_prune(PruneStrategy::Headers { num_blocks: to_delete })
1445 }
1446
1447 pub fn prune_account_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1449 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::AccountChangeSets);
1450 self.queue_prune(PruneStrategy::AccountChangeSets { last_block })
1451 }
1452
1453 pub fn prune_storage_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1455 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::StorageChangeSets);
1456 self.queue_prune(PruneStrategy::StorageChangeSets { last_block })
1457 }
1458
1459 fn queue_prune(&mut self, strategy: PruneStrategy) -> ProviderResult<()> {
1461 self.ensure_no_queued_prune()?;
1462 self.prune_on_commit = Some(strategy);
1463 Ok(())
1464 }
1465
1466 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
1468 if self.prune_on_commit.is_some() {
1469 return Err(ProviderError::other(StaticFileWriterError::new(
1470 "Pruning should be committed before appending or pruning more data",
1471 )));
1472 }
1473 Ok(())
1474 }
1475
1476 fn prune_transaction_data(
1478 &mut self,
1479 to_delete: u64,
1480 last_block: BlockNumber,
1481 ) -> ProviderResult<()> {
1482 let start = Instant::now();
1483
1484 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1485
1486 self.truncate(to_delete, Some(last_block))?;
1487
1488 if let Some(metrics) = &self.metrics {
1489 metrics.record_segment_operation(
1490 StaticFileSegment::Transactions,
1491 StaticFileProviderOperation::Prune,
1492 Some(start.elapsed()),
1493 );
1494 }
1495
1496 Ok(())
1497 }
1498
1499 fn prune_account_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1501 let start = Instant::now();
1502
1503 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1504
1505 self.truncate_changesets(last_block)?;
1506
1507 if let Some(metrics) = &self.metrics {
1508 metrics.record_segment_operation(
1509 StaticFileSegment::AccountChangeSets,
1510 StaticFileProviderOperation::Prune,
1511 Some(start.elapsed()),
1512 );
1513 }
1514
1515 Ok(())
1516 }
1517
1518 fn prune_storage_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1520 let start = Instant::now();
1521
1522 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1523
1524 self.truncate_changesets(last_block)?;
1525
1526 if let Some(metrics) = &self.metrics {
1527 metrics.record_segment_operation(
1528 StaticFileSegment::StorageChangeSets,
1529 StaticFileProviderOperation::Prune,
1530 Some(start.elapsed()),
1531 );
1532 }
1533
1534 Ok(())
1535 }
1536
1537 fn prune_receipt_data(
1539 &mut self,
1540 to_delete: u64,
1541 last_block: BlockNumber,
1542 ) -> ProviderResult<()> {
1543 let start = Instant::now();
1544
1545 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1546
1547 self.truncate(to_delete, Some(last_block))?;
1548
1549 if let Some(metrics) = &self.metrics {
1550 metrics.record_segment_operation(
1551 StaticFileSegment::Receipts,
1552 StaticFileProviderOperation::Prune,
1553 Some(start.elapsed()),
1554 );
1555 }
1556
1557 Ok(())
1558 }
1559
1560 fn prune_transaction_sender_data(
1562 &mut self,
1563 to_delete: u64,
1564 last_block: BlockNumber,
1565 ) -> ProviderResult<()> {
1566 let start = Instant::now();
1567
1568 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1569
1570 self.truncate(to_delete, Some(last_block))?;
1571
1572 if let Some(metrics) = &self.metrics {
1573 metrics.record_segment_operation(
1574 StaticFileSegment::TransactionSenders,
1575 StaticFileProviderOperation::Prune,
1576 Some(start.elapsed()),
1577 );
1578 }
1579
1580 Ok(())
1581 }
1582
1583 fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
1585 let start = Instant::now();
1586
1587 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1588
1589 self.truncate(to_delete, None)?;
1590
1591 if let Some(metrics) = &self.metrics {
1592 metrics.record_segment_operation(
1593 StaticFileSegment::Headers,
1594 StaticFileProviderOperation::Prune,
1595 Some(start.elapsed()),
1596 );
1597 }
1598
1599 Ok(())
1600 }
1601
1602 pub fn reader(&self) -> StaticFileProvider<N> {
1604 Self::upgrade_provider_to_strong_reference(&self.reader)
1605 }
1606
1607 fn upgrade_provider_to_strong_reference(
1616 provider: &Weak<StaticFileProviderInner<N>>,
1617 ) -> StaticFileProvider<N> {
1618 provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
1619 }
1620
1621 pub const fn user_header(&self) -> &SegmentHeader {
1623 self.writer.user_header()
1624 }
1625
1626 pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
1628 self.writer.user_header_mut()
1629 }
1630
#[cfg(any(test, feature = "test-utils"))]
/// Overrides the block range stored in the segment header with the bounds of `block_range`.
///
/// Only compiled for tests or with the `test-utils` feature.
pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
    let first_block = *block_range.start();
    let last_block = *block_range.end();
    self.writer.user_header_mut().set_block_range(first_block, last_block)
}
1636
#[cfg(any(test, feature = "test-utils"))]
/// Returns a mutable reference to the underlying [`NippyJarWriter`].
///
/// Only compiled for tests or with the `test-utils` feature.
pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
    let Self { writer, .. } = self;
    writer
}
1642}
1643
1644fn create_jar(
1645 segment: StaticFileSegment,
1646 path: &Path,
1647 expected_block_range: SegmentRangeInclusive,
1648) -> NippyJar<SegmentHeader> {
1649 let mut jar = NippyJar::new(
1650 segment.columns(),
1651 path,
1652 SegmentHeader::new(expected_block_range, None, None, segment),
1653 );
1654
1655 if segment.is_headers() {
1658 jar = jar.with_lz4();
1659 }
1660
1661 jar
1662}