reth_provider/providers/static_file/writer.rs

use super::{
    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
};
use crate::providers::static_file::metrics::StaticFileProviderOperation;
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_node_types::NodePrimitives;
use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
use std::{
    borrow::Borrow,
    fmt::Debug,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
    time::Instant,
};
use tracing::debug;

/// Static file writers for every known [`StaticFileSegment`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
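///
/// # Example
///
/// A minimal usage sketch (illustrative only; `reader`, `header`, `total_difficulty`, and `hash`
/// are assumed to be in scope, inside a function returning `ProviderResult<()>`):
///
/// ```ignore
/// let writers = StaticFileWriters::<N>::default();
/// // Lazily create (or reuse) the writer for the Headers segment.
/// let mut writer = writers.get_or_create(StaticFileSegment::Headers, || {
///     StaticFileProviderRW::new(StaticFileSegment::Headers, 0, reader.clone(), None)
/// })?;
/// writer.append_header(&header, total_difficulty, &hash)?;
/// // Drop the per-segment write guard before committing, otherwise `commit` would deadlock on
/// // the same lock.
/// drop(writer);
/// writers.commit()?;
/// ```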
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
}

impl<N> Default for StaticFileWriters<N> {
    fn default() -> Self {
        Self {
            headers: Default::default(),
            transactions: Default::default(),
            receipts: Default::default(),
        }
    }
}

impl<N: NodePrimitives> StaticFileWriters<N> {
    pub(crate) fn get_or_create(
        &self,
        segment: StaticFileSegment,
        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
        let mut write_guard = match segment {
            StaticFileSegment::Headers => self.headers.write(),
            StaticFileSegment::Transactions => self.transactions.write(),
            StaticFileSegment::Receipts => self.receipts.write(),
        };

        if write_guard.is_none() {
            *write_guard = Some(create_fn()?);
        }

        Ok(StaticFileProviderRWRefMut(write_guard))
    }

    pub(crate) fn commit(&self) -> ProviderResult<()> {
        for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
            let mut writer = writer_lock.write();
            if let Some(writer) = writer.as_mut() {
                writer.commit()?;
            }
        }
        Ok(())
    }
}

/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);

impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`]
        self.0.as_mut().expect("static file writer provider should be init")
    }
}

impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
    type Target = StaticFileProviderRW<N>;

    fn deref(&self) -> &Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`]
        self.0.as_ref().expect("static file writer provider should be init")
    }
}

/// Extends `StaticFileProvider` with writing capabilities.
#[derive(Debug)]
pub struct StaticFileProviderRW<N> {
    /// Reference back to the provider. We need [`Weak`] here because [`StaticFileProviderRW`] is
    /// stored in a [`dashmap::DashMap`] inside the parent [`StaticFileProvider`], which is an
    /// [`Arc`]. If we were to use an [`Arc`] here, we would create a reference cycle.
    reader: Weak<StaticFileProviderInner<N>>,
    /// A [`NippyJarWriter`] instance.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to opened file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Metrics.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// On commit, performs the queued pruning: the number of rows to delete and, if applicable,
    /// the last block the prune ends at.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}

impl<N: NodePrimitives> StaticFileProviderRW<N> {
    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
    ///
    /// Before use, transaction-based segments should ensure the block end range is the expected
    /// one, and heal it if not. For more, see `Self::ensure_end_range_consistency`.
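    ///
    /// A construction sketch (illustrative only; obtaining the [`Weak`] reference via
    /// `Arc::downgrade` on an existing `Arc<StaticFileProviderInner<N>>` named `provider_inner`
    /// is an assumption about the surrounding code):
    ///
    /// ```ignore
    /// let writer = StaticFileProviderRW::new(
    ///     StaticFileSegment::Headers,
    ///     0,                               // any block inside the targeted fixed file range
    ///     Arc::downgrade(&provider_inner), // Weak<StaticFileProviderInner<N>>
    ///     None,                            // no metrics
    /// )?;
    /// ```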
    pub fn new(
        segment: StaticFileSegment,
        block: BlockNumber,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<Self> {
        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
        let mut writer = Self {
            writer,
            data_path,
            buf: Vec::with_capacity(100),
            reader,
            metrics,
            prune_on_commit: None,
        };

        writer.ensure_end_range_consistency()?;

        Ok(writer)
    }

    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        let block_range = static_file_provider.find_fixed_range(block);
        let (jar, path) = match static_file_provider.get_segment_provider_from_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // This static file has been frozen, so we should not write to it anymore.
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::other(e)),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }

    /// If file-level healing happens, we need to update the end range on the
    /// [`SegmentHeader`].
    ///
    /// However, for transaction-based segments, the block end range has to be found and healed
    /// externally.
    ///
    /// Check [`reth_nippy_jar::NippyJarChecker`] &
    /// [`NippyJarWriter`] for more on healing.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // If we have lost rows (in this run or previous), we need to update the [SegmentHeader].
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let pruned_rows = expected_rows - self.writer.rows() as u64;
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        self.writer.commit().map_err(ProviderError::other)?;

        // Updates the index of the parent [`StaticFileProvider`]
        self.update_index()?;
        Ok(())
    }

    /// Commits configuration changes to disk and updates the reader index with the new changes.
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Truncates the data file if instructed to.
        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
            match self.writer.user_header().segment() {
                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
                StaticFileSegment::Transactions => self
                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
                StaticFileSegment::Receipts => {
                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
                }
            }
        }

        if self.writer.is_dirty() {
            // Commits offsets and new user_header to disk
            self.writer.commit().map_err(ProviderError::other)?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Commit"
            );

            self.update_index()?;
        }

        Ok(())
    }

    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// CAUTION: does not call `sync_all` on the files.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Commits offsets and new user_header to disk
        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Commit"
        );

        self.update_index()?;

        Ok(())
    }

    /// Updates the `self.reader` internal index.
    fn update_index(&self) -> ProviderResult<()> {
        // We find the maximum block of the segment by checking this writer's last block.
        //
        // However, if there's no block range (because there's no data), we try to calculate it by
        // subtracting 1 from the expected block start, resulting in the last block of the
        // previous file.
        //
        // If that expected block start is 0, it means that there is no block data in the static
        // files at all.
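        //
        // For illustration (the 500_000-block file width below is only an assumed example of a
        // fixed range, not something this code guarantees):
        //   block_range = Some(0..=499_999)                    => segment_max_block = Some(499_999)
        //   block_range = None, expected_block_start = 500_000 => segment_max_block = Some(499_999)
        //   block_range = None, expected_block_start = 0       => segment_max_block = None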
        let segment_max_block = self
            .writer
            .user_header()
            .block_range()
            .as_ref()
            .map(|block_range| block_range.end())
            .or_else(|| {
                (self.writer.user_header().expected_block_start() > 0)
                    .then(|| self.writer.user_header().expected_block_start() - 1)
            });

        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
    }

    /// Increments the [`SegmentHeader`] end block. It will commit the current static file and
    /// create the next one if we are past the end range.
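    ///
    /// A usage sketch for a transaction-based writer (illustrative only; `writer`, `block_number`,
    /// and `block_transactions` are assumed to be in scope, and the per-block call order shown is
    /// just one way of driving this API):
    ///
    /// ```ignore
    /// // Register the next (possibly empty) block before appending its transactions, since
    /// // `append_transaction` never increments the block count on its own.
    /// writer.increment_block(block_number)?;
    /// for (tx_num, tx) in block_transactions {
    ///     writer.append_transaction(tx_num, &tx)?;
    /// }
    /// ```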
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // We have finished the previous static file and must freeze it
            if last_block == self.writer.user_header().expected_block_end() {
                // Commits offsets and new user_header to disk
                self.commit()?;

                // Opens the new static file
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Returns the block number that is one past the current tip of the static files.
    pub fn next_block_number(&self) -> u64 {
        // The next static file block number can be found by checking the one after block_end.
        // However, if it's a new file that hasn't had any data added yet, its block range will
        // actually be None. In that case, the next block is given by `expected_block_start`.
        self.writer
            .user_header()
            .block_end()
            .map(|b| b + 1)
            .unwrap_or_else(|| self.writer.user_header().expected_block_start())
    }

    /// Verifies if the incoming block number matches the next expected block number
    /// for a static file. This ensures data continuity when adding new blocks.
    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
        let next_static_file_block = self.next_block_number();

        if expected_block_number != next_static_file_block {
            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
                self.writer.user_header().segment(),
                expected_block_number,
                next_static_file_block,
            ))
        }
        Ok(())
    }

    /// Truncates a number of rows from disk. It deletes and loads an older static file if the
    /// truncation goes beyond the start of the current block range.
    ///
    /// **`last_block`** should be passed only with transaction-based segments.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
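    ///
    /// For illustration, assuming 500_000-block files (the width is an assumption for the
    /// example, not something this function relies on): with a headers file fully covering blocks
    /// `500_000..=999_999`, `truncate(600_000, None)` first deletes that whole file and reopens
    /// the previous one (`0..=499_999`), then prunes the remaining 100_000 rows from it, leaving
    /// blocks `0..=399_999`.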
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // If there's more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                // We only delete the file if it's NOT the first static file AND:
                // * it's a Header segment  OR
                // * it's a tx-based segment AND `last_block` is lower than the first block of this
                //   file's block range. Otherwise, having no rows simply means that this block
                //   range has no transactions, but the file should remain.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        // Only Transactions and Receipts
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case for when we are unwinding a chain of empty blocks that goes across
                // files, and therefore, the only reference point to know which file
                // we are supposed to be at is `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }

    /// Delete the current static file, and replace this provider writer with the previous static
    /// file.
    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
        let current_path = self.data_path.clone();
        let (previous_writer, data_path) = Self::open(
            self.user_header().segment(),
            self.writer.user_header().expected_block_start() - 1,
            self.reader.clone(),
            self.metrics.clone(),
        )?;
        self.writer = previous_writer;
        self.writer.set_dirty();
        self.data_path = data_path;
        NippyJar::<SegmentHeader>::load(&current_path)
            .map_err(ProviderError::other)?
            .delete()
            .map_err(ProviderError::other)?;
        Ok(())
    }

    /// Appends column to static file.
    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
        self.buf.clear();
        column.to_compact(&mut self.buf);

        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
        Ok(())
    }

    /// Appends to a tx-number-based static file.
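    ///
    /// For example, if the file's current tx range is `0..=99` (an illustrative range), the next
    /// append must use `tx_num == 100`; anything else returns
    /// [`ProviderError::UnexpectedStaticFileTxNumber`].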
    fn append_with_tx_number<V: Compact>(
        &mut self,
        tx_num: TxNumber,
        value: V,
    ) -> ProviderResult<()> {
        if let Some(range) = self.writer.user_header().tx_range() {
            let next_tx = range.end() + 1;
            if next_tx != tx_num {
                return Err(ProviderError::UnexpectedStaticFileTxNumber(
                    self.writer.user_header().segment(),
                    tx_num,
                    next_tx,
                ))
            }
            self.writer.user_header_mut().increment_tx();
        } else {
            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
        }

        self.append_column(value)?;

        Ok(())
    }

    /// Appends a header to the static file.
    ///
    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
    /// blocks.
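    ///
    /// A usage sketch (illustrative only; `writer`, `header`, `td`, and `hash` are assumed to be
    /// in scope):
    ///
    /// ```ignore
    /// // Headers must be appended contiguously: `header.number()` has to match
    /// // `writer.next_block_number()`.
    /// writer.append_header(&header, td, &hash)?;
    /// writer.commit()?;
    /// ```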
    pub fn append_header(
        &mut self,
        header: &N::BlockHeader,
        total_difficulty: U256,
        hash: &BlockHash,
    ) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.increment_block(header.number())?;

        self.append_column(header)?;
        self.append_column(CompactU256::from(total_difficulty))?;
        self.append_column(hash)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends a transaction to the static file.
    ///
    /// It **DOES NOT CALL** `increment_block()`; that should be handled elsewhere, since there
    /// might be empty blocks for which this function is never called.
    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
    where
        N::SignedTx: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
        self.append_with_tx_number(tx_num, tx)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends a receipt to the static file.
    ///
    /// It **DOES NOT** call `increment_block()`; that should be handled elsewhere, since there
    /// might be empty blocks for which this function is never called.
    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
    where
        N::Receipt: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
        self.append_with_tx_number(tx_num, receipt)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends multiple receipts to the static file.
    ///
    /// Returns the current [`TxNumber`] as seen in the static file, if any.
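    ///
    /// A usage sketch (illustrative only; `writer` and `receipts: Vec<(TxNumber, N::Receipt)>` are
    /// assumed to be in scope):
    ///
    /// ```ignore
    /// let highest = writer.append_receipts(
    ///     receipts.iter().map(|(tx_num, receipt)| Ok((*tx_num, receipt))),
    /// )?;
    /// assert_eq!(highest, receipts.last().map(|(tx_num, _)| *tx_num));
    /// ```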
    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
    where
        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
        R: Borrow<N::Receipt>,
        N::Receipt: Compact,
    {
        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        let mut receipts_iter = receipts.into_iter().peekable();
        // If receipts are empty, we can simply return None
        if receipts_iter.peek().is_none() {
            return Ok(None);
        }

        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        // At this point receipts contains at least one receipt, so this would be overwritten.
        let mut tx_number = 0;
        let mut count: u64 = 0;

        for receipt_result in receipts_iter {
            let (tx_num, receipt) = receipt_result?;
            self.append_with_tx_number(tx_num, receipt.borrow())?;
            tx_number = tx_num;
            count += 1;
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operations(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                count,
                Some(start.elapsed()),
            );
        }

        Ok(Some(tx_number))
    }

    /// Adds an instruction to prune `to_delete` transactions during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
    pub fn prune_transactions(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
        self.queue_prune(to_delete, Some(last_block))
    }

    /// Adds an instruction to prune `to_delete` receipts during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
    pub fn prune_receipts(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
        self.queue_prune(to_delete, Some(last_block))
    }

    /// Adds an instruction to prune `to_delete` headers during commit.
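    ///
    /// A usage sketch (illustrative only; `writer` is assumed to be a Headers writer that already
    /// contains at least 10 headers):
    ///
    /// ```ignore
    /// // Queue the removal of the last 10 headers; nothing is touched on disk yet.
    /// writer.prune_headers(10)?;
    /// // The truncation is applied as part of the next commit.
    /// writer.commit()?;
    /// ```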
    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
        self.queue_prune(to_delete, None)
    }

    /// Adds an instruction to prune `to_delete` elements during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at if dealing with transaction-based
    /// data.
    fn queue_prune(
        &mut self,
        to_delete: u64,
        last_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        self.ensure_no_queued_prune()?;
        self.prune_on_commit = Some((to_delete, last_block));
        Ok(())
    }

    /// Returns an error if there is a queued pruning instruction that still needs to be applied.
    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
        if self.prune_on_commit.is_some() {
            return Err(ProviderError::other(StaticFileWriterError::new(
                "Pruning should be committed before appending or pruning more data",
            )));
        }
        Ok(())
    }

    /// Removes the last `to_delete` transactions from the data file.
    fn prune_transaction_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` receipts from the data file.
    fn prune_receipt_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` headers from the data file.
    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.truncate(to_delete, None)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    fn reader(&self) -> StaticFileProvider<N> {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }

    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
    /// [`StaticFileProvider`].
    ///
    /// # Panics
    ///
    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is still
    /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
    /// [`StaticFileProvider`].
    fn upgrade_provider_to_strong_reference(
        provider: &Weak<StaticFileProviderInner<N>>,
    ) -> StaticFileProvider<N> {
        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
    }

    /// Helper function to access [`SegmentHeader`].
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }

    /// Helper function to access a mutable reference to [`SegmentHeader`].
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }

    /// Helper function to override block range for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
    }

    /// Helper function to access the inner [`NippyJarWriter`] for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
}

fn create_jar(
    segment: StaticFileSegment,
    path: &Path,
    expected_block_range: SegmentRangeInclusive,
) -> NippyJar<SegmentHeader> {
    let mut jar = NippyJar::new(
        segment.columns(),
        path,
        SegmentHeader::new(expected_block_range, None, None, segment),
    );

    // Transactions and receipts already have the compression scheme (zstd-dictionary) applied
    // natively in their encoding.
    if segment.is_headers() {
        jar = jar.with_lz4();
    }

    jar
}