Skip to main content

reth_provider/providers/static_file/
writer.rs

1use super::{
2    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
10use reth_db_api::models::CompactU256;
11use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
12use reth_node_types::NodePrimitives;
13use reth_primitives_traits::FastInstant as Instant;
14use reth_static_file_types::{
15    ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter, SegmentHeader,
16    SegmentRangeInclusive, StaticFileSegment,
17};
18use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
19use std::{
20    borrow::Borrow,
21    cmp::Ordering,
22    fmt::Debug,
23    path::{Path, PathBuf},
24    sync::{Arc, Weak},
25};
26use tracing::{debug, instrument};
27
/// Represents different pruning strategies for various static file segments.
///
/// A strategy is queued on a [`StaticFileProviderRW`] (its `prune_on_commit` field) and is
/// consumed by `StaticFileProviderRW::commit`, which dispatches each variant to the matching
/// segment-specific prune routine before committing.
#[derive(Debug, Clone, Copy)]
enum PruneStrategy {
    /// Prune headers by number of blocks to delete.
    Headers {
        /// Number of blocks to delete.
        num_blocks: u64,
    },
    /// Prune transactions by number of rows and last block.
    Transactions {
        /// Number of transaction rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune receipts by number of rows and last block.
    Receipts {
        /// Number of receipt rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune transaction senders by number of rows and last block.
    TransactionSenders {
        /// Number of transaction sender rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune account changesets to a target block number.
    AccountChangeSets {
        /// The target block number to prune to.
        last_block: BlockNumber,
    },
    /// Prune storage changesets to a target block number.
    StorageChangeSets {
        /// The target block number to prune to.
        last_block: BlockNumber,
    },
}
68
/// Static file writers for every known [`StaticFileSegment`].
///
/// Each segment owns one lazily populated slot guarded by an [`RwLock`]. A slot stays `None` until
/// `get_or_create` is first called for that segment, and `remove` empties it again.
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**, because the guard returned by `get_or_create` keeps that segment's write lock held
/// until it is dropped.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    /// Writer slot for [`StaticFileSegment::Headers`].
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::Transactions`].
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::Receipts`].
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::TransactionSenders`].
    transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::AccountChangeSets`].
    account_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::StorageChangeSets`].
    storage_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
}
82
83impl<N> Default for StaticFileWriters<N> {
84    fn default() -> Self {
85        Self {
86            headers: Default::default(),
87            transactions: Default::default(),
88            receipts: Default::default(),
89            transaction_senders: Default::default(),
90            account_change_sets: Default::default(),
91            storage_change_sets: Default::default(),
92        }
93    }
94}
95
96impl<N: NodePrimitives> StaticFileWriters<N> {
97    pub(crate) fn get_or_create(
98        &self,
99        segment: StaticFileSegment,
100        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
101    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
102        let mut write_guard = match segment {
103            StaticFileSegment::Headers => self.headers.write(),
104            StaticFileSegment::Transactions => self.transactions.write(),
105            StaticFileSegment::Receipts => self.receipts.write(),
106            StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
107            StaticFileSegment::AccountChangeSets => self.account_change_sets.write(),
108            StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(),
109        };
110
111        if write_guard.is_none() {
112            *write_guard = Some(create_fn()?);
113        }
114
115        Ok(StaticFileProviderRWRefMut(write_guard))
116    }
117
118    /// Drops the cached writer for a segment before destructive segment-level operations.
119    pub(crate) fn remove(&self, segment: StaticFileSegment) {
120        let mut write_guard = match segment {
121            StaticFileSegment::Headers => self.headers.write(),
122            StaticFileSegment::Transactions => self.transactions.write(),
123            StaticFileSegment::Receipts => self.receipts.write(),
124            StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
125            StaticFileSegment::AccountChangeSets => self.account_change_sets.write(),
126            StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(),
127        };
128
129        *write_guard = None;
130    }
131
132    #[instrument(
133        name = "StaticFileWriters::commit",
134        level = "debug",
135        target = "providers::static_file",
136        skip_all
137    )]
138    pub(crate) fn commit(&self) -> ProviderResult<()> {
139        debug!(target: "providers::static_file", "Committing all static file segments");
140
141        for writer_lock in [
142            &self.headers,
143            &self.transactions,
144            &self.receipts,
145            &self.transaction_senders,
146            &self.account_change_sets,
147            &self.storage_change_sets,
148        ] {
149            let mut writer = writer_lock.write();
150            if let Some(writer) = writer.as_mut() {
151                writer.commit()?;
152            }
153        }
154
155        debug!(target: "providers::static_file", "Committed all static file segments");
156        Ok(())
157    }
158
159    pub(crate) fn has_unwind_queued(&self) -> bool {
160        for writer_lock in [
161            &self.headers,
162            &self.transactions,
163            &self.receipts,
164            &self.transaction_senders,
165            &self.account_change_sets,
166            &self.storage_change_sets,
167        ] {
168            let writer = writer_lock.read();
169            if let Some(writer) = writer.as_ref() &&
170                writer.will_prune_on_commit()
171            {
172                return true
173            }
174        }
175        false
176    }
177
178    /// Finalizes all writers by committing their configuration to disk and updating indices.
179    ///
180    /// Must be called after `sync_all` was called on individual writers.
181    /// Returns an error if any writer has prune queued.
182    #[instrument(
183        name = "StaticFileWriters::finalize",
184        level = "debug",
185        target = "providers::static_file",
186        skip_all
187    )]
188    pub(crate) fn finalize(&self) -> ProviderResult<()> {
189        debug!(target: "providers::static_file", "Finalizing all static file segments into disk");
190
191        for writer_lock in [
192            &self.headers,
193            &self.transactions,
194            &self.receipts,
195            &self.transaction_senders,
196            &self.account_change_sets,
197            &self.storage_change_sets,
198        ] {
199            let mut writer = writer_lock.write();
200            if let Some(writer) = writer.as_mut() {
201                writer.finalize()?;
202            }
203        }
204
205        debug!(target: "providers::static_file", "Finalized all static file segments into disk");
206        Ok(())
207    }
208}
209
/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
///
/// The segment's write lock stays held for as long as this value is alive, so drop it before
/// asking for the same segment's writer again.
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    /// Write guard over the segment's writer slot; expected to contain `Some` writer.
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
215
216impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
217    fn deref_mut(&mut self) -> &mut Self::Target {
218        // This is always created by [`StaticFileWriters::get_or_create`]
219        self.0.as_mut().expect("static file writer provider should be init")
220    }
221}
222
223impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
224    type Target = StaticFileProviderRW<N>;
225
226    fn deref(&self) -> &Self::Target {
227        // This is always created by [`StaticFileWriters::get_or_create`]
228        self.0.as_ref().expect("static file writer provider should be init")
229    }
230}
231
#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities.
///
/// Rows are appended through a [`NippyJarWriter`] for the currently open static file. Change-based
/// segments additionally maintain a `.csoff` sidecar holding one changeset offset per block.
pub struct StaticFileProviderRW<N> {
    /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
    /// stored in a [`reth_primitives_traits::dashmap::DashMap`] inside the parent
    /// [`StaticFileProvider`], which is an [Arc]. If we were to use an [Arc] here, we would
    /// create a reference cycle.
    // NOTE(review): in this file writers are cached in `StaticFileWriters` (per-segment `RwLock`
    // slots), not a `DashMap`; confirm where that cache lives and update the sentence above.
    reader: Weak<StaticFileProviderInner<N>>,
    /// [`NippyJarWriter`] for the currently open static file; replaced when `increment_block`
    /// rolls over to the next file.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to the data file of the currently opened static file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Optional metrics sink; writer open and commit durations are recorded here when set.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// On commit, contains the pruning strategy to apply for the segment. While set, `sync_all`
    /// and `finalize` refuse to run.
    prune_on_commit: Option<PruneStrategy>,
    /// Whether `sync_all()` has been called since the last `finalize()`. Used by `finalize()` to
    /// avoid redundant syncs.
    synced: bool,
    /// Changeset offsets sidecar (`.csoff`) writer (only for changeset segments).
    changeset_offsets: Option<ChangesetOffsetWriter>,
    /// Changeset offset of the block currently being written; appended to the sidecar by
    /// `flush_current_changeset_offset` during commit/sync.
    current_changeset_offset: Option<ChangesetOffset>,
}
257
258impl<N: NodePrimitives> StaticFileProviderRW<N> {
259    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
260    ///
261    /// Before use, transaction based segments should ensure the block end range is the expected
262    /// one, and heal if not. For more check `Self::ensure_end_range_consistency`.
263    pub fn new(
264        segment: StaticFileSegment,
265        block: BlockNumber,
266        reader: Weak<StaticFileProviderInner<N>>,
267        metrics: Option<Arc<StaticFileProviderMetrics>>,
268    ) -> ProviderResult<Self> {
269        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
270
271        // Create writer WITHOUT sidecar first - we'll add it after healing
272        let mut writer = Self {
273            writer,
274            data_path,
275            buf: Vec::with_capacity(100),
276            reader,
277            metrics,
278            prune_on_commit: None,
279            synced: false,
280            changeset_offsets: None,
281            current_changeset_offset: None,
282        };
283
284        // Run NippyJar healing BEFORE setting up changeset sidecar
285        // This may reduce rows, which affects valid sidecar offsets
286        writer.ensure_end_range_consistency()?;
287
288        // Now set up changeset sidecar with post-heal header values
289        if segment.is_change_based() {
290            writer.heal_changeset_sidecar()?;
291        }
292
293        Ok(writer)
294    }
295
296    fn open(
297        segment: StaticFileSegment,
298        block: u64,
299        reader: Weak<StaticFileProviderInner<N>>,
300        metrics: Option<Arc<StaticFileProviderMetrics>>,
301    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
302        let start = Instant::now();
303
304        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
305
306        let block_range = static_file_provider.find_fixed_range(segment, block);
307        let (jar, path) = match static_file_provider.get_segment_provider_for_block(
308            segment,
309            block_range.start(),
310            None,
311        ) {
312            Ok(provider) => (
313                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
314                provider.data_path().into(),
315            ),
316            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
317                let path = static_file_provider.directory().join(segment.filename(&block_range));
318                (create_jar(segment, &path, block_range), path)
319            }
320            Err(err) => return Err(err),
321        };
322
323        let result = match NippyJarWriter::new(jar) {
324            Ok(writer) => Ok((writer, path)),
325            Err(NippyJarError::FrozenJar) => {
326                // This static file has been frozen, so we should
327                Err(ProviderError::FinalizedStaticFile(segment, block))
328            }
329            Err(e) => Err(ProviderError::other(e)),
330        }?;
331
332        if let Some(metrics) = &metrics {
333            metrics.record_segment_operation(
334                segment,
335                StaticFileProviderOperation::OpenWriter,
336                Some(start.elapsed()),
337            );
338        }
339
340        Ok(result)
341    }
342
343    /// If a file level healing happens, we need to update the end range on the
344    /// [`SegmentHeader`].
345    ///
346    /// However, for transaction based segments, the block end range has to be found and healed
347    /// externally.
348    ///
349    /// Check [`reth_nippy_jar::NippyJarChecker`] &
350    /// [`NippyJarWriter`] for more on healing.
351    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
352        // If we have lost rows (in this run or previous), we need to update the [SegmentHeader].
353        let expected_rows = if self.user_header().segment().is_headers() {
354            self.user_header().block_len().unwrap_or_default()
355        } else {
356            self.user_header().tx_len().unwrap_or_default()
357        };
358        let actual_rows = self.writer.rows() as u64;
359        let pruned_rows = expected_rows.saturating_sub(actual_rows);
360        if pruned_rows > 0 {
361            self.user_header_mut().prune(pruned_rows);
362        }
363
364        debug!(
365            target: "providers::static_file",
366            segment = ?self.writer.user_header().segment(),
367            path = ?self.data_path,
368            pruned_rows,
369            "Ensuring end range consistency"
370        );
371
372        self.writer.commit().map_err(ProviderError::other)?;
373
374        // Updates the [SnapshotProvider] manager
375        self.update_index()?;
376        Ok(())
377    }
378
379    /// Returns `true` if the writer will prune on commit.
380    pub const fn will_prune_on_commit(&self) -> bool {
381        self.prune_on_commit.is_some()
382    }
383
384    /// Heals the changeset offset sidecar after `NippyJar` healing.
385    ///
386    /// This must be called AFTER `ensure_end_range_consistency()` which may reduce rows.
387    /// Performs three-way consistency check between header, `NippyJar` rows, and sidecar file:
388    /// - Validates sidecar offsets don't point past actual `NippyJar` rows
389    /// - Heals header if sidecar was truncated during interrupted prune
390    /// - Truncates sidecar if offsets point past healed `NippyJar` data
391    fn heal_changeset_sidecar(&mut self) -> ProviderResult<()> {
392        let csoff_path = self.data_path.with_extension("csoff");
393
394        // Step 1: Read all three sources of truth
395        let header_claims_blocks = self.writer.user_header().changeset_offsets_len();
396        let actual_nippy_rows = self.writer.rows() as u64;
397
398        // Get actual sidecar file size (may differ from header after crash)
399        let actual_sidecar_blocks = if csoff_path.exists() {
400            let file_len = reth_fs_util::metadata(&csoff_path).map_err(ProviderError::other)?.len();
401            // Remove partial records from crash mid-write
402            let aligned_len = file_len - (file_len % 16);
403            aligned_len / 16
404        } else {
405            0
406        };
407
408        // Fresh segment or no sidecar data - nothing to heal
409        if header_claims_blocks == 0 && actual_sidecar_blocks == 0 {
410            self.changeset_offsets =
411                Some(ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?);
412            return Ok(());
413        }
414
415        // Step 2: Validate sidecar offsets against actual NippyJar state
416        let valid_blocks = if actual_sidecar_blocks > 0 {
417            let reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
418                .map_err(ProviderError::other)?;
419
420            // Find last block where offset + num_changes <= actual_nippy_rows
421            // This correctly handles rows=0 with offset=0, num_changes=0 (empty blocks)
422            let mut valid = 0u64;
423            for i in 0..actual_sidecar_blocks {
424                if let Some(offset) = reader.get(i).map_err(ProviderError::other)? {
425                    if offset.offset() + offset.num_changes() <= actual_nippy_rows {
426                        valid = i + 1;
427                    } else {
428                        // This block points past EOF - stop here
429                        break;
430                    }
431                }
432            }
433            valid
434        } else {
435            0
436        };
437
438        // Step 3: Determine correct state from synced files (source of truth)
439        // Header is the commit marker - never enlarge, only shrink
440        let correct_blocks = valid_blocks.min(header_claims_blocks);
441
442        // Step 4: Heal if header doesn't match validated truth
443        let mut needs_header_commit = false;
444
445        if correct_blocks != header_claims_blocks || actual_sidecar_blocks != correct_blocks {
446            tracing::warn!(
447                target: "reth::static_file",
448                path = %csoff_path.display(),
449                header_claims = header_claims_blocks,
450                sidecar_has = actual_sidecar_blocks,
451                valid_blocks = correct_blocks,
452                actual_rows = actual_nippy_rows,
453                "Three-way healing: syncing header, sidecar, and NippyJar state"
454            );
455
456            // Truncate sidecar file if it has invalid blocks
457            if actual_sidecar_blocks > correct_blocks {
458                use std::fs::OpenOptions;
459                let file = OpenOptions::new()
460                    .write(true)
461                    .open(&csoff_path)
462                    .map_err(ProviderError::other)?;
463                file.set_len(correct_blocks * 16).map_err(ProviderError::other)?;
464                file.sync_all().map_err(ProviderError::other)?;
465
466                tracing::debug!(
467                    target: "reth::static_file",
468                    "Truncated sidecar from {} to {} blocks",
469                    actual_sidecar_blocks,
470                    correct_blocks
471                );
472            }
473
474            // Update header to match validated truth (can only shrink, never enlarge)
475            if correct_blocks < header_claims_blocks {
476                // Blocks were removed - use prune() to update both block_range and
477                // changeset_offsets_len atomically
478                let blocks_removed = header_claims_blocks - correct_blocks;
479                self.writer.user_header_mut().prune(blocks_removed);
480
481                tracing::debug!(
482                    target: "reth::static_file",
483                    "Updated header: removed {} blocks (changeset_offsets_len: {} -> {})",
484                    blocks_removed,
485                    header_claims_blocks,
486                    correct_blocks
487                );
488
489                needs_header_commit = true;
490            }
491        } else {
492            tracing::debug!(
493                target: "reth::static_file",
494                path = %csoff_path.display(),
495                blocks = correct_blocks,
496                "Changeset sidecar consistent, no healing needed"
497            );
498        }
499
500        // Open sidecar writer with corrected count (won't error now that sizes match)
501        let csoff_writer = ChangesetOffsetWriter::new(&csoff_path, correct_blocks)
502            .map_err(ProviderError::other)?;
503
504        self.changeset_offsets = Some(csoff_writer);
505
506        // Commit healed header if needed (after sidecar writer is set up)
507        if needs_header_commit {
508            self.writer.commit().map_err(ProviderError::other)?;
509
510            tracing::info!(
511                target: "reth::static_file",
512                path = %csoff_path.display(),
513                blocks = correct_blocks,
514                "Committed healed changeset offset header"
515            );
516        }
517
518        Ok(())
519    }
520
521    /// Flushes the current changeset offset (if any) to the `.csoff` sidecar file.
522    ///
523    /// This is idempotent - safe to call multiple times. After flushing, the current offset
524    /// is cleared to prevent duplicate writes.
525    ///
526    /// This must be called before committing or syncing to ensure the last block's offset
527    /// is persisted, since `increment_block()` only writes the *previous* block's offset.
528    fn flush_current_changeset_offset(&mut self) -> ProviderResult<()> {
529        if !self.writer.user_header().segment().is_change_based() {
530            return Ok(());
531        }
532
533        if let Some(offset) = self.current_changeset_offset.take() &&
534            let Some(writer) = &mut self.changeset_offsets
535        {
536            writer.append(&offset).map_err(ProviderError::other)?;
537        }
538        Ok(())
539    }
540
541    /// Syncs all data (rows, offsets, and changeset offsets sidecar) to disk.
542    ///
543    /// This does NOT commit the configuration. Call [`Self::finalize`] after to write the
544    /// configuration and mark the writer as clean.
545    ///
546    /// Returns an error if prune is queued (use [`Self::commit`] instead).
547    pub fn sync_all(&mut self) -> ProviderResult<()> {
548        if self.prune_on_commit.is_some() {
549            return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
550        }
551
552        // Write the final block's offset and sync the sidecar for changeset segments
553        self.flush_current_changeset_offset()?;
554        if let Some(writer) = &mut self.changeset_offsets {
555            writer.sync().map_err(ProviderError::other)?;
556            // Update the header with the actual number of offsets written
557            self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
558        }
559
560        if self.writer.is_dirty() {
561            self.writer.sync_all().map_err(ProviderError::other)?;
562        }
563        self.synced = true;
564        Ok(())
565    }
566
567    /// Commits configuration to disk and updates the reader index.
568    ///
569    /// If `sync_all()` was not called, this will call it first to ensure data is persisted.
570    ///
571    /// Returns an error if prune is queued (use [`Self::commit`] instead).
572    #[instrument(
573        name = "StaticFileProviderRW::finalize",
574        level = "debug",
575        target = "providers::static_file",
576        skip_all
577    )]
578    pub fn finalize(&mut self) -> ProviderResult<()> {
579        if self.prune_on_commit.is_some() {
580            return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
581        }
582        if self.writer.is_dirty() {
583            if !self.synced {
584                // Must call self.sync_all() to flush changeset offsets and update
585                // the header's changeset_offsets_len, not just the inner writer
586                self.sync_all()?;
587            }
588
589            self.writer.finalize().map_err(ProviderError::other)?;
590            self.update_index()?;
591        }
592        self.synced = false;
593        Ok(())
594    }
595
596    /// Commits configuration changes to disk and updates the reader index with the new changes.
597    #[instrument(
598        name = "StaticFileProviderRW::commit",
599        level = "debug",
600        target = "providers::static_file",
601        skip_all
602    )]
603    pub fn commit(&mut self) -> ProviderResult<()> {
604        let start = Instant::now();
605
606        // Truncates the data file if instructed to.
607        if let Some(strategy) = self.prune_on_commit.take() {
608            debug!(
609                target: "providers::static_file",
610                segment = ?self.writer.user_header().segment(),
611                "Pruning data on commit"
612            );
613            match strategy {
614                PruneStrategy::Headers { num_blocks } => self.prune_header_data(num_blocks)?,
615                PruneStrategy::Transactions { num_rows, last_block } => {
616                    self.prune_transaction_data(num_rows, last_block)?
617                }
618                PruneStrategy::Receipts { num_rows, last_block } => {
619                    self.prune_receipt_data(num_rows, last_block)?
620                }
621                PruneStrategy::TransactionSenders { num_rows, last_block } => {
622                    self.prune_transaction_sender_data(num_rows, last_block)?
623                }
624                PruneStrategy::AccountChangeSets { last_block } => {
625                    self.prune_account_changeset_data(last_block)?
626                }
627                PruneStrategy::StorageChangeSets { last_block } => {
628                    self.prune_storage_changeset_data(last_block)?
629                }
630            }
631        }
632
633        // For changeset segments, flush and sync the sidecar file before committing the main file.
634        // This ensures crash consistency: the sidecar is durable before the header references it.
635        self.flush_current_changeset_offset()?;
636        if let Some(writer) = &mut self.changeset_offsets {
637            writer.sync().map_err(ProviderError::other)?;
638            // Update the header with the actual number of offsets written
639            self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
640        }
641
642        if self.writer.is_dirty() {
643            debug!(
644                target: "providers::static_file",
645                segment = ?self.writer.user_header().segment(),
646                "Committing writer to disk"
647            );
648
649            // Commits offsets and new user_header to disk
650            self.writer.commit().map_err(ProviderError::other)?;
651
652            if let Some(metrics) = &self.metrics {
653                metrics.record_segment_operation(
654                    self.writer.user_header().segment(),
655                    StaticFileProviderOperation::CommitWriter,
656                    Some(start.elapsed()),
657                );
658            }
659
660            debug!(
661                target: "providers::static_file",
662                segment = ?self.writer.user_header().segment(),
663                path = ?self.data_path,
664                duration = ?start.elapsed(),
665                "Committed writer to disk"
666            );
667
668            self.update_index()?;
669        }
670
671        Ok(())
672    }
673
674    /// Commits configuration changes to disk and updates the reader index with the new changes.
675    ///
676    /// CAUTION: does not call `sync_all` on the files.
677    #[cfg(feature = "test-utils")]
678    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
679        let start = Instant::now();
680
681        debug!(
682            target: "providers::static_file",
683            segment = ?self.writer.user_header().segment(),
684            "Committing writer to disk (without sync)"
685        );
686
687        // Commits offsets and new user_header to disk
688        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;
689
690        if let Some(metrics) = &self.metrics {
691            metrics.record_segment_operation(
692                self.writer.user_header().segment(),
693                StaticFileProviderOperation::CommitWriter,
694                Some(start.elapsed()),
695            );
696        }
697
698        debug!(
699            target: "providers::static_file",
700            segment = ?self.writer.user_header().segment(),
701            path = ?self.data_path,
702            duration = ?start.elapsed(),
703            "Committed writer to disk (without sync)"
704        );
705
706        self.update_index()?;
707
708        Ok(())
709    }
710
711    /// Updates the `self.reader` internal index.
712    fn update_index(&self) -> ProviderResult<()> {
713        let segment = self.writer.user_header().segment();
714
715        // We find the maximum block of the segment by checking this writer's last block.
716        //
717        // However if there's no block range (because there's no data), we try to calculate it by
718        // subtracting 1 from the expected block start, resulting on the last block of the
719        // previous file — but only if that file actually exists. If the previous file doesn't
720        // exist (e.g. first-ever file for a segment starting past range boundary), there's
721        // nothing to index.
722        let segment_max_block = self
723            .writer
724            .user_header()
725            .block_range()
726            .as_ref()
727            .map(|block_range| block_range.end())
728            .or_else(|| {
729                let expected_start = self.writer.user_header().expected_block_start();
730                if expected_start <= self.reader().genesis_block_number() {
731                    return None;
732                }
733
734                let prev_block = expected_start - 1;
735                let prev_range = self.reader().find_fixed_range(segment, prev_block);
736                let prev_path = self.reader().directory().join(segment.filename(&prev_range));
737                prev_path.exists().then_some(prev_block)
738            });
739
740        self.reader().update_index(segment, segment_max_block)
741    }
742
743    /// Ensures that the writer is positioned at the specified block number.
744    ///
745    /// If the writer is positioned at a greater block number than the specified one, the writer
746    /// will NOT be unwound and the error will be returned.
747    pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
748        let current_block = if let Some(current_block_number) = self.current_block_number() {
749            current_block_number
750        } else {
751            self.increment_block(0)?;
752            0
753        };
754
755        match current_block.cmp(&advance_to) {
756            Ordering::Less => {
757                for block in current_block + 1..=advance_to {
758                    self.increment_block(block)?;
759                }
760            }
761            Ordering::Equal => {}
762            Ordering::Greater => {
763                return Err(ProviderError::UnexpectedStaticFileBlockNumber(
764                    self.writer.user_header().segment(),
765                    current_block,
766                    advance_to,
767                ));
768            }
769        }
770
771        Ok(())
772    }
773
774    /// Allows to increment the [`SegmentHeader`] end block. It will commit the current static file,
775    /// and create the next one if we are past the end range.
776    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
777        let segment = self.writer.user_header().segment();
778
779        self.check_next_block_number(expected_block_number)?;
780
781        let start = Instant::now();
782        if let Some(last_block) = self.writer.user_header().block_end() {
783            // We have finished the previous static file and must freeze it
784            if last_block == self.writer.user_header().expected_block_end() {
785                // Commits offsets and new user_header to disk
786                self.commit()?;
787
788                // Opens the new static file
789                let (writer, data_path) =
790                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
791                self.writer = writer;
792                self.data_path = data_path.clone();
793
794                // Update changeset offsets writer for the new file (starts empty)
795                if segment.is_change_based() {
796                    let csoff_path = data_path.with_extension("csoff");
797                    self.changeset_offsets = Some(
798                        ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?,
799                    );
800                }
801
802                *self.writer.user_header_mut() = SegmentHeader::new(
803                    self.reader().find_fixed_range(segment, last_block + 1),
804                    None,
805                    None,
806                    segment,
807                );
808            }
809        }
810
811        self.writer.user_header_mut().increment_block();
812
813        // Handle changeset offset tracking for changeset segments
814        if segment.is_change_based() {
815            // Write previous block's offset if we have one
816            if let Some(offset) = self.current_changeset_offset.take() &&
817                let Some(writer) = &mut self.changeset_offsets
818            {
819                writer.append(&offset).map_err(ProviderError::other)?;
820            }
821            // Start tracking new block's offset
822            let new_offset = self.writer.rows() as u64;
823            self.current_changeset_offset = Some(ChangesetOffset::new(new_offset, 0));
824        }
825
826        if let Some(metrics) = &self.metrics {
827            metrics.record_segment_operation(
828                segment,
829                StaticFileProviderOperation::IncrementBlock,
830                Some(start.elapsed()),
831            );
832        }
833
834        Ok(())
835    }
836
837    /// Returns the current block number of the static file writer.
838    pub fn current_block_number(&self) -> Option<u64> {
839        self.writer.user_header().block_end()
840    }
841
842    /// Returns a block number that is one next to the current tip of static files.
843    pub fn next_block_number(&self) -> u64 {
844        // The next static file block number can be found by checking the one after block_end.
845        // However, if it's a new file that hasn't been added any data, its block range will
846        // actually be None. In that case, the next block will be found on `expected_block_start`.
847        self.writer
848            .user_header()
849            .block_end()
850            .map(|b| b + 1)
851            .unwrap_or_else(|| self.writer.user_header().expected_block_start())
852    }
853
854    /// Verifies if the incoming block number matches the next expected block number
855    /// for a static file. This ensures data continuity when adding new blocks.
856    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
857        let next_static_file_block = self.next_block_number();
858
859        if expected_block_number != next_static_file_block {
860            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
861                self.writer.user_header().segment(),
862                expected_block_number,
863                next_static_file_block,
864            ))
865        }
866        Ok(())
867    }
868
869    /// Truncates account changesets to the given block. It deletes and loads an older static file
870    /// if the block goes beyond the start of the current block range.
871    ///
872    /// # Note
873    /// Commits to the configuration file at the end
874    fn truncate_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
875        let segment = self.writer.user_header().segment();
876        debug_assert!(segment.is_change_based());
877
878        // Get the current block range
879        let current_block_end = self
880            .writer
881            .user_header()
882            .block_end()
883            .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
884
885        // If we're already at or before the target block, nothing to do
886        if current_block_end <= last_block {
887            return Ok(())
888        }
889
890        // Navigate to the correct file if the target block is in a previous file
891        let mut expected_block_start = self.writer.user_header().expected_block_start();
892        while last_block < expected_block_start && expected_block_start > 0 {
893            self.delete_current_and_open_previous()?;
894            expected_block_start = self.writer.user_header().expected_block_start();
895        }
896
897        // Find the number of rows to keep (up to and including last_block)
898        let blocks_to_keep = if last_block >= expected_block_start {
899            last_block - expected_block_start + 1
900        } else {
901            0
902        };
903
904        // Read changeset offsets from sidecar file to find where to truncate
905        let csoff_path = self.data_path.with_extension("csoff");
906        let changeset_offsets_len = self.writer.user_header().changeset_offsets_len();
907
908        // Flush any pending changeset offset before reading the sidecar
909        self.flush_current_changeset_offset()?;
910
911        let rows_to_keep = if blocks_to_keep == 0 {
912            0
913        } else if blocks_to_keep >= changeset_offsets_len {
914            // Keep all rows in this file
915            self.writer.rows() as u64
916        } else {
917            // Read offset for the block after last_block from sidecar.
918            // Use committed length from header, ignoring any uncommitted records
919            // that may exist in the file after a crash.
920            let reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
921                .map_err(ProviderError::other)?;
922            if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
923                next_offset.offset()
924            } else {
925                // If we can't read the offset, keep all rows
926                self.writer.rows() as u64
927            }
928        };
929
930        let total_rows = self.writer.rows() as u64;
931        let rows_to_delete = total_rows.saturating_sub(rows_to_keep);
932
933        if rows_to_delete > 0 {
934            // Calculate the number of blocks to prune
935            let current_block_end = self
936                .writer
937                .user_header()
938                .block_end()
939                .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
940            let blocks_to_remove = current_block_end - last_block;
941
942            // Update segment header - for changesets, prune expects number of blocks, not rows
943            self.writer.user_header_mut().prune(blocks_to_remove);
944
945            // Prune the actual rows
946            self.writer.prune_rows(rows_to_delete as usize).map_err(ProviderError::other)?;
947        }
948
949        // Update the block range
950        self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
951
952        // Sync changeset offsets to match the new block range
953        self.writer.user_header_mut().sync_changeset_offsets();
954
955        // Truncate the sidecar file to match the new block count
956        if let Some(writer) = &mut self.changeset_offsets {
957            writer.truncate(blocks_to_keep).map_err(ProviderError::other)?;
958        }
959
960        // Clear current changeset offset tracking since we've pruned
961        self.current_changeset_offset = None;
962
963        // Commits new changes to disk
964        self.commit()?;
965
966        Ok(())
967    }
968
    /// Truncates a number of rows from disk. It deletes and loads an older static file if block
    /// goes beyond the start of the current block range.
    ///
    /// `num_rows` is measured with `block_len()` for block-based segments and with `tx_len()`
    /// otherwise. A file without a range counts as holding zero rows.
    ///
    /// **`last_block`** should be passed only with transaction based segments. When it is `Some`,
    /// it becomes the end of the block range of the file the writer ends up on.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            // Rows held by the current file: blocks for block-based segments, transactions
            // otherwise.
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // If there's more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                // We only delete the file if it's NOT the first static file AND:
                // * it's a Header segment  OR
                // * it's a tx-based segment AND `last_block` is lower than the first block of this
                //   file's block range. Otherwise, having no rows simply means that this block
                //   range has no transactions, but the file should remain.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // The file is kept: drop all of its rows and stop walking back across
                    // files. Any rows still counted in `remaining_rows` are not deleted.
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // All remaining rows live in the current file.
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        // Only Transactions and Receipts
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case for when we are unwinding a chain of empty blocks that goes across
                // files, and therefore, the only reference point to know which file
                // we are supposed to be at is `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            // The block range now ends at `last_block`, regardless of how many rows were removed.
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }
1039
1040    /// Delete the current static file, and replace this provider writer with the previous static
1041    /// file.
1042    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
1043        let segment = self.user_header().segment();
1044        let current_path = self.data_path.clone();
1045        let (previous_writer, data_path) = Self::open(
1046            segment,
1047            self.writer.user_header().expected_block_start() - 1,
1048            self.reader.clone(),
1049            self.metrics.clone(),
1050        )?;
1051        self.writer = previous_writer;
1052        self.writer.set_dirty();
1053        self.data_path = data_path.clone();
1054
1055        // Delete the sidecar file for changeset segments before deleting the main jar
1056        if segment.is_change_based() {
1057            let csoff_path = current_path.with_extension("csoff");
1058            if csoff_path.exists() {
1059                std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
1060            }
1061            // Re-initialize the changeset offsets writer for the previous file
1062            let new_csoff_path = data_path.with_extension("csoff");
1063            let committed_len = self.writer.user_header().changeset_offsets_len();
1064            self.changeset_offsets = Some(
1065                ChangesetOffsetWriter::new(&new_csoff_path, committed_len)
1066                    .map_err(ProviderError::other)?,
1067            );
1068        }
1069
1070        // Clear current changeset offset tracking since we're switching files
1071        self.current_changeset_offset = None;
1072
1073        NippyJar::<SegmentHeader>::load(&current_path)
1074            .map_err(ProviderError::other)?
1075            .delete()
1076            .map_err(ProviderError::other)?;
1077        Ok(())
1078    }
1079
1080    /// Appends column to static file.
1081    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
1082        self.buf.clear();
1083        column.to_compact(&mut self.buf);
1084
1085        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
1086        Ok(())
1087    }
1088
1089    /// Appends to tx number-based static file.
1090    fn append_with_tx_number<V: Compact>(
1091        &mut self,
1092        tx_num: TxNumber,
1093        value: V,
1094    ) -> ProviderResult<()> {
1095        if let Some(range) = self.writer.user_header().tx_range() {
1096            let next_tx = range.end() + 1;
1097            if next_tx != tx_num {
1098                return Err(ProviderError::UnexpectedStaticFileTxNumber(
1099                    self.writer.user_header().segment(),
1100                    tx_num,
1101                    next_tx,
1102                ))
1103            }
1104            self.writer.user_header_mut().increment_tx();
1105        } else {
1106            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
1107        }
1108
1109        self.append_column(value)?;
1110
1111        Ok(())
1112    }
1113
1114    /// Appends change to changeset static file.
1115    fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
1116        if let Some(ref mut offset) = self.current_changeset_offset {
1117            offset.increment_num_changes();
1118        }
1119        self.append_column(change)?;
1120        Ok(())
1121    }
1122
1123    /// Appends header to static file.
1124    ///
1125    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
1126    /// blocks.
1127    pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
1128    where
1129        N::BlockHeader: Compact,
1130    {
1131        self.append_header_with_td(header, U256::ZERO, hash)
1132    }
1133
1134    /// Appends header to static file with a specified total difficulty.
1135    ///
1136    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
1137    /// blocks.
1138    pub fn append_header_with_td(
1139        &mut self,
1140        header: &N::BlockHeader,
1141        total_difficulty: U256,
1142        hash: &BlockHash,
1143    ) -> ProviderResult<()>
1144    where
1145        N::BlockHeader: Compact,
1146    {
1147        let start = Instant::now();
1148        self.ensure_no_queued_prune()?;
1149
1150        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1151
1152        self.increment_block(header.number())?;
1153
1154        self.append_column(header)?;
1155        self.append_column(CompactU256::from(total_difficulty))?;
1156        self.append_column(hash)?;
1157
1158        if let Some(metrics) = &self.metrics {
1159            metrics.record_segment_operation(
1160                StaticFileSegment::Headers,
1161                StaticFileProviderOperation::Append,
1162                Some(start.elapsed()),
1163            );
1164        }
1165
1166        Ok(())
1167    }
1168
1169    /// Appends header to static file without calling `increment_block`.
1170    /// This is useful for genesis blocks with non-zero block numbers.
1171    pub fn append_header_direct(
1172        &mut self,
1173        header: &N::BlockHeader,
1174        total_difficulty: U256,
1175        hash: &BlockHash,
1176    ) -> ProviderResult<()>
1177    where
1178        N::BlockHeader: Compact,
1179    {
1180        let start = Instant::now();
1181        self.ensure_no_queued_prune()?;
1182
1183        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1184
1185        self.append_column(header)?;
1186        self.append_column(CompactU256::from(total_difficulty))?;
1187        self.append_column(hash)?;
1188
1189        if let Some(metrics) = &self.metrics {
1190            metrics.record_segment_operation(
1191                StaticFileSegment::Headers,
1192                StaticFileProviderOperation::Append,
1193                Some(start.elapsed()),
1194            );
1195        }
1196
1197        Ok(())
1198    }
1199
1200    /// Appends transaction to static file.
1201    ///
1202    /// It **DOES NOT CALL** `increment_block()`, it should be handled elsewhere. There might be
1203    /// empty blocks and this function wouldn't be called.
1204    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
1205    where
1206        N::SignedTx: Compact,
1207    {
1208        let start = Instant::now();
1209        self.ensure_no_queued_prune()?;
1210
1211        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1212        self.append_with_tx_number(tx_num, tx)?;
1213
1214        if let Some(metrics) = &self.metrics {
1215            metrics.record_segment_operation(
1216                StaticFileSegment::Transactions,
1217                StaticFileProviderOperation::Append,
1218                Some(start.elapsed()),
1219            );
1220        }
1221
1222        Ok(())
1223    }
1224
1225    /// Appends receipt to static file.
1226    ///
1227    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
1228    /// empty blocks and this function wouldn't be called.
1229    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
1230    where
1231        N::Receipt: Compact,
1232    {
1233        let start = Instant::now();
1234        self.ensure_no_queued_prune()?;
1235
1236        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1237        self.append_with_tx_number(tx_num, receipt)?;
1238
1239        if let Some(metrics) = &self.metrics {
1240            metrics.record_segment_operation(
1241                StaticFileSegment::Receipts,
1242                StaticFileProviderOperation::Append,
1243                Some(start.elapsed()),
1244            );
1245        }
1246
1247        Ok(())
1248    }
1249
1250    /// Appends multiple receipts to the static file.
1251    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
1252    where
1253        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
1254        R: Borrow<N::Receipt>,
1255        N::Receipt: Compact,
1256    {
1257        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1258
1259        let mut receipts_iter = receipts.into_iter().peekable();
1260        // If receipts are empty, we can simply return None
1261        if receipts_iter.peek().is_none() {
1262            return Ok(());
1263        }
1264
1265        let start = Instant::now();
1266        self.ensure_no_queued_prune()?;
1267
1268        // At this point receipts contains at least one receipt, so this would be overwritten.
1269        let mut count: u64 = 0;
1270
1271        for receipt_result in receipts_iter {
1272            let (tx_num, receipt) = receipt_result?;
1273            self.append_with_tx_number(tx_num, receipt.borrow())?;
1274            count += 1;
1275        }
1276
1277        if let Some(metrics) = &self.metrics {
1278            metrics.record_segment_operations(
1279                StaticFileSegment::Receipts,
1280                StaticFileProviderOperation::Append,
1281                count,
1282                Some(start.elapsed()),
1283            );
1284        }
1285
1286        Ok(())
1287    }
1288
1289    /// Appends transaction sender to static file.
1290    ///
1291    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
1292    /// empty blocks and this function wouldn't be called.
1293    pub fn append_transaction_sender(
1294        &mut self,
1295        tx_num: TxNumber,
1296        sender: &alloy_primitives::Address,
1297    ) -> ProviderResult<()> {
1298        let start = Instant::now();
1299        self.ensure_no_queued_prune()?;
1300
1301        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1302        self.append_with_tx_number(tx_num, sender)?;
1303
1304        if let Some(metrics) = &self.metrics {
1305            metrics.record_segment_operation(
1306                StaticFileSegment::TransactionSenders,
1307                StaticFileProviderOperation::Append,
1308                Some(start.elapsed()),
1309            );
1310        }
1311
1312        Ok(())
1313    }
1314
1315    /// Appends multiple transaction senders to the static file.
1316    pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
1317    where
1318        I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
1319    {
1320        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1321
1322        let mut senders_iter = senders.into_iter().peekable();
1323        // If senders are empty, we can simply return
1324        if senders_iter.peek().is_none() {
1325            return Ok(());
1326        }
1327
1328        let start = Instant::now();
1329        self.ensure_no_queued_prune()?;
1330
1331        // At this point senders contains at least one sender, so this would be overwritten.
1332        let mut count: u64 = 0;
1333        for (tx_num, sender) in senders_iter {
1334            self.append_with_tx_number(tx_num, sender)?;
1335            count += 1;
1336        }
1337
1338        if let Some(metrics) = &self.metrics {
1339            metrics.record_segment_operations(
1340                StaticFileSegment::TransactionSenders,
1341                StaticFileProviderOperation::Append,
1342                count,
1343                Some(start.elapsed()),
1344            );
1345        }
1346
1347        Ok(())
1348    }
1349
1350    /// Appends a block changeset to the static file.
1351    ///
1352    /// It **CALLS** `increment_block()`.
1353    ///
1354    /// Returns the current number of changesets in the file, if any.
1355    pub fn append_account_changeset(
1356        &mut self,
1357        mut changeset: Vec<AccountBeforeTx>,
1358        block_number: u64,
1359    ) -> ProviderResult<()> {
1360        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1361        let start = Instant::now();
1362
1363        self.increment_block(block_number)?;
1364        self.ensure_no_queued_prune()?;
1365
1366        // first sort the changeset by address
1367        changeset.sort_by_key(|change| change.address);
1368
1369        let mut count: u64 = 0;
1370
1371        for change in changeset {
1372            self.append_change(&change)?;
1373            count += 1;
1374        }
1375
1376        if let Some(metrics) = &self.metrics {
1377            metrics.record_segment_operations(
1378                StaticFileSegment::AccountChangeSets,
1379                StaticFileProviderOperation::Append,
1380                count,
1381                Some(start.elapsed()),
1382            );
1383        }
1384
1385        Ok(())
1386    }
1387
1388    /// Starts a block account changeset that will be appended one entry at a time.
1389    ///
1390    /// Callers must append entries sorted by address and keep the writer open until the block is
1391    /// complete so the changeset offset sidecar is finalized with the correct row count.
1392    pub fn begin_account_changeset(&mut self, block_number: u64) -> ProviderResult<()> {
1393        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1394
1395        self.increment_block(block_number)?;
1396        self.ensure_no_queued_prune()
1397    }
1398
1399    /// Appends one account changeset entry to the current block.
1400    ///
1401    /// [`Self::begin_account_changeset`] must be called first.
1402    pub fn append_account_changeset_entry(
1403        &mut self,
1404        change: AccountBeforeTx,
1405    ) -> ProviderResult<()> {
1406        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1407        if self.current_changeset_offset.is_none() {
1408            return Err(ProviderError::other(StaticFileWriterError::new(
1409                "account changeset stream must be started before appending entries",
1410            )))
1411        }
1412
1413        self.append_change(&change)
1414    }
1415
1416    /// Appends a block storage changeset to the static file.
1417    ///
1418    /// It **CALLS** `increment_block()`.
1419    pub fn append_storage_changeset(
1420        &mut self,
1421        mut changeset: Vec<StorageBeforeTx>,
1422        block_number: u64,
1423    ) -> ProviderResult<()> {
1424        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1425        let start = Instant::now();
1426
1427        self.increment_block(block_number)?;
1428        self.ensure_no_queued_prune()?;
1429
1430        // sort by address + storage key
1431        changeset.sort_by_key(|change| (change.address, change.key));
1432
1433        let mut count: u64 = 0;
1434        for change in changeset {
1435            self.append_change(&change)?;
1436            count += 1;
1437        }
1438
1439        if let Some(metrics) = &self.metrics {
1440            metrics.record_segment_operations(
1441                StaticFileSegment::StorageChangeSets,
1442                StaticFileProviderOperation::Append,
1443                count,
1444                Some(start.elapsed()),
1445            );
1446        }
1447
1448        Ok(())
1449    }
1450
1451    /// Starts a block storage changeset that will be appended one entry at a time.
1452    ///
1453    /// Callers must append entries sorted by address and storage key and keep the writer open until
1454    /// the block is complete so the changeset offset sidecar is finalized with the correct row
1455    /// count.
1456    pub fn begin_storage_changeset(&mut self, block_number: u64) -> ProviderResult<()> {
1457        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1458
1459        self.increment_block(block_number)?;
1460        self.ensure_no_queued_prune()
1461    }
1462
1463    /// Appends one storage changeset entry to the current block.
1464    ///
1465    /// [`Self::begin_storage_changeset`] must be called first.
1466    pub fn append_storage_changeset_entry(
1467        &mut self,
1468        change: StorageBeforeTx,
1469    ) -> ProviderResult<()> {
1470        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1471        if self.current_changeset_offset.is_none() {
1472            return Err(ProviderError::other(StaticFileWriterError::new(
1473                "storage changeset stream must be started before appending entries",
1474            )))
1475        }
1476
1477        self.append_change(&change)
1478    }
1479
1480    /// Adds an instruction to prune `to_delete` transactions during commit.
1481    ///
1482    /// Note: `last_block` refers to the block the unwinds ends at.
1483    pub fn prune_transactions(
1484        &mut self,
1485        to_delete: u64,
1486        last_block: BlockNumber,
1487    ) -> ProviderResult<()> {
1488        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
1489        self.queue_prune(PruneStrategy::Transactions { num_rows: to_delete, last_block })
1490    }
1491
1492    /// Adds an instruction to prune `to_delete` receipts during commit.
1493    ///
1494    /// Note: `last_block` refers to the block the unwinds ends at.
1495    pub fn prune_receipts(
1496        &mut self,
1497        to_delete: u64,
1498        last_block: BlockNumber,
1499    ) -> ProviderResult<()> {
1500        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
1501        self.queue_prune(PruneStrategy::Receipts { num_rows: to_delete, last_block })
1502    }
1503
1504    /// Adds an instruction to prune `to_delete` transaction senders during commit.
1505    ///
1506    /// Note: `last_block` refers to the block the unwinds ends at.
1507    pub fn prune_transaction_senders(
1508        &mut self,
1509        to_delete: u64,
1510        last_block: BlockNumber,
1511    ) -> ProviderResult<()> {
1512        debug_assert_eq!(
1513            self.writer.user_header().segment(),
1514            StaticFileSegment::TransactionSenders
1515        );
1516        self.queue_prune(PruneStrategy::TransactionSenders { num_rows: to_delete, last_block })
1517    }
1518
1519    /// Adds an instruction to prune `to_delete` headers during commit.
1520    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
1521        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
1522        self.queue_prune(PruneStrategy::Headers { num_blocks: to_delete })
1523    }
1524
1525    /// Adds an instruction to prune changesets until the given block.
1526    pub fn prune_account_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1527        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::AccountChangeSets);
1528        self.queue_prune(PruneStrategy::AccountChangeSets { last_block })
1529    }
1530
1531    /// Adds an instruction to prune storage changesets until the given block.
1532    pub fn prune_storage_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1533        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::StorageChangeSets);
1534        self.queue_prune(PruneStrategy::StorageChangeSets { last_block })
1535    }
1536
1537    /// Adds an instruction to prune elements during commit using the specified strategy.
1538    fn queue_prune(&mut self, strategy: PruneStrategy) -> ProviderResult<()> {
1539        self.ensure_no_queued_prune()?;
1540        self.prune_on_commit = Some(strategy);
1541        Ok(())
1542    }
1543
1544    /// Returns Error if there is a pruning instruction that needs to be applied.
1545    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
1546        if self.prune_on_commit.is_some() {
1547            return Err(ProviderError::other(StaticFileWriterError::new(
1548                "Pruning should be committed before appending or pruning more data",
1549            )));
1550        }
1551        Ok(())
1552    }
1553
1554    /// Removes the last `to_delete` transactions from the data file.
1555    fn prune_transaction_data(
1556        &mut self,
1557        to_delete: u64,
1558        last_block: BlockNumber,
1559    ) -> ProviderResult<()> {
1560        let start = Instant::now();
1561
1562        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1563
1564        self.truncate(to_delete, Some(last_block))?;
1565
1566        if let Some(metrics) = &self.metrics {
1567            metrics.record_segment_operation(
1568                StaticFileSegment::Transactions,
1569                StaticFileProviderOperation::Prune,
1570                Some(start.elapsed()),
1571            );
1572        }
1573
1574        Ok(())
1575    }
1576
1577    /// Prunes the last `to_delete` account changesets from the data file.
1578    fn prune_account_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1579        let start = Instant::now();
1580
1581        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1582
1583        self.truncate_changesets(last_block)?;
1584
1585        if let Some(metrics) = &self.metrics {
1586            metrics.record_segment_operation(
1587                StaticFileSegment::AccountChangeSets,
1588                StaticFileProviderOperation::Prune,
1589                Some(start.elapsed()),
1590            );
1591        }
1592
1593        Ok(())
1594    }
1595
1596    /// Prunes the last storage changesets from the data file.
1597    fn prune_storage_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1598        let start = Instant::now();
1599
1600        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1601
1602        self.truncate_changesets(last_block)?;
1603
1604        if let Some(metrics) = &self.metrics {
1605            metrics.record_segment_operation(
1606                StaticFileSegment::StorageChangeSets,
1607                StaticFileProviderOperation::Prune,
1608                Some(start.elapsed()),
1609            );
1610        }
1611
1612        Ok(())
1613    }
1614
1615    /// Prunes the last `to_delete` receipts from the data file.
1616    fn prune_receipt_data(
1617        &mut self,
1618        to_delete: u64,
1619        last_block: BlockNumber,
1620    ) -> ProviderResult<()> {
1621        let start = Instant::now();
1622
1623        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1624
1625        self.truncate(to_delete, Some(last_block))?;
1626
1627        if let Some(metrics) = &self.metrics {
1628            metrics.record_segment_operation(
1629                StaticFileSegment::Receipts,
1630                StaticFileProviderOperation::Prune,
1631                Some(start.elapsed()),
1632            );
1633        }
1634
1635        Ok(())
1636    }
1637
1638    /// Prunes the last `to_delete` transaction senders from the data file.
1639    fn prune_transaction_sender_data(
1640        &mut self,
1641        to_delete: u64,
1642        last_block: BlockNumber,
1643    ) -> ProviderResult<()> {
1644        let start = Instant::now();
1645
1646        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1647
1648        self.truncate(to_delete, Some(last_block))?;
1649
1650        if let Some(metrics) = &self.metrics {
1651            metrics.record_segment_operation(
1652                StaticFileSegment::TransactionSenders,
1653                StaticFileProviderOperation::Prune,
1654                Some(start.elapsed()),
1655            );
1656        }
1657
1658        Ok(())
1659    }
1660
1661    /// Prunes the last `to_delete` headers from the data file.
1662    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
1663        let start = Instant::now();
1664
1665        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1666
1667        self.truncate(to_delete, None)?;
1668
1669        if let Some(metrics) = &self.metrics {
1670            metrics.record_segment_operation(
1671                StaticFileSegment::Headers,
1672                StaticFileProviderOperation::Prune,
1673                Some(start.elapsed()),
1674            );
1675        }
1676
1677        Ok(())
1678    }
1679
1680    /// Returns a [`StaticFileProvider`] associated with this writer.
1681    pub fn reader(&self) -> StaticFileProvider<N> {
1682        Self::upgrade_provider_to_strong_reference(&self.reader)
1683    }
1684
1685    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
1686    /// [`StaticFileProvider`].
1687    ///
1688    /// # Panics
1689    ///
1690    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is still
1691    /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
1692    /// [`StaticFileProvider`].
1693    fn upgrade_provider_to_strong_reference(
1694        provider: &Weak<StaticFileProviderInner<N>>,
1695    ) -> StaticFileProvider<N> {
1696        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
1697    }
1698
1699    /// Helper function to access [`SegmentHeader`].
1700    pub const fn user_header(&self) -> &SegmentHeader {
1701        self.writer.user_header()
1702    }
1703
1704    /// Helper function to access a mutable reference to [`SegmentHeader`].
1705    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
1706        self.writer.user_header_mut()
1707    }
1708
1709    /// Helper function to override block range for testing.
1710    #[cfg(any(test, feature = "test-utils"))]
1711    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
1712        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
1713    }
1714
1715    /// Helper function to override block range for testing.
1716    #[cfg(any(test, feature = "test-utils"))]
1717    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
1718        &mut self.writer
1719    }
1720}
1721
1722fn create_jar(
1723    segment: StaticFileSegment,
1724    path: &Path,
1725    expected_block_range: SegmentRangeInclusive,
1726) -> NippyJar<SegmentHeader> {
1727    let mut jar = NippyJar::new(
1728        segment.columns(),
1729        path,
1730        SegmentHeader::new(expected_block_range, None, None, segment),
1731    );
1732
1733    // Transaction and Receipt already have the compression scheme used natively in its encoding.
1734    // (zstd-dictionary)
1735    if segment.is_headers() {
1736        jar = jar.with_lz4();
1737    }
1738
1739    jar
1740}