use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::CompactU256;
10use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
11use reth_node_types::NodePrimitives;
12use reth_static_file_types::{SegmentHeader, SegmentRangeInclusive, StaticFileSegment};
13use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
14use std::{
15 borrow::Borrow,
16 fmt::Debug,
17 path::{Path, PathBuf},
18 sync::{Arc, Weak},
19 time::Instant,
20};
21use tracing::debug;
22
/// Holds one lazily-initialized writer per static-file segment
/// (headers, transactions, receipts), each guarded by its own lock.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    // Writer for the `Headers` segment; `None` until first requested.
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    // Writer for the `Transactions` segment; `None` until first requested.
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    // Writer for the `Receipts` segment; `None` until first requested.
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
}
33
34impl<N> Default for StaticFileWriters<N> {
35 fn default() -> Self {
36 Self {
37 headers: Default::default(),
38 transactions: Default::default(),
39 receipts: Default::default(),
40 }
41 }
42}
43
44impl<N: NodePrimitives> StaticFileWriters<N> {
45 pub(crate) fn get_or_create(
46 &self,
47 segment: StaticFileSegment,
48 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
49 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
50 let mut write_guard = match segment {
51 StaticFileSegment::Headers => self.headers.write(),
52 StaticFileSegment::Transactions => self.transactions.write(),
53 StaticFileSegment::Receipts => self.receipts.write(),
54 };
55
56 if write_guard.is_none() {
57 *write_guard = Some(create_fn()?);
58 }
59
60 Ok(StaticFileProviderRWRefMut(write_guard))
61 }
62
63 pub(crate) fn commit(&self) -> ProviderResult<()> {
64 for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
65 let mut writer = writer_lock.write();
66 if let Some(writer) = writer.as_mut() {
67 writer.commit()?;
68 }
69 }
70 Ok(())
71 }
72
73 pub(crate) fn has_unwind_queued(&self) -> bool {
74 for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
75 let writer = writer_lock.read();
76 if let Some(writer) = writer.as_ref() &&
77 writer.will_prune_on_commit()
78 {
79 return true
80 }
81 }
82 false
83 }
84}
85
/// Write-guard wrapper granting mutable access to a per-segment writer.
///
/// Derefs to [`StaticFileProviderRW`]; the inner `Option` is guaranteed to be
/// `Some` by the constructing code (`StaticFileWriters::get_or_create`).
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
91
impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Panics only if constructed around an uninitialized slot, which the
        // constructing code prevents by filling the slot before wrapping it.
        self.0.as_mut().expect("static file writer provider should be init")
    }
}
98
impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
    type Target = StaticFileProviderRW<N>;

    fn deref(&self) -> &Self::Target {
        // Same invariant as `deref_mut`: the slot is always populated.
        self.0.as_ref().expect("static file writer provider should be init")
    }
}
107
/// Read-write handle over a single static-file segment file.
#[derive(Debug)]
pub struct StaticFileProviderRW<N> {
    // Weak reference back to the owning provider so live writers do not keep
    // a dropped provider alive.
    reader: Weak<StaticFileProviderInner<N>>,
    // Underlying jar writer for the currently open file.
    writer: NippyJarWriter<SegmentHeader>,
    // Path of the data file currently being written.
    data_path: PathBuf,
    // Reusable scratch buffer for `Compact`-encoding appended column values.
    buf: Vec<u8>,
    // Optional metrics sink for segment operations.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    // Prune queued for the next `commit`: number of rows to delete and, for
    // tx-based segments, the last block that must remain afterwards.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}
127
128impl<N: NodePrimitives> StaticFileProviderRW<N> {
    /// Creates a writer for `segment` positioned at `block`, opening (or
    /// creating) the backing static file and reconciling any partially
    /// applied prune left over from a previous run.
    ///
    /// Returns an error if the file cannot be opened or the consistency
    /// commit fails.
    pub fn new(
        segment: StaticFileSegment,
        block: BlockNumber,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<Self> {
        let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
        let mut writer = Self {
            writer,
            data_path,
            buf: Vec::with_capacity(100),
            reader,
            metrics,
            prune_on_commit: None,
        };

        // Align header row counts with what is actually on disk before use.
        writer.ensure_end_range_consistency()?;

        Ok(writer)
    }

    /// Opens the jar writer for the fixed-size file range containing `block`.
    ///
    /// If no static file covers `block` yet, a fresh jar is created at the
    /// expected path for that range.
    fn open(
        segment: StaticFileSegment,
        block: u64,
        reader: Weak<StaticFileProviderInner<N>>,
        metrics: Option<Arc<StaticFileProviderMetrics>>,
    ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
        let start = Instant::now();

        let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);

        let block_range = static_file_provider.find_fixed_range(block);
        let (jar, path) = match static_file_provider.get_segment_provider_from_block(
            segment,
            block_range.start(),
            None,
        ) {
            Ok(provider) => (
                NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
                provider.data_path().into(),
            ),
            Err(ProviderError::MissingStaticFileBlock(_, _)) => {
                // No file exists for this range yet: create one.
                let path = static_file_provider.directory().join(segment.filename(&block_range));
                (create_jar(segment, &path, block_range), path)
            }
            Err(err) => return Err(err),
        };

        let result = match NippyJarWriter::new(jar) {
            Ok(writer) => Ok((writer, path)),
            // A frozen jar can no longer be appended to.
            Err(NippyJarError::FrozenJar) => {
                Err(ProviderError::FinalizedStaticFile(segment, block))
            }
            Err(e) => Err(ProviderError::other(e)),
        }?;

        if let Some(metrics) = &metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::OpenWriter,
                Some(start.elapsed()),
            );
        }

        Ok(result)
    }

    /// Aligns the segment header's recorded row count with the rows actually
    /// present in the file, then commits and refreshes the provider index.
    ///
    /// This repairs state where a previous process pruned rows from the data
    /// file but exited before the header update was persisted.
    fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
        // Headers are block-indexed; all other segments are tx-indexed.
        let expected_rows = if self.user_header().segment().is_headers() {
            self.user_header().block_len().unwrap_or_default()
        } else {
            self.user_header().tx_len().unwrap_or_default()
        };
        let actual_rows = self.writer.rows() as u64;
        let pruned_rows = expected_rows.saturating_sub(actual_rows);
        if pruned_rows > 0 {
            self.user_header_mut().prune(pruned_rows);
        }

        self.writer.commit().map_err(ProviderError::other)?;

        // Keep the in-memory index in sync with the (possibly adjusted) header.
        self.update_index()?;
        Ok(())
    }
228
229 pub const fn will_prune_on_commit(&self) -> bool {
231 self.prune_on_commit.is_some()
232 }
233
    /// Commits pending changes: first applies any queued prune, then — if the
    /// writer is dirty — fsyncs data to disk and refreshes the provider index.
    pub fn commit(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        // Apply a queued prune first so that prune + commit behave atomically
        // from the caller's perspective.
        if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
            match self.writer.user_header().segment() {
                StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
                // Tx-based prunes are always queued with a `Some` last block.
                StaticFileSegment::Transactions => self
                    .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
                StaticFileSegment::Receipts => {
                    self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
                }
            }
        }

        if self.writer.is_dirty() {
            // Persists offsets and the updated user header to disk.
            self.writer.commit().map_err(ProviderError::other)?;

            if let Some(metrics) = &self.metrics {
                metrics.record_segment_operation(
                    self.writer.user_header().segment(),
                    StaticFileProviderOperation::CommitWriter,
                    Some(start.elapsed()),
                );
            }

            debug!(
                target: "provider::static_file",
                segment = ?self.writer.user_header().segment(),
                path = ?self.data_path,
                duration = ?start.elapsed(),
                "Commit"
            );

            self.update_index()?;
        }

        Ok(())
    }

    /// Test-only commit that skips the final `sync_all`/fsync step, and also
    /// skips the queued-prune handling done by [`Self::commit`].
    #[cfg(feature = "test-utils")]
    pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
        let start = Instant::now();

        self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                self.writer.user_header().segment(),
                StaticFileProviderOperation::CommitWriter,
                Some(start.elapsed()),
            );
        }

        debug!(
            target: "provider::static_file",
            segment = ?self.writer.user_header().segment(),
            path = ?self.data_path,
            duration = ?start.elapsed(),
            "Commit"
        );

        self.update_index()?;

        Ok(())
    }
306
    /// Publishes this writer's current highest block to the provider's
    /// in-memory segment index.
    fn update_index(&self) -> ProviderResult<()> {
        // Prefer the end of the recorded block range. When the file holds no
        // blocks, fall back to the block right before this file's expected
        // start (the highest block of the previous file), or `None` when this
        // is the very first file (expected start of 0).
        let segment_max_block = self
            .writer
            .user_header()
            .block_range()
            .as_ref()
            .map(|block_range| block_range.end())
            .or_else(|| {
                (self.writer.user_header().expected_block_start() > 0)
                    .then(|| self.writer.user_header().expected_block_start() - 1)
            });

        self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
    }
330
    /// Advances the writer to `expected_block_number`, rolling over to a new
    /// static file when the current one has reached its expected end block.
    ///
    /// Returns an error if `expected_block_number` is not exactly the next
    /// block this writer expects.
    pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
        let segment = self.writer.user_header().segment();

        self.check_next_block_number(expected_block_number)?;

        let start = Instant::now();
        if let Some(last_block) = self.writer.user_header().block_end() {
            // The current file is full: commit it and open the next range.
            if last_block == self.writer.user_header().expected_block_end() {
                self.commit()?;

                // Open the file covering `last_block + 1`.
                let (writer, data_path) =
                    Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
                self.writer = writer;
                self.data_path = data_path;

                // Fresh header for the new fixed range; block/tx ranges start
                // empty.
                *self.writer.user_header_mut() = SegmentHeader::new(
                    self.reader().find_fixed_range(last_block + 1),
                    None,
                    None,
                    segment,
                );
            }
        }

        self.writer.user_header_mut().increment_block();
        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                segment,
                StaticFileProviderOperation::IncrementBlock,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
373
374 pub fn next_block_number(&self) -> u64 {
376 self.writer
380 .user_header()
381 .block_end()
382 .map(|b| b + 1)
383 .unwrap_or_else(|| self.writer.user_header().expected_block_start())
384 }
385
386 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
389 let next_static_file_block = self.next_block_number();
390
391 if expected_block_number != next_static_file_block {
392 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
393 self.writer.user_header().segment(),
394 expected_block_number,
395 next_static_file_block,
396 ))
397 }
398 Ok(())
399 }
400
    /// Removes the last `num_rows` rows, walking back across file boundaries
    /// and deleting whole files when necessary, then commits.
    ///
    /// `last_block` is the block that must remain the highest one after
    /// truncation (required for tx-based segments; `None` for headers).
    fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
        let mut remaining_rows = num_rows;
        let segment = self.writer.user_header().segment();
        while remaining_rows > 0 {
            // Number of rows currently held by the open file.
            let len = if segment.is_block_based() {
                self.writer.user_header().block_len().unwrap_or_default()
            } else {
                self.writer.user_header().tx_len().unwrap_or_default()
            };

            if remaining_rows >= len {
                // Every row in this file is being removed.
                let block_start = self.writer.user_header().expected_block_start();

                // Delete the whole file and step to the previous one only if
                // this is not the genesis-range file and (for tx-based
                // segments) the target block lies before this file's start.
                if block_start != 0 &&
                    (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
                {
                    self.delete_current_and_open_previous()?;
                } else {
                    // Otherwise prune the rows in place and keep the (now
                    // possibly empty) file, then stop.
                    self.writer.user_header_mut().prune(len);
                    self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
                    break
                }

                remaining_rows -= len;
            } else {
                // Only part of this file is pruned.
                self.writer.user_header_mut().prune(remaining_rows);

                self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
                remaining_rows = 0;
            }
        }

        if let Some(last_block) = last_block {
            let mut expected_block_start = self.writer.user_header().expected_block_start();

            if num_rows == 0 {
                // No rows were pruned, but files whose range starts after
                // `last_block` still need to be deleted.
                while last_block < expected_block_start {
                    self.delete_current_and_open_previous()?;
                    expected_block_start = self.writer.user_header().expected_block_start();
                }
            }
            self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
        }

        self.commit()?;

        Ok(())
    }
471
472 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
475 let current_path = self.data_path.clone();
476 let (previous_writer, data_path) = Self::open(
477 self.user_header().segment(),
478 self.writer.user_header().expected_block_start() - 1,
479 self.reader.clone(),
480 self.metrics.clone(),
481 )?;
482 self.writer = previous_writer;
483 self.writer.set_dirty();
484 self.data_path = data_path;
485 NippyJar::<SegmentHeader>::load(¤t_path)
486 .map_err(ProviderError::other)?
487 .delete()
488 .map_err(ProviderError::other)?;
489 Ok(())
490 }
491
    /// Compact-encodes `column` into the reusable scratch buffer and appends
    /// it as the next column value of the current row.
    fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
        self.buf.clear();
        column.to_compact(&mut self.buf);

        self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
        Ok(())
    }
500
501 fn append_with_tx_number<V: Compact>(
505 &mut self,
506 tx_num: TxNumber,
507 value: V,
508 ) -> ProviderResult<()> {
509 if let Some(range) = self.writer.user_header().tx_range() {
510 let next_tx = range.end() + 1;
511 if next_tx != tx_num {
512 return Err(ProviderError::UnexpectedStaticFileTxNumber(
513 self.writer.user_header().segment(),
514 tx_num,
515 next_tx,
516 ))
517 }
518 self.writer.user_header_mut().increment_tx();
519 } else {
520 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
521 }
522
523 self.append_column(value)?;
524
525 Ok(())
526 }
527
    /// Appends a header row to the `Headers` static file.
    ///
    /// `header.number()` must be exactly the next expected block number.
    pub fn append_header(
        &mut self,
        header: &N::BlockHeader,
        total_difficulty: U256,
        hash: &BlockHash,
    ) -> ProviderResult<()>
    where
        N::BlockHeader: Compact,
    {
        let start = Instant::now();
        self.ensure_no_queued_prune()?;

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.increment_block(header.number())?;

        // Column order must stay fixed: header, total difficulty, hash.
        self.append_column(header)?;
        self.append_column(CompactU256::from(total_difficulty))?;
        self.append_column(hash)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Append,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
564
565 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
572 where
573 N::SignedTx: Compact,
574 {
575 let start = Instant::now();
576 self.ensure_no_queued_prune()?;
577
578 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
579 self.append_with_tx_number(tx_num, tx)?;
580
581 if let Some(metrics) = &self.metrics {
582 metrics.record_segment_operation(
583 StaticFileSegment::Transactions,
584 StaticFileProviderOperation::Append,
585 Some(start.elapsed()),
586 );
587 }
588
589 Ok(())
590 }
591
592 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
599 where
600 N::Receipt: Compact,
601 {
602 let start = Instant::now();
603 self.ensure_no_queued_prune()?;
604
605 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
606 self.append_with_tx_number(tx_num, receipt)?;
607
608 if let Some(metrics) = &self.metrics {
609 metrics.record_segment_operation(
610 StaticFileSegment::Receipts,
611 StaticFileProviderOperation::Append,
612 Some(start.elapsed()),
613 );
614 }
615
616 Ok(())
617 }
618
619 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
623 where
624 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
625 R: Borrow<N::Receipt>,
626 N::Receipt: Compact,
627 {
628 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
629
630 let mut receipts_iter = receipts.into_iter().peekable();
631 if receipts_iter.peek().is_none() {
633 return Ok(None);
634 }
635
636 let start = Instant::now();
637 self.ensure_no_queued_prune()?;
638
639 let mut tx_number = 0;
641 let mut count: u64 = 0;
642
643 for receipt_result in receipts_iter {
644 let (tx_num, receipt) = receipt_result?;
645 self.append_with_tx_number(tx_num, receipt.borrow())?;
646 tx_number = tx_num;
647 count += 1;
648 }
649
650 if let Some(metrics) = &self.metrics {
651 metrics.record_segment_operations(
652 StaticFileSegment::Receipts,
653 StaticFileProviderOperation::Append,
654 count,
655 Some(start.elapsed()),
656 );
657 }
658
659 Ok(Some(tx_number))
660 }
661
    /// Queues a prune of the last `to_delete` transactions, applied on the
    /// next `commit`. `last_block` is the highest block remaining afterwards.
    pub fn prune_transactions(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
        self.queue_prune(to_delete, Some(last_block))
    }

    /// Queues a prune of the last `to_delete` receipts, applied on the next
    /// `commit`. `last_block` is the highest block remaining afterwards.
    pub fn prune_receipts(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
        self.queue_prune(to_delete, Some(last_block))
    }

    /// Queues a prune of the last `to_delete` headers, applied on the next
    /// `commit`. Headers carry no `last_block` (the segment is block-based).
    pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
        debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
        self.queue_prune(to_delete, None)
    }
691
    /// Records a prune request to be executed on the next `commit`.
    ///
    /// Fails if a previous request is still pending — only one prune can be
    /// queued at a time.
    fn queue_prune(
        &mut self,
        to_delete: u64,
        last_block: Option<BlockNumber>,
    ) -> ProviderResult<()> {
        self.ensure_no_queued_prune()?;
        self.prune_on_commit = Some((to_delete, last_block));
        Ok(())
    }
705
706 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
708 if self.prune_on_commit.is_some() {
709 return Err(ProviderError::other(StaticFileWriterError::new(
710 "Pruning should be committed before appending or pruning more data",
711 )));
712 }
713 Ok(())
714 }
715
    /// Immediately prunes the last `to_delete` transactions, leaving
    /// `last_block` as the highest block, and records metrics.
    fn prune_transaction_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Transactions,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Immediately prunes the last `to_delete` receipts, leaving `last_block`
    /// as the highest block, and records metrics.
    fn prune_receipt_data(
        &mut self,
        to_delete: u64,
        last_block: BlockNumber,
    ) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);

        self.truncate(to_delete, Some(last_block))?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Receipts,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }

    /// Immediately prunes the last `to_delete` headers and records metrics.
    fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
        let start = Instant::now();

        debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);

        self.truncate(to_delete, None)?;

        if let Some(metrics) = &self.metrics {
            metrics.record_segment_operation(
                StaticFileSegment::Headers,
                StaticFileProviderOperation::Prune,
                Some(start.elapsed()),
            );
        }

        Ok(())
    }
780
    /// Upgrades this writer's weak provider reference into a usable provider.
    fn reader(&self) -> StaticFileProvider<N> {
        Self::upgrade_provider_to_strong_reference(&self.reader)
    }

    /// Upgrades a weak [`StaticFileProviderInner`] reference into a strong
    /// [`StaticFileProvider`].
    ///
    /// Panics if the provider has been dropped; writers are expected never to
    /// outlive the provider that created them.
    fn upgrade_provider_to_strong_reference(
        provider: &Weak<StaticFileProviderInner<N>>,
    ) -> StaticFileProvider<N> {
        provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
    }
798
    /// Returns the segment header of the currently open file.
    pub const fn user_header(&self) -> &SegmentHeader {
        self.writer.user_header()
    }

    /// Returns a mutable reference to the segment header of the currently
    /// open file.
    pub const fn user_header_mut(&mut self) -> &mut SegmentHeader {
        self.writer.user_header_mut()
    }

    /// Test helper: overrides the header's block range directly.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
        self.writer.user_header_mut().set_block_range(*block_range.start(), *block_range.end())
    }

    /// Test helper: exposes the underlying jar writer.
    #[cfg(any(test, feature = "test-utils"))]
    pub const fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
        &mut self.writer
    }
820}
821
822fn create_jar(
823 segment: StaticFileSegment,
824 path: &Path,
825 expected_block_range: SegmentRangeInclusive,
826) -> NippyJar<SegmentHeader> {
827 let mut jar = NippyJar::new(
828 segment.columns(),
829 path,
830 SegmentHeader::new(expected_block_range, None, None, segment),
831 );
832
833 if segment.is_headers() {
836 jar = jar.with_lz4();
837 }
838
839 jar
840}