reth_provider/providers/static_file/
writer.rs

1use super::{
2    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::CompactU256;
10use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
11use reth_node_types::NodePrimitives;
12use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
14use std::{
15    borrow::Borrow,
16    cmp::Ordering,
17    fmt::Debug,
18    path::{Path, PathBuf},
19    sync::{Arc, Weak},
20    time::Instant,
21};
22use tracing::debug;
23
/// Static file writers for every known [`StaticFileSegment`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    /// Writer for the headers segment, created lazily by [`Self::get_or_create`].
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer for the transactions segment, created lazily by [`Self::get_or_create`].
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer for the receipts segment, created lazily by [`Self::get_or_create`].
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer for the transaction-senders segment, created lazily by [`Self::get_or_create`].
    transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
}
35
36impl<N> Default for StaticFileWriters<N> {
37    fn default() -> Self {
38        Self {
39            headers: Default::default(),
40            transactions: Default::default(),
41            receipts: Default::default(),
42            transaction_senders: Default::default(),
43        }
44    }
45}
46
47impl<N: NodePrimitives> StaticFileWriters<N> {
48    pub(crate) fn get_or_create(
49        &self,
50        segment: StaticFileSegment,
51        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
52    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
53        let mut write_guard = match segment {
54            StaticFileSegment::Headers => self.headers.write(),
55            StaticFileSegment::Transactions => self.transactions.write(),
56            StaticFileSegment::Receipts => self.receipts.write(),
57            StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
58        };
59
60        if write_guard.is_none() {
61            *write_guard = Some(create_fn()?);
62        }
63
64        Ok(StaticFileProviderRWRefMut(write_guard))
65    }
66
67    pub(crate) fn commit(&self) -> ProviderResult<()> {
68        debug!(target: "provider::static_file", "Committing all static file segments");
69
70        for writer_lock in
71            [&self.headers, &self.transactions, &self.receipts, &self.transaction_senders]
72        {
73            let mut writer = writer_lock.write();
74            if let Some(writer) = writer.as_mut() {
75                writer.commit()?;
76            }
77        }
78
79        debug!(target: "provider::static_file", "Committed all static file segments");
80        Ok(())
81    }
82
83    pub(crate) fn has_unwind_queued(&self) -> bool {
84        for writer_lock in
85            [&self.headers, &self.transactions, &self.receipts, &self.transaction_senders]
86        {
87            let writer = writer_lock.read();
88            if let Some(writer) = writer.as_ref() &&
89                writer.will_prune_on_commit()
90            {
91                return true
92            }
93        }
94        false
95    }
96}
97
/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
///
/// Instances are produced by [`StaticFileWriters::get_or_create`], which guarantees the inner
/// `Option` is populated; the `Deref`/`DerefMut` impls below rely on that invariant.
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
103
impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // The inner `Option` is always populated by [`StaticFileWriters::get_or_create`] before
        // this guard is handed out, so the expect cannot fire in practice.
        self.0.as_mut().expect("static file writer provider should be init")
    }
}
110
impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
    type Target = StaticFileProviderRW<N>;

    fn deref(&self) -> &Self::Target {
        // The inner `Option` is always populated by [`StaticFileWriters::get_or_create`] before
        // this guard is handed out, so the expect cannot fire in practice.
        self.0.as_ref().expect("static file writer provider should be init")
    }
}
119
#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities
pub struct StaticFileProviderRW<N> {
    /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
    /// owned by the parent [`StaticFileProvider`], which is an [Arc]. If we were to use an [Arc]
    /// here, we would create a reference cycle.
    reader: Weak<StaticFileProviderInner<N>>,
    /// A [`NippyJarWriter`] instance.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to opened file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Metrics.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// On commit, does the instructed pruning: number of lines, and if it applies, the last block
    /// it ends at.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}
139
140impl<N: NodePrimitives> StaticFileProviderRW<N> {
141    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
142    ///
143    /// Before use, transaction based segments should ensure the block end range is the expected
144    /// one, and heal if not. For more check `Self::ensure_end_range_consistency`.
145    pub fn new(
146        segment: StaticFileSegment,
147        block: BlockNumber,
148        reader: Weak<StaticFileProviderInner<N>>,
149        metrics: Option<Arc<StaticFileProviderMetrics>>,
150    ) -> ProviderResult<Self> {
151        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
152        let mut writer = Self {
153            writer,
154            data_path,
155            buf: Vec::with_capacity(100),
156            reader,
157            metrics,
158            prune_on_commit: None,
159        };
160
161        writer.ensure_end_range_consistency()?;
162
163        Ok(writer)
164    }
165
    /// Opens (or creates) the [`NippyJarWriter`] backing the static file that contains `block`
    /// for the given `segment`, returning the writer together with its data-file path.
    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        // Locate the fixed block range this block falls into, then load the existing jar for
        // that range — or create a fresh one if no static file covers it yet.
        let block_range = static_file_provider.find_fixed_range(segment, block);
        let (jar, path) = match static_file_provider.get_segment_provider_for_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                // No file for this range yet: create one at the conventional filename.
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // This static file has been frozen and can no longer be written to; surface it
                // as a finalized-file provider error.
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::other(e)),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }
212
    /// If a file level healing happens, we need to update the end range on the
    /// [`SegmentHeader`].
    ///
    /// However, for transaction based segments, the block end range has to be found and healed
    /// externally.
    ///
    /// Check [`reth_nippy_jar::NippyJarChecker`] &
    /// [`NippyJarWriter`] for more on healing.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // If we have lost rows (in this run or previous), we need to update the [SegmentHeader].
        // Headers are block-indexed while the other segments are tx-indexed, so the expected row
        // count comes from the corresponding length.
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let actual_rows = self.writer.rows() as u64;
        // A shortfall means rows were lost (e.g. by file-level healing): shrink the header's
        // recorded range to match what is actually on disk.
        let pruned_rows = expected_rows.saturating_sub(actual_rows);
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            pruned_rows,
            "Ensuring end range consistency"
        );

        // Persist the (possibly adjusted) header.
        self.writer.commit().map_err(ProviderError::other)?;

        // Updates the [SnapshotProvider] manager
        self.update_index()?;
        Ok(())
    }
248
249    /// Returns `true` if the writer will prune on commit.
250    pub const fn will_prune_on_commit(&self) -> bool {
251        self.prune_on_commit.is_some()
252    }
253
    /// Commits configuration changes to disk and updates the reader index with the new changes.
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Truncates the data file if instructed to.
        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                to_delete,
                "Pruning data on commit"
            );
            // Headers are block-based and need no last block; the tx-based segments require the
            // last block the unwind ends at (queued alongside the row count).
            match self.writer.user_header().segment() {
                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
                StaticFileSegment::Transactions => self
                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
                StaticFileSegment::Receipts => {
                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
                }
                StaticFileSegment::TransactionSenders => self.prune_transaction_sender_data(
                    to_delete,
                    last_block_number.expect("should exist"),
                )?,
            }
        }

        // Skip the disk commit entirely when nothing has changed since the last one.
        if self.writer.is_dirty() {
            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                "Committing writer to disk"
            );

            // Commits offsets and new user_header to disk
            self.writer.commit().map_err(ProviderError::other)?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Committed writer to disk"
            );

            // Keep the reader-side index in sync with the newly committed ranges.
            self.update_index()?;
        }

        Ok(())
    }
311
    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// CAUTION: does not call `sync_all` on the files.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            "Committing writer to disk (without sync)"
        );

        // Commits offsets and new user_header to disk
        // Unlike `commit`, this path performs no queued pruning and no dirty check.
        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Committed writer to disk (without sync)"
        );

        // Keep the reader-side index in sync with the newly committed ranges.
        self.update_index()?;

        Ok(())
    }
348
    /// Updates the `self.reader` internal index.
    fn update_index(&self) -> ProviderResult<()> {
        // We find the maximum block of the segment by checking this writer's last block.
        //
        // However if there's no block range (because there's no data), we try to calculate it by
        // subtracting 1 from the expected block start, resulting on the last block of the
        // previous file.
        //
        // If that expected block start is 0, then it means that there's no actual block data, and
        // there's no block data in static files.
        // NOTE(review): the code compares against the provider's genesis block number rather than
        // a literal 0 — confirm the comment above still matches non-zero-genesis setups.
        let segment_max_block = self
            .writer
            .user_header()
            .block_range()
            .as_ref()
            .map(|block_range| block_range.end())
            .or_else(|| {
                (self.writer.user_header().expected_block_start() >
                    self.reader().genesis_block_number())
                .then(|| self.writer.user_header().expected_block_start() - 1)
            });

        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
    }
373
374    /// Ensures that the writer is positioned at the specified block number.
375    ///
376    /// If the writer is positioned at a greater block number than the specified one, the writer
377    /// will NOT be unwound and the error will be returned.
378    pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
379        let current_block = if let Some(current_block_number) = self.current_block_number() {
380            current_block_number
381        } else {
382            self.increment_block(0)?;
383            0
384        };
385
386        match current_block.cmp(&advance_to) {
387            Ordering::Less => {
388                for block in current_block + 1..=advance_to {
389                    self.increment_block(block)?;
390                }
391            }
392            Ordering::Equal => {}
393            Ordering::Greater => {
394                return Err(ProviderError::UnexpectedStaticFileBlockNumber(
395                    self.writer.user_header().segment(),
396                    current_block,
397                    advance_to,
398                ));
399            }
400        }
401
402        Ok(())
403    }
404
    /// Allows to increment the [`SegmentHeader`] end block. It will commit the current static file,
    /// and create the next one if we are past the end range.
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        // Guard against gaps or rewinds: the caller must pass exactly the next block number.
        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // We have finished the previous static file and must freeze it
            if last_block == self.writer.user_header().expected_block_end() {
                // Commits offsets and new user_header to disk
                self.commit()?;

                // Opens the new static file
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                // Start the new file with a fresh header covering the next fixed block range.
                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(segment, last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
445
446    /// Returns the current block number of the static file writer.
447    pub fn current_block_number(&self) -> Option<u64> {
448        self.writer.user_header().block_end()
449    }
450
451    /// Returns a block number that is one next to the current tip of static files.
452    pub fn next_block_number(&self) -> u64 {
453        // The next static file block number can be found by checking the one after block_end.
454        // However, if it's a new file that hasn't been added any data, its block range will
455        // actually be None. In that case, the next block will be found on `expected_block_start`.
456        self.writer
457            .user_header()
458            .block_end()
459            .map(|b| b + 1)
460            .unwrap_or_else(|| self.writer.user_header().expected_block_start())
461    }
462
463    /// Verifies if the incoming block number matches the next expected block number
464    /// for a static file. This ensures data continuity when adding new blocks.
465    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
466        let next_static_file_block = self.next_block_number();
467
468        if expected_block_number != next_static_file_block {
469            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
470                self.writer.user_header().segment(),
471                expected_block_number,
472                next_static_file_block,
473            ))
474        }
475        Ok(())
476    }
477
    /// Truncates a number of rows from disk. It deletes and loads an older static file if block
    /// goes beyond the start of the current block range.
    ///
    /// **`last_block`** should be passed only with transaction based segments.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            // Rows still present in the currently open file.
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // If there's more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                // We only delete the file if it's NOT the first static file AND:
                // * it's a Header segment  OR
                // * it's a tx-based segment AND `last_block` is lower than the first block of this
                //   file's block range. Otherwise, having no rows simply means that this block
                //   range has no transactions, but the file should remain.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Fewer rows to delete than the file holds: prune in place and finish.
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        // Only Transactions and Receipts
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case for when we are unwinding a chain of empty blocks that goes across
                // files, and therefore, the only reference point to know which file
                // we are supposed to be at is `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }
548
    /// Delete the current static file, and replace this provider writer with the previous static
    /// file.
    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
        let current_path = self.data_path.clone();
        // Open the previous file first (`expected_block_start - 1` falls in the prior fixed
        // range), so a failed open leaves this writer untouched.
        // NOTE(review): assumes `expected_block_start > 0`; callers appear to guard this via
        // the `block_start != 0` check in `truncate` — confirm for other call sites.
        let (previous_writer, data_path) = Self::open(
            self.user_header().segment(),
            self.writer.user_header().expected_block_start() - 1,
            self.reader.clone(),
            self.metrics.clone(),
        )?;
        self.writer = previous_writer;
        // Mark dirty so the switch is persisted on the next commit even without new rows.
        self.writer.set_dirty();
        self.data_path = data_path;
        // Only now remove the replaced file from disk.
        NippyJar::<SegmentHeader>::load(&current_path)
            .map_err(ProviderError::other)?
            .delete()
            .map_err(ProviderError::other)?;
        Ok(())
    }
568
569    /// Appends column to static file.
570    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
571        self.buf.clear();
572        column.to_compact(&mut self.buf);
573
574        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
575        Ok(())
576    }
577
578    /// Appends to tx number-based static file.
579    fn append_with_tx_number<V: Compact>(
580        &mut self,
581        tx_num: TxNumber,
582        value: V,
583    ) -> ProviderResult<()> {
584        if let Some(range) = self.writer.user_header().tx_range() {
585            let next_tx = range.end() + 1;
586            if next_tx != tx_num {
587                return Err(ProviderError::UnexpectedStaticFileTxNumber(
588                    self.writer.user_header().segment(),
589                    tx_num,
590                    next_tx,
591                ))
592            }
593            self.writer.user_header_mut().increment_tx();
594        } else {
595            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
596        }
597
598        self.append_column(value)?;
599
600        Ok(())
601    }
602
603    /// Appends header to static file.
604    ///
605    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
606    /// blocks.
607    pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
608    where
609        N::BlockHeader: Compact,
610    {
611        self.append_header_with_td(header, U256::ZERO, hash)
612    }
613
614    /// Appends header to static file with a specified total difficulty.
615    ///
616    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
617    /// blocks.
618    pub fn append_header_with_td(
619        &mut self,
620        header: &N::BlockHeader,
621        total_difficulty: U256,
622        hash: &BlockHash,
623    ) -> ProviderResult<()>
624    where
625        N::BlockHeader: Compact,
626    {
627        let start = Instant::now();
628        self.ensure_no_queued_prune()?;
629
630        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
631
632        self.increment_block(header.number())?;
633
634        self.append_column(header)?;
635        self.append_column(CompactU256::from(total_difficulty))?;
636        self.append_column(hash)?;
637
638        if let Some(metrics) = &self.metrics {
639            metrics.record_segment_operation(
640                StaticFileSegment::Headers,
641                StaticFileProviderOperation::Append,
642                Some(start.elapsed()),
643            );
644        }
645
646        Ok(())
647    }
648
649    /// Appends header to static file without calling `increment_block`.
650    /// This is useful for genesis blocks with non-zero block numbers.
651    pub fn append_header_direct(
652        &mut self,
653        header: &N::BlockHeader,
654        total_difficulty: U256,
655        hash: &BlockHash,
656    ) -> ProviderResult<()>
657    where
658        N::BlockHeader: Compact,
659    {
660        let start = Instant::now();
661        self.ensure_no_queued_prune()?;
662
663        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
664
665        self.append_column(header)?;
666        self.append_column(CompactU256::from(total_difficulty))?;
667        self.append_column(hash)?;
668
669        if let Some(metrics) = &self.metrics {
670            metrics.record_segment_operation(
671                StaticFileSegment::Headers,
672                StaticFileProviderOperation::Append,
673                Some(start.elapsed()),
674            );
675        }
676
677        Ok(())
678    }
679
680    /// Appends transaction to static file.
681    ///
682    /// It **DOES NOT CALL** `increment_block()`, it should be handled elsewhere. There might be
683    /// empty blocks and this function wouldn't be called.
684    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
685    where
686        N::SignedTx: Compact,
687    {
688        let start = Instant::now();
689        self.ensure_no_queued_prune()?;
690
691        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
692        self.append_with_tx_number(tx_num, tx)?;
693
694        if let Some(metrics) = &self.metrics {
695            metrics.record_segment_operation(
696                StaticFileSegment::Transactions,
697                StaticFileProviderOperation::Append,
698                Some(start.elapsed()),
699            );
700        }
701
702        Ok(())
703    }
704
705    /// Appends receipt to static file.
706    ///
707    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
708    /// empty blocks and this function wouldn't be called.
709    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
710    where
711        N::Receipt: Compact,
712    {
713        let start = Instant::now();
714        self.ensure_no_queued_prune()?;
715
716        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
717        self.append_with_tx_number(tx_num, receipt)?;
718
719        if let Some(metrics) = &self.metrics {
720            metrics.record_segment_operation(
721                StaticFileSegment::Receipts,
722                StaticFileProviderOperation::Append,
723                Some(start.elapsed()),
724            );
725        }
726
727        Ok(())
728    }
729
730    /// Appends multiple receipts to the static file.
731    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
732    where
733        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
734        R: Borrow<N::Receipt>,
735        N::Receipt: Compact,
736    {
737        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
738
739        let mut receipts_iter = receipts.into_iter().peekable();
740        // If receipts are empty, we can simply return None
741        if receipts_iter.peek().is_none() {
742            return Ok(());
743        }
744
745        let start = Instant::now();
746        self.ensure_no_queued_prune()?;
747
748        // At this point receipts contains at least one receipt, so this would be overwritten.
749        let mut count: u64 = 0;
750
751        for receipt_result in receipts_iter {
752            let (tx_num, receipt) = receipt_result?;
753            self.append_with_tx_number(tx_num, receipt.borrow())?;
754            count += 1;
755        }
756
757        if let Some(metrics) = &self.metrics {
758            metrics.record_segment_operations(
759                StaticFileSegment::Receipts,
760                StaticFileProviderOperation::Append,
761                count,
762                Some(start.elapsed()),
763            );
764        }
765
766        Ok(())
767    }
768
769    /// Appends transaction sender to static file.
770    ///
771    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
772    /// empty blocks and this function wouldn't be called.
773    pub fn append_transaction_sender(
774        &mut self,
775        tx_num: TxNumber,
776        sender: &alloy_primitives::Address,
777    ) -> ProviderResult<()> {
778        let start = Instant::now();
779        self.ensure_no_queued_prune()?;
780
781        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
782        self.append_with_tx_number(tx_num, sender)?;
783
784        if let Some(metrics) = &self.metrics {
785            metrics.record_segment_operation(
786                StaticFileSegment::TransactionSenders,
787                StaticFileProviderOperation::Append,
788                Some(start.elapsed()),
789            );
790        }
791
792        Ok(())
793    }
794
795    /// Appends multiple transaction senders to the static file.
796    pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
797    where
798        I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
799    {
800        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
801
802        let mut senders_iter = senders.into_iter().peekable();
803        // If senders are empty, we can simply return
804        if senders_iter.peek().is_none() {
805            return Ok(());
806        }
807
808        let start = Instant::now();
809        self.ensure_no_queued_prune()?;
810
811        // At this point senders contains at least one sender, so this would be overwritten.
812        let mut count: u64 = 0;
813        for (tx_num, sender) in senders_iter {
814            self.append_with_tx_number(tx_num, sender)?;
815            count += 1;
816        }
817
818        if let Some(metrics) = &self.metrics {
819            metrics.record_segment_operations(
820                StaticFileSegment::TransactionSenders,
821                StaticFileProviderOperation::Append,
822                count,
823                Some(start.elapsed()),
824            );
825        }
826
827        Ok(())
828    }
829
830    /// Adds an instruction to prune `to_delete` transactions during commit.
831    ///
832    /// Note: `last_block` refers to the block the unwinds ends at.
833    pub fn prune_transactions(
834        &mut self,
835        to_delete: u64,
836        last_block: BlockNumber,
837    ) -> ProviderResult<()> {
838        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
839        self.queue_prune(to_delete, Some(last_block))
840    }
841
842    /// Adds an instruction to prune `to_delete` receipts during commit.
843    ///
844    /// Note: `last_block` refers to the block the unwinds ends at.
845    pub fn prune_receipts(
846        &mut self,
847        to_delete: u64,
848        last_block: BlockNumber,
849    ) -> ProviderResult<()> {
850        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
851        self.queue_prune(to_delete, Some(last_block))
852    }
853
854    /// Adds an instruction to prune `to_delete` transaction senders during commit.
855    ///
856    /// Note: `last_block` refers to the block the unwinds ends at.
857    pub fn prune_transaction_senders(
858        &mut self,
859        to_delete: u64,
860        last_block: BlockNumber,
861    ) -> ProviderResult<()> {
862        debug_assert_eq!(
863            self.writer.user_header().segment(),
864            StaticFileSegment::TransactionSenders
865        );
866        self.queue_prune(to_delete, Some(last_block))
867    }
868
869    /// Adds an instruction to prune `to_delete` headers during commit.
870    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
871        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
872        self.queue_prune(to_delete, None)
873    }
874
875    /// Adds an instruction to prune `to_delete` elements during commit.
876    ///
877    /// Note: `last_block` refers to the block the unwinds ends at if dealing with transaction-based
878    /// data.
879    fn queue_prune(
880        &mut self,
881        to_delete: u64,
882        last_block: Option<BlockNumber>,
883    ) -> ProviderResult<()> {
884        self.ensure_no_queued_prune()?;
885        self.prune_on_commit = Some((to_delete, last_block));
886        Ok(())
887    }
888
889    /// Returns Error if there is a pruning instruction that needs to be applied.
890    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
891        if self.prune_on_commit.is_some() {
892            return Err(ProviderError::other(StaticFileWriterError::new(
893                "Pruning should be committed before appending or pruning more data",
894            )));
895        }
896        Ok(())
897    }
898
899    /// Removes the last `to_delete` transactions from the data file.
900    fn prune_transaction_data(
901        &mut self,
902        to_delete: u64,
903        last_block: BlockNumber,
904    ) -> ProviderResult<()> {
905        let start = Instant::now();
906
907        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
908
909        self.truncate(to_delete, Some(last_block))?;
910
911        if let Some(metrics) = &self.metrics {
912            metrics.record_segment_operation(
913                StaticFileSegment::Transactions,
914                StaticFileProviderOperation::Prune,
915                Some(start.elapsed()),
916            );
917        }
918
919        Ok(())
920    }
921
922    /// Prunes the last `to_delete` receipts from the data file.
923    fn prune_receipt_data(
924        &mut self,
925        to_delete: u64,
926        last_block: BlockNumber,
927    ) -> ProviderResult<()> {
928        let start = Instant::now();
929
930        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
931
932        self.truncate(to_delete, Some(last_block))?;
933
934        if let Some(metrics) = &self.metrics {
935            metrics.record_segment_operation(
936                StaticFileSegment::Receipts,
937                StaticFileProviderOperation::Prune,
938                Some(start.elapsed()),
939            );
940        }
941
942        Ok(())
943    }
944
945    /// Prunes the last `to_delete` transaction senders from the data file.
946    fn prune_transaction_sender_data(
947        &mut self,
948        to_delete: u64,
949        last_block: BlockNumber,
950    ) -> ProviderResult<()> {
951        let start = Instant::now();
952
953        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
954
955        self.truncate(to_delete, Some(last_block))?;
956
957        if let Some(metrics) = &self.metrics {
958            metrics.record_segment_operation(
959                StaticFileSegment::TransactionSenders,
960                StaticFileProviderOperation::Prune,
961                Some(start.elapsed()),
962            );
963        }
964
965        Ok(())
966    }
967
968    /// Prunes the last `to_delete` headers from the data file.
969    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
970        let start = Instant::now();
971
972        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
973
974        self.truncate(to_delete, None)?;
975
976        if let Some(metrics) = &self.metrics {
977            metrics.record_segment_operation(
978                StaticFileSegment::Headers,
979                StaticFileProviderOperation::Prune,
980                Some(start.elapsed()),
981            );
982        }
983
984        Ok(())
985    }
986
    /// Returns a [`StaticFileProvider`] associated with this writer.
    ///
    /// Upgrades the writer's weak back-reference; panics if the parent provider was dropped
    /// (see [`Self::upgrade_provider_to_strong_reference`]).
    pub fn reader(&self) -> StaticFileProvider<N> {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }
991
992    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
993    /// [`StaticFileProvider`].
994    ///
995    /// # Panics
996    ///
997    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is still
998    /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
999    /// [`StaticFileProvider`].
1000    fn upgrade_provider_to_strong_reference(
1001        provider: &Weak<StaticFileProviderInner<N>>,
1002    ) -> StaticFileProvider<N> {
1003        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
1004    }
1005
    /// Helper function to access [`SegmentHeader`].
    ///
    /// Delegates to the underlying `NippyJarWriter`.
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }
1010
    /// Helper function to access a mutable reference to [`SegmentHeader`].
    ///
    /// Delegates to the underlying `NippyJarWriter`.
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }
1015
1016    /// Helper function to override block range for testing.
1017    #[cfg(any(test, feature = "test-utils"))]
1018    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
1019        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
1020    }
1021
    /// Test-only helper exposing the underlying [`NippyJarWriter`].
    ///
    /// NOTE(review): the original doc comment ("override block range") looks copy-pasted from
    /// `set_block_range` — this accessor returns the raw writer.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
1027}
1028
1029fn create_jar(
1030    segment: StaticFileSegment,
1031    path: &Path,
1032    expected_block_range: SegmentRangeInclusive,
1033) -> NippyJar<SegmentHeader> {
1034    let mut jar = NippyJar::new(
1035        segment.columns(),
1036        path,
1037        SegmentHeader::new(expected_block_range, None, None, segment),
1038    );
1039
1040    // Transaction and Receipt already have the compression scheme used natively in its encoding.
1041    // (zstd-dictionary)
1042    if segment.is_headers() {
1043        jar = jar.with_lz4();
1044    }
1045
1046    jar
1047}