reth_provider/providers/static_file/writer.rs

use super::{
    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
};
use crate::providers::static_file::metrics::StaticFileProviderOperation;
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db::models::{AccountBeforeTx, StorageBeforeTx};
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_node_types::NodePrimitives;
use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
use std::{
    borrow::Borrow,
    cmp::Ordering,
    fmt::Debug,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
    time::Instant,
};
use tracing::debug;

/// Represents different pruning strategies for various static file segments.
#[derive(Debug, Clone, Copy)]
enum PruneStrategy {
    /// Prune headers by number of blocks to delete.
    Headers {
        /// Number of blocks to delete.
        num_blocks: u64,
    },
    /// Prune transactions by number of rows and last block.
    Transactions {
        /// Number of transaction rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune receipts by number of rows and last block.
    Receipts {
        /// Number of receipt rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune transaction senders by number of rows and last block.
    TransactionSenders {
        /// Number of transaction sender rows to delete.
        num_rows: u64,
        /// The last block number after pruning.
        last_block: BlockNumber,
    },
    /// Prune account changesets to a target block number.
    AccountChangeSets {
        /// The target block number to prune to.
        last_block: BlockNumber,
    },
    /// Prune storage changesets to a target block number.
    StorageChangeSets {
        /// The target block number to prune to.
        last_block: BlockNumber,
    },
}

/// Static file writers for every known [`StaticFileSegment`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
    transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
    account_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
    storage_change_sets: RwLock<Option<StaticFileProviderRW<N>>>,
}

impl<N> Default for StaticFileWriters<N> {
    fn default() -> Self {
        Self {
            headers: Default::default(),
            transactions: Default::default(),
            receipts: Default::default(),
            transaction_senders: Default::default(),
            account_change_sets: Default::default(),
            storage_change_sets: Default::default(),
        }
    }
}

impl<N: NodePrimitives> StaticFileWriters<N> {
    pub(crate) fn get_or_create(
        &self,
        segment: StaticFileSegment,
        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
        let mut write_guard = match segment {
            StaticFileSegment::Headers => self.headers.write(),
            StaticFileSegment::Transactions => self.transactions.write(),
            StaticFileSegment::Receipts => self.receipts.write(),
            StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
            StaticFileSegment::AccountChangeSets => self.account_change_sets.write(),
            StaticFileSegment::StorageChangeSets => self.storage_change_sets.write(),
        };

        if write_guard.is_none() {
            *write_guard = Some(create_fn()?);
        }

        Ok(StaticFileProviderRWRefMut(write_guard))
    }

    pub(crate) fn commit(&self) -> ProviderResult<()> {
        debug!(target: "provider::static_file", "Committing all static file segments");

        for writer_lock in [
            &self.headers,
            &self.transactions,
            &self.receipts,
            &self.transaction_senders,
            &self.account_change_sets,
            &self.storage_change_sets,
        ] {
            let mut writer = writer_lock.write();
            if let Some(writer) = writer.as_mut() {
                writer.commit()?;
            }
        }

        debug!(target: "provider::static_file", "Committed all static file segments");
        Ok(())
    }

    pub(crate) fn has_unwind_queued(&self) -> bool {
        for writer_lock in [
            &self.headers,
            &self.transactions,
            &self.receipts,
            &self.transaction_senders,
            &self.account_change_sets,
            &self.storage_change_sets,
        ] {
            let writer = writer_lock.read();
            if let Some(writer) = writer.as_ref() &&
                writer.will_prune_on_commit()
            {
                return true
            }
        }
        false
    }

    /// Finalizes all writers by committing their configuration to disk and updating indices.
    ///
    /// Must be called after `sync_all` was called on individual writers.
    /// Returns an error if any writer has a prune queued.
    pub(crate) fn finalize(&self) -> ProviderResult<()> {
        debug!(target: "provider::static_file", "Finalizing all static file segments into disk");

        for writer_lock in [
            &self.headers,
            &self.transactions,
            &self.receipts,
            &self.transaction_senders,
            &self.account_change_sets,
            &self.storage_change_sets,
        ] {
            let mut writer = writer_lock.write();
            if let Some(writer) = writer.as_mut() {
                writer.finalize()?;
            }
        }

        debug!(target: "provider::static_file", "Finalized all static file segments into disk");
        Ok(())
    }
}

/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);

impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`]
        self.0.as_mut().expect("static file writer provider should be init")
    }
}

impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
    type Target = StaticFileProviderRW<N>;

    fn deref(&self) -> &Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`]
        self.0.as_ref().expect("static file writer provider should be init")
    }
}

/// Extends `StaticFileProvider` with writing capabilities.
#[derive(Debug)]
pub struct StaticFileProviderRW<N> {
    /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
    /// stored in a [`dashmap::DashMap`] inside the parent [`StaticFileProvider`], which is an
    /// [Arc]. If we were to use an [Arc] here, we would create a reference cycle.
    reader: Weak<StaticFileProviderInner<N>>,
    /// A [`NippyJarWriter`] instance.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to opened file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Metrics.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// On commit, contains the pruning strategy to apply for the segment.
    prune_on_commit: Option<PruneStrategy>,
    /// Whether `sync_all()` has been called. Used by `finalize()` to avoid redundant syncs.
    synced: bool,
}

impl<N: NodePrimitives> StaticFileProviderRW<N> {
    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
    ///
    /// Before use, transaction-based segments should ensure the block end range is the expected
    /// one, and heal if not. For more, see `Self::ensure_end_range_consistency`.
    pub fn new(
        segment: StaticFileSegment,
        block: BlockNumber,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<Self> {
        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
        let mut writer = Self {
            writer,
            data_path,
            buf: Vec::with_capacity(100),
            reader,
            metrics,
            prune_on_commit: None,
            synced: false,
        };

        writer.ensure_end_range_consistency()?;

        Ok(writer)
    }

    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        let block_range = static_file_provider.find_fixed_range(segment, block);
        let (jar, path) = match static_file_provider.get_segment_provider_for_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // This static file has been frozen, so it can no longer be written to.
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::other(e)),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }

    /// If file-level healing happens, we need to update the end range on the
    /// [`SegmentHeader`].
    ///
    /// However, for transaction-based segments, the block end range has to be found and healed
    /// externally.
    ///
    /// Check [`reth_nippy_jar::NippyJarChecker`] &
    /// [`NippyJarWriter`] for more on healing.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // If we have lost rows (in this run or a previous one), we need to update the
        // [SegmentHeader].
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let actual_rows = self.writer.rows() as u64;
        let pruned_rows = expected_rows.saturating_sub(actual_rows);
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            pruned_rows,
            "Ensuring end range consistency"
        );

        self.writer.commit().map_err(ProviderError::other)?;

        // Updates the [`StaticFileProvider`] manager
        self.update_index()?;
        Ok(())
    }

    /// Returns `true` if the writer will prune on commit.
    pub const fn will_prune_on_commit(&self) -> bool {
        self.prune_on_commit.is_some()
    }

    /// Syncs all data (rows and offsets) to disk.
    ///
    /// This does NOT commit the configuration. Call [`Self::finalize`] after to write the
    /// configuration and mark the writer as clean.
    ///
    /// Returns an error if prune is queued (use [`Self::commit`] instead).
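    ///
    /// # Example
    ///
    /// A minimal sketch of the two-phase persist flow (as opposed to a single [`Self::commit`]),
    /// assuming `writer` is a `StaticFileProviderRW` that already has appended data:
    ///
    /// ```ignore
    /// // Phase 1: flush rows and offsets to disk; can be done per segment.
    /// writer.sync_all()?;
    /// // Phase 2: persist the configuration and refresh the reader index.
    /// writer.finalize()?;
    /// ```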
    pub fn sync_all(&mut self) -> ProviderResult<()> {
        if self.prune_on_commit.is_some() {
            return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
        }
        if self.writer.is_dirty() {
            self.writer.sync_all().map_err(ProviderError::other)?;
        }
        self.synced = true;
        Ok(())
    }

    /// Commits configuration to disk and updates the reader index.
    ///
    /// If `sync_all()` was not called, this will call it first to ensure data is persisted.
    ///
    /// Returns an error if prune is queued (use [`Self::commit`] instead).
    pub fn finalize(&mut self) -> ProviderResult<()> {
        if self.prune_on_commit.is_some() {
            return Err(StaticFileWriterError::FinalizeWithPruneQueued.into());
        }
        if self.writer.is_dirty() {
            if !self.synced {
                self.writer.sync_all().map_err(ProviderError::other)?;
            }

            self.writer.finalize().map_err(ProviderError::other)?;
            self.update_index()?;
        }
        self.synced = false;
        Ok(())
    }

    /// Commits configuration changes to disk and updates the reader index with the new changes.
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Truncates the data file if instructed to.
        if let Some(strategy) = self.prune_on_commit.take() {
            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                "Pruning data on commit"
            );
            match strategy {
                PruneStrategy::Headers { num_blocks } => self.prune_header_data(num_blocks)?,
                PruneStrategy::Transactions { num_rows, last_block } => {
                    self.prune_transaction_data(num_rows, last_block)?
                }
                PruneStrategy::Receipts { num_rows, last_block } => {
                    self.prune_receipt_data(num_rows, last_block)?
                }
                PruneStrategy::TransactionSenders { num_rows, last_block } => {
                    self.prune_transaction_sender_data(num_rows, last_block)?
                }
                PruneStrategy::AccountChangeSets { last_block } => {
                    self.prune_account_changeset_data(last_block)?
                }
                PruneStrategy::StorageChangeSets { last_block } => {
                    self.prune_storage_changeset_data(last_block)?
                }
            }
        }

        if self.writer.is_dirty() {
            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                "Committing writer to disk"
            );

            // Commits offsets and new user_header to disk
            self.writer.commit().map_err(ProviderError::other)?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Committed writer to disk"
            );

            self.update_index()?;
        }

        Ok(())
    }

    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// CAUTION: does not call `sync_all` on the files.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            "Committing writer to disk (without sync)"
        );

        // Commits offsets and new user_header to disk
        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Committed writer to disk (without sync)"
        );

        self.update_index()?;

        Ok(())
    }

    /// Updates the `self.reader` internal index.
    fn update_index(&self) -> ProviderResult<()> {
        // We find the maximum block of the segment by checking this writer's last block.
        //
        // However, if there's no block range (because there's no data), we try to calculate it
        // by subtracting 1 from the expected block start, resulting in the last block of the
        // previous file.
        //
        // If that expected block start is 0, it means there is no block data in static files at
        // all.
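        //
        // Illustrative example (hypothetical numbers): with files that span 500_000 blocks, a
        // segment whose current file covers 500_000..=999_999 but holds no data yet has an
        // expected block start of 500_000, so the reported max block falls back to 499_999,
        // i.e. the last block of the previous file.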
        let segment_max_block = self
            .writer
            .user_header()
            .block_range()
            .as_ref()
            .map(|block_range| block_range.end())
            .or_else(|| {
                (self.writer.user_header().expected_block_start() >
                    self.reader().genesis_block_number())
                .then(|| self.writer.user_header().expected_block_start() - 1)
            });

        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
    }

    /// Ensures that the writer is positioned at the specified block number.
    ///
    /// If the writer is positioned at a greater block number than the specified one, the writer
    /// will NOT be unwound and an error will be returned.
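    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `writer` is currently positioned at block 10:
    ///
    /// ```ignore
    /// // Advances the writer block by block up to block 15.
    /// writer.ensure_at_block(15)?;
    /// // Asking for an older block does not unwind; it returns an error instead.
    /// assert!(writer.ensure_at_block(12).is_err());
    /// ```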
    pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
        let current_block = if let Some(current_block_number) = self.current_block_number() {
            current_block_number
        } else {
            self.increment_block(0)?;
            0
        };

        match current_block.cmp(&advance_to) {
            Ordering::Less => {
                for block in current_block + 1..=advance_to {
                    self.increment_block(block)?;
                }
            }
            Ordering::Equal => {}
            Ordering::Greater => {
                return Err(ProviderError::UnexpectedStaticFileBlockNumber(
                    self.writer.user_header().segment(),
                    current_block,
                    advance_to,
                ));
            }
        }

        Ok(())
    }

    /// Increments the [`SegmentHeader`] end block. It will commit the current static file and
    /// create the next one if we are past the end range.
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // We have finished the previous static file and must freeze it
            if last_block == self.writer.user_header().expected_block_end() {
                // Commits offsets and new user_header to disk
                self.commit()?;

                // Opens the new static file
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(segment, last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Returns the current block number of the static file writer.
    pub fn current_block_number(&self) -> Option<u64> {
        self.writer.user_header().block_end()
    }

    /// Returns the block number immediately after the current tip of static files.
    pub fn next_block_number(&self) -> u64 {
        // The next static file block number can be found by checking the one after block_end.
        // However, if this is a new file that hasn't had any data added yet, its block range will
        // be None. In that case, the next block is found in `expected_block_start`.
        self.writer
            .user_header()
            .block_end()
            .map(|b| b + 1)
            .unwrap_or_else(|| self.writer.user_header().expected_block_start())
    }

    /// Verifies if the incoming block number matches the next expected block number
    /// for a static file. This ensures data continuity when adding new blocks.
    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
        let next_static_file_block = self.next_block_number();

        if expected_block_number != next_static_file_block {
            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
                self.writer.user_header().segment(),
                expected_block_number,
                next_static_file_block,
            ))
        }
        Ok(())
    }

    /// Truncates changesets to the given block. It deletes and loads an older static file if the
    /// target block goes beyond the start of the current block range.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
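    ///
    /// # Example
    ///
    /// Illustrative numbers: for a file whose block range starts at 100, pruning to
    /// `last_block = 102` keeps `102 - 100 + 1 = 3` blocks worth of changes; the changeset
    /// offset recorded for block 103 then gives the number of rows to keep, and every row after
    /// it is pruned.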
    fn truncate_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();
        debug_assert!(segment.is_change_based());

        // Get the current block range
        let current_block_end = self
            .writer
            .user_header()
            .block_end()
            .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;

        // If we're already at or before the target block, nothing to do
        if current_block_end <= last_block {
            return Ok(())
        }

        // Navigate to the correct file if the target block is in a previous file
        let mut expected_block_start = self.writer.user_header().expected_block_start();
        while last_block < expected_block_start && expected_block_start > 0 {
            self.delete_current_and_open_previous()?;
            expected_block_start = self.writer.user_header().expected_block_start();
        }

        // Now we're in the correct file, we need to find how many rows to prune.
        // Since changesets are stored per block, we use the per-block changeset offsets to find
        // the row position that corresponds to the target block.
        let changeset_offsets = self.writer.user_header().changeset_offsets().ok_or_else(|| {
            ProviderError::other(StaticFileWriterError::new("Missing changeset offsets"))
        })?;

        // Find the number of blocks to keep (up to and including last_block)
        let blocks_to_keep = if last_block >= expected_block_start {
            last_block - expected_block_start + 1
        } else {
            0
        };

        let rows_to_keep = if blocks_to_keep == 0 {
            0
        } else if blocks_to_keep as usize >= changeset_offsets.len() {
            // Keep all rows in this file (a count above the tracked offsets shouldn't happen if
            // data is consistent)
            self.writer.rows() as u64
        } else {
            // The offset of the block after `last_block` equals the number of rows to keep
            changeset_offsets[blocks_to_keep as usize].offset()
        };

        let total_rows = self.writer.rows() as u64;
        let rows_to_delete = total_rows.saturating_sub(rows_to_keep);

        if rows_to_delete > 0 {
            // Calculate the number of blocks to prune
            let current_block_end = self
                .writer
                .user_header()
                .block_end()
                .ok_or(ProviderError::MissingStaticFileBlock(segment, 0))?;
            let blocks_to_remove = current_block_end - last_block;

            // Update segment header - for changesets, prune expects number of blocks, not rows
            self.writer.user_header_mut().prune(blocks_to_remove);

            // Prune the actual rows
            self.writer.prune_rows(rows_to_delete as usize).map_err(ProviderError::other)?;
        }

        // Update the block range
        self.writer.user_header_mut().set_block_range(expected_block_start, last_block);

        // Sync changeset offsets to match the new block range
        self.writer.user_header_mut().sync_changeset_offsets();

        // Commits new changes to disk
        self.commit()?;

        Ok(())
    }

    /// Truncates a number of rows from disk. It deletes and loads an older static file if the
    /// target block goes beyond the start of the current block range.
    ///
    /// **`last_block`** should be passed only with transaction-based segments.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
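    ///
    /// # Example
    ///
    /// Illustrative numbers: if the current file holds 300 rows and `num_rows` is 400, the whole
    /// file is deleted (for headers, or when `last_block` falls before its block range), the
    /// previous file is opened, and the remaining 100 rows are pruned from it.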
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // If there are more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                // We only delete the file if it's NOT the first static file AND:
                // * it's a Header segment  OR
                // * it's a tx-based segment AND `last_block` is lower than the first block of this
                //   file's block range. Otherwise, having no rows simply means that this block
                //   range has no transactions, but the file should remain.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        // Only transaction-based segments (Transactions, Receipts, TransactionSenders) pass
        // `last_block`.
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case for when we are unwinding a chain of empty blocks that goes across
                // files, and therefore, the only reference point to know which file
                // we are supposed to be at is `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }

    /// Deletes the current static file and replaces this provider writer with the previous static
    /// file.
    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
        let current_path = self.data_path.clone();
        let (previous_writer, data_path) = Self::open(
            self.user_header().segment(),
            self.writer.user_header().expected_block_start() - 1,
            self.reader.clone(),
            self.metrics.clone(),
        )?;
        self.writer = previous_writer;
        self.writer.set_dirty();
        self.data_path = data_path;
        NippyJar::<SegmentHeader>::load(&current_path)
            .map_err(ProviderError::other)?
            .delete()
            .map_err(ProviderError::other)?;
        Ok(())
    }

    /// Appends column to static file.
    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
        self.buf.clear();
        column.to_compact(&mut self.buf);

        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
        Ok(())
    }

    /// Appends to tx number-based static file.
    fn append_with_tx_number<V: Compact>(
        &mut self,
        tx_num: TxNumber,
        value: V,
    ) -> ProviderResult<()> {
        if let Some(range) = self.writer.user_header().tx_range() {
            let next_tx = range.end() + 1;
            if next_tx != tx_num {
                return Err(ProviderError::UnexpectedStaticFileTxNumber(
                    self.writer.user_header().segment(),
                    tx_num,
                    next_tx,
                ))
            }
            self.writer.user_header_mut().increment_tx();
        } else {
            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
        }

        self.append_column(value)?;

        Ok(())
    }

    /// Appends change to changeset static file.
    fn append_change<V: Compact>(&mut self, change: &V) -> ProviderResult<()> {
        if self.writer.user_header().changeset_offsets().is_some() {
            self.writer.user_header_mut().increment_block_changes();
        }

        self.append_column(change)?;
        Ok(())
    }

    /// Appends header to static file.
    ///
    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
    /// blocks.
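    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `header` and `hash` belong to the next block of the `Headers`
    /// segment this writer targets:
    ///
    /// ```ignore
    /// // Advances the block range by one and appends the header, total difficulty and hash
    /// // columns.
    /// writer.append_header(&header, &hash)?;
    /// writer.commit()?;
    /// ```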
    pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        self.append_header_with_td(header, U256::ZERO, hash)
    }

    /// Appends header to static file with a specified total difficulty.
    ///
    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
    /// blocks.
    pub fn append_header_with_td(
        &mut self,
        header: &N::BlockHeader,
        total_difficulty: U256,
        hash: &BlockHash,
    ) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.increment_block(header.number())?;

        self.append_column(header)?;
        self.append_column(CompactU256::from(total_difficulty))?;
        self.append_column(hash)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends header to static file without calling `increment_block`.
    /// This is useful for genesis blocks with non-zero block numbers.
    pub fn append_header_direct(
        &mut self,
        header: &N::BlockHeader,
        total_difficulty: U256,
        hash: &BlockHash,
    ) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.append_column(header)?;
        self.append_column(CompactU256::from(total_difficulty))?;
        self.append_column(hash)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends transaction to static file.
    ///
    /// It **DOES NOT CALL** `increment_block()`; that should be handled elsewhere, since there
    /// might be empty blocks for which this function is never called.
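    ///
    /// # Example
    ///
    /// A minimal sketch, assuming the block range was already advanced via `increment_block` and
    /// `tx_num` is the next transaction number of the `Transactions` segment:
    ///
    /// ```ignore
    /// writer.append_transaction(tx_num, &tx)?;
    /// writer.commit()?;
    /// ```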
    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
    where
        N::SignedTx: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
        self.append_with_tx_number(tx_num, tx)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends receipt to static file.
    ///
    /// It **DOES NOT** call `increment_block()`; that should be handled elsewhere, since there
    /// might be empty blocks for which this function is never called.
    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
    where
        N::Receipt: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
        self.append_with_tx_number(tx_num, receipt)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends multiple receipts to the static file.
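    ///
    /// A minimal sketch, assuming `receipts` yields `(TxNumber, Receipt)` pairs in ascending
    /// transaction-number order:
    ///
    /// ```ignore
    /// writer.append_receipts(receipts.into_iter().map(Ok))?;
    /// writer.commit()?;
    /// ```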
    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
    where
        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
        R: Borrow<N::Receipt>,
        N::Receipt: Compact,
    {
        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        let mut receipts_iter = receipts.into_iter().peekable();
        // If receipts are empty, we can simply return
        if receipts_iter.peek().is_none() {
            return Ok(());
        }

        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        // At this point the iterator contains at least one receipt.
        let mut count: u64 = 0;

        for receipt_result in receipts_iter {
            let (tx_num, receipt) = receipt_result?;
            self.append_with_tx_number(tx_num, receipt.borrow())?;
            count += 1;
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operations(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                count,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends transaction sender to static file.
    ///
    /// It **DOES NOT** call `increment_block()`; that should be handled elsewhere, since there
    /// might be empty blocks for which this function is never called.
    pub fn append_transaction_sender(
        &mut self,
        tx_num: TxNumber,
        sender: &alloy_primitives::Address,
    ) -> ProviderResult<()> {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
        self.append_with_tx_number(tx_num, sender)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::TransactionSenders,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends multiple transaction senders to the static file.
    pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
    where
        I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
    {
        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);

        let mut senders_iter = senders.into_iter().peekable();
        // If senders are empty, we can simply return
        if senders_iter.peek().is_none() {
            return Ok(());
        }

        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        // At this point the iterator contains at least one sender.
        let mut count: u64 = 0;
        for (tx_num, sender) in senders_iter {
            self.append_with_tx_number(tx_num, sender)?;
            count += 1;
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operations(
                StaticFileSegment::TransactionSenders,
                StaticFileProviderOperation::Append,
                count,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends a block's account changeset to the static file.
    ///
    /// It **CALLS** `increment_block()`.
    ///
    /// The changeset is sorted by address before it is written.
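    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `changes` holds the [`AccountBeforeTx`] entries of
    /// `block_number`:
    ///
    /// ```ignore
    /// writer.append_account_changeset(changes, block_number)?;
    /// writer.commit()?;
    /// ```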
    pub fn append_account_changeset(
        &mut self,
        mut changeset: Vec<AccountBeforeTx>,
        block_number: u64,
    ) -> ProviderResult<()> {
        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);
        let start = Instant::now();

        self.increment_block(block_number)?;
        self.ensure_no_queued_prune()?;

        // first sort the changeset by address
        changeset.sort_by_key(|change| change.address);

        let mut count: u64 = 0;

        for change in changeset {
            self.append_change(&change)?;
            count += 1;
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operations(
                StaticFileSegment::AccountChangeSets,
                StaticFileProviderOperation::Append,
                count,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends a block's storage changeset to the static file.
    ///
    /// It **CALLS** `increment_block()`.
    pub fn append_storage_changeset(
        &mut self,
        mut changeset: Vec<StorageBeforeTx>,
        block_number: u64,
    ) -> ProviderResult<()> {
        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);
        let start = Instant::now();

        self.increment_block(block_number)?;
        self.ensure_no_queued_prune()?;

        // sort by address + storage key
        changeset.sort_by_key(|change| (change.address, change.key));

        let mut count: u64 = 0;
        for change in changeset {
            self.append_change(&change)?;
            count += 1;
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operations(
                StaticFileSegment::StorageChangeSets,
                StaticFileProviderOperation::Append,
                count,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Adds an instruction to prune `to_delete` transactions during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
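    ///
    /// # Example
    ///
    /// A minimal sketch, assuming we unwind 10 transactions so that `last_block` becomes the new
    /// tip:
    ///
    /// ```ignore
    /// // The prune is only queued here; rows are removed when `commit` runs.
    /// writer.prune_transactions(10, last_block)?;
    /// writer.commit()?;
    /// ```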
    pub fn prune_transactions(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
        self.queue_prune(PruneStrategy::Transactions { num_rows: to_delete, last_block })
    }

    /// Adds an instruction to prune `to_delete` receipts during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
    pub fn prune_receipts(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
        self.queue_prune(PruneStrategy::Receipts { num_rows: to_delete, last_block })
    }

    /// Adds an instruction to prune `to_delete` transaction senders during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
    pub fn prune_transaction_senders(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(
            self.writer.user_header().segment(),
            StaticFileSegment::TransactionSenders
        );
        self.queue_prune(PruneStrategy::TransactionSenders { num_rows: to_delete, last_block })
    }

    /// Adds an instruction to prune `to_delete` headers during commit.
    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
        self.queue_prune(PruneStrategy::Headers { num_blocks: to_delete })
    }

    /// Adds an instruction to prune account changesets down to the given block during commit.
    pub fn prune_account_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::AccountChangeSets);
        self.queue_prune(PruneStrategy::AccountChangeSets { last_block })
    }

    /// Adds an instruction to prune storage changesets down to the given block during commit.
    pub fn prune_storage_changesets(&mut self, last_block: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::StorageChangeSets);
        self.queue_prune(PruneStrategy::StorageChangeSets { last_block })
    }

    /// Adds an instruction to prune elements during commit using the specified strategy.
    fn queue_prune(&mut self, strategy: PruneStrategy) -> ProviderResult<()> {
        self.ensure_no_queued_prune()?;
        self.prune_on_commit = Some(strategy);
        Ok(())
    }

    /// Returns an error if there is a pruning instruction that still needs to be applied.
    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
        if self.prune_on_commit.is_some() {
            return Err(ProviderError::other(StaticFileWriterError::new(
                "Pruning should be committed before appending or pruning more data",
            )));
        }
        Ok(())
    }

    /// Removes the last `to_delete` transactions from the data file.
    fn prune_transaction_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes account changesets above `last_block` from the data file.
    fn prune_account_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::AccountChangeSets);

        self.truncate_changesets(last_block)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::AccountChangeSets,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes storage changesets above `last_block` from the data file.
    fn prune_storage_changeset_data(&mut self, last_block: BlockNumber) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::StorageChangeSets);

        self.truncate_changesets(last_block)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::StorageChangeSets,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` receipts from the data file.
    fn prune_receipt_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` transaction senders from the data file.
    fn prune_transaction_sender_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::TransactionSenders,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` headers from the data file.
    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.truncate(to_delete, None)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Returns a [`StaticFileProvider`] associated with this writer.
    pub fn reader(&self) -> StaticFileProvider<N> {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }

    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
    /// [`StaticFileProvider`].
    ///
    /// # Panics
    ///
    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is
    /// still active. In practice, it's impossible to detach the [`StaticFileProviderRW`] from
    /// the [`StaticFileProvider`].
    fn upgrade_provider_to_strong_reference(
        provider: &Weak<StaticFileProviderInner<N>>,
    ) -> StaticFileProvider<N> {
        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
    }

    /// Helper function to access [`SegmentHeader`].
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }

    /// Helper function to access a mutable reference to [`SegmentHeader`].
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }

    /// Helper function to override block range for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
    }

    /// Helper function to access the underlying [`NippyJarWriter`] for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
}

fn create_jar(
    segment: StaticFileSegment,
    path: &Path,
    expected_block_range: SegmentRangeInclusive,
) -> NippyJar<SegmentHeader> {
    let mut jar = NippyJar::new(
        segment.columns(),
        path,
        SegmentHeader::new(expected_block_range, None, None, segment),
    );

    // Transactions and Receipts already have a compression scheme (zstd with dictionaries) used
    // natively in their encoding.
    if segment.is_headers() {
        jar = jar.with_lz4();
    }

    jar
}