reth_provider/providers/static_file/
writer.rs

1use super::{
2    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::{
10    CompactU256, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
11};
12use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
13use reth_node_types::NodePrimitives;
14use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
15use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
16use std::{
17    borrow::Borrow,
18    fmt::Debug,
19    path::{Path, PathBuf},
20    sync::{Arc, Weak},
21    time::Instant,
22};
23use tracing::debug;
24
25/// Static file writers for every known [`StaticFileSegment`].
26///
27/// WARNING: Trying to use more than one writer for the same segment type **will result in a
28/// deadlock**.
29#[derive(Debug)]
30pub(crate) struct StaticFileWriters<N> {
31    headers: RwLock<Option<StaticFileProviderRW<N>>>,
32    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
33    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
34    block_meta: RwLock<Option<StaticFileProviderRW<N>>>,
35}
36
37impl<N> Default for StaticFileWriters<N> {
38    fn default() -> Self {
39        Self {
40            headers: Default::default(),
41            transactions: Default::default(),
42            receipts: Default::default(),
43            block_meta: Default::default(),
44        }
45    }
46}
47
48impl<N: NodePrimitives> StaticFileWriters<N> {
49    pub(crate) fn get_or_create(
50        &self,
51        segment: StaticFileSegment,
52        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
53    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
54        let mut write_guard = match segment {
55            StaticFileSegment::Headers => self.headers.write(),
56            StaticFileSegment::Transactions => self.transactions.write(),
57            StaticFileSegment::Receipts => self.receipts.write(),
58            StaticFileSegment::BlockMeta => self.block_meta.write(),
59        };
60
61        if write_guard.is_none() {
62            *write_guard = Some(create_fn()?);
63        }
64
65        Ok(StaticFileProviderRWRefMut(write_guard))
66    }
67
68    pub(crate) fn commit(&self) -> ProviderResult<()> {
69        for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
70            let mut writer = writer_lock.write();
71            if let Some(writer) = writer.as_mut() {
72                writer.commit()?;
73            }
74        }
75        Ok(())
76    }
77}
78
79/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
80#[derive(Debug)]
81pub struct StaticFileProviderRWRefMut<'a, N>(
82    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
83);
84
85impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
86    fn deref_mut(&mut self) -> &mut Self::Target {
87        // This is always created by [`StaticFileWriters::get_or_create`]
88        self.0.as_mut().expect("static file writer provider should be init")
89    }
90}
91
92impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
93    type Target = StaticFileProviderRW<N>;
94
95    fn deref(&self) -> &Self::Target {
96        // This is always created by [`StaticFileWriters::get_or_create`]
97        self.0.as_ref().expect("static file writer provider should be init")
98    }
99}
100
101#[derive(Debug)]
102/// Extends `StaticFileProvider` with writing capabilities
103pub struct StaticFileProviderRW<N> {
104    /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
105    /// stored in a [`dashmap::DashMap`] inside the parent [`StaticFileProvider`].which is an
106    /// [Arc]. If we were to use an [Arc] here, we would create a reference cycle.
107    reader: Weak<StaticFileProviderInner<N>>,
108    /// A [`NippyJarWriter`] instance.
109    writer: NippyJarWriter<SegmentHeader>,
110    /// Path to opened file.
111    data_path: PathBuf,
112    /// Reusable buffer for encoding appended data.
113    buf: Vec<u8>,
114    /// Metrics.
115    metrics: Option<Arc<StaticFileProviderMetrics>>,
116    /// On commit, does the instructed pruning: number of lines, and if it applies, the last block
117    /// it ends at.
118    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
119}
120
121impl<N: NodePrimitives> StaticFileProviderRW<N> {
122    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
123    ///
124    /// Before use, transaction based segments should ensure the block end range is the expected
125    /// one, and heal if not. For more check `Self::ensure_end_range_consistency`.
126    pub fn new(
127        segment: StaticFileSegment,
128        block: BlockNumber,
129        reader: Weak<StaticFileProviderInner<N>>,
130        metrics: Option<Arc<StaticFileProviderMetrics>>,
131    ) -> ProviderResult<Self> {
132        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
133        let mut writer = Self {
134            writer,
135            data_path,
136            buf: Vec::with_capacity(100),
137            reader,
138            metrics,
139            prune_on_commit: None,
140        };
141
142        writer.ensure_end_range_consistency()?;
143
144        Ok(writer)
145    }
146
147    fn open(
148        segment: StaticFileSegment,
149        block: u64,
150        reader: Weak<StaticFileProviderInner<N>>,
151        metrics: Option<Arc<StaticFileProviderMetrics>>,
152    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
153        let start = Instant::now();
154
155        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
156
157        let block_range = static_file_provider.find_fixed_range(block);
158        let (jar, path) = match static_file_provider.get_segment_provider_from_block(
159            segment,
160            block_range.start(),
161            None,
162        ) {
163            Ok(provider) => (
164                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
165                provider.data_path().into(),
166            ),
167            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
168                let path = static_file_provider.directory().join(segment.filename(&block_range));
169                (create_jar(segment, &path, block_range), path)
170            }
171            Err(err) => return Err(err),
172        };
173
174        let result = match NippyJarWriter::new(jar) {
175            Ok(writer) => Ok((writer, path)),
176            Err(NippyJarError::FrozenJar) => {
177                // This static file has been frozen, so we should
178                Err(ProviderError::FinalizedStaticFile(segment, block))
179            }
180            Err(e) => Err(ProviderError::other(e)),
181        }?;
182
183        if let Some(metrics) = &metrics {
184            metrics.record_segment_operation(
185                segment,
186                StaticFileProviderOperation::OpenWriter,
187                Some(start.elapsed()),
188            );
189        }
190
191        Ok(result)
192    }
193
194    /// If a file level healing happens, we need to update the end range on the
195    /// [`SegmentHeader`].
196    ///
197    /// However, for transaction based segments, the block end range has to be found and healed
198    /// externally.
199    ///
200    /// Check [`reth_nippy_jar::NippyJarChecker`] &
201    /// [`NippyJarWriter`] for more on healing.
202    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
203        // If we have lost rows (in this run or previous), we need to update the [SegmentHeader].
204        let expected_rows = if self.user_header().segment().is_headers() {
205            self.user_header().block_len().unwrap_or_default()
206        } else {
207            self.user_header().tx_len().unwrap_or_default()
208        };
209        let pruned_rows = expected_rows - self.writer.rows() as u64;
210        if pruned_rows > 0 {
211            self.user_header_mut().prune(pruned_rows);
212        }
213
214        self.writer.commit().map_err(ProviderError::other)?;
215
216        // Updates the [SnapshotProvider] manager
217        self.update_index()?;
218        Ok(())
219    }
220
221    /// Commits configuration changes to disk and updates the reader index with the new changes.
222    pub fn commit(&mut self) -> ProviderResult<()> {
223        let start = Instant::now();
224
225        // Truncates the data file if instructed to.
226        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
227            match self.writer.user_header().segment() {
228                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
229                StaticFileSegment::Transactions => self
230                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
231                StaticFileSegment::Receipts => {
232                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
233                }
234                StaticFileSegment::BlockMeta => todo!(),
235            }
236        }
237
238        if self.writer.is_dirty() {
239            // Commits offsets and new user_header to disk
240            self.writer.commit().map_err(ProviderError::other)?;
241
242            if let Some(metrics) = &self.metrics {
243                metrics.record_segment_operation(
244                    self.writer.user_header().segment(),
245                    StaticFileProviderOperation::CommitWriter,
246                    Some(start.elapsed()),
247                );
248            }
249
250            debug!(
251                target: "provider::static_file",
252                segment = ?self.writer.user_header().segment(),
253                path = ?self.data_path,
254                duration = ?start.elapsed(),
255                "Commit"
256            );
257
258            self.update_index()?;
259        }
260
261        Ok(())
262    }
263
264    /// Commits configuration changes to disk and updates the reader index with the new changes.
265    ///
266    /// CAUTION: does not call `sync_all` on the files.
267    #[cfg(feature = "test-utils")]
268    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
269        let start = Instant::now();
270
271        // Commits offsets and new user_header to disk
272        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;
273
274        if let Some(metrics) = &self.metrics {
275            metrics.record_segment_operation(
276                self.writer.user_header().segment(),
277                StaticFileProviderOperation::CommitWriter,
278                Some(start.elapsed()),
279            );
280        }
281
282        debug!(
283            target: "provider::static_file",
284            segment = ?self.writer.user_header().segment(),
285            path = ?self.data_path,
286            duration = ?start.elapsed(),
287            "Commit"
288        );
289
290        self.update_index()?;
291
292        Ok(())
293    }
294
295    /// Updates the `self.reader` internal index.
296    fn update_index(&self) -> ProviderResult<()> {
297        // We find the maximum block of the segment by checking this writer's last block.
298        //
299        // However if there's no block range (because there's no data), we try to calculate it by
300        // subtracting 1 from the expected block start, resulting on the last block of the
301        // previous file.
302        //
303        // If that expected block start is 0, then it means that there's no actual block data, and
304        // there's no block data in static files.
305        let segment_max_block = self
306            .writer
307            .user_header()
308            .block_range()
309            .as_ref()
310            .map(|block_range| block_range.end())
311            .or_else(|| {
312                (self.writer.user_header().expected_block_start() > 0)
313                    .then(|| self.writer.user_header().expected_block_start() - 1)
314            });
315
316        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
317    }
318
319    /// Allows to increment the [`SegmentHeader`] end block. It will commit the current static file,
320    /// and create the next one if we are past the end range.
321    ///
322    /// Returns the current [`BlockNumber`] as seen in the static file.
323    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
324        let segment = self.writer.user_header().segment();
325
326        self.check_next_block_number(expected_block_number)?;
327
328        let start = Instant::now();
329        if let Some(last_block) = self.writer.user_header().block_end() {
330            // We have finished the previous static file and must freeze it
331            if last_block == self.writer.user_header().expected_block_end() {
332                // Commits offsets and new user_header to disk
333                self.commit()?;
334
335                // Opens the new static file
336                let (writer, data_path) =
337                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
338                self.writer = writer;
339                self.data_path = data_path;
340
341                *self.writer.user_header_mut() = SegmentHeader::new(
342                    self.reader().find_fixed_range(last_block + 1),
343                    None,
344                    None,
345                    segment,
346                );
347            }
348        }
349
350        self.writer.user_header_mut().increment_block();
351        if let Some(metrics) = &self.metrics {
352            metrics.record_segment_operation(
353                segment,
354                StaticFileProviderOperation::IncrementBlock,
355                Some(start.elapsed()),
356            );
357        }
358
359        Ok(())
360    }
361
362    /// Verifies if the incoming block number matches the next expected block number
363    /// for a static file. This ensures data continuity when adding new blocks.
364    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
365        // The next static file block number can be found by checking the one after block_end.
366        // However if it's a new file that hasn't been added any data, its block range will actually
367        // be None. In that case, the next block will be found on `expected_block_start`.
368        let next_static_file_block = self
369            .writer
370            .user_header()
371            .block_end()
372            .map(|b| b + 1)
373            .unwrap_or_else(|| self.writer.user_header().expected_block_start());
374
375        if expected_block_number != next_static_file_block {
376            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
377                self.writer.user_header().segment(),
378                expected_block_number,
379                next_static_file_block,
380            ))
381        }
382        Ok(())
383    }
384
385    /// Truncates a number of rows from disk. It deletes and loads an older static file if block
386    /// goes beyond the start of the current block range.
387    ///
388    /// **`last_block`** should be passed only with transaction based segments.
389    ///
390    /// # Note
391    /// Commits to the configuration file at the end.
392    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
393        let mut remaining_rows = num_rows;
394        let segment = self.writer.user_header().segment();
395        while remaining_rows > 0 {
396            let len = if segment.is_block_based() {
397                self.writer.user_header().block_len().unwrap_or_default()
398            } else {
399                self.writer.user_header().tx_len().unwrap_or_default()
400            };
401
402            if remaining_rows >= len {
403                // If there's more rows to delete than this static file contains, then just
404                // delete the whole file and go to the next static file
405                let block_start = self.writer.user_header().expected_block_start();
406
407                // We only delete the file if it's NOT the first static file AND:
408                // * it's a Header segment  OR
409                // * it's a tx-based segment AND `last_block` is lower than the first block of this
410                //   file's block range. Otherwise, having no rows simply means that this block
411                //   range has no transactions, but the file should remain.
412                if block_start != 0 &&
413                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
414                {
415                    self.delete_current_and_open_previous()?;
416                } else {
417                    // Update `SegmentHeader`
418                    self.writer.user_header_mut().prune(len);
419                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
420                    break
421                }
422
423                remaining_rows -= len;
424            } else {
425                // Update `SegmentHeader`
426                self.writer.user_header_mut().prune(remaining_rows);
427
428                // Truncate data
429                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
430                remaining_rows = 0;
431            }
432        }
433
434        // Only Transactions and Receipts
435        if let Some(last_block) = last_block {
436            let mut expected_block_start = self.writer.user_header().expected_block_start();
437
438            if num_rows == 0 {
439                // Edge case for when we are unwinding a chain of empty blocks that goes across
440                // files, and therefore, the only reference point to know which file
441                // we are supposed to be at is `last_block`.
442                while last_block < expected_block_start {
443                    self.delete_current_and_open_previous()?;
444                    expected_block_start = self.writer.user_header().expected_block_start();
445                }
446            }
447            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
448        }
449
450        // Commits new changes to disk.
451        self.commit()?;
452
453        Ok(())
454    }
455
456    /// Delete the current static file, and replace this provider writer with the previous static
457    /// file.
458    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
459        let current_path = self.data_path.clone();
460        let (previous_writer, data_path) = Self::open(
461            self.user_header().segment(),
462            self.writer.user_header().expected_block_start() - 1,
463            self.reader.clone(),
464            self.metrics.clone(),
465        )?;
466        self.writer = previous_writer;
467        self.writer.set_dirty();
468        self.data_path = data_path;
469        NippyJar::<SegmentHeader>::load(&current_path)
470            .map_err(ProviderError::other)?
471            .delete()
472            .map_err(ProviderError::other)?;
473        Ok(())
474    }
475
476    /// Appends column to static file.
477    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
478        self.buf.clear();
479        column.to_compact(&mut self.buf);
480
481        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
482        Ok(())
483    }
484
485    /// Appends to tx number-based static file.
486    ///
487    /// Returns the current [`TxNumber`] as seen in the static file.
488    fn append_with_tx_number<V: Compact>(
489        &mut self,
490        tx_num: TxNumber,
491        value: V,
492    ) -> ProviderResult<()> {
493        if let Some(range) = self.writer.user_header().tx_range() {
494            let next_tx = range.end() + 1;
495            if next_tx != tx_num {
496                return Err(ProviderError::UnexpectedStaticFileTxNumber(
497                    self.writer.user_header().segment(),
498                    tx_num,
499                    next_tx,
500                ))
501            }
502            self.writer.user_header_mut().increment_tx();
503        } else {
504            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
505        }
506
507        self.append_column(value)?;
508
509        Ok(())
510    }
511
512    /// Appends header to static file.
513    ///
514    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
515    /// blocks.
516    ///
517    /// Returns the current [`BlockNumber`] as seen in the static file.
518    pub fn append_header(
519        &mut self,
520        header: &N::BlockHeader,
521        total_difficulty: U256,
522        hash: &BlockHash,
523    ) -> ProviderResult<()>
524    where
525        N::BlockHeader: Compact,
526    {
527        let start = Instant::now();
528        self.ensure_no_queued_prune()?;
529
530        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
531
532        self.increment_block(header.number())?;
533
534        self.append_column(header)?;
535        self.append_column(CompactU256::from(total_difficulty))?;
536        self.append_column(hash)?;
537
538        if let Some(metrics) = &self.metrics {
539            metrics.record_segment_operation(
540                StaticFileSegment::Headers,
541                StaticFileProviderOperation::Append,
542                Some(start.elapsed()),
543            );
544        }
545
546        Ok(())
547    }
548
549    /// Appends [`StoredBlockBodyIndices`], [`StoredBlockOmmers`] and [`StoredBlockWithdrawals`] to
550    /// static file.
551    ///
552    /// It **CALLS** `increment_block()` since it's a block based segment.
553    pub fn append_eth_block_meta(
554        &mut self,
555        body_indices: &StoredBlockBodyIndices,
556        ommers: &StoredBlockOmmers<N::BlockHeader>,
557        withdrawals: &StoredBlockWithdrawals,
558        expected_block_number: BlockNumber,
559    ) -> ProviderResult<()>
560    where
561        N::BlockHeader: Compact,
562    {
563        self.append_block_meta(body_indices, ommers, withdrawals, expected_block_number)
564    }
565
566    /// Appends [`StoredBlockBodyIndices`] and any other two arbitrary types belonging to the block
567    /// body to static file.
568    ///
569    /// It **CALLS** `increment_block()` since it's a block based segment.
570    pub fn append_block_meta<F1, F2>(
571        &mut self,
572        body_indices: &StoredBlockBodyIndices,
573        field1: &F1,
574        field2: &F2,
575        expected_block_number: BlockNumber,
576    ) -> ProviderResult<()>
577    where
578        N::BlockHeader: Compact,
579        F1: Compact,
580        F2: Compact,
581    {
582        let start = Instant::now();
583        self.ensure_no_queued_prune()?;
584
585        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::BlockMeta);
586
587        self.increment_block(expected_block_number)?;
588
589        self.append_column(body_indices)?;
590        self.append_column(field1)?;
591        self.append_column(field2)?;
592
593        if let Some(metrics) = &self.metrics {
594            metrics.record_segment_operation(
595                StaticFileSegment::BlockMeta,
596                StaticFileProviderOperation::Append,
597                Some(start.elapsed()),
598            );
599        }
600
601        Ok(())
602    }
603
604    /// Appends transaction to static file.
605    ///
606    /// It **DOES NOT CALL** `increment_block()`, it should be handled elsewhere. There might be
607    /// empty blocks and this function wouldn't be called.
608    ///
609    /// Returns the current [`TxNumber`] as seen in the static file.
610    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
611    where
612        N::SignedTx: Compact,
613    {
614        let start = Instant::now();
615        self.ensure_no_queued_prune()?;
616
617        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
618        self.append_with_tx_number(tx_num, tx)?;
619
620        if let Some(metrics) = &self.metrics {
621            metrics.record_segment_operation(
622                StaticFileSegment::Transactions,
623                StaticFileProviderOperation::Append,
624                Some(start.elapsed()),
625            );
626        }
627
628        Ok(())
629    }
630
631    /// Appends receipt to static file.
632    ///
633    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
634    /// empty blocks and this function wouldn't be called.
635    ///
636    /// Returns the current [`TxNumber`] as seen in the static file.
637    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
638    where
639        N::Receipt: Compact,
640    {
641        let start = Instant::now();
642        self.ensure_no_queued_prune()?;
643
644        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
645        self.append_with_tx_number(tx_num, receipt)?;
646
647        if let Some(metrics) = &self.metrics {
648            metrics.record_segment_operation(
649                StaticFileSegment::Receipts,
650                StaticFileProviderOperation::Append,
651                Some(start.elapsed()),
652            );
653        }
654
655        Ok(())
656    }
657
658    /// Appends multiple receipts to the static file.
659    ///
660    /// Returns the current [`TxNumber`] as seen in the static file, if any.
661    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
662    where
663        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
664        R: Borrow<N::Receipt>,
665        N::Receipt: Compact,
666    {
667        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
668
669        let mut receipts_iter = receipts.into_iter().peekable();
670        // If receipts are empty, we can simply return None
671        if receipts_iter.peek().is_none() {
672            return Ok(None);
673        }
674
675        let start = Instant::now();
676        self.ensure_no_queued_prune()?;
677
678        // At this point receipts contains at least one receipt, so this would be overwritten.
679        let mut tx_number = 0;
680        let mut count: u64 = 0;
681
682        for receipt_result in receipts_iter {
683            let (tx_num, receipt) = receipt_result?;
684            self.append_with_tx_number(tx_num, receipt.borrow())?;
685            tx_number = tx_num;
686            count += 1;
687        }
688
689        if let Some(metrics) = &self.metrics {
690            metrics.record_segment_operations(
691                StaticFileSegment::Receipts,
692                StaticFileProviderOperation::Append,
693                count,
694                Some(start.elapsed()),
695            );
696        }
697
698        Ok(Some(tx_number))
699    }
700
701    /// Adds an instruction to prune `to_delete`transactions during commit.
702    ///
703    /// Note: `last_block` refers to the block the unwinds ends at.
704    pub fn prune_transactions(
705        &mut self,
706        to_delete: u64,
707        last_block: BlockNumber,
708    ) -> ProviderResult<()> {
709        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
710        self.queue_prune(to_delete, Some(last_block))
711    }
712
713    /// Adds an instruction to prune `to_delete` receipts during commit.
714    ///
715    /// Note: `last_block` refers to the block the unwinds ends at.
716    pub fn prune_receipts(
717        &mut self,
718        to_delete: u64,
719        last_block: BlockNumber,
720    ) -> ProviderResult<()> {
721        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
722        self.queue_prune(to_delete, Some(last_block))
723    }
724
725    /// Adds an instruction to prune `to_delete` headers during commit.
726    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
727        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
728        self.queue_prune(to_delete, None)
729    }
730
731    /// Adds an instruction to prune `to_delete` bloc_ meta rows during commit.
732    pub fn prune_block_meta(&mut self, to_delete: u64) -> ProviderResult<()> {
733        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::BlockMeta);
734        self.queue_prune(to_delete, None)
735    }
736
737    /// Adds an instruction to prune `to_delete` elements during commit.
738    ///
739    /// Note: `last_block` refers to the block the unwinds ends at if dealing with transaction-based
740    /// data.
741    fn queue_prune(
742        &mut self,
743        to_delete: u64,
744        last_block: Option<BlockNumber>,
745    ) -> ProviderResult<()> {
746        self.ensure_no_queued_prune()?;
747        self.prune_on_commit = Some((to_delete, last_block));
748        Ok(())
749    }
750
751    /// Returns Error if there is a pruning instruction that needs to be applied.
752    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
753        if self.prune_on_commit.is_some() {
754            return Err(ProviderError::other(StaticFileWriterError::new(
755                "Pruning should be committed before appending or pruning more data",
756            )));
757        }
758        Ok(())
759    }
760
761    /// Removes the last `to_delete` transactions from the data file.
762    fn prune_transaction_data(
763        &mut self,
764        to_delete: u64,
765        last_block: BlockNumber,
766    ) -> ProviderResult<()> {
767        let start = Instant::now();
768
769        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
770
771        self.truncate(to_delete, Some(last_block))?;
772
773        if let Some(metrics) = &self.metrics {
774            metrics.record_segment_operation(
775                StaticFileSegment::Transactions,
776                StaticFileProviderOperation::Prune,
777                Some(start.elapsed()),
778            );
779        }
780
781        Ok(())
782    }
783
784    /// Prunes the last `to_delete` receipts from the data file.
785    fn prune_receipt_data(
786        &mut self,
787        to_delete: u64,
788        last_block: BlockNumber,
789    ) -> ProviderResult<()> {
790        let start = Instant::now();
791
792        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
793
794        self.truncate(to_delete, Some(last_block))?;
795
796        if let Some(metrics) = &self.metrics {
797            metrics.record_segment_operation(
798                StaticFileSegment::Receipts,
799                StaticFileProviderOperation::Prune,
800                Some(start.elapsed()),
801            );
802        }
803
804        Ok(())
805    }
806
807    /// Prunes the last `to_delete` headers from the data file.
808    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
809        let start = Instant::now();
810
811        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
812
813        self.truncate(to_delete, None)?;
814
815        if let Some(metrics) = &self.metrics {
816            metrics.record_segment_operation(
817                StaticFileSegment::Headers,
818                StaticFileProviderOperation::Prune,
819                Some(start.elapsed()),
820            );
821        }
822
823        Ok(())
824    }
825
826    fn reader(&self) -> StaticFileProvider<N> {
827        Self::upgrade_provider_to_strong_reference(&self.reader)
828    }
829
830    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
831    /// [`StaticFileProvider`].
832    ///
833    /// # Panics
834    ///
835    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is still
836    /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
837    /// [`StaticFileProvider`].
838    fn upgrade_provider_to_strong_reference(
839        provider: &Weak<StaticFileProviderInner<N>>,
840    ) -> StaticFileProvider<N> {
841        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
842    }
843
844    /// Helper function to access [`SegmentHeader`].
845    pub const fn user_header(&self) -> &SegmentHeader {
846        self.writer.user_header()
847    }
848
849    /// Helper function to access a mutable reference to [`SegmentHeader`].
850    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
851        self.writer.user_header_mut()
852    }
853
854    /// Helper function to override block range for testing.
855    #[cfg(any(test, feature = "test-utils"))]
856    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
857        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
858    }
859
860    /// Helper function to override block range for testing.
861    #[cfg(any(test, feature = "test-utils"))]
862    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
863        &mut self.writer
864    }
865}
866
867fn create_jar(
868    segment: StaticFileSegment,
869    path: &Path,
870    expected_block_range: SegmentRangeInclusive,
871) -> NippyJar<SegmentHeader> {
872    let mut jar = NippyJar::new(
873        segment.columns(),
874        path,
875        SegmentHeader::new(expected_block_range, None, None, segment),
876    );
877
878    // Transaction and Receipt already have the compression scheme used natively in its encoding.
879    // (zstd-dictionary)
880    if segment.is_headers() {
881        jar = jar.with_lz4();
882    }
883
884    jar
885}