use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::CompactU256;
10use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
11use reth_node_types::NodePrimitives;
12use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
14use std::{
15 borrow::Borrow,
16 fmt::Debug,
17 path::{Path, PathBuf},
18 sync::{Arc, Weak},
19 time::Instant,
20};
21use tracing::debug;
22
/// Holds one lazily-initialized writer per static-file segment.
///
/// Each writer sits behind its own `RwLock` so different segments can be written
/// concurrently while access to any single segment stays exclusive.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    /// Writer for the headers segment, created on first use.
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer for the transactions segment, created on first use.
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer for the receipts segment, created on first use.
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
}
33
34impl<N> Default for StaticFileWriters<N> {
35 fn default() -> Self {
36 Self {
37 headers: Default::default(),
38 transactions: Default::default(),
39 receipts: Default::default(),
40 }
41 }
42}
43
44impl<N: NodePrimitives> StaticFileWriters<N> {
45 pub(crate) fn get_or_create(
46 &self,
47 segment: StaticFileSegment,
48 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
49 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
50 let mut write_guard = match segment {
51 StaticFileSegment::Headers => self.headers.write(),
52 StaticFileSegment::Transactions => self.transactions.write(),
53 StaticFileSegment::Receipts => self.receipts.write(),
54 };
55
56 if write_guard.is_none() {
57 *write_guard = Some(create_fn()?);
58 }
59
60 Ok(StaticFileProviderRWRefMut(write_guard))
61 }
62
63 pub(crate) fn commit(&self) -> ProviderResult<()> {
64 for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
65 let mut writer = writer_lock.write();
66 if let Some(writer) = writer.as_mut() {
67 writer.commit()?;
68 }
69 }
70 Ok(())
71 }
72
73 pub(crate) fn has_unwind_queued(&self) -> bool {
74 for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
75 let writer = writer_lock.read();
76 if let Some(writer) = writer.as_ref() &&
77 writer.will_prune_on_commit()
78 {
79 return true
80 }
81 }
82 false
83 }
84}
85
/// Mutable handle to a [`StaticFileProviderRW`] held behind a write-lock guard.
///
/// Keeps the guard alive so the writer stays exclusively borrowed while this
/// value exists. The inner `Option` is expected to be `Some`, since this type is
/// only constructed after `StaticFileWriters::get_or_create` initialized it.
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
91
impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Panics only if constructed before the writer was initialized, which
        // `StaticFileWriters::get_or_create` prevents.
        self.0.as_mut().expect("static file writer provider should be init")
    }
}
98
impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
    type Target = StaticFileProviderRW<N>;

    fn deref(&self) -> &Self::Target {
        // Panics only if constructed before the writer was initialized, which
        // `StaticFileWriters::get_or_create` prevents.
        self.0.as_ref().expect("static file writer provider should be init")
    }
}
107
/// Writer for a single static-file segment.
#[derive(Debug)]
pub struct StaticFileProviderRW<N> {
    /// Handle back to the read-only provider, held weakly; upgraded on demand
    /// (see `upgrade_provider_to_strong_reference`).
    reader: Weak<StaticFileProviderInner<N>>,
    /// Underlying jar writer for the currently open static file.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path of the data file currently being written.
    data_path: PathBuf,
    /// Reusable scratch buffer for `Compact` encoding of column values.
    buf: Vec<u8>,
    /// Optional metrics recorder for writer operations.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Prune queued for the next commit: number of rows to delete and, for
    /// tx-based segments, the last block number remaining after the prune.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}
127
128impl<N: NodePrimitives> StaticFileProviderRW<N> {
129 pub fn new(
134 segment: StaticFileSegment,
135 block: BlockNumber,
136 reader: Weak<StaticFileProviderInner<N>>,
137 metrics: Option<Arc<StaticFileProviderMetrics>>,
138 ) -> ProviderResult<Self> {
139 let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
140 let mut writer = Self {
141 writer,
142 data_path,
143 buf: Vec::with_capacity(100),
144 reader,
145 metrics,
146 prune_on_commit: None,
147 };
148
149 writer.ensure_end_range_consistency()?;
150
151 Ok(writer)
152 }
153
154 fn open(
155 segment: StaticFileSegment,
156 block: u64,
157 reader: Weak<StaticFileProviderInner<N>>,
158 metrics: Option<Arc<StaticFileProviderMetrics>>,
159 ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
160 let start = Instant::now();
161
162 let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
163
164 let block_range = static_file_provider.find_fixed_range(block);
165 let (jar, path) = match static_file_provider.get_segment_provider_from_block(
166 segment,
167 block_range.start(),
168 None,
169 ) {
170 Ok(provider) => (
171 NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
172 provider.data_path().into(),
173 ),
174 Err(ProviderError::MissingStaticFileBlock(_, _)) => {
175 let path = static_file_provider.directory().join(segment.filename(&block_range));
176 (create_jar(segment, &path, block_range), path)
177 }
178 Err(err) => return Err(err),
179 };
180
181 let result = match NippyJarWriter::new(jar) {
182 Ok(writer) => Ok((writer, path)),
183 Err(NippyJarError::FrozenJar) => {
184 Err(ProviderError::FinalizedStaticFile(segment, block))
186 }
187 Err(e) => Err(ProviderError::other(e)),
188 }?;
189
190 if let Some(metrics) = &metrics {
191 metrics.record_segment_operation(
192 segment,
193 StaticFileProviderOperation::OpenWriter,
194 Some(start.elapsed()),
195 );
196 }
197
198 Ok(result)
199 }
200
201 fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
210 let expected_rows = if self.user_header().segment().is_headers() {
212 self.user_header().block_len().unwrap_or_default()
213 } else {
214 self.user_header().tx_len().unwrap_or_default()
215 };
216 let actual_rows = self.writer.rows() as u64;
217 let pruned_rows = expected_rows.saturating_sub(actual_rows);
218 if pruned_rows > 0 {
219 self.user_header_mut().prune(pruned_rows);
220 }
221
222 self.writer.commit().map_err(ProviderError::other)?;
223
224 self.update_index()?;
226 Ok(())
227 }
228
    /// Returns `true` if a prune has been queued and will run on the next
    /// [`Self::commit`].
    pub const fn will_prune_on_commit(&self) -> bool {
        self.prune_on_commit.is_some()
    }
233
234 pub fn commit(&mut self) -> ProviderResult<()> {
236 let start = Instant::now();
237
238 if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
240 match self.writer.user_header().segment() {
241 StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
242 StaticFileSegment::Transactions => self
243 .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
244 StaticFileSegment::Receipts => {
245 self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
246 }
247 }
248 }
249
250 if self.writer.is_dirty() {
251 self.writer.commit().map_err(ProviderError::other)?;
253
254 if let Some(metrics) = &self.metrics {
255 metrics.record_segment_operation(
256 self.writer.user_header().segment(),
257 StaticFileProviderOperation::CommitWriter,
258 Some(start.elapsed()),
259 );
260 }
261
262 debug!(
263 target: "provider::static_file",
264 segment = ?self.writer.user_header().segment(),
265 path = ?self.data_path,
266 duration = ?start.elapsed(),
267 "Commit"
268 );
269
270 self.update_index()?;
271 }
272
273 Ok(())
274 }
275
    /// Test-only variant of [`Self::commit`] that commits to disk without
    /// calling `sync_all` on the files, trading durability for speed.
    ///
    /// Unlike `commit`, this does not execute a queued prune and always invokes
    /// the underlying commit regardless of the dirty flag.
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Commits offsets and the new user header to disk, skipping fsync.
        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Commit"
        );

        self.update_index()?;

        Ok(())
    }
306
307 fn update_index(&self) -> ProviderResult<()> {
309 let segment_max_block = self
318 .writer
319 .user_header()
320 .block_range()
321 .as_ref()
322 .map(|block_range| block_range.end())
323 .or_else(|| {
324 (self.writer.user_header().expected_block_start() > 0)
325 .then(|| self.writer.user_header().expected_block_start() - 1)
326 });
327
328 self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
329 }
330
    /// Advances the writer to the next block, rolling over to a new static file
    /// when the current one has reached its expected block end.
    ///
    /// # Errors
    /// Returns [`ProviderError::UnexpectedStaticFileBlockNumber`] if
    /// `expected_block_number` does not match [`Self::next_block_number`].
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // The current file is full: commit it and open the next one.
            if last_block == self.writer.user_header().expected_block_end() {
                self.commit()?;

                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                // Initialize a fresh header for the next fixed block range.
                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
373
374 pub fn next_block_number(&self) -> u64 {
376 self.writer
380 .user_header()
381 .block_end()
382 .map(|b| b + 1)
383 .unwrap_or_else(|| self.writer.user_header().expected_block_start())
384 }
385
386 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
389 let next_static_file_block = self.next_block_number();
390
391 if expected_block_number != next_static_file_block {
392 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
393 self.writer.user_header().segment(),
394 expected_block_number,
395 next_static_file_block,
396 ))
397 }
398 Ok(())
399 }
400
    /// Removes the last `num_rows` rows from the segment, deleting and reopening
    /// earlier files as needed when a whole file's contents are pruned away.
    ///
    /// `last_block` is the highest block that should remain after truncation; it
    /// is `None` for the block-based headers segment, where the block count
    /// follows the row count directly. Commits when done.
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            // Number of rows currently stored in the open file.
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // The whole file would be emptied.
                let block_start = self.writer.user_header().expected_block_start();

                // Delete the file and fall back to the previous one only if this
                // is not the genesis file and the remaining data belongs to an
                // earlier file; otherwise prune the rows in place and stop.
                // NOTE(review): for tx-based segments an emptied non-genesis
                // file is kept when `last_block` still falls inside its range —
                // presumably because a file may hold blocks without
                // transactions; confirm against segment semantics.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Update the header and prune the rows without deleting the file.
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Partial prune within the current file.
                self.writer.user_header_mut().prune(remaining_rows);

                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // Even with no rows to prune, files starting above `last_block`
                // must still be removed.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        // Persist the truncation.
        self.commit()?;

        Ok(())
    }
471
472 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
475 let current_path = self.data_path.clone();
476 let (previous_writer, data_path) = Self::open(
477 self.user_header().segment(),
478 self.writer.user_header().expected_block_start() - 1,
479 self.reader.clone(),
480 self.metrics.clone(),
481 )?;
482 self.writer = previous_writer;
483 self.writer.set_dirty();
484 self.data_path = data_path;
485 NippyJar::<SegmentHeader>::load(¤t_path)
486 .map_err(ProviderError::other)?
487 .delete()
488 .map_err(ProviderError::other)?;
489 Ok(())
490 }
491
492 fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
494 self.buf.clear();
495 column.to_compact(&mut self.buf);
496
497 self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
498 Ok(())
499 }
500
501 fn append_with_tx_number<V: Compact>(
505 &mut self,
506 tx_num: TxNumber,
507 value: V,
508 ) -> ProviderResult<()> {
509 if let Some(range) = self.writer.user_header().tx_range() {
510 let next_tx = range.end() + 1;
511 if next_tx != tx_num {
512 return Err(ProviderError::UnexpectedStaticFileTxNumber(
513 self.writer.user_header().segment(),
514 tx_num,
515 next_tx,
516 ))
517 }
518 self.writer.user_header_mut().increment_tx();
519 } else {
520 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
521 }
522
523 self.append_column(value)?;
524
525 Ok(())
526 }
527
    /// Appends a header with a zero total-difficulty column, delegating to
    /// [`Self::append_header_with_td`].
    pub fn append_header(&mut self, header: &N::BlockHeader, hash: &BlockHash) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        self.append_header_with_td(header, U256::ZERO, hash)
    }
540
541 pub fn append_header_with_td(
548 &mut self,
549 header: &N::BlockHeader,
550 total_difficulty: U256,
551 hash: &BlockHash,
552 ) -> ProviderResult<()>
553 where
554 N::BlockHeader: Compact,
555 {
556 let start = Instant::now();
557 self.ensure_no_queued_prune()?;
558
559 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
560
561 self.increment_block(header.number())?;
562
563 self.append_column(header)?;
564 self.append_column(CompactU256::from(total_difficulty))?;
565 self.append_column(hash)?;
566
567 if let Some(metrics) = &self.metrics {
568 metrics.record_segment_operation(
569 StaticFileSegment::Headers,
570 StaticFileProviderOperation::Append,
571 Some(start.elapsed()),
572 );
573 }
574
575 Ok(())
576 }
577
578 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
585 where
586 N::SignedTx: Compact,
587 {
588 let start = Instant::now();
589 self.ensure_no_queued_prune()?;
590
591 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
592 self.append_with_tx_number(tx_num, tx)?;
593
594 if let Some(metrics) = &self.metrics {
595 metrics.record_segment_operation(
596 StaticFileSegment::Transactions,
597 StaticFileProviderOperation::Append,
598 Some(start.elapsed()),
599 );
600 }
601
602 Ok(())
603 }
604
605 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
612 where
613 N::Receipt: Compact,
614 {
615 let start = Instant::now();
616 self.ensure_no_queued_prune()?;
617
618 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
619 self.append_with_tx_number(tx_num, receipt)?;
620
621 if let Some(metrics) = &self.metrics {
622 metrics.record_segment_operation(
623 StaticFileSegment::Receipts,
624 StaticFileProviderOperation::Append,
625 Some(start.elapsed()),
626 );
627 }
628
629 Ok(())
630 }
631
632 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
636 where
637 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
638 R: Borrow<N::Receipt>,
639 N::Receipt: Compact,
640 {
641 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
642
643 let mut receipts_iter = receipts.into_iter().peekable();
644 if receipts_iter.peek().is_none() {
646 return Ok(None);
647 }
648
649 let start = Instant::now();
650 self.ensure_no_queued_prune()?;
651
652 let mut tx_number = 0;
654 let mut count: u64 = 0;
655
656 for receipt_result in receipts_iter {
657 let (tx_num, receipt) = receipt_result?;
658 self.append_with_tx_number(tx_num, receipt.borrow())?;
659 tx_number = tx_num;
660 count += 1;
661 }
662
663 if let Some(metrics) = &self.metrics {
664 metrics.record_segment_operations(
665 StaticFileSegment::Receipts,
666 StaticFileProviderOperation::Append,
667 count,
668 Some(start.elapsed()),
669 );
670 }
671
672 Ok(Some(tx_number))
673 }
674
    /// Queues a prune of the last `to_delete` transactions, to run on the next
    /// commit.
    ///
    /// `last_block` is the highest block that remains after the prune.
    pub fn prune_transactions(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
        self.queue_prune(to_delete, Some(last_block))
    }
686
    /// Queues a prune of the last `to_delete` receipts, to run on the next
    /// commit.
    ///
    /// `last_block` is the highest block that remains after the prune.
    pub fn prune_receipts(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
        self.queue_prune(to_delete, Some(last_block))
    }
698
    /// Queues a prune of the last `to_delete` headers, to run on the next
    /// commit. No `last_block` is needed: the headers segment is block-based.
    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
        self.queue_prune(to_delete, None)
    }
704
    /// Records a prune of `to_delete` rows to be executed by the next commit.
    ///
    /// `last_block` is the expected highest block after pruning; it is `None`
    /// for the headers segment. Errors if a prune is already queued.
    fn queue_prune(
        &mut self,
        to_delete: u64,
        last_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        self.ensure_no_queued_prune()?;
        self.prune_on_commit = Some((to_delete, last_block));
        Ok(())
    }
718
719 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
721 if self.prune_on_commit.is_some() {
722 return Err(ProviderError::other(StaticFileWriterError::new(
723 "Pruning should be committed before appending or pruning more data",
724 )));
725 }
726 Ok(())
727 }
728
729 fn prune_transaction_data(
731 &mut self,
732 to_delete: u64,
733 last_block: BlockNumber,
734 ) -> ProviderResult<()> {
735 let start = Instant::now();
736
737 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
738
739 self.truncate(to_delete, Some(last_block))?;
740
741 if let Some(metrics) = &self.metrics {
742 metrics.record_segment_operation(
743 StaticFileSegment::Transactions,
744 StaticFileProviderOperation::Prune,
745 Some(start.elapsed()),
746 );
747 }
748
749 Ok(())
750 }
751
752 fn prune_receipt_data(
754 &mut self,
755 to_delete: u64,
756 last_block: BlockNumber,
757 ) -> ProviderResult<()> {
758 let start = Instant::now();
759
760 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
761
762 self.truncate(to_delete, Some(last_block))?;
763
764 if let Some(metrics) = &self.metrics {
765 metrics.record_segment_operation(
766 StaticFileSegment::Receipts,
767 StaticFileProviderOperation::Prune,
768 Some(start.elapsed()),
769 );
770 }
771
772 Ok(())
773 }
774
775 fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
777 let start = Instant::now();
778
779 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
780
781 self.truncate(to_delete, None)?;
782
783 if let Some(metrics) = &self.metrics {
784 metrics.record_segment_operation(
785 StaticFileSegment::Headers,
786 StaticFileProviderOperation::Prune,
787 Some(start.elapsed()),
788 );
789 }
790
791 Ok(())
792 }
793
    /// Upgrades the weak provider handle to a strong [`StaticFileProvider`];
    /// panics if the provider has been dropped.
    fn reader(&self) -> StaticFileProvider<N> {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }
797
798 fn upgrade_provider_to_strong_reference(
807 provider: &Weak<StaticFileProviderInner<N>>,
808 ) -> StaticFileProvider<N> {
809 provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
810 }
811
    /// Returns the segment header of the currently open static file.
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }
816
    /// Returns a mutable reference to the segment header of the currently open
    /// static file.
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }
821
    /// Test helper: overrides the header's block range directly.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
    }
827
    /// Test helper: exposes the underlying [`NippyJarWriter`] mutably.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
833}
834
835fn create_jar(
836 segment: StaticFileSegment,
837 path: &Path,
838 expected_block_range: SegmentRangeInclusive,
839) -> NippyJar<SegmentHeader> {
840 let mut jar = NippyJar::new(
841 segment.columns(),
842 path,
843 SegmentHeader::new(expected_block_range, None, None, segment),
844 );
845
846 if segment.is_headers() {
849 jar = jar.with_lz4();
850 }
851
852 jar
853}