reth_provider/providers/static_file/writer.rs

use super::{
    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
};
use crate::providers::static_file::metrics::StaticFileProviderOperation;
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
use reth_codecs::Compact;
use reth_db_api::models::CompactU256;
use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
use reth_node_types::NodePrimitives;
use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
use std::{
    borrow::Borrow,
    fmt::Debug,
    path::{Path, PathBuf},
    sync::{Arc, Weak},
    time::Instant,
};
use tracing::debug;

/// Static file writers for every known [`StaticFileSegment`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
}

impl<N> Default for StaticFileWriters<N> {
    fn default() -> Self {
        Self {
            headers: Default::default(),
            transactions: Default::default(),
            receipts: Default::default(),
        }
    }
}

impl<N: NodePrimitives> StaticFileWriters<N> {
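    /// Returns a write guard over the writer for `segment`, lazily creating it with `create_fn`
    /// on first access.
    ///
    /// A minimal usage sketch (illustrative only; assumes a `weak_inner:
    /// Weak<StaticFileProviderInner<N>>` handle to the parent provider is in scope):
    ///
    /// ```ignore
    /// let mut writer = writers.get_or_create(StaticFileSegment::Headers, || {
    ///     StaticFileProviderRW::new(StaticFileSegment::Headers, 0, weak_inner.clone(), None)
    /// })?;
    /// // The guard derefs to `StaticFileProviderRW`, so append/commit calls go through it.
    /// writer.commit()?;
    /// ```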
    pub(crate) fn get_or_create(
        &self,
        segment: StaticFileSegment,
        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
        let mut write_guard = match segment {
            StaticFileSegment::Headers => self.headers.write(),
            StaticFileSegment::Transactions => self.transactions.write(),
            StaticFileSegment::Receipts => self.receipts.write(),
        };

        if write_guard.is_none() {
            *write_guard = Some(create_fn()?);
        }

        Ok(StaticFileProviderRWRefMut(write_guard))
    }

    pub(crate) fn commit(&self) -> ProviderResult<()> {
        for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
            let mut writer = writer_lock.write();
            if let Some(writer) = writer.as_mut() {
                writer.commit()?;
            }
        }
        Ok(())
    }

    pub(crate) fn has_unwind_queued(&self) -> bool {
        for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
            let writer = writer_lock.read();
            if let Some(writer) = writer.as_ref() &&
                writer.will_prune_on_commit()
            {
                return true
            }
        }
        false
    }
}

/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);

impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`]
        self.0.as_mut().expect("static file writer provider should be init")
    }
}

impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
    type Target = StaticFileProviderRW<N>;

    fn deref(&self) -> &Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`]
        self.0.as_ref().expect("static file writer provider should be init")
    }
}

#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities.
pub struct StaticFileProviderRW<N> {
    /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
    /// stored in a [`dashmap::DashMap`] inside the parent [`StaticFileProvider`], which is an
    /// [Arc]. If we were to use an [Arc] here, we would create a reference cycle.
    reader: Weak<StaticFileProviderInner<N>>,
    /// A [`NippyJarWriter`] instance.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to opened file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Metrics.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Pruning to apply on commit: the number of rows to delete and, if applicable, the last
    /// block the prune ends at.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}

impl<N: NodePrimitives> StaticFileProviderRW<N> {
    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
    ///
    /// Before use, transaction-based segments should ensure the block end range is the expected
    /// one, and heal if not. For more, check `Self::ensure_end_range_consistency`.
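    ///
    /// A hedged construction sketch (illustrative; assumes `weak_inner` is a [`Weak`] handle to
    /// the parent provider's [`StaticFileProviderInner`]):
    ///
    /// ```ignore
    /// let writer = StaticFileProviderRW::<N>::new(
    ///     StaticFileSegment::Headers,
    ///     0,          // any block inside the fixed range of the file to open
    ///     weak_inner,
    ///     None,       // metrics are optional
    /// )?;
    /// ```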
    pub fn new(
        segment: StaticFileSegment,
        block: BlockNumber,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<Self> {
        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
        let mut writer = Self {
            writer,
            data_path,
            buf: Vec::with_capacity(100),
            reader,
            metrics,
            prune_on_commit: None,
        };

        writer.ensure_end_range_consistency()?;

        Ok(writer)
    }

    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        let block_range = static_file_provider.find_fixed_range(block);
        let (jar, path) = match static_file_provider.get_segment_provider_from_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // This static file has been frozen, so we should not write to it and return an
                // error instead.
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::other(e)),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }

    /// If a file-level healing happens, we need to update the end range on the
    /// [`SegmentHeader`].
    ///
    /// However, for transaction-based segments, the block end range has to be found and healed
    /// externally.
    ///
    /// Check [`reth_nippy_jar::NippyJarChecker`] &
    /// [`NippyJarWriter`] for more on healing.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // If we have lost rows (in this run or a previous one), we need to update the
        // [`SegmentHeader`].
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let actual_rows = self.writer.rows() as u64;
        let pruned_rows = expected_rows.saturating_sub(actual_rows);
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        self.writer.commit().map_err(ProviderError::other)?;

        // Updates the [`StaticFileProvider`] index
        self.update_index()?;
        Ok(())
    }

    /// Returns `true` if the writer will prune on commit.
    pub const fn will_prune_on_commit(&self) -> bool {
        self.prune_on_commit.is_some()
    }

    /// Commits configuration changes to disk and updates the reader index with the new changes.
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Truncates the data file if instructed to.
        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
            match self.writer.user_header().segment() {
                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
                StaticFileSegment::Transactions => self
                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
                StaticFileSegment::Receipts => {
                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
                }
            }
        }

        if self.writer.is_dirty() {
            // Commits offsets and new user_header to disk
            self.writer.commit().map_err(ProviderError::other)?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Commit"
            );

            self.update_index()?;
        }

        Ok(())
    }

    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// CAUTION: does not call `sync_all` on the files.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Commits offsets and new user_header to disk
        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Commit"
        );

        self.update_index()?;

        Ok(())
    }

    /// Updates the `self.reader` internal index.
    fn update_index(&self) -> ProviderResult<()> {
        // We find the maximum block of the segment by checking this writer's last block.
        //
        // However, if there's no block range (because there's no data), we try to calculate it by
        // subtracting 1 from the expected block start, resulting in the last block of the
        // previous file.
        //
        // If that expected block start is 0, then it means that there's no block data in static
        // files at all.
        let segment_max_block = self
            .writer
            .user_header()
            .block_range()
            .as_ref()
            .map(|block_range| block_range.end())
            .or_else(|| {
                (self.writer.user_header().expected_block_start() > 0)
                    .then(|| self.writer.user_header().expected_block_start() - 1)
            });

        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
    }

    /// Increments the [`SegmentHeader`] end block. It will commit the current static file and
    /// create the next one if we are past the end range.
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // We have finished the previous static file and must freeze it
            if last_block == self.writer.user_header().expected_block_end() {
                // Commits offsets and new user_header to disk
                self.commit()?;

                // Opens the new static file
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Returns the block number one past the current tip of the static files.
    pub fn next_block_number(&self) -> u64 {
        // The next static file block number can be found by checking the one after `block_end`.
        // However, if this is a new file that hasn't had any data added yet, its block range will
        // be `None`. In that case, the next block is found in `expected_block_start`.
        self.writer
            .user_header()
            .block_end()
            .map(|b| b + 1)
            .unwrap_or_else(|| self.writer.user_header().expected_block_start())
    }

    /// Verifies if the incoming block number matches the next expected block number
    /// for a static file. This ensures data continuity when adding new blocks.
    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
        let next_static_file_block = self.next_block_number();

        if expected_block_number != next_static_file_block {
            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
                self.writer.user_header().segment(),
                expected_block_number,
                next_static_file_block,
            ))
        }
        Ok(())
    }

    /// Truncates a number of rows from disk. It deletes the current static file and loads an
    /// older one if the truncation goes beyond the start of the current block range.
    ///
    /// **`last_block`** should be passed only with transaction-based segments.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
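    ///
    /// Worked example (illustrative): if the current transactions file holds 2 rows and we are
    /// asked to truncate 5 rows with `last_block` below this file's block start, the whole file
    /// is deleted, the previous file is reopened, and the remaining 3 rows are pruned from it.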
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // If there are more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                // We only delete the file if it's NOT the first static file AND:
                // * it's a Header segment  OR
                // * it's a tx-based segment AND `last_block` is lower than the first block of this
                //   file's block range. Otherwise, having no rows simply means that this block
                //   range has no transactions, but the file should remain.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        // Only Transactions and Receipts
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case: when unwinding a chain of empty blocks that spans across files, the
                // only reference point to know which file we are supposed to be at is
                // `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }

    /// Deletes the current static file and replaces this provider's writer with one for the
    /// previous static file.
    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
        let current_path = self.data_path.clone();
        let (previous_writer, data_path) = Self::open(
            self.user_header().segment(),
            self.writer.user_header().expected_block_start() - 1,
            self.reader.clone(),
            self.metrics.clone(),
        )?;
        self.writer = previous_writer;
        self.writer.set_dirty();
        self.data_path = data_path;
        NippyJar::<SegmentHeader>::load(&current_path)
            .map_err(ProviderError::other)?
            .delete()
            .map_err(ProviderError::other)?;
        Ok(())
    }

    /// Appends a column to the static file.
    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
        self.buf.clear();
        column.to_compact(&mut self.buf);

        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
        Ok(())
    }

    /// Appends to a tx number-based static file, verifying that `tx_num` follows the current tx
    /// range.
    fn append_with_tx_number<V: Compact>(
        &mut self,
        tx_num: TxNumber,
        value: V,
    ) -> ProviderResult<()> {
        if let Some(range) = self.writer.user_header().tx_range() {
            let next_tx = range.end() + 1;
            if next_tx != tx_num {
                return Err(ProviderError::UnexpectedStaticFileTxNumber(
                    self.writer.user_header().segment(),
                    tx_num,
                    next_tx,
                ))
            }
            self.writer.user_header_mut().increment_tx();
        } else {
            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
        }

        self.append_column(value)?;

        Ok(())
    }

    /// Appends a header to the static file.
    ///
    /// It **CALLS** `increment_block()`, since the number of headers is equal to the number of
    /// blocks.
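    ///
    /// A hedged usage sketch (illustrative; `header`, `total_difficulty`, and `block_hash` are
    /// assumed to be in scope):
    ///
    /// ```ignore
    /// writer.append_header(&header, total_difficulty, &block_hash)?;
    /// writer.commit()?;
    /// ```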
    pub fn append_header(
        &mut self,
        header: &N::BlockHeader,
        total_difficulty: U256,
        hash: &BlockHash,
    ) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.increment_block(header.number())?;

        self.append_column(header)?;
        self.append_column(CompactU256::from(total_difficulty))?;
        self.append_column(hash)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends a transaction to the static file.
    ///
    /// It **DOES NOT CALL** `increment_block()`; that should be handled elsewhere, since there
    /// might be empty blocks for which this function would never be called.
    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
    where
        N::SignedTx: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
        self.append_with_tx_number(tx_num, tx)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends a receipt to the static file.
    ///
    /// It **DOES NOT** call `increment_block()`; that should be handled elsewhere, since there
    /// might be empty blocks for which this function would never be called.
    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
    where
        N::Receipt: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
        self.append_with_tx_number(tx_num, receipt)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Appends multiple receipts to the static file.
    ///
    /// Returns the last [`TxNumber`] written to the static file, if any.
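    ///
    /// A hedged usage sketch (illustrative; assumes `first_tx_num: TxNumber` and an in-memory
    /// `receipts: Vec<N::Receipt>`):
    ///
    /// ```ignore
    /// let iter = receipts
    ///     .iter()
    ///     .enumerate()
    ///     .map(|(i, receipt)| Ok((first_tx_num + i as u64, receipt)));
    /// let last_tx = writer.append_receipts(iter)?;
    /// ```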
    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
    where
        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
        R: Borrow<N::Receipt>,
        N::Receipt: Compact,
    {
        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        let mut receipts_iter = receipts.into_iter().peekable();
        // If receipts are empty, we can simply return None
        if receipts_iter.peek().is_none() {
            return Ok(None);
        }

        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        // At this point the iterator contains at least one receipt, so `tx_number` will always be
        // overwritten below.
        let mut tx_number = 0;
        let mut count: u64 = 0;

        for receipt_result in receipts_iter {
            let (tx_num, receipt) = receipt_result?;
            self.append_with_tx_number(tx_num, receipt.borrow())?;
            tx_number = tx_num;
            count += 1;
        }

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operations(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Append,
                count,
                Some(start.elapsed()),
            );
        }

        Ok(Some(tx_number))
    }

    /// Adds an instruction to prune `to_delete` transactions during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
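    ///
    /// A hedged usage sketch: queue the prune, then apply it on commit.
    ///
    /// ```ignore
    /// writer.prune_transactions(2, last_block_after_unwind)?;
    /// assert!(writer.will_prune_on_commit());
    /// writer.commit()?; // the two queued transactions are removed here
    /// ```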
    pub fn prune_transactions(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
        self.queue_prune(to_delete, Some(last_block))
    }

    /// Adds an instruction to prune `to_delete` receipts during commit.
    ///
    /// Note: `last_block` refers to the block the unwind ends at.
    pub fn prune_receipts(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
        self.queue_prune(to_delete, Some(last_block))
    }

    /// Adds an instruction to prune `to_delete` headers during commit.
    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
        self.queue_prune(to_delete, None)
    }

    /// Adds an instruction to prune `to_delete` elements during commit.
    ///
    /// Note: for transaction-based data, `last_block` refers to the block the unwind ends at.
    fn queue_prune(
        &mut self,
        to_delete: u64,
        last_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        self.ensure_no_queued_prune()?;
        self.prune_on_commit = Some((to_delete, last_block));
        Ok(())
    }

    /// Returns an error if there is a pruning instruction that still needs to be applied.
    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
        if self.prune_on_commit.is_some() {
            return Err(ProviderError::other(StaticFileWriterError::new(
                "Pruning should be committed before appending or pruning more data",
            )));
        }
        Ok(())
    }

    /// Removes the last `to_delete` transactions from the data file.
    fn prune_transaction_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` receipts from the data file.
    fn prune_receipt_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Prunes the last `to_delete` headers from the data file.
    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.truncate(to_delete, None)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    fn reader(&self) -> StaticFileProvider<N> {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }

    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
    /// [`StaticFileProvider`].
    ///
    /// # Panics
    ///
    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is
    /// still active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
    /// [`StaticFileProvider`].
    fn upgrade_provider_to_strong_reference(
        provider: &Weak<StaticFileProviderInner<N>>,
    ) -> StaticFileProvider<N> {
        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
    }

    /// Helper function to access the [`SegmentHeader`].
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }

    /// Helper function to access a mutable reference to the [`SegmentHeader`].
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }

    /// Helper function to override the block range for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
    }

    /// Helper function to access the inner [`NippyJarWriter`] for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
}

fn create_jar(
    segment: StaticFileSegment,
    path: &Path,
    expected_block_range: SegmentRangeInclusive,
) -> NippyJar<SegmentHeader> {
    let mut jar = NippyJar::new(
        segment.columns(),
        path,
        SegmentHeader::new(expected_block_range, None, None, segment),
    );

    // Transactions and Receipts already have the compression scheme (zstd-dictionary) used
    // natively in their encoding.
    if segment.is_headers() {
        jar = jar.with_lz4();
    }

    jar
}