// reth_provider/providers/static_file/writer.rs
1use super::{
2    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
10use reth_db_api::models::CompactU256;
11use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
12use reth_node_types::NodePrimitives;
13use reth_static_file_types::{
14    ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter, SegmentHeader,
15    SegmentRangeInclusive, StaticFileSegment,
16};
17use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
18use std::{
19    borrow::Borrow,
20    cmp::Ordering,
21    fmt::Debug,
22    path::{Path, PathBuf},
23    sync::{Arc, Weak},
24    time::Instant,
25};
26use tracing::{debug, instrument};
27
/// Represents different pruning strategies for various static file segments.
///
/// A strategy is queued into `StaticFileProviderRW::prune_on_commit` and applied lazily
/// when `commit()` runs, so callers batch deletions instead of truncating files eagerly.
#[derive(Debug, Clone, Copy)]
enum PruneStrategy {
    /// Prune headers by number of blocks to delete.
    Headers {
        /// Number of blocks to delete.
        num_blocks: u64,
    },
    /// Prune transactions by number of rows and last block.
    Transactions {
        /// Number of transaction rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune receipts by number of rows and last block.
    Receipts {
        /// Number of receipt rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune transaction senders by number of rows and last block.
    TransactionSenders {
        /// Number of transaction sender rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune account changesets to a target block number.
    AccountChangeSets {
        /// The target block number to prune to.
        last_block: BlockNumber,
    },
    /// Prune storage changesets to a target block number.
    StorageChangeSets {
        /// The target block number to prune to.
        last_block: BlockNumber,
    },
}
68
/// Static file writers for every known [`StaticFileSegment`].
///
/// Each segment's writer lives behind its own `RwLock` and is created lazily by
/// [`Self::get_or_create`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    // Lazily-initialized writer for the headers segment.
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    // Lazily-initialized writer for the transactions segment.
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    // Lazily-initialized writer for the receipts segment.
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
    // Lazily-initialized writer for the transaction-senders segment.
    transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
    // Lazily-initialized writer for the account-changeset segment.
    account_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
    // Lazily-initialized writer for the storage-changeset segment.
    storage_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
}
82
83impl<N> Default for StaticFileWriters<N> {
84    fn default() -> Self {
85        Self {
86            headers: Default::default(),
87            transactions: Default::default(),
88            receipts: Default::default(),
89            transaction_senders: Default::default(),
90            account_change_sets: Default::default(),
91            storage_change_sets: Default::default(),
92        }
93    }
94}
95
96impl<N: NodePrimitives> StaticFileWriters<N> {
97    pub(crate) fn get_or_create(
98        &self,
99        segment: StaticFileSegment,
100        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
101    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
102        let mut write_guard = match segment {
103            StaticFileSegment::Headers => self.headers.write(),
104            StaticFileSegment::Transactions => self.transactions.write(),
105            StaticFileSegment::Receipts => self.receipts.write(),
106            StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
107            StaticFileSegment::AccountChangeSets => self.account_change_sets.write(),
108            StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(),
109        };
110
111        if write_guard.is_none() {
112            *write_guard = Some(create_fn()?);
113        }
114
115        Ok(StaticFileProviderRWRefMut(write_guard))
116    }
117
118    #[instrument(
119        name = "StaticFileWriters::commit",
120        level = "debug",
121        target = "providers::static_file",
122        skip_all
123    )]
124    pub(crate) fn commit(&self) -> ProviderResult<()> {
125        debug!(target: "providers::static_file", "Committing all static file segments");
126
127        for writer_lock in [
128            &self.headers,
129            &self.transactions,
130            &self.receipts,
131            &self.transaction_senders,
132            &self.account_change_sets,
133            &self.storage_change_sets,
134        ] {
135            let mut writer = writer_lock.write();
136            if let Some(writer) = writer.as_mut() {
137                writer.commit()?;
138            }
139        }
140
141        debug!(target: "providers::static_file", "Committed all static file segments");
142        Ok(())
143    }
144
145    pub(crate) fn has_unwind_queued(&self) -> bool {
146        for writer_lock in [
147            &self.headers,
148            &self.transactions,
149            &self.receipts,
150            &self.transaction_senders,
151            &self.account_change_sets,
152            &self.storage_change_sets,
153        ] {
154            let writer = writer_lock.read();
155            if let Some(writer) = writer.as_ref() &&
156                writer.will_prune_on_commit()
157            {
158                return true
159            }
160        }
161        false
162    }
163
164    /// Finalizes all writers by committing their configuration to disk and updating indices.
165    ///
166    /// Must be called after `sync_all` was called on individual writers.
167    /// Returns an error if any writer has prune queued.
168    #[instrument(
169        name = "StaticFileWriters::finalize",
170        level = "debug",
171        target = "providers::static_file",
172        skip_all
173    )]
174    pub(crate) fn finalize(&self) -> ProviderResult<()> {
175        debug!(target: "providers::static_file", "Finalizing all static file segments into disk");
176
177        for writer_lock in [
178            &self.headers,
179            &self.transactions,
180            &self.receipts,
181            &self.transaction_senders,
182            &self.account_change_sets,
183            &self.storage_change_sets,
184        ] {
185            let mut writer = writer_lock.write();
186            if let Some(writer) = writer.as_mut() {
187                writer.finalize()?;
188            }
189        }
190
191        debug!(target: "providers::static_file", "Finalized all static file segments into disk");
192        Ok(())
193    }
194}
195
/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
///
/// The inner `Option` is always `Some`: [`StaticFileWriters::get_or_create`] populates the
/// slot before constructing this guard, which is what the `Deref`/`DerefMut` impls rely on.
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
201
202impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
203    fn deref_mut(&mut self) -> &mut Self::Target {
204        // This is always created by [`StaticFileWriters::get_or_create`]
205        self.0.as_mut().expect("static file writer provider should be init")
206    }
207}
208
209impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
210    type Target = StaticFileProviderRW<N>;
211
212    fn deref(&self) -> &Self::Target {
213        // This is always created by [`StaticFileWriters::get_or_create`]
214        self.0.as_ref().expect("static file writer provider should be init")
215    }
216}
217
#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities
pub struct StaticFileProviderRW<N> {
    /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
    /// stored in a [`reth_primitives_traits::dashmap::DashMap`] inside the parent
    /// [`StaticFileProvider`], which is an [Arc]. If we were to use an [Arc] here, we would
    /// create a reference cycle.
    ///
    /// NOTE(review): in this file the writers are held in [`StaticFileWriters`]'s `RwLock`
    /// slots — confirm the `DashMap` reference above is still accurate.
    reader: Weak<StaticFileProviderInner<N>>,
    /// A [`NippyJarWriter`] instance.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to opened file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Metrics.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// On commit, contains the pruning strategy to apply for the segment.
    prune_on_commit: Option<PruneStrategy>,
    /// Whether `sync_all()` has been called. Used by `finalize()` to avoid redundant syncs.
    synced: bool,
    /// Changeset offsets sidecar writer (only for changeset segments).
    changeset_offsets: Option<ChangesetOffsetWriter>,
    /// Current block's changeset offset being written.
    current_changeset_offset: Option<ChangesetOffset>,
}
243
244impl<N: NodePrimitives> StaticFileProviderRW<N> {
245    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
246    ///
247    /// Before use, transaction based segments should ensure the block end range is the expected
248    /// one, and heal if not. For more check `Self::ensure_end_range_consistency`.
249    pub fn new(
250        segment: StaticFileSegment,
251        block: BlockNumber,
252        reader: Weak<StaticFileProviderInner<N>>,
253        metrics: Option<Arc<StaticFileProviderMetrics>>,
254    ) -> ProviderResult<Self> {
255        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
256
257        // Create writer WITHOUT sidecar first - we'll add it after healing
258        let mut writer = Self {
259            writer,
260            data_path,
261            buf: Vec::with_capacity(100),
262            reader,
263            metrics,
264            prune_on_commit: None,
265            synced: false,
266            changeset_offsets: None,
267            current_changeset_offset: None,
268        };
269
270        // Run NippyJar healing BEFORE setting up changeset sidecar
271        // This may reduce rows, which affects valid sidecar offsets
272        writer.ensure_end_range_consistency()?;
273
274        // Now set up changeset sidecar with post-heal header values
275        if segment.is_change_based() {
276            writer.heal_changeset_sidecar()?;
277        }
278
279        Ok(writer)
280    }
281
    /// Opens (or creates) the [`NippyJarWriter`] backing the static file that contains
    /// `block` for `segment`, returning the writer together with its data path.
    ///
    /// If no static file exists for the fixed block range containing `block`, a new jar is
    /// created at the segment's canonical path. Records an `OpenWriter` metric on success.
    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        // Resolve the fixed block range this block falls into, then load the existing jar
        // for that range or create a fresh one if the file is missing.
        let block_range = static_file_provider.find_fixed_range(segment, block);
        let (jar, path) = match static_file_provider.get_segment_provider_for_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // This static file has been frozen (finalized), so we should not open it
                // for writing again; surface an error to the caller instead.
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::other(e)),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }
328
    /// If a file level healing happens, we need to update the end range on the
    /// [`SegmentHeader`].
    ///
    /// However, for transaction based segments, the block end range has to be found and healed
    /// externally.
    ///
    /// Check [`reth_nippy_jar::NippyJarChecker`] &
    /// [`NippyJarWriter`] for more on healing.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // If we have lost rows (in this run or previous), we need to update the [SegmentHeader].
        // Headers are block-indexed, everything else is tx-indexed, so the expected row count
        // comes from a different header field per segment kind.
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let actual_rows = self.writer.rows() as u64;
        // saturating_sub: actual >= expected means nothing was lost and we prune nothing.
        let pruned_rows = expected_rows.saturating_sub(actual_rows);
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        debug!(
            target: "providers::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            pruned_rows,
            "Ensuring end range consistency"
        );

        // Persist the (possibly) corrected header before anyone reads through the index.
        self.writer.commit().map_err(ProviderError::other)?;

        // Updates the [SnapshotProvider] manager
        self.update_index()?;
        Ok(())
    }
364
365    /// Returns `true` if the writer will prune on commit.
366    pub const fn will_prune_on_commit(&self) -> bool {
367        self.prune_on_commit.is_some()
368    }
369
    /// Heals the changeset offset sidecar after `NippyJar` healing.
    ///
    /// This must be called AFTER `ensure_end_range_consistency()` which may reduce rows.
    /// Performs three-way consistency check between header, `NippyJar` rows, and sidecar file:
    /// - Validates sidecar offsets don't point past actual `NippyJar` rows
    /// - Heals header if sidecar was truncated during interrupted prune
    /// - Truncates sidecar if offsets point past healed `NippyJar` data
    ///
    /// Always leaves `self.changeset_offsets` populated with a writer positioned at the
    /// validated block count.
    fn heal_changeset_sidecar(&mut self) -> ProviderResult<()> {
        // The sidecar lives next to the data file, distinguished only by extension.
        let csoff_path = self.data_path.with_extension("csoff");

        // Step 1: Read all three sources of truth
        let header_claims_blocks = self.writer.user_header().changeset_offsets_len();
        let actual_nippy_rows = self.writer.rows() as u64;

        // Get actual sidecar file size (may differ from header after crash)
        let actual_sidecar_blocks = if csoff_path.exists() {
            let file_len = reth_fs_util::metadata(&csoff_path).map_err(ProviderError::other)?.len();
            // Remove partial records from crash mid-write.
            // NOTE(review): assumes each sidecar record is exactly 16 bytes — presumably
            // two u64s (offset, num_changes); confirm against `ChangesetOffset`'s encoding.
            let aligned_len = file_len - (file_len % 16);
            aligned_len / 16
        } else {
            0
        };

        // Fresh segment or no sidecar data - nothing to heal
        if header_claims_blocks == 0 && actual_sidecar_blocks == 0 {
            self.changeset_offsets =
                Some(ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?);
            return Ok(());
        }

        // Step 2: Validate sidecar offsets against actual NippyJar state
        let valid_blocks = if actual_sidecar_blocks > 0 {
            let mut reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
                .map_err(ProviderError::other)?;

            // Find last block where offset + num_changes <= actual_nippy_rows
            // This correctly handles rows=0 with offset=0, num_changes=0 (empty blocks)
            let mut valid = 0u64;
            for i in 0..actual_sidecar_blocks {
                if let Some(offset) = reader.get(i).map_err(ProviderError::other)? {
                    if offset.offset() + offset.num_changes() <= actual_nippy_rows {
                        valid = i + 1;
                    } else {
                        // This block points past EOF - stop here
                        break;
                    }
                }
                // NOTE(review): a `None` from `reader.get(i)` is skipped without breaking —
                // presumably it can only occur past EOF; verify gaps are impossible.
            }
            valid
        } else {
            0
        };

        // Step 3: Determine correct state from synced files (source of truth)
        // Header is the commit marker - never enlarge, only shrink
        let correct_blocks = valid_blocks.min(header_claims_blocks);

        // Step 4: Heal if header doesn't match validated truth
        let mut needs_header_commit = false;

        if correct_blocks != header_claims_blocks || actual_sidecar_blocks != correct_blocks {
            tracing::warn!(
                target: "reth::static_file",
                path = %csoff_path.display(),
                header_claims = header_claims_blocks,
                sidecar_has = actual_sidecar_blocks,
                valid_blocks = correct_blocks,
                actual_rows = actual_nippy_rows,
                "Three-way healing: syncing header, sidecar, and NippyJar state"
            );

            // Truncate sidecar file if it has invalid blocks
            if actual_sidecar_blocks > correct_blocks {
                use std::fs::OpenOptions;
                let file = OpenOptions::new()
                    .write(true)
                    .open(&csoff_path)
                    .map_err(ProviderError::other)?;
                // Truncate to whole records and fsync so the sidecar is durable before the
                // header (committed later) can reference it.
                file.set_len(correct_blocks * 16).map_err(ProviderError::other)?;
                file.sync_all().map_err(ProviderError::other)?;

                tracing::debug!(
                    target: "reth::static_file",
                    "Truncated sidecar from {} to {} blocks",
                    actual_sidecar_blocks,
                    correct_blocks
                );
            }

            // Update header to match validated truth (can only shrink, never enlarge)
            if correct_blocks < header_claims_blocks {
                // Blocks were removed - use prune() to update both block_range and
                // changeset_offsets_len atomically
                let blocks_removed = header_claims_blocks - correct_blocks;
                self.writer.user_header_mut().prune(blocks_removed);

                tracing::debug!(
                    target: "reth::static_file",
                    "Updated header: removed {} blocks (changeset_offsets_len: {} -> {})",
                    blocks_removed,
                    header_claims_blocks,
                    correct_blocks
                );

                needs_header_commit = true;
            }
        } else {
            tracing::debug!(
                target: "reth::static_file",
                path = %csoff_path.display(),
                blocks = correct_blocks,
                "Changeset sidecar consistent, no healing needed"
            );
        }

        // Open sidecar writer with corrected count (won't error now that sizes match)
        let csoff_writer = ChangesetOffsetWriter::new(&csoff_path, correct_blocks)
            .map_err(ProviderError::other)?;

        self.changeset_offsets = Some(csoff_writer);

        // Commit healed header if needed (after sidecar writer is set up)
        if needs_header_commit {
            self.writer.commit().map_err(ProviderError::other)?;

            tracing::info!(
                target: "reth::static_file",
                path = %csoff_path.display(),
                blocks = correct_blocks,
                "Committed healed changeset offset header"
            );
        }

        Ok(())
    }
506
507    /// Flushes the current changeset offset (if any) to the `.csoff` sidecar file.
508    ///
509    /// This is idempotent - safe to call multiple times. After flushing, the current offset
510    /// is cleared to prevent duplicate writes.
511    ///
512    /// This must be called before committing or syncing to ensure the last block's offset
513    /// is persisted, since `increment_block()` only writes the *previous* block's offset.
514    fn flush_current_changeset_offset(&mut self) -> ProviderResult<()> {
515        if !self.writer.user_header().segment().is_change_based() {
516            return Ok(());
517        }
518
519        if let Some(offset) = self.current_changeset_offset.take() &&
520            let Some(writer) = &mut self.changeset_offsets
521        {
522            writer.append(&offset).map_err(ProviderError::other)?;
523        }
524        Ok(())
525    }
526
    /// Syncs all data (rows, offsets, and changeset offsets sidecar) to disk.
    ///
    /// This does NOT commit the configuration. Call [`Self::finalize`] after to write the
    /// configuration and mark the writer as clean.
    ///
    /// Returns an error if prune is queued (use [`Self::commit`] instead).
    pub fn sync_all(&mut self) -> ProviderResult<()> {
        // Syncing with a queued prune would persist data that is about to be deleted.
        if self.prune_on_commit.is_some() {
            return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
        }

        // Write the final block's offset and sync the sidecar for changeset segments
        self.flush_current_changeset_offset()?;
        if let Some(writer) = &mut self.changeset_offsets {
            writer.sync().map_err(ProviderError::other)?;
            // Update the header with the actual number of offsets written
            self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
        }

        if self.writer.is_dirty() {
            self.writer.sync_all().map_err(ProviderError::other)?;
        }
        // Remember the sync so a later `finalize()` can skip redundant work.
        self.synced = true;
        Ok(())
    }
552
    /// Commits configuration to disk and updates the reader index.
    ///
    /// If `sync_all()` was not called, this will call it first to ensure data is persisted.
    ///
    /// Returns an error if prune is queued (use [`Self::commit`] instead).
    #[instrument(
        name = "StaticFileProviderRW::finalize",
        level = "debug",
        target = "providers::static_file",
        skip_all
    )]
    pub fn finalize(&mut self) -> ProviderResult<()> {
        if self.prune_on_commit.is_some() {
            return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
        }
        // NOTE(review): everything is gated on the inner writer's dirty flag — confirm the
        // sidecar can never be dirty while the NippyJar writer is clean.
        if self.writer.is_dirty() {
            if !self.synced {
                // Must call self.sync_all() to flush changeset offsets and update
                // the header's changeset_offsets_len, not just the inner writer
                self.sync_all()?;
            }

            self.writer.finalize().map_err(ProviderError::other)?;
            self.update_index()?;
        }
        // Reset so the next write cycle requires a fresh sync.
        self.synced = false;
        Ok(())
    }
581
    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// Order matters for crash consistency: queued prunes run first, then the changeset
    /// sidecar is flushed and fsynced, and only then is the main file (header + offsets)
    /// committed — so the header never references sidecar data that isn't durable yet.
    #[instrument(
        name = "StaticFileProviderRW::commit",
        level = "debug",
        target = "providers::static_file",
        skip_all
    )]
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Truncates the data file if instructed to.
        if let Some(strategy) = self.prune_on_commit.take() {
            debug!(
                target: "providers::static_file",
                segment = ?self.writer.user_header().segment(),
                "Pruning data on commit"
            );
            match strategy {
                PruneStrategy::Headers { num_blocks } => self.prune_header_data(num_blocks)?,
                PruneStrategy::Transactions { num_rows, last_block } => {
                    self.prune_transaction_data(num_rows, last_block)?
                }
                PruneStrategy::Receipts { num_rows, last_block } => {
                    self.prune_receipt_data(num_rows, last_block)?
                }
                PruneStrategy::TransactionSenders { num_rows, last_block } => {
                    self.prune_transaction_sender_data(num_rows, last_block)?
                }
                PruneStrategy::AccountChangeSets { last_block } => {
                    self.prune_account_changeset_data(last_block)?
                }
                PruneStrategy::StorageChangeSets { last_block } => {
                    self.prune_storage_changeset_data(last_block)?
                }
            }
        }

        // For changeset segments, flush and sync the sidecar file before committing the main file.
        // This ensures crash consistency: the sidecar is durable before the header references it.
        self.flush_current_changeset_offset()?;
        if let Some(writer) = &mut self.changeset_offsets {
            writer.sync().map_err(ProviderError::other)?;
            // Update the header with the actual number of offsets written
            self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
        }

        if self.writer.is_dirty() {
            debug!(
                target: "providers::static_file",
                segment = ?self.writer.user_header().segment(),
                "Committing writer to disk"
            );

            // Commits offsets and new user_header to disk
            self.writer.commit().map_err(ProviderError::other)?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "providers::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Committed writer to disk"
            );

            self.update_index()?;
        }

        Ok(())
    }
659
660    /// Commits configuration changes to disk and updates the reader index with the new changes.
661    ///
662    /// CAUTION: does not call `sync_all` on the files.
663    #[cfg(feature = "test-utils")]
664    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
665        let start = Instant::now();
666
667        debug!(
668            target: "providers::static_file",
669            segment = ?self.writer.user_header().segment(),
670            "Committing writer to disk (without sync)"
671        );
672
673        // Commits offsets and new user_header to disk
674        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;
675
676        if let Some(metrics) = &self.metrics {
677            metrics.record_segment_operation(
678                self.writer.user_header().segment(),
679                StaticFileProviderOperation::CommitWriter,
680                Some(start.elapsed()),
681            );
682        }
683
684        debug!(
685            target: "providers::static_file",
686            segment = ?self.writer.user_header().segment(),
687            path = ?self.data_path,
688            duration = ?start.elapsed(),
689            "Committed writer to disk (without sync)"
690        );
691
692        self.update_index()?;
693
694        Ok(())
695    }
696
    /// Updates the `self.reader` internal index.
    fn update_index(&self) -> ProviderResult<()> {
        // We find the maximum block of the segment by checking this writer's last block.
        //
        // However if there's no block range (because there's no data), we try to calculate it by
        // subtracting 1 from the expected block start, resulting on the last block of the
        // previous file.
        //
        // If that expected block start is at or below the genesis block number, then it means
        // that there's no actual block data, and there's no block data in static files —
        // `segment_max_block` stays `None`.
        let segment_max_block = self
            .writer
            .user_header()
            .block_range()
            .as_ref()
            .map(|block_range| block_range.end())
            .or_else(|| {
                (self.writer.user_header().expected_block_start() >
                    self.reader().genesis_block_number())
                .then(|| self.writer.user_header().expected_block_start() - 1)
            });

        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
    }
721
722    /// Ensures that the writer is positioned at the specified block number.
723    ///
724    /// If the writer is positioned at a greater block number than the specified one, the writer
725    /// will NOT be unwound and the error will be returned.
726    pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
727        let current_block = if let Some(current_block_number) = self.current_block_number() {
728            current_block_number
729        } else {
730            self.increment_block(0)?;
731            0
732        };
733
734        match current_block.cmp(&advance_to) {
735            Ordering::Less => {
736                for block in current_block + 1..=advance_to {
737                    self.increment_block(block)?;
738                }
739            }
740            Ordering::Equal => {}
741            Ordering::Greater => {
742                return Err(ProviderError::UnexpectedStaticFileBlockNumber(
743                    self.writer.user_header().segment(),
744                    current_block,
745                    advance_to,
746                ));
747            }
748        }
749
750        Ok(())
751    }
752
    /// Allows to increment the [`SegmentHeader`] end block. It will commit the current static file,
    /// and create the next one if we are past the end range.
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        // Guards against out-of-order block appends.
        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // We have finished the previous static file and must freeze it
            if last_block == self.writer.user_header().expected_block_end() {
                // Commits offsets and new user_header to disk
                self.commit()?;

                // Opens the new static file
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path.clone();

                // Update changeset offsets writer for the new file (starts empty)
                if segment.is_change_based() {
                    let csoff_path = data_path.with_extension("csoff");
                    self.changeset_offsets = Some(
                        ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?,
                    );
                }

                // Fresh header for the next fixed block range; ranges start empty (None).
                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(segment, last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();

        // Handle changeset offset tracking for changeset segments
        if segment.is_change_based() {
            // Write previous block's offset if we have one
            if let Some(offset) = self.current_changeset_offset.take() &&
                let Some(writer) = &mut self.changeset_offsets
            {
                writer.append(&offset).map_err(ProviderError::other)?;
            }
            // Start tracking new block's offset; the new block's changes begin at the
            // current row count and the change counter starts at zero.
            let new_offset = self.writer.rows() as u64;
            self.current_changeset_offset = Some(ChangesetOffset::new(new_offset, 0));
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
815
816    /// Returns the current block number of the static file writer.
817    pub fn current_block_number(&self) -> Option<u64> {
818        self.writer.user_header().block_end()
819    }
820
821    /// Returns a block number that is one next to the current tip of static files.
822    pub fn next_block_number(&self) -> u64 {
823        // The next static file block number can be found by checking the one after block_end.
824        // However, if it's a new file that hasn't been added any data, its block range will
825        // actually be None. In that case, the next block will be found on `expected_block_start`.
826        self.writer
827            .user_header()
828            .block_end()
829            .map(|b| b + 1)
830            .unwrap_or_else(|| self.writer.user_header().expected_block_start())
831    }
832
833    /// Verifies if the incoming block number matches the next expected block number
834    /// for a static file. This ensures data continuity when adding new blocks.
835    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
836        let next_static_file_block = self.next_block_number();
837
838        if expected_block_number != next_static_file_block {
839            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
840                self.writer.user_header().segment(),
841                expected_block_number,
842                next_static_file_block,
843            ))
844        }
845        Ok(())
846    }
847
    /// Truncates changesets (account or storage) to the given block. It deletes and loads an older
    /// static file if the block goes beyond the start of the current block range.
    ///
    /// # Note
    /// Commits to the configuration file at the end
    fn truncate_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();
        debug_assert!(segment.is_change_based());

        // Get the current block range
        let current_block_end = self
            .writer
            .user_header()
            .block_end()
            .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;

        // If we're already at or before the target block, nothing to do
        if current_block_end <= last_block {
            return Ok(())
        }

        // Navigate to the correct file if the target block is in a previous file
        let mut expected_block_start = self.writer.user_header().expected_block_start();
        while last_block < expected_block_start && expected_block_start > 0 {
            self.delete_current_and_open_previous()?;
            expected_block_start = self.writer.user_header().expected_block_start();
        }

        // Find the number of blocks to keep (up to and including last_block)
        let blocks_to_keep = if last_block >= expected_block_start {
            last_block - expected_block_start + 1
        } else {
            // `last_block` precedes this file entirely (only reachable on the first file,
            // since the loop above cannot go before `expected_block_start == 0`).
            0
        };

        // Read changeset offsets from sidecar file to find where to truncate
        let csoff_path = self.data_path.with_extension("csoff");
        let changeset_offsets_len = self.writer.user_header().changeset_offsets_len();

        // Flush any pending changeset offset before reading the sidecar
        self.flush_current_changeset_offset()?;

        // Translate "blocks to keep" into "rows to keep" using the per-block sidecar offsets.
        let rows_to_keep = if blocks_to_keep == 0 {
            0
        } else if blocks_to_keep >= changeset_offsets_len {
            // Keep all rows in this file
            self.writer.rows() as u64
        } else {
            // Read offset for the block after last_block from sidecar.
            // Use committed length from header, ignoring any uncommitted records
            // that may exist in the file after a crash.
            let mut reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
                .map_err(ProviderError::other)?;
            if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
                next_offset.offset()
            } else {
                // If we can't read the offset, keep all rows
                self.writer.rows() as u64
            }
        };

        let total_rows = self.writer.rows() as u64;
        let rows_to_delete = total_rows.saturating_sub(rows_to_keep);

        if rows_to_delete > 0 {
            // Calculate the number of blocks to prune
            let current_block_end = self
                .writer
                .user_header()
                .block_end()
                .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
            let blocks_to_remove = current_block_end - last_block;

            // Update segment header - for changesets, prune expects number of blocks, not rows
            self.writer.user_header_mut().prune(blocks_to_remove);

            // Prune the actual rows
            self.writer.prune_rows(rows_to_delete as usize).map_err(ProviderError::other)?;
        }

        // Update the block range
        self.writer.user_header_mut().set_block_range(expected_block_start, last_block);

        // Sync changeset offsets to match the new block range
        self.writer.user_header_mut().sync_changeset_offsets();

        // Truncate the sidecar file to match the new block count
        if let Some(writer) = &mut self.changeset_offsets {
            writer.truncate(blocks_to_keep).map_err(ProviderError::other)?;
        }

        // Clear current changeset offset tracking since we've pruned
        self.current_changeset_offset = None;

        // Commits new changes to disk
        self.commit()?;

        Ok(())
    }
947
    /// Truncates a number of rows from disk. It deletes and loads an older static file if block
    /// goes beyond the start of the current block range.
    ///
    /// **`last_block`** should be passed only with transaction based segments.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            // Number of rows held by the currently open file.
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // If there's more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                // We only delete the file if it's NOT the first static file AND:
                // * it's a Header segment  OR
                // * it's a tx-based segment AND `last_block` is lower than the first block of this
                //   file's block range. Otherwise, having no rows simply means that this block
                //   range has no transactions, but the file should remain.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // This file holds more rows than we need to remove: prune in place and stop.
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        // Only Transactions and Receipts
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case for when we are unwinding a chain of empty blocks that goes across
                // files, and therefore, the only reference point to know which file
                // we are supposed to be at is `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            // Clamp the header's block range so it ends at `last_block`.
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }
1018
    /// Delete the current static file, and replace this provider writer with the previous static
    /// file.
    ///
    /// NOTE(review): `expected_block_start() - 1` underflows if this is ever called on the first
    /// file (`expected_block_start() == 0`); callers guard with `block_start != 0` /
    /// `expected_block_start > 0` checks before invoking — confirm any new call sites do too.
    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
        let segment = self.user_header().segment();
        let current_path = self.data_path.clone();
        // Open the file that contains the block immediately preceding this file's range.
        let (previous_writer, data_path) = Self::open(
            segment,
            self.writer.user_header().expected_block_start() - 1,
            self.reader.clone(),
            self.metrics.clone(),
        )?;
        self.writer = previous_writer;
        // Mark dirty so the reopened file gets committed even without further appends.
        self.writer.set_dirty();
        self.data_path = data_path.clone();

        // Delete the sidecar file for changeset segments before deleting the main jar
        if segment.is_change_based() {
            let csoff_path = current_path.with_extension("csoff");
            if csoff_path.exists() {
                std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
            }
            // Re-initialize the changeset offsets writer for the previous file
            let new_csoff_path = data_path.with_extension("csoff");
            let committed_len = self.writer.user_header().changeset_offsets_len();
            self.changeset_offsets = Some(
                ChangesetOffsetWriter::new(&new_csoff_path, committed_len)
                    .map_err(ProviderError::other)?,
            );
        }

        // Clear current changeset offset tracking since we're switching files
        self.current_changeset_offset = None;

        // Finally remove the now-orphaned static file from disk.
        NippyJar::<SegmentHeader>::load(&current_path)
            .map_err(ProviderError::other)?
            .delete()
            .map_err(ProviderError::other)?;
        Ok(())
    }
1058
1059    /// Appends column to static file.
1060    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
1061        self.buf.clear();
1062        column.to_compact(&mut self.buf);
1063
1064        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
1065        Ok(())
1066    }
1067
1068    /// Appends to tx number-based static file.
1069    fn append_with_tx_number<V: Compact>(
1070        &mut self,
1071        tx_num: TxNumber,
1072        value: V,
1073    ) -> ProviderResult<()> {
1074        if let Some(range) = self.writer.user_header().tx_range() {
1075            let next_tx = range.end() + 1;
1076            if next_tx != tx_num {
1077                return Err(ProviderError::UnexpectedStaticFileTxNumber(
1078                    self.writer.user_header().segment(),
1079                    tx_num,
1080                    next_tx,
1081                ))
1082            }
1083            self.writer.user_header_mut().increment_tx();
1084        } else {
1085            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
1086        }
1087
1088        self.append_column(value)?;
1089
1090        Ok(())
1091    }
1092
1093    /// Appends change to changeset static file.
1094    fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
1095        if let Some(ref mut offset) = self.current_changeset_offset {
1096            offset.increment_num_changes();
1097        }
1098        self.append_column(change)?;
1099        Ok(())
1100    }
1101
    /// Appends header to static file.
    ///
    /// The total difficulty column is written as zero; use [`Self::append_header_with_td`] to
    /// store a real value.
    ///
    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
    /// blocks.
    pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        self.append_header_with_td(header, U256::ZERO, hash)
    }
1112
1113    /// Appends header to static file with a specified total difficulty.
1114    ///
1115    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
1116    /// blocks.
1117    pub fn append_header_with_td(
1118        &mut self,
1119        header: &N::BlockHeader,
1120        total_difficulty: U256,
1121        hash: &BlockHash,
1122    ) -> ProviderResult<()>
1123    where
1124        N::BlockHeader: Compact,
1125    {
1126        let start = Instant::now();
1127        self.ensure_no_queued_prune()?;
1128
1129        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1130
1131        self.increment_block(header.number())?;
1132
1133        self.append_column(header)?;
1134        self.append_column(CompactU256::from(total_difficulty))?;
1135        self.append_column(hash)?;
1136
1137        if let Some(metrics) = &self.metrics {
1138            metrics.record_segment_operation(
1139                StaticFileSegment::Headers,
1140                StaticFileProviderOperation::Append,
1141                Some(start.elapsed()),
1142            );
1143        }
1144
1145        Ok(())
1146    }
1147
1148    /// Appends header to static file without calling `increment_block`.
1149    /// This is useful for genesis blocks with non-zero block numbers.
1150    pub fn append_header_direct(
1151        &mut self,
1152        header: &N::BlockHeader,
1153        total_difficulty: U256,
1154        hash: &BlockHash,
1155    ) -> ProviderResult<()>
1156    where
1157        N::BlockHeader: Compact,
1158    {
1159        let start = Instant::now();
1160        self.ensure_no_queued_prune()?;
1161
1162        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1163
1164        self.append_column(header)?;
1165        self.append_column(CompactU256::from(total_difficulty))?;
1166        self.append_column(hash)?;
1167
1168        if let Some(metrics) = &self.metrics {
1169            metrics.record_segment_operation(
1170                StaticFileSegment::Headers,
1171                StaticFileProviderOperation::Append,
1172                Some(start.elapsed()),
1173            );
1174        }
1175
1176        Ok(())
1177    }
1178
1179    /// Appends transaction to static file.
1180    ///
1181    /// It **DOES NOT CALL** `increment_block()`, it should be handled elsewhere. There might be
1182    /// empty blocks and this function wouldn't be called.
1183    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
1184    where
1185        N::SignedTx: Compact,
1186    {
1187        let start = Instant::now();
1188        self.ensure_no_queued_prune()?;
1189
1190        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1191        self.append_with_tx_number(tx_num, tx)?;
1192
1193        if let Some(metrics) = &self.metrics {
1194            metrics.record_segment_operation(
1195                StaticFileSegment::Transactions,
1196                StaticFileProviderOperation::Append,
1197                Some(start.elapsed()),
1198            );
1199        }
1200
1201        Ok(())
1202    }
1203
1204    /// Appends receipt to static file.
1205    ///
1206    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
1207    /// empty blocks and this function wouldn't be called.
1208    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
1209    where
1210        N::Receipt: Compact,
1211    {
1212        let start = Instant::now();
1213        self.ensure_no_queued_prune()?;
1214
1215        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1216        self.append_with_tx_number(tx_num, receipt)?;
1217
1218        if let Some(metrics) = &self.metrics {
1219            metrics.record_segment_operation(
1220                StaticFileSegment::Receipts,
1221                StaticFileProviderOperation::Append,
1222                Some(start.elapsed()),
1223            );
1224        }
1225
1226        Ok(())
1227    }
1228
1229    /// Appends multiple receipts to the static file.
1230    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
1231    where
1232        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
1233        R: Borrow<N::Receipt>,
1234        N::Receipt: Compact,
1235    {
1236        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1237
1238        let mut receipts_iter = receipts.into_iter().peekable();
1239        // If receipts are empty, we can simply return None
1240        if receipts_iter.peek().is_none() {
1241            return Ok(());
1242        }
1243
1244        let start = Instant::now();
1245        self.ensure_no_queued_prune()?;
1246
1247        // At this point receipts contains at least one receipt, so this would be overwritten.
1248        let mut count: u64 = 0;
1249
1250        for receipt_result in receipts_iter {
1251            let (tx_num, receipt) = receipt_result?;
1252            self.append_with_tx_number(tx_num, receipt.borrow())?;
1253            count += 1;
1254        }
1255
1256        if let Some(metrics) = &self.metrics {
1257            metrics.record_segment_operations(
1258                StaticFileSegment::Receipts,
1259                StaticFileProviderOperation::Append,
1260                count,
1261                Some(start.elapsed()),
1262            );
1263        }
1264
1265        Ok(())
1266    }
1267
1268    /// Appends transaction sender to static file.
1269    ///
1270    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
1271    /// empty blocks and this function wouldn't be called.
1272    pub fn append_transaction_sender(
1273        &mut self,
1274        tx_num: TxNumber,
1275        sender: &alloy_primitives::Address,
1276    ) -> ProviderResult<()> {
1277        let start = Instant::now();
1278        self.ensure_no_queued_prune()?;
1279
1280        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1281        self.append_with_tx_number(tx_num, sender)?;
1282
1283        if let Some(metrics) = &self.metrics {
1284            metrics.record_segment_operation(
1285                StaticFileSegment::TransactionSenders,
1286                StaticFileProviderOperation::Append,
1287                Some(start.elapsed()),
1288            );
1289        }
1290
1291        Ok(())
1292    }
1293
1294    /// Appends multiple transaction senders to the static file.
1295    pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
1296    where
1297        I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
1298    {
1299        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1300
1301        let mut senders_iter = senders.into_iter().peekable();
1302        // If senders are empty, we can simply return
1303        if senders_iter.peek().is_none() {
1304            return Ok(());
1305        }
1306
1307        let start = Instant::now();
1308        self.ensure_no_queued_prune()?;
1309
1310        // At this point senders contains at least one sender, so this would be overwritten.
1311        let mut count: u64 = 0;
1312        for (tx_num, sender) in senders_iter {
1313            self.append_with_tx_number(tx_num, sender)?;
1314            count += 1;
1315        }
1316
1317        if let Some(metrics) = &self.metrics {
1318            metrics.record_segment_operations(
1319                StaticFileSegment::TransactionSenders,
1320                StaticFileProviderOperation::Append,
1321                count,
1322                Some(start.elapsed()),
1323            );
1324        }
1325
1326        Ok(())
1327    }
1328
1329    /// Appends a block changeset to the static file.
1330    ///
1331    /// It **CALLS** `increment_block()`.
1332    ///
1333    /// Returns the current number of changesets in the file, if any.
1334    pub fn append_account_changeset(
1335        &mut self,
1336        mut changeset: Vec<AccountBeforeTx>,
1337        block_number: u64,
1338    ) -> ProviderResult<()> {
1339        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1340        let start = Instant::now();
1341
1342        self.increment_block(block_number)?;
1343        self.ensure_no_queued_prune()?;
1344
1345        // first sort the changeset by address
1346        changeset.sort_by_key(|change| change.address);
1347
1348        let mut count: u64 = 0;
1349
1350        for change in changeset {
1351            self.append_change(&change)?;
1352            count += 1;
1353        }
1354
1355        if let Some(metrics) = &self.metrics {
1356            metrics.record_segment_operations(
1357                StaticFileSegment::AccountChangeSets,
1358                StaticFileProviderOperation::Append,
1359                count,
1360                Some(start.elapsed()),
1361            );
1362        }
1363
1364        Ok(())
1365    }
1366
1367    /// Appends a block storage changeset to the static file.
1368    ///
1369    /// It **CALLS** `increment_block()`.
1370    pub fn append_storage_changeset(
1371        &mut self,
1372        mut changeset: Vec<StorageBeforeTx>,
1373        block_number: u64,
1374    ) -> ProviderResult<()> {
1375        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1376        let start = Instant::now();
1377
1378        self.increment_block(block_number)?;
1379        self.ensure_no_queued_prune()?;
1380
1381        // sort by address + storage key
1382        changeset.sort_by_key(|change| (change.address, change.key));
1383
1384        let mut count: u64 = 0;
1385        for change in changeset {
1386            self.append_change(&change)?;
1387            count += 1;
1388        }
1389
1390        if let Some(metrics) = &self.metrics {
1391            metrics.record_segment_operations(
1392                StaticFileSegment::StorageChangeSets,
1393                StaticFileProviderOperation::Append,
1394                count,
1395                Some(start.elapsed()),
1396            );
1397        }
1398
1399        Ok(())
1400    }
1401
1402    /// Adds an instruction to prune `to_delete` transactions during commit.
1403    ///
1404    /// Note: `last_block` refers to the block the unwinds ends at.
1405    pub fn prune_transactions(
1406        &mut self,
1407        to_delete: u64,
1408        last_block: BlockNumber,
1409    ) -> ProviderResult<()> {
1410        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
1411        self.queue_prune(PruneStrategy::Transactions { num_rows: to_delete, last_block })
1412    }
1413
1414    /// Adds an instruction to prune `to_delete` receipts during commit.
1415    ///
1416    /// Note: `last_block` refers to the block the unwinds ends at.
1417    pub fn prune_receipts(
1418        &mut self,
1419        to_delete: u64,
1420        last_block: BlockNumber,
1421    ) -> ProviderResult<()> {
1422        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
1423        self.queue_prune(PruneStrategy::Receipts { num_rows: to_delete, last_block })
1424    }
1425
1426    /// Adds an instruction to prune `to_delete` transaction senders during commit.
1427    ///
1428    /// Note: `last_block` refers to the block the unwinds ends at.
1429    pub fn prune_transaction_senders(
1430        &mut self,
1431        to_delete: u64,
1432        last_block: BlockNumber,
1433    ) -> ProviderResult<()> {
1434        debug_assert_eq!(
1435            self.writer.user_header().segment(),
1436            StaticFileSegment::TransactionSenders
1437        );
1438        self.queue_prune(PruneStrategy::TransactionSenders { num_rows: to_delete, last_block })
1439    }
1440
1441    /// Adds an instruction to prune `to_delete` headers during commit.
1442    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
1443        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
1444        self.queue_prune(PruneStrategy::Headers { num_blocks: to_delete })
1445    }
1446
1447    /// Adds an instruction to prune changesets until the given block.
1448    pub fn prune_account_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1449        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::AccountChangeSets);
1450        self.queue_prune(PruneStrategy::AccountChangeSets { last_block })
1451    }
1452
1453    /// Adds an instruction to prune storage changesets until the given block.
1454    pub fn prune_storage_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1455        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::StorageChangeSets);
1456        self.queue_prune(PruneStrategy::StorageChangeSets { last_block })
1457    }
1458
1459    /// Adds an instruction to prune elements during commit using the specified strategy.
1460    fn queue_prune(&mut self, strategy: PruneStrategy) -> ProviderResult<()> {
1461        self.ensure_no_queued_prune()?;
1462        self.prune_on_commit = Some(strategy);
1463        Ok(())
1464    }
1465
1466    /// Returns Error if there is a pruning instruction that needs to be applied.
1467    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
1468        if self.prune_on_commit.is_some() {
1469            return Err(ProviderError::other(StaticFileWriterError::new(
1470                "Pruning should be committed before appending or pruning more data",
1471            )));
1472        }
1473        Ok(())
1474    }
1475
1476    /// Removes the last `to_delete` transactions from the data file.
1477    fn prune_transaction_data(
1478        &mut self,
1479        to_delete: u64,
1480        last_block: BlockNumber,
1481    ) -> ProviderResult<()> {
1482        let start = Instant::now();
1483
1484        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1485
1486        self.truncate(to_delete, Some(last_block))?;
1487
1488        if let Some(metrics) = &self.metrics {
1489            metrics.record_segment_operation(
1490                StaticFileSegment::Transactions,
1491                StaticFileProviderOperation::Prune,
1492                Some(start.elapsed()),
1493            );
1494        }
1495
1496        Ok(())
1497    }
1498
1499    /// Prunes the last `to_delete` account changesets from the data file.
1500    fn prune_account_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1501        let start = Instant::now();
1502
1503        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1504
1505        self.truncate_changesets(last_block)?;
1506
1507        if let Some(metrics) = &self.metrics {
1508            metrics.record_segment_operation(
1509                StaticFileSegment::AccountChangeSets,
1510                StaticFileProviderOperation::Prune,
1511                Some(start.elapsed()),
1512            );
1513        }
1514
1515        Ok(())
1516    }
1517
1518    /// Prunes the last storage changesets from the data file.
1519    fn prune_storage_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1520        let start = Instant::now();
1521
1522        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1523
1524        self.truncate_changesets(last_block)?;
1525
1526        if let Some(metrics) = &self.metrics {
1527            metrics.record_segment_operation(
1528                StaticFileSegment::StorageChangeSets,
1529                StaticFileProviderOperation::Prune,
1530                Some(start.elapsed()),
1531            );
1532        }
1533
1534        Ok(())
1535    }
1536
1537    /// Prunes the last `to_delete` receipts from the data file.
1538    fn prune_receipt_data(
1539        &mut self,
1540        to_delete: u64,
1541        last_block: BlockNumber,
1542    ) -> ProviderResult<()> {
1543        let start = Instant::now();
1544
1545        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1546
1547        self.truncate(to_delete, Some(last_block))?;
1548
1549        if let Some(metrics) = &self.metrics {
1550            metrics.record_segment_operation(
1551                StaticFileSegment::Receipts,
1552                StaticFileProviderOperation::Prune,
1553                Some(start.elapsed()),
1554            );
1555        }
1556
1557        Ok(())
1558    }
1559
1560    /// Prunes the last `to_delete` transaction senders from the data file.
1561    fn prune_transaction_sender_data(
1562        &mut self,
1563        to_delete: u64,
1564        last_block: BlockNumber,
1565    ) -> ProviderResult<()> {
1566        let start = Instant::now();
1567
1568        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1569
1570        self.truncate(to_delete, Some(last_block))?;
1571
1572        if let Some(metrics) = &self.metrics {
1573            metrics.record_segment_operation(
1574                StaticFileSegment::TransactionSenders,
1575                StaticFileProviderOperation::Prune,
1576                Some(start.elapsed()),
1577            );
1578        }
1579
1580        Ok(())
1581    }
1582
1583    /// Prunes the last `to_delete` headers from the data file.
1584    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
1585        let start = Instant::now();
1586
1587        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1588
1589        self.truncate(to_delete, None)?;
1590
1591        if let Some(metrics) = &self.metrics {
1592            metrics.record_segment_operation(
1593                StaticFileSegment::Headers,
1594                StaticFileProviderOperation::Prune,
1595                Some(start.elapsed()),
1596            );
1597        }
1598
1599        Ok(())
1600    }
1601
1602    /// Returns a [`StaticFileProvider`] associated with this writer.
1603    pub fn reader(&self) -> StaticFileProvider<N> {
1604        Self::upgrade_provider_to_strong_reference(&self.reader)
1605    }
1606
1607    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
1608    /// [`StaticFileProvider`].
1609    ///
1610    /// # Panics
1611    ///
1612    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is still
1613    /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
1614    /// [`StaticFileProvider`].
1615    fn upgrade_provider_to_strong_reference(
1616        provider: &Weak<StaticFileProviderInner<N>>,
1617    ) -> StaticFileProvider<N> {
1618        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
1619    }
1620
    /// Helper function to access [`SegmentHeader`].
    ///
    /// Returns a shared reference to the header stored in the inner
    /// [`NippyJarWriter`].
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }
1625
    /// Helper function to access a mutable reference to [`SegmentHeader`].
    ///
    /// Returns a mutable reference to the header stored in the inner
    /// [`NippyJarWriter`].
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }
1630
1631    /// Helper function to override block range for testing.
1632    #[cfg(any(test, feature = "test-utils"))]
1633    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
1634        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
1635    }
1636
    /// Helper function to access the underlying [`NippyJarWriter`] for testing.
    // NOTE(review): the previous doc comment ("override block range") was
    // copy-pasted from `set_block_range`; this accessor returns the inner writer.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
1642}
1643
1644fn create_jar(
1645    segment: StaticFileSegment,
1646    path: &Path,
1647    expected_block_range: SegmentRangeInclusive,
1648) -> NippyJar<SegmentHeader> {
1649    let mut jar = NippyJar::new(
1650        segment.columns(),
1651        path,
1652        SegmentHeader::new(expected_block_range, None, None, segment),
1653    );
1654
1655    // Transaction and Receipt already have the compression scheme used natively in its encoding.
1656    // (zstd-dictionary)
1657    if segment.is_headers() {
1658        jar = jar.with_lz4();
1659    }
1660
1661    jar
1662}