//! Static file writer implementation.
//!
//! Path: `reth_provider/providers/static_file/writer.rs`

1use super::{
2    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
10use reth_db_api::models::CompactU256;
11use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
12use reth_node_types::NodePrimitives;
13use reth_primitives_traits::FastInstant as Instant;
14use reth_static_file_types::{
15    ChangesetOffset, ChangesetOffsetReader, ChangesetOffsetWriter, SegmentHeader,
16    SegmentRangeInclusive, StaticFileSegment,
17};
18use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
19use std::{
20    borrow::Borrow,
21    cmp::Ordering,
22    fmt::Debug,
23    path::{Path, PathBuf},
24    sync::{Arc, Weak},
25};
26use tracing::{debug, instrument};
27
/// Represents different pruning strategies for various static file segments.
///
/// A strategy is queued on a writer (`prune_on_commit`) and applied lazily the next time
/// [`StaticFileProviderRW::commit`] runs, rather than pruning the data file immediately.
#[derive(Debug, Clone, Copy)]
enum PruneStrategy {
    /// Prune headers by number of blocks to delete.
    Headers {
        /// Number of blocks to delete.
        num_blocks: u64,
    },
    /// Prune transactions by number of rows and last block.
    Transactions {
        /// Number of transaction rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune receipts by number of rows and last block.
    Receipts {
        /// Number of receipt rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune transaction senders by number of rows and last block.
    TransactionSenders {
        /// Number of transaction sender rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune account changesets to a target block number.
    AccountChangeSets {
        /// The target block number to prune to.
        last_block: BlockNumber,
    },
    /// Prune storage changesets to a target block number.
    StorageChangeSets {
        /// The target block number to prune to.
        last_block: BlockNumber,
    },
}
68
/// Static file writers for every known [`StaticFileSegment`].
///
/// Each segment has its own lock, so writers for different segments can be used concurrently.
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    /// Lazily-initialized writer for the headers segment.
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Lazily-initialized writer for the transactions segment.
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Lazily-initialized writer for the receipts segment.
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Lazily-initialized writer for the transaction senders segment.
    transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Lazily-initialized writer for the account changesets segment.
    account_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Lazily-initialized writer for the storage changesets segment.
    storage_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
}
82
83impl<N> Default for StaticFileWriters<N> {
84    fn default() -> Self {
85        Self {
86            headers: Default::default(),
87            transactions: Default::default(),
88            receipts: Default::default(),
89            transaction_senders: Default::default(),
90            account_change_sets: Default::default(),
91            storage_change_sets: Default::default(),
92        }
93    }
94}
95
96impl<N: NodePrimitives> StaticFileWriters<N> {
97    pub(crate) fn get_or_create(
98        &self,
99        segment: StaticFileSegment,
100        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
101    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
102        let mut write_guard = match segment {
103            StaticFileSegment::Headers => self.headers.write(),
104            StaticFileSegment::Transactions => self.transactions.write(),
105            StaticFileSegment::Receipts => self.receipts.write(),
106            StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
107            StaticFileSegment::AccountChangeSets => self.account_change_sets.write(),
108            StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(),
109        };
110
111        if write_guard.is_none() {
112            *write_guard = Some(create_fn()?);
113        }
114
115        Ok(StaticFileProviderRWRefMut(write_guard))
116    }
117
118    #[instrument(
119        name = "StaticFileWriters::commit",
120        level = "debug",
121        target = "providers::static_file",
122        skip_all
123    )]
124    pub(crate) fn commit(&self) -> ProviderResult<()> {
125        debug!(target: "providers::static_file", "Committing all static file segments");
126
127        for writer_lock in [
128            &self.headers,
129            &self.transactions,
130            &self.receipts,
131            &self.transaction_senders,
132            &self.account_change_sets,
133            &self.storage_change_sets,
134        ] {
135            let mut writer = writer_lock.write();
136            if let Some(writer) = writer.as_mut() {
137                writer.commit()?;
138            }
139        }
140
141        debug!(target: "providers::static_file", "Committed all static file segments");
142        Ok(())
143    }
144
145    pub(crate) fn has_unwind_queued(&self) -> bool {
146        for writer_lock in [
147            &self.headers,
148            &self.transactions,
149            &self.receipts,
150            &self.transaction_senders,
151            &self.account_change_sets,
152            &self.storage_change_sets,
153        ] {
154            let writer = writer_lock.read();
155            if let Some(writer) = writer.as_ref() &&
156                writer.will_prune_on_commit()
157            {
158                return true
159            }
160        }
161        false
162    }
163
164    /// Finalizes all writers by committing their configuration to disk and updating indices.
165    ///
166    /// Must be called after `sync_all` was called on individual writers.
167    /// Returns an error if any writer has prune queued.
168    #[instrument(
169        name = "StaticFileWriters::finalize",
170        level = "debug",
171        target = "providers::static_file",
172        skip_all
173    )]
174    pub(crate) fn finalize(&self) -> ProviderResult<()> {
175        debug!(target: "providers::static_file", "Finalizing all static file segments into disk");
176
177        for writer_lock in [
178            &self.headers,
179            &self.transactions,
180            &self.receipts,
181            &self.transaction_senders,
182            &self.account_change_sets,
183            &self.storage_change_sets,
184        ] {
185            let mut writer = writer_lock.write();
186            if let Some(writer) = writer.as_mut() {
187                writer.finalize()?;
188            }
189        }
190
191        debug!(target: "providers::static_file", "Finalized all static file segments into disk");
192        Ok(())
193    }
194}
195
/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
///
/// Only constructed by [`StaticFileWriters::get_or_create`], which populates the inner
/// `Option` first; the `Deref`/`DerefMut` impls rely on that invariant.
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
201
202impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
203    fn deref_mut(&mut self) -> &mut Self::Target {
204        // This is always created by [`StaticFileWriters::get_or_create`]
205        self.0.as_mut().expect("static file writer provider should be init")
206    }
207}
208
209impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
210    type Target = StaticFileProviderRW<N>;
211
212    fn deref(&self) -> &Self::Target {
213        // This is always created by [`StaticFileWriters::get_or_create`]
214        self.0.as_ref().expect("static file writer provider should be init")
215    }
216}
217
#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities
pub struct StaticFileProviderRW<N> {
    /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
    /// stored in a [`reth_primitives_traits::dashmap::DashMap`] inside the parent
    /// [`StaticFileProvider`], which is an [Arc]. If we were to use an [Arc] here, we would
    /// create a reference cycle.
    reader: Weak<StaticFileProviderInner<N>>,
    /// A [`NippyJarWriter`] instance.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to opened file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Metrics.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// On commit, contains the pruning strategy to apply for the segment.
    /// While set, `sync_all`/`finalize` refuse to run; only `commit` consumes it.
    prune_on_commit: Option<PruneStrategy>,
    /// Whether `sync_all()` has been called. Used by `finalize()` to avoid redundant syncs.
    synced: bool,
    /// Changeset offsets sidecar writer (only for changeset segments).
    changeset_offsets: Option<ChangesetOffsetWriter>,
    /// Current block's changeset offset being written. Flushed to the sidecar when the block
    /// is finished (on `increment_block`, commit, or sync).
    current_changeset_offset: Option<ChangesetOffset>,
}
243
244impl<N: NodePrimitives> StaticFileProviderRW<N> {
    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
    ///
    /// Before use, transaction based segments should ensure the block end range is the expected
    /// one, and heal if not. For more check `Self::ensure_end_range_consistency`.
    ///
    /// # Errors
    ///
    /// Returns an error if the backing data file cannot be opened or created, or if healing of
    /// the data file or the changeset sidecar fails.
    pub fn new(
        segment: StaticFileSegment,
        block: BlockNumber,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<Self> {
        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;

        // Create writer WITHOUT sidecar first - we'll add it after healing
        let mut writer = Self {
            writer,
            data_path,
            buf: Vec::with_capacity(100),
            reader,
            metrics,
            prune_on_commit: None,
            synced: false,
            changeset_offsets: None,
            current_changeset_offset: None,
        };

        // Run NippyJar healing BEFORE setting up changeset sidecar
        // This may reduce rows, which affects valid sidecar offsets
        writer.ensure_end_range_consistency()?;

        // Now set up changeset sidecar with post-heal header values
        if segment.is_change_based() {
            writer.heal_changeset_sidecar()?;
        }

        Ok(writer)
    }
281
    /// Opens (or creates) the [`NippyJarWriter`] for the fixed-range file that contains
    /// `block`, returning it together with the data file path.
    ///
    /// Records an `OpenWriter` metric (with elapsed time) when metrics are enabled.
    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        let block_range = static_file_provider.find_fixed_range(segment, block);
        // Load the existing jar if a file already covers this range; only a missing-block
        // error means we should create a fresh file, any other error is propagated.
        let (jar, path) = match static_file_provider.get_segment_provider_for_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // This static file has been frozen, so it can no longer accept writes.
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::other(e)),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }
328
    /// If a file level healing happens, we need to update the end range on the
    /// [`SegmentHeader`].
    ///
    /// However, for transaction based segments, the block end range has to be found and healed
    /// externally.
    ///
    /// Check [`reth_nippy_jar::NippyJarChecker`] &
    /// [`NippyJarWriter`] for more on healing.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // If we have lost rows (in this run or previous), we need to update the [SegmentHeader].
        // Headers segments count rows per block; all other segments count rows per transaction.
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let actual_rows = self.writer.rows() as u64;
        // saturating_sub: actual rows can only be <= expected; never "un-prune" the header.
        let pruned_rows = expected_rows.saturating_sub(actual_rows);
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        debug!(
            target: "providers::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            pruned_rows,
            "Ensuring end range consistency"
        );

        // Persist the (possibly healed) header before anyone reads it.
        self.writer.commit().map_err(ProviderError::other)?;

        // Updates the [`StaticFileProvider`] reader index with the healed range.
        self.update_index()?;
        Ok(())
    }
364
    /// Returns `true` if the writer will prune on commit.
    ///
    /// A queued prune blocks [`Self::sync_all`] and [`Self::finalize`]; only [`Self::commit`]
    /// consumes it.
    pub const fn will_prune_on_commit(&self) -> bool {
        self.prune_on_commit.is_some()
    }
369
    /// Heals the changeset offset sidecar after `NippyJar` healing.
    ///
    /// This must be called AFTER `ensure_end_range_consistency()` which may reduce rows.
    /// Performs three-way consistency check between header, `NippyJar` rows, and sidecar file:
    /// - Validates sidecar offsets don't point past actual `NippyJar` rows
    /// - Heals header if sidecar was truncated during interrupted prune
    /// - Truncates sidecar if offsets point past healed `NippyJar` data
    fn heal_changeset_sidecar(&mut self) -> ProviderResult<()> {
        // Sidecar lives next to the data file, with a `.csoff` extension.
        let csoff_path = self.data_path.with_extension("csoff");

        // Step 1: Read all three sources of truth
        let header_claims_blocks = self.writer.user_header().changeset_offsets_len();
        let actual_nippy_rows = self.writer.rows() as u64;

        // Get actual sidecar file size (may differ from header after crash)
        // NOTE(review): record size is hardcoded to 16 bytes here and below — presumably the
        // fixed on-disk size of a `ChangesetOffset` record; confirm against its encoding.
        let actual_sidecar_blocks = if csoff_path.exists() {
            let file_len = reth_fs_util::metadata(&csoff_path).map_err(ProviderError::other)?.len();
            // Remove partial records from crash mid-write
            let aligned_len = file_len - (file_len % 16);
            aligned_len / 16
        } else {
            0
        };

        // Fresh segment or no sidecar data - nothing to heal
        if header_claims_blocks == 0 && actual_sidecar_blocks == 0 {
            self.changeset_offsets =
                Some(ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?);
            return Ok(());
        }

        // Step 2: Validate sidecar offsets against actual NippyJar state
        let valid_blocks = if actual_sidecar_blocks > 0 {
            let reader = ChangesetOffsetReader::new(&csoff_path, actual_sidecar_blocks)
                .map_err(ProviderError::other)?;

            // Find last block where offset + num_changes <= actual_nippy_rows
            // This correctly handles rows=0 with offset=0, num_changes=0 (empty blocks)
            let mut valid = 0u64;
            for i in 0..actual_sidecar_blocks {
                if let Some(offset) = reader.get(i).map_err(ProviderError::other)? {
                    if offset.offset() + offset.num_changes() <= actual_nippy_rows {
                        valid = i + 1;
                    } else {
                        // This block points past EOF - stop here
                        break;
                    }
                }
            }
            valid
        } else {
            0
        };

        // Step 3: Determine correct state from synced files (source of truth)
        // Header is the commit marker - never enlarge, only shrink
        let correct_blocks = valid_blocks.min(header_claims_blocks);

        // Step 4: Heal if header doesn't match validated truth
        let mut needs_header_commit = false;

        if correct_blocks != header_claims_blocks || actual_sidecar_blocks != correct_blocks {
            tracing::warn!(
                target: "reth::static_file",
                path = %csoff_path.display(),
                header_claims = header_claims_blocks,
                sidecar_has = actual_sidecar_blocks,
                valid_blocks = correct_blocks,
                actual_rows = actual_nippy_rows,
                "Three-way healing: syncing header, sidecar, and NippyJar state"
            );

            // Truncate sidecar file if it has invalid blocks
            if actual_sidecar_blocks > correct_blocks {
                use std::fs::OpenOptions;
                let file = OpenOptions::new()
                    .write(true)
                    .open(&csoff_path)
                    .map_err(ProviderError::other)?;
                // Durably shrink the sidecar to the validated record count (16 bytes each).
                file.set_len(correct_blocks * 16).map_err(ProviderError::other)?;
                file.sync_all().map_err(ProviderError::other)?;

                tracing::debug!(
                    target: "reth::static_file",
                    "Truncated sidecar from {} to {} blocks",
                    actual_sidecar_blocks,
                    correct_blocks
                );
            }

            // Update header to match validated truth (can only shrink, never enlarge)
            if correct_blocks < header_claims_blocks {
                // Blocks were removed - use prune() to update both block_range and
                // changeset_offsets_len atomically
                let blocks_removed = header_claims_blocks - correct_blocks;
                self.writer.user_header_mut().prune(blocks_removed);

                tracing::debug!(
                    target: "reth::static_file",
                    "Updated header: removed {} blocks (changeset_offsets_len: {} -> {})",
                    blocks_removed,
                    header_claims_blocks,
                    correct_blocks
                );

                needs_header_commit = true;
            }
        } else {
            tracing::debug!(
                target: "reth::static_file",
                path = %csoff_path.display(),
                blocks = correct_blocks,
                "Changeset sidecar consistent, no healing needed"
            );
        }

        // Open sidecar writer with corrected count (won't error now that sizes match)
        let csoff_writer = ChangesetOffsetWriter::new(&csoff_path, correct_blocks)
            .map_err(ProviderError::other)?;

        self.changeset_offsets = Some(csoff_writer);

        // Commit healed header if needed (after sidecar writer is set up)
        if needs_header_commit {
            self.writer.commit().map_err(ProviderError::other)?;

            tracing::info!(
                target: "reth::static_file",
                path = %csoff_path.display(),
                blocks = correct_blocks,
                "Committed healed changeset offset header"
            );
        }

        Ok(())
    }
506
507    /// Flushes the current changeset offset (if any) to the `.csoff` sidecar file.
508    ///
509    /// This is idempotent - safe to call multiple times. After flushing, the current offset
510    /// is cleared to prevent duplicate writes.
511    ///
512    /// This must be called before committing or syncing to ensure the last block's offset
513    /// is persisted, since `increment_block()` only writes the *previous* block's offset.
514    fn flush_current_changeset_offset(&mut self) -> ProviderResult<()> {
515        if !self.writer.user_header().segment().is_change_based() {
516            return Ok(());
517        }
518
519        if let Some(offset) = self.current_changeset_offset.take() &&
520            let Some(writer) = &mut self.changeset_offsets
521        {
522            writer.append(&offset).map_err(ProviderError::other)?;
523        }
524        Ok(())
525    }
526
    /// Syncs all data (rows, offsets, and changeset offsets sidecar) to disk.
    ///
    /// This does NOT commit the configuration. Call [`Self::finalize`] after to write the
    /// configuration and mark the writer as clean.
    ///
    /// Returns an error if prune is queued (use [`Self::commit`] instead).
    pub fn sync_all(&mut self) -> ProviderResult<()> {
        if self.prune_on_commit.is_some() {
            return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
        }

        // Write the final block's offset and sync the sidecar for changeset segments.
        // Ordering matters: the sidecar must be durable before the header references it.
        self.flush_current_changeset_offset()?;
        if let Some(writer) = &mut self.changeset_offsets {
            writer.sync().map_err(ProviderError::other)?;
            // Update the header with the actual number of offsets written
            self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
        }

        if self.writer.is_dirty() {
            self.writer.sync_all().map_err(ProviderError::other)?;
        }
        // Remember the sync so a following `finalize()` can skip redundant work.
        self.synced = true;
        Ok(())
    }
552
    /// Commits configuration to disk and updates the reader index.
    ///
    /// If `sync_all()` was not called, this will call it first to ensure data is persisted.
    ///
    /// Returns an error if prune is queued (use [`Self::commit`] instead).
    #[instrument(
        name = "StaticFileProviderRW::finalize",
        level = "debug",
        target = "providers::static_file",
        skip_all
    )]
    pub fn finalize(&mut self) -> ProviderResult<()> {
        if self.prune_on_commit.is_some() {
            return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
        }
        if self.writer.is_dirty() {
            if !self.synced {
                // Must call self.sync_all() to flush changeset offsets and update
                // the header's changeset_offsets_len, not just the inner writer
                self.sync_all()?;
            }

            self.writer.finalize().map_err(ProviderError::other)?;
            self.update_index()?;
        }
        // Reset so the next write cycle performs its own sync.
        self.synced = false;
        Ok(())
    }
581
    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// Applies any queued [`PruneStrategy`] first, then flushes and syncs the changeset
    /// sidecar (for changeset segments) before committing the main file, so the sidecar is
    /// durable before the header references it.
    #[instrument(
        name = "StaticFileProviderRW::commit",
        level = "debug",
        target = "providers::static_file",
        skip_all
    )]
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Truncates the data file if instructed to.
        if let Some(strategy) = self.prune_on_commit.take() {
            debug!(
                target: "providers::static_file",
                segment = ?self.writer.user_header().segment(),
                "Pruning data on commit"
            );
            match strategy {
                PruneStrategy::Headers { num_blocks } => self.prune_header_data(num_blocks)?,
                PruneStrategy::Transactions { num_rows, last_block } => {
                    self.prune_transaction_data(num_rows, last_block)?
                }
                PruneStrategy::Receipts { num_rows, last_block } => {
                    self.prune_receipt_data(num_rows, last_block)?
                }
                PruneStrategy::TransactionSenders { num_rows, last_block } => {
                    self.prune_transaction_sender_data(num_rows, last_block)?
                }
                PruneStrategy::AccountChangeSets { last_block } => {
                    self.prune_account_changeset_data(last_block)?
                }
                PruneStrategy::StorageChangeSets { last_block } => {
                    self.prune_storage_changeset_data(last_block)?
                }
            }
        }

        // For changeset segments, flush and sync the sidecar file before committing the main file.
        // This ensures crash consistency: the sidecar is durable before the header references it.
        self.flush_current_changeset_offset()?;
        if let Some(writer) = &mut self.changeset_offsets {
            writer.sync().map_err(ProviderError::other)?;
            // Update the header with the actual number of offsets written
            self.writer.user_header_mut().set_changeset_offsets_len(writer.len());
        }

        if self.writer.is_dirty() {
            debug!(
                target: "providers::static_file",
                segment = ?self.writer.user_header().segment(),
                "Committing writer to disk"
            );

            // Commits offsets and new user_header to disk
            self.writer.commit().map_err(ProviderError::other)?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "providers::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Committed writer to disk"
            );

            self.update_index()?;
        }

        Ok(())
    }
659
    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// CAUTION: does not call `sync_all` on the files.
    ///
    /// Unlike [`Self::commit`], this skips queued-prune handling and the changeset sidecar;
    /// it is only available to tests via the `test-utils` feature.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        debug!(
            target: "providers::static_file",
            segment = ?self.writer.user_header().segment(),
            "Committing writer to disk (without sync)"
        );

        // Commits offsets and new user_header to disk
        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "providers::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Committed writer to disk (without sync)"
        );

        self.update_index()?;

        Ok(())
    }
696
    /// Updates the `self.reader` internal index.
    fn update_index(&self) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        // We find the maximum block of the segment by checking this writer's last block.
        //
        // However if there's no block range (because there's no data), we try to calculate it by
        // subtracting 1 from the expected block start, resulting on the last block of the
        // previous file — but only if that file actually exists. If the previous file doesn't
        // exist (e.g. first-ever file for a segment starting past range boundary), there's
        // nothing to index.
        let segment_max_block = self
            .writer
            .user_header()
            .block_range()
            .as_ref()
            .map(|block_range| block_range.end())
            .or_else(|| {
                // Guard against underflow: at or before genesis there is no previous file.
                let expected_start = self.writer.user_header().expected_block_start();
                if expected_start <= self.reader().genesis_block_number() {
                    return None;
                }

                let prev_block = expected_start - 1;
                let prev_range = self.reader().find_fixed_range(segment, prev_block);
                let prev_path = self.reader().directory().join(segment.filename(&prev_range));
                prev_path.exists().then_some(prev_block)
            });

        self.reader().update_index(segment, segment_max_block)
    }
728
729    /// Ensures that the writer is positioned at the specified block number.
730    ///
731    /// If the writer is positioned at a greater block number than the specified one, the writer
732    /// will NOT be unwound and the error will be returned.
733    pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
734        let current_block = if let Some(current_block_number) = self.current_block_number() {
735            current_block_number
736        } else {
737            self.increment_block(0)?;
738            0
739        };
740
741        match current_block.cmp(&advance_to) {
742            Ordering::Less => {
743                for block in current_block + 1..=advance_to {
744                    self.increment_block(block)?;
745                }
746            }
747            Ordering::Equal => {}
748            Ordering::Greater => {
749                return Err(ProviderError::UnexpectedStaticFileBlockNumber(
750                    self.writer.user_header().segment(),
751                    current_block,
752                    advance_to,
753                ));
754            }
755        }
756
757        Ok(())
758    }
759
    /// Allows to increment the [`SegmentHeader`] end block. It will commit the current static file,
    /// and create the next one if we are past the end range.
    ///
    /// `expected_block_number` must be exactly [`Self::next_block_number`], otherwise an
    /// [`ProviderError::UnexpectedStaticFileBlockNumber`] is returned before anything is mutated.
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        // Enforce block continuity before mutating any writer state.
        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // We have finished the previous static file and must freeze it
            if last_block == self.writer.user_header().expected_block_end() {
                // Commits offsets and new user_header to disk
                self.commit()?;

                // Opens the new static file
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path.clone();

                // Update changeset offsets writer for the new file (starts empty)
                if segment.is_change_based() {
                    let csoff_path = data_path.with_extension("csoff");
                    self.changeset_offsets = Some(
                        ChangesetOffsetWriter::new(&csoff_path, 0).map_err(ProviderError::other)?,
                    );
                }

                // Fresh header for the new file, covering the next fixed block range.
                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(segment, last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();

        // Handle changeset offset tracking for changeset segments
        if segment.is_change_based() {
            // Write previous block's offset if we have one
            if let Some(offset) = self.current_changeset_offset.take() &&
                let Some(writer) = &mut self.changeset_offsets
            {
                writer.append(&offset).map_err(ProviderError::other)?;
            }
            // Start tracking new block's offset: its changes begin at the current row count with
            // zero changes recorded so far.
            let new_offset = self.writer.rows() as u64;
            self.current_changeset_offset = Some(ChangesetOffset::new(new_offset, 0));
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
822
    /// Returns the current block number of the static file writer.
    ///
    /// Returns `None` when the current file has not had any block appended to it yet.
    pub fn current_block_number(&self) -> Option<u64> {
        self.writer.user_header().block_end()
    }
827
828    /// Returns a block number that is one next to the current tip of static files.
829    pub fn next_block_number(&self) -> u64 {
830        // The next static file block number can be found by checking the one after block_end.
831        // However, if it's a new file that hasn't been added any data, its block range will
832        // actually be None. In that case, the next block will be found on `expected_block_start`.
833        self.writer
834            .user_header()
835            .block_end()
836            .map(|b| b + 1)
837            .unwrap_or_else(|| self.writer.user_header().expected_block_start())
838    }
839
840    /// Verifies if the incoming block number matches the next expected block number
841    /// for a static file. This ensures data continuity when adding new blocks.
842    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
843        let next_static_file_block = self.next_block_number();
844
845        if expected_block_number != next_static_file_block {
846            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
847                self.writer.user_header().segment(),
848                expected_block_number,
849                next_static_file_block,
850            ))
851        }
852        Ok(())
853    }
854
    /// Truncates changesets (account or storage) to the given block. It deletes and loads an older
    /// static file if the block goes beyond the start of the current block range.
    ///
    /// After this returns, `last_block` is the highest block whose changes remain in the segment.
    ///
    /// # Note
    /// Commits to the configuration file at the end
    fn truncate_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();
        debug_assert!(segment.is_change_based());

        // Get the current block range
        let current_block_end = self
            .writer
            .user_header()
            .block_end()
            .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;

        // If we're already at or before the target block, nothing to do
        if current_block_end <= last_block {
            return Ok(())
        }

        // Navigate to the correct file if the target block is in a previous file
        let mut expected_block_start = self.writer.user_header().expected_block_start();
        while last_block < expected_block_start && expected_block_start > 0 {
            self.delete_current_and_open_previous()?;
            expected_block_start = self.writer.user_header().expected_block_start();
        }

        // Find the number of rows to keep (up to and including last_block)
        let blocks_to_keep = if last_block >= expected_block_start {
            last_block - expected_block_start + 1
        } else {
            0
        };

        // Read changeset offsets from sidecar file to find where to truncate
        let csoff_path = self.data_path.with_extension("csoff");
        let changeset_offsets_len = self.writer.user_header().changeset_offsets_len();

        // Flush any pending changeset offset before reading the sidecar
        self.flush_current_changeset_offset()?;

        let rows_to_keep = if blocks_to_keep == 0 {
            0
        } else if blocks_to_keep >= changeset_offsets_len {
            // Keep all rows in this file
            self.writer.rows() as u64
        } else {
            // Read offset for the block after last_block from sidecar.
            // Use committed length from header, ignoring any uncommitted records
            // that may exist in the file after a crash.
            let reader = ChangesetOffsetReader::new(&csoff_path, changeset_offsets_len)
                .map_err(ProviderError::other)?;
            if let Some(next_offset) = reader.get(blocks_to_keep).map_err(ProviderError::other)? {
                next_offset.offset()
            } else {
                // If we can't read the offset, keep all rows
                self.writer.rows() as u64
            }
        };

        let total_rows = self.writer.rows() as u64;
        let rows_to_delete = total_rows.saturating_sub(rows_to_keep);

        if rows_to_delete > 0 {
            // Calculate the number of blocks to prune. `block_end` is re-read here because the
            // navigation loop above may have switched the writer to an older file.
            let current_block_end = self
                .writer
                .user_header()
                .block_end()
                .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
            let blocks_to_remove = current_block_end - last_block;

            // Update segment header - for changesets, prune expects number of blocks, not rows
            self.writer.user_header_mut().prune(blocks_to_remove);

            // Prune the actual rows
            self.writer.prune_rows(rows_to_delete as usize).map_err(ProviderError::other)?;
        }

        // Update the block range
        self.writer.user_header_mut().set_block_range(expected_block_start, last_block);

        // Sync changeset offsets to match the new block range
        self.writer.user_header_mut().sync_changeset_offsets();

        // Truncate the sidecar file to match the new block count
        if let Some(writer) = &mut self.changeset_offsets {
            writer.truncate(blocks_to_keep).map_err(ProviderError::other)?;
        }

        // Clear current changeset offset tracking since we've pruned
        self.current_changeset_offset = None;

        // Commits new changes to disk
        self.commit()?;

        Ok(())
    }
954
    /// Truncates a number of rows from disk. It deletes and loads an older static file if block
    /// goes beyond the start of the current block range.
    ///
    /// **`last_block`** should be passed only with transaction based segments.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        // Delete rows file-by-file until `num_rows` rows have been removed.
        while remaining_rows > 0 {
            // Number of rows the current file holds for this segment.
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // If there's more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                // We only delete the file if it's NOT the first static file AND:
                // * it's a Header segment  OR
                // * it's a tx-based segment AND `last_block` is lower than the first block of this
                //   file's block range. Otherwise, having no rows simply means that this block
                //   range has no transactions, but the file should remain.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        // Only tx-based segments (Transactions, Receipts, TransactionSenders) pass `last_block`.
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case for when we are unwinding a chain of empty blocks that goes across
                // files, and therefore, the only reference point to know which file
                // we are supposed to be at is `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }
1025
1026    /// Delete the current static file, and replace this provider writer with the previous static
1027    /// file.
1028    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
1029        let segment = self.user_header().segment();
1030        let current_path = self.data_path.clone();
1031        let (previous_writer, data_path) = Self::open(
1032            segment,
1033            self.writer.user_header().expected_block_start() - 1,
1034            self.reader.clone(),
1035            self.metrics.clone(),
1036        )?;
1037        self.writer = previous_writer;
1038        self.writer.set_dirty();
1039        self.data_path = data_path.clone();
1040
1041        // Delete the sidecar file for changeset segments before deleting the main jar
1042        if segment.is_change_based() {
1043            let csoff_path = current_path.with_extension("csoff");
1044            if csoff_path.exists() {
1045                std::fs::remove_file(&csoff_path).map_err(ProviderError::other)?;
1046            }
1047            // Re-initialize the changeset offsets writer for the previous file
1048            let new_csoff_path = data_path.with_extension("csoff");
1049            let committed_len = self.writer.user_header().changeset_offsets_len();
1050            self.changeset_offsets = Some(
1051                ChangesetOffsetWriter::new(&new_csoff_path, committed_len)
1052                    .map_err(ProviderError::other)?,
1053            );
1054        }
1055
1056        // Clear current changeset offset tracking since we're switching files
1057        self.current_changeset_offset = None;
1058
1059        NippyJar::<SegmentHeader>::load(&current_path)
1060            .map_err(ProviderError::other)?
1061            .delete()
1062            .map_err(ProviderError::other)?;
1063        Ok(())
1064    }
1065
1066    /// Appends column to static file.
1067    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
1068        self.buf.clear();
1069        column.to_compact(&mut self.buf);
1070
1071        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
1072        Ok(())
1073    }
1074
1075    /// Appends to tx number-based static file.
1076    fn append_with_tx_number<V: Compact>(
1077        &mut self,
1078        tx_num: TxNumber,
1079        value: V,
1080    ) -> ProviderResult<()> {
1081        if let Some(range) = self.writer.user_header().tx_range() {
1082            let next_tx = range.end() + 1;
1083            if next_tx != tx_num {
1084                return Err(ProviderError::UnexpectedStaticFileTxNumber(
1085                    self.writer.user_header().segment(),
1086                    tx_num,
1087                    next_tx,
1088                ))
1089            }
1090            self.writer.user_header_mut().increment_tx();
1091        } else {
1092            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
1093        }
1094
1095        self.append_column(value)?;
1096
1097        Ok(())
1098    }
1099
1100    /// Appends change to changeset static file.
1101    fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
1102        if let Some(ref mut offset) = self.current_changeset_offset {
1103            offset.increment_num_changes();
1104        }
1105        self.append_column(change)?;
1106        Ok(())
1107    }
1108
    /// Appends header to static file.
    ///
    /// The total difficulty column is written as [`U256::ZERO`]; use
    /// [`Self::append_header_with_td`] to store an actual total difficulty.
    ///
    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
    /// blocks.
    pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        self.append_header_with_td(header, U256::ZERO, hash)
    }
1119
1120    /// Appends header to static file with a specified total difficulty.
1121    ///
1122    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
1123    /// blocks.
1124    pub fn append_header_with_td(
1125        &mut self,
1126        header: &N::BlockHeader,
1127        total_difficulty: U256,
1128        hash: &BlockHash,
1129    ) -> ProviderResult<()>
1130    where
1131        N::BlockHeader: Compact,
1132    {
1133        let start = Instant::now();
1134        self.ensure_no_queued_prune()?;
1135
1136        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1137
1138        self.increment_block(header.number())?;
1139
1140        self.append_column(header)?;
1141        self.append_column(CompactU256::from(total_difficulty))?;
1142        self.append_column(hash)?;
1143
1144        if let Some(metrics) = &self.metrics {
1145            metrics.record_segment_operation(
1146                StaticFileSegment::Headers,
1147                StaticFileProviderOperation::Append,
1148                Some(start.elapsed()),
1149            );
1150        }
1151
1152        Ok(())
1153    }
1154
1155    /// Appends header to static file without calling `increment_block`.
1156    /// This is useful for genesis blocks with non-zero block numbers.
1157    pub fn append_header_direct(
1158        &mut self,
1159        header: &N::BlockHeader,
1160        total_difficulty: U256,
1161        hash: &BlockHash,
1162    ) -> ProviderResult<()>
1163    where
1164        N::BlockHeader: Compact,
1165    {
1166        let start = Instant::now();
1167        self.ensure_no_queued_prune()?;
1168
1169        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1170
1171        self.append_column(header)?;
1172        self.append_column(CompactU256::from(total_difficulty))?;
1173        self.append_column(hash)?;
1174
1175        if let Some(metrics) = &self.metrics {
1176            metrics.record_segment_operation(
1177                StaticFileSegment::Headers,
1178                StaticFileProviderOperation::Append,
1179                Some(start.elapsed()),
1180            );
1181        }
1182
1183        Ok(())
1184    }
1185
1186    /// Appends transaction to static file.
1187    ///
1188    /// It **DOES NOT CALL** `increment_block()`, it should be handled elsewhere. There might be
1189    /// empty blocks and this function wouldn't be called.
1190    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
1191    where
1192        N::SignedTx: Compact,
1193    {
1194        let start = Instant::now();
1195        self.ensure_no_queued_prune()?;
1196
1197        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1198        self.append_with_tx_number(tx_num, tx)?;
1199
1200        if let Some(metrics) = &self.metrics {
1201            metrics.record_segment_operation(
1202                StaticFileSegment::Transactions,
1203                StaticFileProviderOperation::Append,
1204                Some(start.elapsed()),
1205            );
1206        }
1207
1208        Ok(())
1209    }
1210
1211    /// Appends receipt to static file.
1212    ///
1213    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
1214    /// empty blocks and this function wouldn't be called.
1215    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
1216    where
1217        N::Receipt: Compact,
1218    {
1219        let start = Instant::now();
1220        self.ensure_no_queued_prune()?;
1221
1222        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1223        self.append_with_tx_number(tx_num, receipt)?;
1224
1225        if let Some(metrics) = &self.metrics {
1226            metrics.record_segment_operation(
1227                StaticFileSegment::Receipts,
1228                StaticFileProviderOperation::Append,
1229                Some(start.elapsed()),
1230            );
1231        }
1232
1233        Ok(())
1234    }
1235
1236    /// Appends multiple receipts to the static file.
1237    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
1238    where
1239        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
1240        R: Borrow<N::Receipt>,
1241        N::Receipt: Compact,
1242    {
1243        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1244
1245        let mut receipts_iter = receipts.into_iter().peekable();
1246        // If receipts are empty, we can simply return None
1247        if receipts_iter.peek().is_none() {
1248            return Ok(());
1249        }
1250
1251        let start = Instant::now();
1252        self.ensure_no_queued_prune()?;
1253
1254        // At this point receipts contains at least one receipt, so this would be overwritten.
1255        let mut count: u64 = 0;
1256
1257        for receipt_result in receipts_iter {
1258            let (tx_num, receipt) = receipt_result?;
1259            self.append_with_tx_number(tx_num, receipt.borrow())?;
1260            count += 1;
1261        }
1262
1263        if let Some(metrics) = &self.metrics {
1264            metrics.record_segment_operations(
1265                StaticFileSegment::Receipts,
1266                StaticFileProviderOperation::Append,
1267                count,
1268                Some(start.elapsed()),
1269            );
1270        }
1271
1272        Ok(())
1273    }
1274
1275    /// Appends transaction sender to static file.
1276    ///
1277    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
1278    /// empty blocks and this function wouldn't be called.
1279    pub fn append_transaction_sender(
1280        &mut self,
1281        tx_num: TxNumber,
1282        sender: &alloy_primitives::Address,
1283    ) -> ProviderResult<()> {
1284        let start = Instant::now();
1285        self.ensure_no_queued_prune()?;
1286
1287        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1288        self.append_with_tx_number(tx_num, sender)?;
1289
1290        if let Some(metrics) = &self.metrics {
1291            metrics.record_segment_operation(
1292                StaticFileSegment::TransactionSenders,
1293                StaticFileProviderOperation::Append,
1294                Some(start.elapsed()),
1295            );
1296        }
1297
1298        Ok(())
1299    }
1300
1301    /// Appends multiple transaction senders to the static file.
1302    pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
1303    where
1304        I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
1305    {
1306        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1307
1308        let mut senders_iter = senders.into_iter().peekable();
1309        // If senders are empty, we can simply return
1310        if senders_iter.peek().is_none() {
1311            return Ok(());
1312        }
1313
1314        let start = Instant::now();
1315        self.ensure_no_queued_prune()?;
1316
1317        // At this point senders contains at least one sender, so this would be overwritten.
1318        let mut count: u64 = 0;
1319        for (tx_num, sender) in senders_iter {
1320            self.append_with_tx_number(tx_num, sender)?;
1321            count += 1;
1322        }
1323
1324        if let Some(metrics) = &self.metrics {
1325            metrics.record_segment_operations(
1326                StaticFileSegment::TransactionSenders,
1327                StaticFileProviderOperation::Append,
1328                count,
1329                Some(start.elapsed()),
1330            );
1331        }
1332
1333        Ok(())
1334    }
1335
1336    /// Appends a block changeset to the static file.
1337    ///
1338    /// It **CALLS** `increment_block()`.
1339    ///
1340    /// Returns the current number of changesets in the file, if any.
1341    pub fn append_account_changeset(
1342        &mut self,
1343        mut changeset: Vec<AccountBeforeTx>,
1344        block_number: u64,
1345    ) -> ProviderResult<()> {
1346        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1347        let start = Instant::now();
1348
1349        self.increment_block(block_number)?;
1350        self.ensure_no_queued_prune()?;
1351
1352        // first sort the changeset by address
1353        changeset.sort_by_key(|change| change.address);
1354
1355        let mut count: u64 = 0;
1356
1357        for change in changeset {
1358            self.append_change(&change)?;
1359            count += 1;
1360        }
1361
1362        if let Some(metrics) = &self.metrics {
1363            metrics.record_segment_operations(
1364                StaticFileSegment::AccountChangeSets,
1365                StaticFileProviderOperation::Append,
1366                count,
1367                Some(start.elapsed()),
1368            );
1369        }
1370
1371        Ok(())
1372    }
1373
1374    /// Appends a block storage changeset to the static file.
1375    ///
1376    /// It **CALLS** `increment_block()`.
1377    pub fn append_storage_changeset(
1378        &mut self,
1379        mut changeset: Vec<StorageBeforeTx>,
1380        block_number: u64,
1381    ) -> ProviderResult<()> {
1382        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1383        let start = Instant::now();
1384
1385        self.increment_block(block_number)?;
1386        self.ensure_no_queued_prune()?;
1387
1388        // sort by address + storage key
1389        changeset.sort_by_key(|change| (change.address, change.key));
1390
1391        let mut count: u64 = 0;
1392        for change in changeset {
1393            self.append_change(&change)?;
1394            count += 1;
1395        }
1396
1397        if let Some(metrics) = &self.metrics {
1398            metrics.record_segment_operations(
1399                StaticFileSegment::StorageChangeSets,
1400                StaticFileProviderOperation::Append,
1401                count,
1402                Some(start.elapsed()),
1403            );
1404        }
1405
1406        Ok(())
1407    }
1408
1409    /// Adds an instruction to prune `to_delete` transactions during commit.
1410    ///
1411    /// Note: `last_block` refers to the block the unwinds ends at.
1412    pub fn prune_transactions(
1413        &mut self,
1414        to_delete: u64,
1415        last_block: BlockNumber,
1416    ) -> ProviderResult<()> {
1417        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
1418        self.queue_prune(PruneStrategy::Transactions { num_rows: to_delete, last_block })
1419    }
1420
1421    /// Adds an instruction to prune `to_delete` receipts during commit.
1422    ///
1423    /// Note: `last_block` refers to the block the unwinds ends at.
1424    pub fn prune_receipts(
1425        &mut self,
1426        to_delete: u64,
1427        last_block: BlockNumber,
1428    ) -> ProviderResult<()> {
1429        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
1430        self.queue_prune(PruneStrategy::Receipts { num_rows: to_delete, last_block })
1431    }
1432
1433    /// Adds an instruction to prune `to_delete` transaction senders during commit.
1434    ///
1435    /// Note: `last_block` refers to the block the unwinds ends at.
1436    pub fn prune_transaction_senders(
1437        &mut self,
1438        to_delete: u64,
1439        last_block: BlockNumber,
1440    ) -> ProviderResult<()> {
1441        debug_assert_eq!(
1442            self.writer.user_header().segment(),
1443            StaticFileSegment::TransactionSenders
1444        );
1445        self.queue_prune(PruneStrategy::TransactionSenders { num_rows: to_delete, last_block })
1446    }
1447
1448    /// Adds an instruction to prune `to_delete` headers during commit.
1449    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
1450        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
1451        self.queue_prune(PruneStrategy::Headers { num_blocks: to_delete })
1452    }
1453
1454    /// Adds an instruction to prune changesets until the given block.
1455    pub fn prune_account_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1456        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::AccountChangeSets);
1457        self.queue_prune(PruneStrategy::AccountChangeSets { last_block })
1458    }
1459
1460    /// Adds an instruction to prune storage changesets until the given block.
1461    pub fn prune_storage_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
1462        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::StorageChangeSets);
1463        self.queue_prune(PruneStrategy::StorageChangeSets { last_block })
1464    }
1465
1466    /// Adds an instruction to prune elements during commit using the specified strategy.
1467    fn queue_prune(&mut self, strategy: PruneStrategy) -> ProviderResult<()> {
1468        self.ensure_no_queued_prune()?;
1469        self.prune_on_commit = Some(strategy);
1470        Ok(())
1471    }
1472
1473    /// Returns Error if there is a pruning instruction that needs to be applied.
1474    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
1475        if self.prune_on_commit.is_some() {
1476            return Err(ProviderError::other(StaticFileWriterError::new(
1477                "Pruning should be committed before appending or pruning more data",
1478            )));
1479        }
1480        Ok(())
1481    }
1482
1483    /// Removes the last `to_delete` transactions from the data file.
1484    fn prune_transaction_data(
1485        &mut self,
1486        to_delete: u64,
1487        last_block: BlockNumber,
1488    ) -> ProviderResult<()> {
1489        let start = Instant::now();
1490
1491        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
1492
1493        self.truncate(to_delete, Some(last_block))?;
1494
1495        if let Some(metrics) = &self.metrics {
1496            metrics.record_segment_operation(
1497                StaticFileSegment::Transactions,
1498                StaticFileProviderOperation::Prune,
1499                Some(start.elapsed()),
1500            );
1501        }
1502
1503        Ok(())
1504    }
1505
1506    /// Prunes the last `to_delete` account changesets from the data file.
1507    fn prune_account_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1508        let start = Instant::now();
1509
1510        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
1511
1512        self.truncate_changesets(last_block)?;
1513
1514        if let Some(metrics) = &self.metrics {
1515            metrics.record_segment_operation(
1516                StaticFileSegment::AccountChangeSets,
1517                StaticFileProviderOperation::Prune,
1518                Some(start.elapsed()),
1519            );
1520        }
1521
1522        Ok(())
1523    }
1524
1525    /// Prunes the last storage changesets from the data file.
1526    fn prune_storage_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
1527        let start = Instant::now();
1528
1529        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
1530
1531        self.truncate_changesets(last_block)?;
1532
1533        if let Some(metrics) = &self.metrics {
1534            metrics.record_segment_operation(
1535                StaticFileSegment::StorageChangeSets,
1536                StaticFileProviderOperation::Prune,
1537                Some(start.elapsed()),
1538            );
1539        }
1540
1541        Ok(())
1542    }
1543
1544    /// Prunes the last `to_delete` receipts from the data file.
1545    fn prune_receipt_data(
1546        &mut self,
1547        to_delete: u64,
1548        last_block: BlockNumber,
1549    ) -> ProviderResult<()> {
1550        let start = Instant::now();
1551
1552        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
1553
1554        self.truncate(to_delete, Some(last_block))?;
1555
1556        if let Some(metrics) = &self.metrics {
1557            metrics.record_segment_operation(
1558                StaticFileSegment::Receipts,
1559                StaticFileProviderOperation::Prune,
1560                Some(start.elapsed()),
1561            );
1562        }
1563
1564        Ok(())
1565    }
1566
1567    /// Prunes the last `to_delete` transaction senders from the data file.
1568    fn prune_transaction_sender_data(
1569        &mut self,
1570        to_delete: u64,
1571        last_block: BlockNumber,
1572    ) -> ProviderResult<()> {
1573        let start = Instant::now();
1574
1575        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
1576
1577        self.truncate(to_delete, Some(last_block))?;
1578
1579        if let Some(metrics) = &self.metrics {
1580            metrics.record_segment_operation(
1581                StaticFileSegment::TransactionSenders,
1582                StaticFileProviderOperation::Prune,
1583                Some(start.elapsed()),
1584            );
1585        }
1586
1587        Ok(())
1588    }
1589
1590    /// Prunes the last `to_delete` headers from the data file.
1591    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
1592        let start = Instant::now();
1593
1594        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
1595
1596        self.truncate(to_delete, None)?;
1597
1598        if let Some(metrics) = &self.metrics {
1599            metrics.record_segment_operation(
1600                StaticFileSegment::Headers,
1601                StaticFileProviderOperation::Prune,
1602                Some(start.elapsed()),
1603            );
1604        }
1605
1606        Ok(())
1607    }
1608
    /// Returns a [`StaticFileProvider`] associated with this writer.
    ///
    /// # Panics
    ///
    /// Panics if the parent [`StaticFileProvider`] has been fully dropped (see
    /// [`Self::upgrade_provider_to_strong_reference`]).
    pub fn reader(&self) -> StaticFileProvider<N> {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }
1613
1614    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
1615    /// [`StaticFileProvider`].
1616    ///
1617    /// # Panics
1618    ///
1619    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is still
1620    /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
1621    /// [`StaticFileProvider`].
1622    fn upgrade_provider_to_strong_reference(
1623        provider: &Weak<StaticFileProviderInner<N>>,
1624    ) -> StaticFileProvider<N> {
1625        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
1626    }
1627
    /// Helper function to access the writer's [`SegmentHeader`].
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }
1632
    /// Helper function to access a mutable reference to the writer's [`SegmentHeader`].
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }
1637
    /// Helper function to override the [`SegmentHeader`] block range for testing.
    ///
    /// Only rewrites the range stored in the header; it does not move or delete any row data.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
    }
1643
    /// Helper function to access the underlying [`NippyJarWriter`] for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
1649}
1650
1651fn create_jar(
1652    segment: StaticFileSegment,
1653    path: &Path,
1654    expected_block_range: SegmentRangeInclusive,
1655) -> NippyJar<SegmentHeader> {
1656    let mut jar = NippyJar::new(
1657        segment.columns(),
1658        path,
1659        SegmentHeader::new(expected_block_range, None, None, segment),
1660    );
1661
1662    // Transaction and Receipt already have the compression scheme used natively in its encoding.
1663    // (zstd-dictionary)
1664    if segment.is_headers() {
1665        jar = jar.with_lz4();
1666    }
1667
1668    jar
1669}