reth_provider/providers/static_file/
writer.rs

1use super::{
2    manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::CompactU256;
10use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
11use reth_node_types::NodePrimitives;
12use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
14use std::{
15    borrow::Borrow,
16    cmp::Ordering,
17    fmt::Debug,
18    path::{Path, PathBuf},
19    sync::{Arc, Weak},
20    time::Instant,
21};
22use tracing::debug;
23
/// Static file writers for every known [`StaticFileSegment`].
///
/// WARNING: Trying to use more than one writer for the same segment type **will result in a
/// deadlock**.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    // Each slot starts as `None` and is lazily initialized by `get_or_create`.
    /// Writer slot for the headers segment.
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for the transactions segment.
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for the receipts segment.
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for the transaction-senders segment.
    transaction_senders: RwLock<Option<StaticFileProviderRW<N>>>,
}
35
36impl<N> Default for StaticFileWriters<N> {
37    fn default() -> Self {
38        Self {
39            headers: Default::default(),
40            transactions: Default::default(),
41            receipts: Default::default(),
42            transaction_senders: Default::default(),
43        }
44    }
45}
46
47impl<N: NodePrimitives> StaticFileWriters<N> {
48    pub(crate) fn get_or_create(
49        &self,
50        segment: StaticFileSegment,
51        create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
52    ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
53        let mut write_guard = match segment {
54            StaticFileSegment::Headers => self.headers.write(),
55            StaticFileSegment::Transactions => self.transactions.write(),
56            StaticFileSegment::Receipts => self.receipts.write(),
57            StaticFileSegment::TransactionSenders => self.transaction_senders.write(),
58        };
59
60        if write_guard.is_none() {
61            *write_guard = Some(create_fn()?);
62        }
63
64        Ok(StaticFileProviderRWRefMut(write_guard))
65    }
66
67    pub(crate) fn commit(&self) -> ProviderResult<()> {
68        debug!(target: "provider::static_file", "Committing all static file segments");
69
70        for writer_lock in
71            [&self.headers, &self.transactions, &self.receipts, &self.transaction_senders]
72        {
73            let mut writer = writer_lock.write();
74            if let Some(writer) = writer.as_mut() {
75                writer.commit()?;
76            }
77        }
78
79        debug!(target: "provider::static_file", "Committed all static file segments");
80        Ok(())
81    }
82
83    pub(crate) fn has_unwind_queued(&self) -> bool {
84        for writer_lock in
85            [&self.headers, &self.transactions, &self.receipts, &self.transaction_senders]
86        {
87            let writer = writer_lock.read();
88            if let Some(writer) = writer.as_ref() &&
89                writer.will_prune_on_commit()
90            {
91                return true
92            }
93        }
94        false
95    }
96}
97
/// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`].
///
/// Holding this value keeps the per-segment writer lock; see the deadlock warning on
/// [`StaticFileWriters`].
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    /// Guard over the writer slot; always `Some` once handed out by
    /// `StaticFileWriters::get_or_create`.
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
103
impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`], which fills the
        // `Option` before handing out the guard, so the `expect` cannot fire.
        self.0.as_mut().expect("static file writer provider should be init")
    }
}
110
impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
    type Target = StaticFileProviderRW<N>;

    fn deref(&self) -> &Self::Target {
        // This is always created by [`StaticFileWriters::get_or_create`], which fills the
        // `Option` before handing out the guard, so the `expect` cannot fire.
        self.0.as_ref().expect("static file writer provider should be init")
    }
}
119
#[derive(Debug)]
/// Extends `StaticFileProvider` with writing capabilities
pub struct StaticFileProviderRW<N> {
    /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is
    /// owned by the parent [`StaticFileProvider`], which is behind an [Arc]. If we were to use an
    /// [Arc] here, we would create a reference cycle.
    reader: Weak<StaticFileProviderInner<N>>,
    /// A [`NippyJarWriter`] instance.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path to opened file.
    data_path: PathBuf,
    /// Reusable buffer for encoding appended data.
    buf: Vec<u8>,
    /// Metrics.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// On commit, does the instructed pruning: number of lines, and if it applies, the last block
    /// it ends at. `None` when no prune is queued.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}
139
140impl<N: NodePrimitives> StaticFileProviderRW<N> {
141    /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`].
142    ///
143    /// Before use, transaction based segments should ensure the block end range is the expected
144    /// one, and heal if not. For more check `Self::ensure_end_range_consistency`.
145    pub fn new(
146        segment: StaticFileSegment,
147        block: BlockNumber,
148        reader: Weak<StaticFileProviderInner<N>>,
149        metrics: Option<Arc<StaticFileProviderMetrics>>,
150    ) -> ProviderResult<Self> {
151        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
152        let mut writer = Self {
153            writer,
154            data_path,
155            buf: Vec::with_capacity(100),
156            reader,
157            metrics,
158            prune_on_commit: None,
159        };
160
161        writer.ensure_end_range_consistency()?;
162
163        Ok(writer)
164    }
165
    /// Opens (or creates) the [`NippyJarWriter`] for the static file that contains `block`,
    /// returning it together with the path of the data file.
    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        // Resolve the fixed block range of the file that should contain `block`.
        let block_range = static_file_provider.find_fixed_range(segment, block);
        // Load the existing jar for that range, or create a fresh one if no file covers it yet.
        let (jar, path) = match static_file_provider.get_segment_provider_for_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            Err(NippyJarError::FrozenJar) => {
                // This static file has been frozen, so it can no longer be written to; surface
                // that as a dedicated error.
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::other(e)),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }
212
    /// If a file level healing happens, we need to update the end range on the
    /// [`SegmentHeader`].
    ///
    /// However, for transaction based segments, the block end range has to be found and healed
    /// externally.
    ///
    /// Check [`reth_nippy_jar::NippyJarChecker`] &
    /// [`NippyJarWriter`] for more on healing.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // If we have lost rows (in this run or previous), we need to update the [SegmentHeader].
        // Headers are block-based (one row per block); the other segments count transactions.
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let actual_rows = self.writer.rows() as u64;
        // Rows the header still claims but the data file no longer has (e.g. after healing).
        let pruned_rows = expected_rows.saturating_sub(actual_rows);
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            pruned_rows,
            "Ensuring end range consistency"
        );

        // Persist the (possibly adjusted) header.
        self.writer.commit().map_err(ProviderError::other)?;

        // Updates the [SnapshotProvider] manager
        self.update_index()?;
        Ok(())
    }
248
249    /// Returns `true` if the writer will prune on commit.
250    pub const fn will_prune_on_commit(&self) -> bool {
251        self.prune_on_commit.is_some()
252    }
253
    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// Applies any queued prune instruction first, then flushes the [`NippyJarWriter`] if it has
    /// uncommitted changes, and finally refreshes the reader index.
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Truncates the data file if instructed to. `take()` clears the queued instruction so it
        // runs at most once.
        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                to_delete,
                "Pruning data on commit"
            );
            // `last_block_number` is only queued by the tx-based `prune_*` methods, hence the
            // `expect`s below cannot fire for those segments.
            match self.writer.user_header().segment() {
                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
                StaticFileSegment::Transactions => self
                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
                StaticFileSegment::Receipts => {
                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
                }
                StaticFileSegment::TransactionSenders => self.prune_transaction_sender_data(
                    to_delete,
                    last_block_number.expect("should exist"),
                )?,
            }
        }

        // Skip the flush (and index refresh) entirely when nothing changed.
        if self.writer.is_dirty() {
            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                "Committing writer to disk"
            );

            // Commits offsets and new user_header to disk
            self.writer.commit().map_err(ProviderError::other)?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Committed writer to disk"
            );

            self.update_index()?;
        }

        Ok(())
    }
311
    /// Commits configuration changes to disk and updates the reader index with the new changes.
    ///
    /// CAUTION: does not call `sync_all` on the files.
    ///
    /// Unlike [`Self::commit`], this does not apply any queued prune instruction.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            "Committing writer to disk (without sync)"
        );

        // Commits offsets and new user_header to disk
        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Committed writer to disk (without sync)"
        );

        self.update_index()?;

        Ok(())
    }
348
349    /// Updates the `self.reader` internal index.
350    fn update_index(&self) -> ProviderResult<()> {
351        // We find the maximum block of the segment by checking this writer's last block.
352        //
353        // However if there's no block range (because there's no data), we try to calculate it by
354        // subtracting 1 from the expected block start, resulting on the last block of the
355        // previous file.
356        //
357        // If that expected block start is 0, then it means that there's no actual block data, and
358        // there's no block data in static files.
359        let segment_max_block = self
360            .writer
361            .user_header()
362            .block_range()
363            .as_ref()
364            .map(|block_range| block_range.end())
365            .or_else(|| {
366                (self.writer.user_header().expected_block_start() > 0)
367                    .then(|| self.writer.user_header().expected_block_start() - 1)
368            });
369
370        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
371    }
372
373    /// Ensures that the writer is positioned at the specified block number.
374    ///
375    /// If the writer is positioned at a greater block number than the specified one, the writer
376    /// will NOT be unwound and the error will be returned.
377    pub fn ensure_at_block(&mut self, advance_to: BlockNumber) -> ProviderResult<()> {
378        let current_block = if let Some(current_block_number) = self.current_block_number() {
379            current_block_number
380        } else {
381            self.increment_block(0)?;
382            0
383        };
384
385        match current_block.cmp(&advance_to) {
386            Ordering::Less => {
387                for block in current_block + 1..=advance_to {
388                    self.increment_block(block)?;
389                }
390            }
391            Ordering::Equal => {}
392            Ordering::Greater => {
393                return Err(ProviderError::UnexpectedStaticFileBlockNumber(
394                    self.writer.user_header().segment(),
395                    current_block,
396                    advance_to,
397                ));
398            }
399        }
400
401        Ok(())
402    }
403
    /// Allows to increment the [`SegmentHeader`] end block. It will commit the current static file,
    /// and create the next one if we are past the end range.
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        // Guard against gaps or regressions: the incoming block must be exactly the next one.
        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // We have finished the previous static file and must freeze it
            if last_block == self.writer.user_header().expected_block_end() {
                // Commits offsets and new user_header to disk
                self.commit()?;

                // Opens the new static file
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                // Fresh header for the next fixed block range; block and tx ranges start empty.
                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(segment, last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
444
445    /// Returns the current block number of the static file writer.
446    pub fn current_block_number(&self) -> Option<u64> {
447        self.writer.user_header().block_end()
448    }
449
450    /// Returns a block number that is one next to the current tip of static files.
451    pub fn next_block_number(&self) -> u64 {
452        // The next static file block number can be found by checking the one after block_end.
453        // However, if it's a new file that hasn't been added any data, its block range will
454        // actually be None. In that case, the next block will be found on `expected_block_start`.
455        self.writer
456            .user_header()
457            .block_end()
458            .map(|b| b + 1)
459            .unwrap_or_else(|| self.writer.user_header().expected_block_start())
460    }
461
462    /// Verifies if the incoming block number matches the next expected block number
463    /// for a static file. This ensures data continuity when adding new blocks.
464    fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
465        let next_static_file_block = self.next_block_number();
466
467        if expected_block_number != next_static_file_block {
468            return Err(ProviderError::UnexpectedStaticFileBlockNumber(
469                self.writer.user_header().segment(),
470                expected_block_number,
471                next_static_file_block,
472            ))
473        }
474        Ok(())
475    }
476
    /// Truncates a number of rows from disk. It deletes and loads an older static file if block
    /// goes beyond the start of the current block range.
    ///
    /// **`last_block`** should be passed only with transaction based segments.
    ///
    /// # Note
    /// Commits to the configuration file at the end.
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        // Walk backwards across files until all requested rows are removed.
        while remaining_rows > 0 {
            // Rows currently stored in this file: blocks for header segments, txs otherwise.
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // If there's more rows to delete than this static file contains, then just
                // delete the whole file and go to the next static file
                let block_start = self.writer.user_header().expected_block_start();

                // We only delete the file if it's NOT the first static file AND:
                // * it's a Header segment  OR
                // * it's a tx-based segment AND `last_block` is lower than the first block of this
                //   file's block range. Otherwise, having no rows simply means that this block
                //   range has no transactions, but the file should remain.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update `SegmentHeader`
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Update `SegmentHeader`
                self.writer.user_header_mut().prune(remaining_rows);

                // Truncate data
                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        // Only tx-based segments pass `last_block` (see the `prune_*` methods).
        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Edge case for when we are unwinding a chain of empty blocks that goes across
                // files, and therefore, the only reference point to know which file
                // we are supposed to be at is `last_block`.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            // Clamp the header's block range so it ends at `last_block`.
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Commits new changes to disk.
        self.commit()?;

        Ok(())
    }
547
    /// Delete the current static file, and replace this provider writer with the previous static
    /// file.
    ///
    /// NOTE(review): assumes `expected_block_start > 0` — callers guard this — otherwise the
    /// `- 1` below would underflow.
    fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
        let current_path = self.data_path.clone();
        // Open the previous file first, so this writer is never left without a usable file.
        let (previous_writer, data_path) = Self::open(
            self.user_header().segment(),
            self.writer.user_header().expected_block_start() - 1,
            self.reader.clone(),
            self.metrics.clone(),
        )?;
        self.writer = previous_writer;
        // Mark dirty so the switch is persisted on the next commit.
        self.writer.set_dirty();
        self.data_path = data_path;
        // Only now remove the replaced file from disk.
        NippyJar::<SegmentHeader>::load(&current_path)
            .map_err(ProviderError::other)?
            .delete()
            .map_err(ProviderError::other)?;
        Ok(())
    }
567
568    /// Appends column to static file.
569    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
570        self.buf.clear();
571        column.to_compact(&mut self.buf);
572
573        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
574        Ok(())
575    }
576
577    /// Appends to tx number-based static file.
578    fn append_with_tx_number<V: Compact>(
579        &mut self,
580        tx_num: TxNumber,
581        value: V,
582    ) -> ProviderResult<()> {
583        if let Some(range) = self.writer.user_header().tx_range() {
584            let next_tx = range.end() + 1;
585            if next_tx != tx_num {
586                return Err(ProviderError::UnexpectedStaticFileTxNumber(
587                    self.writer.user_header().segment(),
588                    tx_num,
589                    next_tx,
590                ))
591            }
592            self.writer.user_header_mut().increment_tx();
593        } else {
594            self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
595        }
596
597        self.append_column(value)?;
598
599        Ok(())
600    }
601
    /// Appends header to static file.
    ///
    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
    /// blocks.
    ///
    /// Delegates to [`Self::append_header_with_td`] with a zero total difficulty.
    pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        self.append_header_with_td(header, U256::ZERO, hash)
    }
612
613    /// Appends header to static file with a specified total difficulty.
614    ///
615    /// It **CALLS** `increment_block()` since the number of headers is equal to the number of
616    /// blocks.
617    pub fn append_header_with_td(
618        &mut self,
619        header: &N::BlockHeader,
620        total_difficulty: U256,
621        hash: &BlockHash,
622    ) -> ProviderResult<()>
623    where
624        N::BlockHeader: Compact,
625    {
626        let start = Instant::now();
627        self.ensure_no_queued_prune()?;
628
629        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
630
631        self.increment_block(header.number())?;
632
633        self.append_column(header)?;
634        self.append_column(CompactU256::from(total_difficulty))?;
635        self.append_column(hash)?;
636
637        if let Some(metrics) = &self.metrics {
638            metrics.record_segment_operation(
639                StaticFileSegment::Headers,
640                StaticFileProviderOperation::Append,
641                Some(start.elapsed()),
642            );
643        }
644
645        Ok(())
646    }
647
648    /// Appends transaction to static file.
649    ///
650    /// It **DOES NOT CALL** `increment_block()`, it should be handled elsewhere. There might be
651    /// empty blocks and this function wouldn't be called.
652    pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
653    where
654        N::SignedTx: Compact,
655    {
656        let start = Instant::now();
657        self.ensure_no_queued_prune()?;
658
659        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
660        self.append_with_tx_number(tx_num, tx)?;
661
662        if let Some(metrics) = &self.metrics {
663            metrics.record_segment_operation(
664                StaticFileSegment::Transactions,
665                StaticFileProviderOperation::Append,
666                Some(start.elapsed()),
667            );
668        }
669
670        Ok(())
671    }
672
673    /// Appends receipt to static file.
674    ///
675    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
676    /// empty blocks and this function wouldn't be called.
677    pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
678    where
679        N::Receipt: Compact,
680    {
681        let start = Instant::now();
682        self.ensure_no_queued_prune()?;
683
684        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
685        self.append_with_tx_number(tx_num, receipt)?;
686
687        if let Some(metrics) = &self.metrics {
688            metrics.record_segment_operation(
689                StaticFileSegment::Receipts,
690                StaticFileProviderOperation::Append,
691                Some(start.elapsed()),
692            );
693        }
694
695        Ok(())
696    }
697
698    /// Appends multiple receipts to the static file.
699    pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<()>
700    where
701        I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
702        R: Borrow<N::Receipt>,
703        N::Receipt: Compact,
704    {
705        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
706
707        let mut receipts_iter = receipts.into_iter().peekable();
708        // If receipts are empty, we can simply return None
709        if receipts_iter.peek().is_none() {
710            return Ok(());
711        }
712
713        let start = Instant::now();
714        self.ensure_no_queued_prune()?;
715
716        // At this point receipts contains at least one receipt, so this would be overwritten.
717        let mut count: u64 = 0;
718
719        for receipt_result in receipts_iter {
720            let (tx_num, receipt) = receipt_result?;
721            self.append_with_tx_number(tx_num, receipt.borrow())?;
722            count += 1;
723        }
724
725        if let Some(metrics) = &self.metrics {
726            metrics.record_segment_operations(
727                StaticFileSegment::Receipts,
728                StaticFileProviderOperation::Append,
729                count,
730                Some(start.elapsed()),
731            );
732        }
733
734        Ok(())
735    }
736
737    /// Appends transaction sender to static file.
738    ///
739    /// It **DOES NOT** call `increment_block()`, it should be handled elsewhere. There might be
740    /// empty blocks and this function wouldn't be called.
741    pub fn append_transaction_sender(
742        &mut self,
743        tx_num: TxNumber,
744        sender: &alloy_primitives::Address,
745    ) -> ProviderResult<()> {
746        let start = Instant::now();
747        self.ensure_no_queued_prune()?;
748
749        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
750        self.append_with_tx_number(tx_num, sender)?;
751
752        if let Some(metrics) = &self.metrics {
753            metrics.record_segment_operation(
754                StaticFileSegment::TransactionSenders,
755                StaticFileProviderOperation::Append,
756                Some(start.elapsed()),
757            );
758        }
759
760        Ok(())
761    }
762
763    /// Appends multiple transaction senders to the static file.
764    pub fn append_transaction_senders<I>(&mut self, senders: I) -> ProviderResult<()>
765    where
766        I: Iterator<Item = (TxNumber, alloy_primitives::Address)>,
767    {
768        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
769
770        let mut senders_iter = senders.into_iter().peekable();
771        // If senders are empty, we can simply return
772        if senders_iter.peek().is_none() {
773            return Ok(());
774        }
775
776        let start = Instant::now();
777        self.ensure_no_queued_prune()?;
778
779        // At this point senders contains at least one sender, so this would be overwritten.
780        let mut count: u64 = 0;
781        for (tx_num, sender) in senders_iter {
782            self.append_with_tx_number(tx_num, sender)?;
783            count += 1;
784        }
785
786        if let Some(metrics) = &self.metrics {
787            metrics.record_segment_operations(
788                StaticFileSegment::TransactionSenders,
789                StaticFileProviderOperation::Append,
790                count,
791                Some(start.elapsed()),
792            );
793        }
794
795        Ok(())
796    }
797
798    /// Adds an instruction to prune `to_delete` transactions during commit.
799    ///
800    /// Note: `last_block` refers to the block the unwinds ends at.
801    pub fn prune_transactions(
802        &mut self,
803        to_delete: u64,
804        last_block: BlockNumber,
805    ) -> ProviderResult<()> {
806        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
807        self.queue_prune(to_delete, Some(last_block))
808    }
809
810    /// Adds an instruction to prune `to_delete` receipts during commit.
811    ///
812    /// Note: `last_block` refers to the block the unwinds ends at.
813    pub fn prune_receipts(
814        &mut self,
815        to_delete: u64,
816        last_block: BlockNumber,
817    ) -> ProviderResult<()> {
818        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
819        self.queue_prune(to_delete, Some(last_block))
820    }
821
822    /// Adds an instruction to prune `to_delete` transaction senders during commit.
823    ///
824    /// Note: `last_block` refers to the block the unwinds ends at.
825    pub fn prune_transaction_senders(
826        &mut self,
827        to_delete: u64,
828        last_block: BlockNumber,
829    ) -> ProviderResult<()> {
830        debug_assert_eq!(
831            self.writer.user_header().segment(),
832            StaticFileSegment::TransactionSenders
833        );
834        self.queue_prune(to_delete, Some(last_block))
835    }
836
837    /// Adds an instruction to prune `to_delete` headers during commit.
838    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
839        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
840        self.queue_prune(to_delete, None)
841    }
842
843    /// Adds an instruction to prune `to_delete` elements during commit.
844    ///
845    /// Note: `last_block` refers to the block the unwinds ends at if dealing with transaction-based
846    /// data.
847    fn queue_prune(
848        &mut self,
849        to_delete: u64,
850        last_block: Option<BlockNumber>,
851    ) -> ProviderResult<()> {
852        self.ensure_no_queued_prune()?;
853        self.prune_on_commit = Some((to_delete, last_block));
854        Ok(())
855    }
856
857    /// Returns Error if there is a pruning instruction that needs to be applied.
858    fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
859        if self.prune_on_commit.is_some() {
860            return Err(ProviderError::other(StaticFileWriterError::new(
861                "Pruning should be committed before appending or pruning more data",
862            )));
863        }
864        Ok(())
865    }
866
867    /// Removes the last `to_delete` transactions from the data file.
868    fn prune_transaction_data(
869        &mut self,
870        to_delete: u64,
871        last_block: BlockNumber,
872    ) -> ProviderResult<()> {
873        let start = Instant::now();
874
875        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
876
877        self.truncate(to_delete, Some(last_block))?;
878
879        if let Some(metrics) = &self.metrics {
880            metrics.record_segment_operation(
881                StaticFileSegment::Transactions,
882                StaticFileProviderOperation::Prune,
883                Some(start.elapsed()),
884            );
885        }
886
887        Ok(())
888    }
889
890    /// Prunes the last `to_delete` receipts from the data file.
891    fn prune_receipt_data(
892        &mut self,
893        to_delete: u64,
894        last_block: BlockNumber,
895    ) -> ProviderResult<()> {
896        let start = Instant::now();
897
898        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
899
900        self.truncate(to_delete, Some(last_block))?;
901
902        if let Some(metrics) = &self.metrics {
903            metrics.record_segment_operation(
904                StaticFileSegment::Receipts,
905                StaticFileProviderOperation::Prune,
906                Some(start.elapsed()),
907            );
908        }
909
910        Ok(())
911    }
912
913    /// Prunes the last `to_delete` transaction senders from the data file.
914    fn prune_transaction_sender_data(
915        &mut self,
916        to_delete: u64,
917        last_block: BlockNumber,
918    ) -> ProviderResult<()> {
919        let start = Instant::now();
920
921        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::TransactionSenders);
922
923        self.truncate(to_delete, Some(last_block))?;
924
925        if let Some(metrics) = &self.metrics {
926            metrics.record_segment_operation(
927                StaticFileSegment::TransactionSenders,
928                StaticFileProviderOperation::Prune,
929                Some(start.elapsed()),
930            );
931        }
932
933        Ok(())
934    }
935
936    /// Prunes the last `to_delete` headers from the data file.
937    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
938        let start = Instant::now();
939
940        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
941
942        self.truncate(to_delete, None)?;
943
944        if let Some(metrics) = &self.metrics {
945            metrics.record_segment_operation(
946                StaticFileSegment::Headers,
947                StaticFileProviderOperation::Prune,
948                Some(start.elapsed()),
949            );
950        }
951
952        Ok(())
953    }
954
955    /// Returns a [`StaticFileProvider`] associated with this writer.
956    pub fn reader(&self) -> StaticFileProvider<N> {
957        Self::upgrade_provider_to_strong_reference(&self.reader)
958    }
959
960    /// Upgrades a weak reference of [`StaticFileProviderInner`] to a strong reference
961    /// [`StaticFileProvider`].
962    ///
963    /// # Panics
964    ///
965    /// Panics if the parent [`StaticFileProvider`] is fully dropped while the child writer is still
966    /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the
967    /// [`StaticFileProvider`].
968    fn upgrade_provider_to_strong_reference(
969        provider: &Weak<StaticFileProviderInner<N>>,
970    ) -> StaticFileProvider<N> {
971        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
972    }
973
    /// Returns a shared reference to the writer's [`SegmentHeader`].
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }
978
    /// Returns a mutable reference to the writer's [`SegmentHeader`].
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }
983
    /// Overrides the segment header's block range. Test-only helper.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
    }
989
    /// Returns a mutable reference to the underlying [`NippyJarWriter`]. Test-only helper.
    ///
    /// NOTE(review): the previous doc comment ("override block range") was copy-pasted from
    /// `set_block_range` and did not describe this accessor.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
995}
996
997fn create_jar(
998    segment: StaticFileSegment,
999    path: &Path,
1000    expected_block_range: SegmentRangeInclusive,
1001) -> NippyJar<SegmentHeader> {
1002    let mut jar = NippyJar::new(
1003        segment.columns(),
1004        path,
1005        SegmentHeader::new(expected_block_range, None, None, segment),
1006    );
1007
1008    // Transaction and Receipt already have the compression scheme used natively in its encoding.
1009    // (zstd-dictionary)
1010    if segment.is_headers() {
1011        jar = jar.with_lz4();
1012    }
1013
1014    jar
1015}