1use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
10use reth_db_api::models::CompactU256;
11use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
12use reth_node_types::NodePrimitives;
13use reth_primitives_traits::FastInstant as Instant;
14use reth_static_file_types::{
15 ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter, SegmentHeader,
16 SegmentRangeInclusive, StaticFileSegment,
17};
18use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
19use std::{
20 borrow::Borrow,
21 cmp::Ordering,
22 fmt::Debug,
23 path::{Path, PathBuf},
24 sync::{Arc, Weak},
25};
26use tracing::{debug, instrument};
27
28#[derive(Debug, Clone, Copy)]
30enum PruneStrategy {
31 Headers {
33 num_blocks: u64,
35 },
36 Transactions {
38 num_rows: u64,
40 last_block: BlockNumber,
42 },
43 Receipts {
45 num_rows: u64,
47 last_block: BlockNumber,
49 },
50 TransactionSenders {
52 num_rows: u64,
54 last_block: BlockNumber,
56 },
57 AccountChangeSets {
59 last_block: BlockNumber,
61 },
62 StorageChangeSets {
64 last_block: BlockNumber,
66 },
67}
68
69#[derive(Debug)]
74pub(crate) struct StaticFileWriters<N> {
75 headers: RwLock<Option<StaticFileProviderRW<N>>>,
76 transactions: RwLock<Option<StaticFileProviderRW<N>>>,
77 receipts: RwLock<Option<StaticFileProviderRW<N>>>,
78 transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
79 account_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
80 storage_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
81}
82
83impl<N> Default for StaticFileWriters<N> {
84 fn default() -> Self {
85 Self {
86 headers: Default::default(),
87 transactions: Default::default(),
88 receipts: Default::default(),
89 transaction_senders: Default::default(),
90 account_change_sets: Default::default(),
91 storage_change_sets: Default::default(),
92 }
93 }
94}
95
96impl<N: NodePrimitives> StaticFileWriters<N> {
97 pub(crate) fn get_or_create(
98 &self,
99 segment: StaticFileSegment,
100 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
101 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
102 let mut write_guard = match segment {
103 StaticFileSegment::Headers => self.headers.write(),
104 StaticFileSegment::Transactions => self.transactions.write(),
105 StaticFileSegment::Receipts => self.receipts.write(),
106 StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
107 StaticFileSegment::AccountChangeSets => self.account_change_sets.write(),
108 StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(),
109 };
110
111 if write_guard.is_none() {
112 *write_guard = Some(create_fn()?);
113 }
114
115 Ok(StaticFileProviderRWRefMut(write_guard))
116 }
117
118 #[instrument(
119 name = "StaticFileWriters::commit",
120 level = "debug",
121 target = "providers::static_file",
122 skip_all
123 )]
124 pub(crate) fn commit(&self) -> ProviderResult<()> {
125 debug!(target: "providers::static_file", "Committing all static file segments");
126
127 for writer_lock in [
128 &self.headers,
129 &self.transactions,
130 &self.receipts,
131 &self.transaction_senders,
132 &self.account_change_sets,
133 &self.storage_change_sets,
134 ] {
135 let mut writer = writer_lock.write();
136 if let Some(writer) = writer.as_mut() {
137 writer.commit()?;
138 }
139 }
140
141 debug!(target: "providers::static_file", "Committed all static file segments");
142 Ok(())
143 }
144
145 pub(crate) fn has_unwind_queued(&self) -> bool {
146 for writer_lock in [
147 &self.headers,
148 &self.transactions,
149 &self.receipts,
150 &self.transaction_senders,
151 &self.account_change_sets,
152 &self.storage_change_sets,
153 ] {
154 let writer = writer_lock.read();
155 if let Some(writer) = writer.as_ref() &&
156 writer.will_prune_on_commit()
157 {
158 return true
159 }
160 }
161 false
162 }
163
164 #[instrument(
169 name = "StaticFileWriters::finalize",
170 level = "debug",
171 target = "providers::static_file",
172 skip_all
173 )]
174 pub(crate) fn finalize(&self) -> ProviderResult<()> {
175 debug!(target: "providers::static_file", "Finalizing all static file segments into disk");
176
177 for writer_lock in [
178 &self.headers,
179 &self.transactions,
180 &self.receipts,
181 &self.transaction_senders,
182 &self.account_change_sets,
183 &self.storage_change_sets,
184 ] {
185 let mut writer = writer_lock.write();
186 if let Some(writer) = writer.as_mut() {
187 writer.finalize()?;
188 }
189 }
190
191 debug!(target: "providers::static_file", "Finalized all static file segments into disk");
192 Ok(())
193 }
194}
195
196#[derive(Debug)]
198pub struct StaticFileProviderRWRefMut<'a, N>(
199 pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
200);
201
202impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
203 fn deref_mut(&mut self) -> &mut Self::Target {
204 self.0.as_mut().expect("static file writer provider should be init")
206 }
207}
208
209impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
210 type Target = StaticFileProviderRW<N>;
211
212 fn deref(&self) -> &Self::Target {
213 self.0.as_ref().expect("static file writer provider should be init")
215 }
216}
217
218#[derive(Debug)]
219pub struct StaticFileProviderRW<N> {
221 reader: Weak<StaticFileProviderInner<N>>,
226 writer: NippyJarWriter<SegmentHeader>,
228 data_path: PathBuf,
230 buf: Vec<u8>,
232 metrics: Option<Arc<StaticFileProviderMetrics>>,
234 prune_on_commit: Option<PruneStrategy>,
236 synced: bool,
238 changeset_offsets: Option<ChangesetOffsetWriter>,
240 current_changeset_offset: Option<ChangesetOffset>,
242}
243
244impl<N: NodePrimitives> StaticFileProviderRW<N> {
245 pub fn new(
250 segment: StaticFileSegment,
251 block: BlockNumber,
252 reader: Weak<StaticFileProviderInner<N>>,
253 metrics: Option<Arc<StaticFileProviderMetrics>>,
254 ) -> ProviderResult<Self> {
255 let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
256
257 let mut writer = Self {
259 writer,
260 data_path,
261 buf: Vec::with_capacity(100),
262 reader,
263 metrics,
264 prune_on_commit: None,
265 synced: false,
266 changeset_offsets: None,
267 current_changeset_offset: None,
268 };
269
270 writer.ensure_end_range_consistency()?;
273
274 if segment.is_change_based() {
276 writer.heal_changeset_sidecar()?;
277 }
278
279 Ok(writer)
280 }
281
282 fn open(
283 segment: StaticFileSegment,
284 block: u64,
285 reader: Weak<StaticFileProviderInner<N>>,
286 metrics: Option<Arc<StaticFileProviderMetrics>>,
287 ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
288 let start = Instant::now();
289
290 let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
291
292 let block_range = static_file_provider.find_fixed_range(segment, block);
293 let (jar, path) = match static_file_provider.get_segment_provider_for_block(
294 segment,
295 block_range.start(),
296 None,
297 ) {
298 Ok(provider) => (
299 NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
300 provider.data_path().into(),
301 ),
302 Err(ProviderError::MissingStaticFileBlock(_, _)) => {
303 let path = static_file_provider.directory().join(segment.filename(&block_range));
304 (create_jar(segment, &path, block_range), path)
305 }
306 Err(err) => return Err(err),
307 };
308
309 let result = match NippyJarWriter::new(jar) {
310 Ok(writer) => Ok((writer, path)),
311 Err(NippyJarError::FrozenJar) => {
312 Err(ProviderError::FinalizedStaticFile(segment, block))
314 }
315 Err(e) => Err(ProviderError::other(e)),
316 }?;
317
318 if let Some(metrics) = &metrics {
319 metrics.record_segment_operation(
320 segment,
321 StaticFileProviderOperation::OpenWriter,
322 Some(start.elapsed()),
323 );
324 }
325
326 Ok(result)
327 }
328
329 fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
338 let expected_rows = if self.user_header().segment().is_headers() {
340 self.user_header().block_len().unwrap_or_default()
341 } else {
342 self.user_header().tx_len().unwrap_or_default()
343 };
344 let actual_rows = self.writer.rows() as u64;
345 let pruned_rows = expected_rows.saturating_sub(actual_rows);
346 if pruned_rows > 0 {
347 self.user_header_mut().prune(pruned_rows);
348 }
349
350 debug!(
351 target: "providers::static_file",
352 segment = ?self.writer.user_header().segment(),
353 path = ?self.data_path,
354 pruned_rows,
355 "Ensuring end range consistency"
356 );
357
358 self.writer.commit().map_err(ProviderError::other)?;
359
360 self.update_index()?;
362 Ok(())
363 }
364
365 pub const fn will_prune_on_commit(&self) -> bool {
367 self.prune_on_commit.is_some()
368 }
369
370 fn heal_changeset_sidecar(&mut self) -> ProviderResult<()> {
378 let csoff_path = self.data_path.with_extension("csoff");
379
380 let header_claims_blocks = self.writer.user_header().changeset_offsets_len();
382 let actual_nippy_rows = self.writer.rows() as u64;
383
384 let actual_sidecar_blocks = if csoff_path.exists() {
386 let file_len = reth_fs_util::metadata(&csoff_path).map_err(ProviderError::other)?.len();
387 let aligned_len = file_len - (file_len % 16);
389 aligned_len / 16
390 } else {
391 0
392 };
393
394 if header_claims_blocks == 0 && actual_sidecar_blocks == 0 {
396 self.changeset_offsets =
397 Some(ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?);
398 return Ok(());
399 }
400
401 let valid_blocks = if actual_sidecar_blocks > 0 {
403 let reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
404 .map_err(ProviderError::other)?;
405
406 let mut valid = 0u64;
409 for i in 0..actual_sidecar_blocks {
410 if let Some(offset) = reader.get(i).map_err(ProviderError::other)? {
411 if offset.offset() + offset.num_changes() <= actual_nippy_rows {
412 valid = i + 1;
413 } else {
414 break;
416 }
417 }
418 }
419 valid
420 } else {
421 0
422 };
423
424 let correct_blocks = valid_blocks.min(header_claims_blocks);
427
428 let mut needs_header_commit = false;
430
431 if correct_blocks != header_claims_blocks || actual_sidecar_blocks != correct_blocks {
432 tracing::warn!(
433 target: "reth::static_file",
434 path = %csoff_path.display(),
435 header_claims = header_claims_blocks,
436 sidecar_has = actual_sidecar_blocks,
437 valid_blocks = correct_blocks,
438 actual_rows = actual_nippy_rows,
439 "Three-way healing: syncing header, sidecar, and NippyJar state"
440 );
441
442 if actual_sidecar_blocks > correct_blocks {
444 use std::fs::OpenOptions;
445 let file = OpenOptions::new()
446 .write(true)
447 .open(&csoff_path)
448 .map_err(ProviderError::other)?;
449 file.set_len(correct_blocks * 16).map_err(ProviderError::other)?;
450 file.sync_all().map_err(ProviderError::other)?;
451
452 tracing::debug!(
453 target: "reth::static_file",
454 "Truncated sidecar from {} to {} blocks",
455 actual_sidecar_blocks,
456 correct_blocks
457 );
458 }
459
460 if correct_blocks < header_claims_blocks {
462 let blocks_removed = header_claims_blocks - correct_blocks;
465 self.writer.user_header_mut().prune(blocks_removed);
466
467 tracing::debug!(
468 target: "reth::static_file",
469 "Updated header: removed {} blocks (changeset_offsets_len: {} -> {})",
470 blocks_removed,
471 header_claims_blocks,
472 correct_blocks
473 );
474
475 needs_header_commit = true;
476 }
477 } else {
478 tracing::debug!(
479 target: "reth::static_file",
480 path = %csoff_path.display(),
481 blocks = correct_blocks,
482 "Changeset sidecar consistent, no healing needed"
483 );
484 }
485
486 let csoff_writer = ChangesetOffsetWriter::new(&csoff_path, correct_blocks)
488 .map_err(ProviderError::other)?;
489
490 self.changeset_offsets = Some(csoff_writer);
491
492 if needs_header_commit {
494 self.writer.commit().map_err(ProviderError::other)?;
495
496 tracing::info!(
497 target: "reth::static_file",
498 path = %csoff_path.display(),
499 blocks = correct_blocks,
500 "Committed healed changeset offset header"
501 );
502 }
503
504 Ok(())
505 }
506
507 fn flush_current_changeset_offset(&mut self) -> ProviderResult<()> {
515 if !self.writer.user_header().segment().is_change_based() {
516 return Ok(());
517 }
518
519 if let Some(offset) = self.current_changeset_offset.take() &&
520 let Some(writer) = &mut self.changeset_offsets
521 {
522 writer.append(&offset).map_err(ProviderError::other)?;
523 }
524 Ok(())
525 }
526
527 pub fn sync_all(&mut self) -> ProviderResult<()> {
534 if self.prune_on_commit.is_some() {
535 return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
536 }
537
538 self.flush_current_changeset_offset()?;
540 if let Some(writer) = &mut self.changeset_offsets {
541 writer.sync().map_err(ProviderError::other)?;
542 self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
544 }
545
546 if self.writer.is_dirty() {
547 self.writer.sync_all().map_err(ProviderError::other)?;
548 }
549 self.synced = true;
550 Ok(())
551 }
552
553 #[instrument(
559 name = "StaticFileProviderRW::finalize",
560 level = "debug",
561 target = "providers::static_file",
562 skip_all
563 )]
564 pub fn finalize(&mut self) -> ProviderResult<()> {
565 if self.prune_on_commit.is_some() {
566 return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
567 }
568 if self.writer.is_dirty() {
569 if !self.synced {
570 self.sync_all()?;
573 }
574
575 self.writer.finalize().map_err(ProviderError::other)?;
576 self.update_index()?;
577 }
578 self.synced = false;
579 Ok(())
580 }
581
582 #[instrument(
584 name = "StaticFileProviderRW::commit",
585 level = "debug",
586 target = "providers::static_file",
587 skip_all
588 )]
589 pub fn commit(&mut self) -> ProviderResult<()> {
590 let start = Instant::now();
591
592 if let Some(strategy) = self.prune_on_commit.take() {
594 debug!(
595 target: "providers::static_file",
596 segment = ?self.writer.user_header().segment(),
597 "Pruning data on commit"
598 );
599 match strategy {
600 PruneStrategy::Headers { num_blocks } => self.prune_header_data(num_blocks)?,
601 PruneStrategy::Transactions { num_rows, last_block } => {
602 self.prune_transaction_data(num_rows, last_block)?
603 }
604 PruneStrategy::Receipts { num_rows, last_block } => {
605 self.prune_receipt_data(num_rows, last_block)?
606 }
607 PruneStrategy::TransactionSenders { num_rows, last_block } => {
608 self.prune_transaction_sender_data(num_rows, last_block)?
609 }
610 PruneStrategy::AccountChangeSets { last_block } => {
611 self.prune_account_changeset_data(last_block)?
612 }
613 PruneStrategy::StorageChangeSets { last_block } => {
614 self.prune_storage_changeset_data(last_block)?
615 }
616 }
617 }
618
619 self.flush_current_changeset_offset()?;
622 if let Some(writer) = &mut self.changeset_offsets {
623 writer.sync().map_err(ProviderError::other)?;
624 self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
626 }
627
628 if self.writer.is_dirty() {
629 debug!(
630 target: "providers::static_file",
631 segment = ?self.writer.user_header().segment(),
632 "Committing writer to disk"
633 );
634
635 self.writer.commit().map_err(ProviderError::other)?;
637
638 if let Some(metrics) = &self.metrics {
639 metrics.record_segment_operation(
640 self.writer.user_header().segment(),
641 StaticFileProviderOperation::CommitWriter,
642 Some(start.elapsed()),
643 );
644 }
645
646 debug!(
647 target: "providers::static_file",
648 segment = ?self.writer.user_header().segment(),
649 path = ?self.data_path,
650 duration = ?start.elapsed(),
651 "Committed writer to disk"
652 );
653
654 self.update_index()?;
655 }
656
657 Ok(())
658 }
659
660 #[cfg(feature = "test-utils")]
664 pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
665 let start = Instant::now();
666
667 debug!(
668 target: "providers::static_file",
669 segment = ?self.writer.user_header().segment(),
670 "Committing writer to disk (without sync)"
671 );
672
673 self.writer.commit_without_sync_all().map_err(ProviderError::other)?;
675
676 if let Some(metrics) = &self.metrics {
677 metrics.record_segment_operation(
678 self.writer.user_header().segment(),
679 StaticFileProviderOperation::CommitWriter,
680 Some(start.elapsed()),
681 );
682 }
683
684 debug!(
685 target: "providers::static_file",
686 segment = ?self.writer.user_header().segment(),
687 path = ?self.data_path,
688 duration = ?start.elapsed(),
689 "Committed writer to disk (without sync)"
690 );
691
692 self.update_index()?;
693
694 Ok(())
695 }
696
697 fn update_index(&self) -> ProviderResult<()> {
699 let segment = self.writer.user_header().segment();
700
701 let segment_max_block = self
709 .writer
710 .user_header()
711 .block_range()
712 .as_ref()
713 .map(|block_range| block_range.end())
714 .or_else(|| {
715 let expected_start = self.writer.user_header().expected_block_start();
716 if expected_start <= self.reader().genesis_block_number() {
717 return None;
718 }
719
720 let prev_block = expected_start - 1;
721 let prev_range = self.reader().find_fixed_range(segment, prev_block);
722 let prev_path = self.reader().directory().join(segment.filename(&prev_range));
723 prev_path.exists().then_some(prev_block)
724 });
725
726 self.reader().update_index(segment, segment_max_block)
727 }
728
729 pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
734 let current_block = if let Some(current_block_number) = self.current_block_number() {
735 current_block_number
736 } else {
737 self.increment_block(0)?;
738 0
739 };
740
741 match current_block.cmp(&advance_to) {
742 Ordering::Less => {
743 for block in current_block + 1..=advance_to {
744 self.increment_block(block)?;
745 }
746 }
747 Ordering::Equal => {}
748 Ordering::Greater => {
749 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
750 self.writer.user_header().segment(),
751 current_block,
752 advance_to,
753 ));
754 }
755 }
756
757 Ok(())
758 }
759
760 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
763 let segment = self.writer.user_header().segment();
764
765 self.check_next_block_number(expected_block_number)?;
766
767 let start = Instant::now();
768 if let Some(last_block) = self.writer.user_header().block_end() {
769 if last_block == self.writer.user_header().expected_block_end() {
771 self.commit()?;
773
774 let (writer, data_path) =
776 Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
777 self.writer = writer;
778 self.data_path = data_path.clone();
779
780 if segment.is_change_based() {
782 let csoff_path = data_path.with_extension("csoff");
783 self.changeset_offsets = Some(
784 ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?,
785 );
786 }
787
788 *self.writer.user_header_mut() = SegmentHeader::new(
789 self.reader().find_fixed_range(segment, last_block + 1),
790 None,
791 None,
792 segment,
793 );
794 }
795 }
796
797 self.writer.user_header_mut().increment_block();
798
799 if segment.is_change_based() {
801 if let Some(offset) = self.current_changeset_offset.take() &&
803 let Some(writer) = &mut self.changeset_offsets
804 {
805 writer.append(&offset).map_err(ProviderError::other)?;
806 }
807 let new_offset = self.writer.rows() as u64;
809 self.current_changeset_offset = Some(ChangesetOffset::new(new_offset, 0));
810 }
811
812 if let Some(metrics) = &self.metrics {
813 metrics.record_segment_operation(
814 segment,
815 StaticFileProviderOperation::IncrementBlock,
816 Some(start.elapsed()),
817 );
818 }
819
820 Ok(())
821 }
822
823 pub fn current_block_number(&self) -> Option<u64> {
825 self.writer.user_header().block_end()
826 }
827
828 pub fn next_block_number(&self) -> u64 {
830 self.writer
834 .user_header()
835 .block_end()
836 .map(|b| b + 1)
837 .unwrap_or_else(|| self.writer.user_header().expected_block_start())
838 }
839
840 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
843 let next_static_file_block = self.next_block_number();
844
845 if expected_block_number != next_static_file_block {
846 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
847 self.writer.user_header().segment(),
848 expected_block_number,
849 next_static_file_block,
850 ))
851 }
852 Ok(())
853 }
854
855 fn truncate_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
861 let segment = self.writer.user_header().segment();
862 debug_assert!(segment.is_change_based());
863
864 let current_block_end = self
866 .writer
867 .user_header()
868 .block_end()
869 .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
870
871 if current_block_end <= last_block {
873 return Ok(())
874 }
875
876 let mut expected_block_start = self.writer.user_header().expected_block_start();
878 while last_block < expected_block_start && expected_block_start > 0 {
879 self.delete_current_and_open_previous()?;
880 expected_block_start = self.writer.user_header().expected_block_start();
881 }
882
883 let blocks_to_keep = if last_block >= expected_block_start {
885 last_block - expected_block_start + 1
886 } else {
887 0
888 };
889
890 let csoff_path = self.data_path.with_extension("csoff");
892 let changeset_offsets_len = self.writer.user_header().changeset_offsets_len();
893
894 self.flush_current_changeset_offset()?;
896
897 let rows_to_keep = if blocks_to_keep == 0 {
898 0
899 } else if blocks_to_keep >= changeset_offsets_len {
900 self.writer.rows() as u64
902 } else {
903 let reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
907 .map_err(ProviderError::other)?;
908 if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
909 next_offset.offset()
910 } else {
911 self.writer.rows() as u64
913 }
914 };
915
916 let total_rows = self.writer.rows() as u64;
917 let rows_to_delete = total_rows.saturating_sub(rows_to_keep);
918
919 if rows_to_delete > 0 {
920 let current_block_end = self
922 .writer
923 .user_header()
924 .block_end()
925 .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
926 let blocks_to_remove = current_block_end - last_block;
927
928 self.writer.user_header_mut().prune(blocks_to_remove);
930
931 self.writer.prune_rows(rows_to_delete as usize).map_err(ProviderError::other)?;
933 }
934
935 self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
937
938 self.writer.user_header_mut().sync_changeset_offsets();
940
941 if let Some(writer) = &mut self.changeset_offsets {
943 writer.truncate(blocks_to_keep).map_err(ProviderError::other)?;
944 }
945
946 self.current_changeset_offset = None;
948
949 self.commit()?;
951
952 Ok(())
953 }
954
955 fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
963 let mut remaining_rows = num_rows;
964 let segment = self.writer.user_header().segment();
965 while remaining_rows > 0 {
966 let len = if segment.is_block_based() {
967 self.writer.user_header().block_len().unwrap_or_default()
968 } else {
969 self.writer.user_header().tx_len().unwrap_or_default()
970 };
971
972 if remaining_rows >= len {
973 let block_start = self.writer.user_header().expected_block_start();
976
977 if block_start != 0 &&
983 (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
984 {
985 self.delete_current_and_open_previous()?;
986 } else {
987 self.writer.user_header_mut().prune(len);
989 self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
990 break
991 }
992
993 remaining_rows -= len;
994 } else {
995 self.writer.user_header_mut().prune(remaining_rows);
997
998 self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
1000 remaining_rows = 0;
1001 }
1002 }
1003
1004 if let Some(last_block) = last_block {
1006 let mut expected_block_start = self.writer.user_header().expected_block_start();
1007
1008 if num_rows == 0 {
1009 while last_block < expected_block_start {
1013 self.delete_current_and_open_previous()?;
1014 expected_block_start = self.writer.user_header().expected_block_start();
1015 }
1016 }
1017 self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
1018 }
1019
1020 self.commit()?;
1022
1023 Ok(())
1024 }
1025
1026 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
1029 let segment = self.user_header().segment();
1030 let current_path = self.data_path.clone();
1031 let (previous_writer, data_path) = Self::open(
1032 segment,
1033 self.writer.user_header().expected_block_start() - 1,
1034 self.reader.clone(),
1035 self.metrics.clone(),
1036 )?;
1037 self.writer = previous_writer;
1038 self.writer.set_dirty();
1039 self.data_path = data_path.clone();
1040
1041 if segment.is_change_based() {
1043 let csoff_path = current_path.with_extension("csoff");
1044 if csoff_path.exists() {
1045 std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
1046 }
1047 let new_csoff_path = data_path.with_extension("csoff");
1049 let committed_len = self.writer.user_header().changeset_offsets_len();
1050 self.changeset_offsets = Some(
1051 ChangesetOffsetWriter::new(&new_csoff_path, committed_len)
1052 .map_err(ProviderError::other)?,
1053 );
1054 }
1055
1056 self.current_changeset_offset = None;
1058
1059 NippyJar::<SegmentHeader>::load(¤t_path)
1060 .map_err(ProviderError::other)?
1061 .delete()
1062 .map_err(ProviderError::other)?;
1063 Ok(())
1064 }
1065
1066 fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
1068 self.buf.clear();
1069 column.to_compact(&mut self.buf);
1070
1071 self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
1072 Ok(())
1073 }
1074
1075 fn append_with_tx_number<V: Compact>(
1077 &mut self,
1078 tx_num: TxNumber,
1079 value: V,
1080 ) -> ProviderResult<()> {
1081 if let Some(range) = self.writer.user_header().tx_range() {
1082 let next_tx = range.end() + 1;
1083 if next_tx != tx_num {
1084 return Err(ProviderError::UnexpectedStaticFileTxNumber(
1085 self.writer.user_header().segment(),
1086 tx_num,
1087 next_tx,
1088 ))
1089 }
1090 self.writer.user_header_mut().increment_tx();
1091 } else {
1092 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
1093 }
1094
1095 self.append_column(value)?;
1096
1097 Ok(())
1098 }
1099
1100 fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
1102 if let Some(ref mut offset) = self.current_changeset_offset {
1103 offset.increment_num_changes();
1104 }
1105 self.append_column(change)?;
1106 Ok(())
1107 }
1108
1109 pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
1114 where
1115 N::BlockHeader: Compact,
1116 {
1117 self.append_header_with_td(header, U256::ZERO, hash)
1118 }
1119
1120 pub fn append_header_with_td(
1125 &mut self,
1126 header: &N::BlockHeader,
1127 total_difficulty: U256,
1128 hash: &BlockHash,
1129 ) -> ProviderResult<()>
1130 where
1131 N::BlockHeader: Compact,
1132 {
1133 let start = Instant::now();
1134 self.ensure_no_queued_prune()?;
1135
1136 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1137
1138 self.increment_block(header.number())?;
1139
1140 self.append_column(header)?;
1141 self.append_column(CompactU256::from(total_difficulty))?;
1142 self.append_column(hash)?;
1143
1144 if let Some(metrics) = &self.metrics {
1145 metrics.record_segment_operation(
1146 StaticFileSegment::Headers,
1147 StaticFileProviderOperation::Append,
1148 Some(start.elapsed()),
1149 );
1150 }
1151
1152 Ok(())
1153 }
1154
1155 pub fn append_header_direct(
1158 &mut self,
1159 header: &N::BlockHeader,
1160 total_difficulty: U256,
1161 hash: &BlockHash,
1162 ) -> ProviderResult<()>
1163 where
1164 N::BlockHeader: Compact,
1165 {
1166 let start = Instant::now();
1167 self.ensure_no_queued_prune()?;
1168
1169 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1170
1171 self.append_column(header)?;
1172 self.append_column(CompactU256::from(total_difficulty))?;
1173 self.append_column(hash)?;
1174
1175 if let Some(metrics) = &self.metrics {
1176 metrics.record_segment_operation(
1177 StaticFileSegment::Headers,
1178 StaticFileProviderOperation::Append,
1179 Some(start.elapsed()),
1180 );
1181 }
1182
1183 Ok(())
1184 }
1185
1186 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
1191 where
1192 N::SignedTx: Compact,
1193 {
1194 let start = Instant::now();
1195 self.ensure_no_queued_prune()?;
1196
1197 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1198 self.append_with_tx_number(tx_num, tx)?;
1199
1200 if let Some(metrics) = &self.metrics {
1201 metrics.record_segment_operation(
1202 StaticFileSegment::Transactions,
1203 StaticFileProviderOperation::Append,
1204 Some(start.elapsed()),
1205 );
1206 }
1207
1208 Ok(())
1209 }
1210
1211 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
1216 where
1217 N::Receipt: Compact,
1218 {
1219 let start = Instant::now();
1220 self.ensure_no_queued_prune()?;
1221
1222 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1223 self.append_with_tx_number(tx_num, receipt)?;
1224
1225 if let Some(metrics) = &self.metrics {
1226 metrics.record_segment_operation(
1227 StaticFileSegment::Receipts,
1228 StaticFileProviderOperation::Append,
1229 Some(start.elapsed()),
1230 );
1231 }
1232
1233 Ok(())
1234 }
1235
1236 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
1238 where
1239 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
1240 R: Borrow<N::Receipt>,
1241 N::Receipt: Compact,
1242 {
1243 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1244
1245 let mut receipts_iter = receipts.into_iter().peekable();
1246 if receipts_iter.peek().is_none() {
1248 return Ok(());
1249 }
1250
1251 let start = Instant::now();
1252 self.ensure_no_queued_prune()?;
1253
1254 let mut count: u64 = 0;
1256
1257 for receipt_result in receipts_iter {
1258 let (tx_num, receipt) = receipt_result?;
1259 self.append_with_tx_number(tx_num, receipt.borrow())?;
1260 count += 1;
1261 }
1262
1263 if let Some(metrics) = &self.metrics {
1264 metrics.record_segment_operations(
1265 StaticFileSegment::Receipts,
1266 StaticFileProviderOperation::Append,
1267 count,
1268 Some(start.elapsed()),
1269 );
1270 }
1271
1272 Ok(())
1273 }
1274
1275 pub fn append_transaction_sender(
1280 &mut self,
1281 tx_num: TxNumber,
1282 sender: &alloy_primitives::Address,
1283 ) -> ProviderResult<()> {
1284 let start = Instant::now();
1285 self.ensure_no_queued_prune()?;
1286
1287 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1288 self.append_with_tx_number(tx_num, sender)?;
1289
1290 if let Some(metrics) = &self.metrics {
1291 metrics.record_segment_operation(
1292 StaticFileSegment::TransactionSenders,
1293 StaticFileProviderOperation::Append,
1294 Some(start.elapsed()),
1295 );
1296 }
1297
1298 Ok(())
1299 }
1300
1301 pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
1303 where
1304 I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
1305 {
1306 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1307
1308 let mut senders_iter = senders.into_iter().peekable();
1309 if senders_iter.peek().is_none() {
1311 return Ok(());
1312 }
1313
1314 let start = Instant::now();
1315 self.ensure_no_queued_prune()?;
1316
1317 let mut count: u64 = 0;
1319 for (tx_num, sender) in senders_iter {
1320 self.append_with_tx_number(tx_num, sender)?;
1321 count += 1;
1322 }
1323
1324 if let Some(metrics) = &self.metrics {
1325 metrics.record_segment_operations(
1326 StaticFileSegment::TransactionSenders,
1327 StaticFileProviderOperation::Append,
1328 count,
1329 Some(start.elapsed()),
1330 );
1331 }
1332
1333 Ok(())
1334 }
1335
1336 pub fn append_account_changeset(
1342 &mut self,
1343 mut changeset: Vec<AccountBeforeTx>,
1344 block_number: u64,
1345 ) -> ProviderResult<()> {
1346 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1347 let start = Instant::now();
1348
1349 self.increment_block(block_number)?;
1350 self.ensure_no_queued_prune()?;
1351
1352 changeset.sort_by_key(|change| change.address);
1354
1355 let mut count: u64 = 0;
1356
1357 for change in changeset {
1358 self.append_change(&change)?;
1359 count += 1;
1360 }
1361
1362 if let Some(metrics) = &self.metrics {
1363 metrics.record_segment_operations(
1364 StaticFileSegment::AccountChangeSets,
1365 StaticFileProviderOperation::Append,
1366 count,
1367 Some(start.elapsed()),
1368 );
1369 }
1370
1371 Ok(())
1372 }
1373
1374 pub fn append_storage_changeset(
1378 &mut self,
1379 mut changeset: Vec<StorageBeforeTx>,
1380 block_number: u64,
1381 ) -> ProviderResult<()> {
1382 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1383 let start = Instant::now();
1384
1385 self.increment_block(block_number)?;
1386 self.ensure_no_queued_prune()?;
1387
1388 changeset.sort_by_key(|change| (change.address, change.key));
1390
1391 let mut count: u64 = 0;
1392 for change in changeset {
1393 self.append_change(&change)?;
1394 count += 1;
1395 }
1396
1397 if let Some(metrics) = &self.metrics {
1398 metrics.record_segment_operations(
1399 StaticFileSegment::StorageChangeSets,
1400 StaticFileProviderOperation::Append,
1401 count,
1402 Some(start.elapsed()),
1403 );
1404 }
1405
1406 Ok(())
1407 }
1408
1409 pub fn prune_transactions(
1413 &mut self,
1414 to_delete: u64,
1415 last_block: BlockNumber,
1416 ) -> ProviderResult<()> {
1417 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
1418 self.queue_prune(PruneStrategy::Transactions { num_rows: to_delete, last_block })
1419 }
1420
1421 pub fn prune_receipts(
1425 &mut self,
1426 to_delete: u64,
1427 last_block: BlockNumber,
1428 ) -> ProviderResult<()> {
1429 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
1430 self.queue_prune(PruneStrategy::Receipts { num_rows: to_delete, last_block })
1431 }
1432
1433 pub fn prune_transaction_senders(
1437 &mut self,
1438 to_delete: u64,
1439 last_block: BlockNumber,
1440 ) -> ProviderResult<()> {
1441 debug_assert_eq!(
1442 self.writer.user_header().segment(),
1443 StaticFileSegment::TransactionSenders
1444 );
1445 self.queue_prune(PruneStrategy::TransactionSenders { num_rows: to_delete, last_block })
1446 }
1447
1448 pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
1450 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
1451 self.queue_prune(PruneStrategy::Headers { num_blocks: to_delete })
1452 }
1453
1454 pub fn prune_account_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1456 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::AccountChangeSets);
1457 self.queue_prune(PruneStrategy::AccountChangeSets { last_block })
1458 }
1459
1460 pub fn prune_storage_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1462 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::StorageChangeSets);
1463 self.queue_prune(PruneStrategy::StorageChangeSets { last_block })
1464 }
1465
1466 fn queue_prune(&mut self, strategy: PruneStrategy) -> ProviderResult<()> {
1468 self.ensure_no_queued_prune()?;
1469 self.prune_on_commit = Some(strategy);
1470 Ok(())
1471 }
1472
1473 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
1475 if self.prune_on_commit.is_some() {
1476 return Err(ProviderError::other(StaticFileWriterError::new(
1477 "Pruning should be committed before appending or pruning more data",
1478 )));
1479 }
1480 Ok(())
1481 }
1482
1483 fn prune_transaction_data(
1485 &mut self,
1486 to_delete: u64,
1487 last_block: BlockNumber,
1488 ) -> ProviderResult<()> {
1489 let start = Instant::now();
1490
1491 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1492
1493 self.truncate(to_delete, Some(last_block))?;
1494
1495 if let Some(metrics) = &self.metrics {
1496 metrics.record_segment_operation(
1497 StaticFileSegment::Transactions,
1498 StaticFileProviderOperation::Prune,
1499 Some(start.elapsed()),
1500 );
1501 }
1502
1503 Ok(())
1504 }
1505
1506 fn prune_account_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1508 let start = Instant::now();
1509
1510 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1511
1512 self.truncate_changesets(last_block)?;
1513
1514 if let Some(metrics) = &self.metrics {
1515 metrics.record_segment_operation(
1516 StaticFileSegment::AccountChangeSets,
1517 StaticFileProviderOperation::Prune,
1518 Some(start.elapsed()),
1519 );
1520 }
1521
1522 Ok(())
1523 }
1524
1525 fn prune_storage_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1527 let start = Instant::now();
1528
1529 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1530
1531 self.truncate_changesets(last_block)?;
1532
1533 if let Some(metrics) = &self.metrics {
1534 metrics.record_segment_operation(
1535 StaticFileSegment::StorageChangeSets,
1536 StaticFileProviderOperation::Prune,
1537 Some(start.elapsed()),
1538 );
1539 }
1540
1541 Ok(())
1542 }
1543
1544 fn prune_receipt_data(
1546 &mut self,
1547 to_delete: u64,
1548 last_block: BlockNumber,
1549 ) -> ProviderResult<()> {
1550 let start = Instant::now();
1551
1552 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1553
1554 self.truncate(to_delete, Some(last_block))?;
1555
1556 if let Some(metrics) = &self.metrics {
1557 metrics.record_segment_operation(
1558 StaticFileSegment::Receipts,
1559 StaticFileProviderOperation::Prune,
1560 Some(start.elapsed()),
1561 );
1562 }
1563
1564 Ok(())
1565 }
1566
1567 fn prune_transaction_sender_data(
1569 &mut self,
1570 to_delete: u64,
1571 last_block: BlockNumber,
1572 ) -> ProviderResult<()> {
1573 let start = Instant::now();
1574
1575 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1576
1577 self.truncate(to_delete, Some(last_block))?;
1578
1579 if let Some(metrics) = &self.metrics {
1580 metrics.record_segment_operation(
1581 StaticFileSegment::TransactionSenders,
1582 StaticFileProviderOperation::Prune,
1583 Some(start.elapsed()),
1584 );
1585 }
1586
1587 Ok(())
1588 }
1589
1590 fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
1592 let start = Instant::now();
1593
1594 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1595
1596 self.truncate(to_delete, None)?;
1597
1598 if let Some(metrics) = &self.metrics {
1599 metrics.record_segment_operation(
1600 StaticFileSegment::Headers,
1601 StaticFileProviderOperation::Prune,
1602 Some(start.elapsed()),
1603 );
1604 }
1605
1606 Ok(())
1607 }
1608
1609 pub fn reader(&self) -> StaticFileProvider<N> {
1611 Self::upgrade_provider_to_strong_reference(&self.reader)
1612 }
1613
1614 fn upgrade_provider_to_strong_reference(
1623 provider: &Weak<StaticFileProviderInner<N>>,
1624 ) -> StaticFileProvider<N> {
1625 provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
1626 }
1627
1628 pub const fn user_header(&self) -> &SegmentHeader {
1630 self.writer.user_header()
1631 }
1632
1633 pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
1635 self.writer.user_header_mut()
1636 }
1637
1638 #[cfg(any(test, feature = "test-utils"))]
1640 pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
1641 self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
1642 }
1643
1644 #[cfg(any(test, feature = "test-utils"))]
1646 pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
1647 &mut self.writer
1648 }
1649}
1650
1651fn create_jar(
1652 segment: StaticFileSegment,
1653 path: &Path,
1654 expected_block_range: SegmentRangeInclusive,
1655) -> NippyJar<SegmentHeader> {
1656 let mut jar = NippyJar::new(
1657 segment.columns(),
1658 path,
1659 SegmentHeader::new(expected_block_range, None, None, segment),
1660 );
1661
1662 if segment.is_headers() {
1665 jar = jar.with_lz4();
1666 }
1667
1668 jar
1669}