1use super::{
2 manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider,
3};
4use crate::providers::static_file::metrics::StaticFileProviderOperation;
5use alloy_consensus::BlockHeader;
6use alloy_primitives::{BlockHash, BlockNumber, TxNumber, U256};
7use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
8use reth_codecs::Compact;
9use reth_db_api::models::{
10 CompactU256, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
11};
12use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter};
13use reth_node_types::NodePrimitives;
14use reth_primitives::{
15 static_file::{SegmentHeader, SegmentRangeInclusive},
16 StaticFileSegment,
17};
18use reth_storage_errors::provider::{ProviderError, ProviderResult, StaticFileWriterError};
19use std::{
20 borrow::Borrow,
21 fmt::Debug,
22 path::{Path, PathBuf},
23 sync::{Arc, Weak},
24 time::Instant,
25};
26use tracing::debug;
27
/// Holds one lazily-created writer slot per static file segment.
///
/// Every slot is an `Option` behind its own [`RwLock`]; it stays `None` until a writer is
/// first requested for that segment.
#[derive(Debug)]
pub(crate) struct StaticFileWriters<N> {
    /// Writer slot for [`StaticFileSegment::Headers`].
    headers: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::Transactions`].
    transactions: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::Receipts`].
    receipts: RwLock<Option<StaticFileProviderRW<N>>>,
    /// Writer slot for [`StaticFileSegment::BlockMeta`].
    block_meta: RwLock<Option<StaticFileProviderRW<N>>>,
}
39
40impl<N> Default for StaticFileWriters<N> {
41 fn default() -> Self {
42 Self {
43 headers: Default::default(),
44 transactions: Default::default(),
45 receipts: Default::default(),
46 block_meta: Default::default(),
47 }
48 }
49}
50
51impl<N: NodePrimitives> StaticFileWriters<N> {
52 pub(crate) fn get_or_create(
53 &self,
54 segment: StaticFileSegment,
55 create_fn: impl FnOnce() -> ProviderResult<StaticFileProviderRW<N>>,
56 ) -> ProviderResult<StaticFileProviderRWRefMut<'_, N>> {
57 let mut write_guard = match segment {
58 StaticFileSegment::Headers => self.headers.write(),
59 StaticFileSegment::Transactions => self.transactions.write(),
60 StaticFileSegment::Receipts => self.receipts.write(),
61 StaticFileSegment::BlockMeta => self.block_meta.write(),
62 };
63
64 if write_guard.is_none() {
65 *write_guard = Some(create_fn()?);
66 }
67
68 Ok(StaticFileProviderRWRefMut(write_guard))
69 }
70
71 pub(crate) fn commit(&self) -> ProviderResult<()> {
72 for writer_lock in [&self.headers, &self.transactions, &self.receipts] {
73 let mut writer = writer_lock.write();
74 if let Some(writer) = writer.as_mut() {
75 writer.commit()?;
76 }
77 }
78 Ok(())
79 }
80}
81
/// Mutable handle to a [`StaticFileProviderRW`] that lives behind a write-lock guard.
///
/// Dereferences to the inner writer and panics if the guarded slot is empty.
#[derive(Debug)]
pub struct StaticFileProviderRWRefMut<'a, N>(
    /// Write guard over the segment's writer slot.
    pub(crate) RwLockWriteGuard<'a, RawRwLock, Option<StaticFileProviderRW<N>>>,
);
87
88impl<N> std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> {
89 fn deref_mut(&mut self) -> &mut Self::Target {
90 self.0.as_mut().expect("static file writer provider should be init")
92 }
93}
94
95impl<N> std::ops::Deref for StaticFileProviderRWRefMut<'_, N> {
96 type Target = StaticFileProviderRW<N>;
97
98 fn deref(&self) -> &Self::Target {
99 self.0.as_ref().expect("static file writer provider should be init")
101 }
102}
103
/// Read-write handle for a single static file segment.
///
/// Wraps a [`NippyJarWriter`] and keeps the [`SegmentHeader`] stored in the jar in step with
/// the rows that are appended or pruned.
#[derive(Debug)]
pub struct StaticFileProviderRW<N> {
    /// Weak handle to the provider internals. It is upgraded on each use, which panics if the
    /// provider has already been dropped.
    reader: Weak<StaticFileProviderInner<N>>,
    /// Jar writer for the static file currently being written; replaced whenever the writer
    /// moves to another file.
    writer: NippyJarWriter<SegmentHeader>,
    /// Path of the data file backing `writer`; updated together with `writer`.
    data_path: PathBuf,
    /// Scratch buffer reused for the compact encoding of every appended column.
    buf: Vec<u8>,
    /// Optional metrics sink; when `None` no operation timings are recorded.
    metrics: Option<Arc<StaticFileProviderMetrics>>,
    /// Pending prune request `(rows to delete, last block)` applied by the next `commit`.
    /// The last block is `None` when queued through the header or block-meta entry points.
    prune_on_commit: Option<(u64, Option<BlockNumber>)>,
}
123
124impl<N: NodePrimitives> StaticFileProviderRW<N> {
125 pub fn new(
130 segment: StaticFileSegment,
131 block: BlockNumber,
132 reader: Weak<StaticFileProviderInner<N>>,
133 metrics: Option<Arc<StaticFileProviderMetrics>>,
134 ) -> ProviderResult<Self> {
135 let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?;
136 let mut writer = Self {
137 writer,
138 data_path,
139 buf: Vec::with_capacity(100),
140 reader,
141 metrics,
142 prune_on_commit: None,
143 };
144
145 writer.ensure_end_range_consistency()?;
146
147 Ok(writer)
148 }
149
150 fn open(
151 segment: StaticFileSegment,
152 block: u64,
153 reader: Weak<StaticFileProviderInner<N>>,
154 metrics: Option<Arc<StaticFileProviderMetrics>>,
155 ) -> ProviderResult<(NippyJarWriter<SegmentHeader>, PathBuf)> {
156 let start = Instant::now();
157
158 let static_file_provider = Self::upgrade_provider_to_strong_reference(&reader);
159
160 let block_range = static_file_provider.find_fixed_range(block);
161 let (jar, path) = match static_file_provider.get_segment_provider_from_block(
162 segment,
163 block_range.start(),
164 None,
165 ) {
166 Ok(provider) => (
167 NippyJar::load(provider.data_path()).map_err(ProviderError::other)?,
168 provider.data_path().into(),
169 ),
170 Err(ProviderError::MissingStaticFileBlock(_, _)) => {
171 let path = static_file_provider.directory().join(segment.filename(&block_range));
172 (create_jar(segment, &path, block_range), path)
173 }
174 Err(err) => return Err(err),
175 };
176
177 let result = match NippyJarWriter::new(jar) {
178 Ok(writer) => Ok((writer, path)),
179 Err(NippyJarError::FrozenJar) => {
180 Err(ProviderError::FinalizedStaticFile(segment, block))
182 }
183 Err(e) => Err(ProviderError::other(e)),
184 }?;
185
186 if let Some(metrics) = &metrics {
187 metrics.record_segment_operation(
188 segment,
189 StaticFileProviderOperation::OpenWriter,
190 Some(start.elapsed()),
191 );
192 }
193
194 Ok(result)
195 }
196
197 fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> {
206 let expected_rows = if self.user_header().segment().is_headers() {
208 self.user_header().block_len().unwrap_or_default()
209 } else {
210 self.user_header().tx_len().unwrap_or_default()
211 };
212 let pruned_rows = expected_rows - self.writer.rows() as u64;
213 if pruned_rows > 0 {
214 self.user_header_mut().prune(pruned_rows);
215 }
216
217 self.writer.commit().map_err(ProviderError::other)?;
218
219 self.update_index()?;
221 Ok(())
222 }
223
224 pub fn commit(&mut self) -> ProviderResult<()> {
226 let start = Instant::now();
227
228 if let Some((to_delete, last_block_number)) = self.prune_on_commit.take() {
230 match self.writer.user_header().segment() {
231 StaticFileSegment::Headers => self.prune_header_data(to_delete)?,
232 StaticFileSegment::Transactions => self
233 .prune_transaction_data(to_delete, last_block_number.expect("should exist"))?,
234 StaticFileSegment::Receipts => {
235 self.prune_receipt_data(to_delete, last_block_number.expect("should exist"))?
236 }
237 StaticFileSegment::BlockMeta => todo!(),
238 }
239 }
240
241 if self.writer.is_dirty() {
242 self.writer.commit().map_err(ProviderError::other)?;
244
245 if let Some(metrics) = &self.metrics {
246 metrics.record_segment_operation(
247 self.writer.user_header().segment(),
248 StaticFileProviderOperation::CommitWriter,
249 Some(start.elapsed()),
250 );
251 }
252
253 debug!(
254 target: "provider::static_file",
255 segment = ?self.writer.user_header().segment(),
256 path = ?self.data_path,
257 duration = ?start.elapsed(),
258 "Commit"
259 );
260
261 self.update_index()?;
262 }
263
264 Ok(())
265 }
266
/// Test-only variant of the commit that delegates to the jar writer's
/// `commit_without_sync_all`.
///
/// Unlike `commit`, it does not apply a queued prune and does not check whether the writer is
/// dirty; metrics, the debug event and the index update always happen on success.
///
/// # Errors
///
/// Propagates errors from the jar writer and from the index update.
#[cfg(feature = "test-utils")]
pub fn commit_without_sync_all(&mut self) -> ProviderResult<()> {
    let start = Instant::now();

    self.writer.commit_without_sync_all().map_err(ProviderError::other)?;

    let segment = self.writer.user_header().segment();
    if let Some(recorder) = self.metrics.as_ref() {
        recorder.record_segment_operation(
            segment,
            StaticFileProviderOperation::CommitWriter,
            Some(start.elapsed()),
        );
    }

    debug!(
        target: "provider::static_file",
        segment = ?segment,
        path = ?self.data_path,
        duration = ?start.elapsed(),
        "Commit"
    );

    self.update_index()?;

    Ok(())
}
297
298 fn update_index(&self) -> ProviderResult<()> {
300 let segment_max_block = self
309 .writer
310 .user_header()
311 .block_range()
312 .as_ref()
313 .map(|block_range| block_range.end())
314 .or_else(|| {
315 (self.writer.user_header().expected_block_start() > 0)
316 .then(|| self.writer.user_header().expected_block_start() - 1)
317 });
318
319 self.reader().update_index(self.writer.user_header().segment(), segment_max_block)
320 }
321
322 pub fn increment_block(&mut self, expected_block_number: BlockNumber) -> ProviderResult<()> {
327 let segment = self.writer.user_header().segment();
328
329 self.check_next_block_number(expected_block_number)?;
330
331 let start = Instant::now();
332 if let Some(last_block) = self.writer.user_header().block_end() {
333 if last_block == self.writer.user_header().expected_block_end() {
335 self.commit()?;
337
338 let (writer, data_path) =
340 Self::open(segment, last_block + 1, self.reader.clone(), self.metrics.clone())?;
341 self.writer = writer;
342 self.data_path = data_path;
343
344 *self.writer.user_header_mut() = SegmentHeader::new(
345 self.reader().find_fixed_range(last_block + 1),
346 None,
347 None,
348 segment,
349 );
350 }
351 }
352
353 self.writer.user_header_mut().increment_block();
354 if let Some(metrics) = &self.metrics {
355 metrics.record_segment_operation(
356 segment,
357 StaticFileProviderOperation::IncrementBlock,
358 Some(start.elapsed()),
359 );
360 }
361
362 Ok(())
363 }
364
365 fn check_next_block_number(&self, expected_block_number: u64) -> ProviderResult<()> {
368 let next_static_file_block = self
372 .writer
373 .user_header()
374 .block_end()
375 .map(|b| b + 1)
376 .unwrap_or_else(|| self.writer.user_header().expected_block_start());
377
378 if expected_block_number != next_static_file_block {
379 return Err(ProviderError::UnexpectedStaticFileBlockNumber(
380 self.writer.user_header().segment(),
381 expected_block_number,
382 next_static_file_block,
383 ))
384 }
385 Ok(())
386 }
387
388 fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
396 let mut remaining_rows = num_rows;
397 let segment = self.writer.user_header().segment();
398 while remaining_rows > 0 {
399 let len = if segment.is_block_based() {
400 self.writer.user_header().block_len().unwrap_or_default()
401 } else {
402 self.writer.user_header().tx_len().unwrap_or_default()
403 };
404
405 if remaining_rows >= len {
406 let block_start = self.writer.user_header().expected_block_start();
409
410 if block_start != 0 &&
416 (segment.is_headers() || last_block.is_some_and(|b| b < block_start))
417 {
418 self.delete_current_and_open_previous()?;
419 } else {
420 self.writer.user_header_mut().prune(len);
422 self.writer.prune_rows(len as usize).map_err(ProviderError::other)?;
423 break
424 }
425
426 remaining_rows -= len;
427 } else {
428 self.writer.user_header_mut().prune(remaining_rows);
430
431 self.writer.prune_rows(remaining_rows as usize).map_err(ProviderError::other)?;
433 remaining_rows = 0;
434 }
435 }
436
437 if let Some(last_block) = last_block {
439 let mut expected_block_start = self.writer.user_header().expected_block_start();
440
441 if num_rows == 0 {
442 while last_block < expected_block_start {
446 self.delete_current_and_open_previous()?;
447 expected_block_start = self.writer.user_header().expected_block_start();
448 }
449 }
450 self.writer.user_header_mut().set_block_range(expected_block_start, last_block);
451 }
452
453 self.commit()?;
455
456 Ok(())
457 }
458
459 fn delete_current_and_open_previous(&mut self) -> Result<(), ProviderError> {
462 let current_path = self.data_path.clone();
463 let (previous_writer, data_path) = Self::open(
464 self.user_header().segment(),
465 self.writer.user_header().expected_block_start() - 1,
466 self.reader.clone(),
467 self.metrics.clone(),
468 )?;
469 self.writer = previous_writer;
470 self.writer.set_dirty();
471 self.data_path = data_path;
472 NippyJar::<SegmentHeader>::load(¤t_path)
473 .map_err(ProviderError::other)?
474 .delete()
475 .map_err(ProviderError::other)?;
476 Ok(())
477 }
478
479 fn append_column<T: Compact>(&mut self, column: T) -> ProviderResult<()> {
481 self.buf.clear();
482 column.to_compact(&mut self.buf);
483
484 self.writer.append_column(Some(Ok(&self.buf))).map_err(ProviderError::other)?;
485 Ok(())
486 }
487
488 fn append_with_tx_number<V: Compact>(
492 &mut self,
493 tx_num: TxNumber,
494 value: V,
495 ) -> ProviderResult<()> {
496 if let Some(range) = self.writer.user_header().tx_range() {
497 let next_tx = range.end() + 1;
498 if next_tx != tx_num {
499 return Err(ProviderError::UnexpectedStaticFileTxNumber(
500 self.writer.user_header().segment(),
501 tx_num,
502 next_tx,
503 ))
504 }
505 self.writer.user_header_mut().increment_tx();
506 } else {
507 self.writer.user_header_mut().set_tx_range(tx_num, tx_num);
508 }
509
510 self.append_column(value)?;
511
512 Ok(())
513 }
514
515 pub fn append_header(
522 &mut self,
523 header: &N::BlockHeader,
524 total_difficulty: U256,
525 hash: &BlockHash,
526 ) -> ProviderResult<()>
527 where
528 N::BlockHeader: Compact,
529 {
530 let start = Instant::now();
531 self.ensure_no_queued_prune()?;
532
533 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
534
535 self.increment_block(header.number())?;
536
537 self.append_column(header)?;
538 self.append_column(CompactU256::from(total_difficulty))?;
539 self.append_column(hash)?;
540
541 if let Some(metrics) = &self.metrics {
542 metrics.record_segment_operation(
543 StaticFileSegment::Headers,
544 StaticFileProviderOperation::Append,
545 Some(start.elapsed()),
546 );
547 }
548
549 Ok(())
550 }
551
552 pub fn append_eth_block_meta(
557 &mut self,
558 body_indices: &StoredBlockBodyIndices,
559 ommers: &StoredBlockOmmers<N::BlockHeader>,
560 withdrawals: &StoredBlockWithdrawals,
561 expected_block_number: BlockNumber,
562 ) -> ProviderResult<()>
563 where
564 N::BlockHeader: Compact,
565 {
566 self.append_block_meta(body_indices, ommers, withdrawals, expected_block_number)
567 }
568
569 pub fn append_block_meta<F1, F2>(
574 &mut self,
575 body_indices: &StoredBlockBodyIndices,
576 field1: &F1,
577 field2: &F2,
578 expected_block_number: BlockNumber,
579 ) -> ProviderResult<()>
580 where
581 N::BlockHeader: Compact,
582 F1: Compact,
583 F2: Compact,
584 {
585 let start = Instant::now();
586 self.ensure_no_queued_prune()?;
587
588 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::BlockMeta);
589
590 self.increment_block(expected_block_number)?;
591
592 self.append_column(body_indices)?;
593 self.append_column(field1)?;
594 self.append_column(field2)?;
595
596 if let Some(metrics) = &self.metrics {
597 metrics.record_segment_operation(
598 StaticFileSegment::BlockMeta,
599 StaticFileProviderOperation::Append,
600 Some(start.elapsed()),
601 );
602 }
603
604 Ok(())
605 }
606
607 pub fn append_transaction(&mut self, tx_num: TxNumber, tx: &N::SignedTx) -> ProviderResult<()>
614 where
615 N::SignedTx: Compact,
616 {
617 let start = Instant::now();
618 self.ensure_no_queued_prune()?;
619
620 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
621 self.append_with_tx_number(tx_num, tx)?;
622
623 if let Some(metrics) = &self.metrics {
624 metrics.record_segment_operation(
625 StaticFileSegment::Transactions,
626 StaticFileProviderOperation::Append,
627 Some(start.elapsed()),
628 );
629 }
630
631 Ok(())
632 }
633
634 pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()>
641 where
642 N::Receipt: Compact,
643 {
644 let start = Instant::now();
645 self.ensure_no_queued_prune()?;
646
647 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
648 self.append_with_tx_number(tx_num, receipt)?;
649
650 if let Some(metrics) = &self.metrics {
651 metrics.record_segment_operation(
652 StaticFileSegment::Receipts,
653 StaticFileProviderOperation::Append,
654 Some(start.elapsed()),
655 );
656 }
657
658 Ok(())
659 }
660
661 pub fn append_receipts<I, R>(&mut self, receipts: I) -> ProviderResult<Option<TxNumber>>
665 where
666 I: Iterator<Item = Result<(TxNumber, R), ProviderError>>,
667 R: Borrow<N::Receipt>,
668 N::Receipt: Compact,
669 {
670 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
671
672 let mut receipts_iter = receipts.into_iter().peekable();
673 if receipts_iter.peek().is_none() {
675 return Ok(None);
676 }
677
678 let start = Instant::now();
679 self.ensure_no_queued_prune()?;
680
681 let mut tx_number = 0;
683 let mut count: u64 = 0;
684
685 for receipt_result in receipts_iter {
686 let (tx_num, receipt) = receipt_result?;
687 self.append_with_tx_number(tx_num, receipt.borrow())?;
688 tx_number = tx_num;
689 count += 1;
690 }
691
692 if let Some(metrics) = &self.metrics {
693 metrics.record_segment_operations(
694 StaticFileSegment::Receipts,
695 StaticFileProviderOperation::Append,
696 count,
697 Some(start.elapsed()),
698 );
699 }
700
701 Ok(Some(tx_number))
702 }
703
704 pub fn prune_transactions(
708 &mut self,
709 to_delete: u64,
710 last_block: BlockNumber,
711 ) -> ProviderResult<()> {
712 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Transactions);
713 self.queue_prune(to_delete, Some(last_block))
714 }
715
716 pub fn prune_receipts(
720 &mut self,
721 to_delete: u64,
722 last_block: BlockNumber,
723 ) -> ProviderResult<()> {
724 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Receipts);
725 self.queue_prune(to_delete, Some(last_block))
726 }
727
728 pub fn prune_headers(&mut self, to_delete: u64) -> ProviderResult<()> {
730 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::Headers);
731 self.queue_prune(to_delete, None)
732 }
733
734 pub fn prune_block_meta(&mut self, to_delete: u64) -> ProviderResult<()> {
736 debug_assert_eq!(self.writer.user_header().segment(), StaticFileSegment::BlockMeta);
737 self.queue_prune(to_delete, None)
738 }
739
740 fn queue_prune(
745 &mut self,
746 to_delete: u64,
747 last_block: Option<BlockNumber>,
748 ) -> ProviderResult<()> {
749 self.ensure_no_queued_prune()?;
750 self.prune_on_commit = Some((to_delete, last_block));
751 Ok(())
752 }
753
754 fn ensure_no_queued_prune(&self) -> ProviderResult<()> {
756 if self.prune_on_commit.is_some() {
757 return Err(ProviderError::other(StaticFileWriterError::new(
758 "Pruning should be committed before appending or pruning more data",
759 )));
760 }
761 Ok(())
762 }
763
764 fn prune_transaction_data(
766 &mut self,
767 to_delete: u64,
768 last_block: BlockNumber,
769 ) -> ProviderResult<()> {
770 let start = Instant::now();
771
772 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Transactions);
773
774 self.truncate(to_delete, Some(last_block))?;
775
776 if let Some(metrics) = &self.metrics {
777 metrics.record_segment_operation(
778 StaticFileSegment::Transactions,
779 StaticFileProviderOperation::Prune,
780 Some(start.elapsed()),
781 );
782 }
783
784 Ok(())
785 }
786
787 fn prune_receipt_data(
789 &mut self,
790 to_delete: u64,
791 last_block: BlockNumber,
792 ) -> ProviderResult<()> {
793 let start = Instant::now();
794
795 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Receipts);
796
797 self.truncate(to_delete, Some(last_block))?;
798
799 if let Some(metrics) = &self.metrics {
800 metrics.record_segment_operation(
801 StaticFileSegment::Receipts,
802 StaticFileProviderOperation::Prune,
803 Some(start.elapsed()),
804 );
805 }
806
807 Ok(())
808 }
809
810 fn prune_header_data(&mut self, to_delete: u64) -> ProviderResult<()> {
812 let start = Instant::now();
813
814 debug_assert!(self.writer.user_header().segment() == StaticFileSegment::Headers);
815
816 self.truncate(to_delete, None)?;
817
818 if let Some(metrics) = &self.metrics {
819 metrics.record_segment_operation(
820 StaticFileSegment::Headers,
821 StaticFileProviderOperation::Prune,
822 Some(start.elapsed()),
823 );
824 }
825
826 Ok(())
827 }
828
829 fn reader(&self) -> StaticFileProvider<N> {
830 Self::upgrade_provider_to_strong_reference(&self.reader)
831 }
832
833 fn upgrade_provider_to_strong_reference(
842 provider: &Weak<StaticFileProviderInner<N>>,
843 ) -> StaticFileProvider<N> {
844 provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped")
845 }
846
847 pub const fn user_header(&self) -> &SegmentHeader {
849 self.writer.user_header()
850 }
851
852 pub fn user_header_mut(&mut self) -> &mut SegmentHeader {
854 self.writer.user_header_mut()
855 }
856
/// Test helper that overwrites the block range of the segment header with the bounds of
/// `block_range`.
#[cfg(any(test, feature = "test-utils"))]
pub fn set_block_range(&mut self, block_range: std::ops::RangeInclusive<BlockNumber>) {
    let (first_block, last_block) = block_range.into_inner();
    self.writer.user_header_mut().set_block_range(first_block, last_block)
}
862
/// Test helper that exposes the underlying [`NippyJarWriter`] for direct manipulation.
#[cfg(any(test, feature = "test-utils"))]
pub fn inner(&mut self) -> &mut NippyJarWriter<SegmentHeader> {
    let Self { writer, .. } = self;
    writer
}
868}
869
870fn create_jar(
871 segment: StaticFileSegment,
872 path: &Path,
873 expected_block_range: SegmentRangeInclusive,
874) -> NippyJar<SegmentHeader> {
875 let mut jar = NippyJar::new(
876 segment.columns(),
877 path,
878 SegmentHeader::new(expected_block_range, None, None, segment),
879 );
880
881 if segment.is_headers() {
884 jar = jar.with_lz4();
885 }
886
887 jar
888}