// reth_provider/providers/static_file/writer.rs

1use super::{
2    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::CompactU256;
10use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
11use reth_node_types::NodePrimitives;
12use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
14use std::{
15    borrow::Borrow,
16    fmt::Debug,
17    path::{Path, PathBuf},
18    sync::{Arc, Weak},
19    time::Instant,
20};
21use tracing::debug;
22
/// Static file writers for every known [`StaticFileSegment`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    /// Lazily-initialized writer for the headers segment.
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Lazily-initialized writer for the transactions segment.
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Lazily-initialized writer for the receipts segment.
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
}
33
34impl<N> Default for StaticFileWriters<N> {
35    fn default() -> Self {
36        Self {
37            headers: Default::default(),
38            transactions: Default::default(),
39            receipts: Default::default(),
40        }
41    }
42}
43
44impl<N: NodePrimitives> StaticFileWriters<N> {
45    pub(crate) fn get_or_create(
46        &self,
47        segment: StaticFileSegment,
48        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
49    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
50        let mut write_guard = match segment {
51            StaticFileSegment::Headers => self.headers.write(),
52            StaticFileSegment::Transactions => self.transactions.write(),
53            StaticFileSegment::Receipts => self.receipts.write(),
54        };
55
56        if write_guard.is_none() {
57            *write_guard = Some(create_fn()?);
58        }
59
60        Ok(StaticFileProviderRWRefMut(write_guard))
61    }
62
63    pub(crate) fn commit(&self) -> ProviderResult<()> {
64        for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
65            let mut writer = writer_lock.write();
66            if let Some(writer) = writer.as_mut() {
67                writer.commit()?;
68            }
69        }
70        Ok(())
71    }
72
73    pub(crate) fn has_unwind_queued(&self) -> bool {
74        for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
75            let writer = writer_lock.read();
76            if let Some(writer) = writer.as_ref() &&
77                writer.will_prune_on_commit()
78            {
79                return true
80            }
81        }
82        false
83    }
84}
85
/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    /// Write guard over the per-segment writer slot; always `Some` when handed out by
    /// [`StaticFileWriters::get_or_create`].
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
91
impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`], which initializes the
        // slot before handing out the guard, so the inner `Option` is guaranteed to be `Some`.
        self.0.as_mut().expect("static file writer provider should be init")
    }
}
98
impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
    type Target = StaticFileProviderRW<N>;

    fn deref(&self) -> &Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`], which initializes the
        // slot before handing out the guard, so the inner `Option` is guaranteed to be `Some`.
        self.0.as_ref().expect("static file writer provider should be init")
    }
}
107
#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities
pub struct StaticFileProviderRW<N> {
    /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
    /// stored in a [`dashmap::DashMap`] inside the parent [`StaticFileProvider`], which is an
    /// [Arc]. If we were to use an [Arc] here, we would create a reference cycle.
    reader: Weak<StaticFileProviderInner<N>>,
    /// A [`NippyJarWriter`] instance.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to opened file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Metrics.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// On commit, does the instructed pruning: number of lines, and if it applies, the last block
    /// it ends at.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}
127
128impl<N: NodePrimitives> StaticFileProviderRW<N> {
    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
    ///
    /// Before use, transaction based segments should ensure the block end range is the expected
    /// one, and heal if not. For more check `Self::ensure_end_range_consistency`.
    pub fn new(
        segment: StaticFileSegment,
        block: BlockNumber,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<Self> {
        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
        let mut writer = Self {
            writer,
            data_path,
            // Small scratch buffer for row encoding; grows on demand.
            buf: Vec::with_capacity(100),
            reader,
            metrics,
            prune_on_commit: None,
        };

        // Heal any file-level inconsistency (e.g. rows lost to a crash) before handing it out.
        writer.ensure_end_range_consistency()?;

        Ok(writer)
    }
153
    /// Opens (or creates) the jar file covering `block` for `segment`, returning the writer
    /// together with the path to its data file.
    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        // Resolve the fixed block range of the file that should contain `block`.
        let block_range = static_file_provider.find_fixed_range(block);
        let (jar, path) = match static_file_provider.get_segment_provider_from_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                // No file exists yet for this range: create a fresh jar on disk.
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // This static file has been frozen, so it can no longer be written to.
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::other(e)),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }
200
    /// If a file level healing happens, we need to update the end range on the
    /// [`SegmentHeader`].
    ///
    /// However, for transaction based segments, the block end range has to be found and healed
    /// externally.
    ///
    /// Check [`reth_nippy_jar::NippyJarChecker`] &
    /// [`NippyJarWriter`] for more on healing.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // If we have lost rows (in this run or previous), we need to update the [SegmentHeader].
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let actual_rows = self.writer.rows() as u64;
        // Rows on disk can only be <= what the header claims; the difference was lost/healed away.
        let pruned_rows = expected_rows.saturating_sub(actual_rows);
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        // Persist the (possibly adjusted) header.
        self.writer.commit().map_err(ProviderError::other)?;

        // Updates the index in the parent [`StaticFileProvider`] manager.
        self.update_index()?;
        Ok(())
    }
228
229    /// Returns `true` if the writer will prune on commit.
230    pub const fn will_prune_on_commit(&self) -> bool {
231        self.prune_on_commit.is_some()
232    }
233
    /// Commits configuration changes to disk and updates the reader index with the new changes.
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Truncates the data file if instructed to.
        // NOTE: tx-based segments always queue `last_block_number` (see `prune_transactions` /
        // `prune_receipts`), hence the `expect`s below.
        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
            match self.writer.user_header().segment() {
                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
                StaticFileSegment::Transactions => self
                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
                StaticFileSegment::Receipts => {
                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
                }
            }
        }

        // Only touch disk / the index when something actually changed.
        if self.writer.is_dirty() {
            // Commits offsets and new user_header to disk
            self.writer.commit().map_err(ProviderError::other)?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Commit"
            );

            // Reflect the newly committed range in the parent provider's index.
            self.update_index()?;
        }

        Ok(())
    }
275
    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// CAUTION: does not call `sync_all` on the files.
    ///
    /// Unlike [`Self::commit`], this does not apply queued pruning and commits unconditionally
    /// (no dirty check) — test-only helper.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Commits offsets and new user_header to disk
        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Commit"
        );

        self.update_index()?;

        Ok(())
    }
306
    /// Updates the `self.reader` internal index.
    fn update_index(&self) -> ProviderResult<()> {
        // We find the maximum block of the segment by checking this writer's last block.
        //
        // However if there's no block range (because there's no data), we try to calculate it by
        // subtracting 1 from the expected block start, resulting on the last block of the
        // previous file.
        //
        // If that expected block start is 0, then it means that there's no actual block data, and
        // there's no block data in static files. (`segment_max_block` stays `None`.)
        let segment_max_block = self
            .writer
            .user_header()
            .block_range()
            .as_ref()
            .map(|block_range| block_range.end())
            .or_else(|| {
                (self.writer.user_header().expected_block_start() > 0)
                    .then(|| self.writer.user_header().expected_block_start() - 1)
            });

        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
    }
330
    /// Allows to increment the [`SegmentHeader`] end block. It will commit the current static file,
    /// and create the next one if we are past the end range.
    ///
    /// `expected_block_number` must be exactly the next block after the current static-file tip,
    /// otherwise an [`ProviderError::UnexpectedStaticFileBlockNumber`] is returned.
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        // Ensure data continuity before touching the header.
        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // We have finished the previous static file and must freeze it
            if last_block == self.writer.user_header().expected_block_end() {
                // Commits offsets and new user_header to disk
                self.commit()?;

                // Opens the new static file
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                // Start the new file with a blank header covering the next fixed range.
                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
373
374    /// Returns a block number that is one next to the current tip of static files.
375    pub fn next_block_number(&self) -> u64 {
376        // The next static file block number can be found by checking the one after block_end.
377        // However, if it's a new file that hasn't been added any data, its block range will
378        // actually be None. In that case, the next block will be found on `expected_block_start`.
379        self.writer
380            .user_header()
381            .block_end()
382            .map(|b| b + 1)
383            .unwrap_or_else(|| self.writer.user_header().expected_block_start())
384    }
385
386    /// Verifies if the incoming block number matches the next expected block number
387    /// for a static file. This ensures data continuity when adding new blocks.
388    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
389        let next_static_file_block = self.next_block_number();
390
391        if expected_block_number != next_static_file_block {
392            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
393                self.writer.user_header().segment(),
394                expected_block_number,
395                next_static_file_block,
396            ))
397        }
398        Ok(())
399    }
400
    /// Truncates a number of rows from disk. It deletes and loads an older static file if block
    /// goes beyond the start of the current block range.
    ///
    /// **`last_block`** should be passed only with transaction based segments.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            // Rows held by the current file: blocks for block-based segments, txs otherwise.
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // If there's more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                // We only delete the file if it's NOT the first static file AND:
                // * it's a Header segment  OR
                // * it's a tx-based segment AND `last_block` is lower than the first block of this
                //   file's block range. Otherwise, having no rows simply means that this block
                //   range has no transactions, but the file should remain.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        // Only Transactions and Receipts
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case for when we are unwinding a chain of empty blocks that goes across
                // files, and therefore, the only reference point to know which file
                // we are supposed to be at is `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            // Clamp the header's block range to end at the unwind target.
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }
471
    /// Delete the current static file, and replace this provider writer with the previous static
    /// file.
    ///
    /// NOTE(review): assumes a previous file exists — `expected_block_start()` must be non-zero
    /// here or the `- 1` underflows. Callers in `truncate` gate on `block_start != 0` /
    /// `last_block < expected_block_start`, which guarantees this.
    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
        let current_path = self.data_path.clone();
        // Open the previous file first, so we never end up without a valid writer.
        let (previous_writer, data_path) = Self::open(
            self.user_header().segment(),
            self.writer.user_header().expected_block_start() - 1,
            self.reader.clone(),
            self.metrics.clone(),
        )?;
        self.writer = previous_writer;
        // Mark dirty so the follow-up commit persists the header adjustments.
        self.writer.set_dirty();
        self.data_path = data_path;
        // Only now remove the replaced file from disk.
        NippyJar::<SegmentHeader>::load(&current_path)
            .map_err(ProviderError::other)?
            .delete()
            .map_err(ProviderError::other)?;
        Ok(())
    }
491
492    /// Appends column to static file.
493    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
494        self.buf.clear();
495        column.to_compact(&mut self.buf);
496
497        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
498        Ok(())
499    }
500
    /// Appends to tx number-based static file.
    ///
    /// `tx_num` must be exactly one higher than the end of the file's current tx range; if the
    /// file holds no transactions yet, `tx_num` initializes the range instead.
    fn append_with_tx_number<V: Compact>(
        &mut self,
        tx_num: TxNumber,
        value: V,
    ) -> ProviderResult<()> {
        if let Some(range) = self.writer.user_header().tx_range() {
            let next_tx = range.end() + 1;
            if next_tx != tx_num {
                return Err(ProviderError::UnexpectedStaticFileTxNumber(
                    self.writer.user_header().segment(),
                    tx_num,
                    next_tx,
                ))
            }
            self.writer.user_header_mut().increment_tx();
        } else {
            // First transaction in this file: initialize the range to a single entry.
            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
        }

        self.append_column(value)?;

        Ok(())
    }
527
    /// Appends header to static file.
    ///
    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
    /// blocks.
    ///
    /// Delegates to [`Self::append_header_with_td`] with a zero total difficulty.
    pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        self.append_header_with_td(header, U256::ZERO, hash)
    }
540
    /// Appends header to static file with a specified total difficulty.
    ///
    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
    /// blocks.
    pub fn append_header_with_td(
        &mut self,
        header: &N::BlockHeader,
        total_difficulty: U256,
        hash: &BlockHash,
    ) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.increment_block(header.number())?;

        // Column order is fixed: header, total difficulty, hash.
        self.append_column(header)?;
        self.append_column(CompactU256::from(total_difficulty))?;
        self.append_column(hash)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
577
578    /// Appends transaction to static file.
579    ///
580    /// It **DOES NOT CALL** `increment_block()`, it should be handled elsewhere. There might be
581    /// empty blocks and this function wouldn't be called.
582    ///
583    /// Returns the current [`TxNumber`] as seen in the static file.
584    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
585    where
586        N::SignedTx: Compact,
587    {
588        let start = Instant::now();
589        self.ensure_no_queued_prune()?;
590
591        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
592        self.append_with_tx_number(tx_num, tx)?;
593
594        if let Some(metrics) = &self.metrics {
595            metrics.record_segment_operation(
596                StaticFileSegment::Transactions,
597                StaticFileProviderOperation::Append,
598                Some(start.elapsed()),
599            );
600        }
601
602        Ok(())
603    }
604
605    /// Appends receipt to static file.
606    ///
607    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
608    /// empty blocks and this function wouldn't be called.
609    ///
610    /// Returns the current [`TxNumber`] as seen in the static file.
611    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
612    where
613        N::Receipt: Compact,
614    {
615        let start = Instant::now();
616        self.ensure_no_queued_prune()?;
617
618        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
619        self.append_with_tx_number(tx_num, receipt)?;
620
621        if let Some(metrics) = &self.metrics {
622            metrics.record_segment_operation(
623                StaticFileSegment::Receipts,
624                StaticFileProviderOperation::Append,
625                Some(start.elapsed()),
626            );
627        }
628
629        Ok(())
630    }
631
632    /// Appends multiple receipts to the static file.
633    ///
634    /// Returns the current [`TxNumber`] as seen in the static file, if any.
635    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
636    where
637        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
638        R: Borrow<N::Receipt>,
639        N::Receipt: Compact,
640    {
641        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
642
643        let mut receipts_iter = receipts.into_iter().peekable();
644        // If receipts are empty, we can simply return None
645        if receipts_iter.peek().is_none() {
646            return Ok(None);
647        }
648
649        let start = Instant::now();
650        self.ensure_no_queued_prune()?;
651
652        // At this point receipts contains at least one receipt, so this would be overwritten.
653        let mut tx_number = 0;
654        let mut count: u64 = 0;
655
656        for receipt_result in receipts_iter {
657            let (tx_num, receipt) = receipt_result?;
658            self.append_with_tx_number(tx_num, receipt.borrow())?;
659            tx_number = tx_num;
660            count += 1;
661        }
662
663        if let Some(metrics) = &self.metrics {
664            metrics.record_segment_operations(
665                StaticFileSegment::Receipts,
666                StaticFileProviderOperation::Append,
667                count,
668                Some(start.elapsed()),
669            );
670        }
671
672        Ok(Some(tx_number))
673    }
674
    /// Adds an instruction to prune `to_delete` transactions during commit.
    ///
    /// Note: `last_block` refers to the block the unwinds ends at.
    ///
    /// The instruction is only queued here; it is applied by [`Self::commit`].
    pub fn prune_transactions(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
        self.queue_prune(to_delete, Some(last_block))
    }
686
    /// Adds an instruction to prune `to_delete` receipts during commit.
    ///
    /// Note: `last_block` refers to the block the unwinds ends at.
    ///
    /// The instruction is only queued here; it is applied by [`Self::commit`].
    pub fn prune_receipts(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
        self.queue_prune(to_delete, Some(last_block))
    }
698
    /// Adds an instruction to prune `to_delete` headers during commit.
    ///
    /// The instruction is only queued here; it is applied by [`Self::commit`].
    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
        self.queue_prune(to_delete, None)
    }
704
705    /// Adds an instruction to prune `to_delete` elements during commit.
706    ///
707    /// Note: `last_block` refers to the block the unwinds ends at if dealing with transaction-based
708    /// data.
709    fn queue_prune(
710        &mut self,
711        to_delete: u64,
712        last_block: Option<BlockNumber>,
713    ) -> ProviderResult<()> {
714        self.ensure_no_queued_prune()?;
715        self.prune_on_commit = Some((to_delete, last_block));
716        Ok(())
717    }
718
719    /// Returns Error if there is a pruning instruction that needs to be applied.
720    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
721        if self.prune_on_commit.is_some() {
722            return Err(ProviderError::other(StaticFileWriterError::new(
723                "Pruning should be committed before appending or pruning more data",
724            )));
725        }
726        Ok(())
727    }
728
729    /// Removes the last `to_delete` transactions from the data file.
730    fn prune_transaction_data(
731        &mut self,
732        to_delete: u64,
733        last_block: BlockNumber,
734    ) -> ProviderResult<()> {
735        let start = Instant::now();
736
737        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
738
739        self.truncate(to_delete, Some(last_block))?;
740
741        if let Some(metrics) = &self.metrics {
742            metrics.record_segment_operation(
743                StaticFileSegment::Transactions,
744                StaticFileProviderOperation::Prune,
745                Some(start.elapsed()),
746            );
747        }
748
749        Ok(())
750    }
751
752    /// Prunes the last `to_delete` receipts from the data file.
753    fn prune_receipt_data(
754        &mut self,
755        to_delete: u64,
756        last_block: BlockNumber,
757    ) -> ProviderResult<()> {
758        let start = Instant::now();
759
760        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
761
762        self.truncate(to_delete, Some(last_block))?;
763
764        if let Some(metrics) = &self.metrics {
765            metrics.record_segment_operation(
766                StaticFileSegment::Receipts,
767                StaticFileProviderOperation::Prune,
768                Some(start.elapsed()),
769            );
770        }
771
772        Ok(())
773    }
774
775    /// Prunes the last `to_delete` headers from the data file.
776    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
777        let start = Instant::now();
778
779        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
780
781        self.truncate(to_delete, None)?;
782
783        if let Some(metrics) = &self.metrics {
784            metrics.record_segment_operation(
785                StaticFileSegment::Headers,
786                StaticFileProviderOperation::Prune,
787                Some(start.elapsed()),
788            );
789        }
790
791        Ok(())
792    }
793
    /// Upgrades `self.reader` into a strong [`StaticFileProvider`] handle.
    ///
    /// # Panics
    ///
    /// Panics if the parent provider has been dropped (see
    /// [`Self::upgrade_provider_to_strong_reference`]).
    fn reader(&self) -> StaticFileProvider<N> {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }
797
    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
    /// [`StaticFileProvider`].
    ///
    /// # Panics
    ///
    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is still
    /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
    /// [`StaticFileProvider`].
    fn upgrade_provider_to_strong_reference(
        provider: &Weak<StaticFileProviderInner<N>>,
    ) -> StaticFileProvider<N> {
        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
    }
811
    /// Helper function to access the [`SegmentHeader`] of the currently open static file.
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }
816
    /// Helper function to access a mutable reference to the [`SegmentHeader`] of the currently
    /// open static file.
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }
821
    /// Helper function to override block range for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
        // Unpack the inclusive std range into the header's (start, end) pair.
        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
    }
827
    /// Helper function to access the underlying [`NippyJarWriter`] for testing.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
833}
834
835fn create_jar(
836    segment: StaticFileSegment,
837    path: &Path,
838    expected_block_range: SegmentRangeInclusive,
839) -> NippyJar<SegmentHeader> {
840    let mut jar = NippyJar::new(
841        segment.columns(),
842        path,
843        SegmentHeader::new(expected_block_range, None, None, segment),
844    );
845
846    // Transaction and Receipt already have the compression scheme used natively in its encoding.
847    // (zstd-dictionary)
848    if segment.is_headers() {
849        jar = jar.with_lz4();
850    }
851
852    jar
853}