reth_provider/providers/static_file/
writer.rs

1use super::{
2    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::{
10    CompactU256, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
11};
12use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
13use reth_node_types::NodePrimitives;
14use reth_primitives::{
15    static_file::{SegmentHeader, SegmentRangeInclusive},
16    StaticFileSegment,
17};
18use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
19use std::{
20    borrow::Borrow,
21    fmt::Debug,
22    path::{Path, PathBuf},
23    sync::{Arc, Weak},
24    time::Instant,
25};
26use tracing::debug;
27
28/// Static file writers for every known [`StaticFileSegment`].
29///
30/// WARNING: Trying to use more than one writer for the same segment type **will result in a
31/// deadlock**.
32#[derive(Debug)]
33pub(crate) struct StaticFileWriters<N> {
34    headers: RwLock<Option<StaticFileProviderRW<N>>>,
35    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
36    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
37    block_meta: RwLock<Option<StaticFileProviderRW<N>>>,
38}
39
40impl<N> Default for StaticFileWriters<N> {
41    fn default() -> Self {
42        Self {
43            headers: Default::default(),
44            transactions: Default::default(),
45            receipts: Default::default(),
46            block_meta: Default::default(),
47        }
48    }
49}
50
51impl<N: NodePrimitives> StaticFileWriters<N> {
52    pub(crate) fn get_or_create(
53        &self,
54        segment: StaticFileSegment,
55        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
56    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
57        let mut write_guard = match segment {
58            StaticFileSegment::Headers => self.headers.write(),
59            StaticFileSegment::Transactions => self.transactions.write(),
60            StaticFileSegment::Receipts => self.receipts.write(),
61            StaticFileSegment::BlockMeta => self.block_meta.write(),
62        };
63
64        if write_guard.is_none() {
65            *write_guard = Some(create_fn()?);
66        }
67
68        Ok(StaticFileProviderRWRefMut(write_guard))
69    }
70
71    pub(crate) fn commit(&self) -> ProviderResult<()> {
72        for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
73            let mut writer = writer_lock.write();
74            if let Some(writer) = writer.as_mut() {
75                writer.commit()?;
76            }
77        }
78        Ok(())
79    }
80}
81
82/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
83#[derive(Debug)]
84pub struct StaticFileProviderRWRefMut<'a, N>(
85    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
86);
87
88impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
89    fn deref_mut(&mut self) -> &mut Self::Target {
90        // This is always created by [`StaticFileWriters::get_or_create`]
91        self.0.as_mut().expect("static file writer provider should be init")
92    }
93}
94
95impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
96    type Target = StaticFileProviderRW<N>;
97
98    fn deref(&self) -> &Self::Target {
99        // This is always created by [`StaticFileWriters::get_or_create`]
100        self.0.as_ref().expect("static file writer provider should be init")
101    }
102}
103
104#[derive(Debug)]
105/// Extends `StaticFileProvider` with writing capabilities
106pub struct StaticFileProviderRW<N> {
107    /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
108    /// stored in a [`dashmap::DashMap`] inside the parent [`StaticFileProvider`].which is an
109    /// [Arc]. If we were to use an [Arc] here, we would create a reference cycle.
110    reader: Weak<StaticFileProviderInner<N>>,
111    /// A [`NippyJarWriter`] instance.
112    writer: NippyJarWriter<SegmentHeader>,
113    /// Path to opened file.
114    data_path: PathBuf,
115    /// Reusable buffer for encoding appended data.
116    buf: Vec<u8>,
117    /// Metrics.
118    metrics: Option<Arc<StaticFileProviderMetrics>>,
119    /// On commit, does the instructed pruning: number of lines, and if it applies, the last block
120    /// it ends at.
121    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
122}
123
124impl<N: NodePrimitives> StaticFileProviderRW<N> {
125    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
126    ///
127    /// Before use, transaction based segments should ensure the block end range is the expected
128    /// one, and heal if not. For more check `Self::ensure_end_range_consistency`.
129    pub fn new(
130        segment: StaticFileSegment,
131        block: BlockNumber,
132        reader: Weak<StaticFileProviderInner<N>>,
133        metrics: Option<Arc<StaticFileProviderMetrics>>,
134    ) -> ProviderResult<Self> {
135        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
136        let mut writer = Self {
137            writer,
138            data_path,
139            buf: Vec::with_capacity(100),
140            reader,
141            metrics,
142            prune_on_commit: None,
143        };
144
145        writer.ensure_end_range_consistency()?;
146
147        Ok(writer)
148    }
149
150    fn open(
151        segment: StaticFileSegment,
152        block: u64,
153        reader: Weak<StaticFileProviderInner<N>>,
154        metrics: Option<Arc<StaticFileProviderMetrics>>,
155    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
156        let start = Instant::now();
157
158        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
159
160        let block_range = static_file_provider.find_fixed_range(block);
161        let (jar, path) = match static_file_provider.get_segment_provider_from_block(
162            segment,
163            block_range.start(),
164            None,
165        ) {
166            Ok(provider) => (
167                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
168                provider.data_path().into(),
169            ),
170            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
171                let path = static_file_provider.directory().join(segment.filename(&block_range));
172                (create_jar(segment, &path, block_range), path)
173            }
174            Err(err) => return Err(err),
175        };
176
177        let result = match NippyJarWriter::new(jar) {
178            Ok(writer) => Ok((writer, path)),
179            Err(NippyJarError::FrozenJar) => {
180                // This static file has been frozen, so we should
181                Err(ProviderError::FinalizedStaticFile(segment, block))
182            }
183            Err(e) => Err(ProviderError::other(e)),
184        }?;
185
186        if let Some(metrics) = &metrics {
187            metrics.record_segment_operation(
188                segment,
189                StaticFileProviderOperation::OpenWriter,
190                Some(start.elapsed()),
191            );
192        }
193
194        Ok(result)
195    }
196
197    /// If a file level healing happens, we need to update the end range on the
198    /// [`SegmentHeader`].
199    ///
200    /// However, for transaction based segments, the block end range has to be found and healed
201    /// externally.
202    ///
203    /// Check [`reth_nippy_jar::NippyJarChecker`] &
204    /// [`NippyJarWriter`] for more on healing.
205    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
206        // If we have lost rows (in this run or previous), we need to update the [SegmentHeader].
207        let expected_rows = if self.user_header().segment().is_headers() {
208            self.user_header().block_len().unwrap_or_default()
209        } else {
210            self.user_header().tx_len().unwrap_or_default()
211        };
212        let pruned_rows = expected_rows - self.writer.rows() as u64;
213        if pruned_rows > 0 {
214            self.user_header_mut().prune(pruned_rows);
215        }
216
217        self.writer.commit().map_err(ProviderError::other)?;
218
219        // Updates the [SnapshotProvider] manager
220        self.update_index()?;
221        Ok(())
222    }
223
224    /// Commits configuration changes to disk and updates the reader index with the new changes.
225    pub fn commit(&mut self) -> ProviderResult<()> {
226        let start = Instant::now();
227
228        // Truncates the data file if instructed to.
229        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
230            match self.writer.user_header().segment() {
231                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
232                StaticFileSegment::Transactions => self
233                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
234                StaticFileSegment::Receipts => {
235                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
236                }
237                StaticFileSegment::BlockMeta => todo!(),
238            }
239        }
240
241        if self.writer.is_dirty() {
242            // Commits offsets and new user_header to disk
243            self.writer.commit().map_err(ProviderError::other)?;
244
245            if let Some(metrics) = &self.metrics {
246                metrics.record_segment_operation(
247                    self.writer.user_header().segment(),
248                    StaticFileProviderOperation::CommitWriter,
249                    Some(start.elapsed()),
250                );
251            }
252
253            debug!(
254                target: "provider::static_file",
255                segment = ?self.writer.user_header().segment(),
256                path = ?self.data_path,
257                duration = ?start.elapsed(),
258                "Commit"
259            );
260
261            self.update_index()?;
262        }
263
264        Ok(())
265    }
266
267    /// Commits configuration changes to disk and updates the reader index with the new changes.
268    ///
269    /// CAUTION: does not call `sync_all` on the files.
270    #[cfg(feature = "test-utils")]
271    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
272        let start = Instant::now();
273
274        // Commits offsets and new user_header to disk
275        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;
276
277        if let Some(metrics) = &self.metrics {
278            metrics.record_segment_operation(
279                self.writer.user_header().segment(),
280                StaticFileProviderOperation::CommitWriter,
281                Some(start.elapsed()),
282            );
283        }
284
285        debug!(
286            target: "provider::static_file",
287            segment = ?self.writer.user_header().segment(),
288            path = ?self.data_path,
289            duration = ?start.elapsed(),
290            "Commit"
291        );
292
293        self.update_index()?;
294
295        Ok(())
296    }
297
298    /// Updates the `self.reader` internal index.
299    fn update_index(&self) -> ProviderResult<()> {
300        // We find the maximum block of the segment by checking this writer's last block.
301        //
302        // However if there's no block range (because there's no data), we try to calculate it by
303        // subtracting 1 from the expected block start, resulting on the last block of the
304        // previous file.
305        //
306        // If that expected block start is 0, then it means that there's no actual block data, and
307        // there's no block data in static files.
308        let segment_max_block = self
309            .writer
310            .user_header()
311            .block_range()
312            .as_ref()
313            .map(|block_range| block_range.end())
314            .or_else(|| {
315                (self.writer.user_header().expected_block_start() > 0)
316                    .then(|| self.writer.user_header().expected_block_start() - 1)
317            });
318
319        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
320    }
321
322    /// Allows to increment the [`SegmentHeader`] end block. It will commit the current static file,
323    /// and create the next one if we are past the end range.
324    ///
325    /// Returns the current [`BlockNumber`] as seen in the static file.
326    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
327        let segment = self.writer.user_header().segment();
328
329        self.check_next_block_number(expected_block_number)?;
330
331        let start = Instant::now();
332        if let Some(last_block) = self.writer.user_header().block_end() {
333            // We have finished the previous static file and must freeze it
334            if last_block == self.writer.user_header().expected_block_end() {
335                // Commits offsets and new user_header to disk
336                self.commit()?;
337
338                // Opens the new static file
339                let (writer, data_path) =
340                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
341                self.writer = writer;
342                self.data_path = data_path;
343
344                *self.writer.user_header_mut() = SegmentHeader::new(
345                    self.reader().find_fixed_range(last_block + 1),
346                    None,
347                    None,
348                    segment,
349                );
350            }
351        }
352
353        self.writer.user_header_mut().increment_block();
354        if let Some(metrics) = &self.metrics {
355            metrics.record_segment_operation(
356                segment,
357                StaticFileProviderOperation::IncrementBlock,
358                Some(start.elapsed()),
359            );
360        }
361
362        Ok(())
363    }
364
365    /// Verifies if the incoming block number matches the next expected block number
366    /// for a static file. This ensures data continuity when adding new blocks.
367    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
368        // The next static file block number can be found by checking the one after block_end.
369        // However if it's a new file that hasn't been added any data, its block range will actually
370        // be None. In that case, the next block will be found on `expected_block_start`.
371        let next_static_file_block = self
372            .writer
373            .user_header()
374            .block_end()
375            .map(|b| b + 1)
376            .unwrap_or_else(|| self.writer.user_header().expected_block_start());
377
378        if expected_block_number != next_static_file_block {
379            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
380                self.writer.user_header().segment(),
381                expected_block_number,
382                next_static_file_block,
383            ))
384        }
385        Ok(())
386    }
387
388    /// Truncates a number of rows from disk. It deletes and loads an older static file if block
389    /// goes beyond the start of the current block range.
390    ///
391    /// **`last_block`** should be passed only with transaction based segments.
392    ///
393    /// # Note
394    /// Commits to the configuration file at the end.
395    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
396        let mut remaining_rows = num_rows;
397        let segment = self.writer.user_header().segment();
398        while remaining_rows > 0 {
399            let len = if segment.is_block_based() {
400                self.writer.user_header().block_len().unwrap_or_default()
401            } else {
402                self.writer.user_header().tx_len().unwrap_or_default()
403            };
404
405            if remaining_rows >= len {
406                // If there's more rows to delete than this static file contains, then just
407                // delete the whole file and go to the next static file
408                let block_start = self.writer.user_header().expected_block_start();
409
410                // We only delete the file if it's NOT the first static file AND:
411                // * it's a Header segment  OR
412                // * it's a tx-based segment AND `last_block` is lower than the first block of this
413                //   file's block range. Otherwise, having no rows simply means that this block
414                //   range has no transactions, but the file should remain.
415                if block_start != 0 &&
416                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
417                {
418                    self.delete_current_and_open_previous()?;
419                } else {
420                    // Update `SegmentHeader`
421                    self.writer.user_header_mut().prune(len);
422                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
423                    break
424                }
425
426                remaining_rows -= len;
427            } else {
428                // Update `SegmentHeader`
429                self.writer.user_header_mut().prune(remaining_rows);
430
431                // Truncate data
432                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
433                remaining_rows = 0;
434            }
435        }
436
437        // Only Transactions and Receipts
438        if let Some(last_block) = last_block {
439            let mut expected_block_start = self.writer.user_header().expected_block_start();
440
441            if num_rows == 0 {
442                // Edge case for when we are unwinding a chain of empty blocks that goes across
443                // files, and therefore, the only reference point to know which file
444                // we are supposed to be at is `last_block`.
445                while last_block < expected_block_start {
446                    self.delete_current_and_open_previous()?;
447                    expected_block_start = self.writer.user_header().expected_block_start();
448                }
449            }
450            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
451        }
452
453        // Commits new changes to disk.
454        self.commit()?;
455
456        Ok(())
457    }
458
459    /// Delete the current static file, and replace this provider writer with the previous static
460    /// file.
461    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
462        let current_path = self.data_path.clone();
463        let (previous_writer, data_path) = Self::open(
464            self.user_header().segment(),
465            self.writer.user_header().expected_block_start() - 1,
466            self.reader.clone(),
467            self.metrics.clone(),
468        )?;
469        self.writer = previous_writer;
470        self.writer.set_dirty();
471        self.data_path = data_path;
472        NippyJar::<SegmentHeader>::load(&current_path)
473            .map_err(ProviderError::other)?
474            .delete()
475            .map_err(ProviderError::other)?;
476        Ok(())
477    }
478
479    /// Appends column to static file.
480    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
481        self.buf.clear();
482        column.to_compact(&mut self.buf);
483
484        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
485        Ok(())
486    }
487
488    /// Appends to tx number-based static file.
489    ///
490    /// Returns the current [`TxNumber`] as seen in the static file.
491    fn append_with_tx_number<V: Compact>(
492        &mut self,
493        tx_num: TxNumber,
494        value: V,
495    ) -> ProviderResult<()> {
496        if let Some(range) = self.writer.user_header().tx_range() {
497            let next_tx = range.end() + 1;
498            if next_tx != tx_num {
499                return Err(ProviderError::UnexpectedStaticFileTxNumber(
500                    self.writer.user_header().segment(),
501                    tx_num,
502                    next_tx,
503                ))
504            }
505            self.writer.user_header_mut().increment_tx();
506        } else {
507            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
508        }
509
510        self.append_column(value)?;
511
512        Ok(())
513    }
514
515    /// Appends header to static file.
516    ///
517    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
518    /// blocks.
519    ///
520    /// Returns the current [`BlockNumber`] as seen in the static file.
521    pub fn append_header(
522        &mut self,
523        header: &N::BlockHeader,
524        total_difficulty: U256,
525        hash: &BlockHash,
526    ) -> ProviderResult<()>
527    where
528        N::BlockHeader: Compact,
529    {
530        let start = Instant::now();
531        self.ensure_no_queued_prune()?;
532
533        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
534
535        self.increment_block(header.number())?;
536
537        self.append_column(header)?;
538        self.append_column(CompactU256::from(total_difficulty))?;
539        self.append_column(hash)?;
540
541        if let Some(metrics) = &self.metrics {
542            metrics.record_segment_operation(
543                StaticFileSegment::Headers,
544                StaticFileProviderOperation::Append,
545                Some(start.elapsed()),
546            );
547        }
548
549        Ok(())
550    }
551
552    /// Appends [`StoredBlockBodyIndices`], [`StoredBlockOmmers`] and [`StoredBlockWithdrawals`] to
553    /// static file.
554    ///
555    /// It **CALLS** `increment_block()` since it's a block based segment.
556    pub fn append_eth_block_meta(
557        &mut self,
558        body_indices: &StoredBlockBodyIndices,
559        ommers: &StoredBlockOmmers<N::BlockHeader>,
560        withdrawals: &StoredBlockWithdrawals,
561        expected_block_number: BlockNumber,
562    ) -> ProviderResult<()>
563    where
564        N::BlockHeader: Compact,
565    {
566        self.append_block_meta(body_indices, ommers, withdrawals, expected_block_number)
567    }
568
569    /// Appends [`StoredBlockBodyIndices`] and any other two arbitrary types belonging to the block
570    /// body to static file.
571    ///
572    /// It **CALLS** `increment_block()` since it's a block based segment.
573    pub fn append_block_meta<F1, F2>(
574        &mut self,
575        body_indices: &StoredBlockBodyIndices,
576        field1: &F1,
577        field2: &F2,
578        expected_block_number: BlockNumber,
579    ) -> ProviderResult<()>
580    where
581        N::BlockHeader: Compact,
582        F1: Compact,
583        F2: Compact,
584    {
585        let start = Instant::now();
586        self.ensure_no_queued_prune()?;
587
588        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::BlockMeta);
589
590        self.increment_block(expected_block_number)?;
591
592        self.append_column(body_indices)?;
593        self.append_column(field1)?;
594        self.append_column(field2)?;
595
596        if let Some(metrics) = &self.metrics {
597            metrics.record_segment_operation(
598                StaticFileSegment::BlockMeta,
599                StaticFileProviderOperation::Append,
600                Some(start.elapsed()),
601            );
602        }
603
604        Ok(())
605    }
606
607    /// Appends transaction to static file.
608    ///
609    /// It **DOES NOT CALL** `increment_block()`, it should be handled elsewhere. There might be
610    /// empty blocks and this function wouldn't be called.
611    ///
612    /// Returns the current [`TxNumber`] as seen in the static file.
613    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
614    where
615        N::SignedTx: Compact,
616    {
617        let start = Instant::now();
618        self.ensure_no_queued_prune()?;
619
620        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
621        self.append_with_tx_number(tx_num, tx)?;
622
623        if let Some(metrics) = &self.metrics {
624            metrics.record_segment_operation(
625                StaticFileSegment::Transactions,
626                StaticFileProviderOperation::Append,
627                Some(start.elapsed()),
628            );
629        }
630
631        Ok(())
632    }
633
634    /// Appends receipt to static file.
635    ///
636    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
637    /// empty blocks and this function wouldn't be called.
638    ///
639    /// Returns the current [`TxNumber`] as seen in the static file.
640    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
641    where
642        N::Receipt: Compact,
643    {
644        let start = Instant::now();
645        self.ensure_no_queued_prune()?;
646
647        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
648        self.append_with_tx_number(tx_num, receipt)?;
649
650        if let Some(metrics) = &self.metrics {
651            metrics.record_segment_operation(
652                StaticFileSegment::Receipts,
653                StaticFileProviderOperation::Append,
654                Some(start.elapsed()),
655            );
656        }
657
658        Ok(())
659    }
660
661    /// Appends multiple receipts to the static file.
662    ///
663    /// Returns the current [`TxNumber`] as seen in the static file, if any.
664    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
665    where
666        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
667        R: Borrow<N::Receipt>,
668        N::Receipt: Compact,
669    {
670        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
671
672        let mut receipts_iter = receipts.into_iter().peekable();
673        // If receipts are empty, we can simply return None
674        if receipts_iter.peek().is_none() {
675            return Ok(None);
676        }
677
678        let start = Instant::now();
679        self.ensure_no_queued_prune()?;
680
681        // At this point receipts contains at least one receipt, so this would be overwritten.
682        let mut tx_number = 0;
683        let mut count: u64 = 0;
684
685        for receipt_result in receipts_iter {
686            let (tx_num, receipt) = receipt_result?;
687            self.append_with_tx_number(tx_num, receipt.borrow())?;
688            tx_number = tx_num;
689            count += 1;
690        }
691
692        if let Some(metrics) = &self.metrics {
693            metrics.record_segment_operations(
694                StaticFileSegment::Receipts,
695                StaticFileProviderOperation::Append,
696                count,
697                Some(start.elapsed()),
698            );
699        }
700
701        Ok(Some(tx_number))
702    }
703
704    /// Adds an instruction to prune `to_delete`transactions during commit.
705    ///
706    /// Note: `last_block` refers to the block the unwinds ends at.
707    pub fn prune_transactions(
708        &mut self,
709        to_delete: u64,
710        last_block: BlockNumber,
711    ) -> ProviderResult<()> {
712        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
713        self.queue_prune(to_delete, Some(last_block))
714    }
715
716    /// Adds an instruction to prune `to_delete` receipts during commit.
717    ///
718    /// Note: `last_block` refers to the block the unwinds ends at.
719    pub fn prune_receipts(
720        &mut self,
721        to_delete: u64,
722        last_block: BlockNumber,
723    ) -> ProviderResult<()> {
724        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
725        self.queue_prune(to_delete, Some(last_block))
726    }
727
728    /// Adds an instruction to prune `to_delete` headers during commit.
729    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
730        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
731        self.queue_prune(to_delete, None)
732    }
733
734    /// Adds an instruction to prune `to_delete` bloc_ meta rows during commit.
735    pub fn prune_block_meta(&mut self, to_delete: u64) -> ProviderResult<()> {
736        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::BlockMeta);
737        self.queue_prune(to_delete, None)
738    }
739
740    /// Adds an instruction to prune `to_delete` elements during commit.
741    ///
742    /// Note: `last_block` refers to the block the unwinds ends at if dealing with transaction-based
743    /// data.
744    fn queue_prune(
745        &mut self,
746        to_delete: u64,
747        last_block: Option<BlockNumber>,
748    ) -> ProviderResult<()> {
749        self.ensure_no_queued_prune()?;
750        self.prune_on_commit = Some((to_delete, last_block));
751        Ok(())
752    }
753
754    /// Returns Error if there is a pruning instruction that needs to be applied.
755    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
756        if self.prune_on_commit.is_some() {
757            return Err(ProviderError::other(StaticFileWriterError::new(
758                "Pruning should be committed before appending or pruning more data",
759            )));
760        }
761        Ok(())
762    }
763
764    /// Removes the last `to_delete` transactions from the data file.
765    fn prune_transaction_data(
766        &mut self,
767        to_delete: u64,
768        last_block: BlockNumber,
769    ) -> ProviderResult<()> {
770        let start = Instant::now();
771
772        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
773
774        self.truncate(to_delete, Some(last_block))?;
775
776        if let Some(metrics) = &self.metrics {
777            metrics.record_segment_operation(
778                StaticFileSegment::Transactions,
779                StaticFileProviderOperation::Prune,
780                Some(start.elapsed()),
781            );
782        }
783
784        Ok(())
785    }
786
787    /// Prunes the last `to_delete` receipts from the data file.
788    fn prune_receipt_data(
789        &mut self,
790        to_delete: u64,
791        last_block: BlockNumber,
792    ) -> ProviderResult<()> {
793        let start = Instant::now();
794
795        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
796
797        self.truncate(to_delete, Some(last_block))?;
798
799        if let Some(metrics) = &self.metrics {
800            metrics.record_segment_operation(
801                StaticFileSegment::Receipts,
802                StaticFileProviderOperation::Prune,
803                Some(start.elapsed()),
804            );
805        }
806
807        Ok(())
808    }
809
810    /// Prunes the last `to_delete` headers from the data file.
811    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
812        let start = Instant::now();
813
814        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
815
816        self.truncate(to_delete, None)?;
817
818        if let Some(metrics) = &self.metrics {
819            metrics.record_segment_operation(
820                StaticFileSegment::Headers,
821                StaticFileProviderOperation::Prune,
822                Some(start.elapsed()),
823            );
824        }
825
826        Ok(())
827    }
828
829    fn reader(&self) -> StaticFileProvider<N> {
830        Self::upgrade_provider_to_strong_reference(&self.reader)
831    }
832
833    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
834    /// [`StaticFileProvider`].
835    ///
836    /// # Panics
837    ///
838    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is still
839    /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
840    /// [`StaticFileProvider`].
841    fn upgrade_provider_to_strong_reference(
842        provider: &Weak<StaticFileProviderInner<N>>,
843    ) -> StaticFileProvider<N> {
844        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
845    }
846
847    /// Helper function to access [`SegmentHeader`].
848    pub const fn user_header(&self) -> &SegmentHeader {
849        self.writer.user_header()
850    }
851
852    /// Helper function to access a mutable reference to [`SegmentHeader`].
853    pub fn user_header_mut(&mut self) -> &mut SegmentHeader {
854        self.writer.user_header_mut()
855    }
856
857    /// Helper function to override block range for testing.
858    #[cfg(any(test, feature = "test-utils"))]
859    pub fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
860        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
861    }
862
863    /// Helper function to override block range for testing.
864    #[cfg(any(test, feature = "test-utils"))]
865    pub fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
866        &mut self.writer
867    }
868}
869
870fn create_jar(
871    segment: StaticFileSegment,
872    path: &Path,
873    expected_block_range: SegmentRangeInclusive,
874) -> NippyJar<SegmentHeader> {
875    let mut jar = NippyJar::new(
876        segment.columns(),
877        path,
878        SegmentHeader::new(expected_block_range, None, None, segment),
879    );
880
881    // Transaction and Receipt already have the compression scheme used natively in its encoding.
882    // (zstd-dictionary)
883    if segment.is_headers() {
884        jar = jar.with_lz4();
885    }
886
887    jar
888}