1use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
10use reth_db_api::models::CompactU256;
11use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
12use reth_node_types::NodePrimitives;
13use reth_primitives_traits::FastInstant as Instant;
14use reth_static_file_types::{
15 ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter, SegmentHeader,
16 SegmentRangeInclusive, StaticFileSegment,
17};
18use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
19use std::{
20 borrow::Borrow,
21 cmp::Ordering,
22 fmt::Debug,
23 path::{Path, PathBuf},
24 sync::{Arc, Weak},
25};
26use tracing::{debug, instrument};
27
28#[derive(Debug, Clone, Copy)]
30enum PruneStrategy {
31 Headers {
33 num_blocks: u64,
35 },
36 Transactions {
38 num_rows: u64,
40 last_block: BlockNumber,
42 },
43 Receipts {
45 num_rows: u64,
47 last_block: BlockNumber,
49 },
50 TransactionSenders {
52 num_rows: u64,
54 last_block: BlockNumber,
56 },
57 AccountChangeSets {
59 last_block: BlockNumber,
61 },
62 StorageChangeSets {
64 last_block: BlockNumber,
66 },
67}
68
69#[derive(Debug)]
74pub(crate) struct StaticFileWriters<N> {
75 headers: RwLock<Option<StaticFileProviderRW<N>>>,
76 transactions: RwLock<Option<StaticFileProviderRW<N>>>,
77 receipts: RwLock<Option<StaticFileProviderRW<N>>>,
78 transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
79 account_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
80 storage_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
81}
82
83impl<N> Default for StaticFileWriters<N> {
84 fn default() -> Self {
85 Self {
86 headers: Default::default(),
87 transactions: Default::default(),
88 receipts: Default::default(),
89 transaction_senders: Default::default(),
90 account_change_sets: Default::default(),
91 storage_change_sets: Default::default(),
92 }
93 }
94}
95
96impl<N: NodePrimitives> StaticFileWriters<N> {
97 pub(crate) fn get_or_create(
98 &self,
99 segment: StaticFileSegment,
100 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
101 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
102 let mut write_guard = match segment {
103 StaticFileSegment::Headers => self.headers.write(),
104 StaticFileSegment::Transactions => self.transactions.write(),
105 StaticFileSegment::Receipts => self.receipts.write(),
106 StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
107 StaticFileSegment::AccountChangeSets => self.account_change_sets.write(),
108 StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(),
109 };
110
111 if write_guard.is_none() {
112 *write_guard = Some(create_fn()?);
113 }
114
115 Ok(StaticFileProviderRWRefMut(write_guard))
116 }
117
118 pub(crate) fn remove(&self, segment: StaticFileSegment) {
120 let mut write_guard = match segment {
121 StaticFileSegment::Headers => self.headers.write(),
122 StaticFileSegment::Transactions => self.transactions.write(),
123 StaticFileSegment::Receipts => self.receipts.write(),
124 StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
125 StaticFileSegment::AccountChangeSets => self.account_change_sets.write(),
126 StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(),
127 };
128
129 *write_guard = None;
130 }
131
132 #[instrument(
133 name = "StaticFileWriters::commit",
134 level = "debug",
135 target = "providers::static_file",
136 skip_all
137 )]
138 pub(crate) fn commit(&self) -> ProviderResult<()> {
139 debug!(target: "providers::static_file", "Committing all static file segments");
140
141 for writer_lock in [
142 &self.headers,
143 &self.transactions,
144 &self.receipts,
145 &self.transaction_senders,
146 &self.account_change_sets,
147 &self.storage_change_sets,
148 ] {
149 let mut writer = writer_lock.write();
150 if let Some(writer) = writer.as_mut() {
151 writer.commit()?;
152 }
153 }
154
155 debug!(target: "providers::static_file", "Committed all static file segments");
156 Ok(())
157 }
158
159 pub(crate) fn has_unwind_queued(&self) -> bool {
160 for writer_lock in [
161 &self.headers,
162 &self.transactions,
163 &self.receipts,
164 &self.transaction_senders,
165 &self.account_change_sets,
166 &self.storage_change_sets,
167 ] {
168 let writer = writer_lock.read();
169 if let Some(writer) = writer.as_ref() &&
170 writer.will_prune_on_commit()
171 {
172 return true
173 }
174 }
175 false
176 }
177
178 #[instrument(
183 name = "StaticFileWriters::finalize",
184 level = "debug",
185 target = "providers::static_file",
186 skip_all
187 )]
188 pub(crate) fn finalize(&self) -> ProviderResult<()> {
189 debug!(target: "providers::static_file", "Finalizing all static file segments into disk");
190
191 for writer_lock in [
192 &self.headers,
193 &self.transactions,
194 &self.receipts,
195 &self.transaction_senders,
196 &self.account_change_sets,
197 &self.storage_change_sets,
198 ] {
199 let mut writer = writer_lock.write();
200 if let Some(writer) = writer.as_mut() {
201 writer.finalize()?;
202 }
203 }
204
205 debug!(target: "providers::static_file", "Finalized all static file segments into disk");
206 Ok(())
207 }
208}
209
210#[derive(Debug)]
212pub struct StaticFileProviderRWRefMut<'a, N>(
213 pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
214);
215
216impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
217 fn deref_mut(&mut self) -> &mut Self::Target {
218 self.0.as_mut().expect("static file writer provider should be init")
220 }
221}
222
223impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
224 type Target = StaticFileProviderRW<N>;
225
226 fn deref(&self) -> &Self::Target {
227 self.0.as_ref().expect("static file writer provider should be init")
229 }
230}
231
232#[derive(Debug)]
233pub struct StaticFileProviderRW<N> {
235 reader: Weak<StaticFileProviderInner<N>>,
240 writer: NippyJarWriter<SegmentHeader>,
242 data_path: PathBuf,
244 buf: Vec<u8>,
246 metrics: Option<Arc<StaticFileProviderMetrics>>,
248 prune_on_commit: Option<PruneStrategy>,
250 synced: bool,
252 changeset_offsets: Option<ChangesetOffsetWriter>,
254 current_changeset_offset: Option<ChangesetOffset>,
256}
257
258impl<N: NodePrimitives> StaticFileProviderRW<N> {
259 pub fn new(
264 segment: StaticFileSegment,
265 block: BlockNumber,
266 reader: Weak<StaticFileProviderInner<N>>,
267 metrics: Option<Arc<StaticFileProviderMetrics>>,
268 ) -> ProviderResult<Self> {
269 let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
270
271 let mut writer = Self {
273 writer,
274 data_path,
275 buf: Vec::with_capacity(100),
276 reader,
277 metrics,
278 prune_on_commit: None,
279 synced: false,
280 changeset_offsets: None,
281 current_changeset_offset: None,
282 };
283
284 writer.ensure_end_range_consistency()?;
287
288 if segment.is_change_based() {
290 writer.heal_changeset_sidecar()?;
291 }
292
293 Ok(writer)
294 }
295
296 fn open(
297 segment: StaticFileSegment,
298 block: u64,
299 reader: Weak<StaticFileProviderInner<N>>,
300 metrics: Option<Arc<StaticFileProviderMetrics>>,
301 ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
302 let start = Instant::now();
303
304 let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
305
306 let block_range = static_file_provider.find_fixed_range(segment, block);
307 let (jar, path) = match static_file_provider.get_segment_provider_for_block(
308 segment,
309 block_range.start(),
310 None,
311 ) {
312 Ok(provider) => (
313 NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
314 provider.data_path().into(),
315 ),
316 Err(ProviderError::MissingStaticFileBlock(_, _)) => {
317 let path = static_file_provider.directory().join(segment.filename(&block_range));
318 (create_jar(segment, &path, block_range), path)
319 }
320 Err(err) => return Err(err),
321 };
322
323 let result = match NippyJarWriter::new(jar) {
324 Ok(writer) => Ok((writer, path)),
325 Err(NippyJarError::FrozenJar) => {
326 Err(ProviderError::FinalizedStaticFile(segment, block))
328 }
329 Err(e) => Err(ProviderError::other(e)),
330 }?;
331
332 if let Some(metrics) = &metrics {
333 metrics.record_segment_operation(
334 segment,
335 StaticFileProviderOperation::OpenWriter,
336 Some(start.elapsed()),
337 );
338 }
339
340 Ok(result)
341 }
342
343 fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
352 let expected_rows = if self.user_header().segment().is_headers() {
354 self.user_header().block_len().unwrap_or_default()
355 } else {
356 self.user_header().tx_len().unwrap_or_default()
357 };
358 let actual_rows = self.writer.rows() as u64;
359 let pruned_rows = expected_rows.saturating_sub(actual_rows);
360 if pruned_rows > 0 {
361 self.user_header_mut().prune(pruned_rows);
362 }
363
364 debug!(
365 target: "providers::static_file",
366 segment = ?self.writer.user_header().segment(),
367 path = ?self.data_path,
368 pruned_rows,
369 "Ensuring end range consistency"
370 );
371
372 self.writer.commit().map_err(ProviderError::other)?;
373
374 self.update_index()?;
376 Ok(())
377 }
378
379 pub const fn will_prune_on_commit(&self) -> bool {
381 self.prune_on_commit.is_some()
382 }
383
384 fn heal_changeset_sidecar(&mut self) -> ProviderResult<()> {
392 let csoff_path = self.data_path.with_extension("csoff");
393
394 let header_claims_blocks = self.writer.user_header().changeset_offsets_len();
396 let actual_nippy_rows = self.writer.rows() as u64;
397
398 let actual_sidecar_blocks = if csoff_path.exists() {
400 let file_len = reth_fs_util::metadata(&csoff_path).map_err(ProviderError::other)?.len();
401 let aligned_len = file_len - (file_len % 16);
403 aligned_len / 16
404 } else {
405 0
406 };
407
408 if header_claims_blocks == 0 && actual_sidecar_blocks == 0 {
410 self.changeset_offsets =
411 Some(ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?);
412 return Ok(());
413 }
414
415 let valid_blocks = if actual_sidecar_blocks > 0 {
417 let reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
418 .map_err(ProviderError::other)?;
419
420 let mut valid = 0u64;
423 for i in 0..actual_sidecar_blocks {
424 if let Some(offset) = reader.get(i).map_err(ProviderError::other)? {
425 if offset.offset() + offset.num_changes() <= actual_nippy_rows {
426 valid = i + 1;
427 } else {
428 break;
430 }
431 }
432 }
433 valid
434 } else {
435 0
436 };
437
438 let correct_blocks = valid_blocks.min(header_claims_blocks);
441
442 let mut needs_header_commit = false;
444
445 if correct_blocks != header_claims_blocks || actual_sidecar_blocks != correct_blocks {
446 tracing::warn!(
447 target: "reth::static_file",
448 path = %csoff_path.display(),
449 header_claims = header_claims_blocks,
450 sidecar_has = actual_sidecar_blocks,
451 valid_blocks = correct_blocks,
452 actual_rows = actual_nippy_rows,
453 "Three-way healing: syncing header, sidecar, and NippyJar state"
454 );
455
456 if actual_sidecar_blocks > correct_blocks {
458 use std::fs::OpenOptions;
459 let file = OpenOptions::new()
460 .write(true)
461 .open(&csoff_path)
462 .map_err(ProviderError::other)?;
463 file.set_len(correct_blocks * 16).map_err(ProviderError::other)?;
464 file.sync_all().map_err(ProviderError::other)?;
465
466 tracing::debug!(
467 target: "reth::static_file",
468 "Truncated sidecar from {} to {} blocks",
469 actual_sidecar_blocks,
470 correct_blocks
471 );
472 }
473
474 if correct_blocks < header_claims_blocks {
476 let blocks_removed = header_claims_blocks - correct_blocks;
479 self.writer.user_header_mut().prune(blocks_removed);
480
481 tracing::debug!(
482 target: "reth::static_file",
483 "Updated header: removed {} blocks (changeset_offsets_len: {} -> {})",
484 blocks_removed,
485 header_claims_blocks,
486 correct_blocks
487 );
488
489 needs_header_commit = true;
490 }
491 } else {
492 tracing::debug!(
493 target: "reth::static_file",
494 path = %csoff_path.display(),
495 blocks = correct_blocks,
496 "Changeset sidecar consistent, no healing needed"
497 );
498 }
499
500 let csoff_writer = ChangesetOffsetWriter::new(&csoff_path, correct_blocks)
502 .map_err(ProviderError::other)?;
503
504 self.changeset_offsets = Some(csoff_writer);
505
506 if needs_header_commit {
508 self.writer.commit().map_err(ProviderError::other)?;
509
510 tracing::info!(
511 target: "reth::static_file",
512 path = %csoff_path.display(),
513 blocks = correct_blocks,
514 "Committed healed changeset offset header"
515 );
516 }
517
518 Ok(())
519 }
520
521 fn flush_current_changeset_offset(&mut self) -> ProviderResult<()> {
529 if !self.writer.user_header().segment().is_change_based() {
530 return Ok(());
531 }
532
533 if let Some(offset) = self.current_changeset_offset.take() &&
534 let Some(writer) = &mut self.changeset_offsets
535 {
536 writer.append(&offset).map_err(ProviderError::other)?;
537 }
538 Ok(())
539 }
540
541 pub fn sync_all(&mut self) -> ProviderResult<()> {
548 if self.prune_on_commit.is_some() {
549 return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
550 }
551
552 self.flush_current_changeset_offset()?;
554 if let Some(writer) = &mut self.changeset_offsets {
555 writer.sync().map_err(ProviderError::other)?;
556 self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
558 }
559
560 if self.writer.is_dirty() {
561 self.writer.sync_all().map_err(ProviderError::other)?;
562 }
563 self.synced = true;
564 Ok(())
565 }
566
567 #[instrument(
573 name = "StaticFileProviderRW::finalize",
574 level = "debug",
575 target = "providers::static_file",
576 skip_all
577 )]
578 pub fn finalize(&mut self) -> ProviderResult<()> {
579 if self.prune_on_commit.is_some() {
580 return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
581 }
582 if self.writer.is_dirty() {
583 if !self.synced {
584 self.sync_all()?;
587 }
588
589 self.writer.finalize().map_err(ProviderError::other)?;
590 self.update_index()?;
591 }
592 self.synced = false;
593 Ok(())
594 }
595
596 #[instrument(
598 name = "StaticFileProviderRW::commit",
599 level = "debug",
600 target = "providers::static_file",
601 skip_all
602 )]
603 pub fn commit(&mut self) -> ProviderResult<()> {
604 let start = Instant::now();
605
606 if let Some(strategy) = self.prune_on_commit.take() {
608 debug!(
609 target: "providers::static_file",
610 segment = ?self.writer.user_header().segment(),
611 "Pruning data on commit"
612 );
613 match strategy {
614 PruneStrategy::Headers { num_blocks } => self.prune_header_data(num_blocks)?,
615 PruneStrategy::Transactions { num_rows, last_block } => {
616 self.prune_transaction_data(num_rows, last_block)?
617 }
618 PruneStrategy::Receipts { num_rows, last_block } => {
619 self.prune_receipt_data(num_rows, last_block)?
620 }
621 PruneStrategy::TransactionSenders { num_rows, last_block } => {
622 self.prune_transaction_sender_data(num_rows, last_block)?
623 }
624 PruneStrategy::AccountChangeSets { last_block } => {
625 self.prune_account_changeset_data(last_block)?
626 }
627 PruneStrategy::StorageChangeSets { last_block } => {
628 self.prune_storage_changeset_data(last_block)?
629 }
630 }
631 }
632
633 self.flush_current_changeset_offset()?;
636 if let Some(writer) = &mut self.changeset_offsets {
637 writer.sync().map_err(ProviderError::other)?;
638 self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
640 }
641
642 if self.writer.is_dirty() {
643 debug!(
644 target: "providers::static_file",
645 segment = ?self.writer.user_header().segment(),
646 "Committing writer to disk"
647 );
648
649 self.writer.commit().map_err(ProviderError::other)?;
651
652 if let Some(metrics) = &self.metrics {
653 metrics.record_segment_operation(
654 self.writer.user_header().segment(),
655 StaticFileProviderOperation::CommitWriter,
656 Some(start.elapsed()),
657 );
658 }
659
660 debug!(
661 target: "providers::static_file",
662 segment = ?self.writer.user_header().segment(),
663 path = ?self.data_path,
664 duration = ?start.elapsed(),
665 "Committed writer to disk"
666 );
667
668 self.update_index()?;
669 }
670
671 Ok(())
672 }
673
/// Test-only variant of `commit` that goes through the jar writer's
/// `commit_without_sync_all`.
///
/// Unlike `commit`, it runs no queued prune, does not touch the changeset sidecar, and does
/// not check whether the writer is dirty. It still records a `CommitWriter` metric and
/// refreshes the provider index.
#[cfg(feature = "test-utils")]
pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
    let started_at = Instant::now();

    debug!(
        target: "providers::static_file",
        segment = ?self.writer.user_header().segment(),
        "Committing writer to disk (without sync)"
    );

    self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

    if let Some(metrics) = self.metrics.as_deref() {
        metrics.record_segment_operation(
            self.writer.user_header().segment(),
            StaticFileProviderOperation::CommitWriter,
            Some(started_at.elapsed()),
        );
    }

    debug!(
        target: "providers::static_file",
        segment = ?self.writer.user_header().segment(),
        path = ?self.data_path,
        duration = ?started_at.elapsed(),
        "Committed writer to disk (without sync)"
    );

    self.update_index()
}
710
711 fn update_index(&self) -> ProviderResult<()> {
713 let segment = self.writer.user_header().segment();
714
715 let segment_max_block = self
723 .writer
724 .user_header()
725 .block_range()
726 .as_ref()
727 .map(|block_range| block_range.end())
728 .or_else(|| {
729 let expected_start = self.writer.user_header().expected_block_start();
730 if expected_start <= self.reader().genesis_block_number() {
731 return None;
732 }
733
734 let prev_block = expected_start - 1;
735 let prev_range = self.reader().find_fixed_range(segment, prev_block);
736 let prev_path = self.reader().directory().join(segment.filename(&prev_range));
737 prev_path.exists().then_some(prev_block)
738 });
739
740 self.reader().update_index(segment, segment_max_block)
741 }
742
743 pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
748 let current_block = if let Some(current_block_number) = self.current_block_number() {
749 current_block_number
750 } else {
751 self.increment_block(0)?;
752 0
753 };
754
755 match current_block.cmp(&advance_to) {
756 Ordering::Less => {
757 for block in current_block + 1..=advance_to {
758 self.increment_block(block)?;
759 }
760 }
761 Ordering::Equal => {}
762 Ordering::Greater => {
763 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
764 self.writer.user_header().segment(),
765 current_block,
766 advance_to,
767 ));
768 }
769 }
770
771 Ok(())
772 }
773
774 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
777 let segment = self.writer.user_header().segment();
778
779 self.check_next_block_number(expected_block_number)?;
780
781 let start = Instant::now();
782 if let Some(last_block) = self.writer.user_header().block_end() {
783 if last_block == self.writer.user_header().expected_block_end() {
785 self.commit()?;
787
788 let (writer, data_path) =
790 Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
791 self.writer = writer;
792 self.data_path = data_path.clone();
793
794 if segment.is_change_based() {
796 let csoff_path = data_path.with_extension("csoff");
797 self.changeset_offsets = Some(
798 ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?,
799 );
800 }
801
802 *self.writer.user_header_mut() = SegmentHeader::new(
803 self.reader().find_fixed_range(segment, last_block + 1),
804 None,
805 None,
806 segment,
807 );
808 }
809 }
810
811 self.writer.user_header_mut().increment_block();
812
813 if segment.is_change_based() {
815 if let Some(offset) = self.current_changeset_offset.take() &&
817 let Some(writer) = &mut self.changeset_offsets
818 {
819 writer.append(&offset).map_err(ProviderError::other)?;
820 }
821 let new_offset = self.writer.rows() as u64;
823 self.current_changeset_offset = Some(ChangesetOffset::new(new_offset, 0));
824 }
825
826 if let Some(metrics) = &self.metrics {
827 metrics.record_segment_operation(
828 segment,
829 StaticFileProviderOperation::IncrementBlock,
830 Some(start.elapsed()),
831 );
832 }
833
834 Ok(())
835 }
836
837 pub fn current_block_number(&self) -> Option<u64> {
839 self.writer.user_header().block_end()
840 }
841
842 pub fn next_block_number(&self) -> u64 {
844 self.writer
848 .user_header()
849 .block_end()
850 .map(|b| b + 1)
851 .unwrap_or_else(|| self.writer.user_header().expected_block_start())
852 }
853
854 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
857 let next_static_file_block = self.next_block_number();
858
859 if expected_block_number != next_static_file_block {
860 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
861 self.writer.user_header().segment(),
862 expected_block_number,
863 next_static_file_block,
864 ))
865 }
866 Ok(())
867 }
868
869 fn truncate_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
875 let segment = self.writer.user_header().segment();
876 debug_assert!(segment.is_change_based());
877
878 let current_block_end = self
880 .writer
881 .user_header()
882 .block_end()
883 .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
884
885 if current_block_end <= last_block {
887 return Ok(())
888 }
889
890 let mut expected_block_start = self.writer.user_header().expected_block_start();
892 while last_block < expected_block_start && expected_block_start > 0 {
893 self.delete_current_and_open_previous()?;
894 expected_block_start = self.writer.user_header().expected_block_start();
895 }
896
897 let blocks_to_keep = if last_block >= expected_block_start {
899 last_block - expected_block_start + 1
900 } else {
901 0
902 };
903
904 let csoff_path = self.data_path.with_extension("csoff");
906 let changeset_offsets_len = self.writer.user_header().changeset_offsets_len();
907
908 self.flush_current_changeset_offset()?;
910
911 let rows_to_keep = if blocks_to_keep == 0 {
912 0
913 } else if blocks_to_keep >= changeset_offsets_len {
914 self.writer.rows() as u64
916 } else {
917 let reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
921 .map_err(ProviderError::other)?;
922 if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
923 next_offset.offset()
924 } else {
925 self.writer.rows() as u64
927 }
928 };
929
930 let total_rows = self.writer.rows() as u64;
931 let rows_to_delete = total_rows.saturating_sub(rows_to_keep);
932
933 if rows_to_delete > 0 {
934 let current_block_end = self
936 .writer
937 .user_header()
938 .block_end()
939 .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
940 let blocks_to_remove = current_block_end - last_block;
941
942 self.writer.user_header_mut().prune(blocks_to_remove);
944
945 self.writer.prune_rows(rows_to_delete as usize).map_err(ProviderError::other)?;
947 }
948
949 self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
951
952 self.writer.user_header_mut().sync_changeset_offsets();
954
955 if let Some(writer) = &mut self.changeset_offsets {
957 writer.truncate(blocks_to_keep).map_err(ProviderError::other)?;
958 }
959
960 self.current_changeset_offset = None;
962
963 self.commit()?;
965
966 Ok(())
967 }
968
969 fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
977 let mut remaining_rows = num_rows;
978 let segment = self.writer.user_header().segment();
979 while remaining_rows > 0 {
980 let len = if segment.is_block_based() {
981 self.writer.user_header().block_len().unwrap_or_default()
982 } else {
983 self.writer.user_header().tx_len().unwrap_or_default()
984 };
985
986 if remaining_rows >= len {
987 let block_start = self.writer.user_header().expected_block_start();
990
991 if block_start != 0 &&
997 (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
998 {
999 self.delete_current_and_open_previous()?;
1000 } else {
1001 self.writer.user_header_mut().prune(len);
1003 self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
1004 break
1005 }
1006
1007 remaining_rows -= len;
1008 } else {
1009 self.writer.user_header_mut().prune(remaining_rows);
1011
1012 self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
1014 remaining_rows = 0;
1015 }
1016 }
1017
1018 if let Some(last_block) = last_block {
1020 let mut expected_block_start = self.writer.user_header().expected_block_start();
1021
1022 if num_rows == 0 {
1023 while last_block < expected_block_start {
1027 self.delete_current_and_open_previous()?;
1028 expected_block_start = self.writer.user_header().expected_block_start();
1029 }
1030 }
1031 self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
1032 }
1033
1034 self.commit()?;
1036
1037 Ok(())
1038 }
1039
1040 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
1043 let segment = self.user_header().segment();
1044 let current_path = self.data_path.clone();
1045 let (previous_writer, data_path) = Self::open(
1046 segment,
1047 self.writer.user_header().expected_block_start() - 1,
1048 self.reader.clone(),
1049 self.metrics.clone(),
1050 )?;
1051 self.writer = previous_writer;
1052 self.writer.set_dirty();
1053 self.data_path = data_path.clone();
1054
1055 if segment.is_change_based() {
1057 let csoff_path = current_path.with_extension("csoff");
1058 if csoff_path.exists() {
1059 std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
1060 }
1061 let new_csoff_path = data_path.with_extension("csoff");
1063 let committed_len = self.writer.user_header().changeset_offsets_len();
1064 self.changeset_offsets = Some(
1065 ChangesetOffsetWriter::new(&new_csoff_path, committed_len)
1066 .map_err(ProviderError::other)?,
1067 );
1068 }
1069
1070 self.current_changeset_offset = None;
1072
1073 NippyJar::<SegmentHeader>::load(¤t_path)
1074 .map_err(ProviderError::other)?
1075 .delete()
1076 .map_err(ProviderError::other)?;
1077 Ok(())
1078 }
1079
1080 fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
1082 self.buf.clear();
1083 column.to_compact(&mut self.buf);
1084
1085 self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
1086 Ok(())
1087 }
1088
1089 fn append_with_tx_number<V: Compact>(
1091 &mut self,
1092 tx_num: TxNumber,
1093 value: V,
1094 ) -> ProviderResult<()> {
1095 if let Some(range) = self.writer.user_header().tx_range() {
1096 let next_tx = range.end() + 1;
1097 if next_tx != tx_num {
1098 return Err(ProviderError::UnexpectedStaticFileTxNumber(
1099 self.writer.user_header().segment(),
1100 tx_num,
1101 next_tx,
1102 ))
1103 }
1104 self.writer.user_header_mut().increment_tx();
1105 } else {
1106 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
1107 }
1108
1109 self.append_column(value)?;
1110
1111 Ok(())
1112 }
1113
1114 fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
1116 if let Some(ref mut offset) = self.current_changeset_offset {
1117 offset.increment_num_changes();
1118 }
1119 self.append_column(change)?;
1120 Ok(())
1121 }
1122
1123 pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
1128 where
1129 N::BlockHeader: Compact,
1130 {
1131 self.append_header_with_td(header, U256::ZERO, hash)
1132 }
1133
1134 pub fn append_header_with_td(
1139 &mut self,
1140 header: &N::BlockHeader,
1141 total_difficulty: U256,
1142 hash: &BlockHash,
1143 ) -> ProviderResult<()>
1144 where
1145 N::BlockHeader: Compact,
1146 {
1147 let start = Instant::now();
1148 self.ensure_no_queued_prune()?;
1149
1150 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1151
1152 self.increment_block(header.number())?;
1153
1154 self.append_column(header)?;
1155 self.append_column(CompactU256::from(total_difficulty))?;
1156 self.append_column(hash)?;
1157
1158 if let Some(metrics) = &self.metrics {
1159 metrics.record_segment_operation(
1160 StaticFileSegment::Headers,
1161 StaticFileProviderOperation::Append,
1162 Some(start.elapsed()),
1163 );
1164 }
1165
1166 Ok(())
1167 }
1168
1169 pub fn append_header_direct(
1172 &mut self,
1173 header: &N::BlockHeader,
1174 total_difficulty: U256,
1175 hash: &BlockHash,
1176 ) -> ProviderResult<()>
1177 where
1178 N::BlockHeader: Compact,
1179 {
1180 let start = Instant::now();
1181 self.ensure_no_queued_prune()?;
1182
1183 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1184
1185 self.append_column(header)?;
1186 self.append_column(CompactU256::from(total_difficulty))?;
1187 self.append_column(hash)?;
1188
1189 if let Some(metrics) = &self.metrics {
1190 metrics.record_segment_operation(
1191 StaticFileSegment::Headers,
1192 StaticFileProviderOperation::Append,
1193 Some(start.elapsed()),
1194 );
1195 }
1196
1197 Ok(())
1198 }
1199
1200 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
1205 where
1206 N::SignedTx: Compact,
1207 {
1208 let start = Instant::now();
1209 self.ensure_no_queued_prune()?;
1210
1211 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1212 self.append_with_tx_number(tx_num, tx)?;
1213
1214 if let Some(metrics) = &self.metrics {
1215 metrics.record_segment_operation(
1216 StaticFileSegment::Transactions,
1217 StaticFileProviderOperation::Append,
1218 Some(start.elapsed()),
1219 );
1220 }
1221
1222 Ok(())
1223 }
1224
1225 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
1230 where
1231 N::Receipt: Compact,
1232 {
1233 let start = Instant::now();
1234 self.ensure_no_queued_prune()?;
1235
1236 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1237 self.append_with_tx_number(tx_num, receipt)?;
1238
1239 if let Some(metrics) = &self.metrics {
1240 metrics.record_segment_operation(
1241 StaticFileSegment::Receipts,
1242 StaticFileProviderOperation::Append,
1243 Some(start.elapsed()),
1244 );
1245 }
1246
1247 Ok(())
1248 }
1249
1250 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
1252 where
1253 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
1254 R: Borrow<N::Receipt>,
1255 N::Receipt: Compact,
1256 {
1257 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1258
1259 let mut receipts_iter = receipts.into_iter().peekable();
1260 if receipts_iter.peek().is_none() {
1262 return Ok(());
1263 }
1264
1265 let start = Instant::now();
1266 self.ensure_no_queued_prune()?;
1267
1268 let mut count: u64 = 0;
1270
1271 for receipt_result in receipts_iter {
1272 let (tx_num, receipt) = receipt_result?;
1273 self.append_with_tx_number(tx_num, receipt.borrow())?;
1274 count += 1;
1275 }
1276
1277 if let Some(metrics) = &self.metrics {
1278 metrics.record_segment_operations(
1279 StaticFileSegment::Receipts,
1280 StaticFileProviderOperation::Append,
1281 count,
1282 Some(start.elapsed()),
1283 );
1284 }
1285
1286 Ok(())
1287 }
1288
1289 pub fn append_transaction_sender(
1294 &mut self,
1295 tx_num: TxNumber,
1296 sender: &alloy_primitives::Address,
1297 ) -> ProviderResult<()> {
1298 let start = Instant::now();
1299 self.ensure_no_queued_prune()?;
1300
1301 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1302 self.append_with_tx_number(tx_num, sender)?;
1303
1304 if let Some(metrics) = &self.metrics {
1305 metrics.record_segment_operation(
1306 StaticFileSegment::TransactionSenders,
1307 StaticFileProviderOperation::Append,
1308 Some(start.elapsed()),
1309 );
1310 }
1311
1312 Ok(())
1313 }
1314
1315 pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
1317 where
1318 I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
1319 {
1320 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1321
1322 let mut senders_iter = senders.into_iter().peekable();
1323 if senders_iter.peek().is_none() {
1325 return Ok(());
1326 }
1327
1328 let start = Instant::now();
1329 self.ensure_no_queued_prune()?;
1330
1331 let mut count: u64 = 0;
1333 for (tx_num, sender) in senders_iter {
1334 self.append_with_tx_number(tx_num, sender)?;
1335 count += 1;
1336 }
1337
1338 if let Some(metrics) = &self.metrics {
1339 metrics.record_segment_operations(
1340 StaticFileSegment::TransactionSenders,
1341 StaticFileProviderOperation::Append,
1342 count,
1343 Some(start.elapsed()),
1344 );
1345 }
1346
1347 Ok(())
1348 }
1349
1350 pub fn append_account_changeset(
1356 &mut self,
1357 mut changeset: Vec<AccountBeforeTx>,
1358 block_number: u64,
1359 ) -> ProviderResult<()> {
1360 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1361 let start = Instant::now();
1362
1363 self.increment_block(block_number)?;
1364 self.ensure_no_queued_prune()?;
1365
1366 changeset.sort_by_key(|change| change.address);
1368
1369 let mut count: u64 = 0;
1370
1371 for change in changeset {
1372 self.append_change(&change)?;
1373 count += 1;
1374 }
1375
1376 if let Some(metrics) = &self.metrics {
1377 metrics.record_segment_operations(
1378 StaticFileSegment::AccountChangeSets,
1379 StaticFileProviderOperation::Append,
1380 count,
1381 Some(start.elapsed()),
1382 );
1383 }
1384
1385 Ok(())
1386 }
1387
1388 pub fn begin_account_changeset(&mut self, block_number: u64) -> ProviderResult<()> {
1393 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1394
1395 self.increment_block(block_number)?;
1396 self.ensure_no_queued_prune()
1397 }
1398
1399 pub fn append_account_changeset_entry(
1403 &mut self,
1404 change: AccountBeforeTx,
1405 ) -> ProviderResult<()> {
1406 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1407 if self.current_changeset_offset.is_none() {
1408 return Err(ProviderError::other(StaticFileWriterError::new(
1409 "account changeset stream must be started before appending entries",
1410 )))
1411 }
1412
1413 self.append_change(&change)
1414 }
1415
1416 pub fn append_storage_changeset(
1420 &mut self,
1421 mut changeset: Vec<StorageBeforeTx>,
1422 block_number: u64,
1423 ) -> ProviderResult<()> {
1424 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1425 let start = Instant::now();
1426
1427 self.increment_block(block_number)?;
1428 self.ensure_no_queued_prune()?;
1429
1430 changeset.sort_by_key(|change| (change.address, change.key));
1432
1433 let mut count: u64 = 0;
1434 for change in changeset {
1435 self.append_change(&change)?;
1436 count += 1;
1437 }
1438
1439 if let Some(metrics) = &self.metrics {
1440 metrics.record_segment_operations(
1441 StaticFileSegment::StorageChangeSets,
1442 StaticFileProviderOperation::Append,
1443 count,
1444 Some(start.elapsed()),
1445 );
1446 }
1447
1448 Ok(())
1449 }
1450
1451 pub fn begin_storage_changeset(&mut self, block_number: u64) -> ProviderResult<()> {
1457 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1458
1459 self.increment_block(block_number)?;
1460 self.ensure_no_queued_prune()
1461 }
1462
1463 pub fn append_storage_changeset_entry(
1467 &mut self,
1468 change: StorageBeforeTx,
1469 ) -> ProviderResult<()> {
1470 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1471 if self.current_changeset_offset.is_none() {
1472 return Err(ProviderError::other(StaticFileWriterError::new(
1473 "storage changeset stream must be started before appending entries",
1474 )))
1475 }
1476
1477 self.append_change(&change)
1478 }
1479
1480 pub fn prune_transactions(
1484 &mut self,
1485 to_delete: u64,
1486 last_block: BlockNumber,
1487 ) -> ProviderResult<()> {
1488 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
1489 self.queue_prune(PruneStrategy::Transactions { num_rows: to_delete, last_block })
1490 }
1491
1492 pub fn prune_receipts(
1496 &mut self,
1497 to_delete: u64,
1498 last_block: BlockNumber,
1499 ) -> ProviderResult<()> {
1500 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
1501 self.queue_prune(PruneStrategy::Receipts { num_rows: to_delete, last_block })
1502 }
1503
1504 pub fn prune_transaction_senders(
1508 &mut self,
1509 to_delete: u64,
1510 last_block: BlockNumber,
1511 ) -> ProviderResult<()> {
1512 debug_assert_eq!(
1513 self.writer.user_header().segment(),
1514 StaticFileSegment::TransactionSenders
1515 );
1516 self.queue_prune(PruneStrategy::TransactionSenders { num_rows: to_delete, last_block })
1517 }
1518
1519 pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
1521 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
1522 self.queue_prune(PruneStrategy::Headers { num_blocks: to_delete })
1523 }
1524
1525 pub fn prune_account_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1527 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::AccountChangeSets);
1528 self.queue_prune(PruneStrategy::AccountChangeSets { last_block })
1529 }
1530
1531 pub fn prune_storage_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1533 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::StorageChangeSets);
1534 self.queue_prune(PruneStrategy::StorageChangeSets { last_block })
1535 }
1536
1537 fn queue_prune(&mut self, strategy: PruneStrategy) -> ProviderResult<()> {
1539 self.ensure_no_queued_prune()?;
1540 self.prune_on_commit = Some(strategy);
1541 Ok(())
1542 }
1543
1544 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
1546 if self.prune_on_commit.is_some() {
1547 return Err(ProviderError::other(StaticFileWriterError::new(
1548 "Pruning should be committed before appending or pruning more data",
1549 )));
1550 }
1551 Ok(())
1552 }
1553
1554 fn prune_transaction_data(
1556 &mut self,
1557 to_delete: u64,
1558 last_block: BlockNumber,
1559 ) -> ProviderResult<()> {
1560 let start = Instant::now();
1561
1562 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1563
1564 self.truncate(to_delete, Some(last_block))?;
1565
1566 if let Some(metrics) = &self.metrics {
1567 metrics.record_segment_operation(
1568 StaticFileSegment::Transactions,
1569 StaticFileProviderOperation::Prune,
1570 Some(start.elapsed()),
1571 );
1572 }
1573
1574 Ok(())
1575 }
1576
1577 fn prune_account_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1579 let start = Instant::now();
1580
1581 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1582
1583 self.truncate_changesets(last_block)?;
1584
1585 if let Some(metrics) = &self.metrics {
1586 metrics.record_segment_operation(
1587 StaticFileSegment::AccountChangeSets,
1588 StaticFileProviderOperation::Prune,
1589 Some(start.elapsed()),
1590 );
1591 }
1592
1593 Ok(())
1594 }
1595
1596 fn prune_storage_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1598 let start = Instant::now();
1599
1600 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1601
1602 self.truncate_changesets(last_block)?;
1603
1604 if let Some(metrics) = &self.metrics {
1605 metrics.record_segment_operation(
1606 StaticFileSegment::StorageChangeSets,
1607 StaticFileProviderOperation::Prune,
1608 Some(start.elapsed()),
1609 );
1610 }
1611
1612 Ok(())
1613 }
1614
1615 fn prune_receipt_data(
1617 &mut self,
1618 to_delete: u64,
1619 last_block: BlockNumber,
1620 ) -> ProviderResult<()> {
1621 let start = Instant::now();
1622
1623 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1624
1625 self.truncate(to_delete, Some(last_block))?;
1626
1627 if let Some(metrics) = &self.metrics {
1628 metrics.record_segment_operation(
1629 StaticFileSegment::Receipts,
1630 StaticFileProviderOperation::Prune,
1631 Some(start.elapsed()),
1632 );
1633 }
1634
1635 Ok(())
1636 }
1637
1638 fn prune_transaction_sender_data(
1640 &mut self,
1641 to_delete: u64,
1642 last_block: BlockNumber,
1643 ) -> ProviderResult<()> {
1644 let start = Instant::now();
1645
1646 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1647
1648 self.truncate(to_delete, Some(last_block))?;
1649
1650 if let Some(metrics) = &self.metrics {
1651 metrics.record_segment_operation(
1652 StaticFileSegment::TransactionSenders,
1653 StaticFileProviderOperation::Prune,
1654 Some(start.elapsed()),
1655 );
1656 }
1657
1658 Ok(())
1659 }
1660
1661 fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
1663 let start = Instant::now();
1664
1665 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1666
1667 self.truncate(to_delete, None)?;
1668
1669 if let Some(metrics) = &self.metrics {
1670 metrics.record_segment_operation(
1671 StaticFileSegment::Headers,
1672 StaticFileProviderOperation::Prune,
1673 Some(start.elapsed()),
1674 );
1675 }
1676
1677 Ok(())
1678 }
1679
1680 pub fn reader(&self) -> StaticFileProvider<N> {
1682 Self::upgrade_provider_to_strong_reference(&self.reader)
1683 }
1684
1685 fn upgrade_provider_to_strong_reference(
1694 provider: &Weak<StaticFileProviderInner<N>>,
1695 ) -> StaticFileProvider<N> {
1696 provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
1697 }
1698
1699 pub const fn user_header(&self) -> &SegmentHeader {
1701 self.writer.user_header()
1702 }
1703
1704 pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
1706 self.writer.user_header_mut()
1707 }
1708
/// Overwrites the block range stored in the segment header with the bounds of `block_range`.
///
/// Only compiled for tests or with the `test-utils` feature.
#[cfg(any(test, feature = "test-utils"))]
pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
    let first_block = *block_range.start();
    let last_block = *block_range.end();
    self.writer.user_header_mut().set_block_range(first_block, last_block)
}
1714
/// Returns a mutable reference to the underlying [`NippyJarWriter`].
///
/// Only compiled for tests or with the `test-utils` feature.
#[cfg(any(test, feature = "test-utils"))]
pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
    let writer = &mut self.writer;
    writer
}
1720}
1721
1722fn create_jar(
1723 segment: StaticFileSegment,
1724 path: &Path,
1725 expected_block_range: SegmentRangeInclusive,
1726) -> NippyJar<SegmentHeader> {
1727 let mut jar = NippyJar::new(
1728 segment.columns(),
1729 path,
1730 SegmentHeader::new(expected_block_range, None, None, segment),
1731 );
1732
1733 if segment.is_headers() {
1736 jar = jar.with_lz4();
1737 }
1738
1739 jar
1740}